#!/usr/bin/env python # Queues # (per client) # [client_id]-inbox - receive subscriptions # [client_id]-outbox - send messages to the bus # [client-id]-broadcast - send message to all subscribers? - eg notify of downtime # soar-publisher - fan-in from client-outboxes # soar-dlq - undeliverables # soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions import concurrent.futures from rmq import broadcast, forward, publish, subscribe from models.client_model import ClientModel THREADS = [] EXCHANGES = { "publish": "soar_publish", "broadcast": "soar_broadcast", } def main(): print("Starting SOAR bus...") clients_file = ClientModel() clients = clients_file.get() with concurrent.futures.ProcessPoolExecutor() as executor: # publish thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish")) THREADS.append(thread) for id, client in clients.items(): # forward thread = executor.submit(forward, f"{id}-outbox", "soar-publish") THREADS.append(thread) # broadcast thread = executor.submit( broadcast, f"{id}-broadcast", EXCHANGES.get("broadcast") ) THREADS.append(thread) # subscribe thread = executor.submit( subscribe, f"{id}-inbox", EXCHANGES.get("publish"), client["subscription"], # topic ) THREADS.append(thread) thread = executor.submit( subscribe, f"{id}-inbox", EXCHANGES.get("broadcast") ) THREADS.append(thread) # push # TODO - add optional webhook target to client and post to webhook target # if present # Make sure the threads are actually running, error if not, # this allows the SOAR Bus to actually wait for RMQ to start running for thread in THREADS: thread.result() try: print(thread.result()) except Exception as e: print(e) if __name__ == "__main__": main()