#!/usr/bin/env python
"""Start one worker task per queue role for every configured client.

Queue layout (design notes carried over from the original header):

  Per client
    [client_id]-inbox      - receive subscriptions
    [client_id]-outbox     - send messages to the bus
    [client_id]-broadcast  - send message to all subscribers?
                             (e.g. notify of downtime)

  Shared
    soar-publish    - fan-in from client outboxes
    soar-dlq        - undeliverables (not referenced in this file)
    soar-broadcast  - admin messages forwarded to all client inboxes
                      regardless of subscriptions
"""

import concurrent.futures
import logging

from endpoints.clients import ClientsFile
from rmq import publish, subscribe, broadcast, forward

logger = logging.getLogger(__name__)

# Futures for every task submitted by main(). The name is historical: the
# tasks are submitted to a process pool, not to threads.
THREADS = []

# Logical role -> exchange name passed to the rmq helpers.
EXCHANGES = {
    "publish": "soar_publish",
    "broadcast": "soar_broadcast",
}


def _log_failure(future):
    """Log the exception of a finished worker future, if it raised one."""
    if future.cancelled():
        # exception() would raise CancelledError on a cancelled future.
        return
    error = future.exception()
    if error is not None:
        logger.error("bus worker exited with an error", exc_info=error)


def _submit(executor, task, *args):
    """Submit ``task(*args)`` to ``executor`` and record its future in THREADS."""
    future = executor.submit(task, *args)
    # Without this callback an exception raised in a worker is never
    # retrieved and therefore never reported.
    future.add_done_callback(_log_failure)
    THREADS.append(future)
    return future


def main():
    """Load the client table and submit the bus worker tasks.

    One ``publish`` task is submitted for the shared "soar-publish" queue.
    For every client, a ``forward`` task, a ``broadcast`` task and two
    ``subscribe`` tasks (one per exchange) are submitted. Every resulting
    future is appended to ``THREADS``.

    Raises:
        KeyError: if a client entry has no "subscription" key.
    """
    clients = ClientsFile().get()
    publish_exchange = EXCHANGES["publish"]
    broadcast_exchange = EXCHANGES["broadcast"]

    # NOTE(review): the pool uses the default worker count. If the rmq
    # callables block indefinitely (presumably they consume forever - TODO
    # confirm), tasks beyond that count will stay queued and never start.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # publish
        _submit(executor, publish, "soar-publish", publish_exchange)

        for client_id, client in clients.items():
            # forward
            _submit(executor, forward, f"{client_id}-outbox", "soar-publish")

            # broadcast
            _submit(
                executor, broadcast, f"{client_id}-broadcast", broadcast_exchange
            )

            # subscribe: the inbox is attached to both exchanges. Both futures
            # are tracked; previously the first one was overwritten before it
            # was recorded.
            _submit(
                executor,
                subscribe,
                f"{client_id}-inbox",
                publish_exchange,
                client["subscription"],  # topic
            )
            _submit(executor, subscribe, f"{client_id}-inbox", broadcast_exchange)

            # push
            # TODO - add optional webhook target to client and post to
            # webhook target if present


if __name__ == "__main__":
    main()