Source code for rigid_body_motion.ros.transformer

from threading import Thread

import numpy as np
from anytree import PreOrderIter

from rigid_body_motion.core import _resolve_rf
from rigid_body_motion.reference_frames import ReferenceFrame


[docs] class Transformer: """Wrapper class for tf2_ros.Buffer. Can be constructed from a ReferenceFrame instance. """
[docs] def __init__(self, cache_time=None): """Constructor. Parameters ---------- cache_time : float, optional Cache time of the buffer in seconds. """ import rospy import tf2_ros if cache_time is not None: self._buffer = tf2_ros.Buffer( cache_time=rospy.Duration.from_sec(cache_time), debug=False ) else: self._buffer = tf2_ros.Buffer(debug=False)
[docs] @staticmethod def from_reference_frame(reference_frame): """Construct Transformer instance from static reference frame tree. Parameters ---------- reference_frame : ReferenceFrame Reference frame instance from which to construct the transformer. Returns ------- Transformer Transformer instance. """ root = reference_frame.root # get the first and last timestamps for all moving reference frames # TODO float timestamps t_start_end = list( zip( *[ ( node.timestamps[0].astype(float) / 1e9, node.timestamps[-1].astype(float) / 1e9, ) for node in PreOrderIter(root) if node.timestamps is not None ] ) ) if len(t_start_end) == 0: transformer = Transformer(cache_time=None) else: # cache time from earliest start to latest end cache_time = np.max(t_start_end[1]) - np.min(t_start_end[0]) transformer = Transformer(cache_time=cache_time) for node in PreOrderIter(root): if isinstance(node, ReferenceFrame): if node.parent is not None: if node.timestamps is None: transformer.set_transform_static(node) else: transformer.set_transforms(node) else: raise NotImplementedError() return transformer
[docs] def set_transform_static(self, reference_frame): """Add static transform from reference frame to buffer. Parameters ---------- reference_frame : ReferenceFrame Static reference frame to add. """ from .msg import static_rf_to_transform_msg self._buffer.set_transform_static( static_rf_to_transform_msg(reference_frame), "default_authority" )
[docs] def set_transforms(self, reference_frame): """Add transforms from moving reference frame to buffer. Parameters ---------- reference_frame : ReferenceFrame Static reference frame to add. """ from .msg import make_transform_msg for translation, rotation, timestamp in zip( reference_frame.translation, reference_frame.rotation, reference_frame.timestamps, ): self._buffer.set_transform( make_transform_msg( translation, rotation, reference_frame.parent.name, reference_frame.name, timestamp.astype(float) / 1e9, ), "default_authority", )
[docs] def can_transform(self, target_frame, source_frame, time=0.0): """Check if transform from source to target frame is possible. Parameters ---------- target_frame : str Name of the frame to transform into. source_frame : str Name of the input frame. time : float, default 0.0 Time at which to get the transform. (0 will get the latest) Returns ------- bool True if the transform is possible, false otherwise. """ import rospy return self._buffer.can_transform( target_frame, source_frame, rospy.Time.from_sec(time) )
[docs] def lookup_transform(self, target_frame, source_frame, time=0.0): """Get the transform from the source frame to the target frame. Parameters ---------- target_frame : str Name of the frame to transform into. source_frame : str Name of the input frame. time : float, default 0.0 Time at which to get the transform. (0 will get the latest) Returns ------- t : tuple, len 3 The translation between the frames. r : tuple, len 4 The rotation between the frames. """ import rospy from .msg import unpack_transform_msg transform = self._buffer.lookup_transform( target_frame, source_frame, rospy.Time.from_sec(time) ) return unpack_transform_msg(transform, stamped=True)
[docs] def transform_vector(self, v, target_frame, source_frame, time=0.0): """Transform a vector from the source frame to the target frame. Parameters ---------- v : iterable, len 3 Input vector in source frame. target_frame : str Name of the frame to transform into. source_frame : str Name of the input frame. time : float, default 0.0 Time at which to get the transform. (0 will get the latest) Returns ------- tuple, len 3 Transformed vector in target frame. """ import rospy import tf2_geometry_msgs from .msg import make_vector_msg, unpack_vector_msg transform = self._buffer.lookup_transform( target_frame, source_frame, rospy.Time.from_sec(time) ) v_msg = make_vector_msg(v, source_frame, time) vt_msg = tf2_geometry_msgs.do_transform_vector3(v_msg, transform) return unpack_vector_msg(vt_msg, stamped=True)
[docs] def transform_point(self, p, target_frame, source_frame, time=0.0): """Transform a point from the source frame to the target frame. Parameters ---------- p : iterable, len 3 Input point in source frame. target_frame : str Name of the frame to transform into. source_frame : str Name of the input frame. time : float, default 0.0 Time at which to get the transform. (0 will get the latest) Returns ------- tuple, len 3 Transformed point in target frame. """ import rospy import tf2_geometry_msgs from .msg import make_point_msg, unpack_point_msg transform = self._buffer.lookup_transform( target_frame, source_frame, rospy.Time.from_sec(time) ) p_msg = make_point_msg(p, source_frame, time) pt_msg = tf2_geometry_msgs.do_transform_point(p_msg, transform) return unpack_point_msg(pt_msg, stamped=True)
[docs] def transform_quaternion(self, q, target_frame, source_frame, time=0.0): """Transform a quaternion from the source frame to the target frame. Parameters ---------- q : iterable, len 4 Input quaternion in source frame. target_frame : str Name of the frame to transform into. source_frame : str Name of the input frame. time : float, default 0.0 Time at which to get the transform. (0 will get the latest) Returns ------- tuple, len 4 Transformed quaternion in target med quaternion in target frame. """ import rospy import tf2_geometry_msgs from .msg import make_pose_msg, unpack_pose_msg transform = self._buffer.lookup_transform( target_frame, source_frame, rospy.Time.from_sec(time) ) p_msg = make_pose_msg((0.0, 0.0, 0.0), q, source_frame, time) pt_msg = tf2_geometry_msgs.do_transform_pose(p_msg, transform) return unpack_pose_msg(pt_msg, stamped=True)[1]
[docs] def transform_pose(self, p, o, target_frame, source_frame, time=0.0): """Transform a pose from the source frame to the target frame. Parameters ---------- p : iterable, len 3 Input position in source frame. o : iterable, len 3 Input orientation in source frame. target_frame : str Name of the frame to transform into. source_frame : str Name of the input frame. time : float, default 0.0 Time at which to get the transform. (0 will get the latest) Returns ------- pt : tuple, len 3 Transformed position in target frame. ot : tuple, len 4 Transformed orientation in target frame. """ import rospy import tf2_geometry_msgs from .msg import make_pose_msg, unpack_pose_msg transform = self._buffer.lookup_transform( target_frame, source_frame, rospy.Time.from_sec(time) ) p_msg = make_pose_msg(p, o, source_frame, time) pt_msg = tf2_geometry_msgs.do_transform_pose(p_msg, transform) return unpack_pose_msg(pt_msg, stamped=True)
[docs] class ReferenceFrameTransformBroadcaster: """TF broadcaster for the transform of a reference frame wrt another."""
[docs] def __init__( self, frame, base=None, publish_pose=False, publish_twist=False, subscribe=False, twist_cutoff=None, twist_outlier_thresh=None, ): """Constructor. Parameters ---------- frame : str or ReferenceFrame Reference frame for which to publish the transform. base : str or ReferenceFrame, optional Base reference wrt to which the transform is published. Defaults to the parent reference frame. publish_pose : bool, default False If True, also publish a PoseStamped message on the topic "/<frame>/pose". publish_twist : bool, default False If True, also publish a TwistStamped message with the linear and angular velocity of the frame wrt the base on the topic "/<frame>/twist". subscribe : bool or str, default False If True, subscribe to the "/tf" topic and publish transforms when messages are broadcast whose `child_frame_id` is the name of the base frame. If the frame is a moving reference frame, the transform whose timestamp is the closest to the broadcast timestamp is published. `subscribe` can also be a string, in which case this broadcaster will be listening for TF messages with this `child_frame_id`. """ import pandas as pd import rospy import tf2_ros from geometry_msgs.msg import PoseStamped, TwistStamped from tf.msg import tfMessage self.frame = _resolve_rf(frame) self.base = _resolve_rf(base or self.frame.parent) ( self.translation, self.rotation, self.timestamps, ) = self.frame.lookup_transform(self.base) if self.timestamps is None: self.broadcaster = tf2_ros.StaticTransformBroadcaster() else: self.timestamps = pd.Index(self.timestamps) self.broadcaster = tf2_ros.TransformBroadcaster() if publish_pose: self.pose_publisher = rospy.Publisher( f"/{self.frame.name}/pose", PoseStamped, queue_size=1, latch=True, ) else: self.pose_publisher = None if publish_twist: self.linear, self.angular = self.frame.lookup_twist( reference=base, represent_in=self.frame, cutoff=twist_cutoff, outlier_thresh=twist_outlier_thresh, ) self.twist_publisher = rospy.Publisher( f"/{self.frame.name}/twist", TwistStamped, queue_size=1, latch=True, ) else: self.twist_publisher = None if subscribe: self.subscriber = rospy.Subscriber( "/tf", tfMessage, self.handle_incoming_msg ) if isinstance(subscribe, str): self.subscribe_to_frame = subscribe else: self.subscribe_to_frame = self.base.name else: self.subscriber = None self.idx = 0 self.stopped = False self._thread = None
[docs] def publish(self, idx=None): """Publish a transform message. Parameters ---------- idx : int, optional Index of the transform to publish for a moving reference frame. Uses ``self.idx`` as default. """ from .msg import make_pose_msg, make_transform_msg, make_twist_msg if self.timestamps is None: transform = make_transform_msg( self.translation, self.rotation, self.base.name, self.frame.name, ) if self.pose_publisher is not None: pose = make_pose_msg( self.translation, self.rotation, self.base.name, ) else: idx = idx or self.idx ts = self.timestamps.values[idx].astype(float) / 1e9 transform = make_transform_msg( self.translation[idx], self.rotation[idx], self.base.name, self.frame.name, ts, ) if self.pose_publisher is not None: pose = make_pose_msg( self.translation[idx], self.rotation[idx], self.base.name, ts, ) self.broadcaster.sendTransform(transform) if self.pose_publisher is not None: self.pose_publisher.publish(pose) if self.twist_publisher is not None: self.twist_publisher.publish( make_twist_msg( self.linear[idx], self.angular[idx], self.frame.name ) )
[docs] def handle_incoming_msg(self, msg): """Publish on incoming message.""" import pandas as pd import rospy for transform in msg.transforms: if transform.child_frame_id == self.subscribe_to_frame: if self.timestamps is not None: ts = pd.to_datetime( rospy.Time.to_sec(transform.header.stamp), unit="s" ) idx = self.timestamps.get_loc(ts, method="nearest") self.publish(idx) else: self.publish()
def _spin_blocking(self): """Continuously publish messages.""" import pandas as pd import rospy self.stopped = False if self.subscriber is None and self.timestamps is not None: while not rospy.is_shutdown() and not self.stopped: self.publish() self.idx = (self.idx + 1) % len(self.timestamps) if isinstance(self.timestamps, pd.DatetimeIndex): dt = ( self.timestamps.values[self.idx].astype(float) / 1e9 - self.timestamps.values[self.idx - 1].astype(float) / 1e9 if self.idx > 0 else 0.0 ) else: dt = float( self.timestamps.values[self.idx] - self.timestamps.values[self.idx - 1] if self.idx > 0 else 0.0 ) rospy.sleep(dt) else: rospy.spin() self.stopped = True
[docs] def spin(self, block=False): """Continuously publish messages. Parameters ---------- block: bool, default False If True, this method will block until the publisher is stopped, e.g. by calling stop(). Otherwise, the main loop is dispatched to a separate thread which is returned by this function. Returns ------- thread: threading.Thread If `block=True`, the Thread instance that runs the loop. """ if self.timestamps is None: self.publish() elif block: self._spin_blocking() else: self._thread = Thread(target=self._spin_blocking) self._thread.start()
[docs] def stop(self): """Stop publishing.""" self.stopped = True