Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
Communications Backbone System
backbone-adapter-ros
Commits
3f2b37a7
Commit
3f2b37a7
authored
1 year ago
by
izzat
Browse files
Options
Download
Email Patches
Plain Diff
feat(bb publisher): Add backbone publisher with Tom's example
parent
e79ec9bb
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
204 additions
and
0 deletions
+204
-0
src/backbone_adapter/src/backbone_publisher.py
src/backbone_adapter/src/backbone_publisher.py
+204
-0
No files found.
src/backbone_adapter/src/backbone_publisher.py
0 → 100755
View file @
3f2b37a7
#!/usr/bin/env python
import
json
import
time
import
uuid
from
datetime
import
datetime
import
requests
import
rospy
from
adapter
import
get_adapter
from
ecoSUB_interaction_pkg.msg
import
ecosub_state_v2
from
ecosub_lib_core.functions_core_getters
import
lib_core_getters
from
std_msgs.msg
import
String
import
adapter_interface.msg
as
soar_msg
from
adapter_interface.srv
import
StartExecution
,
StartExecutionResponse
class
BackbonePublisherHandler
(
object
):
"""
An example class to handle messages to be published to the communication backbone.
This class includes internal ROS subscribers/services to get the latest states of a component
(e.g. the Autonomy Engine or ecoSUBs) and comms backbone JSON message generator (to be included as
a callback function to a ROS service, for example).
Methods
-------
"""
def
__init__
(
self
)
->
None
:
self
.
adapter
=
get_adapter
()
# self.config_ID = 0
rospy
.
Subscriber
(
"platform_status"
,
String
,
self
.
send_platform_stats_cb
)
# rospy.Subscriber(
# "planning_config", soar_msg.PlanningConfig, self.planning_config_cb
# )
# start_execution_srv = rospy.Service(
# "start_execution", StartExecution, self.backbone_publisher_server
# )
self
.
message_example
=
"""
{
"header":{
"message_ID": "b427003c-0000-11aa-a1eb-bvcdfghjgfdd",
"timestamp": "2022-11-16T00:00:00Z",
"version": 2,
"source": "ecosub_c2",
"destination": "soar.noc.a.a.receive.platform_status",
"delivery_type": "publish",
"encoded": false
},
"payload":{
"message_type": "platform_status",
"platform_ID": "ecosub-2",
"platform_timestamp": "2022-12-21T00:00:00Z",
"status_source": "onboard_platform",
"autonomy_engine_plan_ID": 1,
"battery_remaining_capacity": 80.2,
"active": true,
"platform_state": "ABORT",
"mission_plan_ID": 1,
"mission_track_ID": 4,
"latitude": 178.2,
"longitude": -10.122,
"depth": 50.0,
"altitude": 20,
"heading": 90.0,
"health_status": false,
"localisation_north_error": 0,
"localisation_east_error": 0,
"usbl_fix_seconds_ago": 0,
"range_to_go": 124.3,
"sensor_config": {
"sensor_ID": 22,
"serial": "sidescan-2x",
"sensor_on": true,
"additional_data": {
"whiskers_on": true
}
},
"speed_over_ground": 4.3,
"thrust_applied": 124.3,
"water_current_velocity": "124.3NE"
}
}
"""
# load the above example json into a python dict
self
.
message
=
json
.
loads
(
self
.
message_example
)
# Send status update every defined time duration in seconds
rospy
.
Timer
(
rospy
.
Duration
(
10.0
),
self
.
update_platform_status
)
# ROS Subscriber callback example
def
send_platform_stats_cb
(
self
,
data
):
# rospy.loginfor(rospy.get_caller_id() + "Received: %s".format(data.data))
# Add code to update variables
# Example given by Tom
# Generate a universally unique ID for message ID
# the example appears to be type 1 (though the last section includes non-hex digits)
self
.
message
[
"header"
][
"message_ID"
]
=
str
(
uuid
.
uuid1
())
# timestamp with now
self
.
message
[
"header"
][
"timestamp"
]
=
datetime
.
now
().
strftime
(
"%Y-%m-%dT%H:%M:%SZ"
)
# the source of the message
self
.
message
[
"header"
][
"source"
]
=
"ecosub_something"
# message destination
self
.
message
[
"header"
][
"destination"
]
=
"soar.noc.a.a.receive.platform_status"
# maybe?
self
.
message
[
"payload"
][
"platform_timestamp"
]
=
datetime
.
now
().
strftime
(
"%Y-%m-%dT%H:%M:%SZ"
)
# battery remaining in %. Robot shuts off when it reaches 9V i think
self
.
message
[
"payload"
][
"battery_remaining_capacity"
]
=
(
(
data
.
battery1_volts
-
self
.
battery_cutoff
)
*
100
/
(
self
.
battery_full
-
self
.
battery_cutoff
)
)
# the following are retreived from the status subscribed
self
.
message
[
"payload"
][
"latitude"
]
=
data
.
latitude
self
.
message
[
"payload"
][
"longitude"
]
=
data
.
longitude
self
.
message
[
"payload"
][
"depth"
]
=
data
.
depth
self
.
message
[
"payload"
][
"altitude"
]
=
data
.
altitude
self
.
message
[
"payload"
][
"heading"
]
=
data
.
heading
self
.
message
[
"payload"
][
"thrust_applied"
]
=
data
.
RPM
self
.
message
[
"payload"
][
"speed_over_ground"
]
=
data
.
forward_speed
def
comms_bb_sender
(
self
,
message
):
json_message
=
json
.
loads
(
message
)
# Load String
validation
=
self
.
adapter
.
validate
(
json_message
)
if
validation
.
valid
:
message_type
=
self
.
adapter
.
protocol
.
getType
(
message
)
message
=
self
.
adapter
.
protocol
.
encode
(
message_type
,
message
)
metadata
=
message
.
get
(
"metadata"
,
{})
topic
=
metadata
.
get
(
"destination"
,
"broadcast"
)
try
:
if
topic
==
"broadcast"
:
self
.
adapter
.
broadcast
(
message
)
else
:
self
.
adapter
.
publish
(
topic
,
message
)
except
requests
.
exceptions
.
RequestException
as
error
:
print
(
f
"status
{
error
.
response
.
status_code
}
reason:
{
error
.
response
.
reason
}
"
)
# def generate_platform_status_json(self):
# pass
def
update_platform_status
(
self
):
# generate json message
# message_to_send = self.generate_platform_status_json(data.data)
self
.
comms_bb_sender
(
self
.
message
)
# # ROS Service Example (Feel free to remove this.)
# def planning_config_cb(self, data):
# rospy.loginfo(rospy.get_caller_id() + "I heard %s", data)
# self.planning_config = data
# self.planning_config_ID = data.planning_config_ID
# def backbone_publisher_server(self):
# # add condition for service to be successful
# if self.planning_config and (self.planning_config_ID != old_config_ID):
# success = True
# old_config_ID = self.planning_config_ID
# self.start_autonomy_engine()
# return StartExecutionResponse(success)
# else:
# success = False
# return StartExecutionResponse(success)
# def start_autonomy_engine(self):
# # start sutonomy engine
# pass
if
__name__
==
"__main__"
:
try
:
rospy
.
init_node
(
"backbone_publisher"
)
receiver
=
BackbonePublisherHandler
()
rospy
.
spin
()
except
rospy
.
ROSInterruptException
:
# rospy.ROSInitException:
rospy
.
loginfo
(
"Backbone Listener interrupted!"
)
pass
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment