#!/usr/bin/env python DEBUG = True VERBOSE = True ## Python libraries from copy import deepcopy import time import sys ## 3rd party libraries # Scipy (1.2.3) from scipy.spatial.transform import Rotation # Numpy (1.16.6) import numpy as np # ROS Libraries import rospy from std_msgs.msg import Bool from trajectory_msgs.msg import ( JointTrajectoryPoint, JointTrajectory ) from actionlib_msgs.msg import GoalStatusArray # MOVEit import moveit_commander from moveit_msgs.msg._RobotTrajectory import RobotTrajectory ## Our libraries from extrudex.msg._ExtruderControl import ExtruderControl from extrudex.srv._Hotend import Hotend from utils import load_plan, JTraj2ROS class Controller(object): """ Controller for multiple robot arms to execute lists of configurations with timestamps. The timing of the arms include extrusion of filament. Frame names: - base_link - shoulder_link - upper_arm_link - forearm_link - wrist_1_link - wrist_2_link - wrist_3_link Joint names: - shoulder_pan_joint - shoulder_lift_joint - elbow_joint - wrist_1_joint - wrist_2_joint - wrist_3_joint """ def __init__(self, close_deg=1.0, timeScale=1.0, blocking=True, extruderTemp_C=190.0, hotend_srv='hotend', retractionDist_mm=12.0, retractionSpeed_mmpmin=4800.0, disable_extruder=False, robot=0): """ Creates a controller to multiple robot arms. (TODO) close_deg: Euclidean distance of configurations within which two configurations are considered "close" timeScale: Scaling factor of time robot can be 0 (if only one robot is turned on (not using 2xbringup), 1 for r1, 2 for r2 """ ## Initialise ROS things self.node = rospy.init_node('py_controller', anonymous = False) self.extrusion_pub = rospy.Publisher('extrusion_cmd',ExtruderControl, queue_size=2) self.extruderReady_sub = rospy.Subscriber('ready',Bool,self._extruderReady_cb, queue_size=1) self.scaledPosTraj_pub = rospy.Publisher('scaled_pos_traj_controller/command',JointTrajectory, queue_size=1) self.motion_feedback_sub = rospy.Subscriber('scaled_pos_traj_controller/follow_joint_trajectory/status',GoalStatusArray, self._update_motion_status) time.sleep(0.1) if not disable_extruder: print("Waiting for Hotend service") rospy.wait_for_service(hotend_srv) try: self.extruderSetTemp_srv = rospy.ServiceProxy(hotend_srv,Hotend) except rospy.ServiceException as e: print('Service call failed: {}'.format(e)) # initialise MoveIt commander moveit_commander.roscpp_initialize(sys.argv) if robot == 0: self.robot = moveit_commander.RobotCommander() self.scene = moveit_commander.PlanningSceneInterface() self.group = moveit_commander.MoveGroupCommander('manipulator') else: self.robot = moveit_commander.RobotCommander( "/r" + str(robot) + "/robot_description", ns="r" + str(robot) ) self.scene = moveit_commander.PlanningSceneInterface(ns="r"+ str(robot)) self.group = moveit_commander.MoveGroupCommander( 'manipulator', robot_description="r" + str(robot) + "/robot_description", ns="r" + str(robot) ) self.group.set_max_velocity_scaling_factor(timeScale) self.group.set_max_acceleration_scaling_factor(timeScale**2) self.close_deg = close_deg self.timeScale = timeScale self.retractionDist_mm = retractionDist_mm self.retractionTime_s = retractionDist_mm/retractionSpeed_mmpmin * 60.0 self.extruderReady = False self.traj_complete = False # Start extruder if not disable_extruder: self.set_extruder_temp(extruderTemp_C) if blocking: self.wait_extruder_ready() ## Arm methods def get_joint_names(self): """ Returns the list of joint names in the order the controller uses """ return self.group.get_active_joints() def empty_traj(self): """ Creates an empty JointTrajectory with joint names of the arm prefilled. Returns JointTrajectory object. """ t = JointTrajectory() t.joint_names = self.get_joint_names() return t def exec_ctraj(self,jtraj, wait=True, contour=None): """ Using the jtraj: JTrajectory object to execute Returns True if the JointTrajectory is executed; False otherwise. """ plan = RobotTrajectory() plan.joint_trajectory = JTraj2ROS(jtraj) # NOTE: This can only be done safely becuase assuming that the current # joint position is already very close to where we want it to be cj = self.group.get_current_joint_values() sj = plan.joint_trajectory.points[0].positions if any([0.01 < abs(cj[i]-sj[i]) for i in range(6)]): print("Potentially would've failed here, Retrying") #time.sleep(0.1) self.group.go(plan.joint_trajectory.points[0].positions, wait=True) return False if contour is not None: print("THIS NEXT TRAJ REQUIRES EXTRUSION") self.preprint_load() self.extrude(contour.Ext[-1], jtraj.time[-1]) #rospy.loginfo('Sending trajectory to UR5e') self.traj_complete = False self.group.execute(plan, wait=False) if wait: s = time.time() t = jtraj.time[-1] while not self.traj_complete or time.time() - s < t - 1: if time.time() - s > t + 1: Exception("traj_complete failed") break self.traj_complete = False if contour is not None: self.postprint_retract() return True def exec_traj(self,traj): """ Executes a JointTrajectory object NOTE: If the arm is not at the first JointTrajectoryPoint, then the trajectory is not executed. traj: JointTrajectory object to execute Returns True if the JointTrajectory is executed; False otherwise. """ rospy.loginfo('Sending trajectory to UR5e') # Scale trajectory's temporal aspect scaledTraj = deepcopy(traj) for (iPoint,point) in enumerate(traj.points): # Scale timestamps t = point.time_from_start.secs + 1e-9*point.time_from_start.nsecs scaledT = t/self.timeScale secs = int(scaledT) nsecs = int((scaledT-secs)*1e9) scaledTraj.points[iPoint].time_from_start.secs = secs scaledTraj.points[iPoint].time_from_start.nsecs = nsecs # Scale velocity scaledTraj.points[iPoint].velocities = [v*self.timeScale for v in scaledTraj.points[iPoint].velocities] # Scale acceleration timeScale2 = self.timeScale**2 scaledTraj.points[iPoint].accelerations = [a*timeScale2 for a in scaledTraj.points[iPoint].accelerations] plan = RobotTrajectory() plan.joint_trajectory = scaledTraj self.group.execute(plan, wait=True) return True def move_to(self,conf_next,near_deg = 1.0): """ Moves the arm to the specified configuration by linearly interpolating the joint angles. conf_next: 6x numpy array of joint angles in radians near_deg: joint distance limit in degrees [default 1.0]; if None, no check is done. """ if DEBUG: print('Controller::move_to()') if near_deg is not None: # List of joint angles conf_curr = np.array(self.get_joints()) # Make sure the new joint angles are not too different dConf = conf_next-conf_curr dist_rad = np.linalg.norm(dConf) dist_deg = np.rad2deg(dist_rad) if dist_deg > near_deg: rospy.logwarn('Attempted to move a joint distance of {} > {} deg'.format(dist_conf_deg,jointLim_deg)) return False self.group.go(conf_next) return True def is_close(self,trajPoint): """ Check if the arm is currently close to the configuration specified. Ignores timestamp. trajPoint: JointTrajectoryPoint configuration """ # TODO: Check if the joint order is the same conf_ref = np.array(trajPoint.positions) conf_curr = np.array(self.get_joints()) dConf = conf_ref-conf_curr dist_rad = np.linalg.norm(dConf) dist_deg = np.rad2deg(dist_rad) return dist_deg <= self.close_deg def get_joints(self): """ Return configuration of the arm in a list """ return self.group.get_current_joint_values() ## Extruder methods def extrude(self,ext_mm,t_s): """ Send raw message to move extrusion. ext_mm: float t_s: float """ if DEBUG: print('Controller::extrude({},{})'.format(ext_mm,t_s)) msg = ExtruderControl() msg.delta_e = ext_mm msg.delta_t = t_s self.extrusion_pub.publish(msg) def postprint_retract(self): self.extrude(-self.retractionDist_mm,self.retractionTime_s) time.sleep(self.retractionTime_s) def preprint_load(self): self.extrude(self.retractionDist_mm,self.retractionTime_s) time.sleep(self.retractionTime_s) def printFilament(self,ext_mm,t_s): """ Does a sequence of print with retractions. ext_mm: Tx t_s: Tx """ nStride = ext_mm.shape[0] self.preprint_load() for iStride in range(nStride): self.extrude(self,ext_mm,t_s) time.sleep(t_s) self.postprint_retract() def set_extruder_temp(self,temp_C): if VERBOSE: rospy.loginfo('Setting extruder temperature to {} C'.format(temp_C)) self.extruderTemp_C = temp_C self.extruderSetTemp_srv(self.extruderTemp_C) def wait_extruder_ready(self): if VERBOSE: rospy.loginfo('Waiting for extruder to heat up...') while not self.extruderReady: time.sleep(0.1) if VERBOSE: rospy.loginfo('Done!') def _extruderReady_cb(self,ready_boolMsg): self.extruderReady = ready_boolMsg.data def _update_motion_status(self, msg): if len(msg.status_list)>0: self.traj_complete = (msg.status_list[-1].status==3 or msg.status_list[-1].status==2) if msg.status_list[-1].status >= 4 and msg.status_list[-1].status!=6: print("Unexpected Trajectory Status") print("STATUS: {}".format(msg.status_list[-1].status)) print("For info on status meaning, go to:\n" + \ "http://docs.ros.org/en/melodic/api/actionlib_msgs/html/msg/GoalStatus.html") def getR_ET(): return Rotation.from_euler('XYZ',[-45,-90,0], degrees=True) if __name__=='__main__': con = Controller() # TODO: Add confirmation checks # display_trajectory_publisher = rospy.Publisher('/move_group/display_planned_path', # moveit_msgs.msg.DisplayTrajectory, # queue_size=20)