from math import log
from copy import copy

import numpy as np
import networkx as nx


def _UCT(mu_j, c_p, n_p, n_j):
    """
    Upper Confidence bound for Trees.
    mu_j: average reward of child j, c_p: exploration multiplier,
    n_p: number of visits of the parent, n_j: number of visits of child j.
    Unvisited children score infinity so that each child is tried once.
    """
    if n_j == 0:
        return float("Inf")
    return mu_j + 2 * c_p * (2 * log(n_p) / n_j) ** 0.5


class ActionDistribution:
    """
    Action Distribution

    Working with action sequences and their respective probability

    To initialise, Inputs:
    - X: list of action sequences
        - NOTE: X is simply a state object returned by state_store.
          You are expected to store the action sequence in this object
    - q: probability of each action sequence (normalised in initialisation)
    """

    def __init__(self, X, q):
        self.X = X
        # Normalise q; if all weights are zero, assume uniform (reconstructed)
        if sum(q) == 0:
            self.q = [1.0 / len(q)] * len(q)
        else:
            self.q = (np.array(q).astype(float)/sum(q)).tolist()
def best_action(self):
"""
Most likely action sequence
"""
return self.X[np.argmax(self.q)]
    def random_action(self):
        """
        Weighted random out of possible action sequences
        """
        return self.X[np.random.choice(len(self.X), p=self.q)]


class Tree:
    """
DecMCTS tree
To Initiate, Inputs:
- data
- data required to calculate reward, available options
- reward
- This is a function which has inputs (data, state) and
returns the GLOBAL reward to be maximised
- available_actions
- This is a function which has inputs (data, state) and
returns the possible actions which can be taken
- state_store
- This is a function which has inputs
(data, parent_state, action) and returns an object to
store in the node.
- Root Node has parent state None and action None.
    - sim_selection_func
        - This is a function which has inputs (data, options, state) and
          chooses an available option during simulation (can be random
          or more advanced)
    - sim_avail_actions_func
        - Equivalent of available_actions, used during simulation
    - comm_n
        - number of action sequences to communicate to other robots
    - robot_id
        - id of the robot this tree belongs to
- c_p
- exploration multiplier (number between 0 and 1)
Usage:
- grow
- grow MCTS tree by 1 node
- send_comms
- get state of this tree to communicate to others
- receive_comms
- Input the state of other trees for use in calculating
reward/available actions in coordination with others
"""
    def __init__(self,
                 data,
                 reward_func,
                 avail_actions_func,
                 state_store_func,
                 sim_selection_func,
                 sim_avail_actions_func,
                 comm_n,
                 robot_id,
                 c_p=1):

        self.data = data
        self.id = robot_id
        self.graph = nx.DiGraph()
        self.reward = reward_func
        self.available_actions = avail_actions_func
        self.sim_selection_func = sim_selection_func
        self.sim_available_actions = sim_avail_actions_func
        self.state_store = state_store_func
        self.c_p = c_p
        self.comm_n = comm_n  # number of action dists to communicate
        self.comms = {}  # action distributions received from other robots

        # Graph add root node of tree
        state = self.state_store(self.data, None, None, self.id)
        self.graph.add_node(1,
                            mu=0,
                            N=0,
                            state=state,
                            best_rollout=state  # root's "rollout" is its own state
                            )

        # Set Action sequence as nothing for now
        self.my_act_dist = ActionDistribution([self.graph.node[1]["state"]], [1])

        # Expansion is also done in the Init step (see _expansion)
        self._expansion(1)
def _parent(self, node_id):
"""
wrapper for code readability
"""
return list(self.graph.predecessors(node_id))[0]
    def _select(self, children):
        """
        Select Child node which maximises UCT
        """
        # Number of times the shared parent has been visited
        n_p = self.graph.node[self._parent(children[0])]["N"]

        uct = [_UCT(node["mu"], self.c_p, n_p, node["N"])
               for node in map(self.graph.nodes.__getitem__, children)]

        # Return Child with highest UCT
        return children[np.argmax(uct)]
def _childNodes(self, node_id):
"""
wrapper for code readability
"""
return list(self.graph.successors(node_id))
def _update_distribution(self):
"""
Get the top n Action sequences and their "probabilities"
and store them for communication
"""
        # For now, just using q = mu**2
        temp = nx.get_node_attributes(self.graph, "mu")
        top_n_nodes = sorted(temp, key=temp.get, reverse=True)[:self.comm_n]
        X = [self.graph.node[n]["best_rollout"]
             for n in top_n_nodes if self.graph.node[n]["N"] > 0]
        q = [self.graph.node[n]["mu"]**2
             for n in top_n_nodes if self.graph.node[n]["N"] > 0]
        self.my_act_dist = ActionDistribution(X, q)
return True
    def _get_system_state(self, node_id):
        """
        Randomly select 1 path taken by every other robot & path taken by
        this robot to get to this node

        Returns a dictionary of paths keyed by robot id, including the
        path of the current robot
        """
        system_state = {k: self.comms[k].random_action() for k in self.comms}
        system_state[self.id] = self.graph.node[node_id]["state"]

        return system_state
    def _null_state(self, state):
        """
        Copy of the system state with this robot's path reset to the root
        """
        temp = copy(state)
        temp[self.id] = self.graph.node[1]["state"]  # Null state is if robot still at root node
        return temp
    def _expansion(self, start_node):
        """
        Does the Expansion step for tree growing.
        Separated into its own function because it is also done in the
        Init step.
        """
        options = self.available_actions(
            self.data,
            self.graph.node[start_node]["state"],
            self.id
        )

        if len(options) == 0:
            return False

        # create empty nodes underneath the node being expanded
        for o in options:
            # Store the state for the new node and attach it to the tree
            state = self.state_store(self.data, self.graph.node[start_node]["state"], o, self.id)
            self.graph.add_node(len(self.graph) + 1,
                                mu=0,
                                N=0,
                                best_rollout=None,
                                state=state
                                )
            self.graph.add_edge(start_node, len(self.graph))
        return True
    def grow(self, nsims=10, gamma=0.9, depth=10):
        """
        Grow Tree by one node
        """

        ### SELECTION
        start_node = 1

        # Sample actions of other robots
        # NOTE: Sampling done at the beginning for dependency graph reasons
state = self._get_system_state(start_node)
# Propagate down the tree
while len(self._childNodes(start_node))>0:
start_node = self._select(self._childNodes(start_node))
        ### EXPANSION
        # TODO: check whether _expansion should move start_node to one of
        # the newly added children
        self._expansion(start_node)

        ### SIMULATION
        avg_reward = 0
        best_reward = float("-Inf")
        best_rollout = None

        for i in range(nsims):
            temp_state = self.graph.node[start_node]["state"]
            state[self.id] = temp_state
d = 0 # depth
while d<depth: # also breaks at no available options
d += 1
                # Get the available actions
                options = self.sim_available_actions(
                    self.data,
                    temp_state,
                    self.id
                )
# If no actions possible, simulation complete
if len(options)==0:
break
# "randomly" choose 1 - function provided by user
#state[self.id] = temp_state
sim_action = self.sim_selection_func(self.data, options, temp_state)
# add that to the actions of the current robot
temp_state = self.state_store(self.data, temp_state, sim_action, self.id)
state[self.id] = temp_state
            # calculate the reward at the end of simulation
            rew = self.reward(self.data, state) \
                - self.reward(self.data, self._null_state(state))
            avg_reward += rew / nsims  # running average over the nsims rollouts
# if best reward so far, store the rollout in the new node
if rew > best_reward:
best_reward = rew
best_rollout = copy(temp_state)
self.graph.node[start_node]["mu"] = avg_reward
self.graph.node[start_node]["best_rollout"] = copy(best_rollout)
        ### BACKPROPAGATION
        while start_node != 1:  # while not root node
            start_node = self._parent(start_node)

            self.graph.node[start_node]["mu"] = \
                (gamma * self.graph.node[start_node]["mu"] *
                 self.graph.node[start_node]["N"] + avg_reward) / \
                (self.graph.node[start_node]["N"] + 1)

            self.graph.node[start_node]["N"] = \
                gamma * self.graph.node[start_node]["N"] + 1

        # Refresh the stored action distribution for communication
        # (assumed to happen after every grow, since send_comms reads it)
        self._update_distribution()
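
    # A minimal sketch of the send_comms method named in the class
    # docstring; assumed here to simply return this robot's current
    # action distribution.
    def send_comms(self):
        """
        Get the state of this tree to communicate to other robots
        """
        return self.my_act_dist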
def receive_comms(self, comms_in, robot_id):
"""
Save data which has been communicated to this tree
        Only receives from one robot at a time; call once per robot
Inputs:
- comms_in
- An Action distribution object
- robot_id
- Robot number/id - used as key for comms
"""
self.comms[robot_id] = comms_in
return True
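

# A minimal usage sketch under assumed toy inputs (not part of the module):
# robots collect items 0..3, a state is the list of actions taken so far,
# and the global reward is the number of distinct items collected by all
# robots. All function names below are hypothetical stand-ins that only
# illustrate the expected signatures.
if __name__ == "__main__":
    import random

    def state_store(data, parent_state, action, robot_id):
        # Root node has parent_state None and action None
        if parent_state is None:
            return []
        return parent_state + [action]

    def avail_actions(data, state, robot_id):
        # Each robot may take each action at most once
        return [a for a in data["items"] if a not in state]

    def reward(data, states):
        # GLOBAL reward: distinct items collected across all robots
        collected = set()
        for robot_id in states:
            collected.update(states[robot_id])
        return float(len(collected))

    def sim_selection(data, options, state):
        # Uniform random choice during simulation
        return random.choice(options)

    data = {"items": [0, 1, 2, 3]}
    tree1 = Tree(data, reward, avail_actions, state_store,
                 sim_selection, avail_actions, comm_n=3, robot_id=1)
    tree2 = Tree(data, reward, avail_actions, state_store,
                 sim_selection, avail_actions, comm_n=3, robot_id=2)

    # Alternate growing each tree and exchanging action distributions
    for _ in range(20):
        tree1.grow()
        tree2.grow()
        tree1.receive_comms(tree2.send_comms(), 2)
        tree2.receive_comms(tree1.send_comms(), 1)

    print(tree1.my_act_dist.best_action())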