soar_bus.py 1.79 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
#!/usr/bin/env python

# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client-id]-broadcast - send message to all subscribers? - eg notify of downtime
# soar-publisher - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions

import concurrent.futures
from endpoints.clients import ClientsFile
14
from rmq import publish, subscribe, broadcast, forward
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

THREADS = []
EXCHANGES = {
    "publish": "soar_publish",
    "broadcast": "soar_broadcast",
}


def main():
    clients_file = ClientsFile()
    clients = clients_file.get()

    with concurrent.futures.ProcessPoolExecutor() as executor:
        # publish
        thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
        THREADS.append(thread)

        for (id, client) in clients.items():
            # forward
            thread = executor.submit(forward, f"{id}-outbox", "soar-publish")
            THREADS.append(thread)
            # broadcast
            thread = executor.submit(
                broadcast, f"{id}-broadcast", EXCHANGES.get("broadcast")
            )
            THREADS.append(thread)
            # subscribe
            thread = executor.submit(
43
                subscribe, 
44 45
                f"{id}-inbox",
                EXCHANGES.get("publish"),
46 47 48 49 50 51
                client["subscription"] # topic
            )
            thread = executor.submit(
                subscribe, 
                f"{id}-inbox",
                EXCHANGES.get("broadcast")
52 53 54 55 56 57 58 59 60
            )
            THREADS.append(thread)
            # push
            # TODO - add optional webhook target to client and post to webhook target
            # if present


if __name__ == "__main__":
    main()