From dd947017cc0abd733256700d5e9e147e846abb18 Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:53 -0500
Subject: [PATCH 1/9] tmp async render support

---
 pufferlib/config/ocean/drive.ini |  9 +++++----
 pufferlib/pufferl.py             | 17 ++++++++++++++---
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini
index e8a650864..d0b8aaf45 100644
--- a/pufferlib/config/ocean/drive.ini
+++ b/pufferlib/config/ocean/drive.ini
@@ -5,9 +5,9 @@ policy_name = Drive
 rnn_name = Recurrent
 
 [vec]
-num_workers = 16
-num_envs = 16
-batch_size = 4
+num_workers = 4
+num_envs = 4
+batch_size = 2
 ; backend = Serial
 
 [policy]
@@ -19,7 +19,7 @@ input_size = 256
 hidden_size = 256
 
 [env]
-num_agents = 1024
+num_agents = 256
 ; Options: discrete, continuous
 action_type = discrete
 ; Options: classic, jerk
@@ -87,6 +87,7 @@ vtrace_rho_clip = 1
 checkpoint_interval = 1000
 ; Rendering options
 render = True
+render_async = False
 render_interval = 1000
 ; If True, show exactly what the agent sees in agent observation
 obs_only = True
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index a8a131583..575e26510 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -53,6 +53,8 @@
 import signal  # Aggressively exit on ctrl+c
 
+import multiprocessing
+
 signal.signal(signal.SIGINT, lambda sig, frame: os._exit(0))
 
 # Assume advantage kernel has been built if CUDA compiler is available
@@ -121,6 +123,7 @@ def __init__(self, config, vecenv, policy, logger=None):
         self.ep_indices = torch.arange(total_agents, device=device, dtype=torch.int32)
         self.free_idx = total_agents
         self.render = config["render"]
+        self.render_async = config["render_async"]
         self.render_interval = config["render_interval"]
 
         if self.render:
@@ -514,9 +517,17 @@ def train(self):
                     path=bin_path,
                     silent=True,
                 )
-                pufferlib.utils.render_videos(
-                    self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path
-                )
+
+                if self.render_async:
+                    render_proc = multiprocessing.Process(
+                        target=pufferlib.utils.render_videos,
+                        args=(self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path)
+                    )
+                    render_proc.start()
+                else:
+                    pufferlib.utils.render_videos(
+                        self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path
+                    )
 
             except Exception as e:
                 print(f"Failed to export model weights: {e}")
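Note on patch 1: `multiprocessing.Process` pickles every argument before the child starts, and live handles like `self.vecenv` and `self.logger` generally do not survive pickling — that is what patch 2 below fixes. A minimal sketch of the fire-and-forget pattern under that constraint (`render_job` and its arguments are hypothetical, not pufferlib code):

```python
import multiprocessing


def render_job(config: dict, run_id: str, epoch: int) -> None:
    # The child receives copies of these arguments, so everything passed
    # here must survive pickling: plain dicts, strings, ints.
    print(f"[{run_id}] rendering videos for epoch {epoch}")


if __name__ == "__main__":
    # Fire-and-forget: start() returns immediately, so training can
    # continue while the render job runs in a separate process.
    proc = multiprocessing.Process(target=render_job, args=({"render": True}, "demo_run", 42))
    proc.start()
    proc.join()  # only needed here so the demo waits before exiting
```

Anything passed through `args` must reduce to plain data, which is why the next patch passes `run_id` and primitive flags instead of the logger object itself.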
From f8349dcabfd753a464bc35842fe61a89e34e0c76 Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:54 -0500
Subject: [PATCH 2/9] Fixed render_videos to take serializable arguments so
 python multiprocessing works. Added render_queue for async rendering and
 logging videos

---
 pufferlib/config/ocean/drive.ini | 10 +++---
 pufferlib/pufferl.py             | 56 +++++++++++++++++++++++++++++---
 pufferlib/utils.py               | 27 ++++++++++-----
 3 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini
index d0b8aaf45..d6617fda9 100644
--- a/pufferlib/config/ocean/drive.ini
+++ b/pufferlib/config/ocean/drive.ini
@@ -46,8 +46,8 @@ offroad_behavior = 0
 episode_length = 91
 resample_frequency = 910
 termination_mode = 1 # 0 - terminate at episode_length, 1 - terminate after all agents have been reset
-map_dir = "resources/drive/binaries/training"
-num_maps = 10000
+map_dir = "resources/drive/binaries/carla/carla_3D"
+num_maps = 5
 ; If True, allows training with fewer maps than requested (warns instead of erroring)
 allow_fewer_maps = True
 ; Determines which step of the trajectory to initialize the agents at upon reset
@@ -84,11 +84,11 @@ vf_clip_coef = 0.1999999999999999
 vf_coef = 2
 vtrace_c_clip = 1
 vtrace_rho_clip = 1
-checkpoint_interval = 1000
+checkpoint_interval = 1
 ; Rendering options
 render = True
-render_async = False
-render_interval = 1000
+render_async = True
+render_interval = 1
 ; If True, show exactly what the agent sees in agent observation
 obs_only = True
 ; Show grid lines
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index 575e26510..9bfe35acf 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -54,6 +54,7 @@
 import signal  # Aggressively exit on ctrl+c
 
 import multiprocessing
+import queue
 
 signal.signal(signal.SIGINT, lambda sig, frame: os._exit(0))
 
@@ -123,12 +124,15 @@ def __init__(self, config, vecenv, policy, logger=None):
         self.ep_indices = torch.arange(total_agents, device=device, dtype=torch.int32)
         self.free_idx = total_agents
         self.render = config["render"]
-        self.render_async = config["render_async"]
+        self.render_async = config["render_async"] and self.render # Only supported if rendering is enabled
         self.render_interval = config["render_interval"]
 
         if self.render:
             ensure_drive_binary()
 
+        if self.render_async:
+            self.render_queue = multiprocessing.Queue()
+
         # LSTM
         if config["use_rnn"]:
             n = vecenv.agents_per_batch
@@ -517,16 +521,21 @@ def train(self):
                     path=bin_path,
                     silent=True,
                 )
-
+
+                env_cfg = getattr(self.vecenv, "driver_env", None)
+                wandb_log = True if hasattr(self.logger, "wandb") and self.logger.wandb else False
+                wandb_run = self.logger.wandb if hasattr(self.logger, "wandb") else None
                 if self.render_async:
+                    print("Starting async render process...")
                     render_proc = multiprocessing.Process(
                         target=pufferlib.utils.render_videos,
-                        args=(self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path)
+                        args=(self.config, env_cfg, self.logger.run_id, wandb_log, self.epoch, self.global_step, bin_path, self.render_async, self.render_queue)
                     )
                     render_proc.start()
+                    print(f"Started async render process with PID {render_proc.pid}")
                 else:
                     pufferlib.utils.render_videos(
-                        self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path
+                        self.config, env_cfg, self.logger.run_id, wandb_log, self.epoch, self.global_step, bin_path, self.render_async, wandb_run=wandb_run
                     )
 
             except Exception as e:
@@ -542,7 +551,41 @@ def train(self):
         ):
             pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step)
 
+    def check_render_queue(self):
+        """Check if any async render jobs finished and log them."""
+        if not self.render_async or not hasattr(self, 'render_queue'):
+            return
+
+        start_time = time.time()
+        try:
+            while not self.render_queue.empty():
+                result = self.render_queue.get_nowait()
+                step = result["step"]
+                videos = result["videos"]
+
+                # Log to wandb if available
+                if hasattr(self.logger, "wandb") and self.logger.wandb:
+                    import wandb
+                    payload = {}
+                    if videos["output_topdown"]:
+                        payload["render/world_state"] = [wandb.Video(p) for p in videos["output_topdown"]]
+                    if videos["output_agent"]:
+                        payload["render/agent_view"] = [wandb.Video(p) for p in videos["output_agent"]]
+
+                    if payload:
+                        self.logger.wandb.log(payload, step=step)
+                        print(f"Logged async videos for step {step} in {time.time() - start_time:.2f}s")
+
+        except queue.Empty:
+            pass
+        except Exception as e:
+            print(f"Error reading render queue: {e}")
+            pass
+
     def mean_and_log(self):
+        # Check render queue for finished async jobs
+        self.check_render_queue()
+
         config = self.config
         for k in list(self.stats.keys()):
             v = self.stats[k]
@@ -578,6 +621,11 @@ def close(self):
     def close(self):
         self.vecenv.close()
         self.utilization.stop()
+
+        if self.render_async and hasattr(self, 'render_queue'):
+            self.render_queue.close()
+            self.render_queue.join_thread()
+
         model_path = self.save_checkpoint()
         run_id = self.logger.run_id
         path = os.path.join(self.config["data_dir"], f"{self.config['env']}_{run_id}.pt")
diff --git a/pufferlib/utils.py b/pufferlib/utils.py
index 860a61934..24e92e4f5 100644
--- a/pufferlib/utils.py
+++ b/pufferlib/utils.py
@@ -167,7 +167,7 @@ def run_wosac_eval_in_subprocess(config, logger, global_step):
         print(f"Failed to run WOSAC evaluation: {type(e).__name__}: {e}")
 
 
-def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
+def render_videos(config, env_cfg, run_id, wandb_log, epoch, global_step, bin_path, render_async, render_queue=None, wandb_run=None):
     """
     Generate and log training videos using C-based rendering.
 
@@ -186,7 +186,6 @@ def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
         print(f"Binary weights file does not exist: {bin_path}")
         return
 
-    run_id = logger.run_id
     model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}")
 
     # Now call the C rendering function
@@ -206,7 +205,7 @@ def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
     env_vars["ASAN_OPTIONS"] = "exitcode=0"
 
     # Base command with only visualization flags (env config comes from INI)
-    base_cmd = ["xvfb-run", "-a", "-s", "-screen 0 1280x720x24", "./visualize"]
+    base_cmd = ["./visualize"]
 
     # Visualization config flags only
     if config.get("show_grid", False):
@@ -230,7 +229,6 @@ def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
         base_cmd.extend(["--view", view_mode])
 
     # Get num_maps if available
-    env_cfg = getattr(vecenv, "driver_env", None)
     if env_cfg is not None and getattr(env_cfg, "num_maps", None):
         base_cmd.extend(["--num-maps", str(env_cfg.num_maps)])
 
@@ -262,6 +260,7 @@ def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
     # Collect videos to log as lists so W&B shows all in the same step
     videos_to_log_world = []
     videos_to_log_agent = []
+    generated_videos = {"output_topdown": [], "output_agent": []}
 
     for i, map_path in enumerate(render_maps):
         cmd = list(base_cmd)  # copy
@@ -277,25 +276,31 @@ def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
         vids_exist = os.path.exists("resources/drive/output_topdown.mp4") and os.path.exists(
             "resources/drive/output_agent.mp4"
         )
+        print("Videos Generated:", vids_exist)
 
         if result.returncode == 0 or (result.returncode == 1 and vids_exist):
             videos = [
                 (
+                    "output_topdown",
                     "resources/drive/output_topdown.mp4",
                     f"epoch_{epoch:06d}_map{i:02d}_topdown.mp4" if map_path else f"epoch_{epoch:06d}_topdown.mp4",
                 ),
                 (
+                    "output_agent",
                     "resources/drive/output_agent.mp4",
                     f"epoch_{epoch:06d}_map{i:02d}_agent.mp4" if map_path else f"epoch_{epoch:06d}_agent.mp4",
                 ),
             ]
 
-            for source_vid, target_filename in videos:
+            for vid_type, source_vid, target_filename in videos:
                 if os.path.exists(source_vid):
                     target_path = os.path.join(video_output_dir, target_filename)
                     shutil.move(source_vid, target_path)
+                    generated_videos[vid_type].append(target_path)
+                    if render_async:
+                        continue
 
                     # Accumulate for a single wandb.log call
-                    if hasattr(logger, "wandb") and logger.wandb:
+                    if wandb_log:
                         import wandb
 
                         if "topdown" in target_filename:
@@ -307,14 +312,20 @@ def render_videos(config, vecenv, logger, epoch, global_step, bin_path):
         else:
             print(f"C rendering failed (map index {i}) with exit code {result.returncode}: {result.stdout}")
 
+    if render_async:
+        render_queue.put({
+            "videos": generated_videos,
+            "step": global_step,
+        })
+
     # Log all videos at once so W&B keeps all of them under the same step
-    if hasattr(logger, "wandb") and logger.wandb and (videos_to_log_world or videos_to_log_agent):
+    if wandb_log and (videos_to_log_world or videos_to_log_agent) and not render_async:
         payload = {}
         if videos_to_log_world:
             payload["render/world_state"] = videos_to_log_world
         if videos_to_log_agent:
             payload["render/agent_view"] = videos_to_log_agent
-        logger.wandb.log(payload, step=global_step)
+        wandb_run.log(payload, step=global_step)
 
     except subprocess.TimeoutExpired:
         print("C rendering timed out")
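The `check_render_queue` method added above polls the queue without blocking the training loop. `Queue.empty()` is only advisory across processes, so the `queue.Empty` handler is not dead code. The drain pattern, condensed into a standalone sketch (the `drain` helper is hypothetical):

```python
import multiprocessing
import queue


def drain(render_queue: multiprocessing.Queue) -> list:
    """Collect any finished render results without blocking the caller."""
    results = []
    try:
        while not render_queue.empty():
            # get_nowait() raises queue.Empty instead of waiting; empty() is
            # racy across processes, so the exception path is still needed.
            results.append(render_queue.get_nowait())
    except queue.Empty:
        pass
    return results
```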
From 039b966999a13bfa4431ac1ac9431c66487d4a03 Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:54 -0500
Subject: [PATCH 3/9] Monotonic wandb step fix by using a custom render_step

---
 pufferlib/config/ocean/drive.ini | 20 ++++++++--------
 pufferlib/pufferl.py             | 41 ++++++++++++++++++++++++--------
 pufferlib/utils.py               | 19 ++++++++-------
 3 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini
index d6617fda9..fdb79ff83 100644
--- a/pufferlib/config/ocean/drive.ini
+++ b/pufferlib/config/ocean/drive.ini
@@ -5,9 +5,9 @@ policy_name = Drive
 rnn_name = Recurrent
 
 [vec]
-num_workers = 4
-num_envs = 4
-batch_size = 2
+num_workers = 8
+num_envs = 8
+batch_size = 4
 ; backend = Serial
 
 [policy]
@@ -19,7 +19,7 @@ input_size = 256
 hidden_size = 256
 
 [env]
-num_agents = 256
+num_agents = 1024
 ; Options: discrete, continuous
 action_type = discrete
 ; Options: classic, jerk
@@ -46,8 +46,8 @@ offroad_behavior = 0
 episode_length = 91
 resample_frequency = 910
 termination_mode = 1 # 0 - terminate at episode_length, 1 - terminate after all agents have been reset
-map_dir = "resources/drive/binaries/carla/carla_3D"
-num_maps = 5
+map_dir = "resources/drive/binaries/training"
+num_maps = 10000
 ; If True, allows training with fewer maps than requested (warns instead of erroring)
 allow_fewer_maps = True
 ; Determines which step of the trajectory to initialize the agents at upon reset
@@ -84,11 +84,11 @@ vf_clip_coef = 0.1999999999999999
 vf_coef = 2
 vtrace_c_clip = 1
 vtrace_rho_clip = 1
-checkpoint_interval = 1
+checkpoint_interval = 50
 ; Rendering options
 render = True
-render_async = True
-render_interval = 1
+render_async = False # Render interval of below 50 might cause process starvation and slowness in training
+render_interval = 50
 ; If True, show exactly what the agent sees in agent observation
 obs_only = True
 ; Show grid lines
@@ -100,7 +100,7 @@ show_human_logs = False
 ; If True, zoom in on a part of the map. Otherwise, show full map
 zoom_in = True
 ; Options: List[str to path], str to path (e.g., "resources/drive/training/binaries/map_001.bin"), None
-render_map = none
+render_map = "resources/drive/binaries/carla_data/map_000.bin"
 
 [eval]
 eval_interval = 1000
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index 9bfe35acf..9d330ac8c 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -124,7 +124,7 @@ def __init__(self, config, vecenv, policy, logger=None):
         self.ep_indices = torch.arange(total_agents, device=device, dtype=torch.int32)
         self.free_idx = total_agents
         self.render = config["render"]
-        self.render_async = config["render_async"] and self.render # Only supported if rendering is enabled
+        self.render_async = config["render_async"] and self.render  # Only supported if rendering is enabled
         self.render_interval = config["render_interval"]
 
         if self.render:
@@ -200,6 +200,9 @@ def __init__(self, config, vecenv, policy, logger=None):
         self.logger = logger
         if logger is None:
             self.logger = NoLogger(config)
+        if self.render_async:
+            self.logger.wandb.define_metric("render_step", hidden=True)
+            self.logger.wandb.define_metric("render/*", step_metric="render_step")
 
         # Learning rate scheduler
         epochs = config["total_timesteps"] // config["batch_size"]
@@ -526,16 +529,33 @@ def train(self):
                 env_cfg = getattr(self.vecenv, "driver_env", None)
                 wandb_log = True if hasattr(self.logger, "wandb") and self.logger.wandb else False
                 wandb_run = self.logger.wandb if hasattr(self.logger, "wandb") else None
                 if self.render_async:
-                    print("Starting async render process...")
                     render_proc = multiprocessing.Process(
                         target=pufferlib.utils.render_videos,
-                        args=(self.config, env_cfg, self.logger.run_id, wandb_log, self.epoch, self.global_step, bin_path, self.render_async, self.render_queue)
+                        args=(
+                            self.config,
+                            env_cfg,
+                            self.logger.run_id,
+                            wandb_log,
+                            self.epoch,
+                            self.global_step,
+                            bin_path,
+                            self.render_async,
+                            self.render_queue,
+                        ),
                     )
                     render_proc.start()
                     print(f"Started async render process with PID {render_proc.pid}")
                 else:
                     pufferlib.utils.render_videos(
-                        self.config, env_cfg, self.logger.run_id, wandb_log, self.epoch, self.global_step, bin_path, self.render_async, wandb_run=wandb_run
+                        self.config,
+                        env_cfg,
+                        self.logger.run_id,
+                        wandb_log,
+                        self.epoch,
+                        self.global_step,
+                        bin_path,
+                        self.render_async,
+                        wandb_run=wandb_run,
                     )
 
             except Exception as e:
@@ -553,19 +573,19 @@ def train(self):
 
     def check_render_queue(self):
         """Check if any async render jobs finished and log them."""
-        if not self.render_async or not hasattr(self, 'render_queue'):
+        if not self.render_async or not hasattr(self, "render_queue"):
            return
 
-        start_time = time.time()
         try:
             while not self.render_queue.empty():
                 result = self.render_queue.get_nowait()
                 step = result["step"]
                 videos = result["videos"]
-
+
                 # Log to wandb if available
                 if hasattr(self.logger, "wandb") and self.logger.wandb:
                     import wandb
+
                     payload = {}
                     if videos["output_topdown"]:
                         payload["render/world_state"] = [wandb.Video(p) for p in videos["output_topdown"]]
@@ -573,8 +593,9 @@ def check_render_queue(self):
                     if videos["output_agent"]:
                         payload["render/agent_view"] = [wandb.Video(p) for p in videos["output_agent"]]
 
                     if payload:
-                        self.logger.wandb.log(payload, step=step)
-                        print(f"Logged async videos for step {step} in {time.time() - start_time:.2f}s")
+                        # Custom step for render logs to prevent monotonic loggic wandb errors
+                        payload["render_step"] = step
+                        self.logger.wandb.log(payload)
 
         except queue.Empty:
             pass
@@ -622,7 +643,7 @@ def close(self):
         self.vecenv.close()
         self.utilization.stop()
 
-        if self.render_async and hasattr(self, 'render_queue'):
+        if self.render_async and hasattr(self, "render_queue"):
             self.render_queue.close()
             self.render_queue.join_thread()
 
diff --git a/pufferlib/utils.py b/pufferlib/utils.py
index 24e92e4f5..8f4705f8f 100644
--- a/pufferlib/utils.py
+++ b/pufferlib/utils.py
@@ -167,7 +167,9 @@ def run_wosac_eval_in_subprocess(config, logger, global_step):
         print(f"Failed to run WOSAC evaluation: {type(e).__name__}: {e}")
 
 
-def render_videos(config, env_cfg, run_id, wandb_log, epoch, global_step, bin_path, render_async, render_queue=None, wandb_run=None):
+def render_videos(
+    config, env_cfg, run_id, wandb_log, epoch, global_step, bin_path, render_async, render_queue=None, wandb_run=None
+):
     """
     Generate and log training videos using C-based rendering.
 
@@ -205,7 +207,7 @@ def render_videos(
     env_vars["ASAN_OPTIONS"] = "exitcode=0"
 
     # Base command with only visualization flags (env config comes from INI)
-    base_cmd = ["./visualize"]
+    base_cmd = ["xvfb-run", "-a", "-s", "-screen 0 1280x720x24", "./visualize"]
 
     # Visualization config flags only
     if config.get("show_grid", False):
@@ -276,7 +278,6 @@ def render_videos(
         vids_exist = os.path.exists("resources/drive/output_topdown.mp4") and os.path.exists(
             "resources/drive/output_agent.mp4"
         )
-        print("Videos Generated:", vids_exist)
 
         if result.returncode == 0 or (result.returncode == 1 and vids_exist):
             videos = [
@@ -298,7 +299,7 @@ def render_videos(
                     shutil.move(source_vid, target_path)
                     generated_videos[vid_type].append(target_path)
                     if render_async:
-                        continue
+                        continue
 
                     # Accumulate for a single wandb.log call
                     if wandb_log:
                         import wandb
@@ -313,10 +314,12 @@ def render_videos(
             print(f"C rendering failed (map index {i}) with exit code {result.returncode}: {result.stdout}")
 
     if render_async:
-        render_queue.put({
-            "videos": generated_videos,
-            "step": global_step,
-        })
+        render_queue.put(
+            {
+                "videos": generated_videos,
+                "step": global_step,
+            }
+        )
 
     # Log all videos at once so W&B keeps all of them under the same step
     if wandb_log and (videos_to_log_world or videos_to_log_agent) and not render_async:
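W&B rejects `log(..., step=s)` calls whose step is lower than one already logged, which is exactly what a late-finishing async render produces. Patch 3 sidesteps this by declaring a custom axis instead of using the global step. A self-contained sketch of that `define_metric` pattern (the project name is hypothetical):

```python
import wandb

run = wandb.init(project="drive-demo")  # hypothetical project

# Declare a custom x-axis for render logs. Plots under render/* use
# render_step instead of wandb's global step, so results that arrive
# late no longer violate the monotonic-step requirement.
run.define_metric("render_step", hidden=True)
run.define_metric("render/*", step_metric="render_step")

# Log against the custom axis; note there is no step= argument.
run.log({"render/frames_written": 182, "render_step": 12345})
run.finish()
```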
From b02e56584768887fc895c4d107e317b953620f2d Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:54 -0500
Subject: [PATCH 4/9] binary and video file naming convention fixes for render
 intervals less than 50

---
 pufferlib/config/ocean/drive.ini | 16 ++++++++--------
 pufferlib/ocean/drive/drive.py   | 10 +++++-----
 pufferlib/pufferl.py             |  9 ++++++---
 pufferlib/utils.py               | 29 ++++++++++++++++-------------
 4 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini
index fdb79ff83..d3d5c2980 100644
--- a/pufferlib/config/ocean/drive.ini
+++ b/pufferlib/config/ocean/drive.ini
@@ -5,8 +5,8 @@ policy_name = Drive
 rnn_name = Recurrent
 
 [vec]
-num_workers = 8
-num_envs = 8
+num_workers = 16
+num_envs = 16
 batch_size = 4
 ; backend = Serial
 
 [policy]
@@ -19,7 +19,7 @@ input_size = 256
 hidden_size = 256
 
 [env]
-num_agents = 1024
+num_agents = 256
 ; Options: discrete, continuous
 action_type = discrete
 ; Options: classic, jerk
@@ -59,7 +59,7 @@ init_mode = "create_all_valid"
 
 [train]
 seed=42
-total_timesteps = 2_000_000_000
+total_timesteps = 2_000_000_00
 ; learning_rate = 0.02
 ; gamma = 0.985
 anneal_lr = True
@@ -84,11 +84,11 @@ vf_clip_coef = 0.1999999999999999
 vf_coef = 2
 vtrace_c_clip = 1
 vtrace_rho_clip = 1
-checkpoint_interval = 50
+checkpoint_interval = 1
 ; Rendering options
 render = True
-render_async = False # Render interval of below 50 might cause process starvation and slowness in training
-render_interval = 50
+render_async = True # Render interval of below 50 might cause process starvation and slowness in training
+render_interval = 1
 ; If True, show exactly what the agent sees in agent observation
 obs_only = True
 ; Show grid lines
@@ -100,7 +100,7 @@ show_human_logs = False
 ; If True, zoom in on a part of the map. Otherwise, show full map
 zoom_in = True
 ; Options: List[str to path], str to path (e.g., "resources/drive/training/binaries/map_001.bin"), None
-render_map = "resources/drive/binaries/carla_data/map_000.bin"
+render_map = none
 
 [eval]
 eval_interval = 1000
diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py
index c533fc648..61c4f7ed1 100644
--- a/pufferlib/ocean/drive/drive.py
+++ b/pufferlib/ocean/drive/drive.py
@@ -145,17 +145,17 @@ def __init__(
 
         # Check maps availability
         available_maps = len(self.map_files)
-        if num_maps > available_maps:
+        if self.num_maps > available_maps:
             if allow_fewer_maps:
                 print("\n" + "=" * 80)
                 print("WARNING: FEWER MAPS THAN REQUESTED")
-                print(f"Requested {num_maps} maps but only {available_maps} available in {map_dir}")
+                print(f"Requested {self.num_maps} maps but only {available_maps} available in {map_dir}")
                 print(f"Maps will be randomly sampled from the {available_maps} available maps.")
                 print("=" * 80 + "\n")
-                num_maps = available_maps
+                self.num_maps = available_maps
             else:
                 raise ValueError(
-                    f"num_maps ({num_maps}) exceeds available maps in directory ({available_maps}). "
+                    f"num_maps ({self.num_maps}) exceeds available maps in directory ({available_maps}). "
                     f"Please reduce num_maps, add more maps to {map_dir}, or set allow_fewer_maps=True."
                 )
 
         self.max_controlled_agents = int(max_controlled_agents)
@@ -164,7 +164,7 @@ def __init__(
         agent_offsets, map_ids, num_envs = binding.shared(
             map_files=self.map_files,
             num_agents=num_agents,
-            num_maps=num_maps,
+            num_maps=self.num_maps,
             init_mode=self.init_mode,
             control_mode=self.control_mode,
             init_steps=self.init_steps,
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index 9d330ac8c..cbdc0a2c9 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -200,7 +200,7 @@ def __init__(self, config, vecenv, policy, logger=None):
         self.logger = logger
         if logger is None:
             self.logger = NoLogger(config)
-        if self.render_async:
+        if self.render_async and hasattr(self.logger, "wandb") and self.logger.wandb:
             self.logger.wandb.define_metric("render_step", hidden=True)
             self.logger.wandb.define_metric("render/*", step_metric="render_step")
 
@@ -525,6 +525,9 @@ def train(self):
                     silent=True,
                 )
 
+                bin_path_epoch = f"{model_dir}_epoch_{self.epoch:06d}.bin"
+                shutil.copy2(bin_path, bin_path_epoch)
+
                 env_cfg = getattr(self.vecenv, "driver_env", None)
                 wandb_log = True if hasattr(self.logger, "wandb") and self.logger.wandb else False
                 wandb_run = self.logger.wandb if hasattr(self.logger, "wandb") else None
@@ -538,7 +541,7 @@ def train(self):
                             wandb_log,
                             self.epoch,
                             self.global_step,
-                            bin_path,
+                            bin_path_epoch,
                             self.render_async,
                             self.render_queue,
                         ),
@@ -553,7 +556,7 @@ def train(self):
                         wandb_log,
                         self.epoch,
                         self.global_step,
-                        bin_path,
+                        bin_path_epoch,
                         self.render_async,
                         wandb_run=wandb_run,
                     )
diff --git a/pufferlib/utils.py b/pufferlib/utils.py
index 8f4705f8f..5d83f5bd4 100644
--- a/pufferlib/utils.py
+++ b/pufferlib/utils.py
@@ -196,10 +196,9 @@ def render_videos(
     video_output_dir = os.path.join(model_dir, "videos")
     os.makedirs(video_output_dir, exist_ok=True)
 
-    # Copy the binary weights to the expected location
-    expected_weights_path = "resources/drive/puffer_drive_weights.bin"
-    os.makedirs(os.path.dirname(expected_weights_path), exist_ok=True)
-    shutil.copy2(bin_path, expected_weights_path)
+    # Record the path of the latest weights file
+    latest_weights_path = "resources/drive/puffer_drive_weights.bin"
+    os.makedirs(os.path.dirname(latest_weights_path), exist_ok=True)
 
     # TODO: Fix memory leaks so that this is not needed
     # Suppress AddressSanitizer exit code (temp)
@@ -234,6 +233,8 @@ def render_videos(
     if env_cfg is not None and getattr(env_cfg, "num_maps", None):
         base_cmd.extend(["--num-maps", str(env_cfg.num_maps)])
 
+    base_cmd.extend(["--policy-name", bin_path])
+
     # Handle single or multiple map rendering
     render_maps = config.get("render_map", None)
     if render_maps is None or render_maps == "none":
@@ -263,6 +264,8 @@ def render_videos(
     videos_to_log_world = []
     videos_to_log_agent = []
     generated_videos = {"output_topdown": [], "output_agent": []}
+    output_topdown = f"resources/drive/output_topdown_{epoch}.mp4"
+    output_agent = f"resources/drive/output_agent_{epoch}.mp4"
 
     for i, map_path in enumerate(render_maps):
         cmd = list(base_cmd)  # copy
@@ -270,25 +273,23 @@ def render_videos(
             cmd.extend(["--map-name", str(map_path)])
 
         # Output paths (overwrite each iteration; then moved/renamed)
-        cmd.extend(["--output-topdown", "resources/drive/output_topdown.mp4"])
-        cmd.extend(["--output-agent", "resources/drive/output_agent.mp4"])
+        cmd.extend(["--output-topdown", output_topdown])
+        cmd.extend(["--output-agent", output_agent])
 
         result = subprocess.run(cmd, cwd=os.getcwd(), capture_output=True, text=True, timeout=600, env=env_vars)
 
-        vids_exist = os.path.exists("resources/drive/output_topdown.mp4") and os.path.exists(
-            "resources/drive/output_agent.mp4"
-        )
+        vids_exist = os.path.exists(output_topdown) and os.path.exists(output_agent)
 
         if result.returncode == 0 or (result.returncode == 1 and vids_exist):
             videos = [
                 (
                     "output_topdown",
-                    "resources/drive/output_topdown.mp4",
+                    output_topdown,
                     f"epoch_{epoch:06d}_map{i:02d}_topdown.mp4" if map_path else f"epoch_{epoch:06d}_topdown.mp4",
                 ),
                 (
                     "output_agent",
-                    "resources/drive/output_agent.mp4",
+                    output_agent,
                     f"epoch_{epoch:06d}_map{i:02d}_agent.mp4" if map_path else f"epoch_{epoch:06d}_agent.mp4",
                 ),
             ]
@@ -337,5 +338,7 @@ def render_videos(
 
     finally:
         # Clean up bin weights file
-        if os.path.exists(expected_weights_path):
-            os.remove(expected_weights_path)
+        if os.path.exists(latest_weights_path):
+            os.remove(latest_weights_path)
+        if os.path.exists(bin_path):
+            os.remove(bin_path)
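The core of patch 4 is the per-epoch weight snapshot: the async renderer reads weights from disk, so if it shared one file with the trainer, the next export could overwrite the file mid-render. The same copy, restated in isolation (the helper name is hypothetical):

```python
import shutil


def snapshot_weights(bin_path: str, model_dir: str, epoch: int) -> str:
    """Copy the live weights export to an epoch-suffixed file.

    Each async render process then reads its own immutable snapshot, so
    overlapping renders from different epochs cannot race on one path.
    """
    bin_path_epoch = f"{model_dir}_epoch_{epoch:06d}.bin"
    shutil.copy2(bin_path, bin_path_epoch)  # copy2 also preserves file metadata
    return bin_path_epoch
```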
From 55684d3db1baaa5e7e205205eb232ee817936500 Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:54 -0500
Subject: [PATCH 5/9] minor changes

---
 pufferlib/utils.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pufferlib/utils.py b/pufferlib/utils.py
index 5d83f5bd4..71415af64 100644
--- a/pufferlib/utils.py
+++ b/pufferlib/utils.py
@@ -311,6 +311,10 @@ def render_videos(
                     videos_to_log_agent.append(wandb.Video(target_path, format="mp4"))
                 else:
                     print(f"Video generation completed but {source_vid} not found")
+                    if result.stdout:
+                        print(f"StdOUT: {result.stdout}")
+                    if result.stderr:
+                        print(f"StdERR: {result.stderr}")
         else:
             print(f"C rendering failed (map index {i}) with exit code {result.returncode}: {result.stdout}")

From 8de7f30ef34f759f7ff4c175aea2f57da369c08d Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:54 -0500
Subject: [PATCH 6/9] limit renders by num_workers

---
 pufferlib/pufferl.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index cbdc0a2c9..805c87f45 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -132,6 +132,7 @@ def __init__(self, config, vecenv, policy, logger=None):
 
         if self.render_async:
             self.render_queue = multiprocessing.Queue()
+            self.render_processes = []
 
         # LSTM
         if config["use_rnn"]:
@@ -532,6 +533,17 @@ def train(self):
                 wandb_log = True if hasattr(self.logger, "wandb") and self.logger.wandb else False
                 wandb_run = self.logger.wandb if hasattr(self.logger, "wandb") else None
                 if self.render_async:
+                    # Clean up finished processes
+                    self.render_processes = [p for p in self.render_processes if p.is_alive()]
+
+                    # Cap the number of processes to num_workers
+                    max_processes = self.config.get("num_workers")
+                    if len(self.render_processes) >= max_processes:
+                        print("Waiting for render processes to finish...")
+                        while len(self.render_processes) >= max_processes:
+                            time.sleep(1)
+                            self.render_processes = [p for p in self.render_processes if p.is_alive()]
+
                     render_proc = multiprocessing.Process(
                         target=pufferlib.utils.render_videos,
                         args=(
@@ -547,7 +559,7 @@ def train(self):
                         ),
                     )
                     render_proc.start()
-                    print(f"Started async render process with PID {render_proc.pid}")
+                    self.render_processes.append(render_proc)
                 else:
                     pufferlib.utils.render_videos(
@@ -1082,6 +1094,8 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None):
         logger = None
 
     train_config = dict(**args["train"], env=env_name, eval=args.get("eval", {}))
+    if "vec" in args and "num_workers" in args["vec"]:
+        train_config["num_workers"] = args["vec"]["num_workers"]
     pufferl = PuffeRL(train_config, vecenv, policy, logger)
 
     all_logs = []
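Patch 6 bounds the number of live render processes: handles of exited workers are pruned with `is_alive()`, and the trainer blocks once the cap is hit. The same logic as a standalone sketch (the helper name is hypothetical):

```python
import time
import multiprocessing


def wait_for_slot(procs: list, max_procs: int) -> list:
    """Prune finished workers and block until fewer than max_procs remain."""
    procs = [p for p in procs if p.is_alive()]  # reap exited processes
    while len(procs) >= max_procs:
        time.sleep(1)  # crude backpressure: stall the trainer, not the OS
        procs = [p for p in procs if p.is_alive()]
    return procs
```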
From d81f3b378a491a259e10d474cf8c365bc8e4b7a7 Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:55 -0500
Subject: [PATCH 7/9] code cleanup

---
 pufferlib/config/ocean/drive.ini | 10 +++++-----
 pufferlib/pufferl.py             | 29 +++++++++++++++++++++--------
 pufferlib/utils.py               | 15 +++++++++------
 3 files changed, 35 insertions(+), 19 deletions(-)

diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini
index d3d5c2980..f68434856 100644
--- a/pufferlib/config/ocean/drive.ini
+++ b/pufferlib/config/ocean/drive.ini
@@ -19,7 +19,7 @@ input_size = 256
 hidden_size = 256
 
 [env]
-num_agents = 256
+num_agents = 1024
 ; Options: discrete, continuous
 action_type = discrete
 ; Options: classic, jerk
@@ -59,7 +59,7 @@ init_mode = "create_all_valid"
 
 [train]
 seed=42
-total_timesteps = 2_000_000_00
+total_timesteps = 2_000_000_000
 ; learning_rate = 0.02
 ; gamma = 0.985
 anneal_lr = True
@@ -84,11 +84,11 @@ vf_clip_coef = 0.1999999999999999
 vf_coef = 2
 vtrace_c_clip = 1
 vtrace_rho_clip = 1
-checkpoint_interval = 1
+checkpoint_interval = 1000
 ; Rendering options
 render = True
-render_async = True # Render interval of below 50 might cause process starvation and slowness in training
-render_interval = 1
+render_async = False # Render interval of below 50 might cause process starvation and slowness in training
+render_interval = 1000
 ; If True, show exactly what the agent sees in agent observation
 obs_only = True
 ; Show grid lines
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index 805c87f45..38093f884 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -537,7 +537,7 @@ def train(self):
                     self.render_processes = [p for p in self.render_processes if p.is_alive()]
 
                     # Cap the number of processes to num_workers
-                    max_processes = self.config.get("num_workers")
+                    max_processes = self.config.get("num_workers", 1)
                     if len(self.render_processes) >= max_processes:
                         print("Waiting for render processes to finish...")
                         while len(self.render_processes) >= max_processes:
@@ -603,12 +603,12 @@ def check_render_queue(self):
                     payload = {}
                     if videos["output_topdown"]:
-                        payload["render/world_state"] = [wandb.Video(p) for p in videos["output_topdown"]]
+                        payload["render/world_state"] = [wandb.Video(p, format="mp4") for p in videos["output_topdown"]]
                     if videos["output_agent"]:
-                        payload["render/agent_view"] = [wandb.Video(p) for p in videos["output_agent"]]
+                        payload["render/agent_view"] = [wandb.Video(p, format="mp4") for p in videos["output_agent"]]
 
                     if payload:
-                        # Custom step for render logs to prevent monotonic loggic wandb errors
+                        # Custom step for render logs to prevent monotonic logic wandb errors
                         payload["render_step"] = step
                         self.logger.wandb.log(payload)
 
@@ -616,7 +616,6 @@ def check_render_queue(self):
         except queue.Empty:
             pass
         except Exception as e:
             print(f"Error reading render queue: {e}")
-            pass
 
     def mean_and_log(self):
         # Check render queue for finished async jobs
@@ -658,9 +657,23 @@ def close(self):
         self.vecenv.close()
         self.utilization.stop()
 
-        if self.render_async and hasattr(self, "render_queue"):
-            self.render_queue.close()
-            self.render_queue.join_thread()
+        if self.render_async: # Ensure all render processes are properly terminated before closing the queue if hasattr(self, "render_processes"): for p in self.render_processes: try: if p.is_alive(): p.terminate() p.join(timeout=5) if p.is_alive(): p.kill() except Exception: # Best-effort cleanup; avoid letting close() crash on process errors pass # Optionally clear the list to drop references to finished processes self.render_processes = [] if hasattr(self, "render_queue"): self.render_queue.close() self.render_queue.join_thread()
+            if hasattr(self, "render_processes"):
+                for p in self.render_processes:
+                    try:
+                        if p.is_alive():
+                            p.terminate()
+                            p.join(timeout=5)
+                            if p.is_alive():
+                                p.kill()
+                    except Exception:
+                        # Best-effort cleanup; avoid letting close() crash on process errors
+                        print(f"Failed to terminate render process {p.pid}")
+                # Optionally clear the list to drop references to finished processes
+                self.render_processes = []
+            if hasattr(self, "render_queue"):
+                self.render_queue.close()
+                self.render_queue.join_thread()
 
         model_path = self.save_checkpoint()
         run_id = self.logger.run_id
diff --git a/pufferlib/utils.py b/pufferlib/utils.py
index 71415af64..c23f89502 100644
--- a/pufferlib/utils.py
+++ b/pufferlib/utils.py
@@ -264,7 +264,7 @@ def render_videos(
     videos_to_log_world = []
     videos_to_log_agent = []
     generated_videos = {"output_topdown": [], "output_agent": []}
-    output_topdown = f"resources/drive/output_topdown_{epoch}.mp4"
+    output_topdown = f"resources/drive/output_topdown_{epoch}"
     output_agent = f"resources/drive/output_agent_{epoch}.mp4"
 
     for i, map_path in enumerate(render_maps):
@@ -272,24 +272,27 @@ def render_videos(
         cmd = list(base_cmd)  # copy
         if map_path is not None and os.path.exists(map_path):
             cmd.extend(["--map-name", str(map_path)])
 
+        output_topdown_map = output_topdown + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4")
+        output_agent_map = output_agent + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4")
+
         # Output paths (overwrite each iteration; then moved/renamed)
-        cmd.extend(["--output-topdown", output_topdown])
-        cmd.extend(["--output-agent", output_agent])
+        cmd.extend(["--output-topdown", output_topdown_map])
+        cmd.extend(["--output-agent", output_agent_map])
 
         result = subprocess.run(cmd, cwd=os.getcwd(), capture_output=True, text=True, timeout=600, env=env_vars)
 
-        vids_exist = os.path.exists(output_topdown) and os.path.exists(output_agent)
+        vids_exist = os.path.exists(output_topdown_map) and os.path.exists(output_agent_map)
 
         if result.returncode == 0 or (result.returncode == 1 and vids_exist):
             videos = [
                 (
                     "output_topdown",
-                    output_topdown,
+                    output_topdown_map,
                     f"epoch_{epoch:06d}_map{i:02d}_topdown.mp4" if map_path else f"epoch_{epoch:06d}_topdown.mp4",
                 ),
                 (
                     "output_agent",
-                    output_agent,
+                    output_agent_map,
                     f"epoch_{epoch:06d}_map{i:02d}_agent.mp4" if map_path else f"epoch_{epoch:06d}_agent.mp4",
                 ),
             ]
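The expanded `close()` above follows the standard escalating-shutdown sequence for child processes. Isolated as a sketch (the helper name is hypothetical):

```python
import multiprocessing


def stop_process(p: multiprocessing.Process, grace: float = 5.0) -> None:
    """Terminate a worker, give it a grace period, then force-kill it."""
    if p.is_alive():
        p.terminate()          # polite: sends SIGTERM on POSIX
        p.join(timeout=grace)  # wait up to `grace` seconds for it to exit
        if p.is_alive():
            p.kill()           # last resort: SIGKILL, cannot be caught
```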
From 84e8f2e3015fcd8a6a1c939e797d38e49be05810 Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:55 -0500
Subject: [PATCH 8/9] pre-commit fixes

---
 pufferlib/pufferl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index 38093f884..cdd2b20f2 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -657,7 +657,7 @@ def close(self):
         self.vecenv.close()
         self.utilization.stop()
 
-        if self.render_async: # Ensure all render processes are properly terminated before closing the queue if hasattr(self, "render_processes"): for p in self.render_processes: try: if p.is_alive(): p.terminate() p.join(timeout=5) if p.is_alive(): p.kill() except Exception: # Best-effort cleanup; avoid letting close() crash on process errors pass # Optionally clear the list to drop references to finished processes self.render_processes = [] if hasattr(self, "render_queue"): self.render_queue.close() self.render_queue.join_thread()
+        if self.render_async:  # Ensure all render processes are properly terminated before closing the queue if hasattr(self, "render_processes"): for p in self.render_processes: try: if p.is_alive(): p.terminate() p.join(timeout=5) if p.is_alive(): p.kill() except Exception: # Best-effort cleanup; avoid letting close() crash on process errors pass # Optionally clear the list to drop references to finished processes self.render_processes = [] if hasattr(self, "render_queue"): self.render_queue.close() self.render_queue.join_thread()
             if hasattr(self, "render_processes"):
                 for p in self.render_processes:
                     try:
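After the workers are stopped, `close()` also shuts the queue down. `multiprocessing.Queue` uses a background feeder thread, which is why two calls are needed; a minimal sketch:

```python
import multiprocessing

render_queue = multiprocessing.Queue()
render_queue.put({"videos": [], "step": 0})
render_queue.get()

# Shut down cleanly: close() stops the queue from accepting new items;
# join_thread() waits for the feeder thread to flush any buffered data.
render_queue.close()
render_queue.join_thread()
```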
From dd1abac40cd917100b5c688670f1dd0d0c64954b Mon Sep 17 00:00:00 2001
From: mpragnay
Date: Thu, 29 Jan 2026 14:50:55 -0500
Subject: [PATCH 9/9] Code cleanup

---
 pufferlib/pufferl.py | 2 +-
 pufferlib/utils.py   | 8 +-------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index cdd2b20f2..908bd4537 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -657,7 +657,7 @@ def close(self):
         self.vecenv.close()
         self.utilization.stop()
 
-        if self.render_async:  # Ensure all render processes are properly terminated before closing the queue if hasattr(self, "render_processes"): for p in self.render_processes: try: if p.is_alive(): p.terminate() p.join(timeout=5) if p.is_alive(): p.kill() except Exception: # Best-effort cleanup; avoid letting close() crash on process errors pass # Optionally clear the list to drop references to finished processes self.render_processes = [] if hasattr(self, "render_queue"): self.render_queue.close() self.render_queue.join_thread()
+        if self.render_async:  # Ensure all render processes are properly terminated before closing the queue
             if hasattr(self, "render_processes"):
                 for p in self.render_processes:
                     try:
diff --git a/pufferlib/utils.py b/pufferlib/utils.py
index c23f89502..1f2ccd514 100644
--- a/pufferlib/utils.py
+++ b/pufferlib/utils.py
@@ -196,10 +196,6 @@ def render_videos(
     video_output_dir = os.path.join(model_dir, "videos")
     os.makedirs(video_output_dir, exist_ok=True)
 
-    # Record the path of the latest weights file
-    latest_weights_path = "resources/drive/puffer_drive_weights.bin"
-    os.makedirs(os.path.dirname(latest_weights_path), exist_ok=True)
-
     # TODO: Fix memory leaks so that this is not needed
     # Suppress AddressSanitizer exit code (temp)
     env_vars = os.environ.copy()
@@ -265,7 +261,7 @@ def render_videos(
     videos_to_log_agent = []
     generated_videos = {"output_topdown": [], "output_agent": []}
     output_topdown = f"resources/drive/output_topdown_{epoch}"
-    output_agent = f"resources/drive/output_agent_{epoch}.mp4"
+    output_agent = f"resources/drive/output_agent_{epoch}"
 
     for i, map_path in enumerate(render_maps):
         cmd = list(base_cmd)  # copy
@@ -345,7 +341,5 @@ def render_videos(
 
     finally:
         # Clean up bin weights file
-        if os.path.exists(latest_weights_path):
-            os.remove(latest_weights_path)
         if os.path.exists(bin_path):
             os.remove(bin_path)