diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml
index 87c4563..87e0bd8 100644
--- a/.github/workflows/linter.yml
+++ b/.github/workflows/linter.yml
@@ -7,8 +7,9 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@v2
-    - uses: docker://github/super-linter:v3
+    - uses: github/super-linter@v4
      env:
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+       DEFAULT_BRANCH: master
        VALIDATE_PYTHON_FLAKE8: true
        PYTHON_FLAKE8_CONFIG_FILE: ".flake8"
diff --git a/setup.py b/setup.py
index 3d16778..e0a35f1 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 
 
 install_requires = [
-    "cpprb>=8.1.1",
+    "cpprb>=10.5.2",
     "setuptools>=41.0.0",
     "numpy>=1.16.0",
     "joblib",
diff --git a/tf2rl/algos/apex.py b/tf2rl/algos/apex.py
index 02a0d00..3b455fb 100644
--- a/tf2rl/algos/apex.py
+++ b/tf2rl/algos/apex.py
@@ -4,9 +4,8 @@ import logging
 import multiprocessing
 from multiprocessing import Process, Value, Event
-from multiprocessing.managers import SyncManager
 
-from cpprb import ReplayBuffer, PrioritizedReplayBuffer
+from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer
 
 from tf2rl.envs.multi_thread_env import MultiThreadEnv
 from tf2rl.misc.prepare_output_dir import prepare_output_dir
@@ -24,15 +23,13 @@ def import_tf():
 
 
 def explorer(global_rb, queue, trained_steps, is_training_done,
-             lock, env_fn, policy_fn, set_weights_fn, noise_level,
+             env_fn, policy_fn, set_weights_fn, noise_level,
              n_env=64, n_thread=4, buffer_size=1024,
              episode_max_steps=1000, gpu=0):
     """Collect transitions and store them to prioritized replay buffer.
 
     Args:
-        global_rb: multiprocessing.managers.AutoProxy[PrioritizedReplayBuffer]
+        global_rb: MPPrioritizedReplayBuffer
             Prioritized replay buffer shared among multiple explorers and a single learner.
-            This object is shared over processes, so it must be locked when trying to
-            operate something with `lock` object.
         queue: multiprocessing.Queue
             A FIFO shared with the `learner` and `evaluator` to get the latest network weights.
             This is process-safe, so no explicit locking is needed when using it.
@@ -40,8 +37,6 @@ def explorer(global_rb, queue, trained_steps, is_training_done,
             Number of steps to apply gradients.
         is_training_done: multiprocessing.Event
             multiprocessing.Event object to share the status of training.
-        lock: multiprocessing.Lock
-            Lock other processes.
         env_fn: function
             Method object to generate an environment.
         policy_fn: function
@@ -142,12 +137,10 @@ def explorer(global_rb, queue, trained_steps, is_training_done,
                 next_states=samples["next_obs"], rewards=samples["rew"], dones=samples["done"])
             priorities = np.abs(np.squeeze(td_errors)) + 1e-6
-            lock.acquire()
             global_rb.add(
                 obs=samples["obs"], act=samples["act"], rew=samples["rew"],
                 next_obs=samples["next_obs"], done=samples["done"],
                 priorities=priorities)
-            lock.release()
             local_rb.clear()
 
             msg = "Grad: {0: 6d}\t".format(trained_steps.value)
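
(Annotation, not part of the patch.) The explorer hunk above works because cpprb's MPPrioritizedReplayBuffer performs its own inter-process synchronization, which is what lets the SyncManager proxy and the explicit lock.acquire()/lock.release() around global_rb.add() be dropped. Below is a minimal stand-alone sketch of that lock-free pattern, assuming cpprb>=10.5.2 as pinned in setup.py; the buffer size, shapes, and producer() helper are illustrative, and the fork start method (the Linux default) is assumed.

import numpy as np
from multiprocessing import Process
from cpprb import MPPrioritizedReplayBuffer

# One shared prioritized buffer; cpprb manages the cross-process locking.
rb = MPPrioritizedReplayBuffer(
    int(1e5),
    env_dict={"obs": {"shape": (4,)}, "act": {"shape": (1,)},
              "rew": {}, "next_obs": {"shape": (4,)}, "done": {}})


def producer(rb, n=1000):
    # add() is process-safe; no surrounding Lock is needed.
    for _ in range(n):
        rb.add(obs=np.random.rand(4), act=np.random.rand(1), rew=0.0,
               next_obs=np.random.rand(4), done=0.0, priorities=1.0)


if __name__ == "__main__":
    procs = [Process(target=producer, args=(rb,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    batch = rb.sample(32)  # dict including "weights" and "indexes", as used above
    rb.update_priorities(batch["indexes"], np.abs(np.random.randn(32)) + 1e-6)
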
@@ -167,21 +160,17 @@
 
 
 def learner(global_rb, trained_steps, is_training_done,
-            lock, env, policy_fn, get_weights_fn,
+            env, policy_fn, get_weights_fn,
             n_training, update_freq, evaluation_freq, gpu, queues):
     """Update network weights using samples collected by explorers.
 
     Args:
-        global_rb: multiprocessing.managers.AutoProxy[PrioritizedReplayBuffer]
+        global_rb: MPPrioritizedReplayBuffer
             Prioritized replay buffer shared among multiple explorers and a single learner.
-            This object is shared over processes, so it must be locked when trying to
-            operate something with `lock` object.
         trained_steps: multiprocessing.Value
             Number of steps to apply gradients.
         is_training_done: multiprocessing.Event
             multiprocessing.Event object to share the status of training.
-        lock: multiprocessing.Lock
-            multiprocessing.Lock to lock other processes.
         env: OpenAI-gym compatible environment object
         policy_fn: function
             Method object to generate an explorer.
@@ -218,37 +207,32 @@ def learner(global_rb, trained_steps, is_training_done,
 
     start_time = time.time()
     while not is_training_done.is_set():
-        trained_steps.value += 1
-        tf.summary.experimental.set_step(trained_steps.value)
-        lock.acquire()
+        with trained_steps.get_lock():
+            trained_steps.value += 1
+            n_trained_steps = trained_steps.value
+        tf.summary.experimental.set_step(n_trained_steps)
         samples = global_rb.sample(policy.batch_size)
-        lock.release()
         td_errors = policy.train(
             samples["obs"], samples["act"], samples["next_obs"],
             samples["rew"], samples["done"], samples["weights"])
-        writer.flush()
-        lock.acquire()
-        global_rb.update_priorities(
-            samples["indexes"], np.abs(td_errors)+1e-6)
-        lock.release()
+        global_rb.update_priorities(samples["indexes"], np.abs(td_errors))
 
         # Put updated weights to queue
-        if trained_steps.value % update_freq == 0:
+        if n_trained_steps % update_freq == 0:
             weights = get_weights_fn(policy)
             for i in range(len(queues) - 1):
                 queues[i].put(weights)
             fps = update_freq / (time.time() - start_time)
             tf.summary.scalar(name="apex/fps", data=fps)
             logger.info("Update weights. {0:.2f} FPS for GRAD. Learned {1:.2f} steps".format(
-                fps, trained_steps.value))
+                fps, n_trained_steps))
             start_time = time.time()
 
         # Periodically do evaluation
-        if trained_steps.value % evaluation_freq == 0:
-            queues[-1].put(get_weights_fn(policy))
-            queues[-1].put(trained_steps.value)
+        if n_trained_steps % evaluation_freq == 0:
+            queues[-1].put((get_weights_fn(policy), n_trained_steps))
 
-        if trained_steps.value >= n_training:
+        if n_trained_steps >= n_training:
             is_training_done.set()
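
(Annotation, not part of the patch.) In the learner hunk above, trained_steps is a multiprocessing.Value, and `counter.value += 1` on a Value is a read-modify-write rather than an atomic operation, so concurrent increments can be lost; taking get_lock() and snapshotting the result into a local n_trained_steps keeps the rest of the iteration consistent while other processes read the shared counter. A stand-alone sketch of the pattern; bump() and the counts are illustrative:

from multiprocessing import Process, Value


def bump(counter, n=10000):
    for _ in range(n):
        with counter.get_lock():  # makes the read-modify-write atomic
            counter.value += 1


if __name__ == "__main__":
    counter = Value('i', 0)
    procs = [Process(target=bump, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # reliably 40000; without the lock it can fall short
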
@@ -299,40 +283,36 @@ def evaluator(is_training_done, env, policy_fn, set_weights_fn, queue, gpu,
     while not is_training_done.is_set():
         n_evaluated_episode = 0
         # Wait until new weights arrive
-        if queue.empty():
-            continue
-        else:
-            set_weights_fn(policy, queue.get())
-            trained_steps = queue.get()
-            tf.summary.experimental.set_step(trained_steps)
-            avg_test_return = 0.
-            for _ in range(n_evaluation):
-                n_evaluated_episode += 1
-                episode_return = 0.
-                obs = env.reset()
-                done = False
-                for _ in range(episode_max_steps):
-                    action = policy.get_action(obs, test=True)
-                    next_obs, reward, done, _ = env.step(action)
-                    if show_test_progress:
-                        env.render()
-                    episode_return += reward
-                    obs = next_obs
-                    if done:
-                        break
-                avg_test_return += episode_return
-                # Break if a new weights comes
-                if not queue.empty():
+        weights, trained_steps = queue.get()
+        set_weights_fn(policy, weights)
+        tf.summary.experimental.set_step(trained_steps)
+        avg_test_return = 0.
+        for _ in range(n_evaluation):
+            n_evaluated_episode += 1
+            episode_return = 0.
+            obs = env.reset()
+            done = False
+            for _ in range(episode_max_steps):
+                action = policy.get_action(obs, test=True)
+                next_obs, reward, done, _ = env.step(action)
+                if show_test_progress:
+                    env.render()
+                episode_return += reward
+                obs = next_obs
+                if done:
                     break
-            avg_test_return /= n_evaluated_episode
-            logger.info("Evaluation: {} over {} run".format(
-                avg_test_return, n_evaluated_episode))
-            tf.summary.scalar(
-                name="apex/average_test_return", data=avg_test_return)
-            writer.flush()
-            if trained_steps > model_save_threshold:
-                model_save_threshold += save_model_interval
-                checkpoint_manager.save()
+            avg_test_return += episode_return
+            # Break if new weights have arrived
+            if not queue.empty():
+                break
+        avg_test_return /= n_evaluated_episode
+        logger.info("Evaluation: {} over {} runs".format(
+            avg_test_return, n_evaluated_episode))
+        tf.summary.scalar(
+            name="apex/average_test_return", data=avg_test_return)
+        if trained_steps > model_save_threshold:
+            model_save_threshold += save_model_interval
+            checkpoint_manager.save()
     checkpoint_manager.save()
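
(Annotation, not part of the patch.) The evaluator hunk above replaces a busy-wait on queue.empty() plus two separate queue.get() calls with a single blocking get() that receives one (weights, trained_steps) tuple, so the pair always arrives together and the process sleeps instead of spinning. A stand-alone sketch of that hand-off; the None sentinel is an illustrative stand-in for the is_training_done Event the real code checks:

from multiprocessing import Process, SimpleQueue


def evaluator_stub(q):
    while True:
        item = q.get()  # blocks until an item arrives; no polling loop
        if item is None:  # shutdown sentinel (assumption for this sketch)
            break
        weights, trained_steps = item  # delivered atomically as one item
        print("evaluating weights from step", trained_steps)


if __name__ == "__main__":
    q = SimpleQueue()
    p = Process(target=evaluator_stub, args=(q,))
    p.start()
    q.put((["fake", "weights"], 100))  # shaped like the learner's put() above
    q.put(None)
    p.join()
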
@@ -371,31 +351,22 @@ def apex_argument(parser=None):
 
 
 def prepare_experiment(env, args):
-    # Manager to share PER between a learner and explorers
-    SyncManager.register('PrioritizedReplayBuffer',
-                         PrioritizedReplayBuffer)
-    manager = SyncManager()
-    manager.start()
-
     kwargs = get_default_rb_dict(args.replay_buffer_size, env)
     kwargs["check_for_update"] = True
-    global_rb = manager.PrioritizedReplayBuffer(**kwargs)
+    global_rb = MPPrioritizedReplayBuffer(**kwargs)
 
     # queues to share network parameters between a learner and explorers
     n_queue = 1 if args.n_env > 1 else args.n_explorer
     n_queue += 1  # for evaluation
-    queues = [manager.Queue() for _ in range(n_queue)]
+    queues = [multiprocessing.SimpleQueue() for _ in range(n_queue)]
 
     # Event object to share training status. If the event is set, all explorers stop sampling transitions
     is_training_done = Event()
 
-    # Lock
-    lock = manager.Lock()
-
     # Shared memory objects to count number of samples and applied gradients
     trained_steps = Value('i', 0)
 
-    return global_rb, queues, is_training_done, lock, trained_steps
+    return global_rb, queues, is_training_done, trained_steps
 
 
 def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
@@ -410,7 +381,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
 
     env = env_fn()
 
-    global_rb, queues, is_training_done, lock, trained_steps = prepare_experiment(env, args)
+    global_rb, queues, is_training_done, trained_steps = prepare_experiment(env, args)
 
     noise = 0.3
     tasks = []
@@ -420,7 +391,7 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
         tasks.append(Process(
             target=explorer,
             args=[global_rb, queues[0], trained_steps, is_training_done,
-                  lock, env_fn, policy_fn, set_weights_fn, noise,
+                  env_fn, policy_fn, set_weights_fn, noise,
                   args.n_env, args.n_thread, args.local_buffer_size,
                   args.episode_max_steps, args.gpu_explorer]))
     else:
@@ -428,17 +399,10 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
             tasks.append(Process(
                 target=explorer,
                 args=[global_rb, queues[i], trained_steps, is_training_done,
-                      lock, env_fn, policy_fn, set_weights_fn, noise,
+                      env_fn, policy_fn, set_weights_fn, noise,
                       args.n_env, args.n_thread, args.local_buffer_size,
                       args.episode_max_steps, args.gpu_explorer]))
 
-    # Add learner
-    tasks.append(Process(
-        target=learner,
-        args=[global_rb, trained_steps, is_training_done,
-              lock, env_fn(), policy_fn, get_weights_fn,
-              args.n_training, args.param_update_freq,
-              args.test_freq, args.gpu_learner, queues]))
 
     # Add evaluator
     tasks.append(Process(
@@ -448,5 +412,11 @@ def run(args, env_fn, policy_fn, get_weights_fn, set_weights_fn):
 
     for task in tasks:
         task.start()
+
+    learner(global_rb, trained_steps, is_training_done,
+            env_fn(), policy_fn, get_weights_fn,
+            args.n_training, args.param_update_freq,
+            args.test_freq, args.gpu_learner, queues)
+
     for task in tasks:
         task.join()
diff --git a/tf2rl/envs/utils.py b/tf2rl/envs/utils.py
index e40bb36..42ddd7c 100644
--- a/tf2rl/envs/utils.py
+++ b/tf2rl/envs/utils.py
@@ -11,7 +11,7 @@ from gym.envs.atari.atari_env import AtariEnv
 
 
-logger = getLogger(__file__)
+logger = getLogger(__name__)
 
 
 def is_discrete(space):
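
(Annotation, not part of the patch.) Stepping back to the run() hunks in tf2rl/algos/apex.py: the learner is no longer wrapped in its own Process; it now executes in the parent between task.start() and task.join(), so the main process drives training and the workers exit once is_training_done is set. A stripped-down sketch of that control flow; worker() and learn() are illustrative placeholders for the explorer/evaluator processes and the learner loop:

import time
from multiprocessing import Process, Event


def worker(is_done):
    while not is_done.is_set():
        time.sleep(0.01)  # explore or evaluate until the learner signals completion


def learn(is_done, n_training=3):
    for _ in range(n_training):
        pass  # sample, train, push weights
    is_done.set()  # releases every worker from its loop


if __name__ == "__main__":
    is_done = Event()
    tasks = [Process(target=worker, args=(is_done,)) for _ in range(2)]
    for t in tasks:
        t.start()
    learn(is_done)  # the learner runs in the main process, as in run() above
    for t in tasks:
        t.join()
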