soar_broadcast.py 1.11 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
#!/usr/bin/env python
import pika


def get_connection():
    """Open and return a new blocking pika connection to ``localhost``."""
    params = pika.ConnectionParameters(host="localhost")
    return pika.BlockingConnection(params)


def deliver(body, broadcast_exchange):
    """Publish *body* to the fanout exchange named *broadcast_exchange*.

    A fresh connection is opened for every call. The exchange is declared
    with type ``"fanout"`` before publishing, and the message is published
    with an empty routing key.

    Args:
        body: Message payload, passed unchanged to ``basic_publish``.
        broadcast_exchange: Name of the exchange to declare and publish to.

    Raises:
        Any exception raised by pika while opening the channel, declaring
        the exchange or publishing propagates to the caller; the connection
        is closed first.
    """
    print("broadcast")
    deliver_connection = get_connection()
    try:
        deliver_channel = deliver_connection.channel()
        deliver_channel.exchange_declare(
            exchange=broadcast_exchange, exchange_type="fanout"
        )
        deliver_channel.basic_publish(
            exchange=broadcast_exchange, routing_key="", body=body
        )
    finally:
        # Always release the connection, even when declaring or publishing
        # fails. Skip the close if the connection is already down so that a
        # close error cannot mask the original exception.
        if deliver_connection.is_open:
            deliver_connection.close()


def listen(queue_name, broadcast_exchange):
    """Consume *queue_name* and re-publish every message to *broadcast_exchange*.

    The queue is declared as durable, a consumer callback is registered on
    it, and ``start_consuming()`` is entered on the channel. Each received
    message body is handed to ``deliver`` and then acknowledged.

    Args:
        queue_name: Name of the queue to declare and consume from.
        broadcast_exchange: Exchange name forwarded to ``deliver`` for each
            message.

    Raises:
        Any exception raised by pika or by ``deliver`` propagates to the
        caller; the consumer connection is closed first.
    """

    def bcast_callback(ch, method, properties, body):
        # Acknowledge only after deliver() returned without raising, so a
        # failed re-publish leaves the message unacknowledged.
        deliver(body, broadcast_exchange)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    listen_connection = get_connection()
    try:
        listen_channel = listen_connection.channel()
        listen_channel.queue_declare(queue=queue_name, durable=True)
        listen_channel.basic_consume(
            queue=queue_name, on_message_callback=bcast_callback
        )
        listen_channel.start_consuming()
    finally:
        # Release the connection when consuming ends or an error escapes.
        # Skip the close if the connection is already down so that a close
        # error cannot mask the original exception.
        if listen_connection.is_open:
            listen_connection.close()


def broadcast(queue_name, broadcast_exchange="soar_broadcast"):
    """Start listening on *queue_name*, forwarding to *broadcast_exchange*.

    Thin entry point that delegates to ``listen``; the exchange name
    defaults to ``"soar_broadcast"``.
    """
    listen(queue_name=queue_name, broadcast_exchange=broadcast_exchange)