diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fcb5979cac2b97a700e0a993e1e251507ffebe0..25cfaaee488d5663c477bafdbdf491db7a0594e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v1.0.0] - 2024-09-16 + +### Added + +- Create a queues API to enable listing queue size and emptying queues + +### Fixed + +- Removed child dependencies from requirements file + +### Changed + +- Refactored queue listening + ## [v0.1.0] - 2023-03-24 ### Added @@ -16,21 +30,22 @@ Create a mechanism to exchange messages between multiple clients Bus - Implements persistent queues per client -- Implements publish/subscribe message flow between clients -- Implements broadcast message flow between clients +- Implements publish/subscribe message flow between clients +- Implements broadcast message flow between clients - Listens for and responds to add/remove clients API - Implements clients endpoint to manage clients and subscriptions - Implements token endpoint to manage authentication client credentials grants -- Implements send endpoint for publishing -- Implements notify endpoint for broadcasts -- Implements receive endpoint to get messages +- Implements send endpoint for publishing +- Implements notify endpoint for broadcasts +- Implements receive endpoint to get messages Docker - Run local dev environment in docker-compose +[unreleased]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v1.0.0...dev +[v1.0.0]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v0.1.0...v1.0.0 [v0.1.0]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/611d9cab...v0.1.0 -[unreleased]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v0.1.0...dev diff --git a/api.py b/api.py index 015531b11deee1342a492dfa096c2356431757b7..fdfaaffa97c4b6696da88e2037bd49d3d9e027f2 100644 --- a/api.py +++ b/api.py @@ -10,6 +10,7 @@ from endpoints.notify import Notify from endpoints.receive import Receive from endpoints.send import Send from endpoints.token import Token +from endpoints.queues import Queue, QueueList from models.token_model import TokenModel from logger import setup_logging @@ -45,6 +46,8 @@ def create_app(): api.add_resource(Send, "/send") api.add_resource(Notify, "/notify") api.add_resource(Token, "/token") + api.add_resource(QueueList, "/queue") + api.add_resource(Queue, "/queue/<queue_name>") return app diff --git a/endpoints/queues.py b/endpoints/queues.py new file mode 100644 index 0000000000000000000000000000000000000000..e67732baf6ea8a319a010e05261ee510a1d31d0d --- /dev/null +++ b/endpoints/queues.py @@ -0,0 +1,49 @@ +from flask_restful import request, abort +from marshmallow import Schema, fields +from endpoints.auth_resource import AuthResource +from models.client_model import ClientModel +from rmq import get_queue_status, empty_queue + + +class Queue(AuthResource): + def get(self, queue_name): + allow = self.auth(request) + if allow: + queue_status = get_queue_status(queue_name) + return {"queue": queue_name, "properties": queue_status} + + def delete(self, queue_name): + allow = self.auth(request) + if allow: + emptied = empty_queue(queue_name) + queue_status = get_queue_status(queue_name) + queue_status["empty"] = emptied + return {"queue": queue_name, "properties": queue_status} + + +class QueueList(AuthResource): + def __init__(self): + super().__init__() + self.clients_file = ClientModel() + + def get(self): + allow = self.auth(request) + if allow: + queue_status = { + client_id: { + "inbox": get_queue_status(f"{client_id}-inbox"), + "outbox": get_queue_status(f"{client_id}-outbox"), + "broadcast": get_queue_status(f"{client_id}-broadcast"), + } + for client_id in self.clients_file.get().keys() + } + return [ + { + "client_id": client_id, + "queue": f"{client_id}-{queue_type}", + "type": queue_type, + "properties": queue_status, + } + for client_id, client_queues in queue_status.items() + for queue_type, queue_status in client_queues.items() + ] diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/models/__init__.pyc b/models/__init__.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0a3a9f8bcf422cfde5114416ad572ef7652e496f Binary files /dev/null and b/models/__init__.pyc differ diff --git a/models/client_model.pyc b/models/client_model.pyc new file mode 100644 index 0000000000000000000000000000000000000000..99e34bc823c72c20e457c213f9d73756a9c13be8 Binary files /dev/null and b/models/client_model.pyc differ diff --git a/requirements.txt b/requirements.txt index b3dd842a94e6f3fb0fdada32fa9465e044851322..8225e2d6dd6d818aa61b8087f8c676a55b7f988e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,31 +1,8 @@ --i https://pypi.org/simple -amqp==5.1.1 ; python_version >= '3.6' -aniso8601==9.0.1 -bson==0.5.10 -cffi==1.15.1 -click==8.1.3 ; python_version >= '3.7' cryptography==38.0.3 flask==2.2.2 flask-cors==3.0.10 flask-restful==0.3.9 -future==0.18.2 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3' -httplib2==0.21.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' -importlib-metadata==5.1.0 ; python_version < '3.10' -itsdangerous==2.1.2 ; python_version >= '3.7' -jinja2==3.1.2 ; python_version >= '3.7' -kombu==5.2.4 ; python_version >= '3.7' -markupsafe==2.1.1 ; python_version >= '3.7' marshmallow==3.19.0 -packaging>=22.0 ; python_version >= '3.6' pika==1.3.1 -pubsubpy==2.3.0 -pycparser==2.21 -pyparsing==3.0.9 ; python_full_version >= '3.6.8' -pyrabbit==1.1.0 -python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' -pytz==2022.6 -six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' -vine==5.0.0 ; python_version >= '3.6' watchdog==2.2.1 -werkzeug==2.2.2 ; python_version >= '3.7' -zipp==3.10.0 ; python_version >= '3.7' +werkzeug==2.2.2 diff --git a/rmq.py b/rmq.py index f103953ac928d646603191445b8a0718999a6017..ac05e02ffb20bbb6fa31f0433543ee315a7b232c 100644 --- a/rmq.py +++ b/rmq.py @@ -4,6 +4,7 @@ import os import pika + host = os.getenv( "MQ_HOST", "localhost" ) # Sets to whatever MQ_HOST is, or defaults to localhost @@ -11,27 +12,6 @@ host = os.getenv( # ------------------------------------------------------------------------------------------------------------------------------------------------------------- -def pika_connect(host): - try: - connection = pika.BlockingConnection(pika.ConnectionParameters(host)) - except Exception: - connection = None - - if connection is not None: - channel = connection.channel() - else: - logging.error( - "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" - % host - ) - raise Exception( - "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" - % host - ) - - return connection, channel - - def setup_queue(channel, queue_name=""): channel.queue_declare( queue=queue_name, exclusive=False, durable=True @@ -74,6 +54,25 @@ def deliver_to_exchange(channel, body, exchange_name, topic=None): # ------------------------------------------------------------------------------------------------------------------------------------------------------------- +def pika_connect(host): + try: + connection = pika.BlockingConnection(pika.ConnectionParameters(host)) + except Exception: + connection = None + + if connection is not None: + channel = connection.channel() + else: + logging.error( + "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" + % host + ) + raise Exception( + "ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?" + % host + ) + + return connection, channel def write_to_queue(queue_name, msg): @@ -121,10 +120,8 @@ def read_from_queue(queue_name, max_msgs): return messages -def broadcast(queue_name, exchange_name): +def broadcast(channel, queue_name, exchange_name): # read from a queue, forward onto a 'fanout' exchange - _, channel = pika_connect(host=host) - setup_queue(channel=channel, queue_name=queue_name) def broadcast_callback(ch, method, properties, body): @@ -134,16 +131,15 @@ def broadcast(queue_name, exchange_name): ch.basic_ack(delivery_tag=method.delivery_tag) try: - channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback) - channel.start_consuming() + return channel.basic_consume( + queue=queue_name, on_message_callback=broadcast_callback + ) except pika.exceptions.AMQPChannelError as err: print("Caught a channel error: {}, stopping...".format(err)) -def forward(from_queue, to_queue): +def forward(channel, from_queue, to_queue): # read from a queue, forward onto a different queue - _, channel = pika_connect(host=host) - setup_queue(channel=channel, queue_name=from_queue) setup_queue(channel=channel, queue_name=to_queue) @@ -161,16 +157,15 @@ def forward(from_queue, to_queue): ch.basic_ack(delivery_tag=method.delivery_tag) try: - channel.basic_consume(queue=from_queue, on_message_callback=forward_callback) - channel.start_consuming() + return channel.basic_consume( + queue=from_queue, on_message_callback=forward_callback + ) except pika.exceptions.AMQPChannelError as err: logging.error("Caught a channel error: {}, stopping...".format(err)) -def publish(queue_name, exchange_name): +def publish(channel, queue_name, exchange_name): # read from a queue, forward onto a 'topic' exchange - _, channel = pika_connect(host=host) - setup_queue(channel=channel, queue_name=queue_name) def publish_callback(ch, method, properties, body): @@ -187,18 +182,16 @@ def publish(queue_name, exchange_name): try: channel.basic_consume(queue=queue_name, on_message_callback=publish_callback) - channel.start_consuming() except pika.exceptions.AMQPChannelError as err: print("Caught a channel error: {}, stopping...".format(err)) -def subscribe(queue_name, exchange_name, topic=None): +def subscribe(channel, queue_name, exchange_name, topic=None): logging.debug( f"Subscribe queue: {queue_name} to {exchange_name} with topic {topic}" ) # setup bindings between queue and exchange, # exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed - connection, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name) if topic is None: @@ -208,15 +201,34 @@ def subscribe(queue_name, exchange_name, topic=None): topic_exchange(channel=channel, exchange_name=exchange_name) channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic) - connection.close() - -def listen(queue_name, callback): +def listen(channel, queue_name, callback): logging.debug(f"Listen to queue: {queue_name}") # subscribe client to a queue, using the callback arg - _, channel = pika_connect(host=host) - setup_queue(channel=channel, queue_name=queue_name) channel.basic_consume(queue=queue_name, on_message_callback=callback) - channel.start_consuming() + + +def get_queue_status(queue_name): + connection, channel = pika_connect(host=host) + response = channel.queue_declare(queue=queue_name, passive=True) + queue_status = { + "size": response.method.message_count, + "has_consumer": response.method.consumer_count > 0, + } + logging.debug(f"Queue: {queue_name} contains {queue_status['size']} messages") + connection.close() + return queue_status + + +def empty_queue(queue_name): + connection, channel = pika_connect(host=host) + emptied = True + try: + channel.queue_purge(queue_name) + except ValueError: + emptied = False + + connection.close() + return emptied diff --git a/setup.py b/setup.py index d2e0c1d220b6bea395fc253003d8531465bf33d9..2671f40604b76c30a12c512a36148676e9fab5e5 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ requirements = [x.strip() for x in open("requirements.txt", "r") if x.strip() != setup( name="communications-backbone", - version="0.1.0", + version="1.0.0", description="Communications backbone ", author="NOC C2 Team", author_email="c2@noc.ac.uk", diff --git a/soar_bus.py b/soar_bus.py index aa8fc0054859253232ac53d1f33e0bb25d6661b2..ec5bdae2327f426be019f9357859de429b8dc157 100644 --- a/soar_bus.py +++ b/soar_bus.py @@ -11,22 +11,21 @@ import logging import os import socket +import threading import time -import concurrent.futures -from watchdog.observers import Observer +import pika from watchdog.events import FileSystemEventHandler - -from rmq import broadcast, forward, publish, subscribe -from models.client_model import ClientModel +from watchdog.observers import Observer from logger import setup_logging - +from models.client_model import ClientModel +from rmq import broadcast, forward, publish, subscribe setup_logging() -THREADS = {} RUNNING_CLIENTS = [] +CONSUMER_TAGS = {} EXCHANGES = { "publish": "soar_publish", "broadcast": "soar_broadcast", @@ -34,8 +33,9 @@ EXCHANGES = { class ConfigHandler(FileSystemEventHandler): - def __init__(self): + def __init__(self, channel): self.client_model = ClientModel() + self.channel = channel super().__init__() def on_modified(self, event): @@ -44,120 +44,148 @@ class ConfigHandler(FileSystemEventHandler): logging.debug("Reloading client config...") clients = self.client_model.get() updated_client_ids = list(clients.keys()) - update_clients(updated_client_ids) + self.update_clients(updated_client_ids) - -def update_clients(updated_client_ids): - global RUNNING_CLIENTS - with concurrent.futures.ThreadPoolExecutor() as executor: + def update_clients(self, updated_client_ids): + global RUNNING_CLIENTS logging.debug("Old: " + str(RUNNING_CLIENTS)) logging.debug("New: " + str(updated_client_ids)) for client_id in updated_client_ids: if client_id not in RUNNING_CLIENTS: - run_client(client_id, executor) + run_client(client_id, self.channel) logging.info(f"Started client: {client_id}") for client_id in RUNNING_CLIENTS: if client_id not in updated_client_ids: - stop_client(client_id) + self.stop_client(client_id) logging.info(f"Shutdown client: {client_id}") - -def watch_config(running_clients): - # Set global RUNNING_CLIENTS inside thread - global RUNNING_CLIENTS - RUNNING_CLIENTS = running_clients - logging.info("Starting config watcher...") - event_handler = ConfigHandler() - observer = Observer() - observer.schedule(event_handler, path="./data", recursive=False) - observer.start() - - while True: + def stop_client(self, client_id): + global RUNNING_CLIENTS + global CONSUMER_TAGS + stopping = False try: - pass - except KeyboardInterrupt as interrupt: - observer.stop() - raise interrupt - - -def stop_client(client_id): - global RUNNING_CLIENTS - stopping = False - try: - logging.info(f"Stopping client: {client_id}") - client_threads = ["outbox", "broadcast", "inbox-published", "inbox-broadcast"] - for thread in client_threads: - thread_name = f"{client_id}-{thread}" - if thread_name in THREADS: - THREADS[thread_name].cancel() - if client_id in RUNNING_CLIENTS: - RUNNING_CLIENTS.remove(client_id) - stopping = True - except Exception as error: - logging.error(str(error)) - return stopping - - -def run_client(client_id, executor): + logging.info(f"Stopping client: {client_id}") + client_tags = CONSUMER_TAGS[client_id] + + self.channel.basic_cancel(client_tags[f"{client_id}-broadcast"]) + self.channel.basic_cancel(client_tags[f"{client_id}-outbox"]) + self.channel.queue_unbind( + queue=f"{client_id}-inbox", + exchange=client_tags[f"{client_id}-inbox-publish"], + ) + self.channel.queue_unbind( + queue=f"{client_id}-inbox", + exchange=client_tags[f"{client_id}-inbox-broadcast"], + ) + + if client_id in RUNNING_CLIENTS: + RUNNING_CLIENTS.remove(client_id) + stopping = True + except Exception as error: + logging.error(str(error)) + return stopping + + +class WatchConfigThread(threading.Thread): + def __init__(self, running_clients, channel): + threading.Thread.__init__(self) + self.daemon = True + self.running_clients = running_clients + self.channel = channel + self.start() + + def run(self): + logging.info("Starting config watcher...") + event_handler = ConfigHandler(self.channel) + observer = Observer() + observer.schedule(event_handler, path="./data", recursive=False) + observer.start() + + while True: + try: + time.sleep(1) + except KeyboardInterrupt as interrupt: + observer.stop() + raise interrupt + + +class SoarBusThread(threading.Thread): + def __init__(self, clients, channel): + threading.Thread.__init__(self) + self.daemon = True + self.clients = clients + self.channel = channel + self.start() + + def run(self): + logging.info("Starting SOAR bus...") + + publish(self.channel, "soar-publish", EXCHANGES.get("publish")) + + for id in self.clients.keys(): + run_client(id, self.channel) + + +def run_client(client_id, channel): global RUNNING_CLIENTS + global CONSUMER_TAGS client_model = ClientModel() client = client_model.find(client_id) - running = False + try: client_id = client["client_id"] logging.info(f"Running client: {client_id}") - # forward - thread = executor.submit(forward, f"{client_id}-outbox", "soar-publish") - THREADS[f"{client_id}-outbox"] = thread - # broadcast - thread = executor.submit( - broadcast, f"{client_id}-broadcast", EXCHANGES.get("broadcast") + forward_consumer_tag = forward(channel, f"{client_id}-outbox", "soar-publish") + broadcast_consumer_tag = broadcast( + channel, f"{client_id}-broadcast", EXCHANGES.get("broadcast") ) - THREADS[f"{client_id}-broadcast"] = thread - subscribe( + channel, f"{client_id}-inbox", EXCHANGES.get("publish"), client["subscription"], ) - subscribe(f"{client_id}-inbox", EXCHANGES.get("broadcast")) + subscribe(channel, f"{client_id}-inbox", EXCHANGES.get("broadcast")) + + CONSUMER_TAGS[client_id] = { + f"{client_id}-broadcast": broadcast_consumer_tag, + f"{client_id}-outbox": forward_consumer_tag, + f"{client_id}-inbox-publish": EXCHANGES.get("publish"), + f"{client_id}-inbox-broadcast": EXCHANGES.get("broadcast"), + } + if client_id not in RUNNING_CLIENTS: + logging.debug(f"Appending client_id '{client_id}'") RUNNING_CLIENTS.append(client_id) - running = True + except Exception as error: logging.error(str(error)) - return running -def main(clients, executor): +def on_channel_open(channel): + # Invoked when the channel is open global RUNNING_CLIENTS - logging.info("Starting SOAR bus...") - # publish - thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish")) - THREADS["soar-publish"] = thread + client_model = ClientModel() + clients = client_model.get() + client_count = len(clients.keys()) + logging.debug(f"Running {client_count} clients") - for id in clients.keys(): - run_client(id, executor) + SoarBusThread(clients, channel) + WatchConfigThread(RUNNING_CLIENTS, channel) - # Global vars are not shared across threads so you - # have to pass the global var into the thread - thread = executor.submit(watch_config, RUNNING_CLIENTS) - THREADS["config-watcher"] = thread - # Make sure the threads are actually running, error if not, - # this allows the SOAR Bus to actually wait for RMQ to start running - for thread_name, thread in THREADS.items(): - thread.result() - try: - logging.debug(thread_name) - logging.debug(thread.result()) - except Exception as e: - logging.error(e) - raise e +def on_connection_open(connection): + # Invoked when the connection is open + connection.channel(on_open_callback=on_channel_open) + + +def on_connection_close(connection, exception): + # Invoked when the connection is closed + connection.ioloop.stop() if __name__ == "__main__": @@ -176,16 +204,16 @@ if __name__ == "__main__": pingcounter += 1 s.close() + host = os.getenv("MQ_HOST", "localhost") + connection = pika.SelectConnection( + pika.ConnectionParameters(host), + on_open_callback=on_connection_open, + on_close_callback=on_connection_close, + ) + try: - client_model = ClientModel() - clients = client_model.get() - client_count = len(clients.keys()) - thread_count = (client_count * 2) + 2 - logging.debug(f"Running {thread_count} workers for {client_count} clients") - - with concurrent.futures.ThreadPoolExecutor( - max_workers=thread_count - ) as executor: - main(clients, executor) + connection.ioloop.start() except KeyboardInterrupt: - executor.shutdown(wait=False) + # Loop until fully closed + connection.close() + connection.ioloop.start()