Commit b05b8172 authored by Dan Jones's avatar Dan Jones
Browse files

Merge branch '1-prototype' into 'dev'

prototype soar backbone implementation

Closes #1

See merge request !3
parents c3325b03 73f478a0
clients.json
examples/
rmq.log
\ No newline at end of file
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
pubsubpy = "*"
pika = "*"
pyrabbit = "*"
flask = "*"
flask-restful = "*"
marshmallow = "*"
bson = "*"
flask-cors = "*"
cryptography = "*"
[dev-packages]
[requires]
python_version = "3.8"
This diff is collapsed.
# Communications Backbone
Communications Backbone by C2 Team (NOC)
\ No newline at end of file
Communications Backbone by C2 Team (NOC)
## DJ prototype
I did this in freedom-fire-day so it doesn't currently follow the flask template.
It's also a bit weird because it writes to a local ignored `clients.json` file
instead of using a database. I did this as a placeholder because we've not yet
decided what the infrastructure looks like.
### Data flow
- Client A sends to `client-a-outbox` (or POSTs to API /send - not yet implemented)
- Messages are forwarded from `client-a-outbox` to `soar-publish`
- Messages are published from `soar-publish` with the given topic read from the message
- Subscribers listen to for messages
- Subscription is delivered to `client-b-inbox`
- Client B reads from `client-b-inbox (or GETs from API /receive)
There is a parallel flow when a client sends to `client-a-notify` in which case the
messages are delivered through the broadcast exchange to all clients `client-x-inbox`.
### Auth placeholder
As a proxy for proper authentication, when you post a client a random secret is
returned in the response. To send to / receive from the bus you then call the API
with the client_id and secret and it checks they match. The client_id determines
which queues it reads from.
Subsequent requests to the client endpoint return the client_id but not the secret.
### Setup
```
pipenv install
```
### Running
#### RabbitMQ
`docker run --rm -p 5672:5672 -d --hostname rmq --name rmq rabbitmq:management`
#### API
```
pipenv run python api.py
```
#### Create some clients
`POST` to `http://localhost:3000/clients`
#### Event bus
```
pipenv run python soar_bus.py
```
#### Send / Receive directly
```
# Send a message
pipenv run python client_send.py noc-c2-outbox 'soar.noc.slocum.something' from noc-c2
```
```
# Receive messages
pipenv run python client_read.py noc-sfmc-inbox
```
#### Receive via API
As a placeholder for proper authentication you post the `client_id` and
`secret` and it checks that a client with that id exists and that the
secret matches before allowing the request.
This should be replaced with a proper auth layer.
`GET http://localhost:5000/receive?client_id=[client_id]&secret=[secret]`
### Components
- `soar_bus.py` - Run all the components threaded based on existing clients
- `soar_forward.py` - Listen for messages on queue A and forward messages to queue B
- `soar_publish.py` - Listen for messages on queue A and publish on exchange B
- `soar_broadcast.py` - Listen for messages on queue A and broadcast on exchange B
- `soar_subscribe.py` - Create subscriptions to both the publish and broadcast exchange - deliver to queue A
(I think this should probably be 2 separate functions to keep things nice and simple)
- `soar_push.py` - Not yet implemented - Listen for messages on queue A and POST to the client's webhook URL
from flask import Flask
from flask_restful import Api
from endpoints.clients import Client, ClientList
from endpoints.receive import Receive
from endpoints.send import Send
from endpoints.notify import Notify
from endpoints.token import Token
from flask_cors import CORS
from models.token import TokenModel
token = TokenModel()
token.setSecret()
app = Flask(__name__)
api = Api(app)
CORS(app, resources={r"*": {"origins": "http://localhost:8086"}})
api.add_resource(ClientList, "/client")
api.add_resource(Client, "/client/<client_id>")
api.add_resource(Receive, "/receive")
api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token")
if __name__ == "__main__":
app.run(debug=True, port=8087)
#!/usr/bin/env python
import pika, sys, os, json
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
queue_name = sys.argv[1]
channel.queue_declare(queue=queue_name, durable=True)
def callback(ch, method, properties, body):
message = json.loads(body.decode())
print(" [x] Received %r" % message)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)
#!/usr/bin/env python
import pika
import sys
import json
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
queue_name = sys.argv[1]
topic = sys.argv[2]
message = " ".join(sys.argv[3:]) or "Hello World!"
body = json.dumps({"topic": topic, "message": message})
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(exchange="", routing_key=queue_name, body=body)
connection.close()
import json
from flask_restful import Resource, abort
from models.token import TokenModel
class AuthResource(Resource):
def __init__(self):
self.token = TokenModel()
with open("clients.json", "r") as clients_file:
self.clients = json.load(clients_file)
def auth(self, request):
allow = False
auth = request.headers.get('Authorization', False)
if auth:
token = auth.split(' ').pop()
parsed = self.token.validate(token)
if parsed['valid']:
client = self.clients.get(parsed['client_id'])
if client:
self.client = client
allow = True
if not allow:
abort(403, message="Invalid token")
return allow
\ No newline at end of file
from flask_restful import Resource, request, abort
from marshmallow import Schema, fields
import json
import os
import random
import string
class ClientSchema(Schema):
client_id = fields.Str(required=True)
client_name = fields.Str(required=True)
subscription = fields.Str(required=True)
class ClientsFile:
file = "clients.json"
mtime = 0
clients = {}
parser = None
def __init__(self):
self.get()
def get(self):
try:
mtime = os.path.getmtime(self.file)
if mtime > self.mtime:
with open(self.file, "r") as client_file:
self.clients = json.load(client_file)
except FileNotFoundError as error:
self.clients = {}
self.save()
return self.clients
def find(self, client_id):
self.get()
if client_id in self.clients:
client = self.clients[client_id]
else:
client = None
return client
def add(self, client):
client['secret'] = self.secret()
self.clients[client["client_id"]] = client
self.save()
return client
def remove(self, client):
del self.clients[client["client_id"]]
self.save()
def update(self, client_updates):
client = self.find(client_updates["client_id"])
client.update(client_updates)
self.clients[client["client_id"]] = client
self.save()
return client
def save(self):
try:
with open(self.file, "w") as client_file:
client_file.write(json.dumps(self.clients, indent=2))
return True
except OSError as error:
print(str(error))
return False
def secret(self, chars=36):
res = "".join(
random.choices(
string.ascii_lowercase + string.ascii_uppercase + string.digits, k=chars
)
)
return str(res)
clients_file = ClientsFile()
# Client
class Client(Resource):
clients_file = None
def __init__(self):
self.schema = ClientSchema()
self.clients_file = ClientsFile()
def get(self, client_id):
client = self.clients_file.find(client_id)
del client['secret']
if not client:
abort(404, message="No client with id: {}".format(client_id))
return client
def delete(self, todo_id):
client = self.clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
self.clients_file.remove(client)
return client, 204
def put(self, client_id):
args = request.get_json()
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
client = self.clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
client = self.clients_file.update(args)
return client, 201
# ClientList
class ClientList(Resource):
def __init__(self):
self.schema = ClientSchema()
self.clients_file = ClientsFile()
def get(self):
return {
client_id: (client, client.pop("secret", None))[0]
for client_id, client in self.clients_file.get().items()
}
def post(self):
args = request.get_json()
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
client = clients_file.find(args["client_id"])
if client:
abort(403, message="Duplicate client id: {}".format(client_id))
else:
client = clients_file.add(args)
return client, 201
from flask_restful import Resource
class HelloWorld(Resource):
def get(self):
return {"hello": "world"}
import json
from flask_restful import request, abort
from marshmallow import Schema, fields
import pika
from endpoints.auth_resource import AuthResource
class NotifySchema(Schema):
body = fields.Str(required=True)
class Notify(AuthResource):
clients = None
schema = None
def __init__(self):
super().__init__()
self.schema = NotifySchema()
def post(self):
args = request.get_json()
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
allow = False
body = args.get("body")
message = {
'topic': 'broadcast',
'message': body,
}
allow = self.auth(request)
if allow:
notify_queue = self.client['client_id'] + "-broadcast"
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue=notify_queue, durable=True)
channel.basic_publish(exchange="", routing_key=notify_queue, body=json.dumps(message))
connection.close()
from flask_restful import request, abort
from marshmallow import Schema, fields
import pika
import json
from models.token import TokenModel
from endpoints.auth_resource import AuthResource
class ReceiveQuerySchema(Schema):
max_messages = fields.Int(required=False)
class Receive(AuthResource):
clients = None
schema = None
def __init__(self):
super().__init__()
self.schema = ReceiveQuerySchema()
def get(self):
errors = self.schema.validate(request.args)
if errors:
abort(400, message=str(errors))
messages = []
max_messages = request.args.get("max_messages", 10)
allow = self.auth(request)
if allow:
inbox_queue = self.client['client_id'] + "-inbox"
if allow:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
channel.queue_declare(queue=inbox_queue, durable=True)
while len(messages) < max_messages:
method_frame, header_frame, body = channel.basic_get(inbox_queue)
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
messages.append(json.loads(body.decode()))
else:
print("No message returned")
break
channel.close()
connection.close()
return messages
import json
from flask_restful import request, abort
from marshmallow import Schema, fields
import pika
from endpoints.auth_resource import AuthResource
class SendSchema(Schema):
body = fields.Str(required=True)
topic = fields.Str(required=True)
class Send(AuthResource):
clients = None
schema = None
def __init__(self):
super().__init__()
self.schema = SendSchema()
def post(self):
args = request.get_json()
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
allow = self.auth(request)
if allow:
body = args.get("body")
topic = args.get("topic")
outbox_queue = self.client['client_id'] + "-outbox"
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue=outbox_queue, durable=True)
message = {
'topic': topic,
'message': body,
}
channel.basic_publish(exchange="", routing_key=outbox_queue, body=json.dumps(message))
connection.close()
import json
from flask_restful import Resource, request, abort
from marshmallow import Schema, fields
import pika
from models.token import TokenModel
class TokenQuerySchema(Schema):
client_id = fields.Str(required=True)
secret = fields.Str(required=True)
class Token(Resource):
clients = None
schema = None
model = None
def __init__(self):
self.schema = TokenQuerySchema()
self.model = TokenModel()
with open("clients.json", "r") as clients_file:
self.clients = json.load(clients_file)
def get(self):
errors = self.schema.validate(request.args)
if errors:
abort(400, message=str(errors))
token = None
allow = False
max_messages = request.args.get("max_messages", 10)
client_id = request.args.get("client_id")
if client_id in self.clients:
client = self.clients.get(client_id)
if request.args.get("secret") == client.get("secret"):
allow = True
if allow:
token = self.model.get(client_id)
else:
abort(403, message="Invalid client credentials")
return token
\ No newline at end of file
from cryptography.fernet import Fernet,InvalidToken
import datetime
import os
import json
TOKENS = {}
class TokenModel():
clients = None
schema = None
key = None
fernet = None
token_lifetime_hours = None
env_lifetime = 'SOAR_TOKEN_LIFETIME'
env_secret = 'SOAR_TOKEN_SECRET'
def __init__(self):
self.getFernet()
self.token_lifetime_hours = os.getenv(self.env_lifetime, 24)
def getFernet(self):
self.fernet = Fernet(self.getKey().encode())
def getKey(self):
key = os.getenv(self.env_secret)
print(key)
if not key:
key = Fernet.generate_key().decode()
os.environ[self.env_secret] = key
self.key = key
return self.key
def setSecret(self):
if not os.getenv(self.env_secret):
os.environ[self.env_secret] = self.getKey()
def getExpiry(self):
now = datetime.datetime.utcnow()
expires = now + datetime.timedelta(hours=self.token_lifetime_hours)
return expires.isoformat()
def encrypt(self, client_id):
try:
expiry = self.getExpiry()
token_content = {
'client_id': client_id,
'expiry': expiry
}
token = self.fernet.encrypt(json.dumps(token_content).encode()).decode()
return {
'token': token,
'expiry': expiry
}
except KeyError as e:
return None
def decrypt(self, token):
try:
content = json.loads(self.fernet.decrypt(token.encode()).decode())
return content
except (InvalidToken,KeyError) as e:
return None
def get(self, client_id):
response = self.encrypt(client_id)
TOKENS[response['token']] = client_id
return response
def validate(self, token):
response = {
'valid': False
}
if token in TOKENS:
content = self.decrypt(token)
if content:
now = datetime.datetime.utcnow()
expires = datetime.datetime.fromisoformat(content['expiry'])
response['valid'] = expires > now
if response['valid']:
response.update(content)
else:
del TOKENS[token]
else:
del TOKENS[token]
return response
#!/usr/bin/env python
import pika
def get_connection():
return pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
def deliver(body, broadcast_exchange):
print("broadcast")
deliver_connection = get_connection()
deliver_channel = deliver_connection.channel()
deliver_channel.exchange_declare(
exchange=broadcast_exchange, exchange_type="fanout"
)
deliver_channel.basic_publish(
exchange=broadcast_exchange, routing_key="", body=body
)
deliver_connection.close()
def listen(queue_name, broadcast_exchange):
def bcast_callback(ch, method, properties, body):
delivered = deliver(body, broadcast_exchange)
ch.basic_ack(delivery_tag=method.delivery_tag)
listen_connection = get_connection()
listen_channel = listen_connection.channel()
listen_channel.queue_declare(queue=queue_name, durable=True)
listen_channel.basic_consume(queue=queue_name, on_message_callback=bcast_callback)
listen_channel.start_consuming()
def broadcast(queue_name, broadcast_exchange="soar_broadcast"):
listen(queue_name, broadcast_exchange)
#!/usr/bin/env python
# Queues
# (per client)
# [client_id]-inbox - receive subscriptions
# [client_id]-outbox - send messages to the bus
# [client-id]-broadcast - send message to all subscribers? - eg notify of downtime
# soar-publisher - fan-in from client-outboxes
# soar-dlq - undeliverables
# soar-broadcast - admin messages forwarded to all client-inboxes regardless of subscriptions
import concurrent.futures
from endpoints.clients import ClientsFile
from soar_broadcast import broadcast
from soar_forward import forward
from soar_publish import publish
from soar_subscribe import subscribe
THREADS = []
EXCHANGES = {
"publish": "soar_publish",
"broadcast": "soar_broadcast",
}
def main():
clients_file = ClientsFile()
clients = clients_file.get()
with concurrent.futures.ProcessPoolExecutor() as executor:
# publish
thread = executor.submit(publish, "soar-publish", EXCHANGES.get("publish"))
THREADS.append(thread)
for (id, client) in clients.items():
# forward
thread = executor.submit(forward, f"{id}-outbox", "soar-publish")
THREADS.append(thread)
# broadcast
thread = executor.submit(
broadcast, f"{id}-broadcast", EXCHANGES.get("broadcast")
)
THREADS.append(thread)
# subscribe
thread = executor.submit(
subscribe,
f"{id}-inbox",
client["subscription"],
EXCHANGES.get("publish"),
EXCHANGES.get("broadcast"),
)
THREADS.append(thread)
# push
# TODO - add optional webhook target to client and post to webhook target
# if present
if __name__ == "__main__":
main()
#!/usr/bin/env python
import pika
def get_connection():
return pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
def deliver(body, queue_name):
print("forward to: %s" % queue_name)
deliver_connection = get_connection()
deliver_channel = deliver_connection.channel()
deliver_channel.queue_declare(queue=queue_name, durable=True)
deliver_channel.basic_publish(exchange="", routing_key=queue_name, body=body)
deliver_connection.close()
def listen(from_queue_name, to_queue_name):
def fwd_callback(ch, method, properties, body):
delivered = deliver(body, to_queue_name)
ch.basic_ack(delivery_tag=method.delivery_tag)
listen_connection = get_connection()
listen_channel = listen_connection.channel()
listen_channel.queue_declare(queue=from_queue_name, durable=True)
listen_channel.basic_consume(
queue=from_queue_name, on_message_callback=fwd_callback
)
listen_channel.start_consuming()
def forward(from_queue_name, to_queue_name):
listen(from_queue_name, to_queue_name)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment