#!/usr/bin/env python
"""Entry point for the SOAR bus.

Starts the RabbitMQ worker tasks (publish, plus forward/broadcast/subscribe
for every configured client) in a process pool, and watches ``./data`` for
changes to ``clients.json`` so clients can be started or stopped at runtime.
"""

# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client_id]-broadcast - send message to all subscribers? - eg notify of downtime
# soar-publish - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions

import concurrent.futures
import json
import logging
import os
import socket
import sys
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from models.client_model import ClientModel
from rmq import broadcast, forward, publish, subscribe

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("pika").setLevel(logging.ERROR)
logging.getLogger("watchdog").setLevel(logging.ERROR)

# Maps "<client_id>-<role>" (and "soar-publish" / "config-watcher") to the
# concurrent.futures.Future returned by executor.submit().
THREADS = {}
# IDs of the clients whose workers have been submitted.
RUNNING_CLIENTS = []
EXCHANGES = {
    "publish": "soar_publish",
    "broadcast": "soar_broadcast",
}


class ConfigHandler(FileSystemEventHandler):
    """Watchdog handler that re-syncs clients when ``clients.json`` changes."""

    def __init__(self):
        self.client_model = ClientModel()
        super().__init__()

    def on_modified(self, event):
        """Reload the client list if the modified file is ``clients.json``."""
        file_name = event.src_path.strip()
        if not file_name.endswith("clients.json"):
            return
        logging.debug("Reloading client config...")
        clients = self.client_model.get()
        update_clients(list(clients.keys()))


def update_clients(updated_client_ids):
    """Start clients that are newly configured and stop those that were removed.

    Args:
        updated_client_ids: client IDs present in the reloaded configuration.
    """
    # NOTE(review): leaving this ``with`` block calls shutdown(wait=True), which
    # blocks until every task submitted here finishes. If the rmq workers run
    # indefinitely this never returns - confirm that is intended.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        logging.info(f"Old: {RUNNING_CLIENTS}")
        logging.info(f"New: {updated_client_ids}")

        for client_id in updated_client_ids:
            if client_id not in RUNNING_CLIENTS:
                if run_client(client_id, executor):
                    logging.info(f"Started client: {client_id}")

        # Iterate over a snapshot: stop_client() removes entries from
        # RUNNING_CLIENTS, and mutating a list while iterating it skips items.
        for client_id in list(RUNNING_CLIENTS):
            if client_id not in updated_client_ids:
                if stop_client(client_id):
                    logging.info(f"Shutdown client: {client_id}")


def watch_config(running_clients):
    """Watch ``./data`` for config changes until interrupted.

    Args:
        running_clients: client IDs already running; installed as this
            worker's RUNNING_CLIENTS because module globals are not shared
            with the pool's worker processes.

    Raises:
        KeyboardInterrupt: re-raised after the observer has been stopped.
    """
    # Set global RUNNING_CLIENTS inside the worker
    global RUNNING_CLIENTS
    RUNNING_CLIENTS = running_clients

    logging.info("Starting config watcher...")
    event_handler = ConfigHandler()
    observer = Observer()
    observer.schedule(event_handler, path="./data", recursive=False)
    observer.start()
    try:
        while True:
            # Sleep instead of spinning so the watcher does not burn a CPU core.
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()


def stop_client(client_id):
    """Cancel the worker futures of a client and mark it as not running.

    Args:
        client_id: ID of the client to stop.

    Returns:
        True if the bookkeeping completed, False if an exception was logged.
    """
    stopping = False
    try:
        logging.info(f"Stopping client: {client_id}")
        client_threads = ["outbox", "broadcast", "inbox-published", "inbox-broadcast"]
        for suffix in client_threads:
            thread_name = f"{client_id}-{suffix}"
            future = THREADS.get(thread_name)
            # Future.cancel() returns False for a task that is already running
            # or finished; surface that instead of silently ignoring it.
            if future is not None and not future.cancel():
                logging.warning(
                    f"Could not cancel {thread_name}: already running or finished"
                )
        if client_id in RUNNING_CLIENTS:
            RUNNING_CLIENTS.remove(client_id)
        stopping = True
    except Exception as error:
        logging.error(str(error))
    return stopping


def run_client(client_id, executor):
    """Submit the forward, broadcast and subscribe workers for one client.

    Args:
        client_id: ID used to look the client up via ClientModel.find().
        executor: executor whose submit() is used to start the workers.

    Returns:
        True if all workers were submitted, False if the client was not found
        or an exception was logged.
    """
    client = ClientModel().find(client_id)
    if not client:
        logging.error(f"Client not found: {client_id}")
        return False

    running = False
    try:
        client_id = client["client_id"]
        logging.info(f"Running client: {client_id}")

        # forward
        THREADS[f"{client_id}-outbox"] = executor.submit(
            forward, f"{client_id}-outbox", "soar-publish"
        )
        # broadcast
        THREADS[f"{client_id}-broadcast"] = executor.submit(
            broadcast, f"{client_id}-broadcast", EXCHANGES.get("broadcast")
        )
        # subscribe
        THREADS[f"{client_id}-inbox-published"] = executor.submit(
            subscribe,
            f"{client_id}-inbox",
            EXCHANGES.get("publish"),
            client["subscription"],  # topic
        )
        THREADS[f"{client_id}-inbox-broadcast"] = executor.submit(
            subscribe, f"{client_id}-inbox", EXCHANGES.get("broadcast")
        )

        if client_id not in RUNNING_CLIENTS:
            RUNNING_CLIENTS.append(client_id)
        running = True
    except Exception as error:
        logging.error(str(error))
    return running


def main(executor):
    """Start the bus: the publish worker, every client, and the config watcher.

    Blocks on each submitted future in turn and re-raises the first failure.

    Args:
        executor: executor used to run all workers.

    Raises:
        Exception: whatever a worker future raised, after it has been logged.
    """
    logging.info("Starting SOAR bus...")
    clients = ClientModel().get()

    # publish
    THREADS["soar-publish"] = executor.submit(
        publish, "soar-publish", EXCHANGES.get("publish")
    )

    for client_id in clients.keys():
        run_client(client_id, executor)

    # Global vars are not shared with the pool's worker processes, so the
    # current value has to be passed into the watcher explicitly.
    THREADS["config-watcher"] = executor.submit(watch_config, RUNNING_CLIENTS)

    # Make sure the workers are actually running, error if not,
    # this allows the SOAR Bus to actually wait for RMQ to start running
    for thread_name, thread in THREADS.items():
        try:
            result = thread.result()
        except Exception as error:
            logging.error(f"{thread_name}: {error}")
            raise
        logging.debug(thread_name)
        logging.debug(result)


def _wait_for_rmq(host="rmq", port=5672, retries=10, interval=5):
    """Poll a TCP port until it accepts a connection.

    Args:
        host: host name to connect to.
        port: TCP port to connect to.
        retries: maximum number of connection attempts.
        interval: seconds to sleep after each failed attempt.

    Returns:
        True as soon as a connection succeeds, False if every attempt failed.
    """
    for _attempt in range(retries):
        try:
            # The context manager closes the socket on success and on failure.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect((host, port))
            return True
        except OSError:
            logging.info(f"RabbitMQ unavailable: retrying in {interval}s")
            time.sleep(interval)
    return False


if __name__ == "__main__":
    if not _wait_for_rmq():
        # Start-up is still attempted after the retries are exhausted.
        logging.warning("RabbitMQ still unreachable after all retries")

    try:
        with concurrent.futures.ProcessPoolExecutor() as executor:
            main(executor)
    except KeyboardInterrupt:
        # NOTE(review): the ``with`` block has already shut the pool down by
        # the time this runs - confirm whether this call is still needed.
        executor.shutdown(wait=False)