Skip to content
Snippets Groups Projects 9.52 KiB
Newer Older
Jayant Khatkar's avatar
Jayant Khatkar committed
import networkx as nx
from copy import copy
Jayant Khatkar's avatar
Jayant Khatkar committed
from math import log
import numpy as np

def _UCT(mu_j, c_p, n_p, n_j):
    """
    Upper Confidence bound for Trees score of a child node

    Inputs:
    - mu_j: mean reward of the child
    - c_p: exploration multiplier
    - n_p: visit count of the parent
    - n_j: visit count of the child

    Returns mu_j + 2*c_p*sqrt(2*ln(n_p)/n_j), or infinity when the
        child has never been visited (n_j == 0)
    """
    if n_j != 0:
        exploration = (2*log(n_p)/n_j)**0.5
        return mu_j + 2*c_p * exploration

    # Never visited: no finite score, rank it above everything else
    return float("Inf")
Jayant Khatkar's avatar
Jayant Khatkar committed
class ActionDistribution:
    """
    Action Distribution
    Working with action sequences and their respective probability

    To initialise, Inputs:
    - X: list of action sequences
        - NOTE: X is simply a state object returned by state_store.
            You are expected to store action sequence in this object
    - q: probability of each action sequence (normalised in initialisation)
    """

    def __init__(self, X, q):
        """
        Store the action sequences and normalise their weights

        - If q is empty, the stored distribution is empty
        - If the weights sum to zero, a uniform distribution is used
        - Otherwise every weight is divided by the sum of the weights
        """

        # Action sequence as provided
        self.X = X

        # Normalise
        total = sum(q)
        if len(q) == 0:
            # Nothing to normalise; avoids dividing by a zero length
            self.q = []
        elif total == 0:
            self.q = [1/float(len(q))] * len(q)
        else:
            self.q = (np.array(q).astype(float)/total).tolist()

    def best_action(self):
        """
        Most likely action sequence
        """
        return self.X[np.argmax(self.q)]

    def random_action(self):
        """
        Weighted random out of possible action sequences
        """
        return self.X[np.random.choice(len(self.q), p=self.q)]
Jayant Khatkar's avatar
Jayant Khatkar committed

class Tree:
    DecMCTS tree
    To Initiate, Inputs:
    - data
        - data required to calculate reward, available options
    - reward
        - This is a function which has inputs (data, state) and
            returns the GLOBAL reward to be maximised
Jayant Khatkar's avatar
Jayant Khatkar committed
    - available_actions
        - This is a function which has inputs (data, state) and
            returns the possible actions which can be taken
    - state_store
        - This is a function which has inputs 
            (data, parent_state, action) and returns an object to
            store in the node. 
        - Root Node has parent state None and action None.
    - sim_selection_func
        - This is a function which chooses one of the available options
            during simulation (can be random or more advanced)
    - c_p
        - exploration multiplier (number between 0 and 1)

    - grow
        - grow MCTS tree by 1 node
    - send_comms
        - get state of this tree to communicate to others
    - receive_comms
        - Input the state of other trees for use in calculating
            reward/available actions in coordination with others

    def __init__(self,
Jayant Khatkar's avatar
Jayant Khatkar committed
Jayant Khatkar's avatar
Jayant Khatkar committed = data
Jayant Khatkar's avatar
Jayant Khatkar committed
        self.graph = nx.DiGraph()
Jayant Khatkar's avatar
Jayant Khatkar committed
        self.reward = reward_func
        self.available_actions = avail_actions_func
        self.sim_available_actions = sim_avail_actions_func
Jayant Khatkar's avatar
Jayant Khatkar committed
        self.sim_selection_func = sim_selection_func
Jayant Khatkar's avatar
Jayant Khatkar committed
        self.c_p = c_p = robot_id
Jayant Khatkar's avatar
Jayant Khatkar committed
        self.comms = {} # Plan with no robots initially
        self.comm_n = comm_n # number of action dists to communicate

        # Graph add root node of tree
Jayant Khatkar's avatar
Jayant Khatkar committed
                state=self.state_store(, None, None,
Jayant Khatkar's avatar
Jayant Khatkar committed
        # Set Action sequence as nothing for now
        self.my_act_dist = ActionDistribution([self.graph.node[1]["state"]],[1])

Jayant Khatkar's avatar
Jayant Khatkar committed

    def _parent(self, node_id):
        wrapper for code readability

        return list(self.graph.predecessors(node_id))[0]

Jayant Khatkar's avatar
Jayant Khatkar committed
    def _select(self, children):
        """
        Select Child node which maximises UCT

        Inputs:
        - children: non-empty list of node ids. Only the parent of the
            first entry is looked up, so they are assumed to be siblings

        Returns the entry of children with the highest UCT value
        """

        nodes = self.graph.nodes

        # N for parent
        n_p = nodes[self._parent(children[0])]["N"]

        # UCT values for children
        uct = [_UCT(nodes[c]["mu"], self.c_p, n_p, nodes[c]["N"])
                for c in children]

        # Return Child with highest UCT
        return children[np.argmax(uct)]

    def _childNodes(self, node_id):
        wrapper for code readability

        return list(self.graph.successors(node_id))

    def _update_distribution(self):
        """
        Get the top n Action sequences and their "probabilities"
            and store them for communication

        Only visited nodes (N > 0) other than the root (node 1) are
            considered; the comm_n of them with the highest mu are kept.

        Returns True if my_act_dist was replaced, False if there was no
            visited node to report (my_act_dist is then left unchanged)
        """

        nodes = self.graph.nodes

        # Drop the root and unvisited nodes BEFORE taking the top n, so
        # that unvisited nodes (mu = 0) cannot take slots from visited ones
        mu = {n: m
              for n, m in nx.get_node_attributes(self.graph, "mu").items()
              if n != 1 and nodes[n]["N"] > 0}

        top_n_nodes = sorted(mu, key=mu.get, reverse=True)[:self.comm_n]
        if not top_n_nodes:
            return False

        X = [nodes[n]["best_rollout"] for n in top_n_nodes]
        # For now, just using q = mu**2
        q = [mu[n]**2 for n in top_n_nodes]
        self.my_act_dist = ActionDistribution(X, q)
        return True

Jayant Khatkar's avatar
Jayant Khatkar committed

    def _get_system_state(self, node_id):
        Randomly select 1 path taken by every other robot & path taken by
            this robot to get to this node

        Returns tuple where first element is path of current robot,
            and second element is a dictionary of the other paths
Jayant Khatkar's avatar
Jayant Khatkar committed

        system_state = {k:self.comms[k].random_action() for k in self.comms}
Jayant Khatkar's avatar
Jayant Khatkar committed
        system_state[] = self.graph.node[node_id]["state"]
Jayant Khatkar's avatar
Jayant Khatkar committed

    def _null_state(self, state):
        temp = copy(state)
        temp[] = self.graph.node[1]["state"] # Null state is if robot still at root node
Jayant Khatkar's avatar
Jayant Khatkar committed
    def _expansion(self, start_node):
Jayant Khatkar's avatar
Jayant Khatkar committed
Jayant Khatkar's avatar
Jayant Khatkar committed
        Does the Expansion step for tree growing.
        Separated into its own function because it is also done in
            Init step.
Jayant Khatkar's avatar
Jayant Khatkar committed
Jayant Khatkar's avatar
Jayant Khatkar committed
        options = self.available_actions(
Jayant Khatkar's avatar
Jayant Khatkar committed

        if len(options)==0:
            return False

        # create empty nodes underneath the node being expanded
        for o in options:

Jayant Khatkar's avatar
Jayant Khatkar committed
                    state=self.state_store(, self.graph.node[start_node]["state"], o,

            self.graph.add_edge(start_node, len(self.graph))
Jayant Khatkar's avatar
Jayant Khatkar committed
        return True

    def grow(self, nsims=10, gamma = 0.9, depth=10):
Jayant Khatkar's avatar
Jayant Khatkar committed
        Grow Tree by one node

        ### SELECTION
        start_node = 1

        # Sample actions of other robots
        # NOTE: Sampling done at the beginning for dependency graph reasons
        state = self._get_system_state(start_node)

        # Propagate down the tree
brian.lee's avatar
brian.lee committed
        # check how _select handles mu, N = 0
Jayant Khatkar's avatar
Jayant Khatkar committed
        while len(self._childNodes(start_node))>0:
            start_node = self._select(self._childNodes(start_node))

        ### EXPANSION
brian.lee's avatar
brian.lee committed
        # check if _expansion changes start_node to the node after jumping
brian.lee's avatar
brian.lee committed
Jayant Khatkar's avatar
Jayant Khatkar committed
        avg_reward = 0
        best_reward = float("-Inf")
        best_rollout = None
        for i in range(nsims):
            temp_state = self.graph.node[start_node]["state"]
            state[] = temp_state
Jayant Khatkar's avatar
Jayant Khatkar committed

            d = 0 # depth
            while d<depth: # also breaks at no available options
                d += 1

                # Get the available actions
                options = self.sim_available_actions(
Jayant Khatkar's avatar
Jayant Khatkar committed
Jayant Khatkar's avatar
Jayant Khatkar committed

                # If no actions possible, simulation complete
                if len(options)==0:

                # "randomly" choose 1 - function provided by user
                #state[] = temp_state
                sim_action = self.sim_selection_func(, options, temp_state)

                # add that to the actions of the current robot
                temp_state = self.state_store(, temp_state, sim_action,
                state[] = temp_state
Jayant Khatkar's avatar
Jayant Khatkar committed

            # calculate the reward at the end of simulation
            rew = self.reward(, state) \
                - self.reward(, self._null_state(state))

            # if best reward so far, store the rollout in the new node
            if rew > best_reward:
                best_reward = rew
                best_rollout = copy(temp_state)
                self.graph.node[start_node]["mu"] = avg_reward

Jayant Khatkar's avatar
Jayant Khatkar committed
        self.graph.node[start_node]["N"] = 1
Jayant Khatkar's avatar
Jayant Khatkar committed
        self.graph.node[start_node]["best_rollout"] = copy(best_rollout)
Jayant Khatkar's avatar
Jayant Khatkar committed
        while start_node!=1: #while not root node

            start_node = self._parent(start_node)

Jayant Khatkar's avatar
Jayant Khatkar committed
            self.graph.node[start_node]["mu"] = \
Jayant Khatkar's avatar
Jayant Khatkar committed
                    (gamma * self.graph.node[start_node]["mu"] * \
                     self.graph.node[start_node]["N"] + avg_reward) \
                    /(self.graph.node[start_node]["N"] + 1)
Jayant Khatkar's avatar
Jayant Khatkar committed
            self.graph.node[start_node]["N"] = \
                    gamma * self.graph.node[start_node]["N"] + 1
Jayant Khatkar's avatar
Jayant Khatkar committed
Jayant Khatkar's avatar
Jayant Khatkar committed
        return avg_reward
Jayant Khatkar's avatar
Jayant Khatkar committed
    def send_comms(self):
        return self.my_act_dist
Jayant Khatkar's avatar
Jayant Khatkar committed

Jayant Khatkar's avatar
Jayant Khatkar committed

Jayant Khatkar's avatar
Jayant Khatkar committed
    def receive_comms(self, comms_in, robot_id):
        Save data which has been communicated to this tree
        Only receives from one robot at a time, call once
Jayant Khatkar's avatar
Jayant Khatkar committed
        for each robot
Jayant Khatkar's avatar
Jayant Khatkar committed
        - comms_in
            - An Action distribution object
        - robot_id
            - Robot number/id - used as key for comms
        self.comms[robot_id] = comms_in
        return True
Jayant Khatkar's avatar
Jayant Khatkar committed