import json import os import pika host = os.getenv( "MQ_HOST", "localhost" ) # Sets to whatever MQ_HOST is, or defaults to localhost # ------------------------------------------------------------------------------------------------------------------------------------------------------------- def pika_connect(host): try: connection = pika.BlockingConnection(pika.ConnectionParameters(host)) except Exception: connection = None if connection is not None: channel = connection.channel() else: print( "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" % host ) raise Exception( "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" % host ) return connection, channel def setup_queue(channel, queue_name=""): channel.queue_declare( queue=queue_name, exclusive=False, durable=True ) # exclusive means the queue can only be used by the connection that created it def fanout_exchange(channel, exchange_name): channel.exchange_declare( exchange=exchange_name, exchange_type="fanout", durable=True ) def topic_exchange(channel, exchange_name): channel.exchange_declare( exchange=exchange_name, exchange_type="topic", durable=True ) def deliver_to_exchange(channel, body, exchange_name, topic=None): if topic is None: fanout_exchange(channel=channel, exchange_name=exchange_name) channel.basic_publish( exchange=exchange_name, routing_key="", body=body, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ), ) else: topic_exchange(channel=channel, exchange_name=exchange_name) channel.basic_publish( exchange=exchange_name, routing_key=topic, body=body, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ), ) # ------------------------------------------------------------------------------------------------------------------------------------------------------------- def write_to_queue(queue_name, msg): # write a single message to a queue connection, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) channel.basic_publish( exchange="", routing_key=queue_name, body=msg, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ), ) connection.close() def read_from_queue(queue_name, max_msgs): # get messages off of a queue until the queue is empty or max_msgs is hit connection, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) messages = [] while len(messages) < max_msgs: method_frame, header_frame, body = channel.basic_get(queue_name) if method_frame: print(method_frame, header_frame, body) channel.basic_ack(method_frame.delivery_tag) try: messages.append(json.loads(body.decode())) except json.decoder.JSONDecodeError: messages.append(body.decode()) else: print("No message returned") break connection.close() return messages def broadcast(queue_name, exchange_name): # read from a queue, forward onto a 'fanout' exchange _, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) def broadcast_callback(ch, method, properties, body): deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name) ch.basic_ack(delivery_tag=method.delivery_tag) try: channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback) channel.start_consuming() except pika.exceptions.AMQPChannelError as err: print("Caught a channel error: {}, stopping...".format(err)) def forward(from_queue, to_queue): # read from a queue, forward onto a different queue _, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=from_queue) setup_queue(channel=channel, queue_name=to_queue) def forward_callback(ch, method, properties, body): channel.basic_publish( exchange="", routing_key=to_queue, body=body, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ), ) ch.basic_ack(delivery_tag=method.delivery_tag) try: channel.basic_consume(queue=from_queue, on_message_callback=forward_callback) channel.start_consuming() except pika.exceptions.AMQPChannelError as err: print("Caught a channel error: {}, stopping...".format(err)) def publish(queue_name, exchange_name): # read from a queue, forward onto a 'topic' exchange _, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) def publish_callback(ch, method, properties, body): message = json.loads(body.decode()) topic = message["topic"] deliver_to_exchange( channel=ch, body=body, exchange_name=exchange_name, topic=topic ) ch.basic_ack(delivery_tag=method.delivery_tag) try: channel.basic_consume(queue=queue_name, on_message_callback=publish_callback) channel.start_consuming() except pika.exceptions.AMQPChannelError as err: print("Caught a channel error: {}, stopping...".format(err)) def subscribe(queue_name, exchange_name, topic=None): # setup bindings between queue and exchange, # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed connection, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) if topic is None: fanout_exchange(channel=channel, exchange_name=exchange_name) channel.queue_bind(exchange=exchange_name, queue=queue_name) else: topic_exchange(channel=channel, exchange_name=exchange_name) channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic) connection.close() def listen(queue_name, callback): # subscribe client to a queue, using the callback arg _, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) channel.basic_consume(queue=queue_name, on_message_callback=callback) channel.start_consuming()