Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 259 additions & 2 deletions openavmkit/land.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
add_area_fields,
)
from openavmkit.utilities.plotting import plot_histogram_df
from openavmkit.utilities.settings import get_model_group_ids
from openavmkit.utilities.settings import area_unit, get_model_group_ids

from openavmkit.utilities.stats import calc_correlations, calc_cod, calc_r2, calc_mse_r2_adj_r2

Expand Down Expand Up @@ -787,4 +787,261 @@ def _convolve_land_analysis(
print(f"TEST LAND RESULTS, MODEL GROUP : {model_group}, count: {count}")
print("=" * 80)
print(df_results_test.to_string())
print("")
print("")


def calc_lycd_land_values(
df: pd.DataFrame,
settings: dict,
land_alloc: "float | dict | None" = None,
market_value_field: str = "model_market_value",
subarea_field: "str | None" = None,
min_improved_per_cell: int = 10,
) -> pd.DataFrame:
"""Compute land values using the "Least You Can Do" (LYCD) method.

For each local area this method:

1. Takes the median market value and median lot size of **improved**
(non-vacant) properties in the area.
2. Derives a uniform local land rate::

local_land_rate = (median_market_value * land_alloc_pct) / median_lot_size

3. Applies that rate to every parcel in the area::

land_value = local_land_rate * parcel_lot_size

Because every parcel's land value is driven by the *typical* parcel in its
area rather than its own improvement value, the method avoids the absurd
side-by-side disparities that arise from naively multiplying each parcel's
market value by a fixed allocation fraction.

When ``subarea_field`` is provided, rates are computed per
``(subarea, model_group)`` cell rather than per model group alone. This
captures the sharp neighbourhood-to-neighbourhood price jumps that a single
county-wide rate per land-use type would miss. Cells with fewer than
``min_improved_per_cell`` improved parcels fall back first to the
model-group rate, then to the global rate.

When ``land_alloc`` is ``None`` the allocation for each cell is derived
automatically from the data. The median per-unit value of **vacant**
properties is divided by the median per-unit value of **improved**
properties; that ratio is the implied land allocation. Fallback order:
cell → model group → global.

Parameters
----------
df : pd.DataFrame
DataFrame containing at minimum:

- ``"key"``
- ``"model_group"``
- ``market_value_field`` (default ``"model_market_value"``)
- ``"is_vacant"`` (bool)
- ``f"land_area_{unit}"`` where *unit* is ``"sqft"`` or ``"sqm"``
- ``subarea_field`` column, if provided

settings : dict
Settings dictionary (used to determine the area unit).
land_alloc : float, dict, or None
Fraction of market value attributable to land.

- **float** – applied uniformly to all cells.
- **dict** – keyed by ``model_group`` (``{mg: float}``) for per-group
allocations; individual cells inherit their group's allocation.
- **None** – derived automatically from the ratio of vacant-to-improved
per-unit market values, at the finest available level.

market_value_field : str
Column name holding the market value estimate. Defaults to
``"model_market_value"``.
subarea_field : str or None
Column name whose values define geographic subareas (e.g.
``"neighborhood"``, ``"census_tract"``). When supplied, one local land
rate is derived per ``(subarea, model_group)`` cell. When ``None``
(default), a single rate is derived per ``model_group``, which is
equivalent to treating the entire county as one area per land-use type.
min_improved_per_cell : int
Minimum number of improved parcels required in a
``(subarea, model_group)`` cell before that cell gets its own rate.
Cells below this threshold fall back to the model-group rate.
Ignored when ``subarea_field`` is ``None``. Default: 10.

Returns
-------
pd.DataFrame
Copy of *df* with three additional columns:

- ``"lycd_land_alloc"`` – land allocation fraction used for the cell.
- ``"lycd_local_land_rate"`` – implied per-area-unit land rate for the cell.
- ``"lycd_land_value"`` – resulting land value (clamped to ``[0, market_value]``).
"""
unit = area_unit(settings)
lot_area_field = f"land_area_{unit}"

df = df.copy()
model_groups = df["model_group"].unique()

# ------------------------------------------------------------------
# Resolve land_alloc to a per-model-group dict (used as fallback even
# when subarea_field is set, and as the primary source when it is not).
# ------------------------------------------------------------------
if isinstance(land_alloc, dict):
alloc_by_group = {mg: land_alloc.get(mg, np.nan) for mg in model_groups}
elif land_alloc is not None:
alloc_by_group = {mg: float(land_alloc) for mg in model_groups}
else:
alloc_by_group = _derive_lycd_alloc_from_data(
df, lot_area_field, market_value_field, group_field="model_group"
)

# ------------------------------------------------------------------
# Helper: compute one local land rate for a slice of the dataframe.
# Returns np.nan if the slice is too small or the medians are invalid.
# ------------------------------------------------------------------
def _rate_for_mask(mask_improved, alloc):
sub = df[mask_improved]
if len(sub) == 0 or np.isnan(alloc):
return np.nan
median_mv = sub[market_value_field].median()
median_lot = sub[lot_area_field].median()
if median_lot <= 0 or np.isnan(median_lot) or np.isnan(median_mv):
return np.nan
return (median_mv * alloc) / median_lot

# ------------------------------------------------------------------
# Build per-model-group fallback rates (always needed).
# ------------------------------------------------------------------
group_rates = {}
for mg in model_groups:
mask = df["model_group"].eq(mg) & df["is_vacant"].eq(False)
group_rates[mg] = _rate_for_mask(mask, alloc_by_group.get(mg, np.nan))

# ------------------------------------------------------------------
# Assign rates and allocs row-by-row into output columns.
# ------------------------------------------------------------------
df["lycd_land_alloc"] = np.nan
df["lycd_local_land_rate"] = np.nan

if subarea_field is None:
# No geographic subdivision: one rate per model group.
for mg in model_groups:
idx = df["model_group"].eq(mg)
df.loc[idx, "lycd_local_land_rate"] = group_rates[mg]
df.loc[idx, "lycd_land_alloc"] = alloc_by_group.get(mg, np.nan)
else:
# Geographic subdivision: one rate per (subarea, model_group) cell,
# with fallback to model-group rate for thin cells.
subareas = df[subarea_field].unique()

# Pre-derive per-cell allocations when land_alloc is None.
if land_alloc is None:
cell_allocs = _derive_lycd_alloc_from_data(
df,
lot_area_field,
market_value_field,
group_field="model_group",
subarea_field=subarea_field,
alloc_by_group_fallback=alloc_by_group,
)
else:
cell_allocs = {} # will use alloc_by_group for every cell

for mg in model_groups:
mask_mg = df["model_group"].eq(mg)
for sa in subareas:
mask_sa = df[subarea_field].eq(sa)
mask_cell_improved = mask_mg & mask_sa & df["is_vacant"].eq(False)
n_improved = mask_cell_improved.sum()
idx = mask_mg & mask_sa

if n_improved >= min_improved_per_cell:
cell_key = (sa, mg)
alloc = cell_allocs.get(cell_key, alloc_by_group.get(mg, np.nan))
rate = _rate_for_mask(mask_cell_improved, alloc)
if np.isnan(rate):
# Cell medians invalid despite enough rows; fall back.
rate = group_rates[mg]
alloc = alloc_by_group.get(mg, np.nan)
else:
# Too few improved parcels in this cell; use group rate.
rate = group_rates[mg]
alloc = alloc_by_group.get(mg, np.nan)

df.loc[idx, "lycd_local_land_rate"] = rate
df.loc[idx, "lycd_land_alloc"] = alloc

df["lycd_land_value"] = df["lycd_local_land_rate"] * df[lot_area_field]

# Clamp: must be >= 0 and <= market value
df["lycd_land_value"] = df["lycd_land_value"].clip(lower=0)
exceeds_mv = df["lycd_land_value"].gt(df[market_value_field])
df.loc[exceeds_mv, "lycd_land_value"] = df.loc[exceeds_mv, market_value_field]

return df


def _derive_lycd_alloc_from_data(
df: pd.DataFrame,
lot_area_field: str,
market_value_field: str,
group_field: str = "model_group",
subarea_field: "str | None" = None,
alloc_by_group_fallback: "dict | None" = None,
) -> dict:
"""Derive land allocation fractions from vacant vs improved per-unit values.

When ``subarea_field`` is None, returns ``{model_group: alloc}``.
When ``subarea_field`` is provided, returns ``{(subarea, model_group): alloc}``,
falling back to the group-level allocation for cells with no vacant parcels,
and to the global allocation if the group also has none.
"""
def _per_unit_median(mask):
sub = df[mask].copy()
sub = sub[sub[lot_area_field].gt(0)]
rates = sub[market_value_field] / sub[lot_area_field]
return rates.median()

# Global fallback
global_vacant_rate = _per_unit_median(df["is_vacant"].eq(True))
global_improved_rate = _per_unit_median(df["is_vacant"].eq(False))
if global_improved_rate > 0 and not (
np.isnan(global_vacant_rate) or np.isnan(global_improved_rate)
):
global_alloc = global_vacant_rate / global_improved_rate
else:
global_alloc = np.nan

# Per-group allocations (always computed; used as fallback for cells).
group_allocs = {}
for mg in df[group_field].unique():
mask_mg = df[group_field].eq(mg)
vr = _per_unit_median(mask_mg & df["is_vacant"].eq(True))
ir = _per_unit_median(mask_mg & df["is_vacant"].eq(False))
if ir > 0 and not (np.isnan(vr) or np.isnan(ir)):
group_allocs[mg] = vr / ir
else:
group_allocs[mg] = (
alloc_by_group_fallback.get(mg, global_alloc)
if alloc_by_group_fallback
else global_alloc
)

if subarea_field is None:
return group_allocs

# Per-(subarea, group) allocations with group → global fallback.
result = {}
for mg in df[group_field].unique():
mask_mg = df[group_field].eq(mg)
for sa in df[subarea_field].unique():
mask_sa = df[subarea_field].eq(sa)
vr = _per_unit_median(mask_mg & mask_sa & df["is_vacant"].eq(True))
ir = _per_unit_median(mask_mg & mask_sa & df["is_vacant"].eq(False))
if ir > 0 and not (np.isnan(vr) or np.isnan(ir)):
result[(sa, mg)] = vr / ir
else:
result[(sa, mg)] = group_allocs.get(mg, global_alloc)

return result
Loading