"""lamportclient.py author: Francois Neron email: francois.neron@telecom-bretagne.eu Lamport Algorithm put to the test. Launch with : -the identifier of the site represented by this instance, -the number of sites in total. as follow : lamportclient.py 1 4 where identifiers of the other sites would be 2,3, and 4 """ import time import random import pika import sys import threading EXCHANGE = "Lamport" INIT_EXCHANGE = "Init" TIMEOUT = 0.1 class LamportClient(threading.Thread): def __init__(self, id, clients): super().__init__() self.id = id self.time = 0 self.n_clients_here = 0 self.n_clients_total = clients self.isready = False self.queue = list(); self.times = {n:0 for n in range(1,clients+1)} self.mustFree = False self.mustRequest = False self.connection = pika.BlockingConnection( pika.ConnectionParameters(host="localhost")) self.channel = self.connection.channel() self.channel.basic_qos(prefetch_count=1) self.channel.exchange_declare( exchange= EXCHANGE, exchange_type="fanout") self.channel.exchange_declare( exchange= INIT_EXCHANGE, exchange_type="fanout") self.reception = self.channel.queue_declare( exclusive=True).method.queue self.init_reception = self.channel.queue_declare( exclusive=True).method.queue self.channel.queue_bind( exchange = EXCHANGE, queue = self.reception) self.channel.queue_bind( exchange = INIT_EXCHANGE, queue = self.init_reception) def run(self): print("I am {} of {} sites".format( self.id, len(list(self.times.keys())))) if self.channel.connection.is_open: print("Well connected to server") self.channel.basic_consume( self.initializeCycle, self.init_reception, no_ack = True) init_message = "{} says : Hello World!".format(self.id) self.channel.basic_publish( exchange= INIT_EXCHANGE, routing_key='', body = init_message) print("saying hello to the world") self.channel.start_consuming() self.isready = True print("INIT_DONE\n") while True: messages = self.channel.consume( self.reception, inactivity_timeout=TIMEOUT, no_ack=True) mess = next(messages) if mess is not None: *_, body = mess self.when_receive(body) #else: #print("waiting for message") if self.mustFree: self.free() self.mustFree = False if self.mustRequest: self.request() self.mustRequest = False def initializeCycle(self, channel, method, properties, body): _,body,_ = str(body).split("'") arrival_id = int(body.split(" ")[0]) self.n_clients_here += 1 if arrival_id == 0: print("Warned everyone arrived. Let's Go!") self.channel.stop_consuming() return print("{} arrived!\nWe are now {} of {}".format( arrival_id, self.n_clients_here, self.n_clients_total)) if self.n_clients_here == self.n_clients_total: self.channel.basic_publish( exchange = INIT_EXCHANGE, routing_key='', body= "0 says : All Here, stop waiting!") self.channel.stop_consuming() print ("All Ready, let's go!") def publish(self, message): """publish send message to server """ self.channel.basic_publish( exchange = EXCHANGE, body=message, routing_key='') self.times[self.id] = self.time def request(self): """request request access to critical section """ self.queue.append((self.id, self.time)) self.time += 1 message = "{},R,{}".format(self.id, self.time) self.publish(message) print("{} requesting".format(self.id)) print("queue is now {}".format(self.queue)) def free(self): self.time += 1 self.queue.pop(0) message = "{},F,{}".format(self.id, self.time) self.publish(message) print("{} freeing".format(self.id)) def validate(self): self.time += 1 message = "{},V,{}".format(self.id, self.time) self.publish(message) print("{} validating".format(self.id)) def update_times(self, sender, time): self.time = max(self.time, time)+1 self.times[sender] = time def when_requested(self, sender, time): self.queue.append((sender, time)) print("received request from {} at time = {}".format(sender, time)) print("queue is now {}".format(self.queue)) self.validate() def when_freed(self, sender, time): for (previousSender, previousTime) in self.queue: if (previousSender == sender): self.queue.remove((previousSender, previousTime)) print("received release notification from {} at time = {}".format( sender, time)) def when_validated(self, sender, time): print("received validation from {} at time = {}".format(sender, time)) def when_receive(self, body): body = str(body) _,body,_ = body.split("'") sender, message, time = body.split(',') sender = int(sender) time = int(time) if sender == self.id: return self.update_times(sender, time) if message == 'R': self.when_requested(sender, time) elif message == 'F': self.when_freed(sender, time) elif message == 'V': self.when_validated(sender, time) def can_acquire(self): if len(self.queue) > 0: priority_demand = self.queue[0] print("next is {}".format(priority_demand)) if(priority_demand[0]) == self.id: print("I am next... can I go?") can = True print("I received messages at times {}".format(self.times)) for time in self.times.values(): if (time <= priority_demand[1]): can = False return can else: print("waiting for my turn...") else: return False def to_free(self): self.mustFree = True print("{} finished using ressource".format(self.id)) def to_request(self): self.mustRequest = True print("User needs {} to access ressource".format(self.id)) def ready(self): return self.isready def critical_section(client): print("{} in critical section...".format(client.id)) for i in range(5): print(" "*(5-i)+"."*i) time.sleep(1) if __name__ == "__main__": id = int(sys.argv[1]) clients = int(sys.argv[2]) client = LamportClient(id, clients) print("running client thread") client.start() print("client thread lauched") while not client.ready(): pass while True: time.sleep(random.randint(1,2)) client.to_request() while not client.can_acquire(): time.sleep(random.randint(1,2)) #print("still can't acquire...") print("ready to acquire!") critical_section(client) client.to_free()