rmq.py 6.53 KB
Newer Older
1
import json
James Kirk's avatar
James Kirk committed
2
import os
3 4 5

import pika

6 7 8
host = os.getenv(
    "MQ_HOST", "localhost"
)  # Sets to whatever MQ_HOST is, or defaults to localhost
9 10 11

# -------------------------------------------------------------------------------------------------------------------------------------------------------------

12

13
def pika_connect(host):
James Kirk's avatar
James Kirk committed
14 15 16 17 18 19 20 21
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    except Exception:
        connection = None

    if connection is not None:
        channel = connection.channel()
    else:
22 23 24 25 26 27 28 29
        print(
            "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?"
            % host
        )
        raise Exception(
            "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?"
            % host
        )
James Kirk's avatar
James Kirk committed
30

31 32 33
    return connection, channel


34 35 36 37
def setup_queue(channel, queue_name=""):
    channel.queue_declare(
        queue=queue_name, exclusive=False, durable=True
    )  # exclusive means the queue can only be used by the connection that created it
38 39


James Kirk's avatar
James Kirk committed
40
def fanout_exchange(channel, exchange_name):
41 42 43
    channel.exchange_declare(
        exchange=exchange_name, exchange_type="fanout", durable=True
    )
44 45


James Kirk's avatar
James Kirk committed
46
def topic_exchange(channel, exchange_name):
47 48 49
    channel.exchange_declare(
        exchange=exchange_name, exchange_type="topic", durable=True
    )
50 51


52
def deliver_to_exchange(channel, body, exchange_name, topic=None):
53 54
    if topic is None:
        fanout_exchange(channel=channel, exchange_name=exchange_name)
55 56
        channel.basic_publish(
            exchange=exchange_name,
57 58
            routing_key="",
            body=body,
59 60
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
61
            ),
62
        )
63
    else:
64
        topic_exchange(channel=channel, exchange_name=exchange_name)
65 66
        channel.basic_publish(
            exchange=exchange_name,
67 68
            routing_key=topic,
            body=body,
69 70
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
71
            ),
72
        )
73

74

75 76
# -------------------------------------------------------------------------------------------------------------------------------------------------------------

77

78 79 80 81 82
def write_to_queue(queue_name, msg):
    # write a single message to a queue
    connection, channel = pika_connect(host=host)
    setup_queue(channel=channel, queue_name=queue_name)

83
    channel.basic_publish(
84 85
        exchange="",
        routing_key=queue_name,
86 87 88
        body=msg,
        properties=pika.BasicProperties(
            delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
89
        ),
90 91
    )

92 93 94 95
    connection.close()


def read_from_queue(queue_name, max_msgs):
96
    # get messages off of a queue until the queue is empty or max_msgs is hit
97 98 99 100 101 102 103 104 105 106
    connection, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name)

    messages = []
    while len(messages) < max_msgs:
        method_frame, header_frame, body = channel.basic_get(queue_name)
        if method_frame:
            print(method_frame, header_frame, body)
            channel.basic_ack(method_frame.delivery_tag)
James Kirk's avatar
James Kirk committed
107 108
            try:
                messages.append(json.loads(body.decode()))
109
            except json.decoder.JSONDecodeError:
James Kirk's avatar
James Kirk committed
110
                messages.append(body.decode())
111 112 113 114 115 116 117 118 119 120 121 122
        else:
            print("No message returned")
            break

    connection.close()
    return messages


def broadcast(queue_name, exchange_name):
    # read from a queue, forward onto a 'fanout' exchange
    _, channel = pika_connect(host=host)

123
    setup_queue(channel=channel, queue_name=queue_name)
124 125 126 127 128

    def broadcast_callback(ch, method, properties, body):
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
        ch.basic_ack(delivery_tag=method.delivery_tag)

129 130 131 132 133
    try:
        channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
        channel.start_consuming()
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
134 135


James Kirk's avatar
James Kirk committed
136
def forward(from_queue, to_queue):
137 138 139
    # read from a queue, forward onto a different queue
    _, channel = pika_connect(host=host)

James Kirk's avatar
James Kirk committed
140 141
    setup_queue(channel=channel, queue_name=from_queue)
    setup_queue(channel=channel, queue_name=to_queue)
142 143

    def forward_callback(ch, method, properties, body):
144
        channel.basic_publish(
145 146
            exchange="",
            routing_key=to_queue,
147 148 149
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
150
            ),
151
        )
152 153
        ch.basic_ack(delivery_tag=method.delivery_tag)

154
    try:
James Kirk's avatar
James Kirk committed
155
        channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
156 157 158
        channel.start_consuming()
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
159 160 161 162 163 164 165 166 167 168 169


def publish(queue_name, exchange_name):
    # read from a queue, forward onto a 'topic' exchange
    _, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name)

    def publish_callback(ch, method, properties, body):
        message = json.loads(body.decode())
        topic = message["topic"]
170 171 172
        deliver_to_exchange(
            channel=ch, body=body, exchange_name=exchange_name, topic=topic
        )
173 174
        ch.basic_ack(delivery_tag=method.delivery_tag)

175 176 177 178 179
    try:
        channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
        channel.start_consuming()
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
180 181 182


def subscribe(queue_name, exchange_name, topic=None):
183
    # setup bindings between queue and exchange,
184
    # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
185
    connection, channel = pika_connect(host=host)
James Kirk's avatar
James Kirk committed
186 187
    setup_queue(channel=channel, queue_name=queue_name)

188
    if topic is None:
James Kirk's avatar
James Kirk committed
189 190
        fanout_exchange(channel=channel, exchange_name=exchange_name)
        channel.queue_bind(exchange=exchange_name, queue=queue_name)
191
    else:
192
        topic_exchange(channel=channel, exchange_name=exchange_name)
James Kirk's avatar
James Kirk committed
193
        channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
194

195 196
    connection.close()

197 198 199 200 201 202 203 204

def listen(queue_name, callback):
    # subscribe client to a queue, using the callback arg
    _, channel = pika_connect(host=host)

    setup_queue(channel=channel, queue_name=queue_name)

    channel.basic_consume(queue=queue_name, on_message_callback=callback)
205
    channel.start_consuming()