diff --git a/soar_broadcast.py b/old_mq_wrapper/soar_broadcast.py similarity index 100% rename from soar_broadcast.py rename to old_mq_wrapper/soar_broadcast.py diff --git a/soar_forward.py b/old_mq_wrapper/soar_forward.py similarity index 100% rename from soar_forward.py rename to old_mq_wrapper/soar_forward.py diff --git a/soar_publish.py b/old_mq_wrapper/soar_publish.py similarity index 100% rename from soar_publish.py rename to old_mq_wrapper/soar_publish.py diff --git a/soar_push.py b/old_mq_wrapper/soar_push.py similarity index 100% rename from soar_push.py rename to old_mq_wrapper/soar_push.py diff --git a/soar_subscribe.py b/old_mq_wrapper/soar_subscribe.py similarity index 100% rename from soar_subscribe.py rename to old_mq_wrapper/soar_subscribe.py diff --git a/rmq.py b/rmq.py new file mode 100644 index 0000000000000000000000000000000000000000..90cbda69bcc9052dd304e9e39d03cca7c52036c0 --- /dev/null +++ b/rmq.py @@ -0,0 +1,142 @@ +import json + +import pika + +host='localhost' + +# ------------------------------------------------------------------------------------------------------------------------------------------------------------- + +def pika_connect(host): + connection = pika.BlockingConnection(pika.ConnectionParameters(host)) + channel = connection.channel() + return connection, channel + + +def setup_queue(channel, queue_name=''): # TODO: Decide if this is too little or is ok. Maybe on setting up exchanges I need to expand this out + channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it + + +def fanout_exchange(channel, exchange_name, queue_name=None): + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') + if queue_name: + setup_queue(channel=channel, queue_name=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + + +def topic_exchange(channel, exchange_name, topic=None, queue_name=None): + channel.exchange_declare(exchange=exchange_name, exchange_type='topic') + if queue_name: + if topic is None: + print("ERROR: If binding queue to a topic exchange, topic must be provided") + return + setup_queue(channel=channel, queue_name=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic) + + +def deliver_to_exchange(channel, body, exchange_name, topic=None): # TODO: Definitely verify I can use channel/'ch' like this in the callback + # connection, channel = pika_connect(host=host) + if topic is None: + fanout_exchange(channel=channel, exchange_name=exchange_name) + channel.basic_publish(exchange=exchange_name, routing_key='', body=body) + else: + topic_exchange(channel=channel, exchange_name=exchange_name, topic=topic) + channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body) + # connection.close() + +# ------------------------------------------------------------------------------------------------------------------------------------------------------------- + +def write_to_queue(queue_name, msg): + # write a single message to a queue + connection, channel = pika_connect(host=host) + setup_queue(channel=channel, queue_name=queue_name) + + channel.basic_publish(exchange='', routing_key=queue_name, body=msg) + + connection.close() + + +def read_from_queue(queue_name, max_msgs): + # get messages off of a queue until the queue is empty of max_msgs is hit + connection, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + messages = [] + while len(messages) < max_msgs: + method_frame, header_frame, body = channel.basic_get(queue_name) + if method_frame: + print(method_frame, header_frame, body) + channel.basic_ack(method_frame.delivery_tag) + messages.append(json.loads(body.decode())) + else: + print("No message returned") + break + + connection.close() + return messages + + +def broadcast(queue_name, exchange_name): + # read from a queue, forward onto a 'fanout' exchange + _, channel = pika_connect(host=host) + + fanout_exchange(channel=channel, exchange_name=exchange_name) + + def broadcast_callback(ch, method, properties, body): + deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name) + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback) + channel.start_consuming() + + +def forward(queue_name_one, queue_name_two): + # read from a queue, forward onto a different queue + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name_one) + setup_queue(channel=channel, queue_name=queue_name_two) + + def forward_callback(ch, method, properties, body): + write_to_queue(queue_name=queue_name_two, msg=body) + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume(queue=queue_name_one, on_message_callback=forward_callback) + channel.start_consuming() + + +def publish(queue_name, exchange_name): + # read from a queue, forward onto a 'topic' exchange + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + def publish_callback(ch, method, properties, body): + message = json.loads(body.decode()) + topic = message["topic"] + deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic) + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume(queue=queue_name, on_message_callback=publish_callback) + channel.start_consuming() + + +def subscribe(queue_name, exchange_name, topic=None): + # setup bindings between queue and exchange, + # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed + _, channel = pika_connect(host=host) + + if topic is None: + fanout_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name) + else: + topic_exchange(channel=channel, queue_name=queue_name, exchange_name=exchange_name, topic=topic) + + +def listen(queue_name, callback): + # subscribe client to a queue, using the callback arg + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + channel.basic_consume(queue=queue_name, on_message_callback=callback) + channel.start_consuming() \ No newline at end of file diff --git a/soar_bus.py b/soar_bus.py index 3576ea3a75a390ec4cd2de7ce111d7435d5eabf4..44078320ea8b6b5ae3f62028017590e4d2991e3e 100644 --- a/soar_bus.py +++ b/soar_bus.py @@ -11,10 +11,7 @@ import concurrent.futures from endpoints.clients import ClientsFile -from soar_broadcast import broadcast -from soar_forward import forward -from soar_publish import publish -from soar_subscribe import subscribe +from rmq import publish, subscribe, broadcast, forward THREADS = [] EXCHANGES = { @@ -43,11 +40,15 @@ def main(): THREADS.append(thread) # subscribe thread = executor.submit( - subscribe, + subscribe, f"{id}-inbox", - client["subscription"], EXCHANGES.get("publish"), - EXCHANGES.get("broadcast"), + client["subscription"] # topic + ) + thread = executor.submit( + subscribe, + f"{id}-inbox", + EXCHANGES.get("broadcast") ) THREADS.append(thread) # push