rmq.py 6.39 KB
Newer Older
1
import json
James Kirk's avatar
James Kirk committed
2
import os
3 4 5

import pika

James Kirk's avatar
James Kirk committed
6
# RabbitMQ broker hostname: the value of the MQ_HOST environment variable,
# or "localhost" when that variable is not set.
host = os.environ.get("MQ_HOST", "localhost")
7 8 9 10

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

def pika_connect(host):
    """Open a blocking connection to RabbitMQ on ``host`` and create a channel.

    Args:
        host: Hostname passed to ``pika.ConnectionParameters``.

    Returns:
        A ``(connection, channel)`` tuple. The caller owns the connection and
        is responsible for closing it.

    Raises:
        Exception: If the connection cannot be established. The underlying
            error is attached as ``__cause__`` so the root cause stays visible.
        Any exception raised while creating the channel is propagated
        unchanged, after the connection has been closed.
    """
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    except Exception as err:
        message = "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" % host
        print(message)
        # Chain the original error instead of discarding it.
        raise Exception(message) from err

    try:
        channel = connection.channel()
    except Exception:
        # Do not leak the open connection when the channel cannot be created.
        if connection.is_open:
            connection.close()
        raise

    return connection, channel


25
def setup_queue(channel, queue_name=''):
    """Declare ``queue_name`` on ``channel`` as a durable, non-exclusive queue.

    Args:
        channel: Object exposing ``queue_declare`` (a pika channel).
        queue_name: Name of the queue to declare; defaults to the empty string.
    """
    declare_options = {
        'queue': queue_name,
        # exclusive means the queue can only be used by the connection that created it
        'exclusive': False,
        'durable': True,
    }
    channel.queue_declare(**declare_options)


James Kirk's avatar
James Kirk committed
29
def fanout_exchange(channel, exchange_name):
    """Declare ``exchange_name`` on ``channel`` as a durable 'fanout' exchange.

    Args:
        channel: Object exposing ``exchange_declare`` (a pika channel).
        exchange_name: Name of the exchange to declare.
    """
    declare_options = {
        'exchange': exchange_name,
        'exchange_type': 'fanout',
        'durable': True,
    }
    channel.exchange_declare(**declare_options)
31 32


James Kirk's avatar
James Kirk committed
33
def topic_exchange(channel, exchange_name):
    """Declare ``exchange_name`` on ``channel`` as a durable 'topic' exchange.

    Args:
        channel: Object exposing ``exchange_declare`` (a pika channel).
        exchange_name: Name of the exchange to declare.
    """
    declare_options = {
        'exchange': exchange_name,
        'exchange_type': 'topic',
        'durable': True,
    }
    channel.exchange_declare(**declare_options)
35 36


37
def deliver_to_exchange(channel, body, exchange_name, topic=None):
    """Publish ``body`` to ``exchange_name`` with the persistent delivery mode.

    When ``topic`` is None the exchange is declared as 'fanout' and the message
    is published with an empty routing key. Otherwise the exchange is declared
    as 'topic' and ``topic`` is used as the routing key.

    Args:
        channel: pika channel used for both the declaration and the publish.
        body: Message payload handed to ``basic_publish``.
        exchange_name: Name of the exchange to declare and publish to.
        topic: Routing key for a topic exchange, or None for fanout.
    """
    if topic is None:
        fanout_exchange(channel=channel, exchange_name=exchange_name)
        routing_key = ''
    else:
        topic_exchange(channel=channel, exchange_name=exchange_name)
        routing_key = topic

    persistent = pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
    )
    channel.basic_publish(
        exchange=exchange_name,
        routing_key=routing_key,
        body=body,
        properties=persistent,
    )
58 59 60 61 62 63 64 65

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

def write_to_queue(queue_name, msg):
    """Write a single message to a queue.

    Connects to the module-level ``host``, declares ``queue_name`` through
    ``setup_queue``, publishes ``msg`` via the default exchange (``''``) with
    ``queue_name`` as routing key and the persistent delivery mode, then closes
    the connection.

    Args:
        queue_name: Queue to declare and publish to.
        msg: Message body handed to ``basic_publish``.

    Raises:
        Exception: Propagated from ``pika_connect`` when the broker cannot be
            reached; errors from declaring or publishing propagate unchanged.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=msg,
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
            )
        )
    finally:
        # Always release the connection, even when declare/publish fails.
        # Skip close() on an already-closed connection so the original error
        # is not masked by a wrong-state error.
        if connection.is_open:
            connection.close()


def read_from_queue(queue_name, max_msgs):
    """Get messages off a queue until it is empty or ``max_msgs`` is reached.

    Each message is acknowledged as soon as it is fetched. Bodies are decoded
    to text and parsed as JSON; a body that is not valid JSON is returned as
    the decoded string, and a body that cannot be decoded to text is returned
    as the raw bytes, so an already-acknowledged message is never dropped.

    Args:
        queue_name: Queue to declare and read from.
        max_msgs: Maximum number of messages to return.

    Returns:
        A list with at most ``max_msgs`` entries, in the order they were
        fetched.

    Raises:
        Exception: Propagated from ``pika_connect`` when the broker cannot be
            reached; channel errors propagate unchanged.
    """
    connection, channel = pika_connect(host=host)
    messages = []
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        while len(messages) < max_msgs:
            method_frame, header_frame, body = channel.basic_get(queue_name)
            if not method_frame:
                print("No message returned")
                break

            print(method_frame, header_frame, body)
            channel.basic_ack(method_frame.delivery_tag)

            try:
                text = body.decode()
            except UnicodeDecodeError:
                # The message is already acked; keep the raw payload rather
                # than losing it (and everything collected so far) to a crash.
                messages.append(body)
                continue

            try:
                messages.append(json.loads(text))
            except json.JSONDecodeError:
                messages.append(text)
    finally:
        # Release the connection on every exit path.
        if connection.is_open:
            connection.close()

    return messages


def broadcast(queue_name, exchange_name):
    """Read from a queue and forward every message onto a 'fanout' exchange.

    Blocks in ``start_consuming``. Each consumed message is delivered to
    ``exchange_name`` via ``deliver_to_exchange`` (no topic) and then
    acknowledged. An ``AMQPChannelError`` stops consumption and is reported on
    stdout; the connection is closed whenever the function exits.

    Args:
        queue_name: Queue to declare and consume from.
        exchange_name: Exchange that receives the forwarded messages.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        def broadcast_callback(ch, method, properties, body):
            deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        try:
            channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        # The connection used to be discarded and left open; close it on every
        # exit path (skipping close() if the broker already dropped it).
        if connection.is_open:
            connection.close()
117 118


James Kirk's avatar
James Kirk committed
119
def forward(from_queue, to_queue):
    """Read from a queue and forward every message onto a different queue.

    Blocks in ``start_consuming``. Each message consumed from ``from_queue`` is
    republished through the default exchange (``''``) with ``to_queue`` as the
    routing key and the persistent delivery mode, then acknowledged. An
    ``AMQPChannelError`` stops consumption and is reported on stdout; the
    connection is closed whenever the function exits.

    Args:
        from_queue: Queue to declare and consume from.
        to_queue: Queue to declare and republish to.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=from_queue)
        setup_queue(channel=channel, queue_name=to_queue)

        def forward_callback(ch, method, properties, body):
            # Publish on the channel the message arrived on, matching the
            # other consumer callbacks in this module.
            ch.basic_publish(
                exchange='',
                routing_key=to_queue,
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)

        try:
            channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        # The connection used to be discarded and left open; close it on every
        # exit path (skipping close() if the broker already dropped it).
        if connection.is_open:
            connection.close()
142 143 144 145 146 147 148 149 150 151 152 153 154 155


def publish(queue_name, exchange_name):
    """Read from a queue and forward every message onto a 'topic' exchange.

    Blocks in ``start_consuming``. Each body is parsed as JSON and its
    ``"topic"`` value is used as the routing key when the original body is
    delivered to ``exchange_name``; the message is then acknowledged.

    A body that is not valid JSON, or whose ``"topic"`` is missing or not a
    string, is rejected without requeue and reported on stdout. Previously such
    a message raised inside the callback, stopped the consumer and stayed
    unacknowledged, so it would be redelivered and crash the consumer again.

    An ``AMQPChannelError`` stops consumption and is reported on stdout; the
    connection is closed whenever the function exits.

    Args:
        queue_name: Queue to declare and consume from.
        exchange_name: Topic exchange that receives the forwarded messages.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        def publish_callback(ch, method, properties, body):
            try:
                message = json.loads(body.decode())
                topic = message["topic"]
                if not isinstance(topic, str):
                    raise TypeError("topic must be a string, got {!r}".format(topic))
            except (ValueError, KeyError, TypeError) as err:
                # ValueError covers both UnicodeDecodeError and JSONDecodeError.
                print("Rejecting malformed message: {}".format(err))
                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
                return

            deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        try:
            channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        # The connection used to be discarded and left open; close it on every
        # exit path (skipping close() if the broker already dropped it).
        if connection.is_open:
            connection.close()
161 162 163 164 165


def subscribe(queue_name, exchange_name, topic=None):
    """Set up the binding between a queue and an exchange.

    The exchange type is 'fanout' when ``topic`` is None and 'topic'
    otherwise; in the latter case ``topic`` is used as the binding's routing
    key. The queue and the exchange are declared before binding, and the
    connection is closed afterwards, including when a declaration or the bind
    fails.

    Args:
        queue_name: Queue to declare and bind.
        exchange_name: Exchange to declare and bind the queue to.
        topic: Routing key for a topic binding, or None for a fanout binding.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        if topic is None:
            fanout_exchange(channel=channel, exchange_name=exchange_name)
            channel.queue_bind(exchange=exchange_name, queue=queue_name)
        else:
            topic_exchange(channel=channel, exchange_name=exchange_name)
            channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
    finally:
        # Release the connection even when a declare/bind call raises; skip
        # close() on an already-closed connection so the original error is
        # not masked.
        if connection.is_open:
            connection.close()

178 179 180 181 182 183 184 185 186

def listen(queue_name, callback):
    """Subscribe a client to a queue, using the ``callback`` argument.

    Declares ``queue_name``, registers ``callback`` as the
    ``on_message_callback`` and blocks in ``start_consuming``. Exceptions
    raised while consuming propagate to the caller unchanged; the connection
    is closed whenever the function exits.

    Args:
        queue_name: Queue to declare and consume from.
        callback: Callable passed to ``basic_consume`` as
            ``on_message_callback``.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)

        channel.basic_consume(queue=queue_name, on_message_callback=callback)
        channel.start_consuming()
    finally:
        # The connection used to be discarded and left open; close it on every
        # exit path (skipping close() if the broker already dropped it).
        if connection.is_open:
            connection.close()