diff --git a/endpoints/hello.py b/endpoints/hello.py deleted file mode 100644 index 7bb0bab0de35c7cebf2d22fac1f1213b16ef90df..0000000000000000000000000000000000000000 --- a/endpoints/hello.py +++ /dev/null @@ -1,6 +0,0 @@ -from flask_restful import Resource - - -class HelloWorld(Resource): - def get(self): - return {"hello": "world"} diff --git a/endpoints/notify.py b/endpoints/notify.py index a953db879eee9b7b7b22ce34de44e57f93d9e772..22bcb8df0eb1e8302db0549e30d769339826979d 100644 --- a/endpoints/notify.py +++ b/endpoints/notify.py @@ -1,8 +1,10 @@ import json -from flask_restful import request, abort + +from flask_restful import abort, request from marshmallow import Schema, fields -import pika + from endpoints.auth_resource import AuthResource +from rmq import write_to_queue class NotifySchema(Schema): @@ -33,8 +35,4 @@ class Notify(AuthResource): if allow: notify_queue = self.client['client_id'] + "-broadcast" - connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) - channel = connection.channel() - channel.queue_declare(queue=notify_queue, durable=True) - channel.basic_publish(exchange="", routing_key=notify_queue, body=json.dumps(message)) - connection.close() + write_to_queue(queue_name=notify_queue, msg=json.dumps(message)) \ No newline at end of file diff --git a/endpoints/receive.py b/endpoints/receive.py index bc1febaa41d5fbd2e724c2bf77521842952c6c88..5c1f99c9865248c259e83ecb9fd2df6ddfabb084 100644 --- a/endpoints/receive.py +++ b/endpoints/receive.py @@ -1,9 +1,8 @@ -from flask_restful import request, abort +from flask_restful import abort, request from marshmallow import Schema, fields -import pika -import json -from models.token import TokenModel + from endpoints.auth_resource import AuthResource +from rmq import read_from_queue class ReceiveQuerySchema(Schema): @@ -23,28 +22,9 @@ class Receive(AuthResource): if errors: abort(400, message=str(errors)) - messages = [] max_messages = request.args.get("max_messages", 10) - + allow = self.auth(request) if allow: inbox_queue = self.client['client_id'] + "-inbox" - - if allow: - connection = pika.BlockingConnection( - pika.ConnectionParameters(host="localhost") - ) - channel = connection.channel() - channel.queue_declare(queue=inbox_queue, durable=True) - while len(messages) < max_messages: - method_frame, header_frame, body = channel.basic_get(inbox_queue) - if method_frame: - print(method_frame, header_frame, body) - channel.basic_ack(method_frame.delivery_tag) - messages.append(json.loads(body.decode())) - else: - print("No message returned") - break - channel.close() - connection.close() - return messages + return read_from_queue(queue_name=inbox_queue, max_msgs=max_messages) \ No newline at end of file diff --git a/endpoints/send.py b/endpoints/send.py index 900e8a5c63d066921315e9e93e22fb8d4eb59d9a..9fe91ca548973ee32d36ce51cd5e8bdd89f23533 100644 --- a/endpoints/send.py +++ b/endpoints/send.py @@ -1,8 +1,11 @@ import json -from flask_restful import request, abort + +from flask_restful import abort, request from marshmallow import Schema, fields -import pika + from endpoints.auth_resource import AuthResource +from rmq import write_to_queue + class SendSchema(Schema): body = fields.Str(required=True) @@ -28,13 +31,9 @@ class Send(AuthResource): body = args.get("body") topic = args.get("topic") outbox_queue = self.client['client_id'] + "-outbox" - - connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) - channel = connection.channel() - channel.queue_declare(queue=outbox_queue, durable=True) message = { 'topic': topic, 'message': body, } - channel.basic_publish(exchange="", routing_key=outbox_queue, body=json.dumps(message)) - connection.close() + + write_to_queue(queue_name=outbox_queue, msg=json.dumps(message)) \ No newline at end of file diff --git a/rmq.py b/rmq.py index 3008d0f2e6c30ef38f7d62c4a0f2d38eba4e33c2..8216ab586346800347a6eb40164936cfa7ad3e20 100644 --- a/rmq.py +++ b/rmq.py @@ -53,7 +53,7 @@ def write_to_queue(queue_name, msg): def read_from_queue(queue_name, max_msgs): - # get messages off of a queue until the queue is empty of max_msgs is hit + # get messages off of a queue until the queue is empty or max_msgs is hit connection, channel = pika_connect(host=host) setup_queue(channel=channel, queue_name=queue_name)