Commit a5c5ba14 authored by Dan Jones's avatar Dan Jones
Browse files

Merge branch '45-release-v0-1-0' into 'master'

Resolve "Release v0.1.0"

See merge request !23
parents c3325b03 e4b42db1
Pipeline #115149 passed with stages
in 1 minute and 2 seconds
# DATA_DIR=
# SOAR_TOKEN_LIFETIME=
# SOAR_TOKEN_SECRET=
\ No newline at end of file
__pycache__/
data/*.json
examples/
rmq.log
Pipfile
Pipfile.lock
\ No newline at end of file
include:
- project: communications-backbone-system/backbone-infrastructure-config
ref: master
file: gitlab/all.yml
variables:
DOCKER_IMAGE_NAME: communications-backbone
TEST_DOCKER_COMPOSE: 1
\ No newline at end of file
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [v0.1.0] - 2023-03-24
### Added
Create a mechanism to exchange messages between multiple clients
Bus
- Implements persistent queues per client
- Implements publish/subscribe message flow between clients
- Implements broadcast message flow between clients
- Listens for and responds to add/remove clients
API
- Implements clients endpoint to manage clients and subscriptions
- Implements token endpoint to manage authentication client credentials grants
- Implements send endpoint for publishing
- Implements notify endpoint for broadcasts
- Implements receive endpoint to get messages
Docker
- Run local dev environment in docker-compose
[v0.1.0]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/611d9cab...v0.1.0
[unreleased]: https://git.noc.ac.uk/communications-backbone-system/communications-backbone/compare/v0.1.0...dev
MIT License MIT License
Copyright (c) 2022 National Oceanography Centre CLG Copyright (c) 2023 National Oceanography Centre
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
......
# Communications Backbone # Communications Backbone
Communications Backbone by C2 Team (NOC) Communications Backbone by C2 Team (NOC)
\ No newline at end of file
### Infrastructure
The backbone has 3 runtime components:
- RabbitMQ - run as a docker container
- The bus which handles the delivery of messages between queues
- The API which provides an interface to read from and write to the queues
### Data flow
- Client A sends to `client-a-outbox` (by POSTing to API /send)
- Messages are forwarded from `client-a-outbox` to `soar-publish`
- Messages are published from `soar-publish` with the given topic read from the message
- Subscribers listen to for messages
- Subscription is delivered to `client-b-inbox` (if the client subscription matches the message topic)
- Client B reads from `client-b-inbox` (by GETting from API /receive)
There is a parallel flow when a client sends to `client-a-broadcast` (by POSTing to /notify).
In this case the messages are delivered through the broadcast exchange to all clients `client-x-inbox`.
### Prerequisites
- Python >= 3.8
- A virtual environment manager - virtualenv, venv or pipenv
- Docker and docker-compose
### Running via docker-compose
```
run-compose.sh
```
Using `docker-compose` will mean that everything is setup automatically, this includes the `rabbitmq` container, the backbone API, and the backbone bus. The `run-compose.sh` script has been provided to simplify this even further - all you have to do is set whatever env vars you need in the `.env` file and then run `./run-compose.sh` (the defaults in `.env` are fine for local dev work, but ones not labelled `optional` will need setting in a production setting). The env vars are:
- `DATA_DIR` - Where to mount the volume of the API container on your local system. This defaults to the result of `pwd`, which should be within the `communications-backbone` repo
- `SOAR_TOKEN_LIFETIME` (Optional) - The number of hours until a newly created token expires
- `SOAR_TOKEN_SECRET` (Optional) - A secret key used to encrypt/decrypt token data. If specified the value should be set using TokenModel.getKey()
### Running the bus and API natively (without docker)
#### Setup
We recommend using some form of python virtual environment to maintain a consistent
python version and ring-fence the package management.
In your virtual environment:
```
pip install -r requirements-dev.txt
```
This installs both the development and runtime dependencies for the project.
### Testing
Current coverage:
- API: yes
- Pika RabbitMQ implementation: no
In your virtual environment:
```
pytest
```
#### RabbitMQ
`docker run --rm -p 5672:5672 -d --hostname rmq --name rmq rabbitmq:management`
#### API
In your virtual environment:
```
python api.py
```
##### Config
The API reads it's config from `./data/api-config.json` if present.
The default config for development environments only enables
connections from local host on any port `http://localhost:*`.
If you want to open the API up to connections from anywhere
or from specific known client domains you can create an
`api-config.json` to do that.
This can also be used to limit requests to specified endpoints.
The default config is intended for running a local development
environment. For production it is expected that the config file
will exist with settings like the following:
```json
{
"cors": {
"*": {
"origins": [
"*"
]
},
"/client": {
"origins": [
"http://localhost:*"
]
}
}
}
```
This opens up the all endpoints except for the `/client` endpoint.
For now we expect client administration to be managed centrally.
#### Event bus
In your virtual environment:
```
python soar_bus.py
```
At present the soar bus creates the clients defined in the API when it starts
but does not monitor for new clients so if you create a new client you will
need to restart the bus. This will be fixed in a later release.
### Client Adapters
The intended use of the backbone is that multiple clients connect to the backbone
using adapters. An adapter handles:
- Authentication
- Sending and receiving over the backbone API
- Transforming messages between local formats and the backbone message formats
We have implemented the following template client adapters:
- [Javascript](https://git.noc.ac.uk/communications-backbone-system/backbone-adapter-javascript)
- [Python](https://git.noc.ac.uk/communications-backbone-system/backbone-adapter-python)
If you need to port the adapter to another language please contact us.
The adapters can be installed as packages and sub-classed as required for your
client application.
For install and usage instructions for the adapters see the READMEs.
### Usage
- Run the backbone
- Create some clients
- Restart the soar_bus service
- `cd docker && docker-compose restart soar_bus`
- Save your client credentials
- Test reading and writing directly to the queues
- Create an adapter
- Test sending and receiving via the adapter
#### Create some clients
##### With the script
```bash
python client_create.py
# will create a default 'admin' client subscribed to all messages (#)
# OR
python client_create.py --id=[client_id] --name="Your Client Name" --sub="something.something.#"
# will create a client with your preferred name, id and subscription
```
##### Through the API
`POST` to `http://localhost:8087/client`
The POST body should be JSON.
```json
{
"client_id": "noc-c2",
"client_name": "NOC C2",
"subscription": "soar.*.noc.#"
}
```
- `client_id` - a project unique human readable name
- `client_name` - how to refer to that client on screen
- `subscription` - the topic pattern identifying the messages you want to receive
(`*` = single word wildcard, `#` = multi-word wildcard)
The response from the post contains your client `secret`. This is only displayed once.
Subsequent GETS to `/client` or `/client/[client-id]` will not return the secret.
You should save the response as `soar-config.json` adding the additional `api` field
to specify the root URL of the API you're connecting to
(eg http://localhost:8087 for local development)
#### Send / Receive directly
```
# Send a message
python client_send.py noc-c2-outbox 'soar.noc.slocum.something' from noc-c2
```
```
# Receive messages
python client_read.py noc-sfmc-inbox
```
These scripts bypass authentication reading and writing directly to rabbitmq.
#### Authentication
Authentication is handled by a client credentials grant which is a GET
request to `/token?client_id=[client-id]&secret=[secret]`
The response includes a token and this token should be included as an
authorization header in requests to the API:
```json
{
"Authorization": "Bearer [token]"
}
```
#### Send and Receive via API
Once you have a bearer token you can make requests to send and receive
messages via the API:
- GET `/receive?max_messages=[X]` - gets all messages in the queue
- the default value of `max_messages` if not specified is 10
- POST `/send` - publish a message to a given topic
- POST `/notify` - broadcast a message to all clients
Supplying the token as an Authorization header as described above.
#### Send and Receive using adapters
If you have implemented one of the adapter templates then authentication
is handled for you.
You will need to create a set of client credentials as above using the
POST `/client` endpoint and save the response.
Then you pass your credentials when you create your adapter instance.
\ No newline at end of file
import json
import logging
import os
from flask import Flask
from flask_cors import CORS
from flask_restful import Api
from endpoints.client import Client, ClientList
from endpoints.notify import Notify
from endpoints.receive import Receive
from endpoints.send import Send
from endpoints.token import Token
from models.token_model import TokenModel
from logger import setup_logging
setup_logging()
token = TokenModel()
token.setSecret()
def get_config():
config = {"cors": {r"*": {"origins": "http://localhost:*"}}}
try:
with open("./data/api-config.json", "r") as config_file:
config = {
r"{}".format(key): value
for key, value in json.load(config_file).items()
}
except FileNotFoundError:
logging.info("No API config: Using default")
return config
def create_app():
app = Flask(__name__)
api = Api(app)
api_config = get_config()
logging.debug(str(api_config))
CORS(app, resources=api_config["cors"])
api.add_resource(ClientList, "/client")
api.add_resource(Client, "/client/<client_id>")
api.add_resource(Receive, "/receive")
api.add_resource(Send, "/send")
api.add_resource(Notify, "/notify")
api.add_resource(Token, "/token")
return app
flask_host = os.getenv(
"FLASK_HOST", "localhost"
) # Sets to whatever MQ_HOST is, or defaults to localhost
if __name__ == "__main__":
app = create_app()
app.run(debug=False, port=8087, host=flask_host)
"""
Create a set of client credentials programmatically or return existing
if a client matching the id has already been configured.
Usage: client_create.py \
--id=soar-hydrogen \
--name="Hydrogen Interface" \
--sub="soar.#" \
--api="https://nucleus.noc.ac.uk/soar/api" \
[--silent]
"""
import argparse
import json
from models.client_model import ClientModel
parser = argparse.ArgumentParser(
prog="PROG", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--id",
type=str,
default="admin",
help="Client IDs should be human readable kebab-case and unique",
)
parser.add_argument(
"--name",
type=str,
default="Admin Client",
help="Client names are how the client will appear on screen",
)
parser.add_argument(
"--sub",
type=str,
default="#",
help="A rabbitmq topic pattern for the client to subscribe to *=one-word-wildcard #=multi-word-wildcard",
)
parser.add_argument(
"--api",
type=str,
default="http://localhost:8087",
help="The root api endpoint",
)
parser.add_argument("--silent", action="store_true")
parser.set_defaults(silent=False)
args = parser.parse_args()
if not args.silent:
parser.print_help()
client_model = ClientModel()
# Get existing client if already exists
client = client_model.find(args.id)
if not client:
client = client_model.add(
{
"client_id": args.id,
"client_name": args.name,
"subscription": args.sub,
}
)
# Add the API endpoint to the credentials file
client["api"] = args.api
if not args.silent:
print("Here is your credentials file:")
print(json.dumps(client, indent=2))
#!/usr/bin/env python
import pika, sys, os, json
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
queue_name = sys.argv[1]
channel.queue_declare(queue=queue_name, durable=True)
def callback(ch, method, properties, body):
message = json.loads(body.decode())
print(" [x] Received %r" % message)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)
#!/usr/bin/env python
import pika
import sys
import json
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
queue_name = sys.argv[1]
topic = sys.argv[2]
message = " ".join(sys.argv[3:]) or "Hello World!"
body = json.dumps({"topic": topic, "message": message})
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(exchange="", routing_key=queue_name, body=body)
connection.close()
import pytest
def clients():
return {
"client-1": {
"client_id": "client-1",
"client_name": "Client 1",
"subscription": "soar.#",
"secret": "abc123",
},
"client-2": {
"client_id": "client-2",
"client_name": "Client 2",
"subscription": "soar.client-2.#",
"secret": "xyz789",
},
}
def get_auth_header(client, credentials):
token_response = client.get("/token", query_string=credentials)
if token_response.status_code == 200:
token = token_response.json["token"]
return {"Authorization": f"Bearer {token}"}
else:
return None
@pytest.fixture
def mock_clients():
return clients()
@pytest.fixture
def mock_new_client():
return {
"client_id": "client-3",
"client_name": "Client 3",
"subscription": "soar.client-3.#",
}
@pytest.fixture
def mock_client_credentials():
mock_clients = clients()
return {
"client_id": mock_clients["client-1"]["client_id"],
"secret": mock_clients["client-1"]["secret"],
}
@pytest.fixture
def mock_invalid_credentials():
return {"client_id": "client-invalid", "secret": "fake-secret"}
@pytest.fixture
def mock_token_secret():
return "2UrRyeb9c6hq8Gj9nmI5safPz9LpPeUFtifeMNx4GQo="
def posts():
return {
"send": {
"topic": "soar.client-1.message",
"body": "this is a pub/sub message from client-1",
},
"notify": {"body": "this is a broadcast message from client-1"},
}
@pytest.fixture
def mock_post_send():
return posts()["send"]
@pytest.fixture
def mock_message_send():
post = posts()["send"]
return {"topic": post["topic"], "message": post["body"]}
@pytest.fixture
def mock_post_notify():
return posts()["notify"]
@pytest.fixture
def mock_message_notify():
post = posts()["notify"]
return {"topic": "broadcast", "message": post["body"]}
@pytest.fixture
def mock_read_from_queue_return():
return [
{
"topic": "soar.client-1.something",
"message": "this is a pub/sub message from client-1",
}
]
FROM python:3.9.16
WORKDIR /app
# If we want to use the alpine image, we need these packages for pytest
# RUN apk update && apk add python3-dev gcc libc-dev libffi-dev
COPY requirements.txt requirements.txt
COPY requirements-dev.txt requirements-dev.txt
RUN pip install -r requirements.txt
RUN pip install -r requirements-dev.txt
\ No newline at end of file
FROM python:3.9.16
WORKDIR /app
COPY . .
# If we want to use the alpine image, we need these packages for pytest
# RUN apk update && apk add python3-dev gcc libc-dev libffi-dev
COPY requirements.txt requirements.txt
COPY requirements-dev.txt requirements-dev.txt
RUN pip install -r requirements.txt
RUN pip install -r requirements-dev.txt
\ No newline at end of file
version: '3.8'
services:
soar_api_test:
build:
context: ..
dockerfile: docker/Dev.Dockerfile
volumes:
- ../:/app
environment:
- PYTHONDONTWRITEBYTECODE=1
command: "pytest -p no:cacheprovider"
container_name: soar_api_test
\ No newline at end of file
version: '3.8'
services:
rabbitmq__local:
image: rabbitmq:management
restart: unless-stopped
ports:
- "5672:5672"
# - "15672:15672" # Admin web console
expose:
- "5672"
container_name: rmq
soar_bus:
build:
context: ..
dockerfile: docker/Dev.Dockerfile
restart: unless-stopped
depends_on:
- rabbitmq__local
environment:
- MQ_HOST=rmq
- LOG_LEVEL=${LOG_LEVEL:-DEBUG}
volumes:
- ../:/app
command: "python soar_bus.py"
container_name: soar_bus
soar_api:
build:
context: ..
dockerfile: docker/Dev.Dockerfile
restart: unless-stopped
ports:
- "8087:8087"
expose:
- "8087"
depends_on:
- rabbitmq__local
environment:
- MQ_HOST=rmq
- FLASK_HOST=0.0.0.0
- LOG_LEVEL=${LOG_LEVEL:-DEBUG}
volumes:
- ../:/app
command: "python api.py"
container_name: soar_api
\ No newline at end of file
import json
import logging
from flask_restful import Resource, abort
from models.token_model import TokenModel
class AuthResource(Resource):
def __init__(self):
self.token = TokenModel()
with open("./data/clients.json", "r") as clients_file:
self.clients = json.load(clients_file)
def auth(self, request):
allow = False
token = None
auth = request.headers.get("Authorization", False)
if auth:
token = auth.split(" ").pop()
parsed = self.token.validate(token)
if parsed["valid"]:
client = self.clients.get(parsed["client_id"])
if client:
self.client = client
allow = True
if not allow:
logging.debug(f"Authentication failed: {request.url}")
logging.trace(f"Token: {token}")
abort(403, message="Invalid token")
return allow
import logging
from flask_restful import Resource, abort, request
from marshmallow import Schema, fields
from werkzeug.exceptions import BadRequest
from models.client_model import ClientModel
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.ERROR)
class ClientSchema(Schema):
client_id = fields.Str(required=True)
client_name = fields.Str(required=True)
subscription = fields.Str(required=True)
# Client
class Client(Resource):
clients_file = None
def __init__(self):
self.schema = ClientSchema()
self.clients_file = ClientModel()
def get(self, client_id):
client = self.clients_file.find(client_id)
# remove secret
# secret is only returned in initial post response
del client["secret"]
if not client:
abort(404, message="No client with id: {}".format(client_id))
return client
def delete(self, client_id):
client = self.clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
self.clients_file.remove(client)
return client, 204
def put(self, client_id):
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
client = self.clients_file.find(client_id)
if not client:
abort(404, message="No client with id: {}".format(client_id))
else:
client = self.clients_file.update(args)
return client, 201
# ClientList
class ClientList(Resource):
def __init__(self):
self.schema = ClientSchema()
self.clients_file = ClientModel()
def get(self):
# return list of clients with secret property removed
return {
client_id: (client, client.pop("secret", None))[0]
for client_id, client in self.clients_file.get().items()
}
def post(self):
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
client = self.clients_file.find(args["client_id"])
if client:
abort(409, message="Duplicate client id: {}".format(args["client_id"]))
else:
client = self.clients_file.add(args)
return client, 201
import json
from flask_restful import abort, request
from marshmallow import Schema, fields
from werkzeug.exceptions import BadRequest
from endpoints.auth_resource import AuthResource
from rmq import write_to_queue
class NotifySchema(Schema):
body = fields.Str(required=True)
class Notify(AuthResource):
clients = None
schema = None
def __init__(self):
super().__init__()
self.schema = NotifySchema()
def post(self):
try:
args = request.get_json()
except BadRequest:
abort(400, message="POSTed body is invalid JSON")
errors = self.schema.validate(args)
if errors:
abort(400, message=str(errors))
allow = False
body = args.get("body")
message = {
"topic": "broadcast",
"message": body,
}
allow = self.auth(request)
if allow:
notify_queue = self.client["client_id"] + "-broadcast"
write_to_queue(queue_name=notify_queue, msg=json.dumps(message))
from flask_restful import abort, request
from marshmallow import Schema, fields
from endpoints.auth_resource import AuthResource
from rmq import read_from_queue
class ReceiveQuerySchema(Schema):
max_messages = fields.Int(required=False)
class Receive(AuthResource):
clients = None
schema = None
def __init__(self):
super().__init__()
self.schema = ReceiveQuerySchema()
def get(self):
errors = self.schema.validate(request.args)
if errors:
abort(400, message=str(errors))
# force query string parameter value into int
max_messages = int(request.args.get("max_messages", 10))
allow = self.auth(request)
if allow:
inbox_queue = self.client["client_id"] + "-inbox"
return read_from_queue(queue_name=inbox_queue, max_msgs=max_messages)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment