Commit 17021fa3 authored by James Kirk's avatar James Kirk
Browse files

feat: made everything MQ-y durable and persistent

parent ed42bf7e
......@@ -17,14 +17,14 @@ def setup_queue(channel, queue_name=''):
def fanout_exchange(channel, exchange_name, queue_name=None):
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
if queue_name:
setup_queue(channel=channel, queue_name=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
channel.exchange_declare(exchange=exchange_name, exchange_type='topic')
channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
if queue_name:
if topic is None:
print("ERROR: If binding queue to a topic exchange, topic must be provided")
......@@ -36,10 +36,24 @@ def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
def deliver_to_exchange(channel, body, exchange_name, topic=None):
if topic is None:
fanout_exchange(channel=channel, exchange_name=exchange_name)
channel.basic_publish(exchange=exchange_name, routing_key='', body=body)
channel.basic_publish(
exchange=exchange_name,
routing_key='',
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
else:
topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body)
channel.basic_publish(
exchange=exchange_name,
routing_key=topic,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
......@@ -48,7 +62,15 @@ def write_to_queue(queue_name, msg):
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body=msg)
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=msg,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
connection.close()
......@@ -98,7 +120,14 @@ def forward(queue_name_one, queue_name_two):
setup_queue(channel=channel, queue_name=queue_name_two)
def forward_callback(ch, method, properties, body):
channel.basic_publish(exchange='', routing_key=queue_name_two, body=body)
channel.basic_publish(
exchange='',
routing_key=queue_name_two,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment