Unverified Commit 101a3c20 authored by Dan Jones's avatar Dan Jones
Browse files

feat: detect new or removed clients

Start and stop threads when clients are created or removed
Watchdog watches for changes to the ./data/clients.json
When a change is detected it compares the list of running
clients to the content of clients.json and starts or
stops client processing threads as required
2 merge requests!23Resolve "Release v0.1.0",!13Resolve "Detect new clients and add to thread pool"
Pipeline #107763 failed with stages
in 24 seconds
...@@ -26,5 +26,6 @@ python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0, ...@@ -26,5 +26,6 @@ python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0,
pytz==2022.6 pytz==2022.6
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
vine==5.0.0 ; python_version >= '3.6' vine==5.0.0 ; python_version >= '3.6'
watchdog==2.2.1
werkzeug==2.2.2 ; python_version >= '3.7' werkzeug==2.2.2 ; python_version >= '3.7'
zipp==3.10.0 ; python_version >= '3.7' zipp==3.10.0 ; python_version >= '3.7'
...@@ -8,63 +8,190 @@ ...@@ -8,63 +8,190 @@
# soar-publisher - fan-in from client-outboxes # soar-publisher - fan-in from client-outboxes
# soar-dlq - undeliverables # soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions # soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions
import json
import logging
import os
import socket
import sys
import time
import concurrent.futures import concurrent.futures
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from rmq import broadcast, forward, publish, subscribe from rmq import broadcast, forward, publish, subscribe
from models.client_model import ClientModel from models.client_model import ClientModel
THREADS = []
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("pika").setLevel(logging.ERROR)
logging.getLogger("watchdog").setLevel(logging.ERROR)
THREADS = {}
RUNNING_CLIENTS = []
EXCHANGES = { EXCHANGES = {
"publish": "soar_publish", "publish": "soar_publish",
"broadcast": "soar_broadcast", "broadcast": "soar_broadcast",
} }
def main(): class ConfigHandler(FileSystemEventHandler):
print("Starting SOAR bus...") def __init__(self):
clients_file = ClientModel() self.client_model = ClientModel()
clients = clients_file.get() super().__init__()
with concurrent.futures.ProcessPoolExecutor() as executor: def on_modified(self, event):
# publish file_name = event.src_path.strip()
thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish")) if file_name.endswith("clients.json"):
THREADS.append(thread) logging.debug("Reloading client config...")
clients = self.client_model.get()
for id, client in clients.items(): updated_client_ids = list(clients.keys())
# forward update_clients(updated_client_ids)
thread = executor.submit(forward, f"{id}-outbox", "soar-publish")
THREADS.append(thread)
# broadcast def update_clients(updated_client_ids):
thread = executor.submit( global RUNNING_CLIENTS
broadcast, f"{id}-broadcast", EXCHANGES.get("broadcast") with concurrent.futures.ProcessPoolExecutor() as executor:
) logging.info("Old: " + str(RUNNING_CLIENTS))
THREADS.append(thread) logging.info("New: " + str(updated_client_ids))
# subscribe
thread = executor.submit( for client_id in updated_client_ids:
subscribe, if client_id not in RUNNING_CLIENTS:
f"{id}-inbox", run_client(client_id, executor)
EXCHANGES.get("publish"), logging.info(f"Started client: {client_id}")
client["subscription"], # topic
) for client_id in RUNNING_CLIENTS:
THREADS.append(thread) if client_id not in updated_client_ids:
thread = executor.submit( stop_client(client_id)
subscribe, f"{id}-inbox", EXCHANGES.get("broadcast") logging.info(f"Shutdown client: {client_id}")
)
THREADS.append(thread)
# push def watch_config(running_clients):
# TODO - add optional webhook target to client and post to webhook target # Set global RUNNING_CLIENTS inside thread
# if present global RUNNING_CLIENTS
RUNNING_CLIENTS = running_clients
# Make sure the threads are actually running, error if not, logging.info("Starting config watcher...")
# this allows the SOAR Bus to actually wait for RMQ to start running event_handler = ConfigHandler()
for thread in THREADS: observer = Observer()
thread.result() observer.schedule(event_handler, path="./data", recursive=False)
try: observer.start()
print(thread.result())
except Exception as e: while True:
print(e) try:
pass
except KeyboardInterrupt as interrupt:
observer.stop()
raise interrupt
def stop_client(client_id):
global RUNNING_CLIENTS
stopping = False
try:
logging.info(f"Stopping client: {client_id}")
client_threads = ["outbox", "broadcast", "inbox-published", "inbox-broadcast"]
for thread in client_threads:
thread_name = f"{client_id}-{thread}"
if thread_name in THREADS:
THREADS[thread_name].cancel()
if client_id in RUNNING_CLIENTS:
RUNNING_CLIENTS.remove(client_id)
stopping = True
except Exception as error:
logging.error(str(error))
return stopping
def run_client(client_id, executor):
global RUNNING_CLIENTS
client_model = ClientModel()
client = client_model.find(client_id)
running = False
try:
client_id = client["client_id"]
logging.info(f"Running client: {client_id}")
# forward
thread = executor.submit(forward, f"{client_id}-outbox", "soar-publish")
THREADS[f"{client_id}-outbox"] = thread
# broadcast
thread = executor.submit(
broadcast, f"{client_id}-broadcast", EXCHANGES.get("broadcast")
)
THREADS[f"{client_id}-broadcast"] = thread
# subscribe
thread = executor.submit(
subscribe,
f"{client_id}-inbox",
EXCHANGES.get("publish"),
client["subscription"], # topic
)
THREADS[f"{client_id}-inbox-published"] = thread
thread = executor.submit(
subscribe, f"{client_id}-inbox", EXCHANGES.get("broadcast")
)
THREADS[f"{client_id}-inbox-broadcast"] = thread
if client_id not in RUNNING_CLIENTS:
RUNNING_CLIENTS.append(client_id)
running = True
except Exception as error:
logging.error(str(error))
return running
def main(executor):
global RUNNING_CLIENTS
logging.info("Starting SOAR bus...")
client_model = ClientModel()
clients = client_model.get()
# publish
thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
THREADS["soar-publish"] = thread
for id, client in clients.items():
run_client(id, executor)
# Global vars are not shared across threads so you
# have to pass the global var into the thread
thread = executor.submit(watch_config, RUNNING_CLIENTS)
THREADS["config-watcher"] = thread
# Make sure the threads are actually running, error if not,
# this allows the SOAR Bus to actually wait for RMQ to start running
for thread_name, thread in THREADS.items():
thread.result()
try:
logging.debug(thread_name)
logging.debug(thread.result())
except Exception as e:
logging.error(e)
raise e
if __name__ == "__main__": if __name__ == "__main__":
main()
pingcounter = 0
retries = 10
interval = 5
isreachable = False
while isreachable is False and pingcounter < retries:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect(('rmq', 5672))
isreachable = True
except socket.error as e:
time.sleep(interval)
pingcounter += 1
s.close()
logging.info(f"RabbitMQ unavailable: retrying in {interval}s")
try:
with concurrent.futures.ProcessPoolExecutor() as executor:
main(executor)
except KeyboardInterrupt:
executor.shutdown(wait=False)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment