From 17021fa3334ab79cfa985a93b3b5e4c3826fc40e Mon Sep 17 00:00:00 2001 From: James Kirk <james.kirk@noc.ac.uk> Date: Mon, 23 Jan 2023 17:04:36 +0000 Subject: [PATCH] feat: made everything MQ-y durable and persistent --- rmq.py | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/rmq.py b/rmq.py index 8216ab5..483b920 100644 --- a/rmq.py +++ b/rmq.py @@ -17,14 +17,14 @@ def setup_queue(channel, queue_name=''): def fanout_exchange(channel, exchange_name, queue_name=None): - channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True) if queue_name: setup_queue(channel=channel, queue_name=queue_name) channel.queue_bind(exchange=exchange_name, queue=queue_name) def topic_exchange(channel, exchange_name, topic=None, queue_name=None): - channel.exchange_declare(exchange=exchange_name, exchange_type='topic') + channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True) if queue_name: if topic is None: print("ERROR: If binding queue to a topic exchange, topic must be provided") @@ -36,10 +36,24 @@ def topic_exchange(channel, exchange_name, topic=None, queue_name=None): def deliver_to_exchange(channel, body, exchange_name, topic=None): if topic is None: fanout_exchange(channel=channel, exchange_name=exchange_name) - channel.basic_publish(exchange=exchange_name, routing_key='', body=body) + channel.basic_publish( + exchange=exchange_name, + routing_key='', + body=body, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) else: topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic) - channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body) + channel.basic_publish( + exchange=exchange_name, + routing_key=topic, + body=body, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) # ------------------------------------------------------------------------------------------------------------------------------------------------------------- @@ -48,7 +62,15 @@ def write_to_queue(queue_name, msg): connection, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) - channel.basic_publish(exchange='', routing_key=queue_name, body=msg) + channel.basic_publish( + exchange='', + routing_key=queue_name, + body=msg, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + connection.close() @@ -98,7 +120,14 @@ def forward(queue_name_one, queue_name_two): setup_queue(channel=channel, queue_name=queue_name_two) def forward_callback(ch, method, properties, body): - channel.basic_publish(exchange='', routing_key=queue_name_two, body=body) + channel.basic_publish( + exchange='', + routing_key=queue_name_two, + body=body, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) ch.basic_ack(delivery_tag=method.delivery_tag) try: -- GitLab