diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 00000000..1148d7f6
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,2 @@
+*.md text working-tree-encoding=UTF-8
+*.rst text working-tree-encoding=UTF-8
\ No newline at end of file
diff --git a/docs/source/model_steps.md b/docs/source/model_steps.md
index 7c02b811..215c3138 100644
--- a/docs/source/model_steps.md
+++ b/docs/source/model_steps.md
@@ -8,15 +8,15 @@ Source : https://github.com/mobility-team/mobility/issues/145#issuecomment-32280
 The current process is as follows:
 
 Initialization:
-- Generation of the trip-motive sequences in each transport zone, based on the profile of the resident population (socio-professional category (CSP), number of cars in the household, urban category of the municipality), and of the activity-hour needs for each step of the sequences.
-- Computation of the available opportunities (= available activity hours) per motive, for each transport zone.
+- Generation of the trip-motive sequences in each transport zone, based on the profile of the resident population (socio-professional category (CSP), number of cars in the household, urban category of the municipality), and of the activity-hour needs for each step of the sequences.
+- Computation of the available opportunities (= available activity hours) per motive, for each transport zone.
 
 Loop:
-- Computation of the generalized transport costs for each motive - origin - destination triple (without congestion for the first iteration).
-- Computation of the probability of choosing a destination, depending on the trip motive and origin as well as on the place of residence.
-- Sampling of a destination sequence for each motive sequence, transport zone of residence and CSP.
-- Search for the top k available mode sequences to carry out these trip sequences (k <= 10).
-- Computation of the resulting flows per OD and per mode, then recomputation of the generalized costs.
-- Computation of the share of persons who will change their motive-sequence + mode assignment (depending on the saturation of opportunities at destination, on comparative optimization opportunities, and on a share of random changes).
-- Computation of the remaining opportunities at destination.
-- Restart of the procedure with this share of unassigned persons.
+- Computation of the generalized transport costs for each motive - origin - destination triple (without congestion for the first iteration).
+- Computation of the probability of choosing a destination, depending on the trip motive and origin as well as on the place of residence.
+- Sampling of a destination sequence for each motive sequence, transport zone of residence and CSP.
+- Search for the top k available mode sequences to carry out these trip sequences (k <= 10).
+- Computation of the resulting flows per OD and per mode, then recomputation of the generalized costs.
+- Computation of the share of persons who will change their motive-sequence + mode assignment (depending on the saturation of opportunities at destination, on comparative optimization opportunities, and on a share of random changes).
+- Computation of the remaining opportunities at destination.
+- Restart of the procedure with this share of unassigned persons.
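The destination-choice step of the loop documented above (cost -> choice probability -> sampling) can be summarized in a minimal self-contained sketch. A logit-style deterrence function is assumed here for illustration; the function and variable names are not the actual mobility API:

    import math
    import random

    def destination_probabilities(costs_by_dest, beta=0.1):
        # Destinations become likelier as generalized cost falls (logit form assumed).
        weights = {dest: math.exp(-beta * cost) for dest, cost in costs_by_dest.items()}
        total = sum(weights.values())
        return {dest: w / total for dest, w in weights.items()}

    rng = random.Random(0)
    generalized_costs = {"zone_a": 12.0, "zone_b": 25.0, "zone_c": 40.0}
    probs = destination_probabilities(generalized_costs)
    sampled = rng.choices(list(probs), weights=list(probs.values()), k=1)[0]
    print(probs, sampled)

The patch below wraps this loop with per-iteration checkpoints so an interrupted run can resume deterministically.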
diff --git a/mobility/choice_models/population_trips.py b/mobility/choice_models/population_trips.py index 8b916d83..befc1a74 100644 --- a/mobility/choice_models/population_trips.py +++ b/mobility/choice_models/population_trips.py @@ -23,6 +23,14 @@ from mobility.motives import Motive, HomeMotive, OtherMotive from mobility.transport_modes.transport_mode import TransportMode from mobility.parsers.mobility_survey import MobilitySurvey +from mobility.choice_models.population_trips_checkpoint import PopulationTripsCheckpointAsset +from mobility.choice_models.population_trips_resume import ( + compute_resume_plan, + try_load_checkpoint, + restore_state_or_fresh_start, + prune_tmp_artifacts, + rehydrate_congestion_snapshot, +) class PopulationTrips(FileAsset): """ @@ -336,7 +344,23 @@ def run_model(self, is_weekday): parameters = self.inputs["parameters"] cache_path = self.cache_path["weekday_flows"] if is_weekday is True else self.cache_path["weekend_flows"] - tmp_folders = self.prepare_tmp_folders(cache_path) + + run_key = self.inputs_hash + resume_plan = compute_resume_plan( + run_key=run_key, + is_weekday=is_weekday, + n_iterations=parameters.n_iterations, + ) + if resume_plan.resume_from_iter is None: + logging.info("No checkpoint found for run_key=%s is_weekday=%s. Starting from scratch.", run_key, str(is_weekday)) + else: + logging.info( + "Latest checkpoint found for run_key=%s is_weekday=%s: iteration=%s", + run_key, + str(is_weekday), + str(resume_plan.resume_from_iter), + ) + tmp_folders = self.prepare_tmp_folders(cache_path, resume=(resume_plan.resume_from_iter is not None)) chains_by_motive, chains, demand_groups = self.state_initializer.get_chains( population, @@ -369,8 +393,38 @@ def run_model(self, is_weekday): ) remaining_sinks = sinks.clone() - - for iteration in range(1, parameters.n_iterations+1): + start_iteration = 1 + + if resume_plan.resume_from_iter is not None: + ckpt = try_load_checkpoint( + run_key=run_key, + is_weekday=is_weekday, + iteration=resume_plan.resume_from_iter, + ) + current_states, remaining_sinks, restored = restore_state_or_fresh_start( + ckpt=ckpt, + stay_home_state=stay_home_state, + sinks=sinks, + rng=self.rng, + ) + + if restored: + start_iteration = resume_plan.start_iteration + logging.info( + "Resuming PopulationTrips from checkpoint: run_key=%s is_weekday=%s iteration=%s", + run_key, + str(is_weekday), + str(resume_plan.resume_from_iter), + ) + prune_tmp_artifacts(tmp_folders=tmp_folders, keep_up_to_iter=resume_plan.resume_from_iter) + costs = rehydrate_congestion_snapshot( + costs_aggregator=costs_aggregator, + run_key=run_key, + last_completed_iter=resume_plan.resume_from_iter, + n_iter_per_cost_update=parameters.n_iter_per_cost_update, + ) + + for iteration in range(start_iteration, parameters.n_iterations+1): logging.info(f"Iteration n°{iteration}") @@ -423,7 +477,8 @@ def run_model(self, is_weekday): iteration, parameters.n_iter_per_cost_update, current_states_steps, - costs_aggregator + costs_aggregator, + run_key=self.inputs_hash ) remaining_sinks = self.state_updater.get_new_sinks( @@ -431,6 +486,36 @@ def run_model(self, is_weekday): sinks, motives ) + + # Save per-iteration checkpoint after all state has been advanced. 
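+            # The payload captures everything iteration k+1 depends on:
+            # current_states and remaining_sinks (model state) plus the RNG
+            # state, so a resumed run draws the same samples as an
+            # uninterrupted one would.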
+ try: + PopulationTripsCheckpointAsset( + run_key=run_key, + is_weekday=is_weekday, + iteration=iteration, + current_states=current_states, + remaining_sinks=remaining_sinks, + rng_state=self.rng.getstate(), + ).create_and_get_asset() + except Exception: + logging.exception("Failed to save checkpoint for iteration %s.", str(iteration)) + + # If we resumed after completing all iterations (or start_iteration > n_iterations), + # rebuild step-level flows from cached artifacts for final output. + if "current_states_steps" not in locals(): + possible_states_steps = self.state_updater.get_possible_states_steps( + current_states, + demand_groups, + chains_by_motive, + costs_aggregator, + remaining_sinks, + motive_dur, + parameters.n_iterations, + motives, + parameters.min_activity_time_constant, + tmp_folders + ) + current_states_steps = self.state_updater.get_current_states_steps(current_states, possible_states_steps) costs = costs_aggregator.get_costs_by_od_and_mode( @@ -464,9 +549,25 @@ def run_model(self, is_weekday): ) return current_states_steps, sinks, demand_groups, costs, chains + + def remove(self, remove_checkpoints: bool = True): + """Remove cached outputs for this PopulationTrips run. + + By default this also removes any saved checkpoints for this run_key, to avoid + resuming from stale intermediate state after a "clean" remove. + """ + super().remove() + + if remove_checkpoints: + run_key = self.inputs_hash + removed = 0 + removed += PopulationTripsCheckpointAsset.remove_checkpoints_for_run(run_key=run_key, is_weekday=True) + removed += PopulationTripsCheckpointAsset.remove_checkpoints_for_run(run_key=run_key, is_weekday=False) + if removed > 0: + logging.info("Removed %s checkpoint files for run_key=%s", str(removed), str(run_key)) - def prepare_tmp_folders(self, cache_path): + def prepare_tmp_folders(self, cache_path, resume: bool = False): """Create per-run temp folders next to the cache path. Args: @@ -478,14 +579,15 @@ def prepare_tmp_folders(self, cache_path): inputs_hash = str(cache_path.stem).split("-")[0] - def rm_then_mkdirs(folder_name): + def ensure_dir(folder_name): path = cache_path.parent / (inputs_hash + "-" + folder_name) - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) + if resume is False: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path, exist_ok=True) return path folders = ["spatialized-chains", "modes", "flows", "sequences-index"] - folders = {f: rm_then_mkdirs(f) for f in folders} + folders = {f: ensure_dir(f) for f in folders} return folders @@ -583,7 +685,7 @@ def plot_modal_share(self, zone="origin", mode="car", period="weekdays", if mode == "public_transport": mode_name = "Public transport" - mode_share["mode"] = mode_share["mode"].replace("\S+\/public_transport\/\S+", "public_transport", regex=True) + mode_share["mode"] = mode_share["mode"].replace(r"\S+/public_transport/\S+", "public_transport", regex=True) else: mode_name = mode.capitalize() mode_share = mode_share[mode_share["mode"] == mode] diff --git a/mobility/choice_models/population_trips_checkpoint.py b/mobility/choice_models/population_trips_checkpoint.py new file mode 100644 index 00000000..2c641438 --- /dev/null +++ b/mobility/choice_models/population_trips_checkpoint.py @@ -0,0 +1,194 @@ +import os +import json +import pickle +import pathlib +import logging +import re + +import polars as pl + +from mobility.file_asset import FileAsset + + +class PopulationTripsCheckpointAsset(FileAsset): + """Per-iteration checkpoint for PopulationTrips to enable crash-safe resume. 
+ + The checkpoint is keyed by: + - run_key: PopulationTrips.inputs_hash (includes the seed and all params) + - is_weekday: True/False + - iteration: last completed iteration k + + Payload: + - current_states (pl.DataFrame) + - remaining_sinks (pl.DataFrame) + - rng_state (pickle of random.Random.getstate()) + + Notes: + - We write a JSON meta file last so incomplete checkpoints are ignored. + - This asset is intentionally not part of the main model dependency graph; + it is only used as an optional resume source. + """ + + SCHEMA_VERSION = 1 + + def __init__( + self, + *, + run_key: str, + is_weekday: bool, + iteration: int, + current_states: pl.DataFrame | None = None, + remaining_sinks: pl.DataFrame | None = None, + rng_state=None, + ): + self._payload_current_states = current_states + self._payload_remaining_sinks = remaining_sinks + self._payload_rng_state = rng_state + + inputs = { + "run_key": str(run_key), + "is_weekday": bool(is_weekday), + "iteration": int(iteration), + "schema_version": self.SCHEMA_VERSION, + } + + project_folder = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"]) + period = "weekday" if is_weekday else "weekend" + base_dir = project_folder / "population_trips" / period / "checkpoints" + + stem = f"checkpoint_{run_key}_iter_{int(iteration)}" + cache_path = { + "current_states": base_dir / f"{stem}_current_states.parquet", + "remaining_sinks": base_dir / f"{stem}_remaining_sinks.parquet", + "rng_state": base_dir / f"{stem}_rng_state.pkl", + "meta": base_dir / f"{stem}.json", + } + + super().__init__(inputs, cache_path) + + def get_cached_asset(self): + current_states = pl.read_parquet(self.cache_path["current_states"]) + remaining_sinks = pl.read_parquet(self.cache_path["remaining_sinks"]) + with open(self.cache_path["rng_state"], "rb") as f: + rng_state = pickle.load(f) + + meta = {} + try: + with open(self.cache_path["meta"], "r", encoding="utf-8") as f: + meta = json.load(f) + except Exception: + # Meta is only for convenience; payload files are the source of truth. + pass + + return { + "current_states": current_states, + "remaining_sinks": remaining_sinks, + "rng_state": rng_state, + "meta": meta, + } + + def create_and_get_asset(self): + for p in self.cache_path.values(): + pathlib.Path(p).parent.mkdir(parents=True, exist_ok=True) + + if self._payload_current_states is None or self._payload_remaining_sinks is None or self._payload_rng_state is None: + raise ValueError("Checkpoint payload is missing (current_states, remaining_sinks, rng_state).") + + def atomic_write_bytes(final_path: pathlib.Path, data: bytes): + tmp = pathlib.Path(str(final_path) + ".tmp") + with open(tmp, "wb") as f: + f.write(data) + os.replace(tmp, final_path) + + def atomic_write_text(final_path: pathlib.Path, text: str): + tmp = pathlib.Path(str(final_path) + ".tmp") + with open(tmp, "w", encoding="utf-8") as f: + f.write(text) + os.replace(tmp, final_path) + + # Write payload first + tmp_states = pathlib.Path(str(self.cache_path["current_states"]) + ".tmp") + self._payload_current_states.write_parquet(tmp_states) + os.replace(tmp_states, self.cache_path["current_states"]) + + tmp_sinks = pathlib.Path(str(self.cache_path["remaining_sinks"]) + ".tmp") + self._payload_remaining_sinks.write_parquet(tmp_sinks) + os.replace(tmp_sinks, self.cache_path["remaining_sinks"]) + + atomic_write_bytes(self.cache_path["rng_state"], pickle.dumps(self._payload_rng_state, protocol=pickle.HIGHEST_PROTOCOL)) + + # Meta last, so readers only see complete checkpoints. 
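+        # If the process dies between the payload writes above and the meta
+        # write below, the *.parquet/*.pkl files exist without a *.json.
+        # find_latest_checkpoint_iter() only globs *.json, so such a partial
+        # checkpoint is invisible and resume falls back to the previous
+        # complete iteration.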
+        meta = {
+            "run_key": self.inputs["run_key"],
+            "is_weekday": self.inputs["is_weekday"],
+            "iteration": self.inputs["iteration"],
+            "schema_version": self.SCHEMA_VERSION,
+        }
+        atomic_write_text(self.cache_path["meta"], json.dumps(meta, sort_keys=True))
+
+        logging.info(
+            "Checkpoint saved: run_key=%s is_weekday=%s iteration=%s",
+            self.inputs["run_key"],
+            str(self.inputs["is_weekday"]),
+            str(self.inputs["iteration"]),
+        )
+
+        return self.get_cached_asset()
+
+    @staticmethod
+    def find_latest_checkpoint_iter(*, run_key: str, is_weekday: bool) -> int | None:
+        project_folder = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"])
+        period = "weekday" if is_weekday else "weekend"
+        base_dir = project_folder / "population_trips" / period / "checkpoints"
+        if not base_dir.exists():
+            return None
+
+        # FileAsset prefixes filenames with its own inputs_hash, so we match on the suffix.
+        pattern = f"*checkpoint_{run_key}_iter_*.json"
+        candidates = list(base_dir.glob(pattern))
+        if not candidates:
+            return None
+
+        rx = re.compile(rf"checkpoint_{re.escape(run_key)}_iter_(\d+)\.json$")
+        best = None
+        for p in candidates:
+            m = rx.search(p.name)
+            if not m:
+                continue
+            it = int(m.group(1))
+            if best is None or it > best:
+                best = it
+
+        return best
+
+    @staticmethod
+    def remove_checkpoints_for_run(*, run_key: str, is_weekday: bool) -> int:
+        """Remove all checkpoint files for a given run_key and period.
+
+        Returns the number of files removed.
+        """
+        project_folder = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"])
+        period = "weekday" if is_weekday else "weekend"
+        base_dir = project_folder / "population_trips" / period / "checkpoints"
+        if not base_dir.exists():
+            return 0
+
+        # FileAsset prefixes filenames with its own inputs_hash, so just match suffix fragments.
+        # The trailing "*" also matches stray ".tmp" files left by interrupted writes,
+        # so a single glob covers both payload files and temp leftovers.
+        pattern = f"*checkpoint_{run_key}_iter_*"
+        removed = 0
+        for p in base_dir.glob(pattern):
+            try:
+                p.unlink(missing_ok=True)
+                removed += 1
+            except Exception:
+                logging.exception("Failed to remove checkpoint file: %s", str(p))
+
+        return removed
diff --git a/mobility/choice_models/population_trips_resume.py b/mobility/choice_models/population_trips_resume.py
new file mode 100644
index 00000000..313b24f3
--- /dev/null
+++ b/mobility/choice_models/population_trips_resume.py
@@ -0,0 +1,193 @@
+import logging
+from dataclasses import dataclass
+
+import polars as pl
+import pandas as pd
+
+from mobility.choice_models.population_trips_checkpoint import PopulationTripsCheckpointAsset
+from mobility.transport_costs.od_flows_asset import VehicleODFlowsAsset
+
+
+@dataclass(frozen=True)
+class ResumePlan:
+    """Plan for resuming a PopulationTrips run."""
+
+    run_key: str
+    is_weekday: bool
+    resume_from_iter: int | None  # last completed iteration to resume from (k), or None
+    start_iteration: int  # first iteration to compute (k+1, or 1)
+
+
+def compute_resume_plan(*, run_key: str, is_weekday: bool, n_iterations: int) -> ResumePlan:
+    """Computes the resume plan for a PopulationTrips run.
+ + This inspects the checkpoint folder and returns: + - the last completed iteration (k), if a checkpoint exists + - the next iteration to compute (k+1), or 1 when no checkpoint exists + + Note: If k == n_iterations, then start_iteration == n_iterations + 1 and + callers should treat the iteration loop as complete (no-op). + + Args: + run_key: Hash-like identifier for the run. Must match + PopulationTrips.inputs_hash. + is_weekday: Whether this is the weekday simulation (True) or weekend (False). + n_iterations: Total number of iterations configured for this run. + + Returns: + ResumePlan describing whether to resume and which iteration to start from. + """ + latest = PopulationTripsCheckpointAsset.find_latest_checkpoint_iter( + run_key=run_key, + is_weekday=is_weekday, + ) + if latest is None: + return ResumePlan(run_key=run_key, is_weekday=is_weekday, resume_from_iter=None, start_iteration=1) + + k = min(int(latest), int(n_iterations)) + return ResumePlan(run_key=run_key, is_weekday=is_weekday, resume_from_iter=k, start_iteration=k + 1) + + +def try_load_checkpoint(*, run_key: str, is_weekday: bool, iteration: int): + """Loads a checkpoint payload (best-effort). + + Args: + run_key: Run identifier. + is_weekday: Weekday/weekend selector. + iteration: Iteration number k (last completed). + + Returns: + The checkpoint payload dict as returned by PopulationTripsCheckpointAsset.get(), + or None if loading fails for any reason. + """ + try: + return PopulationTripsCheckpointAsset( + run_key=run_key, + is_weekday=is_weekday, + iteration=iteration, + ).get() + except Exception: + logging.exception("Failed to load checkpoint (run_key=%s, is_weekday=%s, iteration=%s).", run_key, str(is_weekday), str(iteration)) + return None + + +def restore_state_or_fresh_start( + *, + ckpt, + stay_home_state: pl.DataFrame, + sinks: pl.DataFrame, + rng, +): + """Restores iteration state from a checkpoint or returns a clean start state. + + This is the core of resume correctness: to continue deterministically, both + the model state and the RNG state must be restored. + + Args: + ckpt: Checkpoint payload dict (or None) returned by try_load_checkpoint(). + stay_home_state: Baseline "stay home" state used to build a clean start. + sinks: Initial sinks; used to build a clean start remaining_sinks. + rng: random.Random instance to restore with rng.setstate(...). + + Returns: + Tuple of: + - current_states: pl.DataFrame + - remaining_sinks: pl.DataFrame + - restored: bool indicating whether checkpoint restoration succeeded. + """ + + fresh_current_states = ( + stay_home_state + .select(["demand_group_id", "iteration", "motive_seq_id", "mode_seq_id", "dest_seq_id", "utility", "n_persons"]) + .clone() + ) + fresh_remaining_sinks = sinks.clone() + + if ckpt is None: + return fresh_current_states, fresh_remaining_sinks, False + + try: + rng.setstate(ckpt["rng_state"]) + except Exception: + logging.exception("Failed to restore RNG state from checkpoint; restarting from scratch.") + return fresh_current_states, fresh_remaining_sinks, False + + return ckpt["current_states"], ckpt["remaining_sinks"], True + + +def prune_tmp_artifacts(*, tmp_folders, keep_up_to_iter: int) -> None: + """Deletes temp artifacts beyond the last completed iteration. + + If a run crashed mid-iteration, temp parquet files for that iteration may + exist. This ensures we don't accidentally reuse partial artifacts on resume. + + Args: + tmp_folders: Dict of temp folders produced by PopulationTrips.prepare_tmp_folders(). 
+ keep_up_to_iter: Last completed iteration k; any artifacts for >k are removed. + """ + try: + for p in tmp_folders["spatialized-chains"].glob("spatialized_chains_*.parquet"): + it = int(p.stem.split("_")[-1]) + if it > keep_up_to_iter: + p.unlink(missing_ok=True) + for p in tmp_folders["modes"].glob("mode_sequences_*.parquet"): + it = int(p.stem.split("_")[-1]) + if it > keep_up_to_iter: + p.unlink(missing_ok=True) + except Exception: + logging.exception("Failed to prune temp artifacts on resume. Continuing anyway.") + + +def rehydrate_congestion_snapshot( + *, + costs_aggregator, + run_key: str, + last_completed_iter: int, + n_iter_per_cost_update: int, +): + """Rehydrates congestion snapshot state for deterministic resume. + + The model stores a pointer to the "current congestion snapshot" in-memory. + After a crash/restart, that pointer is lost, even though the snapshot files + are cached on disk. This function reloads the last applicable flow asset and + re-applies it so that subsequent cost lookups use the same congested costs + as an uninterrupted run. + + Args: + costs_aggregator: TravelCostsAggregator instance from PopulationTrips inputs. + run_key: Run identifier (PopulationTrips.inputs_hash). + last_completed_iter: Last completed iteration k. + n_iter_per_cost_update: Update cadence. 0 means no congestion feedback. + + Returns: + A costs dataframe from costs_aggregator.get(...), using congested costs + when rehydration succeeds, or falling back to free-flow on failure. + """ + if n_iter_per_cost_update <= 0 or last_completed_iter < 1: + return costs_aggregator.get(congestion=False) + + last_update_iter = 1 + ((last_completed_iter - 1) // n_iter_per_cost_update) * n_iter_per_cost_update + if last_update_iter < 1: + return costs_aggregator.get(congestion=False) + + try: + # Load the existing flow asset for the last congestion update iteration. + flow_asset = VehicleODFlowsAsset( + vehicle_od_flows=pd.DataFrame({"from": [], "to": [], "vehicle_volume": []}), + run_key=run_key, + iteration=last_update_iter, + mode_name="car", + ) + flow_asset.get() + + # Apply snapshot to the road mode so get(congestion=True) is aligned. + for mode in costs_aggregator.modes: + if getattr(mode, "congestion", False) and getattr(mode, "name", None) == "car": + # Restore the in-memory pointer to the correct congestion snapshot. + mode.travel_costs.apply_flow_snapshot(flow_asset) + break + + return costs_aggregator.get(congestion=True) + except Exception: + logging.exception("Failed to rehydrate congestion snapshot on resume; falling back to free-flow costs until next update.") + return costs_aggregator.get(congestion=False) diff --git a/mobility/choice_models/state_updater.py b/mobility/choice_models/state_updater.py index 46f0bff3..e4e3bbc8 100644 --- a/mobility/choice_models/state_updater.py +++ b/mobility/choice_models/state_updater.py @@ -467,7 +467,7 @@ def get_current_states_steps(self, current_states, possible_states_steps): - def get_new_costs(self, costs, iteration, n_iter_per_cost_update, current_states_steps, costs_aggregator): + def get_new_costs(self, costs, iteration, n_iter_per_cost_update, current_states_steps, costs_aggregator, run_key=None): """Optionally recompute congested costs from current flows. 
Aggregates OD flows by mode, updates network/user-equilibrium in the @@ -496,8 +496,12 @@ def get_new_costs(self, costs, iteration, n_iter_per_cost_update, current_states flow_volume=pl.col("n_persons").sum() ) ) - - costs_aggregator.update(od_flows_by_mode) + + has_congestion = any(getattr(m, "congestion", False) for m in costs_aggregator.modes) + + # Only build/update congestion snapshots when at least one mode handles congestion. + if has_congestion: + costs_aggregator.update(od_flows_by_mode, run_key=run_key, iteration=iteration) costs = costs_aggregator.get(congestion=True) return costs @@ -572,4 +576,4 @@ def get_new_sinks( ) - return remaining_sinks \ No newline at end of file + return remaining_sinks diff --git a/mobility/choice_models/travel_costs_aggregator.py b/mobility/choice_models/travel_costs_aggregator.py index 719a150a..216c0aba 100644 --- a/mobility/choice_models/travel_costs_aggregator.py +++ b/mobility/choice_models/travel_costs_aggregator.py @@ -1,3 +1,4 @@ +import os import polars as pl import logging @@ -174,7 +175,7 @@ def get_prob_by_od_and_mode(self, metrics: List, congestion: bool): return prob - def update(self, od_flows_by_mode): + def update(self, od_flows_by_mode, run_key=None, iteration=None): logging.info("Updating travel costs given OD flows...") @@ -218,6 +219,39 @@ def update(self, od_flows_by_mode): raise ValueError("No flow volume to vehicle volume model for mode : " + mode.name) - mode.travel_costs.update(flows) + flow_asset = None + if run_key is not None and iteration is not None: + # Persist vehicle flows as a first-class asset so downstream congestion + # snapshots are isolated per run/iteration and safe for parallel runs. + from mobility.transport_costs.od_flows_asset import VehicleODFlowsAsset + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + try: + n_rows = flows.height + vol_sum = float(flows["vehicle_volume"].sum()) if "vehicle_volume" in flows.columns else float("nan") + except Exception: + n_rows, vol_sum = None, None + logging.info( + "Congestion update input: run_key=%s iteration=%s mode=%s rows=%s vehicle_volume_sum=%s", + str(run_key), + str(iteration), + str(mode.name), + str(n_rows), + str(vol_sum), + ) + flow_asset = VehicleODFlowsAsset( + flows.to_pandas(), + run_key=str(run_key), + iteration=int(iteration), + mode_name=str(mode.name) + ) + flow_asset.get() + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "Flow asset ready: inputs_hash=%s path=%s", + flow_asset.inputs_hash, + str(flow_asset.cache_path), + ) + + mode.travel_costs.update(flows, flow_asset=flow_asset) - \ No newline at end of file + diff --git a/mobility/experiments/hash_stability/Dockerfile b/mobility/experiments/hash_stability/Dockerfile new file mode 100644 index 00000000..1dcbd8f4 --- /dev/null +++ b/mobility/experiments/hash_stability/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.12-slim +ARG POLARS_VERSION=1.37.1 +WORKDIR /app +RUN pip install --no-cache-dir polars==${POLARS_VERSION} +COPY hash_stability.py /app/hash_stability.py +CMD ["python", "hash_stability.py"] \ No newline at end of file diff --git a/mobility/experiments/hash_stability/hash_stability.py b/mobility/experiments/hash_stability/hash_stability.py new file mode 100644 index 00000000..dd4582aa --- /dev/null +++ b/mobility/experiments/hash_stability/hash_stability.py @@ -0,0 +1,40 @@ +import json, platform, sys +import polars as pl + +SEED = 12345 + +# Replace this with your real df if you want: +df = pl.DataFrame( + { + "demand_group_id": pl.Series([13, 25, 16, 16, 
9, 2029, 2028, 2032, 2030, 2029], dtype=pl.UInt32), + "home_zone_id": pl.Series([1, 1, 1, 1, 1, 77, 77, 77, 77, 77], dtype=pl.Int32), + "motive_seq_id": pl.Series([241, 241, 215, 228, 143, 237, 235, 227, 215, 241], dtype=pl.UInt32), + "motive": pl.Series( + ["work"] * 10, + dtype=pl.Enum(["home", "other", "studies", "work"]), + ), + "to": pl.Series([76, 76, 76, 76, 76, 63, 63, 63, 63, 63], dtype=pl.Int32), + "p_ij": pl.Series([0.185129]*5 + [0.010314]*5, dtype=pl.Float64), + } +) + +hashes = ( + df.select( + pl.struct(["demand_group_id", "motive_seq_id", "motive", "to"]) + .hash(seed=SEED) + .alias("h") + )["h"] + .to_list() +) + +payload = { + "polars_version": pl.__version__, + "python_version": sys.version.split()[0], + "machine": platform.machine(), + "platform": platform.platform(), + "seed": SEED, + "hashes": hashes, +} + +print("CURRENT:") +print(json.dumps(payload, indent=2)) \ No newline at end of file diff --git a/mobility/transport_costs/od_flows_asset.py b/mobility/transport_costs/od_flows_asset.py new file mode 100644 index 00000000..bd0b9da6 --- /dev/null +++ b/mobility/transport_costs/od_flows_asset.py @@ -0,0 +1,59 @@ +import os +import pathlib +import pandas as pd +import logging + +from mobility.file_asset import FileAsset + + +class VehicleODFlowsAsset(FileAsset): + """Persist vehicle OD flows for congestion as a first-class FileAsset. + + This intentionally stores only what the congestion builder needs: + ["from","to","vehicle_volume"]. + + The cache key is (run_key, iteration, mode_name), where run_key should be + PopulationTrips.inputs_hash (includes the seed). + """ + + def __init__(self, vehicle_od_flows: pd.DataFrame, *, run_key: str, iteration: int, mode_name: str): + inputs = { + "run_key": str(run_key), + "iteration": int(iteration), + "mode_name": str(mode_name), + "schema_version": 1 + } + folder_path = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"]) + cache_path = folder_path / "od_flows" / f"vehicle_od_flows_{mode_name}.parquet" + + self._vehicle_od_flows = vehicle_od_flows + super().__init__(inputs, cache_path) + + def get_cached_asset(self) -> pd.DataFrame: + df = pd.read_parquet(self.cache_path) + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "VehicleODFlowsAsset cache hit: inputs_hash=%s path=%s rows=%s", + self.inputs_hash, + str(self.cache_path), + df.shape[0], + ) + return df + + def create_and_get_asset(self) -> pd.DataFrame: + self.cache_path.parent.mkdir(parents=True, exist_ok=True) + + # Ensure the file always exists and has the expected schema, even if empty. 
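+        # Columns are reordered to the expected schema only when all three are
+        # present; otherwise the frame is written as-is.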
+ df = self._vehicle_od_flows + expected_cols = ["from", "to", "vehicle_volume"] + df = df[expected_cols] if all(c in df.columns for c in expected_cols) else df + df.to_parquet(self.cache_path, index=False) + + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "VehicleODFlowsAsset written: inputs_hash=%s path=%s rows=%s", + self.inputs_hash, + str(self.cache_path), + df.shape[0], + ) + return df diff --git a/mobility/transport_costs/path_travel_costs.py b/mobility/transport_costs/path_travel_costs.py index f3d6dce7..8fe19941 100644 --- a/mobility/transport_costs/path_travel_costs.py +++ b/mobility/transport_costs/path_travel_costs.py @@ -14,6 +14,9 @@ from mobility.path_routing_parameters import PathRoutingParameters from mobility.transport_modes.osm_capacity_parameters import OSMCapacityParameters from mobility.transport_graphs.speed_modifier import SpeedModifier +from mobility.transport_graphs.congested_path_graph_snapshot import CongestedPathGraphSnapshot +from mobility.transport_graphs.contracted_path_graph_snapshot import ContractedPathGraphSnapshot +from mobility.transport_costs.path_travel_costs_snapshot import PathTravelCostsSnapshot from typing import List @@ -45,7 +48,7 @@ def __init__( osm_capacity_parameters: OSMCapacityParameters, congestion: bool = False, congestion_flows_scaling_factor: float = 1.0, - speed_modifiers: List[SpeedModifier] = [] + speed_modifiers: List[SpeedModifier] = [], ): """ Initializes a TravelCosts object with the given transport zones and travel mode. @@ -81,6 +84,10 @@ def __init__( super().__init__(inputs, cache_path) + # When congestion updates are used, we keep a pointer to the latest + # per-iteration snapshot so `get(congestion=True)` is isolated per run. + self._current_congested_snapshot = None + def get_cached_asset(self, congestion: bool = False) -> pd.DataFrame: """ Retrieves the travel costs DataFrame from the cache. @@ -92,7 +99,22 @@ def get_cached_asset(self, congestion: bool = False) -> pd.DataFrame: if congestion is False: path = self.cache_path["freeflow"] else: - path = self.cache_path["congested"] + if self._current_congested_snapshot is not None: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "PathTravelCosts.get(congestion=True) using snapshot: snapshot_hash=%s snapshot_path=%s", + self._current_congested_snapshot.inputs_hash, + str(self._current_congested_snapshot.cache_path), + ) + return self._current_congested_snapshot.get() + # If no congestion snapshot has been applied in this run, treat + # "congested" as free-flow to avoid reusing stale shared caches. + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "PathTravelCosts.get(congestion=True) no snapshot -> fallback to freeflow: %s", + str(self.cache_path["freeflow"]), + ) + path = self.cache_path["freeflow"] logging.info("Travel costs already prepared. Reusing the file : " + str(path)) costs = pd.read_parquet(path) @@ -117,7 +139,11 @@ def create_and_get_asset(self, congestion: bool = False) -> pd.DataFrame: if congestion is False: output_path = self.cache_path["freeflow"] else: - output_path = self.cache_path["congested"] + if self._current_congested_snapshot is not None: + return self._current_congested_snapshot.get() + # Same rationale as get_cached_asset(): without an applied snapshot, + # compute free-flow costs. 
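+            # This keeps runs hermetic: a process that never applied a snapshot
+            # in this run cannot pick up a congested cache written by another
+            # run or seed.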
+ output_path = self.cache_path["freeflow"] costs = self.compute_costs_by_OD(self.transport_zones, self.contracted_path_graph, output_path) @@ -163,10 +189,62 @@ def compute_costs_by_OD( return costs - def update(self, od_flows): - - self.contracted_path_graph.update(od_flows) - self.create_and_get_asset(congestion=True) + def update(self, od_flows, flow_asset=None): + """Update congestion state. + + Legacy behavior (flow_asset is None) mutates the shared congested graph/costs. + New behavior (flow_asset provided) builds isolated per-iteration snapshot assets + and switches `get(congestion=True)` to use that snapshot. + """ + + if flow_asset is None: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "PathTravelCosts.update legacy(shared) path: mode=%s", + str(self.mode_name), + ) + self.contracted_path_graph.update(od_flows) + self._current_congested_snapshot = None + self.create_and_get_asset(congestion=True) + return + + self._apply_flow_snapshot(flow_asset) + + def apply_flow_snapshot(self, flow_asset) -> None: + """Repoint this mode's congested costs to the snapshot defined by `flow_asset`. + + This is primarily used when resuming a run from a checkpoint: the snapshot + files exist on disk, but the in-memory pointer to the "current snapshot" + is lost on restart. + """ + self._apply_flow_snapshot(flow_asset) + + def _apply_flow_snapshot(self, flow_asset) -> None: + congested_graph = CongestedPathGraphSnapshot( + modified_graph=self.modified_path_graph, + transport_zones=self.transport_zones, + vehicle_flows=flow_asset, + congestion_flows_scaling_factor=self.congested_path_graph.congestion_flows_scaling_factor, + ) + contracted_graph = ContractedPathGraphSnapshot(congested_graph) + + snapshot = PathTravelCostsSnapshot( + mode_name=self.mode_name, + transport_zones=self.transport_zones, + routing_parameters=self.routing_parameters, + contracted_graph=contracted_graph, + ) + + self._current_congested_snapshot = snapshot + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "PathTravelCosts snapshot selected: mode=%s flow_hash=%s snapshot_hash=%s snapshot_path=%s", + str(self.mode_name), + flow_asset.get_cached_hash(), + snapshot.inputs_hash, + str(snapshot.cache_path), + ) + snapshot.get() def clone(self): diff --git a/mobility/transport_costs/path_travel_costs_snapshot.py b/mobility/transport_costs/path_travel_costs_snapshot.py new file mode 100644 index 00000000..5f88f9bc --- /dev/null +++ b/mobility/transport_costs/path_travel_costs_snapshot.py @@ -0,0 +1,78 @@ +import os +import pathlib +import logging +import pandas as pd + +from importlib import resources + +from mobility.file_asset import FileAsset +from mobility.r_utils.r_script import RScript +from mobility.transport_zones import TransportZones +from mobility.path_routing_parameters import PathRoutingParameters +from mobility.transport_graphs.contracted_path_graph_snapshot import ContractedPathGraphSnapshot + + +class PathTravelCostsSnapshot(FileAsset): + """A per-run/iteration travel-cost snapshot based on a contracted graph snapshot.""" + + def __init__( + self, + *, + mode_name: str, + transport_zones: TransportZones, + routing_parameters: PathRoutingParameters, + contracted_graph: ContractedPathGraphSnapshot, + ): + inputs = { + "mode_name": str(mode_name), + "transport_zones": transport_zones, + "routing_parameters": routing_parameters, + "contracted_graph": contracted_graph, + "schema_version": 1, + } + + folder_path = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"]) + 
cache_path = folder_path / f"travel_costs_congested_{mode_name}.parquet" + super().__init__(inputs, cache_path) + + def get_cached_asset(self) -> pd.DataFrame: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "Congested travel costs snapshot cache hit: inputs_hash=%s path=%s", + self.inputs_hash, + str(self.cache_path), + ) + else: + logging.info("Congested travel costs snapshot already prepared. Reusing: " + str(self.cache_path)) + return pd.read_parquet(self.cache_path) + + def create_and_get_asset(self) -> pd.DataFrame: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "Computing congested travel costs snapshot: inputs_hash=%s contracted_graph=%s out=%s", + self.inputs_hash, + str(self.inputs["contracted_graph"].cache_path), + str(self.cache_path), + ) + else: + logging.info("Computing congested travel costs snapshot...") + + transport_zones: TransportZones = self.inputs["transport_zones"] + contracted_graph: ContractedPathGraphSnapshot = self.inputs["contracted_graph"] + routing_parameters: PathRoutingParameters = self.inputs["routing_parameters"] + + transport_zones.get() + contracted_graph.get() + + script = RScript(resources.files('mobility.r_utils').joinpath('prepare_dodgr_costs.R')) + script.run( + args=[ + str(transport_zones.cache_path), + str(contracted_graph.cache_path), + str(routing_parameters.filter_max_speed), + str(routing_parameters.filter_max_time), + str(self.cache_path), + ] + ) + + return pd.read_parquet(self.cache_path) diff --git a/mobility/transport_graphs/congested_path_graph.py b/mobility/transport_graphs/congested_path_graph.py index 6cf68808..accdc70b 100644 --- a/mobility/transport_graphs/congested_path_graph.py +++ b/mobility/transport_graphs/congested_path_graph.py @@ -45,15 +45,17 @@ def get_cached_asset(self) -> pathlib.Path: return self.cache_path - def create_and_get_asset(self, enable_congestion: bool = False) -> pathlib.Path: + def create_and_get_asset(self, enable_congestion: bool = False, flows_file_path: pathlib.Path | None = None) -> pathlib.Path: logging.info("Loading graph with traffic...") + if flows_file_path is None: + flows_file_path = self.flows_file_path self.load_graph( self.modified_graph.get(), self.transport_zones.cache_path, enable_congestion, - self.flows_file_path, + flows_file_path, self.congestion_flows_scaling_factor, ) @@ -84,11 +86,16 @@ def load_graph( return None - def update(self, od_flows): + def update(self, od_flows, flow_asset=None): if self.handles_congestion is True: - od_flows.write_parquet(self.flows_file_path) - self.create_and_get_asset(enable_congestion=True) + if flow_asset is None: + od_flows.write_parquet(self.flows_file_path) + self.create_and_get_asset(enable_congestion=True) + else: + # flow_asset is expected to already be a parquet with the right schema. 
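+                # .get() materializes the cached parquet on disk (creating it
+                # if needed) before the graph loader reads it via cache_path.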
+ flow_asset.get() + self.create_and_get_asset(enable_congestion=True, flows_file_path=flow_asset.cache_path) diff --git a/mobility/transport_graphs/congested_path_graph_snapshot.py b/mobility/transport_graphs/congested_path_graph_snapshot.py new file mode 100644 index 00000000..cb2e1ca5 --- /dev/null +++ b/mobility/transport_graphs/congested_path_graph_snapshot.py @@ -0,0 +1,87 @@ +import os +import pathlib +import logging + +from importlib import resources + +from mobility.file_asset import FileAsset +from mobility.r_utils.r_script import RScript +from mobility.transport_graphs.modified_path_graph import ModifiedPathGraph +from mobility.transport_zones import TransportZones +from mobility.transport_costs.od_flows_asset import VehicleODFlowsAsset + + +class CongestedPathGraphSnapshot(FileAsset): + """A per-run/iteration congested graph snapshot. + + This is the "variant" layer: it depends on a stable modified graph and a + VehicleODFlowsAsset, so different seeds/iterations produce distinct cache + files without invalidating upstream base graphs. + """ + + def __init__( + self, + modified_graph: ModifiedPathGraph, + transport_zones: TransportZones, + vehicle_flows: VehicleODFlowsAsset, + congestion_flows_scaling_factor: float, + ): + inputs = { + "mode_name": modified_graph.mode_name, + "modified_graph": modified_graph, + "transport_zones": transport_zones, + "vehicle_flows": vehicle_flows, + "congestion_flows_scaling_factor": float(congestion_flows_scaling_factor), + "schema_version": 1, + } + + mode_name = modified_graph.mode_name + folder_path = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"]) + file_name = pathlib.Path("path_graph_" + mode_name) / "congested" / (mode_name + "-congested-path-graph") + cache_path = folder_path / file_name + + super().__init__(inputs, cache_path) + + def get_cached_asset(self) -> pathlib.Path: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + vf: VehicleODFlowsAsset = self.inputs["vehicle_flows"] + logging.info( + "Congested snapshot graph cache hit: inputs_hash=%s mode=%s flows_hash=%s path=%s", + self.inputs_hash, + self.inputs["mode_name"], + vf.get_cached_hash(), + str(self.cache_path), + ) + else: + logging.info("Congested snapshot graph already prepared. 
Reusing: " + str(self.cache_path)) + return self.cache_path + + def create_and_get_asset(self) -> pathlib.Path: + vehicle_flows: VehicleODFlowsAsset = self.inputs["vehicle_flows"] + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "Building congested snapshot graph: inputs_hash=%s mode=%s flows_hash=%s flows_path=%s out=%s", + self.inputs_hash, + self.inputs["mode_name"], + vehicle_flows.get_cached_hash(), + str(vehicle_flows.cache_path), + str(self.cache_path), + ) + else: + logging.info("Building congested snapshot graph...") + + vehicle_flows.get() # ensure parquet exists + + script = RScript(resources.files('mobility.transport_graphs').joinpath('load_path_graph.R')) + script.run( + args=[ + str(self.inputs["modified_graph"].get()), + str(self.inputs["transport_zones"].cache_path), + "True", + str(vehicle_flows.cache_path), + str(self.inputs["congestion_flows_scaling_factor"]), + str(self.cache_path), + ] + ) + + return self.cache_path diff --git a/mobility/transport_graphs/contracted_path_graph.py b/mobility/transport_graphs/contracted_path_graph.py index 1cc9a957..5c217582 100644 --- a/mobility/transport_graphs/contracted_path_graph.py +++ b/mobility/transport_graphs/contracted_path_graph.py @@ -60,13 +60,13 @@ def contract_graph( return None - def update(self, od_flows): + def update(self, od_flows, flow_asset=None): if self.congested_graph.handles_congestion is True: logging.info("Rebuilding contracted graph given OD flows and congestion...") - self.congested_graph.update(od_flows) + self.congested_graph.update(od_flows, flow_asset=flow_asset) self.create_and_get_asset() diff --git a/mobility/transport_graphs/contracted_path_graph_snapshot.py b/mobility/transport_graphs/contracted_path_graph_snapshot.py new file mode 100644 index 00000000..2ffb6dc9 --- /dev/null +++ b/mobility/transport_graphs/contracted_path_graph_snapshot.py @@ -0,0 +1,51 @@ +import os +import pathlib +import logging + +from importlib import resources + +from mobility.file_asset import FileAsset +from mobility.r_utils.r_script import RScript +from mobility.transport_graphs.congested_path_graph_snapshot import CongestedPathGraphSnapshot + + +class ContractedPathGraphSnapshot(FileAsset): + """A per-run/iteration contracted graph derived from a congested snapshot.""" + + def __init__(self, congested_graph: CongestedPathGraphSnapshot): + inputs = {"congested_graph": congested_graph, "schema_version": 1} + + mode_name = congested_graph.inputs["mode_name"] + folder_path = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"]) + file_name = pathlib.Path("path_graph_" + mode_name) / "contracted" / (mode_name + "-contracted-path-graph") + cache_path = folder_path / file_name + + super().__init__(inputs, cache_path) + + def get_cached_asset(self) -> pathlib.Path: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "Contracted snapshot graph cache hit: inputs_hash=%s path=%s", + self.inputs_hash, + str(self.cache_path), + ) + else: + logging.info("Contracted snapshot graph already prepared. 
Reusing: " + str(self.cache_path)) + return self.cache_path + + def create_and_get_asset(self) -> pathlib.Path: + if os.environ.get("MOBILITY_DEBUG_CONGESTION") == "1": + logging.info( + "Contracting snapshot graph: inputs_hash=%s in=%s out=%s", + self.inputs_hash, + str(self.inputs["congested_graph"].cache_path), + str(self.cache_path), + ) + else: + logging.info("Contracting snapshot graph...") + + congested_graph_path = self.inputs["congested_graph"].get() + script = RScript(resources.files('mobility.transport_graphs').joinpath('contract_path_graph.R')) + script.run(args=[str(congested_graph_path), str(self.cache_path)]) + + return self.cache_path diff --git a/mobility/transport_graphs/path_graph.py b/mobility/transport_graphs/path_graph.py index ba3f0805..5ad753d3 100644 --- a/mobility/transport_graphs/path_graph.py +++ b/mobility/transport_graphs/path_graph.py @@ -47,4 +47,4 @@ def __init__( - \ No newline at end of file + diff --git a/mobility/transport_modes/car/car_mode.py b/mobility/transport_modes/car/car_mode.py index 9f51e8ed..1e68a6c4 100644 --- a/mobility/transport_modes/car/car_mode.py +++ b/mobility/transport_modes/car/car_mode.py @@ -76,4 +76,4 @@ def __init__( vehicle="car", survey_ids=survey_ids ) - \ No newline at end of file +