Commit 48a3eeff authored by James Kirk's avatar James Kirk
Browse files

refactor: changed threading method to prevent while True loop blocking

refactor: unbind and cancel consumers manually rather than just killing threads, as we cant do that anymore
refactor: broadcast and forward now return consumer tags
Pipeline #134585 failed with stages
in 22 seconds
......@@ -130,8 +130,7 @@ def broadcast(channel, queue_name, exchange_name):
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
# channel.start_consuming()
return channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
......@@ -155,7 +154,7 @@ def forward(channel, from_queue, to_queue):
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
return channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
except pika.exceptions.AMQPChannelError as err:
logging.error("Caught a channel error: {}, stopping...".format(err))
......
......@@ -11,30 +11,30 @@
import logging
import os
import socket
import threading
import time
from watchdog.observers import Observer
import pika
from watchdog.events import FileSystemEventHandler
from rmq import broadcast, forward, publish, subscribe
from models.client_model import ClientModel
from watchdog.observers import Observer
from logger import setup_logging
import pika
from models.client_model import ClientModel
from rmq import broadcast, forward, publish, subscribe
setup_logging()
THREADS = {}
RUNNING_CLIENTS = []
CONSUMER_TAGS = {}
EXCHANGES = {
"publish": "soar_publish",
"broadcast": "soar_broadcast",
}
current_channel = None
class ConfigHandler(FileSystemEventHandler):
def __init__(self):
def __init__(self, channel):
self.client_model = ClientModel()
self.channel = channel
super().__init__()
def on_modified(self, event):
......@@ -43,132 +43,158 @@ class ConfigHandler(FileSystemEventHandler):
logging.debug("Reloading client config...")
clients = self.client_model.get()
updated_client_ids = list(clients.keys())
update_clients(updated_client_ids)
def update_clients(updated_client_ids):
global RUNNING_CLIENTS
global current_channel
logging.debug("Old: " + str(RUNNING_CLIENTS))
logging.debug("New: " + str(updated_client_ids))
for client_id in updated_client_ids:
if client_id not in RUNNING_CLIENTS:
run_client(client_id)
logging.info(f"Started client: {client_id}")
for client_id in RUNNING_CLIENTS:
if client_id not in updated_client_ids:
stop_client(client_id)
logging.info(f"Shutdown client: {client_id}")
def watch_config(running_clients):
# Set global RUNNING_CLIENTS inside thread
global RUNNING_CLIENTS
RUNNING_CLIENTS = running_clients
logging.info("Starting config watcher...")
event_handler = ConfigHandler()
observer = Observer()
observer.schedule(event_handler, path="./data", recursive=False)
observer.start()
while True:
self.update_clients(updated_client_ids)
def update_clients(self, updated_client_ids):
global RUNNING_CLIENTS
logging.debug("Old: " + str(RUNNING_CLIENTS))
logging.debug("New: " + str(updated_client_ids))
for client_id in updated_client_ids:
if client_id not in RUNNING_CLIENTS:
run_client(client_id, self.channel)
logging.info(f"Started client: {client_id}")
for client_id in RUNNING_CLIENTS:
if client_id not in updated_client_ids:
self.stop_client(client_id)
logging.info(f"Shutdown client: {client_id}")
def stop_client(self, client_id):
global RUNNING_CLIENTS
global CONSUMER_TAGS
stopping = False
try:
time.sleep(1)
except KeyboardInterrupt as interrupt:
observer.stop()
raise interrupt
logging.info(f"Stopping client: {client_id}")
client_tags = CONSUMER_TAGS[client_id]
self.channel.basic_cancel(client_tags[f"{client_id}-broadcast"])
self.channel.basic_cancel(client_tags[f"{client_id}-outbox"])
self.channel.queue_unbind(queue=f"{client_id}-inbox", exchange=client_tags[f"{client_id}-inbox-publish"])
self.channel.queue_unbind(queue=f"{client_id}-inbox", exchange=client_tags[f"{client_id}-inbox-broadcast"])
if client_id in RUNNING_CLIENTS:
RUNNING_CLIENTS.remove(client_id)
stopping = True
except Exception as error:
logging.error(str(error))
return stopping
class WatchConfigThread(threading.Thread):
def __init__(self, running_clients, channel):
threading.Thread.__init__(self)
self.daemon = True
self.running_clients = running_clients
self.channel = channel
self.start()
def run(self):
logging.info("Starting config watcher...")
event_handler = ConfigHandler(self.channel)
observer = Observer()
observer.schedule(event_handler, path="./data", recursive=False)
observer.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt as interrupt:
observer.stop()
raise interrupt
class SoarBusThread(threading.Thread):
def __init__(self, clients, channel):
threading.Thread.__init__(self)
self.daemon = True
self.clients = clients
self.channel = channel
self.start()
def run(self):
logging.info("Starting SOAR bus...")
publish(
self.channel,
"soar-publish",
EXCHANGES.get("publish")
)
def stop_client(client_id):
global RUNNING_CLIENTS
stopping = False
try:
logging.info(f"Stopping client: {client_id}")
client_threads = ["outbox", "broadcast", "inbox-published", "inbox-broadcast"]
for thread in client_threads:
thread_name = f"{client_id}-{thread}"
if thread_name in THREADS:
THREADS[thread_name].cancel()
if client_id in RUNNING_CLIENTS:
RUNNING_CLIENTS.remove(client_id)
stopping = True
except Exception as error:
logging.error(str(error))
return stopping
for id in self.clients.keys():
run_client(id, self.channel)
def run_client(client_id):
def run_client(client_id, channel):
global RUNNING_CLIENTS
global current_channel
global CONSUMER_TAGS
client_model = ClientModel()
client = client_model.find(client_id)
try:
client_id = client["client_id"]
logging.info(f"Running client: {client_id}")
forward(
current_channel,
forward_consumer_tag = forward(
channel,
f"{client_id}-outbox",
"soar-publish"
)
broadcast(
current_channel,
broadcast_consumer_tag = broadcast(
channel,
f"{client_id}-broadcast",
EXCHANGES.get("broadcast")
)
subscribe(
current_channel,
channel,
f"{client_id}-inbox",
EXCHANGES.get("publish"),
client["subscription"],
)
subscribe(
current_channel,
channel,
f"{client_id}-inbox",
EXCHANGES.get("broadcast")
)
CONSUMER_TAGS[client_id] = {
f"{client_id}-broadcast": broadcast_consumer_tag,
f"{client_id}-outbox": forward_consumer_tag,
f"{client_id}-inbox-publish": EXCHANGES.get("publish"),
f"{client_id}-inbox-broadcast": EXCHANGES.get("broadcast"),
}
if client_id not in RUNNING_CLIENTS:
logging.debug(f"Appending client_id '{client_id}'")
RUNNING_CLIENTS.append(client_id)
except Exception as error:
logging.error(str(error))
def run_bus(clients, channel):
global RUNNING_CLIENTS
global current_channel
current_channel = channel
logging.info("Starting SOAR bus...")
publish(
current_channel,
"soar-publish",
EXCHANGES.get("publish")
)
for id in clients.keys():
run_client(id)
# Global vars are not shared across threads so you
# have to pass the global var into the thread
# watch_config(RUNNING_CLIENTS)
def on_channel_open(channel):
# Invoked when the channel is open
global RUNNING_CLIENTS
client_model = ClientModel()
clients = client_model.get()
client_count = len(clients.keys())
logging.debug(f"Running {client_count} clients")
run_bus(clients, channel)
SoarBusThread(clients, channel)
WatchConfigThread(RUNNING_CLIENTS, channel)
def on_connection_open(connection):
# Invoked when the connection is open
connection.channel(on_open_callback=on_channel_open)
def on_connection_close(connection, exception):
# Invoked when the connection is closed
connection.ioloop.stop()
if __name__ == "__main__":
pingcounter = 0
retries = 10
......@@ -188,9 +214,11 @@ if __name__ == "__main__":
host = os.getenv(
"MQ_HOST", "localhost"
)
connection = pika.SelectConnection(pika.ConnectionParameters(host), on_open_callback=on_connection_open)
connection = pika.SelectConnection(pika.ConnectionParameters(host), on_open_callback=on_connection_open, on_close_callback=on_connection_close)
try:
connection.ioloop.start()
except KeyboardInterrupt:
connection.close()
\ No newline at end of file
# Loop until fully closed
connection.close()
connection.ioloop.start()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment