diff --git a/endpoints/hello.py b/endpoints/hello.py deleted file mode 100644 index 7bb0bab0de35c7cebf2d22fac1f1213b16ef90df..0000000000000000000000000000000000000000 --- a/endpoints/hello.py +++ /dev/null @@ -1,6 +0,0 @@ -from flask_restful import Resource - - -class HelloWorld(Resource): - def get(self): - return {"hello": "world"} diff --git a/endpoints/notify.py b/endpoints/notify.py index a953db879eee9b7b7b22ce34de44e57f93d9e772..22bcb8df0eb1e8302db0549e30d769339826979d 100644 --- a/endpoints/notify.py +++ b/endpoints/notify.py @@ -1,8 +1,10 @@ import json -from flask_restful import request, abort + +from flask_restful import abort, request from marshmallow import Schema, fields -import pika + from endpoints.auth_resource import AuthResource +from rmq import write_to_queue class NotifySchema(Schema): @@ -33,8 +35,4 @@ class Notify(AuthResource): if allow: notify_queue = self.client['client_id'] + "-broadcast" - connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) - channel = connection.channel() - channel.queue_declare(queue=notify_queue, durable=True) - channel.basic_publish(exchange="", routing_key=notify_queue, body=json.dumps(message)) - connection.close() + write_to_queue(queue_name=notify_queue, msg=json.dumps(message)) \ No newline at end of file diff --git a/endpoints/receive.py b/endpoints/receive.py index bc1febaa41d5fbd2e724c2bf77521842952c6c88..5c1f99c9865248c259e83ecb9fd2df6ddfabb084 100644 --- a/endpoints/receive.py +++ b/endpoints/receive.py @@ -1,9 +1,8 @@ -from flask_restful import request, abort +from flask_restful import abort, request from marshmallow import Schema, fields -import pika -import json -from models.token import TokenModel + from endpoints.auth_resource import AuthResource +from rmq import read_from_queue class ReceiveQuerySchema(Schema): @@ -23,28 +22,9 @@ class Receive(AuthResource): if errors: abort(400, message=str(errors)) - messages = [] max_messages = request.args.get("max_messages", 10) - + allow = self.auth(request) if allow: inbox_queue = self.client['client_id'] + "-inbox" - - if allow: - connection = pika.BlockingConnection( - pika.ConnectionParameters(host="localhost") - ) - channel = connection.channel() - channel.queue_declare(queue=inbox_queue, durable=True) - while len(messages) < max_messages: - method_frame, header_frame, body = channel.basic_get(inbox_queue) - if method_frame: - print(method_frame, header_frame, body) - channel.basic_ack(method_frame.delivery_tag) - messages.append(json.loads(body.decode())) - else: - print("No message returned") - break - channel.close() - connection.close() - return messages + return read_from_queue(queue_name=inbox_queue, max_msgs=max_messages) \ No newline at end of file diff --git a/endpoints/send.py b/endpoints/send.py index 900e8a5c63d066921315e9e93e22fb8d4eb59d9a..9fe91ca548973ee32d36ce51cd5e8bdd89f23533 100644 --- a/endpoints/send.py +++ b/endpoints/send.py @@ -1,8 +1,11 @@ import json -from flask_restful import request, abort + +from flask_restful import abort, request from marshmallow import Schema, fields -import pika + from endpoints.auth_resource import AuthResource +from rmq import write_to_queue + class SendSchema(Schema): body = fields.Str(required=True) @@ -28,13 +31,9 @@ class Send(AuthResource): body = args.get("body") topic = args.get("topic") outbox_queue = self.client['client_id'] + "-outbox" - - connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) - channel = connection.channel() - channel.queue_declare(queue=outbox_queue, durable=True) message = { 'topic': topic, 'message': body, } - channel.basic_publish(exchange="", routing_key=outbox_queue, body=json.dumps(message)) - connection.close() + + write_to_queue(queue_name=outbox_queue, msg=json.dumps(message)) \ No newline at end of file diff --git a/soar_broadcast.py b/old_mq_wrapper/soar_broadcast.py similarity index 100% rename from soar_broadcast.py rename to old_mq_wrapper/soar_broadcast.py diff --git a/soar_forward.py b/old_mq_wrapper/soar_forward.py similarity index 100% rename from soar_forward.py rename to old_mq_wrapper/soar_forward.py diff --git a/soar_publish.py b/old_mq_wrapper/soar_publish.py similarity index 100% rename from soar_publish.py rename to old_mq_wrapper/soar_publish.py diff --git a/soar_push.py b/old_mq_wrapper/soar_push.py similarity index 100% rename from soar_push.py rename to old_mq_wrapper/soar_push.py diff --git a/soar_subscribe.py b/old_mq_wrapper/soar_subscribe.py similarity index 100% rename from soar_subscribe.py rename to old_mq_wrapper/soar_subscribe.py diff --git a/rmq.py b/rmq.py new file mode 100644 index 0000000000000000000000000000000000000000..e5f7d35a3465ef6101b943084a476d82b0f27362 --- /dev/null +++ b/rmq.py @@ -0,0 +1,177 @@ +import json + +import pika + +host='localhost' # TODO Handle host being passed in (https://git.noc.ac.uk/communications-backbone-system/communications-backbone/-/issues/17) + +# ------------------------------------------------------------------------------------------------------------------------------------------------------------- + +def pika_connect(host): + connection = pika.BlockingConnection(pika.ConnectionParameters(host)) + channel = connection.channel() + return connection, channel + + +def setup_queue(channel, queue_name=''): + channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it + + +def fanout_exchange(channel, exchange_name): + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True) + + +def topic_exchange(channel, exchange_name): + channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True) + + +def deliver_to_exchange(channel, body, exchange_name, topic=None): + if topic is None: + fanout_exchange(channel=channel, exchange_name=exchange_name) + channel.basic_publish( + exchange=exchange_name, + routing_key='', + body=body, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + else: + topic_exchange(channel=channel, exchange_name=exchange_name) + channel.basic_publish( + exchange=exchange_name, + routing_key=topic, + body=body, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + +# ------------------------------------------------------------------------------------------------------------------------------------------------------------- + +def write_to_queue(queue_name, msg): + # write a single message to a queue + connection, channel = pika_connect(host=host) + setup_queue(channel=channel, queue_name=queue_name) + + channel.basic_publish( + exchange='', + routing_key=queue_name, + body=msg, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + + connection.close() + + +def read_from_queue(queue_name, max_msgs): + # get messages off of a queue until the queue is empty or max_msgs is hit + connection, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + messages = [] + while len(messages) < max_msgs: + method_frame, header_frame, body = channel.basic_get(queue_name) + if method_frame: + print(method_frame, header_frame, body) + channel.basic_ack(method_frame.delivery_tag) + try: + messages.append(json.loads(body.decode())) + except json.decoder.JSONDecodeError: + messages.append(body.decode()) + else: + print("No message returned") + break + + connection.close() + return messages + + +def broadcast(queue_name, exchange_name): + # read from a queue, forward onto a 'fanout' exchange + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + def broadcast_callback(ch, method, properties, body): + deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name) + ch.basic_ack(delivery_tag=method.delivery_tag) + + try: + channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback) + channel.start_consuming() + except pika.exceptions.AMQPChannelError as err: + print("Caught a channel error: {}, stopping...".format(err)) + + +def forward(from_queue, to_queue): + # read from a queue, forward onto a different queue + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=from_queue) + setup_queue(channel=channel, queue_name=to_queue) + + def forward_callback(ch, method, properties, body): + channel.basic_publish( + exchange='', + routing_key=to_queue, + body=body, + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + ) + ) + ch.basic_ack(delivery_tag=method.delivery_tag) + + try: + channel.basic_consume(queue=from_queue, on_message_callback=forward_callback) + channel.start_consuming() + except pika.exceptions.AMQPChannelError as err: + print("Caught a channel error: {}, stopping...".format(err)) + + +def publish(queue_name, exchange_name): + # read from a queue, forward onto a 'topic' exchange + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + def publish_callback(ch, method, properties, body): + message = json.loads(body.decode()) + topic = message["topic"] + deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic) + ch.basic_ack(delivery_tag=method.delivery_tag) + + try: + channel.basic_consume(queue=queue_name, on_message_callback=publish_callback) + channel.start_consuming() + except pika.exceptions.AMQPChannelError as err: + print("Caught a channel error: {}, stopping...".format(err)) + + +def subscribe(queue_name, exchange_name, topic=None): + # setup bindings between queue and exchange, + # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed + connection, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + if topic is None: + fanout_exchange(channel=channel, exchange_name=exchange_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + else: + topic_exchange(channel=channel, exchange_name=exchange_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic) + + connection.close() + + +def listen(queue_name, callback): + # subscribe client to a queue, using the callback arg + _, channel = pika_connect(host=host) + + setup_queue(channel=channel, queue_name=queue_name) + + channel.basic_consume(queue=queue_name, on_message_callback=callback) + channel.start_consuming() \ No newline at end of file diff --git a/soar_bus.py b/soar_bus.py index 3576ea3a75a390ec4cd2de7ce111d7435d5eabf4..44078320ea8b6b5ae3f62028017590e4d2991e3e 100644 --- a/soar_bus.py +++ b/soar_bus.py @@ -11,10 +11,7 @@ import concurrent.futures from endpoints.clients import ClientsFile -from soar_broadcast import broadcast -from soar_forward import forward -from soar_publish import publish -from soar_subscribe import subscribe +from rmq import publish, subscribe, broadcast, forward THREADS = [] EXCHANGES = { @@ -43,11 +40,15 @@ def main(): THREADS.append(thread) # subscribe thread = executor.submit( - subscribe, + subscribe, f"{id}-inbox", - client["subscription"], EXCHANGES.get("publish"), - EXCHANGES.get("broadcast"), + client["subscription"] # topic + ) + thread = executor.submit( + subscribe, + f"{id}-inbox", + EXCHANGES.get("broadcast") ) THREADS.append(thread) # push