# Validation script: checks the OpenAPI spec in fixtures/swagger.json and
# validates every mock message under mock_data/<adapter>/ against the JSON
# schemas exported by fixtures.schemas.
#
# NOTE: the import order below is significant. `validate` is imported twice;
# the later jsonschema import rebinds the name, so every validate(...) call in
# this script is jsonschema.validate. Do not reorder these lines.
from openapi_spec_validator import openapi_v30_spec_validator, openapi_v3_spec_validator
from openapi_spec_validator.readers import read_from_filename
from openapi_schema_validator import validate
from jsonschema import FormatChecker
from jsonschema import validate
from fixtures.schemas import (
    acknowledgement_schema,
    message_header,
    observation_schema,
    planning_configuration_schema,
    platform_status_schema,
    mission_plan_schema,
)
import json


def _validate_mock_messages(adapter_dir, payload_schemas):
    """Validate the mock messages of one adapter.

    For each ``(message_type, payload_schema)`` pair, loads
    ``mock_data/<adapter_dir>/<message_type>.json``, validates its ``header``
    against ``message_header`` and its ``payload`` against ``payload_schema``,
    printing the result of each ``validate`` call.

    Args:
        adapter_dir: Name of the sub-directory of ``mock_data`` to read from.
        payload_schemas: Iterable of ``(message_type, payload_schema)`` pairs,
            processed in order.

    Raises:
        OSError: If a mock data file cannot be opened.
        KeyError: If a mock message lacks a ``header`` or ``payload`` key.

    Any exception raised by ``validate`` propagates and aborts the script.
    """
    for message_type, payload_schema in payload_schemas:
        path = "mock_data/%s/%s.json" % (adapter_dir, message_type)
        # Context manager so the file handle is closed even if parsing fails.
        with open(path, encoding="utf-8") as mock_file:
            mock_data = json.load(mock_file)

        # Extract both parts before validating, so a structurally incomplete
        # message fails before any validation output is printed for it.
        header_data = mock_data["header"]
        payload_data = mock_data["payload"]

        print(validate(header_data, message_header, format_checker=FormatChecker()))
        print(validate(payload_data, payload_schema, format_checker=FormatChecker()))


# Test specs
schema, spec_url = read_from_filename("fixtures/swagger.json")

errors_iterator = openapi_v3_spec_validator.iter_errors(schema)
# NOTE(review): this prints the object returned by iter_errors, not the
# individual errors it may produce — confirm whether iterating over it was
# intended.
print(errors_iterator)

print(openapi_v30_spec_validator.validate(schema))

# Test schema - Hydrosurv Messages
_validate_mock_messages(
    "hydrosurv_adapter",
    [
        ("acknowledgement", acknowledgement_schema),
        ("mission_plan", mission_plan_schema),
        ("platform_status", platform_status_schema),
    ],
)

# Test schema - GUI Messages
_validate_mock_messages(
    "gui_adapter",
    [
        ("planning_configuration", planning_configuration_schema),
    ],
)

# Test schema - Ecosub Messages
# The USBL example is a platform status message. It was previously matched
# against a name ending in ".json", which never equalled the list entry, so
# its payload was silently skipped.
_validate_mock_messages(
    "ecosub_adapter",
    [
        ("observation", observation_schema),
        ("mission_plan", mission_plan_schema),
        ("platform_status", platform_status_schema),
        ("platform_status-from_usbl_example", platform_status_schema),
    ],
)

# Test schema - Autonomy Engine Messages
_validate_mock_messages(
    "autonomy_engine_adapter",
    [
        ("planning_configuration", planning_configuration_schema),
        ("mission_plan_AH1", mission_plan_schema),
        ("mission_plan_ECOSUB", mission_plan_schema),
        ("mission_plan_HYDROSURV", mission_plan_schema),
        ("platform_status", platform_status_schema),
        ("platform_status-from_usbl_example", platform_status_schema),
        ("acknowledgement", acknowledgement_schema),
    ],
)