plan.py 7.11 KB
Newer Older
1
import numpy as np
2
import time
3
import pickle
4
from bisect import bisect_left
5
from .planning_tools import planTrajectory, JTrajectory
6
from .simEnv import SimEnv
7
import matplotlib.pyplot as plt
8
9


10
def _emptyTraj(joints, arm):
11
    """
Jayant Khatkar's avatar
Jayant Khatkar committed
12
    Creates an JTrajectory with one point
13
    """
14
    traj = JTrajectory([joints], [0], arm)
15
16
17
    return traj


18
class Plan:
19
    """
20
    Holds the execution plan for n arms
21
22
23
    """

    def __init__(self,
Jayant Khatkar's avatar
Jayant Khatkar committed
24
            sim_env,
25
            start_joints = None
26
27
            ):

Jayant Khatkar's avatar
Jayant Khatkar committed
28
29
        self.cc = sim_env
        self.n = sim_env.numRobots
30
31
32
33
        if start_joints==None:
            self.start_joints = [sim_env.home] * self.n
        else:
            self.start_joints = start_joints
34

35
36
        self.trajs = {i:[_emptyTraj(self.start_joints[i],i)] for i in range(self.n)}
                # time at the end of each trajectory
37
        self.cumul_time = {i:[0] for i in range(self.n)}
38
39


40
    def jointsAtTime(self, t, arm):
41
        """
42
        state of joints for an arm given time from start
43
        """
44

45
        # if no plan yet, still at start_joints
46
        if len(self.trajs[arm])==1 or t==0:
47
48
            return self.start_joints[arm]

49
        nth_traj = bisect_left(self.cumul_time[arm], t)
50
51
        if nth_traj == len(self.cumul_time[arm]):
            # return the final state of arm
52
            return self.trajs[arm][-1].positions[-1]
53

54
        # else search for time in nth_traj
55
56
        # TODO can switch this to bisect too
        tp = self.trajs[arm][nth_traj].time
57
        c_t = self.cumul_time[arm][nth_traj-1]
58
        wp_i = next(i for i,v in enumerate(tp) if v + c_t >= t)
59

60
        # linear interpolate between the waypoints
61
62
63
64
        j1 = np.array(self.trajs[arm][nth_traj].positions[wp_i-1])
        j2 = np.array(self.trajs[arm][nth_traj].positions[wp_i])
        t1 = tp[wp_i-1] + c_t
        t2 = tp[wp_i] + c_t
65

66
        return (t-t1)/(t2-t1)*(j2-j1) + j1
67

68

69
    def appendTrajectory(self, trajectories, arm, cc_res=0.1, reserve_future=20):
70
        """
71
72
        verify that trajectories can be added to an arm's plan
          and add them if no collisions.
73
74

        cc_res is time interval between collision checks in seconds
75
76
77
78

        reserve future ensures that for (10) seconds after the trajectory is added,
            there is still no collisions (making sure inevitable collisions after
            trajectory do not occur and there is time to leave the position)
79
        """
Jayant Khatkar's avatar
Jayant Khatkar committed
80
81
        assert all([isinstance(t, JTrajectory) for t in trajectories]), \
                "Input is not a list of JTrajectories"
82

83
84
85
86
87
        # TODO check that trajectories start where last one ended

        # add trajectories to plan temporarily
        self.trajs[arm] = self.trajs[arm] + trajectories
        for traj in trajectories:
88
            self.cumul_time[arm].append(self.cumul_time[arm][-1] +
89
                    traj.time[-1])
90
91
92
93
94

        # collision check along newly added
        # TODO move to binary search style traversal (faster collision finding)
        for t in np.arange(
                self.cumul_time[arm][-1-len(trajectories)],
95
                self.cumul_time[arm][-1] + reserve_future,
96
                cc_res):
97
            joints_at_t = [self.jointsAtTime(t, arm) for arm in range(self.n)]
Jayant Khatkar's avatar
Jayant Khatkar committed
98
            if self.cc.colCheck(joints_at_t) > 0:
99
100
101
                # remove trajectories from plan
                del self.trajs[arm][-len(trajectories):]
                del self.cumul_time[arm][-len(trajectories):]
102
                print("traj add_failed")
103
104
105
106
107

                return 1 # 1 to indicate failed

        # If reached this point, no collision, trajs remain in plan
        return 0 # success
108

109
110
111
112
113
114
115
116

    def getLastJoint(self, arm):
        """
        The joint position arm is at the end of the plan so far
        """
        return self.jointsAtTime(self.cumul_time[arm][-1], arm)


117
118
119
120
121
122
123
124
125
126
127
    def plan_to(self, joints, arm):
        """
        Create a trajectory from the last position in plan to new joint position
        """
        j1 = self.getLastJoint(arm)
        j2 = joints

        t_start = self.cumul_time[arm][-1]
        # TODO time swept should be estimated by distance between j1 and j2
        # in configuration space
        t_end = t_start + 10
128
        joint_list = planTrajectory(arm, j1, j2, self, t_start, t_end)
129
130
        if joint_list is None:
            return None
131
        return JTrajectory(joint_list, np.arange(0,len(joint_list)/4, 0.25), arm)
132
133


134
    def watch(self, t=10):
135
        """
136
        watch plan in pybullet GUI (sped up)
137
138
139
140
141
142
143
144
145
146
        """
        t_max = max([self.cumul_time[r][-1] for r in self.cumul_time])
        speed = t_max/t
        tsteps = int(t/0.1)
        for t in range(tsteps):
            for r in range(self.n):
                j = self.jointsAtTime(t*speed*0.1, r)
                self.cc.setJoints(r,j)
            time.sleep(0.1)
        return
147
148


149
    def visualise(self):
150
151
152
153
154
155
156
157
158
159
160
161
162
163
        """
        Display the plan as a graph showing when execting and when not
        """
        print("green vertical line mark start, red vertical linesmark end of contour")
        fig, ax = plt.subplots()
        for r in range(self.n):
            for t in range(1,len(self.trajs[r])):
                if self.trajs[r][t].contour is not None:
                    ax.plot([self.cumul_time[r][t]- self.trajs[r][t].time[-1],
                             self.cumul_time[r][t]],
                            [r,r],
                            'bo-',
                            linewidth=2)

164
165
166
167
168
169
170
171
172
173
174
                    ax.plot([self.cumul_time[r][t]- self.trajs[r][t].time[-1],
                             self.cumul_time[r][t]- self.trajs[r][t].time[-1]],
                            [0,self.n-1],
                            'g--',
                            linewidth=0.5)

                    ax.plot([self.cumul_time[r][t],
                             self.cumul_time[r][t]],
                            [0,self.n-1],
                            'r--',
                            linewidth=0.5)
175
176
177
178
179
180
        plt.xlabel("Time")
        plt.ylabel("Robots")
        plt.show()
        return


181
182
183
184
185
186
187
188
189
190
191
192
    def save(self, filename):
        """
        pickle this plan
        """

        with open(filename, 'wb') as f:
            pickle.dump(
                    (self.cc.env_desc,
                        self.cc.home,
                        self.start_joints,
                        self.cumul_time,
                        self.trajs),
193
                    f, protocol=2)
194
195
196
        return


197
198
199
200
201
202
203
    def len(self):
        """
        Length of a plan is time of the arm with the longest plan
        """
        return max([a[-1] for a in self.cumul_time.values()])


204
205
206
207
208
209
210
211
212
213
214
215
216
    def extrusion_time(self):
        """
        time spent extruding
        """
        t = 0
        for arm in self.trajs:
            for traj in self.trajs[arm]:
                if traj.contour is not None:
                    t += traj.time[-1]
        return t



217
218
219
220
221
222
223
224
225
226
227
228
229
230
def load_plan(filename, gui=False):
    """
    load a saved plan

    You cannot have existing pybullet instance going
    """
    with open(filename, 'rb') as f:
        env_desc, home, start_joints, ctime, trajs = pickle.load(f)

    env = SimEnv(env_desc, gui=gui, home=home)
    plan = Plan(env, start_joints)
    plan.trajs = trajs
    plan.cumul_time = ctime
    return plan