soar_bus.py 2.14 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
#!/usr/bin/env python

# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client_id]-broadcast - send message to all subscribers? - e.g. notify of downtime
# soar-publish - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions

import concurrent.futures
James Kirk's avatar
James Kirk committed
13 14

from rmq import broadcast, forward, publish, subscribe
15
from models.client_model import ClientModel
16 17 18 19 20 21 22 23 24

# Futures returned by executor.submit() in main() are appended here so they
# can all be waited on once every worker has been submitted.
THREADS = list()

# Logical role -> exchange name; main() looks these up by key when wiring
# the publish, broadcast and subscribe workers.
EXCHANGES = dict(
    publish="soar_publish",
    broadcast="soar_broadcast",
)


def main():
    """Start the SOAR bus by submitting every queue worker to a process pool.

    The client registry is read with ``ClientModel().get()`` and iterated as
    a mapping of client id -> client record. The following tasks are
    submitted to a ``ProcessPoolExecutor``:

    * one ``publish`` task for the ``soar-publish`` queue and the publish
      exchange;
    * per client, a ``forward`` task called with ``<id>-outbox`` and
      ``soar-publish``;
    * per client, a ``broadcast`` task called with ``<id>-broadcast`` and
      the broadcast exchange;
    * per client, two ``subscribe`` tasks for ``<id>-inbox``: one on the
      publish exchange with ``client["subscription"]`` as the topic, one on
      the broadcast exchange.

    Every future is appended to the module-level ``THREADS`` list and then
    waited on in submission order.

    Raises:
        Exception: the first exception raised by a worker, re-raised after
            it has been printed.
        KeyError: if a client record has no ``"subscription"`` entry.
    """
    print("Starting SOAR bus...")
    clients_file = ClientModel()
    clients = clients_file.get()

    # NOTE(review): ProcessPoolExecutor() defaults to one worker per CPU.
    # If the rmq workers block indefinitely (not visible from this file),
    # tasks beyond the pool size would never start - confirm and consider
    # passing max_workers explicitly.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # publish
        THREADS.append(
            executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
        )

        # `client_id` rather than `id`, so the builtin is not shadowed.
        for client_id, client in clients.items():
            # forward
            THREADS.append(
                executor.submit(forward, f"{client_id}-outbox", "soar-publish")
            )
            # broadcast
            THREADS.append(
                executor.submit(
                    broadcast,
                    f"{client_id}-broadcast",
                    EXCHANGES.get("broadcast"),
                )
            )
            # subscribe
            THREADS.append(
                executor.submit(
                    subscribe,
                    f"{client_id}-inbox",
                    EXCHANGES.get("publish"),
                    client["subscription"],  # topic
                )
            )
            THREADS.append(
                executor.submit(
                    subscribe,
                    f"{client_id}-inbox",
                    EXCHANGES.get("broadcast"),
                )
            )
            # push
            # TODO - add optional webhook target to client and post to webhook target
            # if present

        # Make sure the workers are actually running, error if not; this
        # allows the SOAR Bus to actually wait for RMQ to start running.
        # result() is called once, inside the try: previously an unguarded
        # call ahead of the try raised first, so the handler below could
        # never report a worker failure.
        for future in THREADS:
            try:
                print(future.result())
            except Exception as e:
                print(e)
                # Keep propagating so a failed worker still aborts start-up.
                raise
67 68 69

# Start the bus only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()