# rmq.py: helper functions for RabbitMQ messaging via pika.
import json

import pika

# Broker hostname handed to pika_connect() by the helpers in this module.
host = 'localhost'

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

def pika_connect(host):
    """Open a blocking connection to the broker at *host* plus one channel on it.

    Returns a ``(connection, channel)`` tuple. The connection is returned
    open; nothing in this function closes it.
    """
    params = pika.ConnectionParameters(host)
    conn = pika.BlockingConnection(params)
    return conn, conn.channel()


def setup_queue(channel, queue_name=''):
    """Declare *queue_name* on *channel* as a durable, non-exclusive queue.

    TODO: decide whether this is enough, or whether it needs expanding once
    exchanges are set up.
    """
    # exclusive=False: the queue is not limited to the connection that created it.
    declare_options = {'queue': queue_name, 'exclusive': False, 'durable': True}
    channel.queue_declare(**declare_options)


def fanout_exchange(channel, exchange_name, queue_name=None):
    """Declare a 'fanout' exchange and optionally bind a queue to it.

    The exchange is always declared. When *queue_name* is truthy, that queue
    is declared via setup_queue() and then bound to the exchange.
    """
    channel.exchange_declare(exchange_type='fanout', exchange=exchange_name)
    if not queue_name:
        # Nothing to bind: the caller only wanted the exchange to exist.
        return
    setup_queue(channel=channel, queue_name=queue_name)
    channel.queue_bind(queue=queue_name, exchange=exchange_name)


def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
    """Declare a 'topic' exchange and optionally bind a queue to it under *topic*.

    The exchange is always declared. When *queue_name* is truthy, *topic* must
    also be given: the queue is declared via setup_queue() and bound with
    *topic* as the routing key. If *topic* is missing in that case, an error
    is printed and nothing is bound.
    """
    channel.exchange_declare(exchange_type='topic', exchange=exchange_name)
    if not queue_name:
        # Exchange-only call; no binding requested.
        return
    if topic is None:
        print("ERROR: If binding queue to a topic exchange, topic must be provided")
        return
    setup_queue(channel=channel, queue_name=queue_name)
    channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=topic)


def deliver_to_exchange(channel, body, exchange_name, topic=None):
    """Publish *body* to *exchange_name* on an already-open *channel*.

    With no *topic* the exchange is declared as 'fanout' and the message is
    published with an empty routing key; otherwise the exchange is declared as
    'topic' and *topic* is used as the routing key.

    TODO: verify that the channel ('ch') handed to a consumer callback can be
    used for publishing like this.
    """
    if topic is None:
        fanout_exchange(channel=channel, exchange_name=exchange_name)
        routing_key = ''
    else:
        topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
        routing_key = topic
    channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=body)

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

def write_to_queue(queue_name, msg):
    """Write a single message to *queue_name* over a short-lived connection.

    The queue is declared via setup_queue() before publishing. The connection
    is closed even when declaring or publishing raises, so a failed write does
    not leave a connection open.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        # Publishing to the default exchange ('') with the queue name as routing key.
        channel.basic_publish(exchange='', routing_key=queue_name, body=msg)
    finally:
        # Guard on is_open: if the failure already tore the connection down,
        # calling close() again would raise and mask the original error.
        if connection.is_open:
            connection.close()


def read_from_queue(queue_name, max_msgs):
    """Get messages off *queue_name* until it is empty or *max_msgs* is hit.

    The queue is declared via setup_queue() first. Each message is acked and
    then its body is decoded and parsed as JSON.

    Returns:
        A list of the parsed JSON messages, in the order they were fetched.

    Raises:
        ValueError: if a body is not valid text or JSON (``UnicodeDecodeError``
            and ``json.JSONDecodeError`` are both subclasses). The message has
            already been acked at that point. The connection is still closed.
    """
    connection, channel = pika_connect(host=host)
    messages = []
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        while len(messages) < max_msgs:
            method_frame, header_frame, body = channel.basic_get(queue_name)
            if not method_frame:
                # basic_get returned no message: the queue is drained.
                print("No message returned")
                break
            print(method_frame, header_frame, body)
            channel.basic_ack(method_frame.delivery_tag)
            messages.append(json.loads(body.decode()))
    finally:
        # Close even when a fetch or parse fails; skip if already closed so
        # the original exception is not masked.
        if connection.is_open:
            connection.close()
    return messages


def broadcast(queue_name, exchange_name):
    """Read from *queue_name* and forward every message onto a 'fanout' exchange.

    Blocks in ``start_consuming()``. Each message is republished unchanged via
    deliver_to_exchange() on the consuming channel and then acked.

    The source queue is declared via setup_queue() before consuming, matching
    forward(), publish() and listen(); without it, consuming from a queue that
    does not exist yet fails. The connection is closed when consuming stops or
    raises (for example on KeyboardInterrupt).
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        fanout_exchange(channel=channel, exchange_name=exchange_name)

        def broadcast_callback(ch, method, properties, body):
            deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
            # Ack only after the publish call returned.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
        channel.start_consuming()
    finally:
        # Skip close() if the connection is already gone so the original
        # exception is not masked.
        if connection.is_open:
            connection.close()


def forward(queue_name_one, queue_name_two):
    """Read from *queue_name_one* and forward every message onto *queue_name_two*.

    Both queues are declared via setup_queue() up front. Blocks in
    ``start_consuming()``. Each message is published unchanged to the
    destination queue and then acked.

    Messages are published on the consuming channel rather than through
    write_to_queue(), which would open and close a new broker connection for
    every forwarded message. The connection is closed when consuming stops or
    raises (for example on KeyboardInterrupt).
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name_one)
        setup_queue(channel=channel, queue_name=queue_name_two)

        def forward_callback(ch, method, properties, body):
            # Default exchange ('') with the destination queue name as routing
            # key: the same publish write_to_queue() performs.
            ch.basic_publish(exchange='', routing_key=queue_name_two, body=body)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback)
        channel.start_consuming()
    finally:
        # Skip close() if the connection is already gone so the original
        # exception is not masked.
        if connection.is_open:
            connection.close()


def publish(queue_name, exchange_name):
    """Read from *queue_name* and forward every message onto a 'topic' exchange.

    The queue is declared via setup_queue() first. Blocks in
    ``start_consuming()``. Each message body is parsed as JSON and its
    ``"topic"`` value is used as the routing key; the original body is
    republished unchanged via deliver_to_exchange() and then acked.

    A body that is not valid JSON, or that has no ``"topic"`` key, raises out
    of the callback before the message is acked. The connection is closed when
    consuming stops or raises, instead of being left open.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        def publish_callback(ch, method, properties, body):
            message = json.loads(body.decode())
            topic = message["topic"]
            deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
        channel.start_consuming()
    finally:
        # Skip close() if the connection is already gone so the original
        # exception is not masked.
        if connection.is_open:
            connection.close()


def subscribe(queue_name, exchange_name, topic=None):
    """Set up the binding between *queue_name* and *exchange_name*.

    The exchange type is 'fanout' when *topic* is None and 'topic' otherwise;
    in the latter case *topic* is the binding's routing key.

    This is a one-shot setup call, so the connection opened for it is closed
    before returning, including when declaring or binding raises.
    """
    connection, channel = pika_connect(host=host)
    try:
        if topic is None:
            fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name)
        else:
            topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic)
    finally:
        # Skip close() if the connection is already gone so the original
        # exception is not masked.
        if connection.is_open:
            connection.close()


def listen(queue_name, callback):
    """Subscribe a client to *queue_name*, handling each message with *callback*.

    The queue is declared via setup_queue() first. *callback* is passed to
    ``basic_consume`` as ``on_message_callback``. Blocks in
    ``start_consuming()``.

    The connection is closed when consuming stops or raises (for example on
    KeyboardInterrupt or an exception from *callback*), instead of being left
    open.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        channel.basic_consume(queue=queue_name, on_message_callback=callback)
        channel.start_consuming()
    finally:
        # Skip close() if the connection is already gone so the original
        # exception is not masked.
        if connection.is_open:
            connection.close()