lamportclient.py 7.55 KB
Newer Older
's avatar
committed
1
2
3
"""lamportclient.py
author: Francois Neron
email: francois.neron@telecom-bretagne.eu

Lamport Algorithm put to the test.
Launch with:
- the identifier of the site represented by this instance,
- the number of sites in total,

as follows: lamportclient.py 1 4

where the identifiers of the other sites would be 2, 3 and 4.

"""

import time
's avatar
début    
committed
17
18
19
import random
import pika
import sys
's avatar
committed
20
import threading
21
import operator
's avatar
committed
22

's avatar
committed
23
24
# Fanout exchange on which the request / free / validate messages are published.
EXCHANGE = "Lamport"
# Fanout exchange used only for the start-up "hello" handshake between sites.
INIT_EXCHANGE = "Init"
's avatar
committed
25
# Passed as `inactivity_timeout` when polling the reception queue, so the
# client loop regains control regularly even when no message arrives.
TIMEOUT = 0.1
's avatar
début    
committed
26

's avatar
committed
27
class LamportClient(threading.Thread):
    """One site of a Lamport mutual-exclusion experiment over RabbitMQ.

    Every site publishes its messages on a fanout exchange, so each site
    receives every message (including its own, which it ignores).  A message
    body has the form ``"<sender id>,<R|F|V>,<timestamp>"`` where ``R`` is a
    request, ``F`` a release ("free") and ``V`` a validation (acknowledgement).

    The thread owns the pika channel.  Other threads must not call
    ``request``/``free`` directly; they set a flag through ``to_request`` /
    ``to_free`` and the thread's loop performs the publish.

    NOTE: timestamps are wall-clock ``time.time()`` values, not logical
    counters, so ordering relies on the sites' clocks being comparable.
    """

    def __init__(self, id, clients):
        """Connect to the local broker and bind this site's private queues.

        Args:
            id: identifier of this site (sites are numbered 1..clients).
            clients: total number of sites taking part.
        """
        super().__init__()

        self.id = id

        # Start-up handshake bookkeeping.
        self.n_clients_here = 0
        self.n_clients_total = clients
        self.isready = False

        # Pending requests as (site id, timestamp), kept in priority order.
        self.queue = []
        # Timestamp of the last message seen from each site.
        self.times = {n: 0 for n in range(1, clients + 1)}

        # Flags set by other threads, consumed by the loop in run().
        self.mustFree = False
        self.mustRequest = False

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()

        self.channel.basic_qos(prefetch_count=1)

        self.channel.exchange_declare(
            exchange=EXCHANGE, exchange_type="fanout")
        self.channel.exchange_declare(
            exchange=INIT_EXCHANGE, exchange_type="fanout")

        # Two broker-named exclusive queues: one per exchange.
        self.reception = self.channel.queue_declare(
            exclusive=True).method.queue
        self.init_reception = self.channel.queue_declare(
            exclusive=True).method.queue

        self.channel.queue_bind(
            exchange=EXCHANGE, queue=self.reception)
        self.channel.queue_bind(
            exchange=INIT_EXCHANGE, queue=self.init_reception)

    def run(self):
        """Run the start-up handshake, then process messages forever.

        NOTE(review): ``no_ack`` and the callback-first ``basic_consume``
        signature look like the pre-1.0 pika API -- confirm the pinned version
        before changing these calls.
        """
        print("I am {} of {} sites".format(self.id, len(self.times)))
        if self.channel.connection.is_open:
            print("Well connected to server")

        self.channel.basic_consume(
            self.initializeCycle, self.init_reception, no_ack=True)

        init_message = "{} says : Hello World!".format(self.id)

        self.channel.basic_publish(
            exchange=INIT_EXCHANGE,
            routing_key='',
            body=init_message)

        print("saying hello to the world")
        # Blocks until initializeCycle() calls stop_consuming().
        self.channel.start_consuming()

        self.isready = True
        print("INIT_DONE\n")

        while True:
            messages = self.channel.consume(
                self.reception, inactivity_timeout=TIMEOUT,
                no_ack=True)
            mess = next(messages)

            # A None item is treated as "nothing arrived within TIMEOUT".
            if mess is not None:
                *_, body = mess
                self.when_receive(body)

            # Perform the publishes requested by other threads.
            if self.mustFree:
                self.free()
                self.mustFree = False

            if self.mustRequest:
                self.request()
                self.mustRequest = False

    @staticmethod
    def _decode(body):
        """Return a message body as text, whether it arrives as bytes or str."""
        if isinstance(body, (bytes, bytearray)):
            return bytes(body).decode("utf-8")
        return str(body)

    def initializeCycle(self, channel, method, properties, body):
        """Handshake callback: count arriving sites until everyone is present.

        The body starts with the sender's id; id 0 is the "everyone is here"
        broadcast published by whichever site first counts all the sites.
        """
        arrival_id = int(self._decode(body).split(" ")[0])

        if arrival_id == 0:
            print("Warned everyone arrived. Let's Go!")
            self.channel.stop_consuming()
            return

        # Only real sites are counted, not the id-0 broadcast.
        self.n_clients_here += 1
        print("{} arrived!\nWe are now {} of {}".format(
            arrival_id, self.n_clients_here, self.n_clients_total))

        if self.n_clients_here == self.n_clients_total:
            self.channel.basic_publish(
                exchange=INIT_EXCHANGE,
                routing_key='',
                body="0 says : All Here, stop waiting!")

            self.channel.stop_consuming()
            print("All Ready, let's go!")

    def publish(self, message):
        """Publish ``message`` to every site and record this site's send time."""
        self.channel.basic_publish(
            exchange=EXCHANGE, body=message, routing_key='')
        self.times[self.id] = time.time()

    def request(self):
        """Request access to the critical section."""
        mtime = time.time()
        self.queue.append((self.id, mtime))
        # Same total order as for remote requests: timestamp, then site id.
        self.queue.sort(key=operator.itemgetter(1, 0))

        message = "{},R,{}".format(self.id, mtime)
        self.publish(message)
        print("{} requesting at time {}".format(self.id, mtime))
        print("queue is now {}".format(self.queue))

    def free(self):
        """Release the critical section: drop our request and tell the others."""
        # Remove our own entry rather than blindly popping the head, so an
        # empty or re-ordered queue cannot raise or evict another site.
        self.queue[:] = [entry for entry in self.queue if entry[0] != self.id]
        message = "{},F,{}".format(self.id, time.time())
        self.publish(message)
        print("{} freeing".format(self.id))

    def validate(self):
        """Acknowledge a request so the requester sees a recent timestamp from us."""
        message = "{},V,{}".format(self.id, time.time())
        self.publish(message)
        print("{} validating".format(self.id))

    def when_requested(self, sender, time):
        """Queue a request received from ``sender`` and acknowledge it."""
        self.queue.append((sender, time))
        print("received request from {} from time = {}".format(sender, time))
        # Order by timestamp, breaking ties on site id so that every site
        # derives the same order.
        self.queue.sort(key=operator.itemgetter(1, 0))
        print("queue is now {}".format(self.queue))
        self.validate()

    def when_freed(self, sender, time):
        """Drop every queued request of ``sender`` after its release message."""
        # Rebuild the list instead of removing while iterating, which would
        # skip the element following each removal.
        self.queue[:] = [entry for entry in self.queue if entry[0] != sender]
        print("received release notification from {} from time = {}".format(
            sender, time))

    def update_time(self, sender, time):
        """Record the timestamp of the latest message seen from ``sender``."""
        self.times[sender] = time

    def when_validated(self, sender, time):
        """Log a validation; its timestamp was already stored by update_time()."""
        print("received validation from {} from time = {}".format(sender, time))

    def when_receive(self, body):
        """Parse one ``"<id>,<R|F|V>,<timestamp>"`` message and dispatch it.

        Messages sent by this site itself (the exchange is fanout) are ignored.
        """
        sender, message, mtime = self._decode(body).split(',')
        sender = int(sender)
        mtime = float(mtime)

        if sender == self.id:
            return

        self.update_time(sender, mtime)
        if message == 'R':
            self.when_requested(sender, mtime)
        elif message == 'F':
            self.when_freed(sender, mtime)
        elif message == 'V':
            self.when_validated(sender, mtime)

    def can_acquire(self):
        """Tell whether this site may enter the critical section now.

        Returns:
            True when our request heads the queue and every *other* site has
            sent a message timestamped after that request; False otherwise.
        """
        try:
            # EAFP: the client thread may empty the queue concurrently.
            priority_demand = self.queue[0]
        except IndexError:
            return False

        print("next is {}".format(priority_demand))
        if priority_demand[0] != self.id:
            print("waiting for my turn...")
            return False

        print("I am next... can I go?")
        print("I received messages at times {}".format(self.times))
        requested_at = priority_demand[1]
        # Our own last-send time is irrelevant to the condition and could
        # equal the request time on a coarse clock, so it is skipped.
        return all(
            last_seen > requested_at
            for site, last_seen in self.times.items()
            if site != self.id
        )

    def to_free(self):
        """Ask the client thread to release the critical section."""
        self.mustFree = True
        print("{} finished using ressource".format(self.id))

    def to_request(self):
        """Ask the client thread to request the critical section."""
        self.mustRequest = True
        print("User needs {} to access ressource".format(self.id))

    def ready(self):
        """Return True once the start-up handshake has completed."""
        return self.isready
's avatar
committed
227
228
229

def critical_section(client, steps=5, delay=1):
    """Simulate work inside the critical section by printing a growing bar.

    Args:
        client: object whose ``id`` attribute identifies the site.
        steps: number of progress lines to print.
        delay: seconds to sleep after each line.
    """
    print("{} in critical section...".format(client.id))
    for j in range(1, steps + 1):
        # Line j has 2*j dots, padded on the left with one space fewer each
        # step.
        print(" " * (steps - j) + "." * (2 * j))
        time.sleep(delay)
's avatar
début    
committed
234
235
236
    
if __name__ == "__main__":
    # Script entry point: start one site, then repeatedly request, use and
    # release the shared resource.
    if len(sys.argv) != 3:
        sys.exit("usage: lamportclient.py <site id> <number of sites>")

    site_id = int(sys.argv[1])
    clients = int(sys.argv[2])
    client = LamportClient(site_id, clients)
    print("running client thread")
    client.start()
    print("client thread lauched")

    # Wait for the start-up handshake without spinning at 100% CPU, and stop
    # instead of waiting forever if the client thread died.
    while not client.ready():
        if not client.is_alive():
            sys.exit("client thread stopped before initialisation finished")
        time.sleep(TIMEOUT)

    while True:
        time.sleep(random.randint(1, 2))
        client.to_request()
        # Poll until our request heads the queue and every other site has
        # answered.
        while not client.can_acquire():
            time.sleep(random.randint(1, 2))
        print("ready to acquire!")
        critical_section(client)
        client.to_free()
's avatar
début    
committed
255
256
257
258
259
260