From 92c593b24b2eeaf2bafb1bc7e3aabc114be5072d Mon Sep 17 00:00:00 2001 From: Yamada Date: Mon, 4 Jan 2021 12:36:19 +0900 Subject: [PATCH 01/12] Use lockless MPPrioritizedReplayBuffer --- tf2rl/algos/apex.py | 55 +++++++++++++-------------------------------- 1 file changed, 15 insertions(+), 40 deletions(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index 02a0d007..426cbe2e 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -6,7 +6,7 @@ from multiprocessing import Process, Value, Event from multiprocessing.managers import SyncManager -from cpprb import ReplayBuffer, PrioritizedReplayBuffer +from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer from tf2rl.envs.multi_thread_env import MultiThreadEnv from tf2rl.misc.prepare_output_dir import prepare_output_dir @@ -24,15 +24,13 @@ def import_tf(): def explorer(global_rb, queue, trained_steps, is_training_done, - lock, env_fn, policy_fn, set_weights_fn, noise_level, + env_fn, policy_fn, set_weights_fn, noise_level, n_env=64, n_thread=4, buffer_size=1024, episode_max_steps=1000, gpu=0): """Collect transitions and store them to prioritized replay buffer. Args: - global_rb: multiprocessing.managers.AutoProxy[PrioritizedReplayBuffer] + global_rb: MPPrioritizedReplayBuffer Prioritized replay buffer sharing with multiple explorers and only one learner. - This object is shared over processes, so it must be locked when trying to - operate something with `lock` object. queue: multiprocessing.Queue A FIFO shared with the `learner` and `evaluator` to get the latest network weights. This is process safe, so you don't need to lock process when use this. @@ -40,8 +38,6 @@ def explorer(global_rb, queue, trained_steps, is_training_done, Number of steps to apply gradients. is_training_done: multiprocessing.Event multiprocessing.Event object to share the status of training. - lock: multiprocessing.Lock - Lock other processes. env_fn: function Method object to generate an environment. 
policy_fn: function @@ -142,12 +138,10 @@ def explorer(global_rb, queue, trained_steps, is_training_done, next_states=samples["next_obs"], rewards=samples["rew"], dones=samples["done"]) priorities = np.abs(np.squeeze(td_errors)) + 1e-6 - lock.acquire() global_rb.add( obs=samples["obs"], act=samples["act"], rew=samples["rew"], next_obs=samples["next_obs"], done=samples["done"], priorities=priorities) - lock.release() local_rb.clear() msg = "Grad: {0: 6d}\t".format(trained_steps.value) @@ -167,21 +161,17 @@ def explorer(global_rb, queue, trained_steps, is_training_done, def learner(global_rb, trained_steps, is_training_done, - lock, env, policy_fn, get_weights_fn, + env, policy_fn, get_weights_fn, n_training, update_freq, evaluation_freq, gpu, queues): """Update network weights using samples collected by explorers. Args: - global_rb: multiprocessing.managers.AutoProxy[PrioritizedReplayBuffer] + global_rb: MPPrioritizedReplayBuffer Prioritized replay buffer sharing with multiple explorers and only one learner. - This object is shared over processes, so it must be locked when trying to - operate something with `lock` object. trained_steps: multiprocessing.Value Number of steps to apply gradients. is_training_done: multiprocessing.Event multiprocessing.Event object to share the status of training. - lock: multiprocessing.Lock - multiprocessing.Lock to lock other processes. env: OpenAI-gym compatible environment object policy_fn: function Method object to generate an explorer. 
@@ -218,19 +208,14 @@ def learner(global_rb, trained_steps, is_training_done, start_time = time.time() while not is_training_done.is_set(): - trained_steps.value += 1 + with trained_steps.get_lock(): + trained_steps.value += 1 tf.summary.experimental.set_step(trained_steps.value) - lock.acquire() samples = global_rb.sample(policy.batch_size) - lock.release() td_errors = policy.train( samples["obs"], samples["act"], samples["next_obs"], samples["rew"], samples["done"], samples["weights"]) - writer.flush() - lock.acquire() - global_rb.update_priorities( - samples["indexes"], np.abs(td_errors)+1e-6) - lock.release() + global_rb.update_priorities(samples["indexes"], np.abs(td_errors)+1e-6) # Put updated weights to queue if trained_steps.value % update_freq == 0: @@ -329,7 +314,6 @@ def evaluator(is_training_done, env, policy_fn, set_weights_fn, queue, gpu, avg_test_return, n_evaluated_episode)) tf.summary.scalar( name="apex/average_test_return", data=avg_test_return) - writer.flush() if trained_steps > model_save_threshold: model_save_threshold += save_model_interval checkpoint_manager.save() @@ -371,31 +355,22 @@ def apex_argument(parser=None): def prepare_experiment(env, args): - # Manager to share PER between a learner and explorers - SyncManager.register('PrioritizedReplayBuffer', - PrioritizedReplayBuffer) - manager = SyncManager() - manager.start() - kwargs = get_default_rb_dict(args.replay_buffer_size, env) kwargs["check_for_update"] = True - global_rb = manager.PrioritizedReplayBuffer(**kwargs) + global_rb = MPPrioritizedReplayBuffer(**kwargs) # queues to share network parameters between a learner and explorers n_queue = 1 if args.n_env > 1 else args.n_explorer n_queue += 1 # for evaluation - queues = [manager.Queue() for _ in range(n_queue)] + queues = [multiprocessing.SimpleQueue() for _ in range(n_queue)] # Event object to share training status. 
if event is set True, all exolorers stop sampling transitions is_training_done = Event() - # Lock - lock = manager.Lock() - # Shared memory objects to count number of samples and applied gradients trained_steps = Value('i', 0) - return global_rb, queues, is_training_done, lock, trained_steps + return global_rb, queues, is_training_done, trained_steps def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): @@ -410,7 +385,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): env = env_fn() - global_rb, queues, is_training_done, lock, trained_steps = prepare_experiment(env, args) + global_rb, queues, is_training_done, trained_steps = prepare_experiment(env, args) noise = 0.3 tasks = [] @@ -420,7 +395,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): tasks.append(Process( target=explorer, args=[global_rb, queues[0], trained_steps, is_training_done, - lock, env_fn, policy_fn, set_weights_fn, noise, + env_fn, policy_fn, set_weights_fn, noise, args.n_env, args.n_thread, args.local_buffer_size, args.episode_max_steps, args.gpu_explorer])) else: @@ -428,7 +403,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): tasks.append(Process( target=explorer, args=[global_rb, queues[i], trained_steps, is_training_done, - lock, env_fn, policy_fn, set_weights_fn, noise, + env_fn, policy_fn, set_weights_fn, noise, args.n_env, args.n_thread, args.local_buffer_size, args.episode_max_steps, args.gpu_explorer])) @@ -436,7 +411,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): tasks.append(Process( target=learner, args=[global_rb, trained_steps, is_training_done, - lock, env_fn(), policy_fn, get_weights_fn, + env_fn(), policy_fn, get_weights_fn, args.n_training, args.param_update_freq, args.test_freq, args.gpu_learner, queues])) From 0153599e58d5196ee7970a9d29e89397c24cb0d4 Mon Sep 17 00:00:00 2001 From: Yamada Date: Mon, 4 Jan 2021 12:45:22 +0900 Subject: [PATCH 02/12] Remove busy loop but just wait by 
SimpleQueue.get() --- tf2rl/algos/apex.py | 61 +++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index 426cbe2e..0f1a8355 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -284,39 +284,36 @@ def evaluator(is_training_done, env, policy_fn, set_weights_fn, queue, gpu, while not is_training_done.is_set(): n_evaluated_episode = 0 # Wait until a new weights comes - if queue.empty(): - continue - else: - set_weights_fn(policy, queue.get()) - trained_steps = queue.get() - tf.summary.experimental.set_step(trained_steps) - avg_test_return = 0. - for _ in range(n_evaluation): - n_evaluated_episode += 1 - episode_return = 0. - obs = env.reset() - done = False - for _ in range(episode_max_steps): - action = policy.get_action(obs, test=True) - next_obs, reward, done, _ = env.step(action) - if show_test_progress: - env.render() - episode_return += reward - obs = next_obs - if done: - break - avg_test_return += episode_return - # Break if a new weights comes - if not queue.empty(): + set_weights_fn(policy, queue.get()) + trained_steps = queue.get() + tf.summary.experimental.set_step(trained_steps) + avg_test_return = 0. + for _ in range(n_evaluation): + n_evaluated_episode += 1 + episode_return = 0. 
+ obs = env.reset() + done = False + for _ in range(episode_max_steps): + action = policy.get_action(obs, test=True) + next_obs, reward, done, _ = env.step(action) + if show_test_progress: + env.render() + episode_return += reward + obs = next_obs + if done: break - avg_test_return /= n_evaluated_episode - logger.info("Evaluation: {} over {} run".format( - avg_test_return, n_evaluated_episode)) - tf.summary.scalar( - name="apex/average_test_return", data=avg_test_return) - if trained_steps > model_save_threshold: - model_save_threshold += save_model_interval - checkpoint_manager.save() + avg_test_return += episode_return + # Break if a new weights comes + if not queue.empty(): + break + avg_test_return /= n_evaluated_episode + logger.info("Evaluation: {} over {} run".format( + avg_test_return, n_evaluated_episode)) + tf.summary.scalar( + name="apex/average_test_return", data=avg_test_return) + if trained_steps > model_save_threshold: + model_save_threshold += save_model_interval + checkpoint_manager.save() checkpoint_manager.save() From 4840e569755c0e206f51fc375597a84e63fadfac Mon Sep 17 00:00:00 2001 From: Yamada Date: Mon, 4 Jan 2021 12:55:49 +0900 Subject: [PATCH 03/12] Reduce access to lock object by copying to local object --- tf2rl/algos/apex.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index 0f1a8355..c54a67ad 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -210,7 +210,8 @@ def learner(global_rb, trained_steps, is_training_done, while not is_training_done.is_set(): with trained_steps.get_lock(): trained_steps.value += 1 - tf.summary.experimental.set_step(trained_steps.value) + n_trained_steps = trained_steps.value + tf.summary.experimental.set_step(n_trained_steps) samples = global_rb.sample(policy.batch_size) td_errors = policy.train( samples["obs"], samples["act"], samples["next_obs"], @@ -218,22 +219,22 @@ def learner(global_rb, trained_steps, is_training_done, 
global_rb.update_priorities(samples["indexes"], np.abs(td_errors)+1e-6) # Put updated weights to queue - if trained_steps.value % update_freq == 0: + if n_trained_steps % update_freq == 0: weights = get_weights_fn(policy) for i in range(len(queues) - 1): queues[i].put(weights) fps = update_freq / (time.time() - start_time) tf.summary.scalar(name="apex/fps", data=fps) logger.info("Update weights. {0:.2f} FPS for GRAD. Learned {1:.2f} steps".format( - fps, trained_steps.value)) + fps, n_trained_steps)) start_time = time.time() # Periodically do evaluation - if trained_steps.value % evaluation_freq == 0: + if n_trained_steps % evaluation_freq == 0: queues[-1].put(get_weights_fn(policy)) - queues[-1].put(trained_steps.value) + queues[-1].put(n_trained_steps) - if trained_steps.value >= n_training: + if n_trained_steps >= n_training: is_training_done.set() From cda46ce07ecbaff5de3a3c19610f37f7715e2605 Mon Sep 17 00:00:00 2001 From: Yamada Date: Mon, 4 Jan 2021 19:42:40 +0900 Subject: [PATCH 04/12] Pass set of weights and steps to evaluator simultaneously --- tf2rl/algos/apex.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index c54a67ad..df9896c9 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -231,8 +231,7 @@ def learner(global_rb, trained_steps, is_training_done, # Periodically do evaluation if n_trained_steps % evaluation_freq == 0: - queues[-1].put(get_weights_fn(policy)) - queues[-1].put(n_trained_steps) + queues[-1].put((get_weights_fn(policy),n_trained_steps)) if n_trained_steps >= n_training: is_training_done.set() @@ -285,8 +284,8 @@ def evaluator(is_training_done, env, policy_fn, set_weights_fn, queue, gpu, while not is_training_done.is_set(): n_evaluated_episode = 0 # Wait until a new weights comes - set_weights_fn(policy, queue.get()) - trained_steps = queue.get() + weights, trained_steps = queue.get() + set_weights_fn(policy, weights) 
tf.summary.experimental.set_step(trained_steps) avg_test_return = 0. for _ in range(n_evaluation): From 5282c1ea2fd929b05d9a7c501cb4ed8b294f3df6 Mon Sep 17 00:00:00 2001 From: Yamada Date: Wed, 6 Jan 2021 07:20:13 +0900 Subject: [PATCH 05/12] Remove eps, which added internally --- tf2rl/algos/apex.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index df9896c9..2ad64aef 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -216,7 +216,7 @@ def learner(global_rb, trained_steps, is_training_done, td_errors = policy.train( samples["obs"], samples["act"], samples["next_obs"], samples["rew"], samples["done"], samples["weights"]) - global_rb.update_priorities(samples["indexes"], np.abs(td_errors)+1e-6) + global_rb.update_priorities(samples["indexes"], np.abs(td_errors)) # Put updated weights to queue if n_trained_steps % update_freq == 0: From 1dcacb7a8e019bee712e4bc889f23d2b75ef11c7 Mon Sep 17 00:00:00 2001 From: Yamada Date: Sat, 9 Jan 2021 12:18:51 +0900 Subject: [PATCH 06/12] Reduce one process --- tf2rl/algos/apex.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index 2ad64aef..22bcebe2 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -404,13 +404,6 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): args.n_env, args.n_thread, args.local_buffer_size, args.episode_max_steps, args.gpu_explorer])) - # Add learner - tasks.append(Process( - target=learner, - args=[global_rb, trained_steps, is_training_done, - env_fn(), policy_fn, get_weights_fn, - args.n_training, args.param_update_freq, - args.test_freq, args.gpu_learner, queues])) # Add evaluator tasks.append(Process( @@ -420,5 +413,11 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn): for task in tasks: task.start() + + learner(global_rb, trained_steps, is_training_done, + env_fn(), policy_fn, get_weights_fn, + args.n_training, 
args.param_update_freq, + args.test_freq, args.gpu_learner, queues) + for task in tasks: task.join() From 615dca6144eb86c09f2b2cb61714ea2d999c6ccd Mon Sep 17 00:00:00 2001 From: Yamada Date: Sat, 9 Jan 2021 16:10:20 +0900 Subject: [PATCH 07/12] Require cpprb 9.4.2 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9de2e7b8..067a4930 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages install_requires = [ - "cpprb>=8.1.1", + "cpprb>=9.4.2", "setuptools>=41.0.0", "numpy>=1.16.0", "joblib", From c6edc0f3f37da1bd18a4143ff961896578a4a517 Mon Sep 17 00:00:00 2001 From: Yamada Date: Sat, 9 Jan 2021 16:30:28 +0900 Subject: [PATCH 08/12] F401: Remove unused SyncManager --- tf2rl/algos/apex.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index 22bcebe2..2130a7c4 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -4,7 +4,6 @@ import logging import multiprocessing from multiprocessing import Process, Value, Event -from multiprocessing.managers import SyncManager from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer From fd7d7003b788d186862ef449f8b9ca64622e4c11 Mon Sep 17 00:00:00 2001 From: Yamada Date: Sat, 9 Jan 2021 16:32:40 +0900 Subject: [PATCH 09/12] Fix: E231 missing whitespace after ',' --- tf2rl/algos/apex.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py index 2130a7c4..3b455fbb 100644 --- a/tf2rl/algos/apex.py +++ b/tf2rl/algos/apex.py @@ -230,7 +230,7 @@ def learner(global_rb, trained_steps, is_training_done, # Periodically do evaluation if n_trained_steps % evaluation_freq == 0: - queues[-1].put((get_weights_fn(policy),n_trained_steps)) + queues[-1].put((get_weights_fn(policy), n_trained_steps)) if n_trained_steps >= n_training: is_training_done.set() From c4e129dbf894331eddae09b90d22e31e03a762f7 Mon Sep 17 00:00:00 2001 From: Yamada Date: Sun, 13 
Feb 2022 10:46:25 +0900 Subject: [PATCH 10/12] Fix: __file__ -> __name__ for logger --- tf2rl/envs/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tf2rl/envs/utils.py b/tf2rl/envs/utils.py index e40bb364..42ddd7cc 100644 --- a/tf2rl/envs/utils.py +++ b/tf2rl/envs/utils.py @@ -11,7 +11,7 @@ from gym.envs.atari.atari_env import AtariEnv -logger = getLogger(__file__) +logger = getLogger(__name__) def is_discrete(space): From 214442870d07fffa691487411b842a4b0ddf47f5 Mon Sep 17 00:00:00 2001 From: Yamada Date: Sun, 13 Feb 2022 11:09:50 +0900 Subject: [PATCH 11/12] Use cpprb>=10.5.2 cpprb 10.5.2 improves performance of PrioritizedReplayBuffer and MPPrioritizedReplayBuffer. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index fa5f4ed8..e0a35f1a 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages install_requires = [ - "cpprb>=9.4.2", + "cpprb>=10.5.2", "setuptools>=41.0.0", "numpy>=1.16.0", "joblib", From b8c731979b6a2bd9b34e23116071692d52a330e3 Mon Sep 17 00:00:00 2001 From: Yamada Date: Sun, 13 Feb 2022 11:49:09 +0900 Subject: [PATCH 12/12] Update Super Linter to version 4 Ref: https://github.com/github/super-linter/issues/2253 --- .github/workflows/linter.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 87c4563d..87e0bd80 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -7,8 +7,9 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: docker://github/super-linter:v3 + - uses: github/super-linter@v4 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + DEFAULT_BRANCH: master VALIDATE_PYTHON_FLAKE8: true PYTHON_FLAKE8_CONFIG_FILE: ".flake8"