Source code for rigid_body_motion.ros.utils

""""""

import atexit
import warnings

import numpy as np


[docs] def init_node(name, start_master=False): """Register a client node with the master. Parameters ---------- name: str Name of the node. start_master: bool, default False If True, start a ROS master if one isn't already running. Returns ------- master: ROSLaunchParent or ROSMasterStub instance If a ROS master was started by this method, returns a ``ROSLaunchParent`` instance that can be used to shut down the master with its ``shutdown()`` method. Otherwise, a ``ROSMasterStub`` is returned that shows a warning when its ``shutdown()`` method is called. """ import roslaunch import rospy class ROSMasterStub: @staticmethod def shutdown(): warnings.warn( "ROS master was started somewhere else and cannot be shut " "down." ) try: rospy.get_master().getPid() except ConnectionRefusedError: if start_master: master = roslaunch.parent.ROSLaunchParent( "master", [], is_core=True ) master.start() # make sure master is shut down on exit atexit.register(master.shutdown) else: raise RuntimeError("ROS master is not running.") else: master = ROSMasterStub() rospy.init_node(name) return master
[docs] def play_publisher(publisher, step=1, speed=1.0, skip=None, timestamps=None): """Interactive widget for playing back messages from a publisher. Parameters ---------- publisher: object Any object with a ``publish`` method that accepts an ``idx`` parameter and publishes a message corresponding to that index. step: int, default 1 Difference in indexes between consecutive messages, e.g. if ``step=2`` every second message will be published. speed: float, default 1.0 Playback speed. skip: int, optional Number of messages to skip with the forward and backward buttons. timestamps: array_like, datetime64 dtype, optional Timestamps of publisher messages that determine time difference between messages and total number of messages. The time difference is calculated as the mean difference between the timestamps, i.e. it assumes that the timestamps are more or less regular. If not provided, the publisher must have a ``timestamps`` attribute which will be used instead. """ from IPython.core.display import display from ipywidgets import widgets if timestamps is None: timestamps = np.asarray(publisher.timestamps) interval = np.mean(np.diff(timestamps.astype(float) / 1e6)) / speed # position bar s_idx = widgets.IntSlider( min=0, max=len(timestamps) - 1, value=0, description="Index" ) # forward button def button_plus(name): s_idx.value += skip or step if s_idx.value < s_idx.max else 0 forward = widgets.Button( description="►►", layout=widgets.Layout(width="50px") ) forward.on_click(button_plus) # backward button def button_minus(name): s_idx.value -= skip or step if s_idx.value < s_idx.max else 0 backward = widgets.Button( description="◄◄", layout=widgets.Layout(width="50px") ) backward.on_click(button_minus) # play button play = widgets.Play( interval=int(interval * step), value=0, min=s_idx.min, max=s_idx.max, step=step, description="Press play", disabled=False, ) widgets.jslink((play, "value"), (s_idx, "value")) # layout ui = widgets.HBox([s_idx, backward, play, forward]) out = widgets.interactive_output(publisher.publish, {"idx": s_idx}) display(ui, out)