Commit 2b329c8d authored by Dan Jones's avatar Dan Jones
Browse files

Merge branch '1-port-http-adapter-to-python' into 'dev'

Resolve "Port HTTP Adapter to python"

Closes #1

See merge request !1
parents ad1285b9 5a54efac
/Pipfile
/Pipfile.lock
/venv/*
/test/features/*
/test/fixtures/*
__pycache__/
build/
\ No newline at end of file
Copyright 2023 [NOC](https://noc.ac.uk)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
...@@ -4,4 +4,132 @@ Generic adapter for the communications-backbone. ...@@ -4,4 +4,132 @@ Generic adapter for the communications-backbone.
Implements: Implements:
client credentials grant http send/receive/notify to backbone websockets send and receive validation of messages against a specified OpenAPI schema decode/encode stubs - client credentials grant
\ No newline at end of file - http send/receive/notify to backbone
- validation of messages against a specified OpenAPI schema
- decode/encode stubs
## Setup
```
pip install -r requirements.txt
copy_tests
```
## Test
The tests are written in [behave](https://behave.readthedocs.io/en/stable/)
This means we can have a common suite of [gherkin](https://github.com/cucumber/gherkin)
tests across adapter ports written in multiple languages.
```
behave
```
## Example implementation
There's an example using the python client in [./example](./example).
There's a [README](./example/README.md) to explain what it does and how to
set it up.
## Installing in your project
We may publish this to a public registry but for now you need to install the package
from git.
### Requirements
Python >= 3.8
### Pip
It should also work with pipenv or poetry. You might have to tinker with the git url syntax in different
package managers.
#### Once public
```bash
echo backbone-adapter-python @ git+ssh://git@git.noc.ac.uk/communications-backbone-system/backbone-adapter-python.git@[tag|branch|commit] >> requirements.txt
pip install -r requirements.txt
```
#### For now
```bash
echo backbone-adapter-python @ git+https://git@git.noc.ac.uk/communications-backbone-system/backbone-adapter-python.git@[tag|branch|commit] >> requirements.txt
pip install -r requirements.txt
```
## Schema
The example code uses a mock schema with some example messages. The intention is the message
protocol schema is retreived from an external source.
## Config
To run the adapter you need a credentials file called `soar-config.json`.
This will be provided by the backbone operator or requested via the API.
```json
{
"api": "[backbone api root url]",
"client_id": "unique-client-id",
"client_name": "UniqueClientName",
"subscription": "dot.delimited.topic.subscription.#",
"secret": "[a generated secret]"
}
```
### Topics and Subscriptions
The topics have the following structure:
```
project.operator.vehicleType.vehicleID.[send|receive].messageType
# eg
soar.noc.autosub.ah1.send.platform-mission
# or
soar.po.ecosub.eco1.receive.platform-status
```
Subscriptions may contain single-word wildcards (*) or multi-word wildcards (#).
## Encoding and decoding
### Decoding
Decoding refers to translation from the backbone message protocol into a
native format for the client app to process.
All messages received from the backbone are parsed and validated against the
protocol schema and then passed to the protocol decode function.
By overriding the decode function the client can define local actions to be
executed when a message of a given type is received.
### Encoding
Encoding refers to translation from the client app's native format into a
message conforming to the backbone message protocol.
The equivalent encode method allows the client to define translations per
message type to transform local data into a message conforming to the
protocol schema for transmission.
Messages passed to the publish and broadcast methods should have been
encoded and validated against the protocol schema.
## Publish vs Broadcast
It is intended that all normal-operation messages will be published on
a given topic allowing clients to choose which message topics to
subscribe to.
Broadcast is provided for contingency scenarios. The intention is that
in the case of a failure/abort a message can be sent to all parties
which bypasses any existing messages in the publish queue.
The client implementation can chose to take no-action on decoding one of
these messages but they will be made available to all clients.
\ No newline at end of file
[behave]
paths=test
\ No newline at end of file
soar-config.json
received/*
!received/README.md
transmit/*
!transmit/README.md
sent/*
!sent/README.md
\ No newline at end of file
# Example Python Client
A reference example of how you might use the python adapter.
It has very basic functionality but it shows how the adapter
is instantiated and wired up to send and receive messages.
## Install
In the `example` directory:
```
pip install -r requirements.txt
```
## Run
Get a set of client config from your backbone instance and
save as `soar-config.json` in the `example` directory.
```
python run.py
```
## Receiving
When a message is received it logged in the `received` directory
according to a naming convention and indexed by time.
```
recevied
\_ YYYY-MM-DD (iso format date)
\_ HH (2 digit hour UTC)
\_ [iso format timestamp]-[id].json
```
Each message is classified by date, type, operator (eg Planet Ocean)
and platform (eg ecosub-X).
Where operator and platform are not specified they will default to `all`
## Sending
When a message is written to the `transmit` directory it will be read,
parsed, validated and sent to the backbone. Once transmitted
the files are moved from the `transmit` to the sent directory and
renamed as per the naming convention described in received.
It reads `.json` files. Other file types are ignored.
## Frontend
There's a very basic frontend at `http://localhost:8089` which
serves up the raw messages. It's not dynamic. The backend is
dynamically consuming messages but the front end will just
render messages for the timescale requested.
There's a very basic API that serves up the raw message data at
`http://localhost:8089/messages`.
Both take params specified as microtime ISO format timestamps:
`2023-02-03T00:00:00.000000`
import json
import time
from backbone_adapter.adapter import Adapter
from protocol import ExampleProtocol
ADAPTER = None
def get_schema():
with open('../test/fixtures/schema-swagger.json', 'r') as api_schema_file:
schema = json.load(api_schema_file)
return schema
def get_config():
with open('soar-config.json', 'r') as soar_config_file:
soar_config = json.load(soar_config_file)
return soar_config
def get_adapter():
global ADAPTER
if not ADAPTER:
schema = get_schema()
soar_config = get_config()
protocol = ExampleProtocol(schema, {})
ADAPTER = Adapter(protocol, soar_config)
return ADAPTER
\ No newline at end of file
from flask import Flask, request, render_template
from flask_restful import Api
from endpoints.messages import Messages
import json
def to_pretty_json(value):
return json.dumps(value, sort_keys=True,
indent=2, separators=(',', ': '))
app = Flask(__name__)
app.jinja_env.filters['to_json_pretty'] = to_pretty_json
api = Api(app)
api.add_resource(Messages, "/messages")
@app.route("/")
def index():
message_resource = Messages()
template_data = {
"title": "Example Python Backbone Adatper",
"messages": message_resource.get(),
"from_time": request.args.get("from_time", ""),
"until_time": request.args.get("until_time", ""),
}
return render_template('pages/messages.html', **template_data)
def run():
app.run(debug=True, port=8089)
if __name__ == "__main__":
run()
\ No newline at end of file
from flask_restful import Resource, request, abort
from marshmallow import Schema, fields, validate
import functools
from datetime import datetime, timedelta
from glob import glob
import json
import os
import re
class MessagesRequestSchema(Schema):
from_time = fields.Str(required=False)
until_time = fields.Str(required=False)
class MessageLoader:
def date_in_range(self, received, earliest, latest):
in_range = False
if re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", received):
after_start = received >= earliest
before_end = received <= latest
in_range = after_start and before_end
return in_range
def date_as_file_prefix(self, isostamp):
return datetime.strptime(isostamp, "%Y-%m-%dT%H:%M:%S.%f").strftime("%Y-%m-%d-%H-%M-%S-%f")
def read_message(self, file_path):
with open(file_path, "r") as file:
content = json.load(file)
return content
def get(self, directory, earliest, latest):
earliest_fileprefix = self.date_as_file_prefix(earliest)
latest_fileprefix = self.date_as_file_prefix(latest)
earliest_date = earliest[0:10]
latest_date = latest[0:10]
start_dir = os.getcwd()
os.chdir(directory)
received_dates = glob('*')
dates_in_range = [
received_date
for received_date
in received_dates
if self.date_in_range(received_date, earliest_date, latest_date)
]
in_range_messages = []
for in_range_date in dates_in_range:
earliest_hour = "00"
latest_hour = "23"
is_earliest_date = in_range_date == earliest_date
is_latest_date = in_range_date == latest_date
if is_earliest_date:
earliest_hour = earliest[11:13]
if is_latest_date:
latest_hour = latest[11:13]
os.chdir(in_range_date)
hours_in_range = [
received_hour
for received_hour
in glob("*")
if received_hour >= earliest_hour
and received_hour <= latest_hour
]
for in_range_hour in hours_in_range:
os.chdir(in_range_hour)
messages = glob("*")
is_earliest_hour = in_range_hour == earliest_hour
is_latest_hour = in_range_hour == latest_hour
if is_earliest_hour:
messages = [
message
for message in messages
if message >= earliest_fileprefix
]
if is_latest_hour:
messages = [
message
for message in messages
if message <= latest_fileprefix
]
in_range_messages.extend([
self.read_message(os.path.join(start_dir, directory, in_range_date, in_range_hour, message))
for message
in messages
])
os.chdir("..")
os.chdir("..")
os.chdir(start_dir)
return in_range_messages
# Messages
class Messages(Resource):
def __init__(self):
self.schema = MessagesRequestSchema()
self.loader = MessageLoader()
def get(self):
errors = self.schema.validate(request.args)
if errors:
abort(400, message=str(errors))
params = {
arg: val
for (arg,val) in request.args.items()
if val is not ''
}
# Default = 1 hour ago to now
latest_default = datetime.utcnow()
latest = params.get("until_time", latest_default.isoformat())
earliest_default = latest_default - timedelta(hours=1)
earliest = params.get("from_time", earliest_default.isoformat())
print(f"earliest: {earliest} latest {latest}")
return self.loader.get("received", earliest, latest)
\ No newline at end of file
import json
import os
from datetime import datetime
from backbone_adapter.protocol import GenericProtocol
class ExampleProtocol(GenericProtocol):
def decode(self, type, message):
print(json.dumps(message))
now = datetime.utcnow()
iso = now.isoformat()
file_path = os.path.join("received", self.get_file_path(iso, message))
file_name = self.get_file_name(now, message)
print(file_path, file_name)
self.write_to_file(file_path, file_name, json.dumps(message))
return message
def encode(self, type, message):
now = datetime.utcnow()
iso = now.isoformat()
file_path = os.path.join("sent", self.get_file_path(iso, message))
file_name = self.get_file_name(now, message)
print(file_path, file_name)
self.write_to_file(file_path, file_name, json.dumps(message))
return message
def get_file_path(self, isotimestamp, message):
(isodate, isotime) = isotimestamp.split("T")
hour = isotime.split(":")[0]
return os.path.join(isodate, hour)
def get_file_name(self, now, message):
timestamp = now.strftime("%Y-%m-%d-%H-%M-%S-%f")
metadata = message.get("metadata", {})
mid = metadata.get("message_id", "no.id")
file_name = f"{timestamp}-{mid}.json";
return file_name
def write_to_file(self, file_path, file_name, content):
try:
os.makedirs(file_path, 0o755, exist_ok=True)
with open(os.path.join(file_path, file_name), "w") as message_file:
message_file.write(content)
return True
except Exception as error:
print(str(error))
return False
## Receiving
When a message is received it logged in the `received` directory
according to a naming convention and indexed by time.
```
recevied
\_ YYYY-MM-DD (iso format date)
\_ HH (2 digit hour UTC)
\_ [iso format timestamp]-[type]-[topic]-[id].json
```
Each message is classified by date, type, the topic it was
published to and the message id to ensure uniqueness.
\ No newline at end of file
import json
import time
from adapter import get_adapter
POLL_INTERVAL = 10
def main():
soar_adapter = get_adapter()
while(True):
soar_adapter.poll()
time.sleep(POLL_INTERVAL)
if __name__ == '__main__':
main()
aniso8601==9.0.1
attrs==22.2.0
backbone-adapter-python @ git+ssh://git@git.noc.ac.uk/communications-backbone-system/backbone-adapter-python.git@ff0dae90
behave==1.2.6
certifi==2022.12.7
charset-normalizer==3.0.1
click==8.1.3
exceptiongroup==1.1.0
Flask==2.2.2
Flask-RESTful==0.3.9
idna==3.4
importlib-metadata==6.0.0
importlib-resources==5.10.2
iniconfig==2.0.0
itsdangerous==2.1.2
Jinja2==3.1.2
jsonschema==4.17.3
jsonschema-spec==0.1.2
lazy-object-proxy==1.9.0
MarkupSafe==2.1.2
marshmallow==3.19.0
openapi-schema-validator==0.4.1
openapi-spec-validator==0.5.2
packaging==23.0
parse==1.19.0
parse-type==0.6.0
pathable==0.4.3
pkgutil_resolve_name==1.3.10
pluggy==1.0.0
pyrsistent==0.19.3
pytest==7.2.1
pytz==2022.7.1
PyYAML==6.0
requests==2.28.2
responses==0.22.0
six==1.16.0
toml==0.10.2
tomli==2.0.1
types-toml==0.10.8.1
typing_extensions==4.4.0
urllib3==1.26.14
watchdog==2.2.1
Werkzeug==2.2.2
zipp==3.12.0
#!/usr/bin/env python
import concurrent.futures
from app import run as run_flask
from sender import main as run_sender
from receiver import main as run_receiver
THREADS = []
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
# send
thread = executor.submit(run_sender)
THREADS.append(thread)
# receive
thread = executor.submit(run_receiver)
THREADS.append(thread)
# api
thread = executor.submit(run_flask)
THREADS.append(thread)
if __name__ == "__main__":
main()
import json
import os
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from adapter import get_adapter
class TransmitHandler(FileSystemEventHandler):
def __init__(self):
self.adapter = get_adapter()
super().__init__()
def on_created(self, event):
# print(event.src_path.strip())
file_name = event.src_path.strip()
if file_name.endswith(".json"):
print("sending", event.src_path)
with open(file_name, "r") as message_file:
message = json.load(message_file)
validation = self.adapter.validate(message)
if (validation.valid):
message_type = self.adapter.protocol.getType(message)
message = self.adapter.protocol.encode(message_type, message)
metadata = message.get("metadata", {})
topic = metadata.get("destination", "broadcast")
try:
if topic == "broadcast":
self.adapter.broadcast(message)
else:
self.adapter.publish(topic, message)
except requests.exceptions.RequestException as error:
print(f"status {error.response.status_code} reason: {error.response.reason}")
os.remove(file_name)
def main():
event_handler = TransmitHandler()
observer = Observer()
observer.schedule(event_handler, path='./transmit', recursive=False)
observer.start()
while True:
try:
pass
except KeyboardInterrupt:
observer.stop()
if __name__ == '__main__':
main()
\ No newline at end of file
# Sent
Messages transmitted to the backbone from the `transmit`
directory are named and saved here indexed by timestamp.
\ No newline at end of file
<!doctype html>
<html>
<head>
<title>{{ page_title }}</title>
<!-- Compressed CSS -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/foundation-sites@6.7.5/dist/css/foundation.min.css" crossorigin="anonymous">
</head>
<body>
{% block content %}{% endblock %}
</body>
</html>
\ No newline at end of file
{% extends 'layout/simple.html' %}
{% block content %}
<div class="top-bar">
<div class="top-bar-left">
<ul class="dropdown menu" data-dropdown-menu>
<li class="menu-text">
{{ title }}
</li>
</ul>
</div>
<form method="get" action=".">
<div class="top-bar-right">
<ul class="menu">
<li><input name="from_time" type="text" placeholder="Earliest ISO timestamp" value="{{ from_time }}"></li>
<li><input name="until_time" type="text" placeholder="Latest ISO timestamp" value="{{ until_time }}"></li>
<li><button type="submit" class="button">Load</button></li>
</ul>
</div>
</form>
</div>
<section name="content">
<div class="grid-container">
<div class="grid-x">
<div class="cell small-12">
<div class="callout">
<p>This client is not dynamically updated with new content.</p>
<p>It just renders messages from the received folder once on page load.</p>
<p>If you add a message to the transmit folder with a metadata.destination that matches
the subscription in your credentials you should receive it back when it gets published.
</p>
<p>So you should see the message:</p>
<ul>
<li>disappear from transmit</li>
<li>appear in sent renamed and filed by date</li>
<li>then appear in received provided it matches your subscription</li>
</ul>
<p>Once you've received some messages either from this client or from
another client - you should be able to search them on this page or from
the /messages api endpoint.
</p>
</div>
</div>
<div class="cell medium-12">
<table>
<tbody>
<tr>
<th class="text-left">Messages</th>
<td>{{ messages | length }}</td>
</tr>
</tbody>
</table>
</div>
{% for message in messages %}
<div class="cell medium-6 small-12">
<table>
<tbody>
<tr>
<th class="text-left">Topic</th>
<td>{{ message.metadata.destination }}</td>
</tr>
<tr>
<th class="text-left">Source</th>
<td>{{ message.metadata.source }}</td>
</tr>
<tr>
<th class="text-left">Message ID</th>
<td>{{ message.metadata.message_id }}</td>
</tr>
<tr>
<th class="text-left">Type</th>
<td>{{ message.payload.message_type }}</td>
</tr>
</tbody>
</table>
</div>
<div class="cell medium-6 small-12">
<table>
<tbody>
<tr>
<td>
<pre>{{ message | to_json_pretty }}</pre>
</td>
</tr>
</tbody>
</table>
</div>
{% endfor %}
</div>
</div>
</section>
{% endblock %}
\ No newline at end of file
# Transmit
This folder is monitored by `sender.py` when a file is
detected it will be read, parsed, validated and sent
to the backbone.
Once sent the file will be moved to the sent folder and
renamed to match the received naming convention.
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment