Unverified Commit be57512a authored by Dan Jones's avatar Dan Jones
Browse files

tests: unit tests for models and api endpoints

- done
  - client and token model
  - client and token api endpoints
  - send api endpoint
- todo
  - notify api endpoint
  - receive api endpoint
parent 61d88a37
......@@ -7,6 +7,7 @@ host = os.getenv("MQ_HOST", "localhost") # Sets to whatever MQ_HOST is, or defau
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def pika_connect(host):
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
......@@ -22,16 +23,22 @@ def pika_connect(host):
return connection, channel
def setup_queue(channel, queue_name=''):
channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it
def setup_queue(channel, queue_name=""):
channel.queue_declare(
queue=queue_name, exclusive=False, durable=True
) # exclusive means the queue can only be used by the connection that created it
def fanout_exchange(channel, exchange_name):
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
channel.exchange_declare(
exchange=exchange_name, exchange_type="fanout", durable=True
)
def topic_exchange(channel, exchange_name):
channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
channel.exchange_declare(
exchange=exchange_name, exchange_type="topic", durable=True
)
def deliver_to_exchange(channel, body, exchange_name, topic=None):
......@@ -39,37 +46,39 @@ def deliver_to_exchange(channel, body, exchange_name, topic=None):
fanout_exchange(channel=channel, exchange_name=exchange_name)
channel.basic_publish(
exchange=exchange_name,
routing_key='',
body=body,
routing_key="",
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
),
)
else:
topic_exchange(channel=channel, exchange_name=exchange_name)
channel.basic_publish(
exchange=exchange_name,
routing_key=topic,
body=body,
routing_key=topic,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
),
)
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def write_to_queue(queue_name, msg):
# write a single message to a queue
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_publish(
exchange='',
routing_key=queue_name,
exchange="",
routing_key=queue_name,
body=msg,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
),
)
connection.close()
......@@ -125,12 +134,12 @@ def forward(from_queue, to_queue):
def forward_callback(ch, method, properties, body):
channel.basic_publish(
exchange='',
routing_key=to_queue,
exchange="",
routing_key=to_queue,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
),
)
ch.basic_ack(delivery_tag=method.delivery_tag)
......@@ -150,7 +159,9 @@ def publish(queue_name, exchange_name):
def publish_callback(ch, method, properties, body):
message = json.loads(body.decode())
topic = message["topic"]
deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
deliver_to_exchange(
channel=ch, body=body, exchange_name=exchange_name, topic=topic
)
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
......@@ -161,7 +172,7 @@ def publish(queue_name, exchange_name):
def subscribe(queue_name, exchange_name, topic=None):
# setup bindings between queue and exchange,
# setup bindings between queue and exchange,
# exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
......@@ -183,4 +194,4 @@ def listen(queue_name, callback):
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()
\ No newline at end of file
channel.start_consuming()
......@@ -11,8 +11,8 @@
import concurrent.futures
from endpoints.clients import ClientsFile
from rmq import broadcast, forward, publish, subscribe
from models.client_model import ClientModel
THREADS = []
EXCHANGES = {
......@@ -23,7 +23,7 @@ EXCHANGES = {
def main():
print("Starting SOAR bus...")
clients_file = ClientsFile()
clients_file = ClientModel()
clients = clients_file.get()
with concurrent.futures.ProcessPoolExecutor() as executor:
......@@ -31,7 +31,7 @@ def main():
thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
THREADS.append(thread)
for (id, client) in clients.items():
for id, client in clients.items():
# forward
thread = executor.submit(forward, f"{id}-outbox", "soar-publish")
THREADS.append(thread)
......@@ -42,16 +42,14 @@ def main():
THREADS.append(thread)
# subscribe
thread = executor.submit(
subscribe,
subscribe,
f"{id}-inbox",
EXCHANGES.get("publish"),
client["subscription"] # topic
client["subscription"], # topic
)
THREADS.append(thread)
thread = executor.submit(
subscribe,
f"{id}-inbox",
EXCHANGES.get("broadcast")
subscribe, f"{id}-inbox", EXCHANGES.get("broadcast")
)
THREADS.append(thread)
# push
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment