1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import json
import pika
# Broker hostname handed to pika_connect() by the helpers in this module
# that open their own connection.
host='localhost' # TODO Handle host being passed in
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def pika_connect(host):
    """Open a blocking connection to the broker at ``host`` plus one channel.

    Returns:
        A ``(connection, channel)`` tuple; the caller owns both and is
        responsible for closing the connection.
    """
    params = pika.ConnectionParameters(host)
    conn = pika.BlockingConnection(params)
    return conn, conn.channel()
def setup_queue(channel, queue_name=''):
    """Declare a durable, non-exclusive queue on ``channel``.

    Args:
        channel: Open channel to issue the declare on.
        queue_name: Name of the queue. The default empty string asks the
            broker to generate a name.

    Returns:
        The name of the declared queue as reported by the broker. This is
        the only way for a caller to learn a broker-generated name.
    """
    # exclusive means the queue can only be used by the connection that created it
    declare_ok = channel.queue_declare(
        queue=queue_name,
        exclusive=False,
        durable=True,
    )
    return declare_ok.method.queue
def fanout_exchange(channel, exchange_name, queue_name=None):
    """Declare a fanout exchange and optionally bind a queue to it.

    The exchange is always declared. When ``queue_name`` is truthy the
    queue is declared via ``setup_queue`` and bound to the exchange.
    """
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')

    # Nothing to bind: the caller only wanted the exchange to exist.
    if not queue_name:
        return

    setup_queue(channel=channel, queue_name=queue_name)
    channel.queue_bind(exchange=exchange_name, queue=queue_name)
def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
    """Declare a topic exchange and optionally bind a queue to it.

    The exchange is always declared. When ``queue_name`` is truthy, a
    ``topic`` is required as the binding's routing key; if it is missing an
    error is printed and no queue is declared or bound.
    """
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic')

    # Nothing to bind: the caller only wanted the exchange to exist.
    if not queue_name:
        return

    # A binding on a topic exchange needs a routing key to match on.
    if topic is None:
        print("ERROR: If binding queue to a topic exchange, topic must be provided")
        return

    setup_queue(channel=channel, queue_name=queue_name)
    channel.queue_bind(
        exchange=exchange_name,
        queue=queue_name,
        routing_key=topic,
    )
def deliver_to_exchange(channel, body, exchange_name, topic=None):
    """Publish ``body`` to ``exchange_name``, declaring the exchange first.

    With no ``topic`` the exchange is declared as fanout and the message is
    published with an empty routing key; otherwise it is declared as a topic
    exchange and ``topic`` is used as the routing key.
    """
    if topic is None:
        fanout_exchange(channel=channel, exchange_name=exchange_name)
        routing_key = ''
    else:
        topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
        routing_key = topic

    channel.basic_publish(
        exchange=exchange_name,
        routing_key=routing_key,
        body=body,
    )
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def write_to_queue(queue_name, msg):
    """Write a single message to a queue.

    Opens a fresh connection to the module-level ``host``, declares
    ``queue_name``, publishes ``msg`` through the default exchange with the
    queue name as routing key, and closes the connection.

    Args:
        queue_name: Queue to declare and publish to.
        msg: Message body passed to ``basic_publish``.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        channel.basic_publish(exchange='', routing_key=queue_name, body=msg)
    finally:
        # Release the connection even when the declare or publish fails.
        # The is_open guard avoids a second error if it is already closed.
        if connection.is_open:
            connection.close()
def read_from_queue(queue_name, max_msgs):
    """Get messages off a queue until it is empty or ``max_msgs`` is hit.

    Each message is acknowledged and then decoded as JSON, so a body that
    is not valid JSON has already been acked when the decode error is raised.

    Args:
        queue_name: Queue to declare and read from.
        max_msgs: Upper bound on the number of messages to return.

    Returns:
        A list of the JSON-decoded message bodies, in the order received.
    """
    connection, channel = pika_connect(host=host)
    messages = []
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        while len(messages) < max_msgs:
            method_frame, header_frame, body = channel.basic_get(queue_name)
            if not method_frame:
                # basic_get gave no method frame: the queue is drained.
                print("No message returned")
                break
            print(method_frame, header_frame, body)
            channel.basic_ack(method_frame.delivery_tag)
            messages.append(json.loads(body.decode()))
    finally:
        # Release the connection even when a get, ack or JSON decode fails.
        if connection.is_open:
            connection.close()
    return messages
def broadcast(queue_name, exchange_name):
    """Read from a queue and forward every message onto a 'fanout' exchange.

    Blocks in ``start_consuming``. A channel error raised while consuming is
    printed and ends the loop; other exceptions propagate. The connection is
    closed on the way out in every case.

    Args:
        queue_name: Queue to declare and consume from.
        exchange_name: Fanout exchange each message is republished to.
    """
    connection, channel = pika_connect(host=host)

    def broadcast_callback(ch, method, properties, body):
        # Ack only after the message has been handed to the exchange.
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    try:
        setup_queue(channel=channel, queue_name=queue_name)
        try:
            channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        # Previously the connection was discarded and never closed.
        if connection.is_open:
            connection.close()
def forward(queue_name_one, queue_name_two):
    """Read from one queue and forward every message onto a different queue.

    Blocks in ``start_consuming``. A channel error raised while consuming is
    printed and ends the loop; other exceptions propagate. The connection is
    closed on the way out in every case.

    Args:
        queue_name_one: Source queue to declare and consume from.
        queue_name_two: Destination queue to declare and publish to (via the
            default exchange, with the queue name as routing key).
    """
    connection, channel = pika_connect(host=host)

    def forward_callback(ch, method, properties, body):
        # Publish on the channel the delivery arrived on, matching the other
        # consumer callbacks in this module, and ack only after publishing.
        ch.basic_publish(exchange='', routing_key=queue_name_two, body=body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    try:
        setup_queue(channel=channel, queue_name=queue_name_one)
        setup_queue(channel=channel, queue_name=queue_name_two)
        try:
            channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        # Previously the connection was discarded and never closed.
        if connection.is_open:
            connection.close()
def publish(queue_name, exchange_name):
    """Read from a queue and forward every message onto a 'topic' exchange.

    Each body is decoded as JSON and its ``"topic"`` entry is used as the
    routing key. A body that is not valid JSON, or that has no ``"topic"``
    key, raises from the callback and is not handled here.

    Blocks in ``start_consuming``. A channel error raised while consuming is
    printed and ends the loop; other exceptions propagate. The connection is
    closed on the way out in every case.

    Args:
        queue_name: Queue to declare and consume from.
        exchange_name: Topic exchange each message is republished to.
    """
    connection, channel = pika_connect(host=host)

    def publish_callback(ch, method, properties, body):
        message = json.loads(body.decode())
        topic = message["topic"]
        # Ack only after the message has been handed to the exchange.
        deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    try:
        setup_queue(channel=channel, queue_name=queue_name)
        try:
            channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
            channel.start_consuming()
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
    finally:
        # Previously the connection was discarded and never closed.
        if connection.is_open:
            connection.close()
def subscribe(queue_name, exchange_name, topic=None):
    """Set up the binding between a queue and an exchange.

    The exchange type is 'fanout' when ``topic`` is None and 'topic'
    otherwise, in which case ``topic`` is the binding's routing key.

    Args:
        queue_name: Queue to declare and bind.
        exchange_name: Exchange to declare and bind the queue to.
        topic: Optional routing key; its presence selects a topic exchange.
    """
    connection, channel = pika_connect(host=host)
    try:
        if topic is None:
            fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name)
        else:
            topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic)
    finally:
        # Release the connection even when a declare or bind fails.
        if connection.is_open:
            connection.close()
def listen(queue_name, callback):
    """Subscribe a client to a queue, dispatching deliveries to ``callback``.

    Blocks in ``start_consuming``. Exceptions are not caught here and
    propagate to the caller; the connection is closed on the way out.

    Args:
        queue_name: Queue to declare and consume from.
        callback: Passed to ``basic_consume`` as ``on_message_callback``.
    """
    connection, channel = pika_connect(host=host)
    try:
        setup_queue(channel=channel, queue_name=queue_name)
        channel.basic_consume(queue=queue_name, on_message_callback=callback)
        channel.start_consuming()
    finally:
        # Previously the connection was discarded and never closed.
        if connection.is_open:
            connection.close()