Unverified Commit cd4ad416 authored by Dan Jones's avatar Dan Jones
Browse files

feat: add endpoint to empty a named queue

parent 0f5e168c
Pipeline #115150 passed with stages
in 57 seconds
......@@ -10,7 +10,7 @@ from endpoints.notify import Notify
from endpoints.receive import Receive
from endpoints.send import Send
from endpoints.token import Token
from endpoints.queues import QueueList
from endpoints.queues import Queue, QueueList
from models.token_model import TokenModel
from logger import setup_logging
......@@ -46,7 +46,8 @@ def create_app():
api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token")
api.add_resource(QueueList, "/queues")
api.add_resource(QueueList, "/queue")
api.add_resource(Queue, "/queue/<queue_name>")
return app
......
from flask_restful import request
from flask_restful import request, abort
from marshmallow import Schema, fields
from endpoints.auth_resource import AuthResource
from models.client_model import ClientModel
from rmq import get_queue_status
from rmq import get_queue_status, empty_queue
class Queue(AuthResource):
def get(self, queue_name):
allow = self.auth(request)
if allow:
queue_status = get_queue_status(queue_name)
return {"queue": queue_name, "properties": queue_status}
def delete(self, queue_name):
allow = self.auth(request)
if allow:
emptied = empty_queue(queue_name)
queue_status = get_queue_status(queue_name)
queue_status["empty"] = emptied
return {"queue": queue_name, "properties": queue_status}
class QueueList(AuthResource):
......@@ -25,8 +42,8 @@ class QueueList(AuthResource):
"client_id": client_id,
"queue": f"{client_id}-{queue_type}",
"type": queue_type,
"properties": queue_size,
"properties": queue_status,
}
for client_id, client_queues in queue_status.items()
for queue_type, queue_size in client_queues.items()
for queue_type, queue_status in client_queues.items()
]
import json
import logging
import os
import time
import pika
......@@ -232,3 +233,13 @@ def get_queue_status(queue_name):
}
logging.debug(f"Queue: {queue_name} contains {queue_status['size']} messages")
return queue_status
def empty_queue(queue_name):
_, channel = pika_connect(host=host)
emptied = True
try:
channel.queue_purge(queue_name)
except ValueError:
emptied = False
return emptied
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment