diff --git a/openavmkit/land.py b/openavmkit/land.py index 61a53656..b9af6d09 100644 --- a/openavmkit/land.py +++ b/openavmkit/land.py @@ -22,7 +22,7 @@ add_area_fields, ) from openavmkit.utilities.plotting import plot_histogram_df -from openavmkit.utilities.settings import get_model_group_ids +from openavmkit.utilities.settings import area_unit, get_model_group_ids from openavmkit.utilities.stats import calc_correlations, calc_cod, calc_r2, calc_mse_r2_adj_r2 @@ -787,4 +787,261 @@ def _convolve_land_analysis( print(f"TEST LAND RESULTS, MODEL GROUP : {model_group}, count: {count}") print("=" * 80) print(df_results_test.to_string()) - print("") \ No newline at end of file + print("") + + +def calc_lycd_land_values( + df: pd.DataFrame, + settings: dict, + land_alloc: "float | dict | None" = None, + market_value_field: str = "model_market_value", + subarea_field: "str | None" = None, + min_improved_per_cell: int = 10, +) -> pd.DataFrame: + """Compute land values using the "Least You Can Do" (LYCD) method. + + For each local area this method: + + 1. Takes the median market value and median lot size of **improved** + (non-vacant) properties in the area. + 2. Derives a uniform local land rate:: + + local_land_rate = (median_market_value * land_alloc_pct) / median_lot_size + + 3. Applies that rate to every parcel in the area:: + + land_value = local_land_rate * parcel_lot_size + + Because every parcel's land value is driven by the *typical* parcel in its + area rather than its own improvement value, the method avoids the absurd + side-by-side disparities that arise from naively multiplying each parcel's + market value by a fixed allocation fraction. + + When ``subarea_field`` is provided, rates are computed per + ``(subarea, model_group)`` cell rather than per model group alone. This + captures the sharp neighbourhood-to-neighbourhood price jumps that a single + county-wide rate per land-use type would miss. Cells with fewer than + ``min_improved_per_cell`` improved parcels fall back first to the + model-group rate, then to the global rate. + + When ``land_alloc`` is ``None`` the allocation for each cell is derived + automatically from the data. The median per-unit value of **vacant** + properties is divided by the median per-unit value of **improved** + properties; that ratio is the implied land allocation. Fallback order: + cell → model group → global. + + Parameters + ---------- + df : pd.DataFrame + DataFrame containing at minimum: + + - ``"key"`` + - ``"model_group"`` + - ``market_value_field`` (default ``"model_market_value"``) + - ``"is_vacant"`` (bool) + - ``f"land_area_{unit}"`` where *unit* is ``"sqft"`` or ``"sqm"`` + - ``subarea_field`` column, if provided + + settings : dict + Settings dictionary (used to determine the area unit). + land_alloc : float, dict, or None + Fraction of market value attributable to land. + + - **float** – applied uniformly to all cells. + - **dict** – keyed by ``model_group`` (``{mg: float}``) for per-group + allocations; individual cells inherit their group's allocation. + - **None** – derived automatically from the ratio of vacant-to-improved + per-unit market values, at the finest available level. + + market_value_field : str + Column name holding the market value estimate. Defaults to + ``"model_market_value"``. + subarea_field : str or None + Column name whose values define geographic subareas (e.g. + ``"neighborhood"``, ``"census_tract"``). When supplied, one local land + rate is derived per ``(subarea, model_group)`` cell. When ``None`` + (default), a single rate is derived per ``model_group``, which is + equivalent to treating the entire county as one area per land-use type. + min_improved_per_cell : int + Minimum number of improved parcels required in a + ``(subarea, model_group)`` cell before that cell gets its own rate. + Cells below this threshold fall back to the model-group rate. + Ignored when ``subarea_field`` is ``None``. Default: 10. + + Returns + ------- + pd.DataFrame + Copy of *df* with three additional columns: + + - ``"lycd_land_alloc"`` – land allocation fraction used for the cell. + - ``"lycd_local_land_rate"`` – implied per-area-unit land rate for the cell. + - ``"lycd_land_value"`` – resulting land value (clamped to ``[0, market_value]``). + """ + unit = area_unit(settings) + lot_area_field = f"land_area_{unit}" + + df = df.copy() + model_groups = df["model_group"].unique() + + # ------------------------------------------------------------------ + # Resolve land_alloc to a per-model-group dict (used as fallback even + # when subarea_field is set, and as the primary source when it is not). + # ------------------------------------------------------------------ + if isinstance(land_alloc, dict): + alloc_by_group = {mg: land_alloc.get(mg, np.nan) for mg in model_groups} + elif land_alloc is not None: + alloc_by_group = {mg: float(land_alloc) for mg in model_groups} + else: + alloc_by_group = _derive_lycd_alloc_from_data( + df, lot_area_field, market_value_field, group_field="model_group" + ) + + # ------------------------------------------------------------------ + # Helper: compute one local land rate for a slice of the dataframe. + # Returns np.nan if the slice is too small or the medians are invalid. + # ------------------------------------------------------------------ + def _rate_for_mask(mask_improved, alloc): + sub = df[mask_improved] + if len(sub) == 0 or np.isnan(alloc): + return np.nan + median_mv = sub[market_value_field].median() + median_lot = sub[lot_area_field].median() + if median_lot <= 0 or np.isnan(median_lot) or np.isnan(median_mv): + return np.nan + return (median_mv * alloc) / median_lot + + # ------------------------------------------------------------------ + # Build per-model-group fallback rates (always needed). + # ------------------------------------------------------------------ + group_rates = {} + for mg in model_groups: + mask = df["model_group"].eq(mg) & df["is_vacant"].eq(False) + group_rates[mg] = _rate_for_mask(mask, alloc_by_group.get(mg, np.nan)) + + # ------------------------------------------------------------------ + # Assign rates and allocs row-by-row into output columns. + # ------------------------------------------------------------------ + df["lycd_land_alloc"] = np.nan + df["lycd_local_land_rate"] = np.nan + + if subarea_field is None: + # No geographic subdivision: one rate per model group. + for mg in model_groups: + idx = df["model_group"].eq(mg) + df.loc[idx, "lycd_local_land_rate"] = group_rates[mg] + df.loc[idx, "lycd_land_alloc"] = alloc_by_group.get(mg, np.nan) + else: + # Geographic subdivision: one rate per (subarea, model_group) cell, + # with fallback to model-group rate for thin cells. + subareas = df[subarea_field].unique() + + # Pre-derive per-cell allocations when land_alloc is None. + if land_alloc is None: + cell_allocs = _derive_lycd_alloc_from_data( + df, + lot_area_field, + market_value_field, + group_field="model_group", + subarea_field=subarea_field, + alloc_by_group_fallback=alloc_by_group, + ) + else: + cell_allocs = {} # will use alloc_by_group for every cell + + for mg in model_groups: + mask_mg = df["model_group"].eq(mg) + for sa in subareas: + mask_sa = df[subarea_field].eq(sa) + mask_cell_improved = mask_mg & mask_sa & df["is_vacant"].eq(False) + n_improved = mask_cell_improved.sum() + idx = mask_mg & mask_sa + + if n_improved >= min_improved_per_cell: + cell_key = (sa, mg) + alloc = cell_allocs.get(cell_key, alloc_by_group.get(mg, np.nan)) + rate = _rate_for_mask(mask_cell_improved, alloc) + if np.isnan(rate): + # Cell medians invalid despite enough rows; fall back. + rate = group_rates[mg] + alloc = alloc_by_group.get(mg, np.nan) + else: + # Too few improved parcels in this cell; use group rate. + rate = group_rates[mg] + alloc = alloc_by_group.get(mg, np.nan) + + df.loc[idx, "lycd_local_land_rate"] = rate + df.loc[idx, "lycd_land_alloc"] = alloc + + df["lycd_land_value"] = df["lycd_local_land_rate"] * df[lot_area_field] + + # Clamp: must be >= 0 and <= market value + df["lycd_land_value"] = df["lycd_land_value"].clip(lower=0) + exceeds_mv = df["lycd_land_value"].gt(df[market_value_field]) + df.loc[exceeds_mv, "lycd_land_value"] = df.loc[exceeds_mv, market_value_field] + + return df + + +def _derive_lycd_alloc_from_data( + df: pd.DataFrame, + lot_area_field: str, + market_value_field: str, + group_field: str = "model_group", + subarea_field: "str | None" = None, + alloc_by_group_fallback: "dict | None" = None, +) -> dict: + """Derive land allocation fractions from vacant vs improved per-unit values. + + When ``subarea_field`` is None, returns ``{model_group: alloc}``. + When ``subarea_field`` is provided, returns ``{(subarea, model_group): alloc}``, + falling back to the group-level allocation for cells with no vacant parcels, + and to the global allocation if the group also has none. + """ + def _per_unit_median(mask): + sub = df[mask].copy() + sub = sub[sub[lot_area_field].gt(0)] + rates = sub[market_value_field] / sub[lot_area_field] + return rates.median() + + # Global fallback + global_vacant_rate = _per_unit_median(df["is_vacant"].eq(True)) + global_improved_rate = _per_unit_median(df["is_vacant"].eq(False)) + if global_improved_rate > 0 and not ( + np.isnan(global_vacant_rate) or np.isnan(global_improved_rate) + ): + global_alloc = global_vacant_rate / global_improved_rate + else: + global_alloc = np.nan + + # Per-group allocations (always computed; used as fallback for cells). + group_allocs = {} + for mg in df[group_field].unique(): + mask_mg = df[group_field].eq(mg) + vr = _per_unit_median(mask_mg & df["is_vacant"].eq(True)) + ir = _per_unit_median(mask_mg & df["is_vacant"].eq(False)) + if ir > 0 and not (np.isnan(vr) or np.isnan(ir)): + group_allocs[mg] = vr / ir + else: + group_allocs[mg] = ( + alloc_by_group_fallback.get(mg, global_alloc) + if alloc_by_group_fallback + else global_alloc + ) + + if subarea_field is None: + return group_allocs + + # Per-(subarea, group) allocations with group → global fallback. + result = {} + for mg in df[group_field].unique(): + mask_mg = df[group_field].eq(mg) + for sa in df[subarea_field].unique(): + mask_sa = df[subarea_field].eq(sa) + vr = _per_unit_median(mask_mg & mask_sa & df["is_vacant"].eq(True)) + ir = _per_unit_median(mask_mg & mask_sa & df["is_vacant"].eq(False)) + if ir > 0 and not (np.isnan(vr) or np.isnan(ir)): + result[(sa, mg)] = vr / ir + else: + result[(sa, mg)] = group_allocs.get(mg, global_alloc) + + return result \ No newline at end of file