From 0638b941a3e55a4a7cff649d1ed3e9add0fd8246 Mon Sep 17 00:00:00 2001 From: James Kirk <james.kirk@noc.ac.uk> Date: Tue, 24 Jan 2023 11:17:07 +0000 Subject: [PATCH] refactor: some name changes refactor: use subscribe as Q to xchange binder --- rmq.py | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/rmq.py b/rmq.py index 483b920..c8ae7ea 100644 --- a/rmq.py +++ b/rmq.py @@ -16,21 +16,12 @@ def setup_queue(channel, queue_name=''): channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it -def fanout_exchange(channel, exchange_name, queue_name=None): +def fanout_exchange(channel, exchange_name): channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True) - if queue_name: - setup_queue(channel=channel, queue_name=queue_name) - channel.queue_bind(exchange=exchange_name, queue=queue_name) -def topic_exchange(channel, exchange_name, topic=None, queue_name=None): +def topic_exchange(channel, exchange_name): channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True) - if queue_name: - if topic is None: - print("ERROR: If binding queue to a topic exchange, topic must be provided") - return - setup_queue(channel=channel, queue_name=queue_name) - channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic) def deliver_to_exchange(channel, body, exchange_name, topic=None): @@ -86,7 +77,10 @@ def read_from_queue(queue_name, max_msgs): if method_frame: print(method_frame, header_frame, body) channel.basic_ack(method_frame.delivery_tag) - messages.append(json.loads(body.decode())) + try: + messages.append(json.loads(body.decode())) + except: + messages.append(body.decode()) else: print("No message returned") break @@ -112,17 +106,17 @@ def broadcast(queue_name, exchange_name): print("Caught a channel error: {}, stopping...".format(err)) -def forward(queue_name_one, queue_name_two): +def forward(from_queue, to_queue): # read from a queue, forward onto a different queue _, channel = pika_connect(host=host) - setup_queue(channel=channel, queue_name=queue_name_one) - setup_queue(channel=channel, queue_name=queue_name_two) + setup_queue(channel=channel, queue_name=from_queue) + setup_queue(channel=channel, queue_name=to_queue) def forward_callback(ch, method, properties, body): channel.basic_publish( exchange='', - routing_key=queue_name_two, + routing_key=to_queue, body=body, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE @@ -131,7 +125,7 @@ def forward(queue_name_one, queue_name_two): ch.basic_ack(delivery_tag=method.delivery_tag) try: - channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback) + channel.basic_consume(queue=from_queue, on_message_callback=forward_callback) channel.start_consuming() except pika.exceptions.AMQPChannelError as err: print("Caught a channel error: {}, stopping...".format(err)) @@ -161,10 +155,14 @@ def subscribe(queue_name, exchange_name, topic=None): # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed connection, channel = pika_connect(host=host) + setup_queue(channel=channel, queue_name=queue_name) + if topic is None: - fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name) + fanout_exchange(channel=channel, exchange_name=exchange_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) else: - topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic) + topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic) + channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic) connection.close() -- GitLab