import networkx as nx
from math import log
import numpy as np


def _UCT(mu_j, c_p, n_p, n_j):
    """
    Upper Confidence bound for Trees (UCT) value of a child node:
    mu_j + 2 * c_p * sqrt(2 * ln(n_p) / n_j).
    Unvisited children (n_j == 0) score infinity so they are explored first.
    """
    if n_j == 0:
        return float("Inf")
    return mu_j + 2 * c_p * (2 * log(n_p) / n_j) ** 0.5
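
# Worked example (illustrative numbers): a child with mean reward mu_j = 0.5,
# visited n_j = 2 times, whose parent has been visited n_p = 10 times, with
# exploration multiplier c_p = 1:
#   _UCT(0.5, 1, 10, 2) = 0.5 + 2 * 1 * (2 * log(10) / 2) ** 0.5 ≈ 3.53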

class ActionDistribution:
    """
    Action Distribution

    Works with action sequences and their respective probabilities.
    An action sequence here refers to the node_id of the relevant
    node in the Tree; the actual action sequence needed to reach that
    node is stored inside the node.

    To initialise, inputs:
    - X: list of node_ids (ints)
    - n: number of action sequences for comms
    """

    def __init__(self, X, n, beta=0.5):
        # Starting distribution is uniform
        self.q = [1 / n] * n

        # Action sequences as provided
        assert len(X) == n
        self.X = X

        # Beta value set to its initial value
        self.beta = beta

        self.n = n

    def best_action(self):
        """
        Most likely action sequence
        """
        return self.X[np.argmax(self.q)]

    def random_action(self):
        """
        Weighted random choice out of the possible action sequences
        """
        return np.random.choice(self.X, p=self.q)
    def update(self, Xnew):
        """
        Update the distribution
        """
        # If there are new action sequences to consider, reset to uniform
        if set(Xnew) != set(self.X):
            self.X = Xnew
            self.n = len(Xnew)
            self.q = [1 / self.n] * self.n
        else:
            # TODO: probability update equation still to be implemented
            pass
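

# A minimal usage sketch for ActionDistribution (the node_ids and
# probabilities below are illustrative only):
#
#   dist = ActionDistribution(X=[2, 5, 9], n=3)   # starts uniform: q = [1/3, 1/3, 1/3]
#   dist.q = [0.2, 0.5, 0.3]                      # e.g. after some updates
#   dist.best_action()                            # -> 5 (highest-probability node_id)
#   dist.random_action()                          # -> a node_id sampled with weights q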

class Tree:
    """
    DecMCTS tree

    To initialise, inputs:
    - data
        - data required to calculate the reward and the available options
    - reward
        - This is a function which has inputs (data, state) and
          returns the reward
        - It may be the case that the reward function can only be
          calculated once the simulation is complete, in which
          case it should return "None" while the simulation is
          incomplete
    - available_actions
        - This is a function which has inputs (data, state) and
          returns the possible actions which can be taken
    - c_p
        - exploration multiplier (number between 0 and 1)

    Usage:
    - grow
        - grow the MCTS tree by 1 node
    - send_comms
        - get the state of this tree to communicate to others
    - receive_comms
        - input the state of other trees for use in calculating
          reward/available actions in coordination with others
    """
    def __init__(self, data, reward_func, avail_actions_func, c_p=1):
        self.data = data
        self.graph = nx.DiGraph()
        self.reward = reward_func
        self.available_actions = avail_actions_func
        self.c_p = c_p
        self.comms = {}  # Plan with no robots initially

        # TODO CHANGE FOLLOWING LINE
        self.my_action_distribution = ActionDistribution(X, n)

        # Root node of the tree: no actions taken yet
        self.graph.add_node(1,
                            mu=0,
                            N=0,
                            action_seq=[],
                            cost=0
                            )
    def _parent(self, node_id):
        """
        Wrapper for code readability
        """
        return list(self.graph.predecessors(node_id))[0]
    def _select(self, children):
        """
        Select the child node which maximises UCT
        """
        # N for the parent node
        n_p = self.graph.nodes[self._parent(children[0])]["N"]

        uct = [_UCT(node["mu"], self.c_p, n_p, node["N"])
               for node in map(self.graph.nodes.__getitem__, children)]

        # Return the child with the highest UCT
        return children[np.argmax(uct)]
    def _childNodes(self, node_id):
        """
        Wrapper for code readability
        """
        return list(self.graph.successors(node_id))
    def _get_state(self, node_id):
        """
        Randomly select one path taken by every other robot
        """
        node_path = self.graph.nodes[node_id]["action_seq"]
        # One weighted-random action sequence per communicating robot
        # (the exact structure of the returned state is an assumption here)
        other_paths = {robot: self.comms[robot].random_action()
                       for robot in self.comms}
        return node_path, other_paths
    def grow(self, nsims=10, ntime=None):
        """
        Grow the tree by one node
        """
        ### SELECTION
        # Follow the UCT-maximising child from the root (node 1) down
        # to a leaf node
        start_node = 1
        while len(self._childNodes(start_node)) > 0:
            start_node = self._select(self._childNodes(start_node))

        state = self._get_state(start_node)
        options = self.available_actions(self.data, state)
        # TODO: expansion, simulation and backpropagation of the new node
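
    # Note: the class docstring lists send_comms as part of the intended usage,
    # but it is not implemented above. The sketch below assumes the communicated
    # state is simply this robot's current ActionDistribution.
    def send_comms(self):
        """
        Get the state of this tree to communicate to other robots
        (sketch: returns the stored ActionDistribution).
        """
        return self.my_action_distribution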
    def receive_comms(self, comms_in, robot_id):
        """
        Save data which has been communicated to this tree.
        Only receives from one robot at a time; call once
        for each robot.

        Inputs:
        - comms_in
            - an ActionDistribution object
        - robot_id
            - robot number/id, used as the key for comms
        """
        self.comms[robot_id] = comms_in
        return True
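

# A rough usage sketch for two robots coordinating their plans. The data object
# and the my_reward / my_avail_actions callbacks are hypothetical placeholders
# to be supplied by the user:
#
#   tree_a = Tree(data, my_reward, my_avail_actions, c_p=1)
#   tree_b = Tree(data, my_reward, my_avail_actions, c_p=1)
#   for _ in range(100):
#       tree_a.grow()
#       tree_b.grow()
#       # Exchange action distributions so each tree plans around the other
#       tree_a.receive_comms(tree_b.my_action_distribution, robot_id=2)
#       tree_b.receive_comms(tree_a.my_action_distribution, robot_id=1)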