soar_bus.py 5.86 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
#!/usr/bin/env python

# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client_id]-broadcast - send message to all subscribers? - e.g. notify of downtime
# soar-publish - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions
11 12 13 14 15 16
import json
import logging
import os
import socket
import sys
import time
17 18

import concurrent.futures
19 20
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
James Kirk's avatar
James Kirk committed
21 22

from rmq import broadcast, forward, publish, subscribe
23
from models.client_model import ClientModel
24

25 26 27 28 29 30 31

# Root logging is configured at DEBUG; the "pika" and "watchdog" loggers are
# restricted to ERROR and above.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("watchdog").setLevel(logging.ERROR)
logging.getLogger("pika").setLevel(logging.ERROR)

# Futures returned by executor.submit(), keyed by worker name
# (e.g. "soar-publish", "<client_id>-outbox", "config-watcher").
THREADS = dict()
# IDs of the clients whose workers have been submitted.
RUNNING_CLIENTS = list()
32 33 34 35 36 37
# Exchange names, looked up by role when the publish/broadcast/subscribe
# workers are submitted.
EXCHANGES = dict(
    publish="soar_publish",
    broadcast="soar_broadcast",
)


38 39 40 41 42 43 44 45 46 47 48 49 50 51
class ConfigHandler(FileSystemEventHandler):
    """File-system event handler that re-syncs clients when clients.json changes."""

    def __init__(self):
        # Model used to re-read the client configuration on every change.
        self.client_model = ClientModel()
        super().__init__()

    def on_modified(self, event):
        """Reload the client list when the modified path ends with clients.json.

        Events for any other path are ignored.
        """
        changed_path = event.src_path.strip()
        if not changed_path.endswith("clients.json"):
            return

        logging.debug("Reloading client config...")
        current_clients = self.client_model.get()
        update_clients(list(current_clients.keys()))


Dan Jones's avatar
Dan Jones committed
52
def update_clients(updated_client_ids):
    """Bring the running clients in line with ``updated_client_ids``.

    Clients that appear in ``updated_client_ids`` but not in RUNNING_CLIENTS
    are started via ``run_client``; clients in RUNNING_CLIENTS that are no
    longer listed are stopped via ``stop_client``.

    Args:
        updated_client_ids: client IDs that should be running after the call.
    """
    global RUNNING_CLIENTS
    # NOTE(review): leaving this ``with`` block calls executor.shutdown(wait=True),
    # which waits for every submitted future. If the rmq workers run
    # indefinitely this function will not return after starting a client -
    # confirm that is intended.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        logging.info("Old: " + str(RUNNING_CLIENTS))
        logging.info("New: " + str(updated_client_ids))

        for client_id in updated_client_ids:
            if client_id not in RUNNING_CLIENTS:
                # run_client reports failure by returning False (it logs the
                # error itself), so only claim success when it succeeded.
                if run_client(client_id, executor):
                    logging.info(f"Started client: {client_id}")

        # Iterate over a snapshot: stop_client removes entries from
        # RUNNING_CLIENTS, and mutating a list while looping over it makes
        # the loop skip the element that follows each removal.
        for client_id in list(RUNNING_CLIENTS):
            if client_id not in updated_client_ids:
                if stop_client(client_id):
                    logging.info(f"Shutdown client: {client_id}")


def watch_config(running_clients):
    """Watch ``./data`` for changes and keep the handler running until interrupted.

    Installs a ``ConfigHandler`` on a watchdog ``Observer`` for the ``./data``
    directory (non-recursive) and then idles so the observer can deliver
    events.

    Args:
        running_clients: list of client IDs used to seed this worker's
            RUNNING_CLIENTS global.

    Raises:
        KeyboardInterrupt: re-raised after the observer has been stopped.
    """
    # Set global RUNNING_CLIENTS inside the worker
    global RUNNING_CLIENTS
    RUNNING_CLIENTS = running_clients
    logging.info("Starting config watcher...")
    event_handler = ConfigHandler()
    observer = Observer()
    observer.schedule(event_handler, path="./data", recursive=False)
    observer.start()

    try:
        # Sleep between wake-ups instead of spinning: an empty ``while True``
        # body keeps a CPU core fully busy for the lifetime of the watcher.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        # Wait for the observer to finish before propagating.
        observer.join()
        raise


def stop_client(client_id):
    """Cancel a client's registered futures and remove it from RUNNING_CLIENTS.

    Looks up the four per-client entries in THREADS (outbox, broadcast,
    inbox-published, inbox-broadcast) and calls ``cancel()`` on each one
    that exists.

    Args:
        client_id: ID of the client to stop.

    Returns:
        True if the steps completed without raising, False if an exception
        was caught (the error is logged).
    """
    global RUNNING_CLIENTS
    try:
        logging.info(f"Stopping client: {client_id}")
        for suffix in ("outbox", "broadcast", "inbox-published", "inbox-broadcast"):
            worker_key = f"{client_id}-{suffix}"
            if worker_key not in THREADS:
                continue
            # NOTE(review): Future.cancel() has no effect on a call that is
            # already running - confirm this actually halts the worker.
            THREADS[worker_key].cancel()
        if client_id in RUNNING_CLIENTS:
            RUNNING_CLIENTS.remove(client_id)
    except Exception as error:
        logging.error(str(error))
        return False
    return True


def run_client(client_id, executor):
    """Submit the four workers for one client and record it as running.

    The client record is fetched with ``ClientModel().find(client_id)``; the
    workers submitted are, in order: outbox forwarder, broadcast forwarder,
    inbox subscription on the publish exchange (using the client's
    ``subscription`` topic) and inbox subscription on the broadcast exchange.
    Each future is stored in THREADS under ``<client_id>-<suffix>``.

    Args:
        client_id: ID used to look up the client record.
        executor: executor whose ``submit`` is used to start the workers.

    Returns:
        True if every worker was submitted, False if an exception was caught
        (the error is logged).
    """
    global RUNNING_CLIENTS
    client = ClientModel().find(client_id)
    try:
        client_id = client["client_id"]
        logging.info(f"Running client: {client_id}")

        def launch(suffix, target, *args):
            # Submit one worker and register its future under the client's key.
            THREADS[f"{client_id}-{suffix}"] = executor.submit(target, *args)

        # forward
        launch("outbox", forward, f"{client_id}-outbox", "soar-publish")

        # broadcast
        launch(
            "broadcast",
            broadcast,
            f"{client_id}-broadcast",
            EXCHANGES.get("broadcast"),
        )

        # subscribe
        launch(
            "inbox-published",
            subscribe,
            f"{client_id}-inbox",
            EXCHANGES.get("publish"),
            client["subscription"],  # topic
        )
        launch(
            "inbox-broadcast",
            subscribe,
            f"{client_id}-inbox",
            EXCHANGES.get("broadcast"),
        )

        if client_id not in RUNNING_CLIENTS:
            RUNNING_CLIENTS.append(client_id)
    except Exception as error:
        logging.error(str(error))
        return False
    return True


def main(executor):
    """Start the bus: the publisher, one set of workers per client, and the config watcher.

    After submitting everything, waits on each registered future in turn so
    that a failing worker surfaces as a logged, re-raised exception.

    Args:
        executor: executor used to submit all workers.

    Raises:
        Exception: whatever a worker raised, re-raised after being logged.
    """
    global RUNNING_CLIENTS
    logging.info("Starting SOAR bus...")

    clients = ClientModel().get()

    # publish
    THREADS["soar-publish"] = executor.submit(
        publish, "soar-publish", EXCHANGES.get("publish")
    )

    for client_id in clients.keys():
        run_client(client_id, executor)

    # The list is handed to the watcher as an argument rather than read from
    # the module global there.
    # NOTE(review): the executor created in this file is a ProcessPoolExecutor,
    # so the watcher presumably gets its own copy of the list - confirm.
    THREADS["config-watcher"] = executor.submit(watch_config, RUNNING_CLIENTS)

    # Make sure the workers are actually running, error if not,
    # this allows the SOAR Bus to actually wait for RMQ to start running
    for thread_name, thread in THREADS.items():
        try:
            # Log the name before blocking on the result so it is visible
            # which worker is being waited on; result() is called only here,
            # inside the try, so a failure is always logged before re-raising.
            logging.debug(thread_name)
            logging.debug(thread.result())
        except Exception as e:
            logging.error(e)
            raise
173

174

175
if __name__ == "__main__":
    # Probe the RabbitMQ port before starting, retrying a fixed number of
    # times with a pause between attempts.
    pingcounter = 0
    retries = 10
    interval = 5
    isreachable = False
    while not isreachable and pingcounter < retries:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(("rmq", 5672))
            isreachable = True
        except socket.error:
            pingcounter += 1
            # Only report unavailability when the connect actually failed,
            # and say so before sleeping rather than after.
            logging.info(f"RabbitMQ unavailable: retrying in {interval}s")
            time.sleep(interval)
        finally:
            # Always release the probe socket, whatever connect() did.
            s.close()

    if not isreachable:
        # Startup still proceeds, as before; this just makes the state visible.
        logging.warning(
            f"RabbitMQ still unreachable after {retries} attempts; starting anyway"
        )

    # The executor is managed explicitly instead of with ``with``: the context
    # manager's exit performs a waiting shutdown before any ``except`` clause
    # runs, which made the non-waiting shutdown on Ctrl-C ineffective.
    executor = concurrent.futures.ProcessPoolExecutor()
    wait_for_workers = True
    try:
        main(executor)
    except KeyboardInterrupt:
        wait_for_workers = False
    finally:
        executor.shutdown(wait=wait_for_workers)