Source
...
Target
Commits (82)
# DATA_DIR=
# SOAR_TOKEN_LIFETIME=
# SOAR_TOKEN_SECRET=
\ No newline at end of file
clients.json
__pycache__/
data/*.json
examples/
rmq.log
\ No newline at end of file
rmq.log
Pipfile
Pipfile.lock
\ No newline at end of file
include:
- project: communications-backbone-system/backbone-infrastructure-config
ref: master
file: gitlab/all.yml
variables:
DOCKER_IMAGE_NAME: communications-backbone
TEST_DOCKER_COMPOSE: 1
\ No newline at end of file
MIT License
Copyright (c) 2022 National Oceanography Centre CLG
Copyright (c) 2023 National Oceanography Centre
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
......
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
pubsubpy = "*"
pika = "*"
pyrabbit = "*"
flask = "*"
flask-restful = "*"
marshmallow = "*"
bson = "*"
flask-cors = "*"
cryptography = "*"
[dev-packages]
[requires]
python_version = "3.8"
This diff is collapsed.
......@@ -2,42 +2,71 @@
Communications Backbone by C2 Team (NOC)
## DJ prototype
### Infrastructure
I did this in freedom-fire-day so it doesn't currently follow the flask template.
The backbone has 3 runtime components:
It's also a bit weird because it writes to a local ignored `clients.json` file
instead of using a database. I did this as a placeholder because we've not yet
decided what the infrastructure looks like.
- RabbitMQ - run as a docker container
- The bus which handles the delivery of messages between queues
- The API which provides an interface to read from and write to the queues
### Data flow
- Client A sends to `client-a-outbox` (or POSTs to API /send - not yet implemented)
- Client A sends to `client-a-outbox` (by POSTing to API /send)
- Messages are forwarded from `client-a-outbox` to `soar-publish`
- Messages are published from `soar-publish` with the given topic read from the message
- Subscribers listen to for messages
- Subscription is delivered to `client-b-inbox`
- Client B reads from `client-b-inbox (or GETs from API /receive)
- Subscription is delivered to `client-b-inbox` (if the client subscription matches the message topic)
- Client B reads from `client-b-inbox` (by GETting from API /receive)
There is a parallel flow when a client sends to `client-a-notify` in which case the
messages are delivered through the broadcast exchange to all clients `client-x-inbox`.
There is a parallel flow when a client sends to `client-a-broadcast` (by POSTing to /notify).
In this case the messages are delivered through the broadcast exchange to all clients `client-x-inbox`.
### Auth placeholder
### Prerequisites
As a proxy for proper authentication, when you post a client a random secret is
returned in the response. To send to / receive from the bus you then call the API
with the client_id and secret and it checks they match. The client_id determines
which queues it reads from.
- Python >= 3.8
- A virtual environment manager - virtualenv, venv or pipenv
- Docker and docker-compose
Subsequent requests to the client endpoint return the client_id but not the secret.
### Running via docker-compose
### Setup
```
run-compose.sh
```
Using `docker-compose` will mean that everything is setup automatically, this includes the `rabbitmq` container, the backbone API, and the backbone bus. The `run-compose.sh` script has been provided to simplify this even further - all you have to do is set whatever env vars you need in the `.env` file and then run `./run-compose.sh` (the defaults in `.env` are fine for local dev work, but ones not labelled `optional` will need setting in a production setting). The env vars are:
- `DATA_DIR` - Where to mount the volume of the API container on your local system. This defaults to the result of `pwd`, which should be within the `communications-backbone` repo
- `SOAR_TOKEN_LIFETIME` (Optional) - The number of hours until a newly created token expires
- `SOAR_TOKEN_SECRET` (Optional) - A secret key used to encrypt/decrypt token data. If specified the value should be set using TokenModel.getKey()
### Running the bus and API natively (without docker)
#### Setup
We recommend using some form of python virtual environment to maintain a consistent
python version and ring-fence the package management.
In your virtual environment:
```
pipenv install
pip install -r requirements-dev.txt
```
### Running
This installs both the development and runtime dependencies for the project.
### Testing
Current coverage:
- API: yes
- Pika RabbitMQ implementation: no
In your virtual environment:
```
pytest
```
#### RabbitMQ
......@@ -45,49 +74,179 @@ pipenv install
#### API
In your virtual environment:
```
pipenv run python api.py
python api.py
```
#### Create some clients
##### Config
The API reads it's config from `./data/api-config.json` if present.
The default config for development environments only enables
connections from local host on any port `http://localhost:*`.
If you want to open the API up to connections from anywhere
or from specific known client domains you can create an
`api-config.json` to do that.
This can also be used to limit requests to specified endpoints.
The default config is intended for running a local development
environment. For production it is expected that the config file
will exist with settings like the following:
```json
{
"cors": {
"*": {
"origins": [
"*"
]
},
"/client": {
"origins": [
"http://localhost:*"
]
}
}
}
```
`POST` to `http://localhost:3000/clients`
This opens up the all endpoints except for the `/client` endpoint.
For now we expect client administration to be managed centrally.
#### Event bus
In your virtual environment:
```
pipenv run python soar_bus.py
python soar_bus.py
```
At present the soar bus creates the clients defined in the API when it starts
but does not monitor for new clients so if you create a new client you will
need to restart the bus. This will be fixed in a later release.
### Client Adapters
The intended use of the backbone is that multiple clients connect to the backbone
using adapters. An adapter handles:
- Authentication
- Sending and receiving over the backbone API
- Transforming messages between local formats and the backbone message formats
We have implemented the following template client adapters:
- [Javascript](https://git.noc.ac.uk/communications-backbone-system/backbone-adapter-javascript)
- [Python](https://git.noc.ac.uk/communications-backbone-system/backbone-adapter-python)
If you need to port the adapter to another language please contact us.
The adapters can be installed as packages and sub-classed as required for your
client application.
For install and usage instructions for the adapters see the READMEs.
### Usage
- Run the backbone
- Create some clients
- Restart the soar_bus service
- `cd docker && docker-compose restart soar_bus`
- Save your client credentials
- Test reading and writing directly to the queues
- Create an adapter
- Test sending and receiving via the adapter
#### Create some clients
##### With the script
```bash
python client_create.py
# will create a default 'admin' client subscribed to all messages (#)
# OR
python client_create.py --id=[client_id] --name="Your Client Name" --sub="something.something.#"
# will create a client with your preferred name, id and subscription
```
##### Through the API
`POST` to `http://localhost:8087/client`
The POST body should be JSON.
```json
{
"client_id": "noc-c2",
"client_name": "NOC C2",
"subscription": "soar.*.noc.#"
}
```
- `client_id` - a project unique human readable name
- `client_name` - how to refer to that client on screen
- `subscription` - the topic pattern identifying the messages you want to receive
(`*` = single word wildcard, `#` = multi-word wildcard)
The response from the post contains your client `secret`. This is only displayed once.
Subsequent GETS to `/client` or `/client/[client-id]` will not return the secret.
You should save the response as `soar-config.json` adding the additional `api` field
to specify the root URL of the API you're connecting to
(eg http://localhost:8087 for local development)
#### Send / Receive directly
```
# Send a message
pipenv run python client_send.py noc-c2-outbox 'soar.noc.slocum.something' from noc-c2
python client_send.py noc-c2-outbox 'soar.noc.slocum.something' from noc-c2
```
```
# Receive messages
pipenv run python client_read.py noc-sfmc-inbox
python client_read.py noc-sfmc-inbox
```
These scripts bypass authentication reading and writing directly to rabbitmq.
#### Authentication
Authentication is handled by a client credentials grant which is a GET
request to `/token?client_id=[client-id]&secret=[secret]`
The response includes a token and this token should be included as an
authorization header in requests to the API:
```json
{
"Authorization": "Bearer [token]"
}
```
#### Receive via API
#### Send and Receive via API
Once you have a bearer token you can make requests to send and receive
messages via the API:
As a placeholder for proper authentication you post the `client_id` and
`secret` and it checks that a client with that id exists and that the
secret matches before allowing the request.
- GET `/receive?max_messages=[X]` - gets all messages in the queue
- the default value of `max_messages` if not specified is 10
- POST `/send` - publish a message to a given topic
- POST `/notify` - broadcast a message to all clients
This should be replaced with a proper auth layer.
Supplying the token as an Authorization header as described above.
`GET http://localhost:5000/receive?client_id=[client_id]&secret=[secret]`
#### Send and Receive using adapters
### Components
If you have implemented one of the adapter templates then authentication
is handled for you.
- `soar_bus.py` - Run all the components threaded based on existing clients
- `soar_forward.py` - Listen for messages on queue A and forward messages to queue B
- `soar_publish.py` - Listen for messages on queue A and publish on exchange B
- `soar_broadcast.py` - Listen for messages on queue A and broadcast on exchange B
- `soar_subscribe.py` - Create subscriptions to both the publish and broadcast exchange - deliver to queue A
(I think this should probably be 2 separate functions to keep things nice and simple)
- `soar_push.py` - Not yet implemented - Listen for messages on queue A and POST to the client's webhook URL
You will need to create a set of client credentials as above using the
POST `/client` endpoint and save the response.
Then you pass your credentials when you create your adapter instance.
\ No newline at end of file
import json
import logging
import os
from flask import Flask
from flask_cors import CORS
from flask_restful import Api
from endpoints.clients import Client, ClientList
from endpoints.client import Client, ClientList
from endpoints.notify import Notify
from endpoints.receive import Receive
from endpoints.send import Send
from endpoints.notify import Notify
from endpoints.token import Token
from flask_cors import CORS
from models.token import TokenModel
from models.token_model import TokenModel
from logger import setup_logging
setup_logging()
token = TokenModel()
token.setSecret()
app = Flask(__name__)
api = Api(app)
CORS(app, resources={r"*": {"origins": "http://localhost:8086"}})
api.add_resource(ClientList, "/client")
api.add_resource(Client, "/client/<client_id>")
api.add_resource(Receive, "/receive")
api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token")
def get_config():
config = {"cors": {r"*": {"origins": "http://localhost:*"}}}
try:
with open("./data/api-config.json", "r") as config_file:
config = {
r"{}".format(key): value
for key, value in json.load(config_file).items()
}
except FileNotFoundError:
logging.info("No API config: Using default")
return config
def create_app():
app = Flask(__name__)
api = Api(app)
api_config = get_config()
logging.debug(str(api_config))
CORS(app, resources=api_config["cors"])
api.add_resource(ClientList, "/client")
api.add_resource(Client, "/client/<client_id>")
api.add_resource(Receive, "/receive")
api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token")
return app
flask_host = os.getenv(
"FLASK_HOST", "localhost"
) # Sets to whatever MQ_HOST is, or defaults to localhost
if __name__ == "__main__":
app.run(debug=True, port=8087)
app = create_app()
app.run(debug=False, port=8087, host=flask_host)
"""
Create a set of client credentials programmatically or return existing
if a client matching the id has already been configured.
Usage: client_create.py \
--id=soar-hydrogen \
--name="Hydrogen Interface" \
--sub="soar.#" \
--api="https://nucleus.noc.ac.uk/soar/api" \
[--silent]
"""
import argparse
import json
from models.client_model import ClientModel
parser = argparse.ArgumentParser(
prog="PROG", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--id",
type=str,
default="admin",
help="Client IDs should be human readable kebab-case and unique",
)
parser.add_argument(
"--name",
type=str,
default="Admin Client",
help="Client names are how the client will appear on screen",
)
parser.add_argument(
"--sub",
type=str,
default="#",
help="A rabbitmq topic pattern for the client to subscribe to *=one-word-wildcard #=multi-word-wildcard",
)
parser.add_argument(
"--api",
type=str,
default="http://localhost:8087",
help="The root api endpoint",
)
parser.add_argument("--silent", action="store_true")
parser.set_defaults(silent=False)
args = parser.parse_args()
if not args.silent:
parser.print_help()
client_model = ClientModel()
# Get existing client if already exists
client = client_model.find(args.id)
if not client:
client = client_model.add(
{
"client_id": args.id,
"client_name": args.name,
"subscription": args.sub,
}
)
# Add the API endpoint to the credentials file
client["api"] = args.api
if not args.silent:
print("Here is your credentials file:")
print(json.dumps(client, indent=2))
import pytest
def clients():
return {
"client-1": {
"client_id": "client-1",
"client_name": "Client 1",
"subscription": "soar.#",
"secret": "abc123",
},
"client-2": {
"client_id": "client-2",
"client_name": "Client 2",
"subscription": "soar.client-2.#",
"secret": "xyz789",
},
}
def get_auth_header(client, credentials):
token_response = client.get("/token", query_string=credentials)
if token_response.status_code == 200:
token = token_response.json["token"]
return {"Authorization": f"Bearer {token}"}
else:
return None
@pytest.fixture
def mock_clients():
return clients()
@pytest.fixture
def mock_new_client():
return {
"client_id": "client-3",
"client_name": "Client 3",
"subscription": "soar.client-3.#",
}
@pytest.fixture
def mock_client_credentials():
mock_clients = clients()
return {
"client_id": mock_clients["client-1"]["client_id"],
"secret": mock_clients["client-1"]["secret"],
}
@pytest.fixture
def mock_invalid_credentials():
return {"client_id": "client-invalid", "secret": "fake-secret"}
@pytest.fixture
def mock_token_secret():
return "2UrRyeb9c6hq8Gj9nmI5safPz9LpPeUFtifeMNx4GQo="
def posts():
return {
"send": {
"topic": "soar.client-1.message",
"body": "this is a pub/sub message from client-1",
},
"notify": {"body": "this is a broadcast message from client-1"},
}
@pytest.fixture
def mock_post_send():
return posts()["send"]
@pytest.fixture
def mock_message_send():
post = posts()["send"]
return {"topic": post["topic"], "message": post["body"]}
@pytest.fixture
def mock_post_notify():
return posts()["notify"]
@pytest.fixture
def mock_message_notify():
post = posts()["notify"]
return {"topic": "broadcast", "message": post["body"]}
@pytest.fixture
def mock_read_from_queue_return():
return [
{
"topic": "soar.client-1.something",
"message": "this is a pub/sub message from client-1",
}
]
FROM python:3.9.16
WORKDIR /app
# If we want to use the alpine image, we need these packages for pytest
# RUN apk update && apk add python3-dev gcc libc-dev libffi-dev
COPY requirements.txt requirements.txt
COPY requirements-dev.txt requirements-dev.txt
RUN pip install -r requirements.txt
RUN pip install -r requirements-dev.txt
\ No newline at end of file
FROM python:3.9.16
WORKDIR /app
COPY . .
# If we want to use the alpine image, we need these packages for pytest
# RUN apk update && apk add python3-dev gcc libc-dev libffi-dev
COPY requirements.txt requirements.txt
COPY requirements-dev.txt requirements-dev.txt
RUN pip install -r requirements.txt
RUN pip install -r requirements-dev.txt
\ No newline at end of file
version: '3.8'
services:
soar_api_test:
build:
context: ..
dockerfile: docker/Dev.Dockerfile
volumes:
- ../:/app
environment:
- PYTHONDONTWRITEBYTECODE=1
command: "pytest -p no:cacheprovider"
container_name: soar_api_test
\ No newline at end of file
version: '3.8'
services:
rabbitmq__local:
image: rabbitmq:management
restart: unless-stopped
ports:
- "5672:5672"
# - "15672:15672" # Admin web console
expose:
- "5672"
container_name: rmq
soar_bus:
build:
context: ..
dockerfile: docker/Dev.Dockerfile
restart: unless-stopped
depends_on:
- rabbitmq__local
environment:
- MQ_HOST=rmq
- LOG_LEVEL=${LOG_LEVEL:-DEBUG}
volumes:
- ../:/app
command: "python soar_bus.py"
container_name: soar_bus
soar_api:
build:
context: ..
dockerfile: docker/Dev.Dockerfile
restart: unless-stopped
ports:
- "8087:8087"
expose:
- "8087"
depends_on:
- rabbitmq__local
environment:
- MQ_HOST=rmq
- FLASK_HOST=0.0.0.0
- LOG_LEVEL=${LOG_LEVEL:-DEBUG}
volumes:
- ../:/app
command: "python api.py"
container_name: soar_api
\ No newline at end of file
import json
import logging
from flask_restful import Resource, abort
from models.token import TokenModel
from models.token_model import TokenModel
class AuthResource(Resource):
def __init__(self):
class AuthResource(Resource):
def __init__(self):
self.token = TokenModel()
with open("clients.json", "r") as clients_file:
with open("./data/clients.json", "r") as clients_file:
self.clients = json.load(clients_file)
def auth(self, request):
def auth(self, request):
allow = False
auth = request.headers.get('Authorization', False)
token = None
auth = request.headers.get("Authorization", False)
if auth:
token = auth.split(' ').pop()
token = auth.split(" ").pop()
parsed = self.token.validate(token)
if parsed['valid']:
client = self.clients.get(parsed['client_id'])
if client:
if parsed["valid"]:
client = self.clients.get(parsed["client_id"])
if client:
self.client = client
allow = True
if not allow:
logging.debug(f"Authentication failed: {request.url}")
logging.trace(f"Token: {token}")
abort(403, message="Invalid token")
return allow
\ No newline at end of file
return allow
import logging
from flask_restful import Resource, abort, request
from marshmallow import Schema, fields
from werkzeug.exceptions import BadRequest
from models.client_model import ClientModel
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.ERROR)
class ClientSchema(Schema):
client_id = fields.Str(required=True)
client_name = fields.Str(required=True)
subscription = fields.Str(required=True)
# Client
class Client(Resource):
clients_file = None
def __init__(self):
self.schema = ClientSchema()
self.clients_file = ClientModel()
def get(self, client_id):
client = self.clients_file.find(client_id)
# remove secret
# secret is only returned in initial post response
del client["secret"]
if not client:
abort(404, message="No client with id: {}".format(client_id))
return client
def delete(self, client_id):
client = self.clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
self.clients_file.remove(client)
return client, 204
def put(self, client_id):
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
client = self.clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
client = self.clients_file.update(args)
return client, 201
# ClientList
class ClientList(Resource):
def __init__(self):
self.schema = ClientSchema()
self.clients_file = ClientModel()
def get(self):
# return list of clients with secret property removed
return {
client_id: (client, client.pop("secret", None))[0]
for client_id, client in self.clients_file.get().items()
}
def post(self):
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
client = self.clients_file.find(args["client_id"])
if client:
abort(409, message="Duplicate client id: {}".format(args["client_id"]))
else:
client = self.clients_file.add(args)
return client, 201
......@@ -2,6 +2,7 @@ import json
from flask_restful import abort, request
from marshmallow import Schema, fields
from werkzeug.exceptions import BadRequest
from endpoints.auth_resource import AuthResource
from rmq import write_to_queue
......@@ -10,6 +11,7 @@ from rmq import write_to_queue
class NotifySchema(Schema):
body = fields.Str(required=True)
class Notify(AuthResource):
clients = None
schema = None
......@@ -17,9 +19,13 @@ class Notify(AuthResource):
def __init__(self):
super().__init__()
self.schema = NotifySchema()
def post(self):
args = request.get_json()
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
......@@ -27,12 +33,12 @@ class Notify(AuthResource):
allow = False
body = args.get("body")
message = {
'topic': 'broadcast',
'message': body,
"topic": "broadcast",
"message": body,
}
allow = self.auth(request)
if allow:
notify_queue = self.client['client_id'] + "-broadcast"
write_to_queue(queue_name=notify_queue, msg=json.dumps(message))
\ No newline at end of file
notify_queue = self.client["client_id"] + "-broadcast"
write_to_queue(queue_name=notify_queue, msg=json.dumps(message))
......@@ -22,9 +22,10 @@ class Receive(AuthResource):
if errors:
abort(400, message=str(errors))
max_messages = request.args.get("max_messages", 10)
# force query string parameter value into int
max_messages = int(request.args.get("max_messages", 10))
allow = self.auth(request)
if allow:
inbox_queue = self.client['client_id'] + "-inbox"
return read_from_queue(queue_name=inbox_queue, max_msgs=max_messages)
\ No newline at end of file
inbox_queue = self.client["client_id"] + "-inbox"
return read_from_queue(queue_name=inbox_queue, max_msgs=max_messages)
......@@ -2,6 +2,7 @@ import json
from flask_restful import abort, request
from marshmallow import Schema, fields
from werkzeug.exceptions import BadRequest
from endpoints.auth_resource import AuthResource
from rmq import write_to_queue
......@@ -11,6 +12,7 @@ class SendSchema(Schema):
body = fields.Str(required=True)
topic = fields.Str(required=True)
class Send(AuthResource):
clients = None
schema = None
......@@ -18,22 +20,26 @@ class Send(AuthResource):
def __init__(self):
super().__init__()
self.schema = SendSchema()
def post(self):
args = request.get_json()
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
allow = self.auth(request)
if allow:
body = args.get("body")
topic = args.get("topic")
outbox_queue = self.client['client_id'] + "-outbox"
outbox_queue = self.client["client_id"] + "-outbox"
message = {
'topic': topic,
'message': body,
"topic": topic,
"message": body,
}
write_to_queue(queue_name=outbox_queue, msg=json.dumps(message))
\ No newline at end of file
write_to_queue(queue_name=outbox_queue, msg=json.dumps(message))