import networkx as nx
from math import log
import numpy as np


def _UCT(mu_j, c_p, n_p, n_j):
    # UCT score used during selection; unvisited children are
    # explored first by returning an infinite score
    if n_j == 0:
        return float("Inf")
    return mu_j + 2 * c_p * (2 * log(n_p) / n_j) ** 0.5


class ActionDistribution:
    """
    Action Distribution

    Working with action sequences and their respective probability

    An Action Sequence here refers to the node_id of the relevant node
    in the Tree. The actual action sequence to get to that node is
    stored inside the node.

    To initialise, Inputs:
    - X: list of node_ids (ints)
    - n: number of action sequences for comms
    """

    def __init__(self, X, n, beta=0.5):

        # Starting distribution is uniform
        self.q = [1/n] * n

        # Action sequences as provided
        assert len(X) == n
        self.X = X

        # Beta value set to initial value
        self.beta = beta
        self.n = n

    def best_action(self):
        """
        Most likely action sequence
        """
        return self.X[np.argmax(self.q)]

    def random_action(self):
        """
        Weighted random out of possible action sequences
        """
        return np.random.choice(self.X, p=self.q)

    def update(self, Xnew):
        """
        Update Distribution
        """
        # If there are new action sequences to consider, reset to uniform
        if set(Xnew) != set(self.X):
            self.X = Xnew
            self.q = [1/self.n] * self.n
        else:
            # TODO: apply the distribution update equation
            pass


class Tree:
    """
    DecMCTS tree

    To initialise, Inputs:
    - data
        - data required to calculate the reward and available options
    - reward
        - a function with inputs (data, state) which returns the global
          reward to be maximised
        - if the reward can only be calculated once simulation is
          complete, it should return "None" while simulation is
          incomplete
    - available_actions
        - a function with inputs (data, state) which returns the
          possible actions which can be taken
    - c_p
        - exploration multiplier (number between 0 and 1)

    Usage:
    - grow
        - grow MCTS tree by 1 node
    - send_comms
        - get state of this tree to communicate to others
    - receive_comms
        - input the state of other trees for use in calculating
          reward/available actions in coordination with others
    """

    def __init__(self, data, reward_func, avail_actions_func, c_p=1):

        self.data = data
        self.graph = nx.DiGraph()
        self.reward = reward_func
        self.available_actions = avail_actions_func
        self.c_p = c_p
        self.comms = {}  # Plan with no robots initially

        # TODO CHANGE FOLLOWING LINE
        # Placeholder: distribution over this robot's own action
        # sequences, initially containing only the root node
        self.my_action_distribution = ActionDistribution([1], 1)

        # Graph root node
        self.graph.add_node(1,
                            mu=0,
                            N=0,
                            action_seq=[],
                            cost=0
                            )

    def _parent(self, node_id):
        """
        wrapper for code readability
        """
        return list(self.graph.predecessors(node_id))[0]

    def _select(self, children):
        """
        Select the child node which maximises UCT
        """
        # N for parent
        n_p = self.graph.nodes[self._parent(children[0])]["N"]

        # UCT values for children
        uct = [_UCT(node["mu"], self.c_p, n_p, node["N"])
               for node in map(self.graph.nodes.__getitem__, children)]

        # Return the child with the highest UCT
        return children[np.argmax(uct)]

    def _childNodes(self, node_id):
        """
        wrapper for code readability
        """
        return list(self.graph.successors(node_id))

    def _update_distribution(self):
        # TODO: update this robot's action distribution from the tree
        return "TODO"

    def _get_state(self, node_id):
        """
        Randomly select 1 path taken by every other robot
        & path taken by this robot to get to this node
        """
        # Path taken by this robot to reach node_id
        node_path = self.graph.nodes[node_id]["action_seq"]

        # Sample one action sequence from each other robot's distribution
        other_paths = [self.comms[robot].random_action()
                       for robot in self.comms]

        # TODO: combine node_path and other_paths into the state format
        # expected by the reward/available_actions functions
        return "TODO"

    def grow(self, nsims=10, ntime=None):
        """
        Grow Tree by one node
        """

        ### SELECTION
        start_node = 1
        while len(self._childNodes(start_node)) > 0:
            start_node = self._select(self._childNodes(start_node))

        ### EXPANSION
        state = self._get_state(start_node)
        options = self.available_actions(self.data, state)
        # TODO: add a child node for one of the options, then run the
        # SIMULATION and BACKPROPAGATION phases

    def send_comms(self):
        return self.my_action_distribution

    def receive_comms(self, comms_in, robot_id):
        """
        Save data which has been communicated to this tree
        Only receives from one robot at a time, call once for each robot

        Inputs:
        - comms_in
            - an ActionDistribution object
        - robot_id
            - robot number/id - used as key for comms
        """
        self.comms[robot_id] = comms_in
        return True
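

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only).
# The reward and available-action functions below are hypothetical
# placeholders, not part of this module; they only show the expected
# (data, state) signatures described in the Tree docstring. Since grow()
# above is still incomplete, this exercises the current interface rather
# than a full Dec-MCTS planning loop.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    def dummy_reward(data, state):
        # Reward unknown until simulation completes; return None as per
        # the Tree docstring
        return None

    def dummy_available_actions(data, state):
        # Placeholder: every action in the problem data is always available
        return list(data["actions"])

    data = {"actions": [0, 1, 2]}

    tree_a = Tree(data, dummy_reward, dummy_available_actions, c_p=1)
    tree_b = Tree(data, dummy_reward, dummy_available_actions, c_p=1)

    # Grow each robot's tree, then exchange action distributions
    tree_a.grow()
    tree_b.grow()
    tree_a.receive_comms(tree_b.send_comms(), robot_id="robot_b")
    tree_b.receive_comms(tree_a.send_comms(), robot_id="robot_a")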