controller.py 9.58 KB
Newer Older
1
2
3
from copy import copy, deepcopy
from pynput import keyboard
from enum import Enum
4
from math import pi
5
6
import time
import warnings
7
import sys
8

9

Jayant Khatkar's avatar
Jayant Khatkar committed
10
import rospy
11
12
13
import moveit_commander
import moveit_msgs.msg
import geometry_msgs.msg
Jayant Khatkar's avatar
Jayant Khatkar committed
14
15
16
17
18
from trajectory_msgs.msg import (
    JointTrajectoryPoint,
    JointTrajectory
)
from sensor_msgs.msg import JointState
19
20
from std_msgs.msg import String
from moveit_commander.conversions import pose_to_list
21
22
23
from moveit_msgs.srv import GetPositionFK, GetPositionFKRequest, GetPositionFKResponse
from moveit_msgs.srv import GetPositionIK, GetPositionIKRequest, GetPositionIKResponse
from moveit_msgs.msg import RobotState
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43


class CalibrationType(Enum):
    centre = 1
    tiny_grid = 4
    small_grid = 9
    medium_grid = 16
    large_grid = 25


class Calibration:
    """
    Stores calibration parameters
    """

    def __init__(self, calibration_type):

        assert(isinstance(calibration_type, CalibrationType))

        self.cb_t = calibration_type
44
45
        self.cb_joints = []
        self.cb_pose = []
46
47
        self.complete = False

48
    def push_parameter(self, joints, pose):
49
50
51
52
53
54
55
        """
        Save the joint angles to reach particular point in grid
        param should be a list of floats (joint angles) (not asserted)
        """

        if len(self.cb_parameters) == self.cb_t.value:
            warnings.warn("all parameters are already set, ignoring this input")
56
            return False
57
58
59
60
61

        self.cb_parameters.append(param)

        if len(self.cb_parameters) == self.cb_t.value:
            print "Calibration Complete"
62
63
64
            return False

        return True
Jayant Khatkar's avatar
Jayant Khatkar committed
65

Jayant Khatkar's avatar
Jayant Khatkar committed
66
67
68
69
70
71
72
73

class controller:

    def __init__(self):
        """
        Initiate connection to UR5
        """
        self.node = rospy.init_node("py_controller", anonymous = False)
Jayant Khatkar's avatar
Jayant Khatkar committed
74
        self.traj = rospy.Publisher("scaled_pos_traj_controller/command", JointTrajectory, queue_size=1)
Jayant Khatkar's avatar
Jayant Khatkar committed
75
        self.joint_state = None
76
77
        self.calibration = None
        self._temp_calib = None
Jayant Khatkar's avatar
Jayant Khatkar committed
78
        time.sleep(1)
Jayant Khatkar's avatar
Jayant Khatkar committed
79

80
81
82
83
84
85
86
87
88
89
90
91
        # initialise MoveIt commander
        moveit_commander.roscpp_initialize(sys.argv)

        self.robot = moveit_commander.RobotCommander()
        self.scene = moveit_commander.PlanningSceneInterface()
        self.group = moveit_commander.MoveGroupCommander("manipulator")

        display_trajectory_publisher = rospy.Publisher('/move_group/display_planned_path',
            moveit_msgs.msg.DisplayTrajectory,
            queue_size=20)

        # Potentially useful but not used right now
92
93
94
        # self.planning_frame = self.group.get_planning_frame()
        self.ee_link = self.group.get_end_effector_link()
        # self.group_names = self.robot.get_group_names()
95
96
97

        print self.robot.get_current_state()

Jayant Khatkar's avatar
Jayant Khatkar committed
98

99
100
101
    def fk(self, joints):
        """
        compute forward kinematics
Jayant Khatkar's avatar
Jayant Khatkar committed
102
103
        Using /compute_fk rosservice 
            - which doesnt work
104
105
106
107
108
109
110
111
112
        """

        rospy.wait_for_service('compute_fk')
        try:
            moveitFK = rospy.ServiceProxy('compute_fk', GetPositionFK)
        except rospy.ServiceException as e:
            rospy.logerr("Service call failed: %s"%e)

        req = GetPositionFKRequest()
Jayant Khatkar's avatar
Jayant Khatkar committed
113
        req.header.frame_id = 'world'
114
115
116
117
118
119
120
        req.fk_link_names = ['ee_link']
        req.robot_state.joint_state.name = self.group.get_joints()
        req.robot_state.joint_state.position = joints

        return moveitFK.call(req)


121
    def ik(self, pose, start_angle = None):
122
123
        """
        Inverse Kinematics
124
125
        Using /compute_ik rosservice
            - this one actually works
126
127
        """

Jayant Khatkar's avatar
Jayant Khatkar committed
128
        if start_angle == None:
129
130
            start_angle = con.group.get_current_joint_values()

131
132
        rospy.wait_for_service('compute_ik')
        try:
133
            moveitIK = rospy.ServiceProxy('compute_ik', GetPositionIK)
134
135
136
        except rospy.ServiceException as e:
            rospy.logerr("Service call failed: %s"%e)

137
138
139
140
141
142
143
144
        req = GetPositionIKRequest()
        req.ik_request.ik_link_name = 'ee_link'
        req.ik_request.group_name = 'manipulator'
        req.ik_request.robot_state.joint_state.name = con.group.get_joints() 
        req.ik_request.robot_state.joint_state.position = start_angle
        req.ik_request.pose_stamped = pose

        return moveitIK.call(req)
145
146


Jayant Khatkar's avatar
Jayant Khatkar committed
147
148
149
150
151
    def exec_traj(self, traj):
        """
        execute a JointTrajectory object
        """
        rospy.loginfo("sending trajectroy to UR5")
Jayant Khatkar's avatar
Jayant Khatkar committed
152
        self.traj.publish(traj)
Jayant Khatkar's avatar
Jayant Khatkar committed
153
154


155
    def move_to(self, joint_angles, max_speed = pi/100):
156
157
158
159
160
161
162
163
164
165
        # TODO TODO linear interpolation for larger angles - if needed?
        """
        move the robot to particular joint angle

        gets current joints and linear interps to desired joints
            and executes

        max speed is in rad/s, deafault 10s to do a 180
        """

166
        self.group.go(joint_angles)
Jayant Khatkar's avatar
Jayant Khatkar committed
167
168


Jayant Khatkar's avatar
Jayant Khatkar committed
169
170
171
    def get_joints(self):
        return self.group.get_current_joint_values() 

Jayant Khatkar's avatar
Jayant Khatkar committed
172
173
174
175
176
177
178
    def empty_traj(self):
        """
        Empty trajectory with Joint names filled in
        """
        t = JointTrajectory()
        t.joint_names = self.joint_state.name
        return t
Jayant Khatkar's avatar
Jayant Khatkar committed
179
180


181
182
183
184
    def _internal_move_endpoint(self, dx, dy, dz):
        """
        move the endpoint of the arm from where it currenlty is 
        """
185
186
        # step 1 get current pose
        now = self.group.get_current_pose().pose
187

188
189
190
191
        # step 2 shift pose
        now.position.x += dx
        now.position.y += dy
        now.position.z += dz
192

193
194
        # step 3 compute cartesian plan
        (plan, frac) = self.group.compute_cartesian_path([now], 0.01, 0.0)
195

196
197
        # step 4 execute plan
        self.group.execute(plan)
198
199


200
201
202
203
204
205
206
207
    def _internal_on_press(self, key):
        """
        CALIBRATION component:
        When a key is pressed during calibration, this is triggered
        and takes various actions depending on the key
        """

        if key == keyboard.Key.up:
208
            self._internal_move_endpoint( 0.01, 0, 0)
209
        elif key == keyboard.Key.down:
210
            self._internal_move_endpoint(-0.01, 0, 0)
211
        elif key == keyboard.Key.left:
212
            self._internal_move_endpoint(0,  0.01, 0)
213
        elif key == keyboard.Key.right:
214
            self._internal_move_endpoint(0, -0.01, 0)
215
        elif key == keyboard.Key.page_up:
216
            self._internal_move_endpoint(0, 0,  0.01)
217
        elif key == keyboard.Key.page_down:
218
            self._internal_move_endpoint(0, 0, -0.01)
219
220
        elif key == keyboard.Key.enter:
            # save current pose into calibaration
221
222
223
224
            return self._temp_calib.push_parameter(
                self.group.get_current_joint_values(),
                self.group.get_current_pose()
                )
225
226


227
    def _internal_on_release(self, key):
228
229
230
231
232
233
234
235
236
237
238
239
240
        """
        Stop calibration if escape key pressed
        """
        if key == keyboard.Key.esc:
            return False


    def calibrate(self, cb_type):
        """
        Calibrate robot by moving endpoint using arrow keys
        Press enter to lock in a calibration point
        """

Jayant Khatkar's avatar
Jayant Khatkar committed
241
        self.face_down()
242
243
244
245
246
247
248
249
250
251
252
253
254
        self._temp_calib = Calibration(cb_type)

        with keyboard.Listener(
                on_press = self._internal_on_press,
                on_release = self._internal_on_release) as listener:
            listener.join()

        if  not self._temp_calib.complete:
            print "Quitting calibration without saving"
        else:
            self.calibration = self._temp_calib


Jayant Khatkar's avatar
Jayant Khatkar committed
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
    def face_down(self):
        """
        Make the robot point downwards
        """

        # Change to be at current position but facign down
        p = self.group.get_current_pose()
        p.pose.orientation.x = 0.5
        p.pose.orientation.y = 0.5
        p.pose.orientation.z = -0.5
        p.pose.orientation.w = 0.5

        self.group.set_pose_target(p)
        plan = self.group.go(wait=True)
        group.stop() # residual motion
        group.clear_pose_targets()
        return self.group.get_current_pose()


274
275
276
277
278
279
280
281
282
    def contour2traj(self, contour):
        """
        Convert a contour to a arm trajectory:
            1. shift points to calibration
            2. convert positions to poses
            3. get IK for each pose
            4. save points and timestamp in trajectory
        """

Jayant Khatkar's avatar
Jayant Khatkar committed
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
        # Get calibration pose:
        p0 = self.calibaration.cb_pose[0]
        p_last = deepcopy(p0)

        joints = []
        for position in contour.pos:
            pose = deepcopy(p0)
            pose[-1].pose.position.x += position[0]/1e3
            pose[-1].pose.position.y += position[1]/1e3
            pose[-1].pose.position.z += position[2]/1e3

            # Get IK for all the poses
            j_s = self.ik(pose, start_angle = p_last)
            p_last = j_s.solution.joint_state.position
            joints.append(p_last)

            # Append to trajectory

301
302


Jayant Khatkar's avatar
Jayant Khatkar committed
303
if __name__=="__main__":
304

Jayant Khatkar's avatar
Jayant Khatkar committed
305
306
    ########################################
    print "Testing joint angle subscriber"
Jayant Khatkar's avatar
Jayant Khatkar committed
307
308
309

    con = controller()

310
#    print con.get_joints()
Jayant Khatkar's avatar
Jayant Khatkar committed
311

Jayant Khatkar's avatar
Jayant Khatkar committed
312
    ########################################
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#    print "Testing Joint Trajectory controller"
#
#    t = con.empty_traj()
#    js1 = (-1.3825197219848633,-1.7300764522948207,1.800080776214599,-1.4899586004069825,1.5598526000976562,0.7699174880981445)
#    js2 = (-1.3825197219848633,-1.7300764522948207,2.323414109547933,-1.4899586004069825,1.5598526000976562,0.7699174880981445)
#
#    p1 = JointTrajectoryPoint()
#    p1.positions = copy(js1)
#    p1.time_from_start = rospy.Time(0)
#    p2 = JointTrajectoryPoint()
#    p2.positions = copy(js2)
#    p2.time_from_start = rospy.Time(3)
#    p3 = JointTrajectoryPoint()
#    p3.positions = copy(js1)
#    p3.time_from_start = rospy.Time(6)
#    t.points.append(p1)
#    t.points.append(p2)
#    t.points.append(p3)
#
#    print t
#    con.exec_traj(t)