import numpy as np import time import pickle from bisect import bisect_left from .planning_tools import planTrajectory, JTrajectory from .simEnv import SimEnv import matplotlib.pyplot as plt def _emptyTraj(joints, arm): """ Creates an JTrajectory with one point """ traj = JTrajectory([joints], [0], arm) return traj class Plan: """ Holds the execution plan for n arms """ def __init__(self, sim_env, start_joints = None ): self.cc = sim_env self.n = sim_env.numRobots if start_joints==None: self.start_joints = [sim_env.home] * self.n else: self.start_joints = start_joints self.trajs = {i:[_emptyTraj(self.start_joints[i],i)] for i in range(self.n)} # time at the end of each trajectory self.cumul_time = {i:[0] for i in range(self.n)} def jointsAtTime(self, t, arm): """ state of joints for an arm given time from start """ # if no plan yet, still at start_joints if len(self.trajs[arm])==1 or t==0: return self.start_joints[arm] nth_traj = bisect_left(self.cumul_time[arm], t) if nth_traj == len(self.cumul_time[arm]): # return the final state of arm return self.trajs[arm][-1].positions[-1] # else search for time in nth_traj # TODO can switch this to bisect too tp = self.trajs[arm][nth_traj].time c_t = self.cumul_time[arm][nth_traj-1] wp_i = next(i for i,v in enumerate(tp) if v + c_t >= t) # linear interpolate between the waypoints j1 = np.array(self.trajs[arm][nth_traj].positions[wp_i-1]) j2 = np.array(self.trajs[arm][nth_traj].positions[wp_i]) t1 = tp[wp_i-1] + c_t t2 = tp[wp_i] + c_t return (t-t1)/(t2-t1)*(j2-j1) + j1 def appendTrajectory(self, trajectories, arm, cc_res=0.1, reserve_future=20): """ verify that trajectories can be added to an arm's plan and add them if no collisions. cc_res is time interval between collision checks in seconds reserve future ensures that for (10) seconds after the trajectory is added, there is still no collisions (making sure inevitable collisions after trajectory do not occur and there is time to leave the position) """ assert all([isinstance(t, JTrajectory) for t in trajectories]), \ "Input is not a list of JTrajectories" # TODO check that trajectories start where last one ended # add trajectories to plan temporarily self.trajs[arm] = self.trajs[arm] + trajectories for traj in trajectories: self.cumul_time[arm].append(self.cumul_time[arm][-1] + traj.time[-1]) # collision check along newly added # TODO move to binary search style traversal (faster collision finding) for t in np.arange( self.cumul_time[arm][-1-len(trajectories)], self.cumul_time[arm][-1] + reserve_future, cc_res): joints_at_t = [self.jointsAtTime(t, arm) for arm in range(self.n)] if self.cc.colCheck(joints_at_t) > 0: # remove trajectories from plan del self.trajs[arm][-len(trajectories):] del self.cumul_time[arm][-len(trajectories):] print("traj add_failed") return 1 # 1 to indicate failed # If reached this point, no collision, trajs remain in plan return 0 # success def getLastJoint(self, arm): """ The joint position arm is at the end of the plan so far """ return self.jointsAtTime(self.cumul_time[arm][-1], arm) def plan_to(self, joints, arm): """ Create a trajectory from the last position in plan to new joint position """ j1 = self.getLastJoint(arm) j2 = joints t_start = self.cumul_time[arm][-1] # TODO time swept should be estimated by distance between j1 and j2 # in configuration space t_end = t_start + 10 joint_list = planTrajectory(arm, j1, j2, self, t_start, t_end) if joint_list is None: return None return JTrajectory(joint_list, np.arange(0,len(joint_list)/4, 0.25), arm) def watch(self, t=10): """ watch plan in pybullet GUI (sped up) """ t_max = max([self.cumul_time[r][-1] for r in self.cumul_time]) speed = t_max/t tsteps = int(t/0.1) for t in range(tsteps): for r in range(self.n): j = self.jointsAtTime(t*speed*0.1, r) self.cc.setJoints(r,j) time.sleep(0.1) return def visualise(self): """ Display the plan as a graph showing when execting and when not """ print("green vertical line mark start, red vertical linesmark end of contour") fig, ax = plt.subplots() for r in range(self.n): for t in range(1,len(self.trajs[r])): if self.trajs[r][t].contour is not None: ax.plot([self.cumul_time[r][t]- self.trajs[r][t].time[-1], self.cumul_time[r][t]], [r,r], 'bo-', linewidth=2) ax.plot([self.cumul_time[r][t]- self.trajs[r][t].time[-1], self.cumul_time[r][t]- self.trajs[r][t].time[-1]], [0,self.n-1], 'g--', linewidth=0.5) ax.plot([self.cumul_time[r][t], self.cumul_time[r][t]], [0,self.n-1], 'r--', linewidth=0.5) plt.xlabel("Time") plt.ylabel("Robots") plt.show() return def save(self, filename): """ pickle this plan """ with open(filename, 'wb') as f: pickle.dump( (self.cc.env_desc, self.cc.home, self.start_joints, self.cumul_time, self.trajs), f, protocol=2) return def len(self): """ Length of a plan is time of the arm with the longest plan """ return max([a[-1] for a in self.cumul_time.values()]) def extrusion_time(self): """ time spent extruding """ t = 0 for arm in self.trajs: for traj in self.trajs[arm]: if traj.contour is not None: t += traj.time[-1] return t def load_plan(filename, gui=False): """ load a saved plan You cannot have existing pybullet instance going """ with open(filename, 'rb') as f: env_desc, home, start_joints, ctime, trajs = pickle.load(f) env = SimEnv(env_desc, gui=gui, home=home) plan = Plan(env, start_joints) plan.trajs = trajs plan.cumul_time = ctime return plan