"""Small helpers for moving messages between AMQP queues and exchanges via pika.

Every public helper opens its own blocking connection, does its work and
closes the connection again, including when an error interrupts the work.
"""
import json

import pika

# Broker host used whenever a helper is called without an explicit ``host``.
host = 'localhost'


# ---------------------------------------------------------------------------
# Low-level building blocks
# ---------------------------------------------------------------------------

def _resolve_host(requested):
    """Return ``requested``, or the module-level ``host`` when it is None.

    The module-level value is read at call time, so reassigning
    ``<module>.host`` keeps affecting helpers called without a host.
    """
    return host if requested is None else requested


def _close_if_open(connection):
    """Close ``connection`` unless it has already been closed."""
    if connection.is_open:
        connection.close()


def pika_connect(host=None):
    """Open a blocking connection and a channel on it.

    Args:
        host: Broker host name; defaults to the module-level ``host``.

    Returns:
        A ``(connection, channel)`` tuple. The caller owns the connection
        and is responsible for closing it.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(_resolve_host(host)))
    try:
        channel = connection.channel()
    except Exception:
        # Do not leave a half-initialised connection behind.
        _close_if_open(connection)
        raise
    return connection, channel


def setup_queue(channel, queue_name=''):
    """Declare ``queue_name`` on ``channel`` as a durable, non-exclusive queue."""
    # exclusive means the queue can only be used by the connection that
    # created it
    channel.queue_declare(queue=queue_name, exclusive=False, durable=True)


def fanout_exchange(channel, exchange_name, queue_name=None):
    """Declare a 'fanout' exchange and optionally bind a queue to it.

    When ``queue_name`` is given (and non-empty) the queue is declared via
    ``setup_queue`` and bound to the exchange.
    """
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
    if not queue_name:
        return
    setup_queue(channel=channel, queue_name=queue_name)
    channel.queue_bind(exchange=exchange_name, queue=queue_name)


def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
    """Declare a 'topic' exchange and optionally bind a queue to it.

    When ``queue_name`` is given, ``topic`` is required and is used as the
    binding's routing key. If it is missing, an error is printed and no
    queue is declared or bound (the exchange has still been declared).
    """
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic')
    if not queue_name:
        return
    if topic is None:
        print("ERROR: If binding queue to a topic exchange, topic must be provided")
        return
    setup_queue(channel=channel, queue_name=queue_name)
    channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)


def deliver_to_exchange(channel, body, exchange_name, topic=None):
    """Publish ``body`` to ``exchange_name``, declaring the exchange first.

    Without ``topic`` the exchange is declared as 'fanout' and the message
    is published with an empty routing key; with ``topic`` the exchange is
    declared as 'topic' and ``topic`` is the routing key.
    """
    if topic is None:
        fanout_exchange(channel=channel, exchange_name=exchange_name)
        routing_key = ''
    else:
        topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
        routing_key = topic
    channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=body)


def _relay_from_queue(source_queue, on_message, host=None, extra_queues=()):
    """Consume ``source_queue`` with ``on_message`` until a channel error.

    ``source_queue`` and then every name in ``extra_queues`` are declared
    before consuming starts. A ``pika.exceptions.AMQPChannelError`` raised
    while consuming is reported with ``print`` and ends the loop; errors
    raised while declaring the queues propagate. The connection is closed
    on every exit path.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=source_queue)
        for name in extra_queues:
            setup_queue(channel=channel, queue_name=name)
        try:
            channel.basic_consume(queue=source_queue, on_message_callback=on_message)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        _close_if_open(connection)


# ---------------------------------------------------------------------------
# Public helpers
# ---------------------------------------------------------------------------

def write_to_queue(queue_name, msg, host=None):
    """Write a single message to a queue.

    The queue is declared first, then ``msg`` is published through the
    default exchange (``exchange=''``) with ``queue_name`` as routing key.

    Args:
        queue_name: Name of the target queue.
        msg: Message body handed to ``basic_publish`` unchanged.
        host: Broker host name; defaults to the module-level ``host``.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        channel.basic_publish(exchange='', routing_key=queue_name, body=msg)
    finally:
        _close_if_open(connection)


def read_from_queue(queue_name, max_msgs, host=None):
    """Get messages off a queue until it is empty or ``max_msgs`` is hit.

    Args:
        queue_name: Name of the queue to read from.
        max_msgs: Upper bound on the number of messages to fetch.
        host: Broker host name; defaults to the module-level ``host``.

    Returns:
        A list with the JSON-decoded body of each fetched message.

    Raises:
        ValueError: If a body is not valid JSON (``json.loads`` failure).
            Note that the message is acknowledged before it is parsed, so
            the ack has already been sent when this is raised.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        messages = []
        while len(messages) < max_msgs:
            method_frame, header_frame, body = channel.basic_get(queue_name)
            if not method_frame:
                print("No message returned")
                break
            print(method_frame, header_frame, body)
            channel.basic_ack(method_frame.delivery_tag)
            messages.append(json.loads(body.decode()))
        return messages
    finally:
        _close_if_open(connection)


def broadcast(queue_name, exchange_name, host=None):
    """Read from a queue and forward every message onto a 'fanout' exchange.

    Blocks while consuming; each message is acknowledged after it has been
    published to ``exchange_name``.
    """
    def broadcast_callback(ch, method, properties, body):
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    _relay_from_queue(queue_name, broadcast_callback, host=host)


def forward(queue_name_one, queue_name_two, host=None):
    """Read from ``queue_name_one`` and forward onto ``queue_name_two``.

    Both queues are declared up front. Blocks while consuming; each message
    is acknowledged after it has been republished through the default
    exchange.
    """
    def forward_callback(ch, method, properties, body):
        # Publish on the channel handed to the callback, like the other
        # relay callbacks in this module.
        ch.basic_publish(exchange='', routing_key=queue_name_two, body=body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    _relay_from_queue(queue_name_one, forward_callback, host=host,
                      extra_queues=(queue_name_two,))


def publish(queue_name, exchange_name, host=None):
    """Read from a queue and forward every message onto a 'topic' exchange.

    Each body is decoded as JSON and its ``"topic"`` entry is used as the
    routing key; the original body is what gets published. A body that is
    not valid JSON, or has no ``"topic"`` entry, raises out of the consume
    loop without being acknowledged.
    """
    def publish_callback(ch, method, properties, body):
        message = json.loads(body.decode())
        topic = message["topic"]
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name,
                            topic=topic)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    _relay_from_queue(queue_name, publish_callback, host=host)


def subscribe(queue_name, exchange_name, topic=None, host=None):
    """Set up the binding between a queue and an exchange.

    The exchange type is 'fanout' when ``topic`` is None and 'topic'
    otherwise, in which case ``topic`` is the binding's routing key.
    """
    connection, channel = pika_connect(host=host)
    try:
        if topic is None:
            fanout_exchange(channel=channel, queue_name=queue_name,
                            exchange_name=exchange_name)
        else:
            topic_exchange(channel=channel, queue_name=queue_name,
                           exchange_name=exchange_name, topic=topic)
    finally:
        _close_if_open(connection)


def listen(queue_name, callback, host=None):
    """Subscribe a client to a queue, using ``callback`` for each message.

    ``callback`` is registered as pika's ``on_message_callback``; the relay
    callbacks in this module take ``(ch, method, properties, body)``.
    Blocks while consuming. Unlike the relay helpers, errors are not
    caught here; they propagate after the connection has been closed.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        channel.basic_consume(queue=queue_name, on_message_callback=callback)
        channel.start_consuming()
    finally:
        _close_if_open(connection)