I’ve implemented the first experiment from the Reward Design via Online Gradient Ascent paper. I don’t have any specific concerns, but it’s my first time using multiprocessing or doing reinforcement learning, and I want to add this work to my portfolio. So I want to know if there is anything wrong with this code or if it can be improved in any way. The number of trials is 13 instead of 130 like in the paper because I don’t have that much compute.

Main file:

```
import numpy as np
from agent import Agent
from environment import BirdEnv
from pgrd import PGRD
from gym.utils.seeding import np_random
from multiprocessing import Pool
import os
# 5 actions: move right, left, down, up, eat the worm
NUM_ACTIONS = 5
# the agent observes the full state given by 9*agent_location + worm_location
NUM_STATES = 81
TAU = 100
GAMMA = 0.95
NUM_TRIALS = 13
TOTAL_TIMESTEPS = 5000


def run_trial(depth, alpha, beta, num_trial):
    """Run one independent PGRD trial and return its learning curve.

    Defined at module level because multiprocessing must pickle the worker
    function (a closure defined inside the loop cannot be pickled). Each
    worker builds its own environment and agent with fresh RNG streams, so
    trials are statistically independent instead of all inheriting the same
    RNG state copied from the parent process at fork time.
    """
    rng_env, _ = np_random()
    env = BirdEnv(rng_env)
    rng_agent, _ = np_random()
    agent = Agent(depth, TAU, GAMMA, rng_agent, NUM_ACTIONS, NUM_STATES)
    model = PGRD(agent, env, alpha, beta)
    return model.learn(total_timesteps=TOTAL_TIMESTEPS, visualize=False)


if __name__ == "__main__":
    # make sure the output directory exists before the long run starts
    os.makedirs("results", exist_ok=True)
    for depth in range(7):
        print(depth)
        for alpha in (0, 2e-6, 5e-6, 2e-5, 5e-5, 2e-4, 5e-4, 2e-3, 5e-3, 1e-2):
            print(alpha)
            for beta in (0, 0.4, 0.7, 0.9, 0.95, 0.99):
                # starmap passes the loop parameters explicitly instead of
                # capturing them in an (unpicklable) closure; `with` closes
                # and joins the pool even if a trial raises
                trial_args = [(depth, alpha, beta, t) for t in range(NUM_TRIALS)]
                with Pool(os.cpu_count()) as pool:
                    returns = pool.starmap(run_trial, trial_args)
                # average the per-timestep return curves over all trials
                # (the original divided by the undefined name `num_trials`)
                returns = np.mean(np.array(returns), axis=0)
                np.save("results/Result_depth_{}_alpha_{}_beta_{}.npy".format(depth, alpha, beta), returns)
```

agent.py:

```
import numpy as np
from collections import defaultdict
from functools import lru_cache
def softmax(action_values, tau):
    """Return Boltzmann (softmax) probabilities for the given action values.

    Arguments: action_values - 1-dimensional array
               tau - temperature
    """
    scaled = action_values * tau
    # shift by the maximum so np.exp cannot overflow
    weights = np.exp(scaled - np.max(scaled))
    return weights / np.sum(weights)
class Agent:
    """Planning agent for PGRD: plans `depth` steps ahead with a learned
    tabular transition model and a reward function parametrized by theta."""

    def __init__(self, depth, tau, gamma, rng, nA, nS):
        self.nA = nA          # number of actions
        self.nS = nS          # number of states
        self.depth = depth    # depth of planning
        self.tau = tau        # policy temperature
        self.gamma = gamma    # discount rate
        # agent's model of the environment
        # N[s][a] = {'total': total_visits, 'counts': {s': x, ...}}
        # N[s][a]['counts'][s'] - number of visits to s' after taking action a in state s
        # N[s][a]['counts'][s'] / N[s][a]['total'] = Pr(s'|s, a)
        self.N = defaultdict(lambda: defaultdict(lambda: {'total': 0, 'counts': defaultdict(lambda: 0)}))
        self.rand_generator = rng

    def update(self, state, action, newstate):
        """Record one observed transition (state, action) -> newstate."""
        self.N[state][action]['total'] += 1
        self.N[state][action]['counts'][newstate] += 1

    def plan(self, state, theta):
        """Compute the d-step Q-value function and its theta-gradient at state.

        Returns:
            action_values: nA array, Q_d(state, .)
            value_grad:    nA x nS x nA array, d Q_d(state, a) / d theta[s', a']
        """
        @lru_cache(maxsize=None)
        def _plan(s, d):
            """Recursive memoized planner. The cache is rebuilt on every
            plan() call, which is required because it closes over theta."""
            # gradient of the immediate reward theta[s, a] w.r.t. theta
            reward_grad = np.zeros((self.nA, self.nS, self.nA))
            for a in range(self.nA):
                reward_grad[a, s, a] = 1
            if d == 0:
                # copy so callers mutating theta in place cannot corrupt
                # the returned values
                action_values = theta[s].copy()
                value_grad = reward_grad
            else:
                inc = np.zeros(self.nA)
                grad_inc = np.zeros((self.nA, self.nS, self.nA))
                for action in self.N[s].keys():
                    for state_next, count in self.N[s][action]['counts'].items():
                        values_next, grad_next = _plan(state_next, d - 1)
                        action_next = np.argmax(values_next)
                        p = count / self.N[s][action]['total']
                        inc[action] += values_next[action_next] * p
                        # chain rule through the max: the theta-gradient of
                        # max_a' Q_{d-1}(s', a') is the gradient of Q_{d-1}
                        # at the maximizing action, weighted by Pr(s'|s,a).
                        # (The original multiplied by the argmax *index* and
                        # dropped the recursive gradient term.)
                        grad_inc[action] += grad_next[action_next] * p
                action_values = theta[s] + self.gamma * inc
                value_grad = reward_grad + self.gamma * grad_inc
            return action_values, value_grad
        return _plan(state, self.depth)

    def logpolicy_grad(self, value_grad, probas, action):
        """Gradient of log pi(action | state) with respect to theta.

        Arguments:
            value_grad: nA x nS x nA
            probas: nA
            action: int
        Returns:
            grad: nS x nA
        """
        grad = self.tau * (value_grad[action] - np.tensordot(probas, value_grad, axes=1))
        return grad

    def policy(self, action_values):
        """Boltzmann stochastic policy over the planned action values."""
        probas = softmax(action_values, self.tau)
        return probas

    def step(self, state, theta):
        """Plan at state, sample an action, and return it together with
        the gradient of its log-probability w.r.t. theta."""
        action_values, value_grad = self.plan(state, theta)
        # compute the Boltzmann stochastic policy parametrized by action_values
        probas = self.policy(action_values)  # shape: nA
        # select action according to policy
        action = self.rand_generator.choice(np.arange(self.nA), p=probas)
        grad = self.logpolicy_grad(value_grad, probas, action)
        return action, grad
```

environment.py:

```
import sys
import numpy as np
from collections import defaultdict
MAP = ("CCC",
       " ==",
       "CCC",
       " ==",
       "CCC")


class BirdEnv:
    """Bird looks for a worm: 3 rows of 3 cells, connected vertically by a
    corridor on the left side only (the "=" squares are walls)."""
    metadata = {'render.modes': ['human']}

    def __init__(self, rng):
        self.nA = 5   # number of actions: right, left, down, up, eat the worm
        self.nC = 9   # number of cells in 3x3 grid
        self.nS = self.nC ** 2  # state = (position of bird, position of worm)
        self.ncol = 3  # number of columns in 3x3 grid
        self.rand_generator = rng
        # transitions[c][a] == [(probability, nextcell), ...]
        self.transitions = {c: {} for c in range(self.nC)}

        def move(i, j, inc):
            """Cell id reached from MAP position (i, j) when moving by inc."""
            cell_i = max(min(i + inc[0], 4), 0)
            cell_j = max(min(j + inc[1], 2), 0)
            # move according to action, if you can
            if MAP[cell_i][cell_j] == "=":
                # wall: vertical movement blocked
                cell_i = i
            elif MAP[cell_i][cell_j] == " ":
                # corridor: pass through to the next row of cells
                cell_i += inc[0]
            cell = 3 * (cell_i // 2) + cell_j
            return cell

        for i, row in enumerate(MAP):
            for j, char in enumerate(row):
                if char == "C":
                    # every move happens "by accident" with probability 0.025
                    d = defaultdict(lambda: 0)
                    for inc in ((0, 1), (0, -1), (1, 0), (-1, 0)):
                        cell = move(i, j, inc)
                        d[cell] += 0.025
                    # ... and the intended move gets the remaining 0.9
                    for action, inc in enumerate(((0, 1), (0, -1), (1, 0), (-1, 0))):
                        cell = move(i, j, inc)
                        trans = d.copy()
                        trans[cell] += 0.9
                        # must be a list, not a generator: it is sampled
                        # again on every step()
                        self.transitions[3 * (i // 2) + j][action] = [
                            (prob, nextcell) for nextcell, prob in trans.items()]
        # initial cell distribution (always start in the upper left corner)
        self.icd = (1, 0, 0, 0, 0, 0, 0, 0, 0)
        self.cell = self.rand_generator.choice(np.arange(self.nC), p=self.icd)
        # initial worm distribution: in one of the three right-most locations
        # at the end of each corridor
        self.iwd = (0, 0, 1. / 3, 0, 0, 1. / 3, 0, 0, 1. / 3)
        self.worm = self.rand_generator.choice(np.arange(self.nC), p=self.iwd)
        self.lastaction = 4

    def state(self):
        """Encode (bird cell, worm cell) as a single integer state."""
        return self.nC * self.cell + self.worm

    def step(self, action):
        """Execute one time step within the environment."""
        reward = 0
        if action == 4:
            # try eating the worm
            if self.cell == self.worm:
                # move worm into one of the other corridor-end cells
                self.worm = self.rand_generator.choice(
                    ((self.worm + 3) % self.nC, (self.worm + 6) % self.nC))
                reward = 1
        else:
            transitions = self.transitions[self.cell][action]
            i = self.rand_generator.choice(
                np.arange(len(transitions)), p=[t[0] for t in transitions])
            _, cell = transitions[i]
            self.cell = cell
        self.lastaction = action
        state = self.state()
        return state, reward

    def reset(self):
        """Reset bird and worm positions to their initial distributions."""
        self.cell = self.rand_generator.choice(np.arange(self.nC), p=self.icd)
        self.worm = self.rand_generator.choice(np.arange(self.nC), p=self.iwd)

    def render(self, mode='human', close=False):
        """Render the environment to the screen."""
        outfile = sys.stdout
        # nested lists (not tuples) so cells can be overwritten with B and W
        desc = [["C", "C", "C"], ["C", "C", "C"], ["C", "C", "C"]]
        row, col = self.cell // self.ncol, self.cell % self.ncol
        desc[row][col] = "B"
        row, col = self.worm // self.ncol, self.worm % self.ncol
        desc[row][col] = "W"
        if self.lastaction is not None:
            outfile.write(" ({})\n".format(
                ("Right", "Left", "Down", "Up", "Eat")[self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc) + "\n")
```

pgrd.py:

```
import numpy as np
class PGRD:
    """Policy Gradient for Reward Design: adapts the reward parameters
    theta by online stochastic gradient ascent on the objective reward."""

    def __init__(self, agent, env, alpha, beta):
        self.agent = agent
        self.env = env
        self.alpha = alpha  # step size
        self.beta = beta    # eligibility-trace decay
        # theta is initialized so that the initial reward function equals the
        # objective reward function: reward 1 for eating (action 4) in the
        # states where bird and worm share a cell (state 9*cell + cell)
        self.theta = np.zeros((env.nS, env.nA))
        for cell in (2, 5, 8):
            self.theta[10 * cell, 4] = 1
        # eligibility trace of the log-policy gradient
        self.z = np.zeros((env.nS, env.nA))

    def learn(self, total_timesteps, visualize=False):
        """Run PGRD for total_timesteps steps.

        Returns an array with the running average objective reward after
        every step (the learning curve).
        """
        state = self.env.state()
        total_reward = 0
        returns = []  # must be a list: appended to every step
        for i in range(total_timesteps):
            if visualize:
                print(i)
                self.env.render()
            action, grad = self.agent.step(state, self.theta)
            newstate, reward = self.env.step(action)
            total_reward += reward
            # update agent's model of the environment
            self.agent.update(state, action, newstate)
            state = newstate
            # update theta along the eligibility trace
            self.z = self.beta * self.z + grad
            self.theta += self.alpha * reward * self.z
            # cap parameters at +-1, in place
            np.clip(self.theta, -1, 1, out=self.theta)
            returns.append(total_reward / (i + 1))
        return np.array(returns)
```

tests.py:

```
import numpy as np
#print environment transitions:
def print_env(env):
    """Pretty-print the nested env.transitions dict, one tab level per depth."""
    def pretty(d, indent=0):
        for key, value in d.items():
            # '\t' (tab), not the literal letter 't'
            print('\t' * indent + str(key))
            if isinstance(value, dict):
                pretty(value, indent + 1)
            else:
                print('\t' * (indent + 1) + str(value))
    pretty(env.transitions, indent=1)
def test_value_grad(agent):
    """Finite-difference check of plan(): the change in the planned values
    must equal grad . delta (exact here, since rewards are linear in theta)."""
    theta = np.random.rand(agent.nS, agent.nA)
    delta = 1e-2 * np.random.rand(agent.nS, agent.nA)
    state = 0
    base_values, base_grad = agent.plan(state, theta)
    shifted_values, _ = agent.plan(state, theta + delta)
    predicted_change = np.tensordot(base_grad, delta, axes=2)
    assert np.allclose(shifted_values - base_values, predicted_change)
def test_policy_grad(agent):
    """Finite-difference check of the log-policy gradient for each action."""
    theta = np.random.rand(agent.nS, agent.nA)
    delta_theta = 1e-3 * np.random.rand(agent.nS, agent.nA)
    state = 0
    for action in range(5):
        values1, value_grad1 = agent.plan(state, theta)
        logprobas1 = np.log(agent.policy(values1))
        values2, value_grad2 = agent.plan(state, theta + delta_theta)
        logprobas2 = np.log(agent.policy(values2))
        grad = agent.logpolicy_grad(value_grad1, agent.policy(values1), action)
        # first-order Taylor expansion of log pi(action) in theta
        assert np.allclose(logprobas2[action] - logprobas1[action],
                           (grad * delta_theta).sum())
```