#!/usr/bin/env python import pika import json import sys def get_connection(): return pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) def deliver(body, topic, publish_exchange): print("publish on topic: %s" % topic) deliver_connection = get_connection() deliver_channel = deliver_connection.channel() deliver_channel.exchange_declare(exchange=publish_exchange, exchange_type="topic") deliver_channel.basic_publish( exchange=publish_exchange, routing_key=topic, body=body ) deliver_connection.close() def listen(queue_name, publish_exchange): def pub_callback(ch, method, properties, body): message = json.loads(body.decode()) topic = message["topic"] deliver(body, topic, publish_exchange) ch.basic_ack(delivery_tag=method.delivery_tag) listen_connection = get_connection() listen_channel = listen_connection.channel() listen_channel.queue_declare(queue=queue_name, durable=True) listen_channel.basic_consume(queue=queue_name, on_message_callback=pub_callback) listen_channel.start_consuming() def publish(queue_name, publish_exchange="soar_publish"): listen(queue_name, publish_exchange) if __name__ == "__main__": queue_name = sys.argv[1] publish(queue_name)