Commit 298299d5 authored by James Kirk's avatar James Kirk
Browse files

Merge branch '7-message-q-wrapping' into 'dev'

Wrapping rmq

Closes #16 and #7

See merge request !5
parents b05b8172 2b132a1b
from flask_restful import Resource
class HelloWorld(Resource):
def get(self):
return {"hello": "world"}
import json
from flask_restful import request, abort
from flask_restful import abort, request
from marshmallow import Schema, fields
import pika
from endpoints.auth_resource import AuthResource
from rmq import write_to_queue
class NotifySchema(Schema):
......@@ -33,8 +35,4 @@ class Notify(AuthResource):
if allow:
notify_queue = self.client['client_id'] + "-broadcast"
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue=notify_queue, durable=True)
channel.basic_publish(exchange="", routing_key=notify_queue, body=json.dumps(message))
connection.close()
write_to_queue(queue_name=notify_queue, msg=json.dumps(message))
\ No newline at end of file
from flask_restful import request, abort
from flask_restful import abort, request
from marshmallow import Schema, fields
import pika
import json
from models.token import TokenModel
from endpoints.auth_resource import AuthResource
from rmq import read_from_queue
class ReceiveQuerySchema(Schema):
......@@ -23,28 +22,9 @@ class Receive(AuthResource):
if errors:
abort(400, message=str(errors))
messages = []
max_messages = request.args.get("max_messages", 10)
allow = self.auth(request)
if allow:
inbox_queue = self.client['client_id'] + "-inbox"
if allow:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
channel.queue_declare(queue=inbox_queue, durable=True)
while len(messages) < max_messages:
method_frame, header_frame, body = channel.basic_get(inbox_queue)
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
messages.append(json.loads(body.decode()))
else:
print("No message returned")
break
channel.close()
connection.close()
return messages
return read_from_queue(queue_name=inbox_queue, max_msgs=max_messages)
\ No newline at end of file
import json
from flask_restful import request, abort
from flask_restful import abort, request
from marshmallow import Schema, fields
import pika
from endpoints.auth_resource import AuthResource
from rmq import write_to_queue
class SendSchema(Schema):
body = fields.Str(required=True)
......@@ -28,13 +31,9 @@ class Send(AuthResource):
body = args.get("body")
topic = args.get("topic")
outbox_queue = self.client['client_id'] + "-outbox"
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue=outbox_queue, durable=True)
message = {
'topic': topic,
'message': body,
}
channel.basic_publish(exchange="", routing_key=outbox_queue, body=json.dumps(message))
connection.close()
write_to_queue(queue_name=outbox_queue, msg=json.dumps(message))
\ No newline at end of file
File moved
File moved
File moved
File moved
File moved
import json
import pika
host='localhost' # TODO Handle host being passed in (https://git.noc.ac.uk/communications-backbone-system/communications-backbone/-/issues/17)
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def pika_connect(host):
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
return connection, channel
def setup_queue(channel, queue_name=''):
channel.queue_declare(queue=queue_name, exclusive=False, durable=True) # exclusive means the queue can only be used by the connection that created it
def fanout_exchange(channel, exchange_name):
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
def topic_exchange(channel, exchange_name):
channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
def deliver_to_exchange(channel, body, exchange_name, topic=None):
if topic is None:
fanout_exchange(channel=channel, exchange_name=exchange_name)
channel.basic_publish(
exchange=exchange_name,
routing_key='',
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
else:
topic_exchange(channel=channel, exchange_name=exchange_name)
channel.basic_publish(
exchange=exchange_name,
routing_key=topic,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def write_to_queue(queue_name, msg):
# write a single message to a queue
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=msg,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
connection.close()
def read_from_queue(queue_name, max_msgs):
# get messages off of a queue until the queue is empty or max_msgs is hit
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
messages = []
while len(messages) < max_msgs:
method_frame, header_frame, body = channel.basic_get(queue_name)
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
try:
messages.append(json.loads(body.decode()))
except json.decoder.JSONDecodeError:
messages.append(body.decode())
else:
print("No message returned")
break
connection.close()
return messages
def broadcast(queue_name, exchange_name):
# read from a queue, forward onto a 'fanout' exchange
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
def broadcast_callback(ch, method, properties, body):
deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name)
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=queue_name, on_message_callback=broadcast_callback)
channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
def forward(from_queue, to_queue):
# read from a queue, forward onto a different queue
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=from_queue)
setup_queue(channel=channel, queue_name=to_queue)
def forward_callback(ch, method, properties, body):
channel.basic_publish(
exchange='',
routing_key=to_queue,
body=body,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=from_queue, on_message_callback=forward_callback)
channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
def publish(queue_name, exchange_name):
# read from a queue, forward onto a 'topic' exchange
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
def publish_callback(ch, method, properties, body):
message = json.loads(body.decode())
topic = message["topic"]
deliver_to_exchange(channel=ch, body=body, exchange_name=exchange_name, topic=topic)
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
channel.basic_consume(queue=queue_name, on_message_callback=publish_callback)
channel.start_consuming()
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
def subscribe(queue_name, exchange_name, topic=None):
# setup bindings between queue and exchange,
# exchange_type is either 'fanout' or 'topic' based on if the topic arg is passed
connection, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
if topic is None:
fanout_exchange(channel=channel, exchange_name=exchange_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
else:
topic_exchange(channel=channel, exchange_name=exchange_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
connection.close()
def listen(queue_name, callback):
# subscribe client to a queue, using the callback arg
_, channel = pika_connect(host=host)
setup_queue(channel=channel, queue_name=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()
\ No newline at end of file
......@@ -11,10 +11,7 @@
import concurrent.futures
from endpoints.clients import ClientsFile
from soar_broadcast import broadcast
from soar_forward import forward
from soar_publish import publish
from soar_subscribe import subscribe
from rmq import publish, subscribe, broadcast, forward
THREADS = []
EXCHANGES = {
......@@ -43,11 +40,15 @@ def main():
THREADS.append(thread)
# subscribe
thread = executor.submit(
subscribe,
subscribe,
f"{id}-inbox",
client["subscription"],
EXCHANGES.get("publish"),
EXCHANGES.get("broadcast"),
client["subscription"] # topic
)
thread = executor.submit(
subscribe,
f"{id}-inbox",
EXCHANGES.get("broadcast")
)
THREADS.append(thread)
# push
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment