soar_bus.py 5.86 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
#!/usr/bin/env python

# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client_id]-broadcast - send message to all subscribers? - eg notify of downtime
# soar-publish - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions
11 12 13 14
import logging
import os
import socket
import time
15 16

import concurrent.futures
17 18
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
James Kirk's avatar
James Kirk committed
19 20

from rmq import broadcast, forward, publish, subscribe
21
from models.client_model import ClientModel
22

23 24 25 26 27 28 29

# Log everything from DEBUG upwards, but only errors from the "pika" and
# "watchdog" loggers.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("pika").setLevel(logging.ERROR)
logging.getLogger("watchdog").setLevel(logging.ERROR)

# Futures returned by executor.submit(), keyed by worker name
# ("soar-publish", "config-watcher", or "<client_id>-<role>").
THREADS = {}
# IDs of the clients whose workers have been submitted and not yet stopped.
RUNNING_CLIENTS = []
30 31 32 33 34 35
# Exchange names handed to the rmq helpers, looked up by role
# via EXCHANGES.get("publish") / EXCHANGES.get("broadcast").
EXCHANGES = {
    "publish": "soar_publish",
    "broadcast": "soar_broadcast",
}


36 37 38 39 40 41 42 43 44 45 46 47 48 49
class ConfigHandler(FileSystemEventHandler):
    """File-system event handler that reloads clients when clients.json changes."""

    def __init__(self):
        self.client_model = ClientModel()
        super().__init__()

    def on_modified(self, event):
        """Re-read the client list and reconcile it with the running clients.

        Events whose path does not end in ``clients.json`` are ignored.
        """
        changed_path = event.src_path.strip()
        if not changed_path.endswith("clients.json"):
            return

        logging.debug("Reloading client config...")
        current_clients = self.client_model.get()
        update_clients(list(current_clients.keys()))


Dan Jones's avatar
Dan Jones committed
50
def update_clients(updated_client_ids):
    """Reconcile the running clients with a freshly loaded client list.

    Starts every client in ``updated_client_ids`` that is not yet in
    ``RUNNING_CLIENTS`` and stops every running client that is no longer
    listed.

    Args:
        updated_client_ids: IDs of the clients present in the reloaded config.
    """
    global RUNNING_CLIENTS
    # NOTE(review): leaving this ``with`` block calls executor.shutdown(),
    # which waits for the submitted futures to finish - confirm that is the
    # intended behaviour for long-running client workers.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        logging.info("Old: " + str(RUNNING_CLIENTS))
        logging.info("New: " + str(updated_client_ids))

        for client_id in updated_client_ids:
            if client_id not in RUNNING_CLIENTS:
                run_client(client_id, executor)
                logging.info(f"Started client: {client_id}")

        # stop_client() removes entries from RUNNING_CLIENTS, so walk a
        # snapshot: iterating the live list would skip the element that
        # follows each removal and leave obsolete clients running.
        for client_id in list(RUNNING_CLIENTS):
            if client_id not in updated_client_ids:
                stop_client(client_id)
                logging.info(f"Shutdown client: {client_id}")


def watch_config(running_clients):
    """Watch ``./data`` for changes and keep the observer alive until interrupted.

    Args:
        running_clients: client IDs already running; installed as this
            worker's global ``RUNNING_CLIENTS`` because the caller's module
            globals are not visible here.

    Raises:
        KeyboardInterrupt: re-raised after the observer has been stopped.
    """
    # Set global RUNNING_CLIENTS inside the worker
    global RUNNING_CLIENTS
    RUNNING_CLIENTS = running_clients
    logging.info("Starting config watcher...")
    event_handler = ConfigHandler()
    observer = Observer()
    observer.schedule(event_handler, path="./data", recursive=False)
    observer.start()

    try:
        # The observer does its work on its own thread; this loop only keeps
        # the worker alive. Sleeping avoids spinning a CPU core at 100%.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        observer.join()
        raise


def stop_client(client_id):
    """Cancel the registered futures of one client and mark it as not running.

    NOTE: ``Future.cancel()`` only prevents work that has not started yet;
    a call that is already executing is not interrupted.

    Args:
        client_id: ID of the client to stop.

    Returns:
        True when the routine completed, False if an exception was raised
        (the error is logged, not propagated).
    """
    global RUNNING_CLIENTS
    try:
        logging.info(f"Stopping client: {client_id}")
        for role in ("outbox", "broadcast", "inbox-published", "inbox-broadcast"):
            key = f"{client_id}-{role}"
            if key in THREADS:
                THREADS[key].cancel()
        if client_id in RUNNING_CLIENTS:
            RUNNING_CLIENTS.remove(client_id)
    except Exception as error:
        logging.error(str(error))
        return False
    return True


def run_client(client_id, executor):
    """Submit the four workers of one client and register their futures.

    The workers are: outbox forwarder, broadcast sender, and two inbox
    subscriptions (published topic and broadcast exchange). Each future is
    stored in ``THREADS`` under ``"<client_id>-<role>"``.

    Args:
        client_id: ID used to look the client up in the client model.
        executor: pool whose ``submit()`` launches each worker.

    Returns:
        True when every worker was submitted, False if anything raised
        (the error is logged, not propagated).
    """
    global RUNNING_CLIENTS
    client = ClientModel().find(client_id)

    def launch(role, target, *args):
        # Submit first, then record the future under "<client_id>-<role>".
        THREADS[f"{client_id}-{role}"] = executor.submit(target, *args)

    try:
        client_id = client["client_id"]
        logging.info(f"Running client: {client_id}")

        # forward
        launch("outbox", forward, f"{client_id}-outbox", "soar-publish")

        # broadcast
        launch(
            "broadcast",
            broadcast,
            f"{client_id}-broadcast",
            EXCHANGES.get("broadcast"),
        )

        # subscribe
        launch(
            "inbox-published",
            subscribe,
            f"{client_id}-inbox",
            EXCHANGES.get("publish"),
            client["subscription"],  # topic
        )
        launch(
            "inbox-broadcast",
            subscribe,
            f"{client_id}-inbox",
            EXCHANGES.get("broadcast"),
        )

        if client_id not in RUNNING_CLIENTS:
            RUNNING_CLIENTS.append(client_id)
    except Exception as error:
        logging.error(str(error))
        return False
    return True


def main(executor):
    """Start the publisher, every configured client and the config watcher.

    After submitting the workers this blocks on each registered future in
    turn, so a worker that fails is logged and its exception re-raised.

    Args:
        executor: pool whose ``submit()`` launches each worker.

    Raises:
        Exception: whatever a worker future raised, after it has been logged.
    """
    global RUNNING_CLIENTS
    logging.info("Starting SOAR bus...")

    client_model = ClientModel()
    clients = client_model.get()

    # publish
    THREADS["soar-publish"] = executor.submit(
        publish, "soar-publish", EXCHANGES.get("publish")
    )

    for client_id in clients.keys():
        run_client(client_id, executor)

    # Module globals are not shared with the executor's workers, so the
    # current list has to be passed into the watcher explicitly.
    THREADS["config-watcher"] = executor.submit(watch_config, RUNNING_CLIENTS)

    # Make sure the workers are actually running, error if not,
    # this allows the SOAR Bus to actually wait for RMQ to start running
    for thread_name, thread in THREADS.items():
        try:
            logging.debug(thread_name)
            # result() is called only inside the try so that a worker
            # failure is logged before it propagates.
            logging.debug(thread.result())
        except Exception as e:
            logging.error(e)
            raise
171

172

173
if __name__ == "__main__":
    # Probe the RabbitMQ port up to max_attempts times, pausing between
    # failed attempts. The bus is started afterwards whether or not the
    # probe ever succeeded.
    max_attempts = 10
    delay = 5
    for _attempt in range(max_attempts):
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.connect((os.getenv("MQ_HOST", "localhost"), 5672))
            reachable = True
        except socket.error:
            reachable = False
            logging.info(f"RabbitMQ unavailable: retrying in {delay}s")
            time.sleep(delay)
        probe.close()
        if reachable:
            break

    try:
        with concurrent.futures.ProcessPoolExecutor() as executor:
            main(executor)
    except KeyboardInterrupt:
        executor.shutdown(wait=False)