Commit b35d3cff authored by 's avatar
Browse files

L'intialisation marche!

parent 0b2b33fc
......@@ -17,10 +17,13 @@ import time
import random
import pika
import sys
import threading
TIMEOUT = 0.1
EXCHANGE = "Lamport"
INIT_EXCHANGE = "Init"
TIMEOUT = 0.5
class LamportClient:
class LamportClient(threading.Thread):
def __init__(self, id, clients):
self.id = id
......@@ -34,17 +37,17 @@ class LamportClient:
self.mustFree = False
self.mustRequest = True
self.channel = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
).channel()
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost"))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.channel.exchange_declare(
exchange="lamport", exchange_type="fanout")
exchange= EXCHANGE, exchange_type="fanout")
self.channel.exchange_declare(
exchange="lamport_init", exchange_type="fanout")
exchange= INIT_EXCHANGE, exchange_type="fanout")
self.reception = self.channel.queue_declare(
exclusive=True).method.queue
......@@ -52,9 +55,9 @@ class LamportClient:
exclusive=True).method.queue
self.channel.queue_bind(
exchange = "lamport", queue = self.reception)
exchange = EXCHANGE, queue = self.reception)
self.channel.queue_bind(
exchange = "lamport_init", queue = self.init_reception)
exchange = INIT_EXCHANGE, queue = self.init_reception)
def run(self):
print("I am {} of {} sites".format(
......@@ -63,43 +66,62 @@ class LamportClient:
print("Well connected to server")
self.channel.basic_consume(
self.initializeCycle, self.init_reception)
self.initializeCycle, self.init_reception, no_ack = True)
init_message = "{} says : Hello World!".format(self.id)
self.channel.basic_publish(
exchange="lamport_init",
body = "{} says : Hello World!".format(self.id), routing_key='')
exchange= INIT_EXCHANGE,
routing_key='',
body = init_message)
print("saying hello to the world")
self.channel.start_consuming()
print("INIT_DONE")
while True:
messages = self.channel.consume(
self.reception, inactivity_timeout=TIMEOUT)
self.channel.basic_consume(self.when_receive, self.reception)
mess = next(messages)
if mess is not None:
*_, body = mess
print("received : {}".format(body))
when_receive(body)
else:
print("waiting for message")
while True:
if self.mustFree:
print("Must Release")
self.free()
self.mustFree = False
if self.mustRequest:
print("Must Request")
self.request()
self.mustRequest = False
self.channel.start_consuming()
#time.sleep(random.randint(1,5))
#client.request()
#while not client.can_acquire():
# pass
#client.acquire()
#critical_section()
#client.free()
def initializeCycle(self, channel, method, properties, body):
_,body,_ = str(body).split("'")
arrival_id = int(body.split(" ")[0])
self.n_clients_here += 1
if arrival_id == 0:
print("Warned everyone arrived. Let's Go!")
self.channel.stop_consuming()
return
print("{} arrived!\nWe are now {} of {}".format(
arrival_id, self.n_clients_here, self.n_clients_total))
if self.n_clients_here == self.n_clients_total:
self.channel.basic_publish(
exchange = INIT_EXCHANGE,
routing_key='',
body= "0 says : All Here, stop waiting!")
self.channel.stop_consuming()
print ("All Ready, let's go!")
......@@ -173,8 +195,6 @@ class LamportClient:
elif message == 'V':
self.when_validated(sender, time)
self.channel.stop_consuming()
def can_acquire(self):
priority_demand = self.queue[0]
if(priority_demand[0]) == self:
......@@ -184,17 +204,33 @@ class LamportClient:
can = False
return can
def to_free(self):
self.mustFree = True
print("{} : Ressource released".format(self.id))
def to_acquire(self):
def critical_section(client):
print("{} in critical section...".format(client.id))
for i in range(5):
print(" "*(5-i)+"."*i)
time.sleep(1,2)
if __name__ == "__main__":
id = int(sys.argv[1])
clients = int(sys.argv[2])
client = LamportClient(id, clients)
client.run()
client.run()
while True:
time.sleep(random.randint(1,5))
client.request()
while not client.can_acquire():
time.sleep(1,5)
print("still can't acquire...")
client.to_acquire()
critical_section()
client.to_free()
......
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
queue_name = channel.queue_declare(exclusive=True).method.queue
channel.queue_bind(exchange='lamport', queue = queue_name)
def callback(ch, method, properties, body):
print(" [x] Received {}".format(body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.exchange_declare(exchange = "lamport", exchange_type="fanout")
message = ' '.join(sys.argv[1:]) or 'Hello World'
channel.basic_publish(exchange= "lamport",
routing_key='',
body=message)
print(" [x] Sent {}".format(message))
#connection.close()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment