Commit f4a9de6d authored by James Kirk's avatar James Kirk
Browse files

feat: setup rmq functions in a wrapper

refactor: moved dans work into old_mq_wrapper
parent 73f478a0
File moved
File moved
File moved
File moved
File moved
import json
import pika
host='localhost'
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def pika_connect(host):
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
return connection, channel
def setup_queue(channel, queue_name=''): # TODO: Decide if this is too little or is ok. Maybe on setting up exchanges I need to expand this out
channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it
def fanout_exchange(channel, exchange_name, queue_name=None):
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
if queue_name:
setup_queue(channel=channel, queue_name=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def topic_exchange(channel, exchange_name, topic=None, queue_name=None):
channel.exchange_declare(exchange=exchange_name, exchange_type='topic')
if queue_name:
if topic is None:
print("ERROR: If binding queue to a topic exchange, topic must be provided")
return
setup_queue(channel=channel, queue_name=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
def deliver_to_exchange(channel, body, exchange_name, topic=None): # TODO: Definitely verify I can use channel/'ch' like this in the callback
# connection, channel = pika_connect(host=host)
if topic is None:
fanout_exchange(channel=channel, exchange_name=exchange_name)
channel.basic_publish(exchange=exchange_name, routing_key='', body=body)
else:
topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic)
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body)
# connection.close()
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def write_to_queue(queue_name, msg):
# write a single message to a queue
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body=msg)
connection.close()
def read_from_queue(queue_name, max_msgs):
# get messages off of a queue until the queue is empty of max_msgs is hit
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
messages = []
while len(messages) < max_msgs:
method_frame, header_frame, body = channel.basic_get(queue_name)
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
messages.append(json.loads(body.decode()))
else:
print("No message returned")
break
connection.close()
return messages
def broadcast(queue_name, exchange_name):
# read from a queue, forward onto a 'fanout' exchange
_, channel = pika_connect(host=host)
fanout_exchange(channel=channel, exchange_name=exchange_name)
def broadcast_callback(ch, method, properties, body):
deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
channel.start_consuming()
def forward(queue_name_one, queue_name_two):
# read from a queue, forward onto a different queue
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name_one)
setup_queue(channel=channel, queue_name=queue_name_two)
def forward_callback(ch, method, properties, body):
write_to_queue(queue_name=queue_name_two, msg=body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback)
channel.start_consuming()
def publish(queue_name, exchange_name):
# read from a queue, forward onto a 'topic' exchange
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
def publish_callback(ch, method, properties, body):
message = json.loads(body.decode())
topic = message["topic"]
deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
channel.start_consuming()
def subscribe(queue_name, exchange_name, topic=None):
# setup bindings between queue and exchange,
# exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
_, channel = pika_connect(host=host)
if topic is None:
fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name)
else:
topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic)
def listen(queue_name, callback):
# subscribe client to a queue, using the callback arg
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()
\ No newline at end of file
......@@ -11,10 +11,7 @@
import concurrent.futures
from endpoints.clients import ClientsFile
from soar_broadcast import broadcast
from soar_forward import forward
from soar_publish import publish
from soar_subscribe import subscribe
from rmq import publish, subscribe, broadcast, forward
THREADS = []
EXCHANGES = {
......@@ -43,11 +40,15 @@ def main():
THREADS.append(thread)
# subscribe
thread = executor.submit(
subscribe,
subscribe,
f"{id}-inbox",
client["subscription"],
EXCHANGES.get("publish"),
EXCHANGES.get("broadcast"),
client["subscription"] # topic
)
thread = executor.submit(
subscribe,
f"{id}-inbox",
EXCHANGES.get("broadcast")
)
THREADS.append(thread)
# push
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment