from copy import copy, deepcopy from pynput import keyboard from enum import Enum from math import pi import time import warnings import sys import rospy import moveit_commander import moveit_msgs.msg import geometry_msgs.msg from trajectory_msgs.msg import ( JointTrajectoryPoint, JointTrajectory ) from sensor_msgs.msg import JointState from std_msgs.msg import String from moveit_commander.conversions import pose_to_list from moveit_msgs.srv import GetPositionFK, GetPositionFKRequest, GetPositionFKResponse from moveit_msgs.srv import GetPositionIK, GetPositionIKRequest, GetPositionIKResponse from moveit_msgs.msg import RobotState class CalibrationType(Enum): centre = 1 tiny_grid = 4 small_grid = 9 medium_grid = 16 large_grid = 25 class Calibration: """ Stores calibration parameters """ def __init__(self, calibration_type): assert(isinstance(calibration_type, CalibrationType)) self.cb_t = calibration_type self.cb_joints = [] self.cb_pose = [] self.complete = False def push_parameter(self, joints, pose): """ Save the joint angles to reach particular point in grid param should be a list of floats (joint angles) (not asserted) """ if len(self.cb_parameters) == self.cb_t.value: warnings.warn("all parameters are already set, ignoring this input") return False self.cb_parameters.append(param) if len(self.cb_parameters) == self.cb_t.value: print "Calibration Complete" return False return True class controller: def __init__(self): """ Initiate connection to UR5 """ self.node = rospy.init_node("py_controller", anonymous = False) self.traj = rospy.Publisher("scaled_pos_traj_controller/command", JointTrajectory, queue_size=1) self.joint_state = None self.calibration = None self._temp_calib = None time.sleep(1) # initialise MoveIt commander moveit_commander.roscpp_initialize(sys.argv) self.robot = moveit_commander.RobotCommander() self.scene = moveit_commander.PlanningSceneInterface() self.group = moveit_commander.MoveGroupCommander("manipulator") display_trajectory_publisher = rospy.Publisher('/move_group/display_planned_path', moveit_msgs.msg.DisplayTrajectory, queue_size=20) # Potentially useful but not used right now # self.planning_frame = self.group.get_planning_frame() self.ee_link = self.group.get_end_effector_link() # self.group_names = self.robot.get_group_names() print self.robot.get_current_state() def fk(self, joints): """ compute forward kinematics Using /compute_fk rosservice - which doesnt work """ rospy.wait_for_service('compute_fk') try: moveitFK = rospy.ServiceProxy('compute_fk', GetPositionFK) except rospy.ServiceException as e: rospy.logerr("Service call failed: %s"%e) req = GetPositionFKRequest() req.header.frame_id = 'world' req.fk_link_names = ['ee_link'] req.robot_state.joint_state.name = self.group.get_joints() req.robot_state.joint_state.position = joints return moveitFK.call(req) def ik(self, pose, start_angle = None): """ Inverse Kinematics Using /compute_ik rosservice - this one actually works """ if start_angle == None: start_angle = con.group.get_current_joint_values() rospy.wait_for_service('compute_ik') try: moveitIK = rospy.ServiceProxy('compute_ik', GetPositionIK) except rospy.ServiceException as e: rospy.logerr("Service call failed: %s"%e) req = GetPositionIKRequest() req.ik_request.ik_link_name = 'ee_link' req.ik_request.group_name = 'manipulator' req.ik_request.robot_state.joint_state.name = con.group.get_joints() req.ik_request.robot_state.joint_state.position = start_angle req.ik_request.pose_stamped = pose return moveitIK.call(req) def exec_traj(self, traj): """ execute a JointTrajectory object """ rospy.loginfo("sending trajectroy to UR5") self.traj.publish(traj) def move_to(self, joint_angles, max_speed = pi/100): # TODO TODO linear interpolation for larger angles - if needed? """ move the robot to particular joint angle gets current joints and linear interps to desired joints and executes max speed is in rad/s, deafault 10s to do a 180 """ self.group.go(joint_angles) def get_joints(self): return self.group.get_current_joint_values() def empty_traj(self): """ Empty trajectory with Joint names filled in """ t = JointTrajectory() t.joint_names = self.joint_state.name return t def _internal_move_endpoint(self, dx, dy, dz): """ move the endpoint of the arm from where it currenlty is """ # step 1 get current pose now = self.group.get_current_pose().pose # step 2 shift pose now.position.x += dx now.position.y += dy now.position.z += dz # step 3 compute cartesian plan (plan, frac) = self.group.compute_cartesian_path([now], 0.01, 0.0) # step 4 execute plan self.group.execute(plan) def _internal_on_press(self, key): """ CALIBRATION component: When a key is pressed during calibration, this is triggered and takes various actions depending on the key """ if key == keyboard.Key.up: self._internal_move_endpoint( 0.01, 0, 0) elif key == keyboard.Key.down: self._internal_move_endpoint(-0.01, 0, 0) elif key == keyboard.Key.left: self._internal_move_endpoint(0, 0.01, 0) elif key == keyboard.Key.right: self._internal_move_endpoint(0, -0.01, 0) elif key == keyboard.Key.page_up: self._internal_move_endpoint(0, 0, 0.01) elif key == keyboard.Key.page_down: self._internal_move_endpoint(0, 0, -0.01) elif key == keyboard.Key.enter: # save current pose into calibaration return self._temp_calib.push_parameter( self.group.get_current_joint_values(), self.group.get_current_pose() ) def _internal_on_release(self, key): """ Stop calibration if escape key pressed """ if key == keyboard.Key.esc: return False def calibrate(self, cb_type): """ Calibrate robot by moving endpoint using arrow keys Press enter to lock in a calibration point """ self.face_down() self._temp_calib = Calibration(cb_type) with keyboard.Listener( on_press = self._internal_on_press, on_release = self._internal_on_release) as listener: listener.join() if not self._temp_calib.complete: print "Quitting calibration without saving" else: self.calibration = self._temp_calib def face_down(self): """ Make the robot point downwards """ # Change to be at current position but facign down p = self.group.get_current_pose() p.pose.orientation.x = 0.5 p.pose.orientation.y = 0.5 p.pose.orientation.z = -0.5 p.pose.orientation.w = 0.5 self.group.set_pose_target(p) plan = self.group.go(wait=True) group.stop() # residual motion group.clear_pose_targets() return self.group.get_current_pose() def contour2traj(self, contour): """ Convert a contour to a arm trajectory: 1. shift points to calibration 2. convert positions to poses 3. get IK for each pose 4. save points and timestamp in trajectory """ # Get calibration pose: p0 = self.calibaration.cb_pose[0] p_last = deepcopy(p0) joints = [] for position in contour.pos: pose = deepcopy(p0) pose[-1].pose.position.x += position[0]/1e3 pose[-1].pose.position.y += position[1]/1e3 pose[-1].pose.position.z += position[2]/1e3 # Get IK for all the poses j_s = self.ik(pose, start_angle = p_last) p_last = j_s.solution.joint_state.position joints.append(p_last) # Append to trajectory if __name__=="__main__": ######################################## print "Testing joint angle subscriber" con = controller() # print con.get_joints() ######################################## # print "Testing Joint Trajectory controller" # # t = con.empty_traj() # js1 = (-1.3825197219848633,-1.7300764522948207,1.800080776214599,-1.4899586004069825,1.5598526000976562,0.7699174880981445) # js2 = (-1.3825197219848633,-1.7300764522948207,2.323414109547933,-1.4899586004069825,1.5598526000976562,0.7699174880981445) # # p1 = JointTrajectoryPoint() # p1.positions = copy(js1) # p1.time_from_start = rospy.Time(0) # p2 = JointTrajectoryPoint() # p2.positions = copy(js2) # p2.time_from_start = rospy.Time(3) # p3 = JointTrajectoryPoint() # p3.positions = copy(js1) # p3.time_from_start = rospy.Time(6) # t.points.append(p1) # t.points.append(p2) # t.points.append(p3) # # print t # con.exec_traj(t)