# reinforce_template.py
import gym
import gym.spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

def test_predict_proba():
    test_states = np.array([env.reset() for _ in range(5)])
    test_probas = predict_proba(test_states)
    assert isinstance(test_probas, np.ndarray), "you must return np array and not %s" % type(test_probas)
    assert tuple(test_probas.shape) == (test_states.shape[0], n_actions), "wrong output shape: %s" % np.shape(test_probas)
    assert np.allclose(np.sum(test_probas, axis=1), 1), "probabilities do not sum to 1"
    print('Test: predict_proba() function: OK!')


def test_generate_session():
    states, actions, rewards = generate_session()
    assert len(states) == len(actions) == len(rewards), "lengths must be equal"
    print('Test: generate_session() function: OK!')


def test_get_cumulative_rewards():
    assert len(get_cumulative_rewards(list(range(100)))) == 100
    assert np.allclose(get_cumulative_rewards([0, 0, 1, 0, 0, 1, 0], gamma=0.9),
                       [1.40049, 1.5561, 1.729, 0.81, 0.9, 1.0, 0.0])
    assert np.allclose(get_cumulative_rewards([0, 0, 1, -2, 3, -4, 0], gamma=0.5),
                       [0.0625, 0.125, 0.25, -1.5, 1.0, -4.0, 0.0])
    assert np.allclose(get_cumulative_rewards([0, 0, 1, 2, 3, 4, 0], gamma=0), [0, 0, 1, 2, 3, 4, 0])
    print('Test: get_cumulative_rewards() function: OK!')

# < YOUR CODE HERE >
# Build a simple neural network that predicts policy logits.
# Keep it simple: CartPole isn't worth deep architectures.
class ReinforceAgent(nn.Module):
    def __init__(self, state_dim, n_actions):
        super(ReinforceAgent, self).__init__()
        # two-layer MLP: observation -> hidden layer of 42 units -> one logit per action
        self.fc1 = nn.Linear(state_dim[0], 42)
        self.fc2 = nn.Linear(42, n_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # raw logits; softmax is applied by the callers
        return x

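# A quick shape sanity check (a sketch, assuming CartPole's 4-dimensional observations and 2 actions):
#   dummy_agent = ReinforceAgent((4,), 2)
#   dummy_agent(torch.zeros(3, 4)).shape  # torch.Size([3, 2]): one row of logits per input state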
# < YOUR CODE HERE >
def predict_proba(states):
    """
    Predict action probabilities given states.
    :param states: numpy array of shape [batch, state_shape]
    :returns: numpy array of shape [batch, n_actions]
    """
    # convert states to a tensor, compute logits, apply softmax over actions to get probabilities
    probas = F.softmax(agent(torch.FloatTensor(states)), dim=1)
    return probas.data.numpy()

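# Minimal usage check (a sketch; assumes `env` and `agent` are already constructed, as in __main__ below):
#   probas = predict_proba(np.array([env.reset()]))
#   probas.shape == (1, n_actions) and np.isclose(probas.sum(), 1.0)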
# < YOUR CODE HERE >
def generate_session(t_max=1000):
    """
    Play a full session with the REINFORCE agent and record it for training at the session end.
    Returns sequences of states, actions and rewards.
    """
    # arrays to record session
    states, actions, rewards = [], [], []
    s = env.reset()

    for t in range(t_max):
        # action probabilities array, a.k.a. pi(a|s)
        action_probas = predict_proba(np.array([s]))[0]
        # sample an action from the current policy
        a = np.random.choice(n_actions, p=action_probas)
        new_s, r, done, info = env.step(a)

        # record session history to train on later
        states.append(s)
        actions.append(a)
        rewards.append(r)

        s = new_s
        if done:
            break

    return states, actions, rewards

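# Note: the total undiscounted reward of a session is simply sum(rewards),
# which is what train_on_session() below returns for logging.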
# < YOUR CODE HERE >
def get_cumulative_rewards(rewards,  # rewards at each step
                           gamma=0.99  # discount for reward
                           ):
    """
    Take a list of immediate rewards r(s,a) for the whole session
    and compute cumulative returns (a.k.a. G(s,a) in Sutton '16):
        G_t = r_t + gamma*r_{t+1} + gamma^2*r_{t+2} + ...
    The simple way to compute cumulative rewards is to iterate from the last time tick to the first
    and compute G_t = r_t + gamma*G_{t+1} recurrently.
    Returns an array/list of cumulative rewards with as many elements as the input rewards.
    """
    # iterate backwards without mutating the caller's rewards list
    cumulative_rewards = [rewards[-1]]
    for rew in reversed(rewards[:-1]):
        cumulative_rewards.append(rew + gamma * cumulative_rewards[-1])
    return list(reversed(cumulative_rewards))

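# Worked example of the recurrence G_t = r_t + gamma * G_{t+1}:
#   get_cumulative_rewards([0, 0, 1], gamma=0.5)
#   G_2 = 1.0, G_1 = 0 + 0.5 * 1.0 = 0.5, G_0 = 0 + 0.5 * 0.5 = 0.25  ->  [0.25, 0.5, 1.0]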
# Helper function
def to_one_hot(y, n_dims=None):
    """ Take an integer vector (tensor or Variable) and convert it to a 1-hot matrix. """
    y_tensor = y.data if isinstance(y, Variable) else y
    y_tensor = y_tensor.type(torch.LongTensor).view(-1, 1)
    n_dims = n_dims if n_dims is not None else int(torch.max(y_tensor)) + 1
    y_one_hot = torch.zeros(y_tensor.size()[0], n_dims).scatter_(1, y_tensor, 1)
    return Variable(y_one_hot) if isinstance(y, Variable) else y_one_hot

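# Example: to_one_hot(torch.LongTensor([0, 2]), n_dims=3) gives
#   [[1., 0., 0.],
#    [0., 0., 1.]]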
# < YOUR CODE HERE >
def train_on_session(optimizer, states, actions, rewards, gamma=0.99):
    """
    Takes a sequence of states, actions and rewards produced by generate_session.
    Updates the agent's weights by following the REINFORCE policy gradient.
    Please use the Adam optimizer with default parameters.
    """
    optimizer.zero_grad()

    # cast everything into Variables
    states = Variable(torch.FloatTensor(states))
    actions = Variable(torch.LongTensor(actions))
    cumulative_returns = np.array(get_cumulative_rewards(rewards, gamma))
    cumulative_returns = Variable(torch.FloatTensor(cumulative_returns))

    # predict logits, probas and log-probas using the agent
    logits = agent(states)
    probas = F.softmax(logits, dim=1)
    logprobas = F.log_softmax(logits, dim=1)
    assert all(isinstance(v, Variable) for v in [logits, probas, logprobas]), \
        "please compute these with torch tensors; don't call the predict_proba function here"

    # select log-probabilities of the chosen actions, log pi(a_i|s_i)
    logprobas_for_actions = torch.sum(logprobas * to_one_hot(actions, n_dims=n_actions), dim=1)

    # REINFORCE objective: J_hat = mean_t [ G_t * log pi(a_t|s_t) ]
    J_hat = torch.mean(logprobas_for_actions * cumulative_returns)
    loss = -1 * J_hat

    loss.backward()
    optimizer.step()

    # technical: return session rewards to print them later
    return np.sum(rewards)

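# The quantity maximized above is the standard REINFORCE surrogate,
#   J_hat ~= 1/T * sum_t G_t * log pi(a_t | s_t),
# so minimizing loss = -J_hat with Adam performs stochastic policy-gradient ascent.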
if __name__ == '__main__':
    env = gym.make("CartPole-v0").env
    env.reset()
    n_actions = env.action_space.n
    state_dim = env.observation_space.shape
    env.render("rgb_array")
    env.close()

    # 1. Complete ReinforceAgent class
    # 2. Complete predict_proba()
    # 3. Complete generate_session()
    # 4. Complete get_cumulative_rewards()
    # 5. Complete train_on_session()

    # Create agent
    agent = ReinforceAgent(state_dim, n_actions)

    test_predict_proba()
    test_generate_session()
    test_get_cumulative_rewards()

    # train with train_on_session(); create the optimizer once so Adam's state persists across iterations
    optimizer = optim.Adam(agent.parameters())
    for i in range(100):
        rewards = [train_on_session(optimizer, *generate_session()) for _ in range(100)]  # generate new sessions
        print("Iteration: %i, Mean reward: %.3f" % (i, np.mean(rewards)))
        if np.mean(rewards) > 500:
            print("You Win!")  # but you can train even further
            break