Skip to content
Open
3 changes: 2 additions & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: docker://github/super-linter:v3
- uses: github/super-linter@v4
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DEFAULT_BRANCH: master
VALIDATE_PYTHON_FLAKE8: true
PYTHON_FLAKE8_CONFIG_FILE: ".flake8"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup, find_packages

install_requires = [
"cpprb>=8.1.1",
"cpprb>=10.5.2",
"setuptools>=41.0.0",
"numpy>=1.16.0",
"joblib",
Expand Down
142 changes: 56 additions & 86 deletions tf2rl/algos/apex.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import logging
import multiprocessing
from multiprocessing import Process, Value, Event
from multiprocessing.managers import SyncManager

from cpprb import ReplayBuffer, PrioritizedReplayBuffer
from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer

from tf2rl.envs.multi_thread_env import MultiThreadEnv
from tf2rl.misc.prepare_output_dir import prepare_output_dir
Expand All @@ -24,24 +23,20 @@ def import_tf():


def explorer(global_rb, queue, trained_steps, is_training_done,
lock, env_fn, policy_fn, set_weights_fn, noise_level,
env_fn, policy_fn, set_weights_fn, noise_level,
n_env=64, n_thread=4, buffer_size=1024, episode_max_steps=1000, gpu=0):
"""Collect transitions and store them to prioritized replay buffer.

Args:
global_rb: multiprocessing.managers.AutoProxy[PrioritizedReplayBuffer]
global_rb: MPPrioritizedReplayBuffer
Prioritized replay buffer sharing with multiple explorers and only one learner.
This object is shared over processes, so it must be locked when trying to
operate something with `lock` object.
queue: multiprocessing.Queue
A FIFO shared with the `learner` and `evaluator` to get the latest network weights.
This is process safe, so you don't need to lock process when use this.
trained_steps: multiprocessing.Value
Number of steps to apply gradients.
is_training_done: multiprocessing.Event
multiprocessing.Event object to share the status of training.
lock: multiprocessing.Lock
Lock other processes.
env_fn: function
Method object to generate an environment.
policy_fn: function
Expand Down Expand Up @@ -142,12 +137,10 @@ def explorer(global_rb, queue, trained_steps, is_training_done,
next_states=samples["next_obs"], rewards=samples["rew"],
dones=samples["done"])
priorities = np.abs(np.squeeze(td_errors)) + 1e-6
lock.acquire()
global_rb.add(
obs=samples["obs"], act=samples["act"], rew=samples["rew"],
next_obs=samples["next_obs"], done=samples["done"],
priorities=priorities)
lock.release()
local_rb.clear()

msg = "Grad: {0: 6d}\t".format(trained_steps.value)
Expand All @@ -167,21 +160,17 @@ def explorer(global_rb, queue, trained_steps, is_training_done,


def learner(global_rb, trained_steps, is_training_done,
lock, env, policy_fn, get_weights_fn,
env, policy_fn, get_weights_fn,
n_training, update_freq, evaluation_freq, gpu, queues):
"""Update network weights using samples collected by explorers.

Args:
global_rb: multiprocessing.managers.AutoProxy[PrioritizedReplayBuffer]
global_rb: MPPrioritizedReplayBuffer
Prioritized replay buffer sharing with multiple explorers and only one learner.
This object is shared over processes, so it must be locked when trying to
operate something with `lock` object.
trained_steps: multiprocessing.Value
Number of steps to apply gradients.
is_training_done: multiprocessing.Event
multiprocessing.Event object to share the status of training.
lock: multiprocessing.Lock
multiprocessing.Lock to lock other processes.
env: OpenAI-gym compatible environment object
policy_fn: function
Method object to generate an explorer.
Expand Down Expand Up @@ -218,37 +207,32 @@ def learner(global_rb, trained_steps, is_training_done,

start_time = time.time()
while not is_training_done.is_set():
trained_steps.value += 1
tf.summary.experimental.set_step(trained_steps.value)
lock.acquire()
with trained_steps.get_lock():
trained_steps.value += 1
n_trained_steps = trained_steps.value
tf.summary.experimental.set_step(n_trained_steps)
samples = global_rb.sample(policy.batch_size)
lock.release()
td_errors = policy.train(
samples["obs"], samples["act"], samples["next_obs"],
samples["rew"], samples["done"], samples["weights"])
writer.flush()
lock.acquire()
global_rb.update_priorities(
samples["indexes"], np.abs(td_errors)+1e-6)
lock.release()
global_rb.update_priorities(samples["indexes"], np.abs(td_errors))

# Put updated weights to queue
if trained_steps.value % update_freq == 0:
if n_trained_steps % update_freq == 0:
weights = get_weights_fn(policy)
for i in range(len(queues) - 1):
queues[i].put(weights)
fps = update_freq / (time.time() - start_time)
tf.summary.scalar(name="apex/fps", data=fps)
logger.info("Update weights. {0:.2f} FPS for GRAD. Learned {1:.2f} steps".format(
fps, trained_steps.value))
fps, n_trained_steps))
start_time = time.time()

# Periodically do evaluation
if trained_steps.value % evaluation_freq == 0:
queues[-1].put(get_weights_fn(policy))
queues[-1].put(trained_steps.value)
if n_trained_steps % evaluation_freq == 0:
queues[-1].put((get_weights_fn(policy), n_trained_steps))

if trained_steps.value >= n_training:
if n_trained_steps >= n_training:
is_training_done.set()


Expand Down Expand Up @@ -299,40 +283,36 @@ def evaluator(is_training_done, env, policy_fn, set_weights_fn, queue, gpu,
while not is_training_done.is_set():
n_evaluated_episode = 0
# Wait until a new weights comes
if queue.empty():
continue
else:
set_weights_fn(policy, queue.get())
trained_steps = queue.get()
tf.summary.experimental.set_step(trained_steps)
avg_test_return = 0.
for _ in range(n_evaluation):
n_evaluated_episode += 1
episode_return = 0.
obs = env.reset()
done = False
for _ in range(episode_max_steps):
action = policy.get_action(obs, test=True)
next_obs, reward, done, _ = env.step(action)
if show_test_progress:
env.render()
episode_return += reward
obs = next_obs
if done:
break
avg_test_return += episode_return
# Break if a new weights comes
if not queue.empty():
weights, trained_steps = queue.get()
set_weights_fn(policy, weights)
tf.summary.experimental.set_step(trained_steps)
avg_test_return = 0.
for _ in range(n_evaluation):
n_evaluated_episode += 1
episode_return = 0.
obs = env.reset()
done = False
for _ in range(episode_max_steps):
action = policy.get_action(obs, test=True)
next_obs, reward, done, _ = env.step(action)
if show_test_progress:
env.render()
episode_return += reward
obs = next_obs
if done:
break
avg_test_return /= n_evaluated_episode
logger.info("Evaluation: {} over {} run".format(
avg_test_return, n_evaluated_episode))
tf.summary.scalar(
name="apex/average_test_return", data=avg_test_return)
writer.flush()
if trained_steps > model_save_threshold:
model_save_threshold += save_model_interval
checkpoint_manager.save()
avg_test_return += episode_return
# Break if a new weights comes
if not queue.empty():
break
avg_test_return /= n_evaluated_episode
logger.info("Evaluation: {} over {} run".format(
avg_test_return, n_evaluated_episode))
tf.summary.scalar(
name="apex/average_test_return", data=avg_test_return)
if trained_steps > model_save_threshold:
model_save_threshold += save_model_interval
checkpoint_manager.save()
checkpoint_manager.save()


Expand Down Expand Up @@ -371,31 +351,22 @@ def apex_argument(parser=None):


def prepare_experiment(env, args):
# Manager to share PER between a learner and explorers
SyncManager.register('PrioritizedReplayBuffer',
PrioritizedReplayBuffer)
manager = SyncManager()
manager.start()

kwargs = get_default_rb_dict(args.replay_buffer_size, env)
kwargs["check_for_update"] = True
global_rb = manager.PrioritizedReplayBuffer(**kwargs)
global_rb = MPPrioritizedReplayBuffer(**kwargs)

# queues to share network parameters between a learner and explorers
n_queue = 1 if args.n_env > 1 else args.n_explorer
n_queue += 1 # for evaluation
queues = [manager.Queue() for _ in range(n_queue)]
queues = [multiprocessing.SimpleQueue() for _ in range(n_queue)]

# Event object to share training status. if event is set True, all exolorers stop sampling transitions
is_training_done = Event()

# Lock
lock = manager.Lock()

# Shared memory objects to count number of samples and applied gradients
trained_steps = Value('i', 0)

return global_rb, queues, is_training_done, lock, trained_steps
return global_rb, queues, is_training_done, trained_steps


def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
Expand All @@ -410,7 +381,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):

env = env_fn()

global_rb, queues, is_training_done, lock, trained_steps = prepare_experiment(env, args)
global_rb, queues, is_training_done, trained_steps = prepare_experiment(env, args)

noise = 0.3
tasks = []
Expand All @@ -420,25 +391,18 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
tasks.append(Process(
target=explorer,
args=[global_rb, queues[0], trained_steps, is_training_done,
lock, env_fn, policy_fn, set_weights_fn, noise,
env_fn, policy_fn, set_weights_fn, noise,
args.n_env, args.n_thread, args.local_buffer_size,
args.episode_max_steps, args.gpu_explorer]))
else:
for i in range(args.n_explorer):
tasks.append(Process(
target=explorer,
args=[global_rb, queues[i], trained_steps, is_training_done,
lock, env_fn, policy_fn, set_weights_fn, noise,
env_fn, policy_fn, set_weights_fn, noise,
args.n_env, args.n_thread, args.local_buffer_size,
args.episode_max_steps, args.gpu_explorer]))

# Add learner
tasks.append(Process(
target=learner,
args=[global_rb, trained_steps, is_training_done,
lock, env_fn(), policy_fn, get_weights_fn,
args.n_training, args.param_update_freq,
args.test_freq, args.gpu_learner, queues]))

# Add evaluator
tasks.append(Process(
Expand All @@ -448,5 +412,11 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):

for task in tasks:
task.start()

learner(global_rb, trained_steps, is_training_done,
env_fn(), policy_fn, get_weights_fn,
args.n_training, args.param_update_freq,
args.test_freq, args.gpu_learner, queues)

for task in tasks:
task.join()
2 changes: 1 addition & 1 deletion tf2rl/envs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from gym.envs.atari.atari_env import AtariEnv


logger = getLogger(__file__)
logger = getLogger(__name__)


def is_discrete(space):
Expand Down