"""
Generate swagger to view models/schemas.

Command:
1/ python3 generate_swagger.py
2/ Go to http://127.0.0.1:5000/soardocs

WARNING: API Endpoints are NOT functional. Purely for easy-reading.
"""

from flask import Flask, request
from flasgger import Swagger, LazyString
from flask_restx import Api, fields, Resource

app = Flask(__name__)
api = Api(app)

# Top-level swagger metadata. Values are wrapped in LazyString so they are
# only evaluated when rendered (``request.host`` needs a request context).
swagger_template = dict(
    info={
        "title": LazyString(lambda: "SoAR Backbone Message Formats"),
        "version": LazyString(lambda: "0.1"),
        "description": LazyString(
            lambda: "Backbone Message Format component for the Squad of"
            + " Autonomous Robots (SoAR) message definitions."
        ),
    },
    host=LazyString(lambda: request.host),
)

# Message wrapper/header. Nested as the ``message`` field of the message
# schemas defined further down.
full_message_schema = api.model(
    "FullMessageSchema",
    {
        "message_ID": fields.String(
            required=True,
            description="UUID assigned to this message",
            example="b427003c-7bc8-11ed-a1eb-0242ac120002",
        ),
        "timestamp": fields.DateTime(
            required=True,
            description="Timestamp of message",
            example="2022-11-16T00:00:00Z",
        ),
        "version": fields.Float(
            required=True,
            description="Version of comms backbone messaging format protocol",
            example=2.0,
        ),
        "source": fields.String(
            required=True,
            description="Where is this message from",
            example="autonomy_engine",
        ),
        "destination": fields.String(
            required=True,
            description="What is the destination of this message",
            example="ah-1",
        ),
        "encoded": fields.Boolean(
            required=True,
            description="Indicate that message raw (encoded) or decoded. "
            + "Options: encoded=True, decoded=False",
            example=False,
        ),
        "type": fields.String(
            required=True,
            description="Type of message",
            example="platform_status",
        ),
        "payload": fields.Raw(
            required=True,
            description="Content of Message",
            # example="{}",
        ),
    },
)

# A single sensor carried by a platform.
sensor_schema = api.model(
    "SensorSchema",
    {
        "sensor_ID": fields.Integer(
            required=True,
            description="unique identifier for sensor",
            example=2,
        ),
        "serial": fields.String(
            required=False,
            description="serial number of sensor",
            example="mbes-001",
        ),
        "sensor_status": fields.Boolean(
            required=False,
            description="Sensor switched on (True) or off (False)",
            example=True,
        ),
        "additional_data": fields.Raw(
            required=False,
            description="Any additional fields/data to be added here",
        ),
    },
)

# Operating limits; nested under each platform as ``constraints``.
constraints_schema = api.model(
    "ConstraintsSchema",
    {
        "min_altitude": fields.Float(
            required=True,
            description="Minimum altitude set for squad.",
            example=15.2,
        ),
        "min_velocity": fields.Float(
            required=True,
            description="Minimum velocity set for squad.",
            example=0.1,
        ),
        "max_velocity": fields.Float(
            required=True,
            description="Maximum velocity set for squad.",
            example=0.9,
        ),
    },
)

# Static description of one platform within a squad.
platform_schema = api.model(
    "PlatformSchema",
    {
        "platform_ID": fields.Integer(
            required=True,
            description="unique identifier for platform",
            example=1,
        ),
        "serial": fields.String(
            required=True,
            description="platform serial number",
            example="reav-60",
        ),
        "model": fields.String(
            required=True,
            description="platform model",
            example="reav",
        ),
        "constraints": fields.Nested(constraints_schema),
    },
)

# One step of a mission plan.
action_schema = api.model(
    "AutonomyEngineAction",
    {
        "action": fields.String(
            required=True,
            description="Autonomy Engine's action from `move`, `payload`,"
            + " `dive`, `send_hits`, `scanline`, `scanpoint`.",
            example="move",
        ),
        "flight_style": fields.String(
            required=False,
            description="Platform-specific modes/flight styles to perform"
            + " next action",
            example="orbit",
        ),
        # Waypoint examples use the same point as the first
        # [longitude, latitude] pair of the RegionSchema example below.
        "latitude_waypoint": fields.Float(
            required=True,
            description="Next waypoint, y-coordinate",
            example=50.37072283932642,
        ),
        "longitude_waypoint": fields.Float(
            required=True,
            description="Next waypoint, x-coordinate",
            example=-4.187143188645706,
        ),
        "altitude": fields.Float(
            required=False,
            description="Altitude of next action",
            example=15.0,
        ),
        "depth": fields.Float(
            required=False,
            description="Depth of next action",
            example=15.0,
        ),
        "activate_payload": fields.Boolean(
            required=False,
            description="To activate/deactivate sensor for Autosub "
            + "Hover-1 --> `MBES` sensor and for EcoSUB --> `Sidescan`",
            example=True,
        ),
        "send_environmental_data": fields.Boolean(
            required=False,
            description="DO WE NEED THIS? To trigger the platform to send"
            + " list of observations if any found",
            example=False,
        ),
    },
)

# Mission plan message: header plus an ordered list of actions.
mission_plan_schema = api.model(
    "MissionPlan",
    {
        "message": fields.Nested(
            full_message_schema,
            required=True,
            description="Message header",
        ),
        "plan_ID": fields.Integer(
            required=True,
            description="Identifier of given mission sequence planned",
            example=1,
        ),
        "platform_serial": fields.String(
            required=True,
            description="Serial of target platform to send mission to",
            example="reav-60",
        ),
        "plan": fields.List(
            fields.Nested(action_schema),
            required=True,
            description="Sequence of actions/instructions generated by the "
            + " Autonomy Engine that should be compiled by the respective C2.",
        ),
    },
)

# Observation message. Several field formats are still undecided (see the
# "DEFINE FORMAT" notes in the descriptions), hence the empty examples.
observation_schema = api.model(
    "Observation",
    {
        "message": fields.Nested(
            full_message_schema,
            required=True,
            description="Message header",
        ),
        "platform_serial": fields.String(
            required=True,
            description="Serial of platform sending observations",
            example="ecosub-3",
        ),
        # "observation_type" ==> payloads tied to different types maybe?
        # properties of each observation?
        "points_of_interest": fields.Float(
            required=False,
            description="Points from features of interest identified by"
            + " platform if any found. DEFINE FORMAT.",
            example="",
        ),
        "region_surveyed": fields.Float(
            required=False,
            description="Region surveyed by given platform. DEFINE FORMAT."
            + " GEOJSON?",
            example="",
        ),
        "quality_of_points": fields.Float(
            required=False,
            description="Quality/strength of points from features of interest"
            + " identified by platform. DEFINE FORMAT.",
            example=0.98,
        ),
        "additional_data": fields.Raw(
            required=False,
            description="Placeholder field for any additional data",
            example={"sensor_payload": False},
        ),
    },
)

# A GeoJSON region; reused for regions of interest and exclusion zones.
region_schema = api.model(
    "RegionSchema",
    {
        "region": fields.Raw(
            required=True,
            description="Using GEOJSON, exact region of interest in rectangle"
            + " format polygon",
            example={
                "type": "FeatureCollection",
                "features": [
                    {
                        "type": "Feature",
                        "properties": {},
                        "geometry": {
                            "coordinates": [
                                [
                                    [-4.187143188645706, 50.37072283932642],
                                    [-4.202697005964865, 50.368816892405874],
                                    [-4.203156724702808, 50.365640144076906],
                                    [-4.19449868846155, 50.362267670845654],
                                ]
                            ],
                            "type": "Polygon",
                        },
                    }
                ],
            },
        ),
    },
)

# Squad-level metadata: member platforms, mission type and regions.
squad_metadata_schema = api.model(
    "SquadMetadataSchema",
    {
        "squad_ID": fields.Integer(
            required=True,
            description="Identifier of given squad",
            example=23,
        ),
        "no_of_platforms": fields.Integer(
            required=True,
            description="Number of platforms",
            example=3,
        ),
        "platforms": fields.List(
            fields.Nested(platform_schema),
            required=True,
            description="Squad consists of these platforms",
        ),
        "squad_mission_type": fields.String(
            required=True,
            description="Mission of given squad: `tracking`, `survey`,"
            + " `inspection`",
            example="survey",
        ),
        "squad_state": fields.Boolean(
            required=True,
            description="In execution, Waiting.. ",
            example=False,
        ),
        "region_of_interest": fields.List(
            fields.Nested(region_schema),
            required=False,
            description="Region of interest and exclusion zones per squad.",
        ),
        "exclusion_zones": fields.List(
            fields.Nested(region_schema),
            required=True,
            description="Exclusion zones per squad.",
        ),
    },
)

# Planning configuration message: header, configuration ID and squad details.
planning_configuration_schema = api.model(
    "PlanningConfigurationSchema",
    {
        "message": fields.Nested(
            full_message_schema,
            required=True,
            description="Message header",
        ),
        "ID": fields.Integer(
            required=True,
            description="Unique identifier tagged to version of this"
            + " configuration plan",
            example=3,
        ),
        "squads": fields.Nested(
            squad_metadata_schema,
            required=False,
            description="Details of each squad",
        ),
    },
)

# One position reading. Most descriptions/examples are unfinished
# placeholders in the original definitions and are kept as-is.
gps_schema = api.model(
    "GPS",
    {
        # String rather than Float: the documented values are labels such
        # as "internal" / "external".
        "gps_source": fields.String(  # TODO: TBD with partners
            required=False,
            description="Source of gps position. E.g. USBL (external), "
            + "platform itself (internal)",
            example="internal",
        ),
        "latitude_type": fields.String(
            required=False,
            description="",
            example="",
        ),
        "longitude_type": fields.String(
            required=False,
            description="",
            example="",
        ),
        "latitude": fields.Float(
            required=False,
            description="Latitude in ",
            example="",
        ),
        "longitude": fields.Float(
            required=False,
            description="Longitude in ",
            example="",
        ),
        "depth": fields.Float(
            required=False,
            description="Depth in ",
            example="",
        ),
        "altitude": fields.Float(
            required=False,
            description="Altitude in ",
            example="",
        ),
        # "gps_fix_seconds_ago"
    },
)

# Platform status message: header plus the platform's reported state.
platform_status_message_schema = api.model(
    "platformStatusMessage",
    {
        "message": fields.Nested(
            full_message_schema,
            required=True,
            description="Message header",
        ),
        "platform_ID": fields.Integer(
            required=True,
            description="unique identifier for platform",
            example=1,
        ),
        "active": fields.Boolean(
            required=False,
            description="When a platform is in deployment (executing a mission"
            + " plan) this should be True",
            example=True,
        ),
        "platform_state": fields.String(
            # TODO: Define dictionary with potential STATES of each platform
            required=False,
            description="Current state executed by platform. E.g. "
            + "STOP, IDLE, ABORT.",
            example="IDLE",
        ),
        "autonomy_plan_ID": fields.Integer(
            required=False,
            description="Last mission plan ID (according to Autonomy Engine's "
            + "mission plan number) executed by platform",
            example=1,
        ),
        "mission_track_ID": fields.Integer(
            required=False,
            description=(
                "Track number - stage in mission (e.g. "
                + "4 --> Waypoint 3 to Waypoint 4)"
            ),
            example=4,
        ),
        "mission_action_ID": fields.Integer(
            required=False,
            description="to add description",
            example=1,
        ),
        "range_to_go": fields.Float(
            required=False,
            description="Estimated distance to reach next waypoint",
            example=124.3,
        ),
        "speed_over_ground": fields.Float(
            required=False,
            description="",
            example=124.3,
        ),
        "water_current_velocity": fields.Float(
            required=False,
            description="",
            example=124.3,
        ),
        "thrust_applied": fields.Float(
            required=False,
            description="TODO: Needs further consideration",
            example=124.3,
        ),
        "health_status": fields.String(
            required=False,
            description="Health status extracted by respective platform "
            + "if any diagnosis available checks on sensors",
            example="Warning",
        ),
        "gps_data": fields.List(
            fields.Nested(gps_schema),  # TODO: TBD Do we want a list of gps
            # readings to allow > 1 reading i.e. platform + usbl
            required=True,
            description="Metadata of each platform",
        ),
        "localisation_error": fields.Float(
            required=False,
            description="Localisation error at last USBL update.",
            example="",
        ),
        "usbl_fix_seconds_ago": fields.Float(
            required=False,
            description="",
            example="",
        ),
        "battery_remaining_capacity": fields.Float(
            required=True,
            description="Battery remaining capacity % provided by respective"
            + " platform/C2.",
            example=80.0,
        ),
        "sensor_config": fields.Nested(
            sensor_schema
        ),  # TODO: TBD Do we want a list of sensors to allow > 1 sensor
    },
)

# Acknowledgement message for mission plans sent by the Autonomy Engine.
acknowledgement_schema = api.model(
    "Acknowledgement",
    {
        "message": fields.Nested(
            full_message_schema,
            required=True,
            description="Message header",
        ),
        "message_ID": fields.Integer(
            required=True,
            description="Identifier of message received and executed with "
            + "success for mission plans sent by the Autonomy Engine.",
            example=202,
        ),
        "status": fields.String(
            required=True,
            description="Highest level of acknowledgement. I.e. `c2_received`:"
            + " Received by C2,`c2_sent`: Sent from C2"
            + " ->Platform, `executed`: Executed by platform",
        ),
    },
)

# flasgger configuration: spec served at /swagger.json, UI at /soardocs/.
swagger_config = {
    "headers": [],
    "specs": [
        {
            "endpoint": "swagger",
            "route": "/swagger.json",
            "rule_filter": lambda rule: True,
            "model_filter": lambda tag: True,
        }
    ],
    "static_url_path": "/flasgger_static",
    "swagger_ui": True,
    "specs_route": "/soardocs/",
    "swagger": "2.0",
    "basePath": "/soar",
    "info": {
        "title": "soar",
        "version": "0.1",
        "description": "SoAR message schemas",
    },
    "produces": ["application/json"],
    "consumes": ["application/json"],
}

swagger = Swagger(app, template=swagger_template, config=swagger_config)

# The resources below exist only to attach a schema to a route so that it
# shows up in the docs; their handlers are intentionally empty (see the
# module WARNING). Notes are kept as ``#`` comments rather than docstrings
# so no extra text can leak into the generated spec.
ns1 = api.namespace(
    "message", description="Message Wrapper (Full Message Schema) Format"
)


@ns1.route("/wrapper")
class MessageWrapper(Resource):
    @ns1.response(200, "Success", full_message_schema)
    def get(self):
        pass


ns2 = api.namespace(
    "platform_status",
    description="platform Status Message Format",
)


@ns2.route("")
class platformStatus(Resource):
    @ns2.response(200, "Success", platform_status_message_schema)
    def get(self):
        pass


ns3 = api.namespace(
    "planning_configuration",
    description="Planning Configuration Format. Do we want region of "
    + "interest to be per squad, per platform or in the main schema?",
)


@ns3.route("")
class PlanningConfiguration(Resource):
    @ns3.response(200, "Success", planning_configuration_schema)
    def get(self):
        pass


# @api.route('/mission-plan/