Commit a1e7c64e authored by James Kirk's avatar James Kirk
Browse files

feat: reenabled watch_config in a single thread

fix: funcs used by the api have been set to a blockingconnection
parent 94956fd7
Pipeline #133920 failed with stages
in 23 seconds
......@@ -156,7 +156,6 @@ def forward(channel, from_queue, to_queue):
try:
channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
# channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
logging.error("Caught a channel error: {}, stopping...".format(err))
......@@ -179,7 +178,6 @@ def publish(channel, queue_name, exchange_name):
try:
channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
# channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
......@@ -206,23 +204,27 @@ def listen(channel, queue_name, callback):
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
# channel.start_consuming()
def get_queue_status(channel, queue_name):
def get_queue_status(queue_name):
connection, channel = pika_connect(host=host)
response = channel.queue_declare(queue=queue_name, passive=True)
queue_status = {
"size": response.method.message_count,
"has_consumer": response.method.consumer_count > 0,
}
logging.debug(f"Queue: {queue_name} contains {queue_status['size']} messages")
connection.close()
return queue_status
def empty_queue(channel, queue_name):
def empty_queue(queue_name):
connection, channel = pika_connect(host=host)
emptied = True
try:
channel.queue_purge(queue_name)
except ValueError:
emptied = False
connection.close()
return emptied
......@@ -13,7 +13,6 @@ import os
import socket
import time
import concurrent.futures
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
......@@ -31,7 +30,7 @@ EXCHANGES = {
"publish": "soar_publish",
"broadcast": "soar_broadcast",
}
current_channel = None
class ConfigHandler(FileSystemEventHandler):
def __init__(self):
......@@ -49,19 +48,19 @@ class ConfigHandler(FileSystemEventHandler):
def update_clients(updated_client_ids):
global RUNNING_CLIENTS
with concurrent.futures.ThreadPoolExecutor() as executor:
logging.debug("Old: " + str(RUNNING_CLIENTS))
logging.debug("New: " + str(updated_client_ids))
global current_channel
logging.debug("Old: " + str(RUNNING_CLIENTS))
logging.debug("New: " + str(updated_client_ids))
for client_id in updated_client_ids:
if client_id not in RUNNING_CLIENTS:
run_client(client_id, executor)
logging.info(f"Started client: {client_id}")
for client_id in updated_client_ids:
if client_id not in RUNNING_CLIENTS:
run_client(client_id)
logging.info(f"Started client: {client_id}")
for client_id in RUNNING_CLIENTS:
if client_id not in updated_client_ids:
stop_client(client_id)
logging.info(f"Shutdown client: {client_id}")
for client_id in RUNNING_CLIENTS:
if client_id not in updated_client_ids:
stop_client(client_id)
logging.info(f"Shutdown client: {client_id}")
def watch_config(running_clients):
......@@ -76,7 +75,7 @@ def watch_config(running_clients):
while True:
try:
pass
time.sleep(1)
except KeyboardInterrupt as interrupt:
observer.stop()
raise interrupt
......@@ -100,8 +99,9 @@ def stop_client(client_id):
return stopping
def run_client(client_id, channel):
def run_client(client_id):
global RUNNING_CLIENTS
global current_channel
client_model = ClientModel()
client = client_model.find(client_id)
......@@ -109,59 +109,63 @@ def run_client(client_id, channel):
client_id = client["client_id"]
logging.info(f"Running client: {client_id}")
forward(
channel,
current_channel,
f"{client_id}-outbox",
"soar-publish"
)
broadcast(
channel,
current_channel,
f"{client_id}-broadcast",
EXCHANGES.get("broadcast")
)
subscribe(
channel,
current_channel,
f"{client_id}-inbox",
EXCHANGES.get("publish"),
client["subscription"],
)
subscribe(
channel,
current_channel,
f"{client_id}-inbox",
EXCHANGES.get("broadcast")
)
if client_id not in RUNNING_CLIENTS:
RUNNING_CLIENTS.append(client_id)
except Exception as error:
logging.error(str(error))
def run_bus(clients, channel):
global RUNNING_CLIENTS
global current_channel
current_channel = channel
logging.info("Starting SOAR bus...")
# publish
publish(
channel,
current_channel,
"soar-publish",
EXCHANGES.get("publish")
)
for id in clients.keys():
run_client(id, channel)
run_client(id)
# Global vars are not shared across threads so you
# have to pass the global var into the thread
# thread = executor.submit(watch_config, RUNNING_CLIENTS) # TODO: Sort this out
# THREADS["config-watcher"] = thread
watch_config(RUNNING_CLIENTS)
def on_channel_open(channel):
print("in on_channel_open")
# Invoked when the channel is open
client_model = ClientModel()
clients = client_model.get()
client_count = len(clients.keys())
logging.debug(f"Running {client_count} clients")
run_bus(clients, channel)
def on_connection_open(connection):
print("in on_open")
# Invoked when the connection is open
connection.channel(on_open_callback=on_channel_open)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment