lamportclient.py 7.54 KB
Newer Older
's avatar
committed
1
2
3
"""lamportclient.py
author: Francois Neron
email: francois.neron@telecom-bretagne.eu
's avatar
début    
committed
4

's avatar
committed
5
6
7
8
Lamport Algorithm put to the test.
Launch with :
-the identifier of the site represented by this instance,
-the number of sites in total.
's avatar
début    
committed
9

's avatar
committed
10
11
12
13
14
15
16
as follow : lamportclient.py 1 4

where identifiers of the other sites would be 2,3, and 4

"""

import time
's avatar
début    
committed
17
18
19
import random
import pika
import sys
's avatar
committed
20
import threading
's avatar
committed
21

's avatar
committed
22
23
EXCHANGE = "Lamport"
INIT_EXCHANGE = "Init"
's avatar
committed
24
TIMEOUT = 0.1
's avatar
début    
committed
25

's avatar
committed
26
class LamportClient(threading.Thread):
's avatar
début    
committed
27
28
    def __init__(self, id, clients):

's avatar
committed
29
30
        super().__init__()
    
's avatar
début    
committed
31
32
33
        self.id = id
        self.time = 0
        
34
35
        self.n_clients_here = 0
        self.n_clients_total = clients
36
37
        self.isready = False
        
's avatar
début    
committed
38
        self.queue = list();
's avatar
committed
39
        self.times = {n:0 for n in range(1,clients+1)}
's avatar
committed
40

41
        self.mustFree = False
42
        self.mustRequest = False
's avatar
committed
43
44
45
46
        
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()
47
48

        self.channel.basic_qos(prefetch_count=1)
's avatar
committed
49
        
50
        self.channel.exchange_declare(
's avatar
committed
51
            exchange= EXCHANGE, exchange_type="fanout")
52
        self.channel.exchange_declare(
's avatar
committed
53
            exchange= INIT_EXCHANGE, exchange_type="fanout")
54

55
56
57
58
        self.reception = self.channel.queue_declare(
            exclusive=True).method.queue
        self.init_reception = self.channel.queue_declare(
            exclusive=True).method.queue
's avatar
committed
59

60
        self.channel.queue_bind(
's avatar
committed
61
            exchange = EXCHANGE, queue = self.reception)
62
        self.channel.queue_bind(
's avatar
committed
63
            exchange = INIT_EXCHANGE, queue = self.init_reception)
's avatar
committed
64
65

    def run(self):
66
67
        print("I am {} of {} sites".format(
            self.id, len(list(self.times.keys()))))
's avatar
committed
68
69
        if self.channel.connection.is_open:
            print("Well connected to server")
70
        
71
        self.channel.basic_consume(
's avatar
committed
72
73
74
            self.initializeCycle, self.init_reception, no_ack = True)

        init_message = "{} says : Hello World!".format(self.id)
75

76
        self.channel.basic_publish(
's avatar
committed
77
78
79
            exchange= INIT_EXCHANGE,
            routing_key='',
            body = init_message)
80
            
81
82
        print("saying hello to the world")
        self.channel.start_consuming()
83
84
85
        
        self.isready = True
        print("INIT_DONE\n")        
's avatar
committed
86
87
88
        
        while True:
            messages = self.channel.consume(
89
90
                self.reception, inactivity_timeout=TIMEOUT,
                no_ack=True)
's avatar
committed
91
            mess = next(messages)
92
            
's avatar
committed
93
94
            if mess is not None:
                *_, body = mess
95
96
97
                self.when_receive(body)
            #else:
                #print("waiting for message")
's avatar
committed
98
99
100
101
102
103
104
105
106
107

            if self.mustFree:
                self.free()
                self.mustFree = False

            if self.mustRequest:
                self.request()
                self.mustRequest = False


108
    def initializeCycle(self, channel, method, properties, body):
's avatar
committed
109
        _,body,_ = str(body).split("'")
110
111
        arrival_id = int(body.split(" ")[0])
        self.n_clients_here += 1
's avatar
committed
112
113
114
115
116
        
        if arrival_id == 0:
            print("Warned everyone arrived. Let's Go!")
            self.channel.stop_consuming()
            return
117

's avatar
committed
118
119
        print("{} arrived!\nWe are now {} of {}".format(
            arrival_id, self.n_clients_here, self.n_clients_total))
120
121
        
        if self.n_clients_here == self.n_clients_total:
's avatar
committed
122
123
124
125
126
127

            self.channel.basic_publish(
                    exchange = INIT_EXCHANGE,
                    routing_key='',
                body= "0 says : All Here, stop waiting!")

128
129
            self.channel.stop_consuming()
            print ("All Ready, let's go!")
's avatar
début    
committed
130
131
132
133
134

    def publish(self, message):
        """publish 
        send message to server 
        """
's avatar
committed
135
        self.channel.basic_publish(
136
            exchange = EXCHANGE, body=message, routing_key='')
's avatar
committed
137
        self.times[self.id] = self.time
's avatar
début    
committed
138
139

    def request(self):
's avatar
committed
140
141
142
        """request
        request access to critical section
        """
's avatar
début    
committed
143
        self.queue.append((self.id, self.time))
's avatar
committed
144
              
's avatar
début    
committed
145
146
147
        self.time += 1
        message = "{},R,{}".format(self.id, self.time)
        self.publish(message)
148
149
        print("{} requesting".format(self.id))
        print("queue is now {}".format(self.queue))
's avatar
début    
committed
150
151
152

    def free(self):
        self.time += 1
's avatar
committed
153
        self.queue.pop(0)
's avatar
début    
committed
154
155
        message = "{},F,{}".format(self.id, self.time)
        self.publish(message)
156
        print("{} freeing".format(self.id))
's avatar
début    
committed
157
158
        
    def validate(self):
159
        self.time += 1
's avatar
début    
committed
160
        message = "{},V,{}".format(self.id, self.time)
161
162
        self.publish(message)
        print("{} validating".format(self.id))
's avatar
début    
committed
163

's avatar
committed
164
165
    def update_times(self, sender, time):
        self.time = max(self.time, time)+1
's avatar
committed
166
        self.times[sender] = time
's avatar
committed
167

's avatar
début    
committed
168
    def when_requested(self, sender, time):
's avatar
committed
169
        self.queue.append((sender, time))
170
171
        print("received request from {} at time = {}".format(sender, time))
        print("queue is now {}".format(self.queue))
's avatar
début    
committed
172
173
174
        self.validate()

    def when_freed(self, sender, time):
's avatar
committed
175
176
177
        for (previousSender, previousTime) in self.queue:
            if (previousSender == sender):
                self.queue.remove((previousSender, previousTime))
178
179
        print("received release notification from {} at time = {}".format(
                sender, time))
's avatar
début    
committed
180
181

    def when_validated(self, sender, time):
182
        print("received validation from {} at time = {}".format(sender, time))
's avatar
début    
committed
183

184
    def when_receive(self, body):
's avatar
début    
committed
185
        body = str(body)
's avatar
committed
186
        _,body,_ = body.split("'") 
187

's avatar
début    
committed
188
189
190
191
192
193
194
        sender, message, time = body.split(',')
        sender = int(sender)
        time = int(time)

        if sender == self.id:
            return

's avatar
committed
195
        self.update_times(sender, time)
's avatar
début    
committed
196
197
198
199
200
201
202
        if message == 'R':
            self.when_requested(sender, time)
        elif message == 'F':
            self.when_freed(sender, time)
        elif message == 'V':
            self.when_validated(sender, time)

's avatar
committed
203
    def can_acquire(self):
's avatar
committed
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
        if len(self.queue) > 0:
            priority_demand = self.queue[0]
            print("next is {}".format(priority_demand))
            if(priority_demand[0]) == self.id:
                print("I am next... can I go?")
                can = True
                print("I received messages at times {}".format(self.times))
                for time in self.times.values():
                    if (time <= priority_demand[1]):
                        can = False
                return can
            else:
                print("waiting for my turn...")
        else:
            return False
's avatar
committed
219

's avatar
committed
220
221
    def to_free(self):
        self.mustFree = True
222
223
224
225
226
        print("{} finished using ressource".format(self.id))
        
    def to_request(self):
        self.mustRequest = True
        print("User needs {} to access ressource".format(self.id))
's avatar
committed
227
    
228
229
    def ready(self):
        return self.isready
's avatar
committed
230
231
232

def critical_section(client):
    print("{} in critical section...".format(client.id))
's avatar
committed
233
234
    for i in range(5):
        print(" "*(5-i)+"."*i)
's avatar
committed
235
        time.sleep(1)
's avatar
début    
committed
236
237
238
    
if __name__ == "__main__":
    id = int(sys.argv[1])
's avatar
committed
239
    clients = int(sys.argv[2])
's avatar
début    
committed
240
    client = LamportClient(id, clients)
's avatar
committed
241
242
243
    print("running client thread")
    client.start()
    print("client thread lauched")
244
245
246
247
    
    while not client.ready():
        pass
    
's avatar
committed
248
    while True:
's avatar
committed
249
        time.sleep(random.randint(1,2))
250
        client.to_request()
's avatar
committed
251
        while not client.can_acquire():
's avatar
committed
252
            time.sleep(random.randint(1,2))
253
254
            #print("still can't acquire...")
        print("ready to acquire!")
's avatar
committed
255
        critical_section(client)
's avatar
committed
256
        client.to_free()
's avatar
début    
committed
257
258
259
260
261
262