Commit b9754c55 authored by Dan Jones's avatar Dan Jones
Browse files

Merge branch '56-release-v1-0-0' into 'master'

Resolve "Release v1.0.0"

See merge request !26
1 merge request!26Resolve "Release v1.0.0"
Pipeline #233750 passed with stages
in 47 seconds
......@@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
## [v1.0.0] - 2024-09-16
### Added
- Create a queues API to enable listing queue size and emptying queues
### Fixed
- Removed child dependencies from requirements file
### Changed
- Refactored queue listening
## [v0.1.0] - 2023-03-24
### Added
......@@ -16,21 +30,22 @@ Create a mechanism to exchange messages between multiple clients
Bus
- Implements persistent queues per client
- Implements publish/subscribe message flow between clients
- Implements broadcast message flow between clients
- Implements publish/subscribe message flow between clients
- Implements broadcast message flow between clients
- Listens for and responds to add/remove clients
API
- Implements clients endpoint to manage clients and subscriptions
- Implements token endpoint to manage authentication client credentials grants
- Implements send endpoint for publishing
- Implements notify endpoint for broadcasts
- Implements receive endpoint to get messages
- Implements send endpoint for publishing
- Implements notify endpoint for broadcasts
- Implements receive endpoint to get messages
Docker
- Run local dev environment in docker-compose
[unreleased]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v1.0.0...dev
[v1.0.0]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v0.1.0...v1.0.0
[v0.1.0]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/611d9cab...v0.1.0
[unreleased]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v0.1.0...dev
......@@ -10,6 +10,7 @@ from endpoints.notify import Notify
from endpoints.receive import Receive
from endpoints.send import Send
from endpoints.token import Token
from endpoints.queues import Queue, QueueList
from models.token_model import TokenModel
from logger import setup_logging
......@@ -45,6 +46,8 @@ def create_app():
api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token")
api.add_resource(QueueList, "/queue")
api.add_resource(Queue, "/queue/<queue_name>")
return app
......
from flask_restful import request, abort
from marshmallow import Schema, fields
from endpoints.auth_resource import AuthResource
from models.client_model import ClientModel
from rmq import get_queue_status, empty_queue
class Queue(AuthResource):
def get(self, queue_name):
allow = self.auth(request)
if allow:
queue_status = get_queue_status(queue_name)
return {"queue": queue_name, "properties": queue_status}
def delete(self, queue_name):
allow = self.auth(request)
if allow:
emptied = empty_queue(queue_name)
queue_status = get_queue_status(queue_name)
queue_status["empty"] = emptied
return {"queue": queue_name, "properties": queue_status}
class QueueList(AuthResource):
def __init__(self):
super().__init__()
self.clients_file = ClientModel()
def get(self):
allow = self.auth(request)
if allow:
queue_status = {
client_id: {
"inbox": get_queue_status(f"{client_id}-inbox"),
"outbox": get_queue_status(f"{client_id}-outbox"),
"broadcast": get_queue_status(f"{client_id}-broadcast"),
}
for client_id in self.clients_file.get().keys()
}
return [
{
"client_id": client_id,
"queue": f"{client_id}-{queue_type}",
"type": queue_type,
"properties": queue_status,
}
for client_id, client_queues in queue_status.items()
for queue_type, queue_status in client_queues.items()
]
File added
File added
-i https://pypi.org/simple
amqp==5.1.1 ; python_version >= '3.6'
aniso8601==9.0.1
bson==0.5.10
cffi==1.15.1
click==8.1.3 ; python_version >= '3.7'
cryptography==38.0.3
flask==2.2.2
flask-cors==3.0.10
flask-restful==0.3.9
future==0.18.2 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
httplib2==0.21.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
importlib-metadata==5.1.0 ; python_version < '3.10'
itsdangerous==2.1.2 ; python_version >= '3.7'
jinja2==3.1.2 ; python_version >= '3.7'
kombu==5.2.4 ; python_version >= '3.7'
markupsafe==2.1.1 ; python_version >= '3.7'
marshmallow==3.19.0
packaging>=22.0 ; python_version >= '3.6'
pika==1.3.1
pubsubpy==2.3.0
pycparser==2.21
pyparsing==3.0.9 ; python_full_version >= '3.6.8'
pyrabbit==1.1.0
python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
pytz==2022.6
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
vine==5.0.0 ; python_version >= '3.6'
watchdog==2.2.1
werkzeug==2.2.2 ; python_version >= '3.7'
zipp==3.10.0 ; python_version >= '3.7'
werkzeug==2.2.2
......@@ -4,6 +4,7 @@ import os
import pika
host = os.getenv(
"MQ_HOST", "localhost"
) # Sets to whatever MQ_HOST is, or defaults to localhost
......@@ -11,27 +12,6 @@ host = os.getenv(
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def pika_connect(host):
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
except Exception:
connection = None
if connection is not None:
channel = connection.channel()
else:
logging.error(
"ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?"
% host
)
raise Exception(
"ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?"
% host
)
return connection, channel
def setup_queue(channel, queue_name=""):
channel.queue_declare(
queue=queue_name, exclusive=False, durable=True
......@@ -74,6 +54,25 @@ def deliver_to_exchange(channel, body, exchange_name, topic=None):
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def pika_connect(host):
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
except Exception:
connection = None
if connection is not None:
channel = connection.channel()
else:
logging.error(
"ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?"
% host
)
raise Exception(
"ERROR: Pika has been unable to connect to host '%s'. Is RabbitMQ running?"
% host
)
return connection, channel
def write_to_queue(queue_name, msg):
......@@ -121,10 +120,8 @@ def read_from_queue(queue_name, max_msgs):
return messages
def broadcast(queue_name, exchange_name):
def broadcast(channel, queue_name, exchange_name):
# read from a queue, forward onto a 'fanout' exchange
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
def broadcast_callback(ch, method, properties, body):
......@@ -134,16 +131,15 @@ def broadcast(queue_name, exchange_name):
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
channel.start_consuming()
return channel.basic_consume(
queue=queue_name, on_message_callback=broadcast_callback
)
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
def forward(from_queue, to_queue):
def forward(channel, from_queue, to_queue):
# read from a queue, forward onto a different queue
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=from_queue)
setup_queue(channel=channel, queue_name=to_queue)
......@@ -161,16 +157,15 @@ def forward(from_queue, to_queue):
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
channel.start_consuming()
return channel.basic_consume(
queue=from_queue, on_message_callback=forward_callback
)
except pika.exceptions.AMQPChannelError as err:
logging.error("Caught a channel error: {}, stopping...".format(err))
def publish(queue_name, exchange_name):
def publish(channel, queue_name, exchange_name):
# read from a queue, forward onto a 'topic' exchange
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
def publish_callback(ch, method, properties, body):
......@@ -187,18 +182,16 @@ def publish(queue_name, exchange_name):
try:
channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
def subscribe(queue_name, exchange_name, topic=None):
def subscribe(channel, queue_name, exchange_name, topic=None):
logging.debug(
f"Subscribe queue: {queue_name} to {exchange_name} with topic {topic}"
)
# setup bindings between queue and exchange,
# exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
if topic is None:
......@@ -208,15 +201,34 @@ def subscribe(queue_name, exchange_name, topic=None):
topic_exchange(channel=channel, exchange_name=exchange_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
connection.close()
def listen(queue_name, callback):
def listen(channel, queue_name, callback):
logging.debug(f"Listen to queue: {queue_name}")
# subscribe client to a queue, using the callback arg
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()
def get_queue_status(queue_name):
connection, channel = pika_connect(host=host)
response = channel.queue_declare(queue=queue_name, passive=True)
queue_status = {
"size": response.method.message_count,
"has_consumer": response.method.consumer_count > 0,
}
logging.debug(f"Queue: {queue_name} contains {queue_status['size']} messages")
connection.close()
return queue_status
def empty_queue(queue_name):
connection, channel = pika_connect(host=host)
emptied = True
try:
channel.queue_purge(queue_name)
except ValueError:
emptied = False
connection.close()
return emptied
......@@ -5,7 +5,7 @@ requirements = [x.strip() for x in open("requirements.txt", "r") if x.strip() !=
setup(
name="communications-backbone",
version="0.1.0",
version="1.0.0",
description="Communications backbone ",
author="NOC C2 Team",
author_email="c2@noc.ac.uk",
......
......@@ -11,22 +11,21 @@
import logging
import os
import socket
import threading
import time
import concurrent.futures
from watchdog.observers import Observer
import pika
from watchdog.events import FileSystemEventHandler
from rmq import broadcast, forward, publish, subscribe
from models.client_model import ClientModel
from watchdog.observers import Observer
from logger import setup_logging
from models.client_model import ClientModel
from rmq import broadcast, forward, publish, subscribe
setup_logging()
THREADS = {}
RUNNING_CLIENTS = []
CONSUMER_TAGS = {}
EXCHANGES = {
"publish": "soar_publish",
"broadcast": "soar_broadcast",
......@@ -34,8 +33,9 @@ EXCHANGES = {
class ConfigHandler(FileSystemEventHandler):
def __init__(self):
def __init__(self, channel):
self.client_model = ClientModel()
self.channel = channel
super().__init__()
def on_modified(self, event):
......@@ -44,120 +44,148 @@ class ConfigHandler(FileSystemEventHandler):
logging.debug("Reloading client config...")
clients = self.client_model.get()
updated_client_ids = list(clients.keys())
update_clients(updated_client_ids)
self.update_clients(updated_client_ids)
def update_clients(updated_client_ids):
global RUNNING_CLIENTS
with concurrent.futures.ThreadPoolExecutor() as executor:
def update_clients(self, updated_client_ids):
global RUNNING_CLIENTS
logging.debug("Old: " + str(RUNNING_CLIENTS))
logging.debug("New: " + str(updated_client_ids))
for client_id in updated_client_ids:
if client_id not in RUNNING_CLIENTS:
run_client(client_id, executor)
run_client(client_id, self.channel)
logging.info(f"Started client: {client_id}")
for client_id in RUNNING_CLIENTS:
if client_id not in updated_client_ids:
stop_client(client_id)
self.stop_client(client_id)
logging.info(f"Shutdown client: {client_id}")
def watch_config(running_clients):
# Set global RUNNING_CLIENTS inside thread
global RUNNING_CLIENTS
RUNNING_CLIENTS = running_clients
logging.info("Starting config watcher...")
event_handler = ConfigHandler()
observer = Observer()
observer.schedule(event_handler, path="./data", recursive=False)
observer.start()
while True:
def stop_client(self, client_id):
global RUNNING_CLIENTS
global CONSUMER_TAGS
stopping = False
try:
pass
except KeyboardInterrupt as interrupt:
observer.stop()
raise interrupt
def stop_client(client_id):
global RUNNING_CLIENTS
stopping = False
try:
logging.info(f"Stopping client: {client_id}")
client_threads = ["outbox", "broadcast", "inbox-published", "inbox-broadcast"]
for thread in client_threads:
thread_name = f"{client_id}-{thread}"
if thread_name in THREADS:
THREADS[thread_name].cancel()
if client_id in RUNNING_CLIENTS:
RUNNING_CLIENTS.remove(client_id)
stopping = True
except Exception as error:
logging.error(str(error))
return stopping
def run_client(client_id, executor):
logging.info(f"Stopping client: {client_id}")
client_tags = CONSUMER_TAGS[client_id]
self.channel.basic_cancel(client_tags[f"{client_id}-broadcast"])
self.channel.basic_cancel(client_tags[f"{client_id}-outbox"])
self.channel.queue_unbind(
queue=f"{client_id}-inbox",
exchange=client_tags[f"{client_id}-inbox-publish"],
)
self.channel.queue_unbind(
queue=f"{client_id}-inbox",
exchange=client_tags[f"{client_id}-inbox-broadcast"],
)
if client_id in RUNNING_CLIENTS:
RUNNING_CLIENTS.remove(client_id)
stopping = True
except Exception as error:
logging.error(str(error))
return stopping
class WatchConfigThread(threading.Thread):
def __init__(self, running_clients, channel):
threading.Thread.__init__(self)
self.daemon = True
self.running_clients = running_clients
self.channel = channel
self.start()
def run(self):
logging.info("Starting config watcher...")
event_handler = ConfigHandler(self.channel)
observer = Observer()
observer.schedule(event_handler, path="./data", recursive=False)
observer.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt as interrupt:
observer.stop()
raise interrupt
class SoarBusThread(threading.Thread):
def __init__(self, clients, channel):
threading.Thread.__init__(self)
self.daemon = True
self.clients = clients
self.channel = channel
self.start()
def run(self):
logging.info("Starting SOAR bus...")
publish(self.channel, "soar-publish", EXCHANGES.get("publish"))
for id in self.clients.keys():
run_client(id, self.channel)
def run_client(client_id, channel):
global RUNNING_CLIENTS
global CONSUMER_TAGS
client_model = ClientModel()
client = client_model.find(client_id)
running = False
try:
client_id = client["client_id"]
logging.info(f"Running client: {client_id}")
# forward
thread = executor.submit(forward, f"{client_id}-outbox", "soar-publish")
THREADS[f"{client_id}-outbox"] = thread
# broadcast
thread = executor.submit(
broadcast, f"{client_id}-broadcast", EXCHANGES.get("broadcast")
forward_consumer_tag = forward(channel, f"{client_id}-outbox", "soar-publish")
broadcast_consumer_tag = broadcast(
channel, f"{client_id}-broadcast", EXCHANGES.get("broadcast")
)
THREADS[f"{client_id}-broadcast"] = thread
subscribe(
channel,
f"{client_id}-inbox",
EXCHANGES.get("publish"),
client["subscription"],
)
subscribe(f"{client_id}-inbox", EXCHANGES.get("broadcast"))
subscribe(channel, f"{client_id}-inbox", EXCHANGES.get("broadcast"))
CONSUMER_TAGS[client_id] = {
f"{client_id}-broadcast": broadcast_consumer_tag,
f"{client_id}-outbox": forward_consumer_tag,
f"{client_id}-inbox-publish": EXCHANGES.get("publish"),
f"{client_id}-inbox-broadcast": EXCHANGES.get("broadcast"),
}
if client_id not in RUNNING_CLIENTS:
logging.debug(f"Appending client_id '{client_id}'")
RUNNING_CLIENTS.append(client_id)
running = True
except Exception as error:
logging.error(str(error))
return running
def main(clients, executor):
def on_channel_open(channel):
# Invoked when the channel is open
global RUNNING_CLIENTS
logging.info("Starting SOAR bus...")
# publish
thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
THREADS["soar-publish"] = thread
client_model = ClientModel()
clients = client_model.get()
client_count = len(clients.keys())
logging.debug(f"Running {client_count} clients")
for id in clients.keys():
run_client(id, executor)
SoarBusThread(clients, channel)
WatchConfigThread(RUNNING_CLIENTS, channel)
# Global vars are not shared across threads so you
# have to pass the global var into the thread
thread = executor.submit(watch_config, RUNNING_CLIENTS)
THREADS["config-watcher"] = thread
# Make sure the threads are actually running, error if not,
# this allows the SOAR Bus to actually wait for RMQ to start running
for thread_name, thread in THREADS.items():
thread.result()
try:
logging.debug(thread_name)
logging.debug(thread.result())
except Exception as e:
logging.error(e)
raise e
def on_connection_open(connection):
# Invoked when the connection is open
connection.channel(on_open_callback=on_channel_open)
def on_connection_close(connection, exception):
# Invoked when the connection is closed
connection.ioloop.stop()
if __name__ == "__main__":
......@@ -176,16 +204,16 @@ if __name__ == "__main__":
pingcounter += 1
s.close()
host = os.getenv("MQ_HOST", "localhost")
connection = pika.SelectConnection(
pika.ConnectionParameters(host),
on_open_callback=on_connection_open,
on_close_callback=on_connection_close,
)
try:
client_model = ClientModel()
clients = client_model.get()
client_count = len(clients.keys())
thread_count = (client_count * 2) + 2
logging.debug(f"Running {thread_count} workers for {client_count} clients")
with concurrent.futures.ThreadPoolExecutor(
max_workers=thread_count
) as executor:
main(clients, executor)
connection.ioloop.start()
except KeyboardInterrupt:
executor.shutdown(wait=False)
# Loop until fully closed
connection.close()
connection.ioloop.start()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment