Unverified Commit 90dcfbba authored by Dan Jones's avatar Dan Jones
Browse files

feat: prototype soar backbone implementation

Quick sketch of what soar might look like.
parent 611d9cab
Pipeline #92250 canceled with stages
clients.json
examples/
\ No newline at end of file
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
pubsubpy = "*"
pika = "*"
pyrabbit = "*"
flask = "*"
flask-restful = "*"
marshmallow = "*"
[dev-packages]
[requires]
python_version = "3.8"
{
"_meta": {
"hash": {
"sha256": "b71ec6300f7c04a8d222991089770e4384941bf5553cfa12031330227ad043cb"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.8"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"amqp": {
"hashes": [
"sha256:2c1b13fecc0893e946c65cbd5f36427861cffa4ea2201d8f6fca22e2a373b5e2",
"sha256:6f0956d2c23d8fa6e7691934d8c3930eadb44972cbbd1a7ae3a520f735d43359"
],
"markers": "python_version >= '3.6'",
"version": "==5.1.1"
},
"aniso8601": {
"hashes": [
"sha256:1d2b7ef82963909e93c4f24ce48d4de9e66009a21bf1c1e1c85bdd0812fe412f",
"sha256:72e3117667eedf66951bb2d93f4296a56b94b078a8a95905a052611fb3f1b973"
],
"version": "==9.0.1"
},
"click": {
"hashes": [
"sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e",
"sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"
],
"markers": "python_version >= '3.7'",
"version": "==8.1.3"
},
"flask": {
"hashes": [
"sha256:642c450d19c4ad482f96729bd2a8f6d32554aa1e231f4f6b4e7e5264b16cca2b",
"sha256:b9c46cc36662a7949f34b52d8ec7bb59c0d74ba08ba6cb9ce9adc1d8676d9526"
],
"index": "pypi",
"version": "==2.2.2"
},
"flask-restful": {
"hashes": [
"sha256:4970c49b6488e46c520b325f54833374dc2b98e211f1b272bd4b0c516232afe2",
"sha256:ccec650b835d48192138c85329ae03735e6ced58e9b2d9c2146d6c84c06fa53e"
],
"index": "pypi",
"version": "==0.3.9"
},
"future": {
"hashes": [
"sha256:b1bead90b70cf6ec3f0710ae53a525360fa360d306a86583adc6bf83a4db537d"
],
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.18.2"
},
"httplib2": {
"hashes": [
"sha256:987c8bb3eb82d3fa60c68699510a692aa2ad9c4bd4f123e51dfb1488c14cdd01",
"sha256:fc144f091c7286b82bec71bdbd9b27323ba709cc612568d3000893bfd9cb4b34"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.21.0"
},
"itsdangerous": {
"hashes": [
"sha256:2c2349112351b88699d8d4b6b075022c0808887cb7ad10069318a8b0bc88db44",
"sha256:5dbbc68b317e5e42f327f9021763545dc3fc3bfe22e6deb96aaf1fc38874156a"
],
"markers": "python_version >= '3.7'",
"version": "==2.1.2"
},
"jinja2": {
"hashes": [
"sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852",
"sha256:6088930bfe239f0e6710546ab9c19c9ef35e29792895fed6e6e31a023a182a61"
],
"markers": "python_version >= '3.7'",
"version": "==3.1.2"
},
"kombu": {
"hashes": [
"sha256:37cee3ee725f94ea8bb173eaab7c1760203ea53bbebae226328600f9d2799610",
"sha256:8b213b24293d3417bcf0d2f5537b7f756079e3ea232a8386dcc89a59fd2361a4"
],
"markers": "python_version >= '3.7'",
"version": "==5.2.4"
},
"markupsafe": {
"hashes": [
"sha256:0212a68688482dc52b2d45013df70d169f542b7394fc744c02a57374a4207003",
"sha256:089cf3dbf0cd6c100f02945abeb18484bd1ee57a079aefd52cffd17fba910b88",
"sha256:10c1bfff05d95783da83491be968e8fe789263689c02724e0c691933c52994f5",
"sha256:33b74d289bd2f5e527beadcaa3f401e0df0a89927c1559c8566c066fa4248ab7",
"sha256:3799351e2336dc91ea70b034983ee71cf2f9533cdff7c14c90ea126bfd95d65a",
"sha256:3ce11ee3f23f79dbd06fb3d63e2f6af7b12db1d46932fe7bd8afa259a5996603",
"sha256:421be9fbf0ffe9ffd7a378aafebbf6f4602d564d34be190fc19a193232fd12b1",
"sha256:43093fb83d8343aac0b1baa75516da6092f58f41200907ef92448ecab8825135",
"sha256:46d00d6cfecdde84d40e572d63735ef81423ad31184100411e6e3388d405e247",
"sha256:4a33dea2b688b3190ee12bd7cfa29d39c9ed176bda40bfa11099a3ce5d3a7ac6",
"sha256:4b9fe39a2ccc108a4accc2676e77da025ce383c108593d65cc909add5c3bd601",
"sha256:56442863ed2b06d19c37f94d999035e15ee982988920e12a5b4ba29b62ad1f77",
"sha256:671cd1187ed5e62818414afe79ed29da836dde67166a9fac6d435873c44fdd02",
"sha256:694deca8d702d5db21ec83983ce0bb4b26a578e71fbdbd4fdcd387daa90e4d5e",
"sha256:6a074d34ee7a5ce3effbc526b7083ec9731bb3cbf921bbe1d3005d4d2bdb3a63",
"sha256:6d0072fea50feec76a4c418096652f2c3238eaa014b2f94aeb1d56a66b41403f",
"sha256:6fbf47b5d3728c6aea2abb0589b5d30459e369baa772e0f37a0320185e87c980",
"sha256:7f91197cc9e48f989d12e4e6fbc46495c446636dfc81b9ccf50bb0ec74b91d4b",
"sha256:86b1f75c4e7c2ac2ccdaec2b9022845dbb81880ca318bb7a0a01fbf7813e3812",
"sha256:8dc1c72a69aa7e082593c4a203dcf94ddb74bb5c8a731e4e1eb68d031e8498ff",
"sha256:8e3dcf21f367459434c18e71b2a9532d96547aef8a871872a5bd69a715c15f96",
"sha256:8e576a51ad59e4bfaac456023a78f6b5e6e7651dcd383bcc3e18d06f9b55d6d1",
"sha256:96e37a3dc86e80bf81758c152fe66dbf60ed5eca3d26305edf01892257049925",
"sha256:97a68e6ada378df82bc9f16b800ab77cbf4b2fada0081794318520138c088e4a",
"sha256:99a2a507ed3ac881b975a2976d59f38c19386d128e7a9a18b7df6fff1fd4c1d6",
"sha256:a49907dd8420c5685cfa064a1335b6754b74541bbb3706c259c02ed65b644b3e",
"sha256:b09bf97215625a311f669476f44b8b318b075847b49316d3e28c08e41a7a573f",
"sha256:b7bd98b796e2b6553da7225aeb61f447f80a1ca64f41d83612e6139ca5213aa4",
"sha256:b87db4360013327109564f0e591bd2a3b318547bcef31b468a92ee504d07ae4f",
"sha256:bcb3ed405ed3222f9904899563d6fc492ff75cce56cba05e32eff40e6acbeaa3",
"sha256:d4306c36ca495956b6d568d276ac11fdd9c30a36f1b6eb928070dc5360b22e1c",
"sha256:d5ee4f386140395a2c818d149221149c54849dfcfcb9f1debfe07a8b8bd63f9a",
"sha256:dda30ba7e87fbbb7eab1ec9f58678558fd9a6b8b853530e176eabd064da81417",
"sha256:e04e26803c9c3851c931eac40c695602c6295b8d432cbe78609649ad9bd2da8a",
"sha256:e1c0b87e09fa55a220f058d1d49d3fb8df88fbfab58558f1198e08c1e1de842a",
"sha256:e72591e9ecd94d7feb70c1cbd7be7b3ebea3f548870aa91e2732960fa4d57a37",
"sha256:e8c843bbcda3a2f1e3c2ab25913c80a3c5376cd00c6e8c4a86a89a28c8dc5452",
"sha256:efc1913fd2ca4f334418481c7e595c00aad186563bbc1ec76067848c7ca0a933",
"sha256:f121a1420d4e173a5d96e47e9a0c0dcff965afdf1626d28de1460815f7c4ee7a",
"sha256:fc7b548b17d238737688817ab67deebb30e8073c95749d55538ed473130ec0c7"
],
"markers": "python_version >= '3.7'",
"version": "==2.1.1"
},
"marshmallow": {
"hashes": [
"sha256:35e02a3a06899c9119b785c12a22f4cda361745d66a71ab691fd7610202ae104",
"sha256:6804c16114f7fce1f5b4dadc31f4674af23317fcc7f075da21e35c1a35d781f7"
],
"index": "pypi",
"version": "==3.18.0"
},
"packaging": {
"hashes": [
"sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb",
"sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"
],
"markers": "python_version >= '3.6'",
"version": "==21.3"
},
"pika": {
"hashes": [
"sha256:89f5e606646caebe3c00cbdbc4c2c609834adde45d7507311807b5775edac8e0",
"sha256:beb19ff6dd1547f99a29acc2c6987ebb2ba7c44bf44a3f8e305877c5ef7d2fdc"
],
"index": "pypi",
"version": "==1.3.1"
},
"pubsubpy": {
"hashes": [
"sha256:58e394d14dd172fc03caff172adf3817d980bb6b8cb46cd18a362f8aa6e530c6",
"sha256:b29fa140615935ac03801ccd1de137ce4d33b741465b9002f290538ce966f2e9"
],
"index": "pypi",
"version": "==2.3.0"
},
"pyparsing": {
"hashes": [
"sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb",
"sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"
],
"markers": "python_full_version >= '3.6.8'",
"version": "==3.0.9"
},
"pyrabbit": {
"hashes": [
"sha256:50b8995fbfde14820ddc97292312c8f0c77054748c2b018138d03d94e400c39c"
],
"index": "pypi",
"version": "==1.1.0"
},
"pytz": {
"hashes": [
"sha256:222439474e9c98fced559f1709d89e6c9cbf8d79c794ff3eb9f8800064291427",
"sha256:e89512406b793ca39f5971bc999cc538ce125c0e51c27941bef4568b460095e2"
],
"version": "==2022.6"
},
"six": {
"hashes": [
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926",
"sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.16.0"
},
"vine": {
"hashes": [
"sha256:4c9dceab6f76ed92105027c49c823800dd33cacce13bdedc5b914e3514b7fb30",
"sha256:7d3b1624a953da82ef63462013bbd271d3eb75751489f9807598e8f340bd637e"
],
"markers": "python_version >= '3.6'",
"version": "==5.0.0"
},
"werkzeug": {
"hashes": [
"sha256:7ea2d48322cc7c0f8b3a215ed73eabd7b5d75d0b50e31ab006286ccff9e00b8f",
"sha256:f979ab81f58d7318e064e99c4506445d60135ac5cd2e177a2de0089bfd4c9bd5"
],
"markers": "python_version >= '3.7'",
"version": "==2.2.2"
}
},
"develop": {}
}
# Communications Backbone
Communications Backbone by C2 Team (NOC)
\ No newline at end of file
Communications Backbone by C2 Team (NOC)
## DJ prototype
I did this in freedom-fire-day so it doesn't currently follow the flask template.
It's also a bit weird because it writes to a local ignored `clients.json` file
instead of using a database. I did this as a placeholder because we've not yet
decided what the infrastructure looks like.
### Data flow
- Client A sends to `client-a-outbox` (or POSTs to API /send - not yet implemented)
- Messages are forwarded from `client-a-outbox` to `soar-publish`
- Messages are published from `soar-publish` with the given topic read from the message
- Subscribers listen to for messages
- Subscription is delivered to `client-b-inbox`
- Client B reads from `client-b-inbox (or GETs from API /receive)
There is a parallel flow when a client sends to `client-a-notify` in which case the
messages are delivered through the broadcast exchange to all clients `client-x-inbox`.
### Setup
```
pipenv install
pipenv shell
```
### Running
#### RabbitMQ
`docker run --rm -p 5672:5672 -d --hostname rmq --name rmq rabbitmq:management`
#### API
In a `pipenv shell`
```
python api.py
```
#### Create some clients
`POST` to `http://localhost:3000/clients`
#### Event bus
In a `pipenv shell`
```
python soar_bus.py
```
#### Send / Receive directly
In a `pipenv shell`
```
# Send a message
python client_send.py noc-c2-outbox 'soar.noc.slocum.something' from noc-c2
```
```
# Receive messages
python client_read.py noc-sfmc-inbox
```
#### Receive via API
As a placeholder for proper authentication you post the `client_id` and
`secret` and it checks that a client with that id exists and that the
secret matches before allowing the request.
This should be replaced with a proper auth layer.
`GET http://localhost:5000/receive?client_id=[client_id]&secret=[secret]`
### Components
- `soar_bus.py` - Run all the components threaded based on existing clients
- `soar_forward.py` - Listen for messages on queue A and forward messages to queue B
- `soar_publish.py` - Listen for messages on queue A and publish on exchange B
- `soar_broadcast.py` - Listen for messages on queue A and broadcast on exchange B
- `soar_subscribe.py` - Create subscriptions to both the publish and broadcast exchange - deliver to queue A
(I think this should probably be 2 separate functions to keep things nice and simple)
- `soar_push.py` - Not yet implemented - Listen for messages on queue A and POST to the client's webhook URL
from flask import Flask
from flask_restful import Api
from endpoints.hello import HelloWorld
from endpoints.clients import Client, ClientList
from endpoints.receive import Receive
app = Flask(__name__)
api = Api(app)
api.add_resource(HelloWorld, "/")
api.add_resource(ClientList, "/client")
api.add_resource(Client, "/client/<client_id>")
api.add_resource(Receive, "/receive")
if __name__ == "__main__":
app.run(debug=True)
#!/usr/bin/env python
import pika, sys, os, json
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
queue_name = sys.argv[1]
channel.queue_declare(queue=queue_name, durable=True)
def callback(ch, method, properties, body):
message = json.loads(body.decode())
print(" [x] Received %r" % message)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)
#!/usr/bin/env python
import pika
import sys
import json
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
queue_name = sys.argv[1]
topic = sys.argv[2]
message = " ".join(sys.argv[3:]) or "Hello World!"
body = json.dumps({"topic": topic, "message": message})
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(exchange="", routing_key=queue_name, body=body)
connection.close()
from flask_restful import Resource, reqparse, abort, fields, marshal_with
import json
import os
import random
import string
class ClientsFile:
file = "clients.json"
mtime = 0
clients = {}
parser = None
def __init__(self):
self.get()
self.setup_request_parser()
def get(self):
try:
mtime = os.path.getmtime(self.file)
if mtime > self.mtime:
with open(self.file, "r") as client_file:
self.clients = json.load(client_file)
except FileNotFoundError as error:
self.clients = {}
self.save()
return self.clients
def find(self, client_id):
self.get()
if client_id in self.clients:
client = self.clients[client_id]
else:
client = None
return client
def add(self, client):
client.secret = self.secret()
self.clients[client["client_id"]] = client
self.save()
return client
def remove(self, client):
del self.clients[client["client_id"]]
self.save()
def update(self, client_updates):
client = self.find(client_updates["client_id"])
client.update(client_updates)
self.clients[client["client_id"]] = client
self.save()
return client
def save(self):
try:
with open(self.file, "w") as client_file:
client_file.write(json.dumps(self.clients, indent=2))
return True
except OSError as error:
print(str(error))
return False
def secret(self, chars=36):
res = "".join(
random.choices(
string.ascii_lowercase + string.ascii_uppercase + string.digits, k=chars
)
)
return str(res)
def setup_request_parser(self):
parser = reqparse.RequestParser()
parser.add_argument(
"client_id", type=str, help="A unique name to identify the client"
)
parser.add_argument(
"client_name", type=str, help="A human friendly name to identify the client"
)
parser.add_argument(
"subscription",
type=str,
help="A dot delimited string identify topics to subscribe to",
)
self.parser = parser
def parse(self):
return self.parser.parse_args()
resource_fields = {
"client_id": fields.String,
"client_name": fields.String,
"subscription": fields.String,
}
clients_file = ClientsFile()
# Client
class Client(Resource):
@marshal_with(resource_fields)
def get(self, client_id):
client = clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
return client
def delete(self, todo_id):
client = clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
clients_file.remove(client)
return client, 204
def put(self, client_id):
args = clients_file.parse()
client = clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
client = clients_file.update(args)
return client, 201
# ClientList
class ClientList(Resource):
def get(self):
return {
client_id: (client, client.pop("secret", None))[0]
for client_id, client in clients_file.get().items()
}
def post(self):
args = clients_file.parse()
client = clients_file.find(args["client_id"])
if client:
abort(403, message="Duplicate client id: {}".format(client_id))
else:
client = clients_file.add(args)
return client, 201
from flask_restful import Resource
class HelloWorld(Resource):
def get(self):
return {"hello": "world"}
from flask_restful import Resource, reqparse, abort, fields, marshal_with
import json
class Notify(Resource):
def setup_request_parser(self):
parser = reqparse.RequestParser()
parser.add_argument(
"client_id", type=str, help="A unique name to identify the client"
)
parser.add_argument(
"secret", type=str, help="A human friendly name to identify the client"
)
parser.add_argument("content", type=str, help="The message")
self.parser = parser
def parse(self):
return self.parser.parse_args()
def put(self, client_id):
args = clients_file.parse()
client = clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
client = clients_file.update(args)
return client, 201
from flask_restful import Resource, request, abort, fields
from marshmallow import Schema, fields
import pika
import json
class ReceiveSchema(Schema):
client_id = fields.Str(required=True)
secret = fields.Str(required=True)
max_messages = fields.Int(required=False)
class Receive(Resource):
clients = None
schema = None
def __init__(self):
self.schema = ReceiveSchema()
with open("clients.json", "r") as clients_file:
self.clients = json.load(clients_file)
def get(self):
errors = self.schema.validate(request.args)
if errors:
abort(400, str(errors))
messages = []
allow = False
max_messages = request.args.get("max_messages", 10)
client_id = request.args.get("client_id")
inbox_queue = client_id + "-inbox"
if client_id in self.clients:
client = self.clients.get(client_id)
if request.args.get("secret") == client.get("secret"):
allow = True
if allow:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
channel.queue_declare(queue=inbox_queue, durable=True)
while len(messages) < max_messages:
method_frame, header_frame, body = channel.basic_get(inbox_queue)
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
messages.append(json.loads(body.decode()))
else:
print("No message returned")
break
channel.close()
connection.close()
else:
abort(403, message="Invalid client credentials")
return messages
from flask_restful import Resource, reqparse, abort, fields, marshal_with
import json
class Send(Resource):
def setup_request_parser(self):
parser = reqparse.RequestParser()
parser.add_argument(
"client_id", type=str, help="A unique name to identify the client"
)
parser.add_argument(
"secret", type=str, help="A human friendly name to identify the client"
)
parser.add_argument("topic", type=str, help="Publisher topic")
parser.add_argument("content", type=str, help="The message")
self.parser = parser
def parse(self):
return self.parser.parse_args()
def put(self, client_id):
args = clients_file.parse()
client = clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
client = clients_file.update(args)
return client, 201
[*] Waiting for logs. To exit press CTRL+C
[x] 'error':b'Run. Run. Or it will explode.'
[*] Waiting for logs. To exit press CTRL+C
[x] 'warning':b'Run. Run. Or it will explode.'
#!/usr/bin/env python
import pika
def get_connection():
return pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
def deliver(body, broadcast_exchange):
print("broadcast")
deliver_connection = get_connection()
deliver_channel = deliver_connection.channel()
deliver_channel.exchange_declare(
exchange=broadcast_exchange, exchange_type="fanout"
)
deliver_channel.basic_publish(
exchange=broadcast_exchange, routing_key="", body=body
)
deliver_connection.close()
def listen(queue_name, broadcast_exchange):
def bcast_callback(ch, method, properties, body):
delivered = deliver(body, broadcast_exchange)
ch.basic_ack(delivery_tag=method.delivery_tag)
listen_connection = get_connection()
listen_channel = listen_connection.channel()
listen_channel.queue_declare(queue=queue_name, durable=True)
listen_channel.basic_consume(queue=queue_name, on_message_callback=bcast_callback)
listen_channel.start_consuming()
def broadcast(queue_name, broadcast_exchange="soar_broadcast"):
listen(queue_name, broadcast_exchange)
#!/usr/bin/env python
# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client-id]-broadcast - send message to all subscribers? - eg notify of downtime
# soar-publisher - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions
import concurrent.futures
from endpoints.clients import ClientsFile
from soar_broadcast import broadcast
from soar_forward import forward
from soar_publish import publish
from soar_subscribe import subscribe
THREADS = []
EXCHANGES = {
"publish": "soar_publish",
"broadcast": "soar_broadcast",
}
def main():
clients_file = ClientsFile()
clients = clients_file.get()
with concurrent.futures.ProcessPoolExecutor() as executor:
# publish
thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
THREADS.append(thread)
for (id, client) in clients.items():
# forward
thread = executor.submit(forward, f"{id}-outbox", "soar-publish")
THREADS.append(thread)
# broadcast
thread = executor.submit(
broadcast, f"{id}-broadcast", EXCHANGES.get("broadcast")
)
THREADS.append(thread)
# subscribe
thread = executor.submit(
subscribe,
f"{id}-inbox",
client["subscription"],
EXCHANGES.get("publish"),
EXCHANGES.get("broadcast"),
)
THREADS.append(thread)
# push
# TODO - add optional webhook target to client and post to webhook target
# if present
if __name__ == "__main__":
main()
#!/usr/bin/env python
import pika
def get_connection():
return pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
def deliver(body, queue_name):
print("forward to: %s" % queue_name)
deliver_connection = get_connection()
deliver_channel = deliver_connection.channel()
deliver_channel.queue_declare(queue=queue_name, durable=True)
deliver_channel.basic_publish(exchange="", routing_key=queue_name, body=body)
deliver_connection.close()
def listen(from_queue_name, to_queue_name):
def fwd_callback(ch, method, properties, body):
delivered = deliver(body, to_queue_name)
ch.basic_ack(delivery_tag=method.delivery_tag)
listen_connection = get_connection()
listen_channel = listen_connection.channel()
listen_channel.queue_declare(queue=from_queue_name, durable=True)
listen_channel.basic_consume(
queue=from_queue_name, on_message_callback=fwd_callback
)
listen_channel.start_consuming()
def forward(from_queue_name, to_queue_name):
listen(from_queue_name, to_queue_name)
#!/usr/bin/env python
import pika
import json
import sys
def get_connection():
return pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
def deliver(body, topic, publish_exchange):
print("publish on topic: %s" % topic)
deliver_connection = get_connection()
deliver_channel = deliver_connection.channel()
deliver_channel.exchange_declare(exchange=publish_exchange, exchange_type="topic")
deliver_channel.basic_publish(
exchange=publish_exchange, routing_key=topic, body=body
)
deliver_connection.close()
def listen(queue_name, publish_exchange):
def pub_callback(ch, method, properties, body):
message = json.loads(body.decode())
topic = message["topic"]
deliver(body, topic, publish_exchange)
ch.basic_ack(delivery_tag=method.delivery_tag)
listen_connection = get_connection()
listen_channel = listen_connection.channel()
listen_channel.queue_declare(queue=queue_name, durable=True)
listen_channel.basic_consume(queue=queue_name, on_message_callback=pub_callback)
listen_channel.start_consuming()
def publish(queue_name, publish_exchange="soar_publish"):
listen(queue_name, publish_exchange)
if __name__ == "__main__":
queue_name = sys.argv[1]
publish(queue_name)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment