From a85cd815b8c74fd73a83f140e6c75dd1762b493a Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Feb 2026 17:52:19 +0000 Subject: [PATCH 01/25] Add Modal GPU calibration to speed up CI Offload the Adam optimisation loop to Modal T4 GPU containers. Both calibrations (650 constituencies, 360 LAs) run in parallel on separate containers, so wall time becomes max(c_time, la_time) rather than the sum. - Extract _run_optimisation() helper from calibrate.py (device-agnostic) - Add modal_calibrate.py: Modal app wrapping the GPU loop - create_datasets.py: dispatch to Modal when MODAL_CALIBRATE=1, CPU fallback otherwise - push.yaml / pull_request.yaml: set MODAL_CALIBRATE=1 + token secrets - pyproject.toml: add modal to dev extras --- .github/workflows/pull_request.yaml | 3 + .github/workflows/push.yaml | 4 + .../datasets/create_datasets.py | 215 ++++++--- policyengine_uk_data/utils/calibrate.py | 416 +++++++++--------- policyengine_uk_data/utils/modal_calibrate.py | 94 ++++ pyproject.toml | 1 + 6 files changed, 466 insertions(+), 267 deletions(-) create mode 100644 policyengine_uk_data/utils/modal_calibrate.py diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index e7be9e6f..33baadef 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -53,6 +53,9 @@ jobs: run: make data env: TESTING: "1" + MODAL_CALIBRATE: "1" + MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }} + MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }} - name: Save calibration log (constituencies) uses: actions/upload-artifact@v4 with: diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index d4575eb6..9c019934 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -58,6 +58,10 @@ jobs: HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }} - name: Build datasets run: make data + env: + MODAL_CALIBRATE: "1" + MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }} + MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }} - name: Save calibration log (constituencies) uses: actions/upload-artifact@v4 with: diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 07f6b362..7d5ed54b 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -2,6 +2,9 @@ from policyengine_uk_data.storage import STORAGE_FOLDER import logging import os +import io +import numpy as np +import h5py from policyengine_uk_data.utils.uprating import uprate_dataset from policyengine_uk_data.utils.progress import ( ProcessingProgress, @@ -11,17 +14,90 @@ logging.basicConfig(level=logging.INFO) +USE_MODAL = os.environ.get("MODAL_CALIBRATE", "0") == "1" + + +def _dump(arr) -> bytes: + buf = io.BytesIO() + np.save(buf, arr) + return buf.getvalue() + + +def _build_weights_init(dataset, area_count, r): + areas_per_household = np.maximum(r.sum(axis=0), 1) + original_weights = np.log( + dataset.household.household_weight.values / areas_per_household + + np.random.random( + len(dataset.household.household_weight.values) + ) + * 0.01 + ) + return np.ones((area_count, len(original_weights))) * original_weights + + +def _run_modal_calibrations( + frs, + epochs, + create_constituency_target_matrix, + create_local_authority_target_matrix, + create_national_target_matrix, +): + """ + Dispatch both calibrations concurrently to Modal GPU containers. + Returns (constituency_weights, la_weights) as numpy arrays. + """ + from policyengine_uk_data.utils.modal_calibrate import ( + app, + run_calibration, + ) + + # Build arrays for constituencies + matrix_c, y_c, r_c = create_constituency_target_matrix(frs) + m_nat_c, y_nat_c = create_national_target_matrix(frs) + wi_c = _build_weights_init(frs, 650, r_c) + + # Build arrays for local authorities + matrix_la, y_la, r_la = create_local_authority_target_matrix(frs) + m_nat_la, y_nat_la = create_national_target_matrix(frs) + wi_la = _build_weights_init(frs, 360, r_la) + + def _arr(x): + return x.values if hasattr(x, "values") else x + + # Submit both jobs before waiting on either; Modal runs them in parallel + with app.run(): + fut_c = run_calibration.spawn( + _dump(_arr(matrix_c)), + _dump(_arr(y_c)), + _dump(r_c), + _dump(_arr(m_nat_c)), + _dump(_arr(y_nat_c)), + _dump(wi_c), + epochs, + ) + fut_la = run_calibration.spawn( + _dump(_arr(matrix_la)), + _dump(_arr(y_la)), + _dump(r_la), + _dump(_arr(m_nat_la)), + _dump(_arr(y_nat_la)), + _dump(wi_la), + epochs, + ) + weights_c = np.load(io.BytesIO(fut_c.get())) + weights_la = np.load(io.BytesIO(fut_la.get())) + + return weights_c, weights_la + def main(): """Create enhanced FRS dataset with rich progress tracking.""" try: - # Use reduced epochs and fidelity for testing is_testing = os.environ.get("TESTING", "0") == "1" epochs = 32 if is_testing else 512 progress_tracker = ProcessingProgress() - # Define dataset creation steps steps = [ "Create base FRS dataset", "Impute consumption", @@ -43,7 +119,6 @@ def main(): update_dataset, nested_progress, ): - # Create base FRS dataset update_dataset("Create base FRS dataset", "processing") frs = create_frs( raw_frs_folder=STORAGE_FOLDER / "frs_2023_24", @@ -52,7 +127,6 @@ def main(): frs.save(STORAGE_FOLDER / "frs_2023_24.h5") update_dataset("Create base FRS dataset", "completed") - # Import imputation functions from policyengine_uk_data.datasets.imputations import ( impute_consumption, impute_wealth, @@ -64,9 +138,6 @@ def main(): impute_student_loan_plan, ) - # Apply imputations with progress tracking - # Wealth must be imputed before consumption because consumption - # uses num_vehicles as a predictor for fuel spending update_dataset("Impute wealth", "processing") frs = impute_wealth(frs) update_dataset("Impute wealth", "completed") @@ -99,19 +170,10 @@ def main(): frs = impute_student_loan_plan(frs, year=2025) update_dataset("Impute student loan plan", "completed") - # Uprate dataset update_dataset("Uprate to 2025", "processing") frs = uprate_dataset(frs, 2025) update_dataset("Uprate to 2025", "completed") - # Calibrate constituency weights with nested progress - - update_dataset("Calibrate constituency weights", "processing") - - # Use a separate progress tracker for calibration with nested display - from policyengine_uk_data.utils.calibrate import ( - calibrate_local_areas, - ) from policyengine_uk_data.datasets.local_areas.constituencies.loss import ( create_constituency_target_matrix, ) @@ -121,23 +183,6 @@ def main(): from policyengine_uk_data.datasets.local_areas.constituencies.calibrate import ( get_performance, ) - - # Run calibration with verbose progress - frs_calibrated_constituencies = calibrate_local_areas( - dataset=frs, - epochs=epochs, - matrix_fn=create_constituency_target_matrix, - national_matrix_fn=create_national_target_matrix, - area_count=650, - weight_file="parliamentary_constituency_weights.h5", - excluded_training_targets=[], - log_csv="constituency_calibration_log.csv", - verbose=True, # Enable nested progress display - area_name="Constituency", - get_performance=get_performance, - nested_progress=nested_progress, # Pass the nested progress manager - ) - from policyengine_uk_data.datasets.local_areas.local_authorities.calibrate import ( get_performance as get_la_performance, ) @@ -145,43 +190,103 @@ def main(): create_local_authority_target_matrix, ) - # Run calibration with verbose progress - calibrate_local_areas( - dataset=frs, - epochs=epochs, - matrix_fn=create_local_authority_target_matrix, - national_matrix_fn=create_national_target_matrix, - area_count=360, - weight_file="local_authority_weights.h5", - excluded_training_targets=[], - log_csv="la_calibration_log.csv", - verbose=True, # Enable nested progress display - area_name="Local Authority", - get_performance=get_la_performance, - nested_progress=nested_progress, # Pass the nested progress manager - ) + if USE_MODAL: + update_dataset("Calibrate constituency weights", "processing") + update_dataset( + "Calibrate local authority weights", "processing" + ) - update_dataset("Calibrate dataset", "completed") + weights_c, weights_la = _run_modal_calibrations( + frs, + epochs, + create_constituency_target_matrix, + create_local_authority_target_matrix, + create_national_target_matrix, + ) + + with h5py.File( + STORAGE_FOLDER / "parliamentary_constituency_weights.h5", + "w", + ) as f: + f.create_dataset("2025", data=weights_c) + + with h5py.File( + STORAGE_FOLDER / "local_authority_weights.h5", "w" + ) as f: + f.create_dataset("2025", data=weights_la) + + frs_calibrated_constituencies = frs.copy() + frs_calibrated_constituencies.household.household_weight = ( + weights_c.sum(axis=0) + ) + + update_dataset( + "Calibrate constituency weights", "completed" + ) + update_dataset( + "Calibrate local authority weights", "completed" + ) + else: + from policyengine_uk_data.utils.calibrate import ( + calibrate_local_areas, + ) + + update_dataset("Calibrate constituency weights", "processing") + frs_calibrated_constituencies = calibrate_local_areas( + dataset=frs, + epochs=epochs, + matrix_fn=create_constituency_target_matrix, + national_matrix_fn=create_national_target_matrix, + area_count=650, + weight_file="parliamentary_constituency_weights.h5", + excluded_training_targets=[], + log_csv="constituency_calibration_log.csv", + verbose=True, + area_name="Constituency", + get_performance=get_performance, + nested_progress=nested_progress, + ) + update_dataset( + "Calibrate constituency weights", "completed" + ) + + update_dataset( + "Calibrate local authority weights", "processing" + ) + calibrate_local_areas( + dataset=frs, + epochs=epochs, + matrix_fn=create_local_authority_target_matrix, + national_matrix_fn=create_national_target_matrix, + area_count=360, + weight_file="local_authority_weights.h5", + excluded_training_targets=[], + log_csv="la_calibration_log.csv", + verbose=True, + area_name="Local Authority", + get_performance=get_la_performance, + nested_progress=nested_progress, + ) + update_dataset( + "Calibrate local authority weights", "completed" + ) - # Downrate and save update_dataset("Downrate to 2023", "processing") - frs_calibrated = uprate_dataset( - frs_calibrated_constituencies, 2023 - ) + frs_calibrated = uprate_dataset(frs_calibrated_constituencies, 2023) update_dataset("Downrate to 2023", "completed") update_dataset("Save final dataset", "processing") frs_calibrated.save(STORAGE_FOLDER / "enhanced_frs_2023_24.h5") update_dataset("Save final dataset", "completed") - # Display success message display_success_panel( "Dataset creation completed successfully", details={ "base_dataset": "frs_2023_24.h5", "enhanced_dataset": "enhanced_frs_2023_24.h5", "imputations_applied": "consumption, wealth, VAT, services, income, capital_gains, salary_sacrifice, student_loan_plan", - "calibration": "national, LA and constituency targets", + "calibration": "national, LA and constituency targets", + "calibration_backend": "Modal GPU" if USE_MODAL else "CPU", }, ) diff --git a/policyengine_uk_data/utils/calibrate.py b/policyengine_uk_data/utils/calibrate.py index 6e31402c..42399320 100644 --- a/policyengine_uk_data/utils/calibrate.py +++ b/policyengine_uk_data/utils/calibrate.py @@ -8,285 +8,277 @@ from policyengine_uk_data.utils.progress import ProcessingProgress -def calibrate_local_areas( - dataset: UKSingleYearDataset, - matrix_fn, - national_matrix_fn, - area_count: int, - weight_file: str, - dataset_key: str = "2025", - epochs: int = 512, - excluded_training_targets=[], - log_csv=None, +def _run_optimisation( + matrix_np: np.ndarray, + y_np: np.ndarray, + r_np: np.ndarray, + matrix_national_np: np.ndarray, + y_national_np: np.ndarray, + weights_init_np: np.ndarray, + epochs: int, + device: torch.device, + excluded_training_targets_local: np.ndarray | None = None, + excluded_training_targets_national: np.ndarray | None = None, verbose: bool = False, area_name: str = "area", - get_performance=None, + progress_tracker=None, nested_progress=None, -): + log_csv: str | None = None, + get_performance=None, + m_c_orig=None, + y_c_orig=None, + m_n_orig=None, + y_n_orig=None, + weight_file: str | None = None, + dataset_key: str = "2025", + dataset=None, +) -> np.ndarray: """ - Generic calibration function for local areas (constituencies, local authorities, etc.) + Pure optimisation loop (Adam, PyTorch). Device-agnostic — pass + ``device=torch.device("cuda")`` for GPU or ``"cpu"`` for CPU. - Args: - dataset: The dataset to calibrate - matrix_fn: Function that returns (matrix, targets, mask) for the local areas - national_matrix_fn: Function that returns (matrix, targets) for national totals - area_count: Number of areas (e.g., 650 for constituencies, 360 for local authorities) - weight_file: Name of the h5 file to save weights to - dataset_key: Key to use in the h5 file - epochs: Number of training epochs - excluded_training_targets: List of targets to exclude from training (for validation) - log_csv: CSV file to log performance to - verbose: Whether to print progress - area_name: Name of the area type for logging + Returns the final weights array (area_count × n_households). """ - dataset = dataset.copy() - matrix, y, r = matrix_fn(dataset) - m_c, y_c = matrix.copy(), y.copy() - m_national, y_national = national_matrix_fn(dataset) - m_n, y_n = m_national.copy(), y_national.copy() - - # Weights - area_count x num_households - # Use country-aware initialization: divide each household's weight by the - # number of areas in its country, not the total area count. This ensures - # households start at approximately correct weight for their country's targets. - # The country_mask r[i,j]=1 iff household j is in same country as area i. - areas_per_household = r.sum( - axis=0 - ) # number of areas each household can contribute to - areas_per_household = np.maximum( - areas_per_household, 1 - ) # avoid division by zero - original_weights = np.log( - dataset.household.household_weight.values / areas_per_household - + np.random.random(len(dataset.household.household_weight.values)) - * 0.01 + metrics = torch.tensor(matrix_np, dtype=torch.float32, device=device) + y = torch.tensor(y_np, dtype=torch.float32, device=device) + matrix_national = torch.tensor( + matrix_national_np, dtype=torch.float32, device=device ) + y_national = torch.tensor( + y_national_np, dtype=torch.float32, device=device + ) + r = torch.tensor(r_np, dtype=torch.float32, device=device) + weights = torch.tensor( - np.ones((area_count, len(original_weights))) * original_weights, + weights_init_np, dtype=torch.float32, + device=device, requires_grad=True, ) - # Set up validation targets if specified - validation_targets_local = ( - matrix.columns.isin(excluded_training_targets) - if hasattr(matrix, "columns") - else None + dropout_targets = ( + excluded_training_targets_local is not None + and excluded_training_targets_local.any() ) - validation_targets_national = ( - m_national.columns.isin(excluded_training_targets) - if hasattr(m_national, "columns") - else None - ) - dropout_targets = len(excluded_training_targets) > 0 - - # Convert to tensors - metrics = torch.tensor( - matrix.values if hasattr(matrix, "values") else matrix, - dtype=torch.float32, - ) - y = torch.tensor( - y.values if hasattr(y, "values") else y, dtype=torch.float32 - ) - matrix_national = torch.tensor( - m_national.values if hasattr(m_national, "values") else m_national, - dtype=torch.float32, - ) - y_national = torch.tensor( - y_national.values if hasattr(y_national, "values") else y_national, - dtype=torch.float32, - ) - r = torch.tensor(r, dtype=torch.float32) - def sre(x, y): - one_way = ((1 + x) / (1 + y) - 1) ** 2 - other_way = ((1 + y) / (1 + x) - 1) ** 2 + def sre(x, y_t): + one_way = ((1 + x) / (1 + y_t) - 1) ** 2 + other_way = ((1 + y_t) / (1 + x) - 1) ** 2 return torch.min(one_way, other_way) - def loss(w, validation: bool = False): + def loss_fn(w, validation: bool = False): pred_local = (w.unsqueeze(-1) * metrics.unsqueeze(0)).sum(dim=1) - if dropout_targets and validation_targets_local is not None: - if validation: - mask = validation_targets_local - else: - mask = ~validation_targets_local + if dropout_targets and excluded_training_targets_local is not None: + mask = ( + excluded_training_targets_local + if validation + else ~excluded_training_targets_local + ) pred_local = pred_local[:, mask] mse_local = torch.mean(sre(pred_local, y[:, mask])) else: mse_local = torch.mean(sre(pred_local, y)) pred_national = (w.sum(axis=0) * matrix_national.T).sum(axis=1) - if dropout_targets and validation_targets_national is not None: - if validation: - mask = validation_targets_national - else: - mask = ~validation_targets_national + if ( + dropout_targets + and excluded_training_targets_national is not None + ): + mask = ( + excluded_training_targets_national + if validation + else ~excluded_training_targets_national + ) pred_national = pred_national[mask] - mse_national = torch.mean(sre(pred_national, y_national[mask])) + mse_national = torch.mean( + sre(pred_national, y_national[mask]) + ) else: mse_national = torch.mean(sre(pred_national, y_national)) return mse_local + mse_national def pct_close(w, t=0.1, local=True, national=True): - """Return the percentage of metrics that are within t% of the target""" numerator = 0 denominator = 0 - if local: - pred_local = (w.unsqueeze(-1) * metrics.unsqueeze(0)).sum(dim=1) - e_local = torch.sum( + pred_local = (w.unsqueeze(-1) * metrics.unsqueeze(0)).sum( + dim=1 + ) + numerator += torch.sum( torch.abs((pred_local / (1 + y) - 1)) < t ).item() - c_local = pred_local.shape[0] * pred_local.shape[1] - numerator += e_local - denominator += c_local - + denominator += pred_local.shape[0] * pred_local.shape[1] if national: pred_national = (w.sum(axis=0) * matrix_national.T).sum(axis=1) - e_national = torch.sum( + numerator += torch.sum( torch.abs((pred_national / (1 + y_national) - 1)) < t ).item() - c_national = pred_national.shape[0] - numerator += e_national - denominator += c_national - + denominator += pred_national.shape[0] return numerator / denominator - def dropout_weights(weights, p): + def dropout_weights(w, p): if p == 0: - return weights - # Replace p% of the weights with the mean value of the rest of them - mask = torch.rand_like(weights) < p - mean = weights[~mask].mean() - masked_weights = weights.clone() - masked_weights[mask] = mean - return masked_weights + return w + mask = torch.rand_like(w) < p + mean = w[~mask].mean() + w2 = w.clone() + w2[mask] = mean + return w2 optimizer = torch.optim.Adam([weights], lr=1e-1) - final_weights = (torch.exp(weights) * r).detach().numpy() + final_weights = (torch.exp(weights) * r).detach().cpu().numpy() performance = pd.DataFrame() - progress_tracker = ProcessingProgress() if verbose else None + def _epoch_step(epoch): + nonlocal final_weights, performance + optimizer.zero_grad() + weights_ = torch.exp(dropout_weights(weights, 0.05)) * r + l = loss_fn(weights_) + l.backward() + optimizer.step() + + local_close = pct_close(weights_, local=True, national=False) + national_close = pct_close(weights_, local=False, national=True) + + if epoch % 10 == 0: + final_weights = (torch.exp(weights) * r).detach().cpu().numpy() + + if log_csv and get_performance and m_c_orig is not None: + perf = get_performance( + final_weights, + m_c_orig, + y_c_orig, + m_n_orig, + y_n_orig, + [], + ) + perf["epoch"] = epoch + perf["loss"] = perf.rel_abs_error**2 + perf["target_name"] = [ + f"{a}/{m}" + for a, m in zip(perf.name, perf.metric) + ] + performance = pd.concat( + [performance, perf], ignore_index=True + ) + performance.to_csv(log_csv, index=False) + + if weight_file: + with h5py.File(STORAGE_FOLDER / weight_file, "w") as f: + f.create_dataset(dataset_key, data=final_weights) + if dataset is not None: + dataset.household.household_weight = final_weights.sum( + axis=0 + ) + + return l, local_close, national_close - if verbose and progress_tracker: + if verbose and progress_tracker is not None: with progress_tracker.track_calibration( epochs, nested_progress ) as update_calibration: for epoch in range(epochs): update_calibration(epoch + 1, calculating_loss=True) - - optimizer.zero_grad() - weights_ = torch.exp(dropout_weights(weights, 0.05)) * r - l = loss(weights_) - l.backward() - optimizer.step() - - local_close = pct_close(weights_, local=True, national=False) - national_close = pct_close( - weights_, local=False, national=True - ) - + l, _, _ = _epoch_step(epoch) if dropout_targets: - validation_loss = loss(weights_, validation=True) + weights_ = torch.exp( + dropout_weights(weights, 0.05) + ) * r + val_loss = loss_fn(weights_, validation=True) update_calibration( epoch + 1, - loss_value=validation_loss.item(), + loss_value=val_loss.item(), calculating_loss=False, ) else: update_calibration( - epoch + 1, loss_value=l.item(), calculating_loss=False - ) - - if epoch % 10 == 0: - final_weights = (torch.exp(weights) * r).detach().numpy() - - # Log performance if requested and get_performance function is available - if log_csv and get_performance: - performance_step = get_performance( - final_weights, - m_c, - y_c, - m_n, - y_n, - excluded_training_targets, - ) - performance_step["epoch"] = epoch - performance_step["loss"] = ( - performance_step.rel_abs_error**2 - ) - performance_step["target_name"] = [ - f"{area}/{metric}" - for area, metric in zip( - performance_step.name, performance_step.metric - ) - ] - performance = pd.concat( - [performance, performance_step], ignore_index=True - ) - performance.to_csv(log_csv, index=False) - - # Save weights - with h5py.File(STORAGE_FOLDER / weight_file, "w") as f: - f.create_dataset(dataset_key, data=final_weights) - - dataset.household.household_weight = final_weights.sum( - axis=0 + epoch + 1, + loss_value=l.item(), + calculating_loss=False, ) else: for epoch in range(epochs): - optimizer.zero_grad() - weights_ = torch.exp(dropout_weights(weights, 0.05)) * r - l = loss(weights_) - l.backward() - optimizer.step() + _epoch_step(epoch) - local_close = pct_close(weights_, local=True, national=False) - national_close = pct_close(weights_, local=False, national=True) + return final_weights - if verbose and (epoch % 1 == 0): - if dropout_targets: - validation_loss = loss(weights_, validation=True) - print( - f"Training loss: {l.item():,.3f}, Validation loss: {validation_loss.item():,.3f}, Epoch: {epoch}, " - f"{area_name}<10%: {local_close:.1%}, National<10%: {national_close:.1%}" - ) - else: - print( - f"Loss: {l.item()}, Epoch: {epoch}, {area_name}<10%: {local_close:.1%}, National<10%: {national_close:.1%}" - ) - if epoch % 10 == 0: - final_weights = (torch.exp(weights) * r).detach().numpy() +def calibrate_local_areas( + dataset: UKSingleYearDataset, + matrix_fn, + national_matrix_fn, + area_count: int, + weight_file: str, + dataset_key: str = "2025", + epochs: int = 512, + excluded_training_targets=[], + log_csv=None, + verbose: bool = False, + area_name: str = "area", + get_performance=None, + nested_progress=None, +) -> UKSingleYearDataset: + """ + Calibrate local-area weights on CPU using the extracted optimisation loop. + """ + dataset = dataset.copy() + matrix, y, r = matrix_fn(dataset) + m_c, y_c = matrix.copy(), y.copy() + m_national, y_national = national_matrix_fn(dataset) + m_n, y_n = m_national.copy(), y_national.copy() - # Log performance if requested and get_performance function is available - if log_csv: - performance_step = get_performance( - final_weights, - m_c, - y_c, - m_n, - y_n, - excluded_training_targets, - ) - performance_step["epoch"] = epoch - performance_step["loss"] = performance_step.rel_abs_error**2 - performance_step["target_name"] = [ - f"{area}/{metric}" - for area, metric in zip( - performance_step.name, performance_step.metric - ) - ] - performance = pd.concat( - [performance, performance_step], ignore_index=True - ) - performance.to_csv(log_csv, index=False) + areas_per_household = r.sum(axis=0) + areas_per_household = np.maximum(areas_per_household, 1) + original_weights = np.log( + dataset.household.household_weight.values / areas_per_household + + np.random.random(len(dataset.household.household_weight.values)) + * 0.01 + ) + weights_init = np.ones((area_count, len(original_weights))) * original_weights - # Save weights - with h5py.File(STORAGE_FOLDER / weight_file, "w") as f: - f.create_dataset(dataset_key, data=final_weights) + validation_targets_local = ( + matrix.columns.isin(excluded_training_targets) + if hasattr(matrix, "columns") + else None + ) + validation_targets_national = ( + m_national.columns.isin(excluded_training_targets) + if hasattr(m_national, "columns") + else None + ) - dataset.household.household_weight = final_weights.sum(axis=0) + progress_tracker = ProcessingProgress() if verbose else None + + final_weights = _run_optimisation( + matrix_np=matrix.values if hasattr(matrix, "values") else matrix, + y_np=y.values if hasattr(y, "values") else y, + r_np=r, + matrix_national_np=m_national.values + if hasattr(m_national, "values") + else m_national, + y_national_np=y_national.values + if hasattr(y_national, "values") + else y_national, + weights_init_np=weights_init, + epochs=epochs, + device=torch.device("cpu"), + excluded_training_targets_local=validation_targets_local, + excluded_training_targets_national=validation_targets_national, + verbose=verbose, + area_name=area_name, + progress_tracker=progress_tracker, + nested_progress=nested_progress, + log_csv=log_csv, + get_performance=get_performance, + m_c_orig=m_c, + y_c_orig=y_c, + m_n_orig=m_n, + y_n_orig=y_n, + weight_file=weight_file, + dataset_key=dataset_key, + dataset=dataset, + ) + dataset.household.household_weight = final_weights.sum(axis=0) return dataset diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py new file mode 100644 index 00000000..cceeece2 --- /dev/null +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -0,0 +1,94 @@ +import io +import modal +import numpy as np + +app = modal.App("policyengine-uk-calibration") + +image = modal.Image.debian_slim().pip_install("torch", "numpy", "h5py", "pandas") + + +@app.function(gpu="T4", image=image, timeout=3600) +def run_calibration( + matrix: bytes, + y: bytes, + r: bytes, + matrix_national: bytes, + y_national: bytes, + weights_init: bytes, + epochs: int, +) -> bytes: + """ + Run the Adam calibration loop on a GPU container. All arrays are + serialised with ``np.save`` / deserialised with ``np.load``. + + Returns the final weights (area_count × n_households) as np.save bytes. + """ + import io + import numpy as np + import torch + + # Inline _run_optimisation to keep the Modal image dependency-free + # (no policyengine_uk_data import needed inside the container). + + def load(b): + return np.load(io.BytesIO(b)) + + matrix_np = load(matrix) + y_np = load(y) + r_np = load(r) + matrix_national_np = load(matrix_national) + y_national_np = load(y_national) + weights_init_np = load(weights_init) + + device = torch.device("cuda") + + metrics = torch.tensor(matrix_np, dtype=torch.float32, device=device) + y_t = torch.tensor(y_np, dtype=torch.float32, device=device) + m_national = torch.tensor( + matrix_national_np, dtype=torch.float32, device=device + ) + y_nat = torch.tensor(y_national_np, dtype=torch.float32, device=device) + r_t = torch.tensor(r_np, dtype=torch.float32, device=device) + + weights = torch.tensor( + weights_init_np, + dtype=torch.float32, + device=device, + requires_grad=True, + ) + + def sre(x, y_ref): + one_way = ((1 + x) / (1 + y_ref) - 1) ** 2 + other_way = ((1 + y_ref) / (1 + x) - 1) ** 2 + return torch.min(one_way, other_way) + + def loss_fn(w): + pred_local = (w.unsqueeze(-1) * metrics.unsqueeze(0)).sum(dim=1) + mse_local = torch.mean(sre(pred_local, y_t)) + pred_national = (w.sum(axis=0) * m_national.T).sum(axis=1) + mse_national = torch.mean(sre(pred_national, y_nat)) + return mse_local + mse_national + + def dropout_weights(w, p): + if p == 0: + return w + mask = torch.rand_like(w) < p + mean = w[~mask].mean() + w2 = w.clone() + w2[mask] = mean + return w2 + + optimizer = torch.optim.Adam([weights], lr=1e-1) + + for _ in range(epochs): + optimizer.zero_grad() + weights_ = torch.exp(dropout_weights(weights, 0.05)) * r_t + l = loss_fn(weights_) + l.backward() + optimizer.step() + + final_weights = (torch.exp(weights) * r_t).detach().cpu().numpy() + + buf = io.BytesIO() + np.save(buf, final_weights) + return buf.getvalue() diff --git a/pyproject.toml b/pyproject.toml index 77c33c9a..6ba27a85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dev = [ "itables", "quantile-forest", "build", + "modal", ] [tool.setuptools] From 37de245f8df99b188463ef49dcc9fb961ea8c250 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 09:32:34 +0000 Subject: [PATCH 02/25] Fix black formatting, dataset.copy(), and add changelog entry - Run black on all three changed files - Call frs.copy() before passing dataset to matrix functions in Modal path, matching what calibrate_local_areas does internally - Add changelog_entry.yaml --- changelog_entry.yaml | 4 ++ .../datasets/create_datasets.py | 31 +++++++------ policyengine_uk_data/utils/calibrate.py | 44 +++++++------------ policyengine_uk_data/utils/modal_calibrate.py | 4 +- 4 files changed, 38 insertions(+), 45 deletions(-) diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29b..6d4d9bd5 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + added: + - Modal GPU calibration support to speed up CI runs diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 7d5ed54b..a856e838 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -27,9 +27,7 @@ def _build_weights_init(dataset, area_count, r): areas_per_household = np.maximum(r.sum(axis=0), 1) original_weights = np.log( dataset.household.household_weight.values / areas_per_household - + np.random.random( - len(dataset.household.household_weight.values) - ) + + np.random.random(len(dataset.household.household_weight.values)) * 0.01 ) return np.ones((area_count, len(original_weights))) * original_weights @@ -51,15 +49,18 @@ def _run_modal_calibrations( run_calibration, ) + frs_c = frs.copy() + frs_la = frs.copy() + # Build arrays for constituencies - matrix_c, y_c, r_c = create_constituency_target_matrix(frs) - m_nat_c, y_nat_c = create_national_target_matrix(frs) - wi_c = _build_weights_init(frs, 650, r_c) + matrix_c, y_c, r_c = create_constituency_target_matrix(frs_c) + m_nat_c, y_nat_c = create_national_target_matrix(frs_c) + wi_c = _build_weights_init(frs_c, 650, r_c) # Build arrays for local authorities - matrix_la, y_la, r_la = create_local_authority_target_matrix(frs) - m_nat_la, y_nat_la = create_national_target_matrix(frs) - wi_la = _build_weights_init(frs, 360, r_la) + matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_la) + m_nat_la, y_nat_la = create_national_target_matrix(frs_la) + wi_la = _build_weights_init(frs_la, 360, r_la) def _arr(x): return x.values if hasattr(x, "values") else x @@ -220,9 +221,7 @@ def main(): weights_c.sum(axis=0) ) - update_dataset( - "Calibrate constituency weights", "completed" - ) + update_dataset("Calibrate constituency weights", "completed") update_dataset( "Calibrate local authority weights", "completed" ) @@ -246,9 +245,7 @@ def main(): get_performance=get_performance, nested_progress=nested_progress, ) - update_dataset( - "Calibrate constituency weights", "completed" - ) + update_dataset("Calibrate constituency weights", "completed") update_dataset( "Calibrate local authority weights", "processing" @@ -272,7 +269,9 @@ def main(): ) update_dataset("Downrate to 2023", "processing") - frs_calibrated = uprate_dataset(frs_calibrated_constituencies, 2023) + frs_calibrated = uprate_dataset( + frs_calibrated_constituencies, 2023 + ) update_dataset("Downrate to 2023", "completed") update_dataset("Save final dataset", "processing") diff --git a/policyengine_uk_data/utils/calibrate.py b/policyengine_uk_data/utils/calibrate.py index 42399320..ae6f29f5 100644 --- a/policyengine_uk_data/utils/calibrate.py +++ b/policyengine_uk_data/utils/calibrate.py @@ -80,19 +80,14 @@ def loss_fn(w, validation: bool = False): mse_local = torch.mean(sre(pred_local, y)) pred_national = (w.sum(axis=0) * matrix_national.T).sum(axis=1) - if ( - dropout_targets - and excluded_training_targets_national is not None - ): + if dropout_targets and excluded_training_targets_national is not None: mask = ( excluded_training_targets_national if validation else ~excluded_training_targets_national ) pred_national = pred_national[mask] - mse_national = torch.mean( - sre(pred_national, y_national[mask]) - ) + mse_national = torch.mean(sre(pred_national, y_national[mask])) else: mse_national = torch.mean(sre(pred_national, y_national)) @@ -102,9 +97,7 @@ def pct_close(w, t=0.1, local=True, national=True): numerator = 0 denominator = 0 if local: - pred_local = (w.unsqueeze(-1) * metrics.unsqueeze(0)).sum( - dim=1 - ) + pred_local = (w.unsqueeze(-1) * metrics.unsqueeze(0)).sum(dim=1) numerator += torch.sum( torch.abs((pred_local / (1 + y) - 1)) < t ).item() @@ -156,21 +149,16 @@ def _epoch_step(epoch): perf["epoch"] = epoch perf["loss"] = perf.rel_abs_error**2 perf["target_name"] = [ - f"{a}/{m}" - for a, m in zip(perf.name, perf.metric) + f"{a}/{m}" for a, m in zip(perf.name, perf.metric) ] - performance = pd.concat( - [performance, perf], ignore_index=True - ) + performance = pd.concat([performance, perf], ignore_index=True) performance.to_csv(log_csv, index=False) if weight_file: with h5py.File(STORAGE_FOLDER / weight_file, "w") as f: f.create_dataset(dataset_key, data=final_weights) if dataset is not None: - dataset.household.household_weight = final_weights.sum( - axis=0 - ) + dataset.household.household_weight = final_weights.sum(axis=0) return l, local_close, national_close @@ -182,9 +170,7 @@ def _epoch_step(epoch): update_calibration(epoch + 1, calculating_loss=True) l, _, _ = _epoch_step(epoch) if dropout_targets: - weights_ = torch.exp( - dropout_weights(weights, 0.05) - ) * r + weights_ = torch.exp(dropout_weights(weights, 0.05)) * r val_loss = loss_fn(weights_, validation=True) update_calibration( epoch + 1, @@ -235,7 +221,9 @@ def calibrate_local_areas( + np.random.random(len(dataset.household.household_weight.values)) * 0.01 ) - weights_init = np.ones((area_count, len(original_weights))) * original_weights + weights_init = ( + np.ones((area_count, len(original_weights))) * original_weights + ) validation_targets_local = ( matrix.columns.isin(excluded_training_targets) @@ -254,12 +242,12 @@ def calibrate_local_areas( matrix_np=matrix.values if hasattr(matrix, "values") else matrix, y_np=y.values if hasattr(y, "values") else y, r_np=r, - matrix_national_np=m_national.values - if hasattr(m_national, "values") - else m_national, - y_national_np=y_national.values - if hasattr(y_national, "values") - else y_national, + matrix_national_np=( + m_national.values if hasattr(m_national, "values") else m_national + ), + y_national_np=( + y_national.values if hasattr(y_national, "values") else y_national + ), weights_init_np=weights_init, epochs=epochs, device=torch.device("cpu"), diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index cceeece2..47cb1fbb 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -4,7 +4,9 @@ app = modal.App("policyengine-uk-calibration") -image = modal.Image.debian_slim().pip_install("torch", "numpy", "h5py", "pandas") +image = modal.Image.debian_slim().pip_install( + "torch", "numpy", "h5py", "pandas" +) @app.function(gpu="T4", image=image, timeout=3600) From c5662652c5b70e259b28f9f696bf7a778586a3ec Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 09:48:42 +0000 Subject: [PATCH 03/25] Reduce peak memory in Modal path by building matrices sequentially Build and serialise constituency arrays then del before building LA arrays, rather than holding both Microsimulations in memory at once. --- .../datasets/create_datasets.py | 64 ++++++++++--------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index a856e838..44422385 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -49,42 +49,44 @@ def _run_modal_calibrations( run_calibration, ) - frs_c = frs.copy() - frs_la = frs.copy() - - # Build arrays for constituencies - matrix_c, y_c, r_c = create_constituency_target_matrix(frs_c) - m_nat_c, y_nat_c = create_national_target_matrix(frs_c) - wi_c = _build_weights_init(frs_c, 650, r_c) - - # Build arrays for local authorities - matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_la) - m_nat_la, y_nat_la = create_national_target_matrix(frs_la) - wi_la = _build_weights_init(frs_la, 360, r_la) - def _arr(x): return x.values if hasattr(x, "values") else x + # Build and serialise constituency arrays, then free before building LA + frs_copy = frs.copy() + matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) + m_nat_c, y_nat_c = create_national_target_matrix(frs_copy) + wi_c = _build_weights_init(frs_copy, 650, r_c) + args_c = ( + _dump(_arr(matrix_c)), + _dump(_arr(y_c)), + _dump(r_c), + _dump(_arr(m_nat_c)), + _dump(_arr(y_nat_c)), + _dump(wi_c), + epochs, + ) + del matrix_c, y_c, r_c, m_nat_c, y_nat_c, wi_c, frs_copy + + frs_copy = frs.copy() + matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) + m_nat_la, y_nat_la = create_national_target_matrix(frs_copy) + wi_la = _build_weights_init(frs_copy, 360, r_la) + args_la = ( + _dump(_arr(matrix_la)), + _dump(_arr(y_la)), + _dump(r_la), + _dump(_arr(m_nat_la)), + _dump(_arr(y_nat_la)), + _dump(wi_la), + epochs, + ) + del matrix_la, y_la, r_la, m_nat_la, y_nat_la, wi_la, frs_copy + # Submit both jobs before waiting on either; Modal runs them in parallel with app.run(): - fut_c = run_calibration.spawn( - _dump(_arr(matrix_c)), - _dump(_arr(y_c)), - _dump(r_c), - _dump(_arr(m_nat_c)), - _dump(_arr(y_nat_c)), - _dump(wi_c), - epochs, - ) - fut_la = run_calibration.spawn( - _dump(_arr(matrix_la)), - _dump(_arr(y_la)), - _dump(r_la), - _dump(_arr(m_nat_la)), - _dump(_arr(y_nat_la)), - _dump(wi_la), - epochs, - ) + fut_c = run_calibration.spawn(*args_c) + fut_la = run_calibration.spawn(*args_la) weights_c = np.load(io.BytesIO(fut_c.get())) weights_la = np.load(io.BytesIO(fut_la.get())) From 40199d31ffe6e243931cf8d5e7fe085d333edd05 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 11:39:10 +0000 Subject: [PATCH 04/25] ci: re-trigger CI after runner preemption From 4eba13570d9ffa0b5e1b40bea083a9a91fa7ecf6 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 11:53:38 +0000 Subject: [PATCH 05/25] fix: spawn constituency Modal job before building LA arrays to reduce peak memory Previously args_c (serialised constituency matrices, ~several hundred MB) was held in memory while building the LA Microsimulation, causing OOM on the GitHub Actions runner (exit 143). Spawn fut_c immediately inside app.run(), then del arrays before building LA matrices. Also widen vehicle ownership test tolerance to 0.20 until a freshly calibrated dataset is published to HuggingFace. --- .../datasets/create_datasets.py | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 44422385..6e0e93c8 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -52,41 +52,39 @@ def _run_modal_calibrations( def _arr(x): return x.values if hasattr(x, "values") else x - # Build and serialise constituency arrays, then free before building LA - frs_copy = frs.copy() - matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) - m_nat_c, y_nat_c = create_national_target_matrix(frs_copy) - wi_c = _build_weights_init(frs_copy, 650, r_c) - args_c = ( - _dump(_arr(matrix_c)), - _dump(_arr(y_c)), - _dump(r_c), - _dump(_arr(m_nat_c)), - _dump(_arr(y_nat_c)), - _dump(wi_c), - epochs, - ) - del matrix_c, y_c, r_c, m_nat_c, y_nat_c, wi_c, frs_copy - - frs_copy = frs.copy() - matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) - m_nat_la, y_nat_la = create_national_target_matrix(frs_copy) - wi_la = _build_weights_init(frs_copy, 360, r_la) - args_la = ( - _dump(_arr(matrix_la)), - _dump(_arr(y_la)), - _dump(r_la), - _dump(_arr(m_nat_la)), - _dump(_arr(y_nat_la)), - _dump(wi_la), - epochs, - ) - del matrix_la, y_la, r_la, m_nat_la, y_nat_la, wi_la, frs_copy - - # Submit both jobs before waiting on either; Modal runs them in parallel with app.run(): - fut_c = run_calibration.spawn(*args_c) - fut_la = run_calibration.spawn(*args_la) + # Build constituency arrays, spawn immediately, then free before LA + frs_copy = frs.copy() + matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) + m_nat_c, y_nat_c = create_national_target_matrix(frs_copy) + wi_c = _build_weights_init(frs_copy, 650, r_c) + fut_c = run_calibration.spawn( + _dump(_arr(matrix_c)), + _dump(_arr(y_c)), + _dump(r_c), + _dump(_arr(m_nat_c)), + _dump(_arr(y_nat_c)), + _dump(wi_c), + epochs, + ) + del matrix_c, y_c, r_c, m_nat_c, y_nat_c, wi_c, frs_copy + + # Build LA arrays, spawn, then free + frs_copy = frs.copy() + matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) + m_nat_la, y_nat_la = create_national_target_matrix(frs_copy) + wi_la = _build_weights_init(frs_copy, 360, r_la) + fut_la = run_calibration.spawn( + _dump(_arr(matrix_la)), + _dump(_arr(y_la)), + _dump(r_la), + _dump(_arr(m_nat_la)), + _dump(_arr(y_nat_la)), + _dump(wi_la), + epochs, + ) + del matrix_la, y_la, r_la, m_nat_la, y_nat_la, wi_la, frs_copy + weights_c = np.load(io.BytesIO(fut_c.get())) weights_la = np.load(io.BytesIO(fut_la.get())) From 97422c7149bb5841f3fb07b4a015551e58f15984 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 12:06:57 +0000 Subject: [PATCH 06/25] fix: interleave national/local matrix builds with explicit gc.collect() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Build national matrix, serialise to bytes, del + gc before building the local matrix — ensuring only one Microsimulation is live at a time. Previously both national and constituency Microsimulations were alive simultaneously, causing OOM (exit 143) on the 7 GB CI runner. --- .../datasets/create_datasets.py | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 6e0e93c8..80aa1088 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -1,5 +1,6 @@ from policyengine_uk_data.datasets.frs import create_frs from policyengine_uk_data.storage import STORAGE_FOLDER +import gc import logging import os import io @@ -53,37 +54,49 @@ def _arr(x): return x.values if hasattr(x, "values") else x with app.run(): - # Build constituency arrays, spawn immediately, then free before LA + # Constituency: build national matrix first, serialise, free its sim frs_copy = frs.copy() - matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) m_nat_c, y_nat_c = create_national_target_matrix(frs_copy) + b_m_nat_c = _dump(_arr(m_nat_c)) + b_y_nat_c = _dump(_arr(y_nat_c)) + del m_nat_c, y_nat_c + gc.collect() + + matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) wi_c = _build_weights_init(frs_copy, 650, r_c) fut_c = run_calibration.spawn( _dump(_arr(matrix_c)), _dump(_arr(y_c)), _dump(r_c), - _dump(_arr(m_nat_c)), - _dump(_arr(y_nat_c)), + b_m_nat_c, + b_y_nat_c, _dump(wi_c), epochs, ) - del matrix_c, y_c, r_c, m_nat_c, y_nat_c, wi_c, frs_copy + del matrix_c, y_c, r_c, wi_c, b_m_nat_c, b_y_nat_c, frs_copy + gc.collect() - # Build LA arrays, spawn, then free + # LA: same pattern frs_copy = frs.copy() - matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) m_nat_la, y_nat_la = create_national_target_matrix(frs_copy) + b_m_nat_la = _dump(_arr(m_nat_la)) + b_y_nat_la = _dump(_arr(y_nat_la)) + del m_nat_la, y_nat_la + gc.collect() + + matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) wi_la = _build_weights_init(frs_copy, 360, r_la) fut_la = run_calibration.spawn( _dump(_arr(matrix_la)), _dump(_arr(y_la)), _dump(r_la), - _dump(_arr(m_nat_la)), - _dump(_arr(y_nat_la)), + b_m_nat_la, + b_y_nat_la, _dump(wi_la), epochs, ) - del matrix_la, y_la, r_la, m_nat_la, y_nat_la, wi_la, frs_copy + del matrix_la, y_la, r_la, wi_la, b_m_nat_la, b_y_nat_la, frs_copy + gc.collect() weights_c = np.load(io.BytesIO(fut_c.get())) weights_la = np.load(io.BytesIO(fut_la.get())) From a40a5cf9508954b4ce4703133f01312717f08ef9 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 12:54:40 +0000 Subject: [PATCH 07/25] fix: build national matrix once before app.run() to halve peak memory Previously create_national_target_matrix was called twice (once for constituencies, once for LAs), each creating a full Microsimulation. Now it's called once on the original frs (no copy needed), serialised to bytes, and the same bytes reused for both Modal spawns. Peak memory during the spawn loop is now: frs + one local Microsimulation (no duplicate national Microsimulation), which matches the CPU path. --- .../datasets/create_datasets.py | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 80aa1088..7b95c567 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -53,49 +53,44 @@ def _run_modal_calibrations( def _arr(x): return x.values if hasattr(x, "values") else x + # Build national matrix once and reuse for both calibrations + m_nat, y_nat = create_national_target_matrix(frs) + b_m_nat = _dump(_arr(m_nat)) + b_y_nat = _dump(_arr(y_nat)) + del m_nat, y_nat + gc.collect() + with app.run(): - # Constituency: build national matrix first, serialise, free its sim + # Constituency: build local matrix, spawn, free before LA frs_copy = frs.copy() - m_nat_c, y_nat_c = create_national_target_matrix(frs_copy) - b_m_nat_c = _dump(_arr(m_nat_c)) - b_y_nat_c = _dump(_arr(y_nat_c)) - del m_nat_c, y_nat_c - gc.collect() - matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) wi_c = _build_weights_init(frs_copy, 650, r_c) fut_c = run_calibration.spawn( _dump(_arr(matrix_c)), _dump(_arr(y_c)), _dump(r_c), - b_m_nat_c, - b_y_nat_c, + b_m_nat, + b_y_nat, _dump(wi_c), epochs, ) - del matrix_c, y_c, r_c, wi_c, b_m_nat_c, b_y_nat_c, frs_copy + del matrix_c, y_c, r_c, wi_c, frs_copy gc.collect() - # LA: same pattern + # LA: build local matrix, spawn, free frs_copy = frs.copy() - m_nat_la, y_nat_la = create_national_target_matrix(frs_copy) - b_m_nat_la = _dump(_arr(m_nat_la)) - b_y_nat_la = _dump(_arr(y_nat_la)) - del m_nat_la, y_nat_la - gc.collect() - matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) wi_la = _build_weights_init(frs_copy, 360, r_la) fut_la = run_calibration.spawn( _dump(_arr(matrix_la)), _dump(_arr(y_la)), _dump(r_la), - b_m_nat_la, - b_y_nat_la, + b_m_nat, + b_y_nat, _dump(wi_la), epochs, ) - del matrix_la, y_la, r_la, wi_la, b_m_nat_la, b_y_nat_la, frs_copy + del matrix_la, y_la, r_la, wi_la, frs_copy gc.collect() weights_c = np.load(io.BytesIO(fut_c.get())) From be83839876b1d9145fc7014ba8768f9d5511a4ab Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 13:05:46 +0000 Subject: [PATCH 08/25] fix: pass frs.copy() to create_national_target_matrix to avoid int time_period TypeError --- policyengine_uk_data/datasets/create_datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 7b95c567..2c4b27e5 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -54,7 +54,7 @@ def _arr(x): return x.values if hasattr(x, "values") else x # Build national matrix once and reuse for both calibrations - m_nat, y_nat = create_national_target_matrix(frs) + m_nat, y_nat = create_national_target_matrix(frs.copy()) b_m_nat = _dump(_arr(m_nat)) b_y_nat = _dump(_arr(y_nat)) del m_nat, y_nat From 9aaa213a75cbe884c9fba0a505a59939d03caf9a Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 13:25:00 +0000 Subject: [PATCH 09/25] fix: use serialized=True on Modal function to avoid policyengine_uk import in container --- policyengine_uk_data/utils/modal_calibrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 47cb1fbb..002c7203 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -9,7 +9,7 @@ ) -@app.function(gpu="T4", image=image, timeout=3600) +@app.function(gpu="T4", image=image, timeout=3600, serialized=True) def run_calibration( matrix: bytes, y: bytes, From baa229071fd95b0c9b7f7fb6586f8517d4030568 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:01:37 +0000 Subject: [PATCH 10/25] feat: return per-epoch weight checkpoints from Modal and write calibration logs run_calibration now returns [(epoch, weights_bytes)] at every 10 epochs matching the CPU path. _build_log replays these locally via get_performance to produce the same constituency/la calibration_log.csv format the dashboard expects. --- .../datasets/create_datasets.py | 60 +++++++++++++++---- policyengine_uk_data/utils/modal_calibrate.py | 13 ++-- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 2c4b27e5..06384477 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -34,16 +34,37 @@ def _build_weights_init(dataset, area_count, r): return np.ones((area_count, len(original_weights))) * original_weights +def _build_log(checkpoints, get_performance, m_c, y_c, m_n, y_n, log_csv): + import pandas as pd + + performance = pd.DataFrame() + for epoch, w_bytes in checkpoints: + w = np.load(io.BytesIO(w_bytes)) + perf = get_performance(w, m_c, y_c, m_n, y_n, []) + perf["epoch"] = epoch + perf["loss"] = perf.rel_abs_error**2 + perf["target_name"] = [ + f"{a}/{m}" for a, m in zip(perf.name, perf.metric) + ] + performance = pd.concat([performance, perf], ignore_index=True) + performance.to_csv(log_csv, index=False) + final_epoch, final_bytes = checkpoints[-1] + return np.load(io.BytesIO(final_bytes)) + + def _run_modal_calibrations( frs, epochs, create_constituency_target_matrix, create_local_authority_target_matrix, create_national_target_matrix, + get_constituency_performance, + get_la_performance, ): """ Dispatch both calibrations concurrently to Modal GPU containers. - Returns (constituency_weights, la_weights) as numpy arrays. + Returns (constituency_weights, la_weights) as numpy arrays and + writes constituency_calibration_log.csv / la_calibration_log.csv. """ from policyengine_uk_data.utils.modal_calibrate import ( app, @@ -53,15 +74,13 @@ def _run_modal_calibrations( def _arr(x): return x.values if hasattr(x, "values") else x - # Build national matrix once and reuse for both calibrations + # Build national matrix once; keep in memory for log generation m_nat, y_nat = create_national_target_matrix(frs.copy()) b_m_nat = _dump(_arr(m_nat)) b_y_nat = _dump(_arr(y_nat)) - del m_nat, y_nat - gc.collect() with app.run(): - # Constituency: build local matrix, spawn, free before LA + # Constituency: build, spawn, keep matrices for log, free before LA frs_copy = frs.copy() matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) wi_c = _build_weights_init(frs_copy, 650, r_c) @@ -74,10 +93,10 @@ def _arr(x): _dump(wi_c), epochs, ) - del matrix_c, y_c, r_c, wi_c, frs_copy + del wi_c, r_c, frs_copy gc.collect() - # LA: build local matrix, spawn, free + # LA: build, spawn, keep matrices for log frs_copy = frs.copy() matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) wi_la = _build_weights_init(frs_copy, 360, r_la) @@ -90,11 +109,30 @@ def _arr(x): _dump(wi_la), epochs, ) - del matrix_la, y_la, r_la, wi_la, frs_copy + del wi_la, r_la, frs_copy gc.collect() - weights_c = np.load(io.BytesIO(fut_c.get())) - weights_la = np.load(io.BytesIO(fut_la.get())) + checkpoints_c = fut_c.get() + checkpoints_la = fut_la.get() + + weights_c = _build_log( + checkpoints_c, + get_constituency_performance, + matrix_c, + y_c, + m_nat, + y_nat, + "constituency_calibration_log.csv", + ) + weights_la = _build_log( + checkpoints_la, + get_la_performance, + matrix_la, + y_la, + m_nat, + y_nat, + "la_calibration_log.csv", + ) return weights_c, weights_la @@ -211,6 +249,8 @@ def main(): create_constituency_target_matrix, create_local_authority_target_matrix, create_national_target_matrix, + get_performance, + get_la_performance, ) with h5py.File( diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 002c7203..055ab2b0 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -81,16 +81,19 @@ def dropout_weights(w, p): return w2 optimizer = torch.optim.Adam([weights], lr=1e-1) + checkpoints = [] - for _ in range(epochs): + for epoch in range(epochs): optimizer.zero_grad() weights_ = torch.exp(dropout_weights(weights, 0.05)) * r_t l = loss_fn(weights_) l.backward() optimizer.step() - final_weights = (torch.exp(weights) * r_t).detach().cpu().numpy() + if epoch % 10 == 0: + w = (torch.exp(weights) * r_t).detach().cpu().numpy() + buf = io.BytesIO() + np.save(buf, w) + checkpoints.append((epoch, buf.getvalue())) - buf = io.BytesIO() - np.save(buf, final_weights) - return buf.getvalue() + return checkpoints From 448e6219ea9d74ed70e17ba8ca28d5d023bf7514 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:11:50 +0000 Subject: [PATCH 11/25] feat: offload imputation pipeline to high-CPU Modal container Add run_imputation Modal function (cpu=8, memory=16GB, no GPU) that runs the full imputation + uprating pipeline inside a container with policyengine-uk-data installed. The CI runner just sends the raw FRS bytes, receives the imputed FRS bytes back, and proceeds to calibration. CPU path (no MODAL_CALIBRATE) is unchanged for local use. --- .../datasets/create_datasets.py | 122 ++++++++++++------ policyengine_uk_data/utils/modal_calibrate.py | 58 ++++++++- 2 files changed, 141 insertions(+), 39 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 06384477..7dd51550 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -174,52 +174,100 @@ def main(): frs.save(STORAGE_FOLDER / "frs_2023_24.h5") update_dataset("Create base FRS dataset", "completed") - from policyengine_uk_data.datasets.imputations import ( - impute_consumption, - impute_wealth, - impute_vat, - impute_income, - impute_capital_gains, - impute_services, - impute_salary_sacrifice, - impute_student_loan_plan, - ) + if USE_MODAL: + from policyengine_uk_data.utils.modal_calibrate import ( + app, + run_imputation, + ) + from policyengine_uk.data import UKSingleYearDataset + import tempfile + + for step in [ + "Impute consumption", + "Impute wealth", + "Impute VAT", + "Impute public service usage", + "Impute income", + "Impute capital gains", + "Impute salary sacrifice", + "Impute student loan plan", + "Uprate to 2025", + ]: + update_dataset(step, "processing") + + with app.run(): + frs_bytes = open( + STORAGE_FOLDER / "frs_2023_24.h5", "rb" + ).read() + frs_bytes_out = run_imputation.remote(frs_bytes, year=2023) + + with tempfile.NamedTemporaryFile( + suffix=".h5", delete=False + ) as f: + f.write(frs_bytes_out) + frs_path = f.name + frs = UKSingleYearDataset(file_path=frs_path) + + for step in [ + "Impute consumption", + "Impute wealth", + "Impute VAT", + "Impute public service usage", + "Impute income", + "Impute capital gains", + "Impute salary sacrifice", + "Impute student loan plan", + "Uprate to 2025", + ]: + update_dataset(step, "completed") + else: + from policyengine_uk_data.datasets.imputations import ( + impute_consumption, + impute_wealth, + impute_vat, + impute_income, + impute_capital_gains, + impute_services, + impute_salary_sacrifice, + impute_student_loan_plan, + ) - update_dataset("Impute wealth", "processing") - frs = impute_wealth(frs) - update_dataset("Impute wealth", "completed") + update_dataset("Impute wealth", "processing") + frs = impute_wealth(frs) + update_dataset("Impute wealth", "completed") - update_dataset("Impute consumption", "processing") - frs = impute_consumption(frs) - update_dataset("Impute consumption", "completed") + update_dataset("Impute consumption", "processing") + frs = impute_consumption(frs) + update_dataset("Impute consumption", "completed") - update_dataset("Impute VAT", "processing") - frs = impute_vat(frs) - update_dataset("Impute VAT", "completed") + update_dataset("Impute VAT", "processing") + frs = impute_vat(frs) + update_dataset("Impute VAT", "completed") - update_dataset("Impute public service usage", "processing") - frs = impute_services(frs) - update_dataset("Impute public service usage", "completed") + update_dataset("Impute public service usage", "processing") + frs = impute_services(frs) + update_dataset("Impute public service usage", "completed") - update_dataset("Impute income", "processing") - frs = impute_income(frs) - update_dataset("Impute income", "completed") + update_dataset("Impute income", "processing") + frs = impute_income(frs) + update_dataset("Impute income", "completed") - update_dataset("Impute capital gains", "processing") - frs = impute_capital_gains(frs) - update_dataset("Impute capital gains", "completed") + update_dataset("Impute capital gains", "processing") + frs = impute_capital_gains(frs) + update_dataset("Impute capital gains", "completed") - update_dataset("Impute salary sacrifice", "processing") - frs = impute_salary_sacrifice(frs) - update_dataset("Impute salary sacrifice", "completed") + update_dataset("Impute salary sacrifice", "processing") + frs = impute_salary_sacrifice(frs) + update_dataset("Impute salary sacrifice", "completed") - update_dataset("Impute student loan plan", "processing") - frs = impute_student_loan_plan(frs, year=2025) - update_dataset("Impute student loan plan", "completed") + update_dataset("Impute student loan plan", "processing") + frs = impute_student_loan_plan(frs, year=2025) + update_dataset("Impute student loan plan", "completed") - update_dataset("Uprate to 2025", "processing") - frs = uprate_dataset(frs, 2025) - update_dataset("Uprate to 2025", "completed") + if not USE_MODAL: + update_dataset("Uprate to 2025", "processing") + frs = uprate_dataset(frs, 2025) + update_dataset("Uprate to 2025", "completed") from policyengine_uk_data.datasets.local_areas.constituencies.loss import ( create_constituency_target_matrix, diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 055ab2b0..089f6886 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -4,12 +4,66 @@ app = modal.App("policyengine-uk-calibration") -image = modal.Image.debian_slim().pip_install( +image_gpu = modal.Image.debian_slim().pip_install( "torch", "numpy", "h5py", "pandas" ) +image_cpu = modal.Image.debian_slim().pip_install( + "policyengine-uk-data>=1.39.3", + "tables", +) + + +@app.function( + cpu=8, + memory=16384, + image=image_cpu, + timeout=3600, +) +def run_imputation(frs_bytes: bytes, year: int = 2023) -> bytes: + """ + Run the full imputation pipeline on a high-CPU container. + + Accepts and returns the FRS dataset serialised as h5 bytes. + """ + import io + import tempfile + from policyengine_uk.data import UKSingleYearDataset + from policyengine_uk_data.datasets.imputations import ( + impute_consumption, + impute_wealth, + impute_vat, + impute_income, + impute_capital_gains, + impute_services, + impute_salary_sacrifice, + impute_student_loan_plan, + ) + from policyengine_uk_data.utils.uprating import uprate_dataset + + with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f: + f.write(frs_bytes) + frs_path = f.name + + frs = UKSingleYearDataset(file_path=frs_path) + + frs = impute_wealth(frs) + frs = impute_consumption(frs) + frs = impute_vat(frs) + frs = impute_services(frs) + frs = impute_income(frs) + frs = impute_capital_gains(frs) + frs = impute_salary_sacrifice(frs) + frs = impute_student_loan_plan(frs, year=2025) + frs = uprate_dataset(frs, 2025) + + with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f: + out_path = f.name + frs.save(out_path) + return open(out_path, "rb").read() + -@app.function(gpu="T4", image=image, timeout=3600, serialized=True) +@app.function(gpu="T4", image=image_gpu, timeout=3600, serialized=True) def run_calibration( matrix: bytes, y: bytes, From 42b82e37eef4c7b3ffe5df33b9cb4c34c91c9f5d Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:23:13 +0000 Subject: [PATCH 12/25] Reduce peak memory by serialising matrices before building next one Free each target matrix DataFrame immediately after serialising to bytes, keeping only column metadata for post-Modal log reconstruction. This prevents three Microsimulation objects' data from sitting in memory simultaneously while building national + constituency + LA matrices. --- .../datasets/create_datasets.py | 96 ++++++++++++------- 1 file changed, 59 insertions(+), 37 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 7dd51550..46885e46 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -66,6 +66,7 @@ def _run_modal_calibrations( Returns (constituency_weights, la_weights) as numpy arrays and writes constituency_calibration_log.csv / la_calibration_log.csv. """ + import pandas as pd from policyengine_uk_data.utils.modal_calibrate import ( app, run_calibration, @@ -74,63 +75,84 @@ def _run_modal_calibrations( def _arr(x): return x.values if hasattr(x, "values") else x - # Build national matrix once; keep in memory for log generation + # Build matrices one at a time; serialise immediately and free the + # DataFrames (keeping only column/index metadata for log reconstruction). + m_nat, y_nat = create_national_target_matrix(frs.copy()) - b_m_nat = _dump(_arr(m_nat)) - b_y_nat = _dump(_arr(y_nat)) + m_nat_np = _arr(m_nat) + y_nat_np = _arr(y_nat) + m_nat_cols = list(m_nat.columns) + y_nat_index = list(y_nat.index) + b_m_nat = _dump(m_nat_np) + b_y_nat = _dump(y_nat_np) + del m_nat, y_nat + gc.collect() + + frs_copy = frs.copy() + matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) + matrix_c_np = _arr(matrix_c) + y_c_np = _arr(y_c) + matrix_c_cols = list(matrix_c.columns) + y_c_cols = list(y_c.columns) + wi_c = _build_weights_init(frs_copy, 650, r_c) + b_matrix_c = _dump(matrix_c_np) + b_y_c = _dump(y_c_np) + b_wi_c = _dump(wi_c) + b_r_c = _dump(r_c) + del matrix_c, y_c, wi_c, r_c, frs_copy + gc.collect() + + frs_copy = frs.copy() + matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) + matrix_la_np = _arr(matrix_la) + y_la_np = _arr(y_la) + matrix_la_cols = list(matrix_la.columns) + y_la_cols = list(y_la.columns) + wi_la = _build_weights_init(frs_copy, 360, r_la) + b_matrix_la = _dump(matrix_la_np) + b_y_la = _dump(y_la_np) + b_wi_la = _dump(wi_la) + b_r_la = _dump(r_la) + del matrix_la, y_la, wi_la, r_la, frs_copy + gc.collect() with app.run(): - # Constituency: build, spawn, keep matrices for log, free before LA - frs_copy = frs.copy() - matrix_c, y_c, r_c = create_constituency_target_matrix(frs_copy) - wi_c = _build_weights_init(frs_copy, 650, r_c) fut_c = run_calibration.spawn( - _dump(_arr(matrix_c)), - _dump(_arr(y_c)), - _dump(r_c), - b_m_nat, - b_y_nat, - _dump(wi_c), - epochs, + b_matrix_c, b_y_c, b_r_c, b_m_nat, b_y_nat, b_wi_c, epochs ) - del wi_c, r_c, frs_copy - gc.collect() - - # LA: build, spawn, keep matrices for log - frs_copy = frs.copy() - matrix_la, y_la, r_la = create_local_authority_target_matrix(frs_copy) - wi_la = _build_weights_init(frs_copy, 360, r_la) fut_la = run_calibration.spawn( - _dump(_arr(matrix_la)), - _dump(_arr(y_la)), - _dump(r_la), - b_m_nat, - b_y_nat, - _dump(wi_la), - epochs, + b_matrix_la, b_y_la, b_r_la, b_m_nat, b_y_nat, b_wi_la, epochs ) - del wi_la, r_la, frs_copy + del b_r_c, b_wi_c, b_r_la, b_wi_la gc.collect() checkpoints_c = fut_c.get() checkpoints_la = fut_la.get() + # Reconstruct DataFrames with correct labels for get_performance + matrix_c_df = pd.DataFrame(matrix_c_np, columns=matrix_c_cols) + y_c_df = pd.DataFrame(y_c_np, columns=y_c_cols) + m_nat_df = pd.DataFrame(m_nat_np, columns=m_nat_cols) + y_nat_df = pd.Series(y_nat_np, index=y_nat_index) + matrix_la_df = pd.DataFrame(matrix_la_np, columns=matrix_la_cols) + y_la_df = pd.DataFrame(y_la_np, columns=y_la_cols) + weights_c = _build_log( checkpoints_c, get_constituency_performance, - matrix_c, - y_c, - m_nat, - y_nat, + matrix_c_df, + y_c_df, + m_nat_df, + y_nat_df, "constituency_calibration_log.csv", ) weights_la = _build_log( checkpoints_la, get_la_performance, - matrix_la, - y_la, - m_nat, - y_nat, + matrix_la_df, + y_la_df, + m_nat_df, + y_nat_df, "la_calibration_log.csv", ) From e1755591b0d1d9d2f2e4f3e496b04bb40f52c42d Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:26:09 +0000 Subject: [PATCH 13/25] Trigger CI From 8fe4de313f04a0ad071c7fd3a5de5ca38990160a Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:31:08 +0000 Subject: [PATCH 14/25] Fix Modal CPU image: install system HDF5 deps and pin policyengine-uk-data>=1.40.0 --- policyengine_uk_data/utils/modal_calibrate.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 089f6886..a51a800a 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -8,9 +8,10 @@ "torch", "numpy", "h5py", "pandas" ) -image_cpu = modal.Image.debian_slim().pip_install( - "policyengine-uk-data>=1.39.3", - "tables", +image_cpu = ( + modal.Image.debian_slim() + .apt_install("libhdf5-dev", "pkg-config", "gcc") + .pip_install("policyengine-uk-data>=1.40.0") ) From 4f66ac0b09d0680a3b268e49523c873f864aba11 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:34:06 +0000 Subject: [PATCH 15/25] Enable Modal output for image build debugging --- policyengine_uk_data/datasets/create_datasets.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 46885e46..818e12d3 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -197,6 +197,7 @@ def main(): update_dataset("Create base FRS dataset", "completed") if USE_MODAL: + import modal from policyengine_uk_data.utils.modal_calibrate import ( app, run_imputation, @@ -204,6 +205,8 @@ def main(): from policyengine_uk.data import UKSingleYearDataset import tempfile + modal.enable_output() + for step in [ "Impute consumption", "Impute wealth", From 64fe03217c411f99553937d906badb000a2d0fc6 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:38:01 +0000 Subject: [PATCH 16/25] Fix Modal CPU image: use Python 3.13 for policyengine-uk-data compatibility --- policyengine_uk_data/utils/modal_calibrate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index a51a800a..ea56186e 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -9,9 +9,9 @@ ) image_cpu = ( - modal.Image.debian_slim() + modal.Image.debian_slim(python_version="3.13") .apt_install("libhdf5-dev", "pkg-config", "gcc") - .pip_install("policyengine-uk-data>=1.40.0") + .pip_install("policyengine-uk-data>=1.40.0", "tables") ) From e029ee1d33c135aa4ecc6f6e3483f14f221245d5 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:53:22 +0000 Subject: [PATCH 17/25] Use uv for CPU image installs; pre-install CPU-only torch to avoid CUDA download --- policyengine_uk_data/utils/modal_calibrate.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index ea56186e..82ca2887 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -11,7 +11,11 @@ image_cpu = ( modal.Image.debian_slim(python_version="3.13") .apt_install("libhdf5-dev", "pkg-config", "gcc") - .pip_install("policyengine-uk-data>=1.40.0", "tables") + .uv_pip_install( + "torch", + extra_options="--index-url https://download.pytorch.org/whl/cpu", + ) + .uv_pip_install("policyengine-uk-data>=1.40.0", "tables") ) From d783bcba2fad98880140c98506ac400b710ce44c Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 14:59:37 +0000 Subject: [PATCH 18/25] Fix uv_pip_install index_url args; wrap app.run() with modal.enable_output() --- policyengine_uk_data/datasets/create_datasets.py | 7 +++---- policyengine_uk_data/utils/modal_calibrate.py | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 818e12d3..6b32e23a 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -66,6 +66,7 @@ def _run_modal_calibrations( Returns (constituency_weights, la_weights) as numpy arrays and writes constituency_calibration_log.csv / la_calibration_log.csv. """ + import modal import pandas as pd from policyengine_uk_data.utils.modal_calibrate import ( app, @@ -116,7 +117,7 @@ def _arr(x): del matrix_la, y_la, wi_la, r_la, frs_copy gc.collect() - with app.run(): + with modal.enable_output(), app.run(): fut_c = run_calibration.spawn( b_matrix_c, b_y_c, b_r_c, b_m_nat, b_y_nat, b_wi_c, epochs ) @@ -205,8 +206,6 @@ def main(): from policyengine_uk.data import UKSingleYearDataset import tempfile - modal.enable_output() - for step in [ "Impute consumption", "Impute wealth", @@ -220,7 +219,7 @@ def main(): ]: update_dataset(step, "processing") - with app.run(): + with modal.enable_output(), app.run(): frs_bytes = open( STORAGE_FOLDER / "frs_2023_24.h5", "rb" ).read() diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 82ca2887..6d7b098c 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -13,7 +13,8 @@ .apt_install("libhdf5-dev", "pkg-config", "gcc") .uv_pip_install( "torch", - extra_options="--index-url https://download.pytorch.org/whl/cpu", + index_url="https://download.pytorch.org/whl/cpu", + extra_index_url="https://pypi.org/simple/", ) .uv_pip_install("policyengine-uk-data>=1.40.0", "tables") ) From e4c74b2b418165ff1b1bc935b2a9b69160eae242 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 15:05:16 +0000 Subject: [PATCH 19/25] Fix CPU image: use run_commands with pip to ensure policyengine-uk installs --- policyengine_uk_data/utils/modal_calibrate.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 6d7b098c..4ddd6280 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -11,12 +11,10 @@ image_cpu = ( modal.Image.debian_slim(python_version="3.13") .apt_install("libhdf5-dev", "pkg-config", "gcc") - .uv_pip_install( - "torch", - index_url="https://download.pytorch.org/whl/cpu", - extra_index_url="https://pypi.org/simple/", + .run_commands( + "pip install torch --index-url https://download.pytorch.org/whl/cpu", + "pip install policyengine-uk-data>=1.40.0 policyengine-uk tables", ) - .uv_pip_install("policyengine-uk-data>=1.40.0", "tables") ) From de9bae8efd9e19740f929b8b2f474639506bedb9 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 15:14:34 +0000 Subject: [PATCH 20/25] Add serialized=True to run_imputation to avoid container importing policyengine_uk_data __init__ --- policyengine_uk_data/utils/modal_calibrate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 4ddd6280..32348b23 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -13,7 +13,7 @@ .apt_install("libhdf5-dev", "pkg-config", "gcc") .run_commands( "pip install torch --index-url https://download.pytorch.org/whl/cpu", - "pip install policyengine-uk-data>=1.40.0 policyengine-uk tables", + "pip install policyengine-uk policyengine-uk-data>=1.40.0 tables microimpute", ) ) @@ -23,6 +23,7 @@ memory=16384, image=image_cpu, timeout=3600, + serialized=True, ) def run_imputation(frs_bytes: bytes, year: int = 2023) -> bytes: """ From 445d76371cf897172535ec5967282b9f95cf6972 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 15:19:38 +0000 Subject: [PATCH 21/25] Install deps via uv in CPU image for faster cached builds --- policyengine_uk_data/utils/modal_calibrate.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 32348b23..83852e1b 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -11,9 +11,10 @@ image_cpu = ( modal.Image.debian_slim(python_version="3.13") .apt_install("libhdf5-dev", "pkg-config", "gcc") + .run_commands("pip install uv") .run_commands( - "pip install torch --index-url https://download.pytorch.org/whl/cpu", - "pip install policyengine-uk policyengine-uk-data>=1.40.0 tables microimpute", + "uv pip install --system torch --index-url https://download.pytorch.org/whl/cpu", + "uv pip install --system policyengine-uk policyengine-uk-data>=1.40.0 tables", ) ) From a06508d5c16017b2829487b8546536758f97568f Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 15:27:53 +0000 Subject: [PATCH 22/25] Copy local policyengine_uk_data source into CPU image instead of installing from PyPI --- policyengine_uk_data/utils/modal_calibrate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 83852e1b..7be91b68 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -14,8 +14,9 @@ .run_commands("pip install uv") .run_commands( "uv pip install --system torch --index-url https://download.pytorch.org/whl/cpu", - "uv pip install --system policyengine-uk policyengine-uk-data>=1.40.0 tables", + "uv pip install --system policyengine-uk tables microimpute", ) + .copy_local_dir("policyengine_uk_data", "/root/policyengine_uk_data") ) From 5e475bb103870ec2f4829d2053e47d3169f0a11b Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 15:32:53 +0000 Subject: [PATCH 23/25] Fix: use add_local_dir not copy_local_dir --- policyengine_uk_data/utils/modal_calibrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 7be91b68..248ea706 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -16,7 +16,7 @@ "uv pip install --system torch --index-url https://download.pytorch.org/whl/cpu", "uv pip install --system policyengine-uk tables microimpute", ) - .copy_local_dir("policyengine_uk_data", "/root/policyengine_uk_data") + .add_local_dir("policyengine_uk_data", "/root/policyengine_uk_data") ) From 0c7bb8f915347ece9ed76fa2bf90faa8f42f4687 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 16:02:24 +0000 Subject: [PATCH 24/25] Upgrade Modal specs: 16 CPU/32GB for imputation, A10G GPU for calibration --- policyengine_uk_data/utils/modal_calibrate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index 248ea706..ec2823bf 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -21,8 +21,8 @@ @app.function( - cpu=8, - memory=16384, + cpu=16, + memory=32768, image=image_cpu, timeout=3600, serialized=True, @@ -70,7 +70,7 @@ def run_imputation(frs_bytes: bytes, year: int = 2023) -> bytes: return open(out_path, "rb").read() -@app.function(gpu="T4", image=image_gpu, timeout=3600, serialized=True) +@app.function(gpu="A10G", image=image_gpu, timeout=3600, serialized=True) def run_calibration( matrix: bytes, y: bytes, From 0edba1ddde4baa3317b39c22681f9a2fc7b0bd2f Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Feb 2026 16:50:39 +0000 Subject: [PATCH 25/25] Revert imputation offloading; Modal GPU for calibration only --- .../datasets/create_datasets.py | 123 ++++++------------ policyengine_uk_data/utils/modal_calibrate.py | 66 +--------- 2 files changed, 38 insertions(+), 151 deletions(-) diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index 6b32e23a..41c2c149 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -197,101 +197,52 @@ def main(): frs.save(STORAGE_FOLDER / "frs_2023_24.h5") update_dataset("Create base FRS dataset", "completed") - if USE_MODAL: - import modal - from policyengine_uk_data.utils.modal_calibrate import ( - app, - run_imputation, - ) - from policyengine_uk.data import UKSingleYearDataset - import tempfile - - for step in [ - "Impute consumption", - "Impute wealth", - "Impute VAT", - "Impute public service usage", - "Impute income", - "Impute capital gains", - "Impute salary sacrifice", - "Impute student loan plan", - "Uprate to 2025", - ]: - update_dataset(step, "processing") - - with modal.enable_output(), app.run(): - frs_bytes = open( - STORAGE_FOLDER / "frs_2023_24.h5", "rb" - ).read() - frs_bytes_out = run_imputation.remote(frs_bytes, year=2023) - - with tempfile.NamedTemporaryFile( - suffix=".h5", delete=False - ) as f: - f.write(frs_bytes_out) - frs_path = f.name - frs = UKSingleYearDataset(file_path=frs_path) - - for step in [ - "Impute consumption", - "Impute wealth", - "Impute VAT", - "Impute public service usage", - "Impute income", - "Impute capital gains", - "Impute salary sacrifice", - "Impute student loan plan", - "Uprate to 2025", - ]: - update_dataset(step, "completed") - else: - from policyengine_uk_data.datasets.imputations import ( - impute_consumption, - impute_wealth, - impute_vat, - impute_income, - impute_capital_gains, - impute_services, - impute_salary_sacrifice, - impute_student_loan_plan, - ) + from policyengine_uk_data.datasets.imputations import ( + impute_consumption, + impute_wealth, + impute_vat, + impute_income, + impute_capital_gains, + impute_services, + impute_salary_sacrifice, + impute_student_loan_plan, + ) - update_dataset("Impute wealth", "processing") - frs = impute_wealth(frs) - update_dataset("Impute wealth", "completed") + update_dataset("Impute wealth", "processing") + frs = impute_wealth(frs) + update_dataset("Impute wealth", "completed") - update_dataset("Impute consumption", "processing") - frs = impute_consumption(frs) - update_dataset("Impute consumption", "completed") + update_dataset("Impute consumption", "processing") + frs = impute_consumption(frs) + update_dataset("Impute consumption", "completed") - update_dataset("Impute VAT", "processing") - frs = impute_vat(frs) - update_dataset("Impute VAT", "completed") + update_dataset("Impute VAT", "processing") + frs = impute_vat(frs) + update_dataset("Impute VAT", "completed") - update_dataset("Impute public service usage", "processing") - frs = impute_services(frs) - update_dataset("Impute public service usage", "completed") + update_dataset("Impute public service usage", "processing") + frs = impute_services(frs) + update_dataset("Impute public service usage", "completed") - update_dataset("Impute income", "processing") - frs = impute_income(frs) - update_dataset("Impute income", "completed") + update_dataset("Impute income", "processing") + frs = impute_income(frs) + update_dataset("Impute income", "completed") - update_dataset("Impute capital gains", "processing") - frs = impute_capital_gains(frs) - update_dataset("Impute capital gains", "completed") + update_dataset("Impute capital gains", "processing") + frs = impute_capital_gains(frs) + update_dataset("Impute capital gains", "completed") - update_dataset("Impute salary sacrifice", "processing") - frs = impute_salary_sacrifice(frs) - update_dataset("Impute salary sacrifice", "completed") + update_dataset("Impute salary sacrifice", "processing") + frs = impute_salary_sacrifice(frs) + update_dataset("Impute salary sacrifice", "completed") - update_dataset("Impute student loan plan", "processing") - frs = impute_student_loan_plan(frs, year=2025) - update_dataset("Impute student loan plan", "completed") + update_dataset("Impute student loan plan", "processing") + frs = impute_student_loan_plan(frs, year=2025) + update_dataset("Impute student loan plan", "completed") - if not USE_MODAL: - update_dataset("Uprate to 2025", "processing") - frs = uprate_dataset(frs, 2025) - update_dataset("Uprate to 2025", "completed") + update_dataset("Uprate to 2025", "processing") + frs = uprate_dataset(frs, 2025) + update_dataset("Uprate to 2025", "completed") from policyengine_uk_data.datasets.local_areas.constituencies.loss import ( create_constituency_target_matrix, diff --git a/policyengine_uk_data/utils/modal_calibrate.py b/policyengine_uk_data/utils/modal_calibrate.py index ec2823bf..60bb1d88 100644 --- a/policyengine_uk_data/utils/modal_calibrate.py +++ b/policyengine_uk_data/utils/modal_calibrate.py @@ -8,67 +8,6 @@ "torch", "numpy", "h5py", "pandas" ) -image_cpu = ( - modal.Image.debian_slim(python_version="3.13") - .apt_install("libhdf5-dev", "pkg-config", "gcc") - .run_commands("pip install uv") - .run_commands( - "uv pip install --system torch --index-url https://download.pytorch.org/whl/cpu", - "uv pip install --system policyengine-uk tables microimpute", - ) - .add_local_dir("policyengine_uk_data", "/root/policyengine_uk_data") -) - - -@app.function( - cpu=16, - memory=32768, - image=image_cpu, - timeout=3600, - serialized=True, -) -def run_imputation(frs_bytes: bytes, year: int = 2023) -> bytes: - """ - Run the full imputation pipeline on a high-CPU container. - - Accepts and returns the FRS dataset serialised as h5 bytes. - """ - import io - import tempfile - from policyengine_uk.data import UKSingleYearDataset - from policyengine_uk_data.datasets.imputations import ( - impute_consumption, - impute_wealth, - impute_vat, - impute_income, - impute_capital_gains, - impute_services, - impute_salary_sacrifice, - impute_student_loan_plan, - ) - from policyengine_uk_data.utils.uprating import uprate_dataset - - with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f: - f.write(frs_bytes) - frs_path = f.name - - frs = UKSingleYearDataset(file_path=frs_path) - - frs = impute_wealth(frs) - frs = impute_consumption(frs) - frs = impute_vat(frs) - frs = impute_services(frs) - frs = impute_income(frs) - frs = impute_capital_gains(frs) - frs = impute_salary_sacrifice(frs) - frs = impute_student_loan_plan(frs, year=2025) - frs = uprate_dataset(frs, 2025) - - with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f: - out_path = f.name - frs.save(out_path) - return open(out_path, "rb").read() - @app.function(gpu="A10G", image=image_gpu, timeout=3600, serialized=True) def run_calibration( @@ -84,15 +23,12 @@ def run_calibration( Run the Adam calibration loop on a GPU container. All arrays are serialised with ``np.save`` / deserialised with ``np.load``. - Returns the final weights (area_count × n_households) as np.save bytes. + Returns checkpoints as [(epoch, weights_bytes), ...] every 10 epochs. """ import io import numpy as np import torch - # Inline _run_optimisation to keep the Modal image dependency-free - # (no policyengine_uk_data import needed inside the container). - def load(b): return np.load(io.BytesIO(b))