Commit 0638b941 authored by James Kirk's avatar James Kirk
Browse files

refactor: some name changes

refactor: use subscribe as Q to xchange binder
parent 17021fa3
......@@ -16,21 +16,12 @@ def setup_queue(channel, queue_name=''):
channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it
def fanout_exchange(channel, exchange_name, queue_name=None):
def fanout_exchange(channel, exchange_name):
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
if queue_name:
setup_queue(channel=channel, queue_name=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
def topic_exchange(channel, exchange_name):
channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
if queue_name:
if topic is None:
print("ERROR: If binding queue to a topic exchange, topic must be provided")
return
setup_queue(channel=channel, queue_name=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
def deliver_to_exchange(channel, body, exchange_name, topic=None):
......@@ -86,7 +77,10 @@ def read_from_queue(queue_name, max_msgs):
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
messages.append(json.loads(body.decode()))
try:
messages.append(json.loads(body.decode()))
except:
messages.append(body.decode())
else:
print("No message returned")
break
......@@ -112,17 +106,17 @@ def broadcast(queue_name, exchange_name):
print("Caught a channel error: {}, stopping...".format(err))
def forward(queue_name_one, queue_name_two):
def forward(from_queue, to_queue):
# read from a queue, forward onto a different queue
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name_one)
setup_queue(channel=channel, queue_name=queue_name_two)
setup_queue(channel=channel, queue_name=from_queue)
setup_queue(channel=channel, queue_name=to_queue)
def forward_callback(ch, method, properties, body):
channel.basic_publish(
exchange='',
routing_key=queue_name_two,
routing_key=to_queue,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
......@@ -131,7 +125,7 @@ def forward(queue_name_one, queue_name_two):
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback)
channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
......@@ -161,10 +155,14 @@ def subscribe(queue_name, exchange_name, topic=None):
# exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
if topic is None:
fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name)
fanout_exchange(channel=channel, exchange_name=exchange_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
else:
topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic)
topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
connection.close()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment