Commit d2af70ac authored by Dan Jones's avatar Dan Jones
Browse files

Merge branch '41-create-api-endpoint-to-report-on-queue-size' into 'dev'

Resolve "Create API endpoint to report on queue size"

Closes #41

See merge request !21
2 merge requests!26Resolve "Release v1.0.0",!21Resolve "Create API endpoint to report on queue size"
Pipeline #115160 passed with stages
in 1 minute and 4 seconds
...@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ...@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased] ## [Unreleased]
### Added
Create a queues API to enable listing queue size and emptying queues
## [v0.1.0] - 2023-03-24 ## [v0.1.0] - 2023-03-24
### Added ### Added
......
...@@ -10,6 +10,7 @@ from endpoints.notify import Notify ...@@ -10,6 +10,7 @@ from endpoints.notify import Notify
from endpoints.receive import Receive from endpoints.receive import Receive
from endpoints.send import Send from endpoints.send import Send
from endpoints.token import Token from endpoints.token import Token
from endpoints.queues import Queue, QueueList
from models.token_model import TokenModel from models.token_model import TokenModel
from logger import setup_logging from logger import setup_logging
...@@ -45,6 +46,8 @@ def create_app(): ...@@ -45,6 +46,8 @@ def create_app():
api.add_resource(Send, "/send") api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify") api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token") api.add_resource(Token, "/token")
api.add_resource(QueueList, "/queue")
api.add_resource(Queue, "/queue/<queue_name>")
return app return app
......
from flask_restful import request, abort
from marshmallow import Schema, fields
from endpoints.auth_resource import AuthResource
from models.client_model import ClientModel
from rmq import get_queue_status, empty_queue
class Queue(AuthResource):
def get(self, queue_name):
allow = self.auth(request)
if allow:
queue_status = get_queue_status(queue_name)
return {"queue": queue_name, "properties": queue_status}
def delete(self, queue_name):
allow = self.auth(request)
if allow:
emptied = empty_queue(queue_name)
queue_status = get_queue_status(queue_name)
queue_status["empty"] = emptied
return {"queue": queue_name, "properties": queue_status}
class QueueList(AuthResource):
def __init__(self):
super().__init__()
self.clients_file = ClientModel()
def get(self):
allow = self.auth(request)
if allow:
queue_status = {
client_id: {
"inbox": get_queue_status(f"{client_id}-inbox"),
"outbox": get_queue_status(f"{client_id}-outbox"),
"broadcast": get_queue_status(f"{client_id}-broadcast"),
}
for client_id in self.clients_file.get().keys()
}
return [
{
"client_id": client_id,
"queue": f"{client_id}-{queue_type}",
"type": queue_type,
"properties": queue_status,
}
for client_id, client_queues in queue_status.items()
for queue_type, queue_status in client_queues.items()
]
...@@ -4,6 +4,7 @@ import os ...@@ -4,6 +4,7 @@ import os
import pika import pika
host = os.getenv( host = os.getenv(
"MQ_HOST", "localhost" "MQ_HOST", "localhost"
) # Sets to whatever MQ_HOST is, or defaults to localhost ) # Sets to whatever MQ_HOST is, or defaults to localhost
...@@ -220,3 +221,24 @@ def listen(queue_name, callback): ...@@ -220,3 +221,24 @@ def listen(queue_name, callback):
channel.basic_consume(queue=queue_name, on_message_callback=callback) channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming() channel.start_consuming()
def get_queue_status(queue_name):
_, channel = pika_connect(host=host)
response = channel.queue_declare(queue=queue_name, passive=True)
queue_status = {
"size": response.method.message_count,
"has_consumer": response.method.consumer_count > 0,
}
logging.debug(f"Queue: {queue_name} contains {queue_status['size']} messages")
return queue_status
def empty_queue(queue_name):
_, channel = pika_connect(host=host)
emptied = True
try:
channel.queue_purge(queue_name)
except ValueError:
emptied = False
return emptied
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment