rmq.py 5.62 KB
Newer Older
1 2 3 4
import json

import pika

5
host='localhost' # TODO Handle host being passed in
6 7 8 9 10 11 12 13 14

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

def pika_connect(host):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    return connection, channel


15
def setup_queue(channel, queue_name=''):
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
    channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it


def fanout_exchange(channel, exchange_name, queue_name=None):
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
    if queue_name:
        setup_queue(channel=channel, queue_name=queue_name)
        channel.queue_bind(exchange=exchange_name, queue=queue_name)


def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic')
    if queue_name:
        if topic is None:
            print("ERROR: If binding queue to a topic exchange, topic must be provided")
            return
        setup_queue(channel=channel, queue_name=queue_name)
        channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)


36
def deliver_to_exchange(channel, body, exchange_name, topic=None):
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    if topic is None:
        fanout_exchange(channel=channel, exchange_name=exchange_name)
        channel.basic_publish(exchange=exchange_name, routing_key='', body=body)
    else:
        topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
        channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body)

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

def write_to_queue(queue_name, msg):
    # write a single message to a queue
    connection, channel = pika_connect(host=host)
    setup_queue(channel=channel, queue_name=queue_name)

    channel.basic_publish(exchange='', routing_key=queue_name, body=msg)
    connection.close()


def read_from_queue(queue_name, max_msgs):
    # get messages off of a queue until the queue is empty of max_msgs is hit
    connection, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name)

    messages = []
    while len(messages) < max_msgs:
        method_frame, header_frame, body = channel.basic_get(queue_name)
        if method_frame:
            print(method_frame, header_frame, body)
            channel.basic_ack(method_frame.delivery_tag)
            messages.append(json.loads(body.decode()))
        else:
            print("No message returned")
            break

    connection.close()
    return messages


def broadcast(queue_name, exchange_name):
    # read from a queue, forward onto a 'fanout' exchange
    _, channel = pika_connect(host=host)

80
    setup_queue(channel=channel, queue_name=queue_name)
81 82 83 84 85

    def broadcast_callback(ch, method, properties, body):
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
        ch.basic_ack(delivery_tag=method.delivery_tag)

86 87 88 89 90
    try:
        channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
        channel.start_consuming()
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
91 92 93 94 95 96 97 98 99 100


def forward(queue_name_one, queue_name_two):
    # read from a queue, forward onto a different queue
    _, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name_one)
    setup_queue(channel=channel, queue_name=queue_name_two)

    def forward_callback(ch, method, properties, body):
101
        channel.basic_publish(exchange='', routing_key=queue_name_two, body=body)
102 103
        ch.basic_ack(delivery_tag=method.delivery_tag)

104 105 106 107 108
    try:
        channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback)
        channel.start_consuming()
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
109 110 111 112 113 114 115 116 117 118 119 120 121 122


def publish(queue_name, exchange_name):
    # read from a queue, forward onto a 'topic' exchange
    _, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name)

    def publish_callback(ch, method, properties, body):
        message = json.loads(body.decode())
        topic = message["topic"]
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
        ch.basic_ack(delivery_tag=method.delivery_tag)

123 124 125 126 127
    try:
        channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
        channel.start_consuming()
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
128 129 130 131 132


def subscribe(queue_name, exchange_name, topic=None):
    # setup bindings between queue and exchange, 
    # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
133
    connection, channel = pika_connect(host=host)
134 135 136 137 138 139

    if topic is None:
        fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name)
    else:
        topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic)

140 141
    connection.close()

142 143 144 145 146 147 148 149 150

def listen(queue_name, callback):
    # subscribe client to a queue, using the callback arg
    _, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name)

    channel.basic_consume(queue=queue_name, on_message_callback=callback)
    channel.start_consuming()