diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..9638c160add921638ae8597f53aba214c4ab83e3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +clients.json +examples/ \ No newline at end of file diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000000000000000000000000000000000000..c3380be7ec881ca1b942f39363ad73d4f93a3a78 --- /dev/null +++ b/Pipfile @@ -0,0 +1,17 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +pubsubpy = "*" +pika = "*" +pyrabbit = "*" +flask = "*" +flask-restful = "*" +marshmallow = "*" + +[dev-packages] + +[requires] +python_version = "3.8" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000000000000000000000000000000000000..3cd34d20a17d9f6dc7710a03cfa563ac973834bb --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,223 @@ +{ + "_meta": { + "hash": { + "sha256": "b71ec6300f7c04a8d222991089770e4384941bf5553cfa12031330227ad043cb" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.8" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "amqp": { + "hashes": [ + "sha256:2c1b13fecc0893e946c65cbd5f36427861cffa4ea2201d8f6fca22e2a373b5e2", + "sha256:6f0956d2c23d8fa6e7691934d8c3930eadb44972cbbd1a7ae3a520f735d43359" + ], + "markers": "python_version >= '3.6'", + "version": "==5.1.1" + }, + "aniso8601": { + "hashes": [ + "sha256:1d2b7ef82963909e93c4f24ce48d4de9e66009a21bf1c1e1c85bdd0812fe412f", + "sha256:72e3117667eedf66951bb2d93f4296a56b94b078a8a95905a052611fb3f1b973" + ], + "version": "==9.0.1" + }, + "click": { + "hashes": [ + "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e", + "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48" + ], + "markers": "python_version >= '3.7'", + "version": "==8.1.3" + }, + "flask": { + "hashes": [ + "sha256:642c450d19c4ad482f96729bd2a8f6d32554aa1e231f4f6b4e7e5264b16cca2b", + "sha256:b9c46cc36662a7949f34b52d8ec7bb59c0d74ba08ba6cb9ce9adc1d8676d9526" + ], + "index": "pypi", + "version": "==2.2.2" + }, + "flask-restful": { + "hashes": [ + "sha256:4970c49b6488e46c520b325f54833374dc2b98e211f1b272bd4b0c516232afe2", + "sha256:ccec650b835d48192138c85329ae03735e6ced58e9b2d9c2146d6c84c06fa53e" + ], + "index": "pypi", + "version": "==0.3.9" + }, + "future": { + "hashes": [ + "sha256:b1bead90b70cf6ec3f0710ae53a525360fa360d306a86583adc6bf83a4db537d" + ], + "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==0.18.2" + }, + "httplib2": { + "hashes": [ + "sha256:987c8bb3eb82d3fa60c68699510a692aa2ad9c4bd4f123e51dfb1488c14cdd01", + "sha256:fc144f091c7286b82bec71bdbd9b27323ba709cc612568d3000893bfd9cb4b34" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==0.21.0" + }, + "itsdangerous": { + "hashes": [ + "sha256:2c2349112351b88699d8d4b6b075022c0808887cb7ad10069318a8b0bc88db44", + "sha256:5dbbc68b317e5e42f327f9021763545dc3fc3bfe22e6deb96aaf1fc38874156a" + ], + "markers": "python_version >= '3.7'", + "version": "==2.1.2" + }, + "jinja2": { + "hashes": [ + "sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852", + "sha256:6088930bfe239f0e6710546ab9c19c9ef35e29792895fed6e6e31a023a182a61" + ], + "markers": "python_version >= '3.7'", + "version": "==3.1.2" + }, + "kombu": { + "hashes": [ + "sha256:37cee3ee725f94ea8bb173eaab7c1760203ea53bbebae226328600f9d2799610", + "sha256:8b213b24293d3417bcf0d2f5537b7f756079e3ea232a8386dcc89a59fd2361a4" + ], + "markers": "python_version >= '3.7'", + "version": "==5.2.4" + }, + "markupsafe": { + "hashes": [ + "sha256:0212a68688482dc52b2d45013df70d169f542b7394fc744c02a57374a4207003", + "sha256:089cf3dbf0cd6c100f02945abeb18484bd1ee57a079aefd52cffd17fba910b88", + "sha256:10c1bfff05d95783da83491be968e8fe789263689c02724e0c691933c52994f5", + "sha256:33b74d289bd2f5e527beadcaa3f401e0df0a89927c1559c8566c066fa4248ab7", + "sha256:3799351e2336dc91ea70b034983ee71cf2f9533cdff7c14c90ea126bfd95d65a", + "sha256:3ce11ee3f23f79dbd06fb3d63e2f6af7b12db1d46932fe7bd8afa259a5996603", + "sha256:421be9fbf0ffe9ffd7a378aafebbf6f4602d564d34be190fc19a193232fd12b1", + "sha256:43093fb83d8343aac0b1baa75516da6092f58f41200907ef92448ecab8825135", + "sha256:46d00d6cfecdde84d40e572d63735ef81423ad31184100411e6e3388d405e247", + "sha256:4a33dea2b688b3190ee12bd7cfa29d39c9ed176bda40bfa11099a3ce5d3a7ac6", + "sha256:4b9fe39a2ccc108a4accc2676e77da025ce383c108593d65cc909add5c3bd601", + "sha256:56442863ed2b06d19c37f94d999035e15ee982988920e12a5b4ba29b62ad1f77", + "sha256:671cd1187ed5e62818414afe79ed29da836dde67166a9fac6d435873c44fdd02", + "sha256:694deca8d702d5db21ec83983ce0bb4b26a578e71fbdbd4fdcd387daa90e4d5e", + "sha256:6a074d34ee7a5ce3effbc526b7083ec9731bb3cbf921bbe1d3005d4d2bdb3a63", + "sha256:6d0072fea50feec76a4c418096652f2c3238eaa014b2f94aeb1d56a66b41403f", + "sha256:6fbf47b5d3728c6aea2abb0589b5d30459e369baa772e0f37a0320185e87c980", + "sha256:7f91197cc9e48f989d12e4e6fbc46495c446636dfc81b9ccf50bb0ec74b91d4b", + "sha256:86b1f75c4e7c2ac2ccdaec2b9022845dbb81880ca318bb7a0a01fbf7813e3812", + "sha256:8dc1c72a69aa7e082593c4a203dcf94ddb74bb5c8a731e4e1eb68d031e8498ff", + "sha256:8e3dcf21f367459434c18e71b2a9532d96547aef8a871872a5bd69a715c15f96", + "sha256:8e576a51ad59e4bfaac456023a78f6b5e6e7651dcd383bcc3e18d06f9b55d6d1", + "sha256:96e37a3dc86e80bf81758c152fe66dbf60ed5eca3d26305edf01892257049925", + "sha256:97a68e6ada378df82bc9f16b800ab77cbf4b2fada0081794318520138c088e4a", + "sha256:99a2a507ed3ac881b975a2976d59f38c19386d128e7a9a18b7df6fff1fd4c1d6", + "sha256:a49907dd8420c5685cfa064a1335b6754b74541bbb3706c259c02ed65b644b3e", + "sha256:b09bf97215625a311f669476f44b8b318b075847b49316d3e28c08e41a7a573f", + "sha256:b7bd98b796e2b6553da7225aeb61f447f80a1ca64f41d83612e6139ca5213aa4", + "sha256:b87db4360013327109564f0e591bd2a3b318547bcef31b468a92ee504d07ae4f", + "sha256:bcb3ed405ed3222f9904899563d6fc492ff75cce56cba05e32eff40e6acbeaa3", + "sha256:d4306c36ca495956b6d568d276ac11fdd9c30a36f1b6eb928070dc5360b22e1c", + "sha256:d5ee4f386140395a2c818d149221149c54849dfcfcb9f1debfe07a8b8bd63f9a", + "sha256:dda30ba7e87fbbb7eab1ec9f58678558fd9a6b8b853530e176eabd064da81417", + "sha256:e04e26803c9c3851c931eac40c695602c6295b8d432cbe78609649ad9bd2da8a", + "sha256:e1c0b87e09fa55a220f058d1d49d3fb8df88fbfab58558f1198e08c1e1de842a", + "sha256:e72591e9ecd94d7feb70c1cbd7be7b3ebea3f548870aa91e2732960fa4d57a37", + "sha256:e8c843bbcda3a2f1e3c2ab25913c80a3c5376cd00c6e8c4a86a89a28c8dc5452", + "sha256:efc1913fd2ca4f334418481c7e595c00aad186563bbc1ec76067848c7ca0a933", + "sha256:f121a1420d4e173a5d96e47e9a0c0dcff965afdf1626d28de1460815f7c4ee7a", + "sha256:fc7b548b17d238737688817ab67deebb30e8073c95749d55538ed473130ec0c7" + ], + "markers": "python_version >= '3.7'", + "version": "==2.1.1" + }, + "marshmallow": { + "hashes": [ + "sha256:35e02a3a06899c9119b785c12a22f4cda361745d66a71ab691fd7610202ae104", + "sha256:6804c16114f7fce1f5b4dadc31f4674af23317fcc7f075da21e35c1a35d781f7" + ], + "index": "pypi", + "version": "==3.18.0" + }, + "packaging": { + "hashes": [ + "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb", + "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522" + ], + "markers": "python_version >= '3.6'", + "version": "==21.3" + }, + "pika": { + "hashes": [ + "sha256:89f5e606646caebe3c00cbdbc4c2c609834adde45d7507311807b5775edac8e0", + "sha256:beb19ff6dd1547f99a29acc2c6987ebb2ba7c44bf44a3f8e305877c5ef7d2fdc" + ], + "index": "pypi", + "version": "==1.3.1" + }, + "pubsubpy": { + "hashes": [ + "sha256:58e394d14dd172fc03caff172adf3817d980bb6b8cb46cd18a362f8aa6e530c6", + "sha256:b29fa140615935ac03801ccd1de137ce4d33b741465b9002f290538ce966f2e9" + ], + "index": "pypi", + "version": "==2.3.0" + }, + "pyparsing": { + "hashes": [ + "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb", + "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc" + ], + "markers": "python_full_version >= '3.6.8'", + "version": "==3.0.9" + }, + "pyrabbit": { + "hashes": [ + "sha256:50b8995fbfde14820ddc97292312c8f0c77054748c2b018138d03d94e400c39c" + ], + "index": "pypi", + "version": "==1.1.0" + }, + "pytz": { + "hashes": [ + "sha256:222439474e9c98fced559f1709d89e6c9cbf8d79c794ff3eb9f8800064291427", + "sha256:e89512406b793ca39f5971bc999cc538ce125c0e51c27941bef4568b460095e2" + ], + "version": "==2022.6" + }, + "six": { + "hashes": [ + "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", + "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==1.16.0" + }, + "vine": { + "hashes": [ + "sha256:4c9dceab6f76ed92105027c49c823800dd33cacce13bdedc5b914e3514b7fb30", + "sha256:7d3b1624a953da82ef63462013bbd271d3eb75751489f9807598e8f340bd637e" + ], + "markers": "python_version >= '3.6'", + "version": "==5.0.0" + }, + "werkzeug": { + "hashes": [ + "sha256:7ea2d48322cc7c0f8b3a215ed73eabd7b5d75d0b50e31ab006286ccff9e00b8f", + "sha256:f979ab81f58d7318e064e99c4506445d60135ac5cd2e177a2de0089bfd4c9bd5" + ], + "markers": "python_version >= '3.7'", + "version": "==2.2.2" + } + }, + "develop": {} +} diff --git a/README.md b/README.md index f4374047eb3d69ef26107ec97983ba0790f1bf06..e64b191bcb1dfdcca6bdd4465d55d8a1a4263045 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,91 @@ # Communications Backbone -Communications Backbone by C2 Team (NOC) \ No newline at end of file +Communications Backbone by C2 Team (NOC) + +## DJ prototype + +I did this in freedom-fire-day so it doesn't currently follow the flask template. + +It's also a bit weird because it writes to a local ignored `clients.json` file +instead of using a database. I did this as a placeholder because we've not yet +decided what the infrastructure looks like. + +### Data flow + +- Client A sends to `client-a-outbox` (or POSTs to API /send - not yet implemented) +- Messages are forwarded from `client-a-outbox` to `soar-publish` +- Messages are published from `soar-publish` with the given topic read from the message +- Subscribers listen to for messages +- Subscription is delivered to `client-b-inbox` +- Client B reads from `client-b-inbox (or GETs from API /receive) + +There is a parallel flow when a client sends to `client-a-notify` in which case the +messages are delivered through the broadcast exchange to all clients `client-x-inbox`. + +### Setup + +``` +pipenv install +pipenv shell +``` + +### Running + +#### RabbitMQ + +`docker run --rm -p 5672:5672 -d --hostname rmq --name rmq rabbitmq:management` + +#### API + +In a `pipenv shell` + +``` +python api.py +``` + +#### Create some clients + +`POST` to `http://localhost:3000/clients` + +#### Event bus + +In a `pipenv shell` + +``` +python soar_bus.py +``` + +#### Send / Receive directly + +In a `pipenv shell` + +``` +# Send a message +python client_send.py noc-c2-outbox 'soar.noc.slocum.something' from noc-c2 +``` + +``` +# Receive messages +python client_read.py noc-sfmc-inbox +``` + +#### Receive via API + +As a placeholder for proper authentication you post the `client_id` and +`secret` and it checks that a client with that id exists and that the +secret matches before allowing the request. + +This should be replaced with a proper auth layer. + +`GET http://localhost:5000/receive?client_id=[client_id]&secret=[secret]` + +### Components + +- `soar_bus.py` - Run all the components threaded based on existing clients +- `soar_forward.py` - Listen for messages on queue A and forward messages to queue B +- `soar_publish.py` - Listen for messages on queue A and publish on exchange B +- `soar_broadcast.py` - Listen for messages on queue A and broadcast on exchange B +- `soar_subscribe.py` - Create subscriptions to both the publish and broadcast exchange - deliver to queue A + (I think this should probably be 2 separate functions to keep things nice and simple) +- `soar_push.py` - Not yet implemented - Listen for messages on queue A and POST to the client's webhook URL + diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/api.py b/api.py new file mode 100644 index 0000000000000000000000000000000000000000..41e02f074cdc3fb9e7ae850a3c306c00770dc308 --- /dev/null +++ b/api.py @@ -0,0 +1,16 @@ +from flask import Flask +from flask_restful import Api +from endpoints.hello import HelloWorld +from endpoints.clients import Client, ClientList +from endpoints.receive import Receive + +app = Flask(__name__) +api = Api(app) + +api.add_resource(HelloWorld, "/") +api.add_resource(ClientList, "/client") +api.add_resource(Client, "/client/<client_id>") +api.add_resource(Receive, "/receive") + +if __name__ == "__main__": + app.run(debug=True) diff --git a/client_read.py b/client_read.py new file mode 100644 index 0000000000000000000000000000000000000000..fe860600bd0efc63cbf8eba2b87547e1333784a6 --- /dev/null +++ b/client_read.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +import pika, sys, os, json + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) + channel = connection.channel() + + queue_name = sys.argv[1] + + channel.queue_declare(queue=queue_name, durable=True) + + def callback(ch, method, properties, body): + message = json.loads(body.decode()) + print(" [x] Received %r" % message) + + channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) + + print(" [*] Waiting for messages. To exit press CTRL+C") + channel.start_consuming() + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Interrupted") + try: + sys.exit(0) + except SystemExit: + os._exit(0) diff --git a/client_send.py b/client_send.py new file mode 100644 index 0000000000000000000000000000000000000000..42197be815668bbf2a21fe3d679e6accf8bfb521 --- /dev/null +++ b/client_send.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +import pika +import sys +import json + + +connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) +channel = connection.channel() + +queue_name = sys.argv[1] +topic = sys.argv[2] +message = " ".join(sys.argv[3:]) or "Hello World!" +body = json.dumps({"topic": topic, "message": message}) +channel.queue_declare(queue=queue_name, durable=True) +channel.basic_publish(exchange="", routing_key=queue_name, body=body) +connection.close() diff --git a/endpoints/__init__.py b/endpoints/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/endpoints/clients.py b/endpoints/clients.py new file mode 100644 index 0000000000000000000000000000000000000000..f1a441938fc72a964a22195fa33fa222f054c0e2 --- /dev/null +++ b/endpoints/clients.py @@ -0,0 +1,141 @@ +from flask_restful import Resource, reqparse, abort, fields, marshal_with +import json +import os +import random +import string + + +class ClientsFile: + file = "clients.json" + mtime = 0 + clients = {} + parser = None + + def __init__(self): + self.get() + self.setup_request_parser() + + def get(self): + try: + mtime = os.path.getmtime(self.file) + if mtime > self.mtime: + with open(self.file, "r") as client_file: + self.clients = json.load(client_file) + except FileNotFoundError as error: + self.clients = {} + self.save() + + return self.clients + + def find(self, client_id): + self.get() + if client_id in self.clients: + client = self.clients[client_id] + else: + client = None + return client + + def add(self, client): + client.secret = self.secret() + self.clients[client["client_id"]] = client + self.save() + return client + + def remove(self, client): + del self.clients[client["client_id"]] + self.save() + + def update(self, client_updates): + client = self.find(client_updates["client_id"]) + client.update(client_updates) + self.clients[client["client_id"]] = client + self.save() + return client + + def save(self): + try: + with open(self.file, "w") as client_file: + client_file.write(json.dumps(self.clients, indent=2)) + return True + except OSError as error: + print(str(error)) + return False + + def secret(self, chars=36): + res = "".join( + random.choices( + string.ascii_lowercase + string.ascii_uppercase + string.digits, k=chars + ) + ) + return str(res) + + def setup_request_parser(self): + parser = reqparse.RequestParser() + parser.add_argument( + "client_id", type=str, help="A unique name to identify the client" + ) + parser.add_argument( + "client_name", type=str, help="A human friendly name to identify the client" + ) + parser.add_argument( + "subscription", + type=str, + help="A dot delimited string identify topics to subscribe to", + ) + self.parser = parser + + def parse(self): + return self.parser.parse_args() + + +resource_fields = { + "client_id": fields.String, + "client_name": fields.String, + "subscription": fields.String, +} + +clients_file = ClientsFile() + +# Client +class Client(Resource): + @marshal_with(resource_fields) + def get(self, client_id): + client = clients_file.find(client_id) + if not client: + abort(404, message="No client with id: {}".format(client_id)) + return client + + def delete(self, todo_id): + client = clients_file.find(client_id) + if not client: + abort(404, message="No client with id: {}".format(client_id)) + else: + clients_file.remove(client) + return client, 204 + + def put(self, client_id): + args = clients_file.parse() + client = clients_file.find(client_id) + if not client: + abort(404, message="No client with id: {}".format(client_id)) + else: + client = clients_file.update(args) + return client, 201 + + +# ClientList +class ClientList(Resource): + def get(self): + return { + client_id: (client, client.pop("secret", None))[0] + for client_id, client in clients_file.get().items() + } + + def post(self): + args = clients_file.parse() + client = clients_file.find(args["client_id"]) + if client: + abort(403, message="Duplicate client id: {}".format(client_id)) + else: + client = clients_file.add(args) + return client, 201 diff --git a/endpoints/hello.py b/endpoints/hello.py new file mode 100644 index 0000000000000000000000000000000000000000..7bb0bab0de35c7cebf2d22fac1f1213b16ef90df --- /dev/null +++ b/endpoints/hello.py @@ -0,0 +1,6 @@ +from flask_restful import Resource + + +class HelloWorld(Resource): + def get(self): + return {"hello": "world"} diff --git a/endpoints/notify.py b/endpoints/notify.py new file mode 100644 index 0000000000000000000000000000000000000000..c5f420630ef357d4afa3c276bbc50bd06b8d3e3b --- /dev/null +++ b/endpoints/notify.py @@ -0,0 +1,28 @@ +from flask_restful import Resource, reqparse, abort, fields, marshal_with +import json + + +class Notify(Resource): + def setup_request_parser(self): + parser = reqparse.RequestParser() + parser.add_argument( + "client_id", type=str, help="A unique name to identify the client" + ) + parser.add_argument( + "secret", type=str, help="A human friendly name to identify the client" + ) + parser.add_argument("content", type=str, help="The message") + self.parser = parser + + def parse(self): + return self.parser.parse_args() + + def put(self, client_id): + args = clients_file.parse() + + client = clients_file.find(client_id) + if not client: + abort(404, message="No client with id: {}".format(client_id)) + else: + client = clients_file.update(args) + return client, 201 diff --git a/endpoints/receive.py b/endpoints/receive.py new file mode 100644 index 0000000000000000000000000000000000000000..75e027606c3817b22d8526722feeac08cb734daf --- /dev/null +++ b/endpoints/receive.py @@ -0,0 +1,57 @@ +from flask_restful import Resource, request, abort, fields +from marshmallow import Schema, fields +import pika +import json + + +class ReceiveSchema(Schema): + client_id = fields.Str(required=True) + secret = fields.Str(required=True) + max_messages = fields.Int(required=False) + + +class Receive(Resource): + clients = None + schema = None + + def __init__(self): + self.schema = ReceiveSchema() + with open("clients.json", "r") as clients_file: + self.clients = json.load(clients_file) + + def get(self): + errors = self.schema.validate(request.args) + if errors: + abort(400, str(errors)) + + messages = [] + allow = False + max_messages = request.args.get("max_messages", 10) + client_id = request.args.get("client_id") + inbox_queue = client_id + "-inbox" + if client_id in self.clients: + client = self.clients.get(client_id) + if request.args.get("secret") == client.get("secret"): + allow = True + + if allow: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host="localhost") + ) + channel = connection.channel() + channel.queue_declare(queue=inbox_queue, durable=True) + while len(messages) < max_messages: + method_frame, header_frame, body = channel.basic_get(inbox_queue) + if method_frame: + print(method_frame, header_frame, body) + channel.basic_ack(method_frame.delivery_tag) + messages.append(json.loads(body.decode())) + else: + print("No message returned") + break + channel.close() + connection.close() + + else: + abort(403, message="Invalid client credentials") + return messages diff --git a/endpoints/send.py b/endpoints/send.py new file mode 100644 index 0000000000000000000000000000000000000000..70353c9f4fc8a193afb504907e83a1f4a45e2be5 --- /dev/null +++ b/endpoints/send.py @@ -0,0 +1,29 @@ +from flask_restful import Resource, reqparse, abort, fields, marshal_with +import json + + +class Send(Resource): + def setup_request_parser(self): + parser = reqparse.RequestParser() + parser.add_argument( + "client_id", type=str, help="A unique name to identify the client" + ) + parser.add_argument( + "secret", type=str, help="A human friendly name to identify the client" + ) + parser.add_argument("topic", type=str, help="Publisher topic") + parser.add_argument("content", type=str, help="The message") + self.parser = parser + + def parse(self): + return self.parser.parse_args() + + def put(self, client_id): + args = clients_file.parse() + + client = clients_file.find(client_id) + if not client: + abort(404, message="No client with id: {}".format(client_id)) + else: + client = clients_file.update(args) + return client, 201 diff --git a/rmq.log b/rmq.log new file mode 100644 index 0000000000000000000000000000000000000000..d0a37c1920be4156c790a727d91bf5f2f27f7051 --- /dev/null +++ b/rmq.log @@ -0,0 +1,4 @@ + [*] Waiting for logs. To exit press CTRL+C + [x] 'error':b'Run. Run. Or it will explode.' + [*] Waiting for logs. To exit press CTRL+C + [x] 'warning':b'Run. Run. Or it will explode.' diff --git a/soar_broadcast.py b/soar_broadcast.py new file mode 100644 index 0000000000000000000000000000000000000000..de5062db99981096a513d372b2acd6dcd0fa28d4 --- /dev/null +++ b/soar_broadcast.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +import pika + + +def get_connection(): + return pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) + + +def deliver(body, broadcast_exchange): + print("broadcast") + deliver_connection = get_connection() + deliver_channel = deliver_connection.channel() + deliver_channel.exchange_declare( + exchange=broadcast_exchange, exchange_type="fanout" + ) + deliver_channel.basic_publish( + exchange=broadcast_exchange, routing_key="", body=body + ) + deliver_connection.close() + + +def listen(queue_name, broadcast_exchange): + def bcast_callback(ch, method, properties, body): + delivered = deliver(body, broadcast_exchange) + ch.basic_ack(delivery_tag=method.delivery_tag) + + listen_connection = get_connection() + listen_channel = listen_connection.channel() + listen_channel.queue_declare(queue=queue_name, durable=True) + listen_channel.basic_consume(queue=queue_name, on_message_callback=bcast_callback) + listen_channel.start_consuming() + + +def broadcast(queue_name, broadcast_exchange="soar_broadcast"): + listen(queue_name, broadcast_exchange) diff --git a/soar_bus.py b/soar_bus.py new file mode 100644 index 0000000000000000000000000000000000000000..3576ea3a75a390ec4cd2de7ce111d7435d5eabf4 --- /dev/null +++ b/soar_bus.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python + +# Queues +# (per client) +# [client_id]-inbox - receive subscriptions +# [client_id]-outbox - send messages to the bus +# [client-id]-broadcast - send message to all subscribers? - eg notify of downtime +# soar-publisher - fan-in from client-outboxes +# soar-dlq - undeliverables +# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions + +import concurrent.futures +from endpoints.clients import ClientsFile +from soar_broadcast import broadcast +from soar_forward import forward +from soar_publish import publish +from soar_subscribe import subscribe + +THREADS = [] +EXCHANGES = { + "publish": "soar_publish", + "broadcast": "soar_broadcast", +} + + +def main(): + clients_file = ClientsFile() + clients = clients_file.get() + + with concurrent.futures.ProcessPoolExecutor() as executor: + # publish + thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish")) + THREADS.append(thread) + + for (id, client) in clients.items(): + # forward + thread = executor.submit(forward, f"{id}-outbox", "soar-publish") + THREADS.append(thread) + # broadcast + thread = executor.submit( + broadcast, f"{id}-broadcast", EXCHANGES.get("broadcast") + ) + THREADS.append(thread) + # subscribe + thread = executor.submit( + subscribe, + f"{id}-inbox", + client["subscription"], + EXCHANGES.get("publish"), + EXCHANGES.get("broadcast"), + ) + THREADS.append(thread) + # push + # TODO - add optional webhook target to client and post to webhook target + # if present + + +if __name__ == "__main__": + main() diff --git a/soar_forward.py b/soar_forward.py new file mode 100644 index 0000000000000000000000000000000000000000..855d02f30ff49adeeaaa7e031cfb7aeca7812af4 --- /dev/null +++ b/soar_forward.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import pika + + +def get_connection(): + return pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) + + +def deliver(body, queue_name): + print("forward to: %s" % queue_name) + deliver_connection = get_connection() + deliver_channel = deliver_connection.channel() + deliver_channel.queue_declare(queue=queue_name, durable=True) + deliver_channel.basic_publish(exchange="", routing_key=queue_name, body=body) + deliver_connection.close() + + +def listen(from_queue_name, to_queue_name): + def fwd_callback(ch, method, properties, body): + delivered = deliver(body, to_queue_name) + ch.basic_ack(delivery_tag=method.delivery_tag) + + listen_connection = get_connection() + listen_channel = listen_connection.channel() + listen_channel.queue_declare(queue=from_queue_name, durable=True) + listen_channel.basic_consume( + queue=from_queue_name, on_message_callback=fwd_callback + ) + listen_channel.start_consuming() + + +def forward(from_queue_name, to_queue_name): + listen(from_queue_name, to_queue_name) diff --git a/soar_publish.py b/soar_publish.py new file mode 100644 index 0000000000000000000000000000000000000000..52121faae63eead99d410d4534e3888f11842eb0 --- /dev/null +++ b/soar_publish.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +import pika +import json +import sys + + +def get_connection(): + return pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) + + +def deliver(body, topic, publish_exchange): + print("publish on topic: %s" % topic) + deliver_connection = get_connection() + deliver_channel = deliver_connection.channel() + deliver_channel.exchange_declare(exchange=publish_exchange, exchange_type="topic") + deliver_channel.basic_publish( + exchange=publish_exchange, routing_key=topic, body=body + ) + deliver_connection.close() + + +def listen(queue_name, publish_exchange): + def pub_callback(ch, method, properties, body): + message = json.loads(body.decode()) + topic = message["topic"] + deliver(body, topic, publish_exchange) + ch.basic_ack(delivery_tag=method.delivery_tag) + + listen_connection = get_connection() + listen_channel = listen_connection.channel() + listen_channel.queue_declare(queue=queue_name, durable=True) + listen_channel.basic_consume(queue=queue_name, on_message_callback=pub_callback) + listen_channel.start_consuming() + + +def publish(queue_name, publish_exchange="soar_publish"): + listen(queue_name, publish_exchange) + + +if __name__ == "__main__": + queue_name = sys.argv[1] + publish(queue_name) diff --git a/soar_push.py b/soar_push.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/soar_subscribe.py b/soar_subscribe.py new file mode 100644 index 0000000000000000000000000000000000000000..9ce847738e26c9ccc4530c5b0476b98d0f0b9cff --- /dev/null +++ b/soar_subscribe.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +import pika + + +def get_connection(): + return pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) + + +def subscribe( + queue_name, + topic, + publish_exchange="soar_publish", + broadcast_exchange="soar_broadcast", +): + adm_connection = get_connection() + admin_channel = adm_connection.channel() + admin_channel.exchange_declare(exchange=broadcast_exchange, exchange_type="fanout") + admin_channel.queue_bind(exchange=broadcast_exchange, queue=queue_name) + sub_connection = get_connection() + subscriber_channel = sub_connection.channel() + subscriber_channel.exchange_declare( + exchange=publish_exchange, exchange_type="topic" + ) + subscriber_channel.queue_bind( + exchange=publish_exchange, queue=queue_name, routing_key=topic + )