Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 319 additions & 2 deletions map2loop/thickness_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
multiline_to_line,
find_segment_strike_from_pt,
set_z_values_from_raster_df,
value_from_raster
value_from_raster,
segment_measure_range,
clean_line_geometry,
nearest_orientation_to_line,
iter_line_segments
)
from .interpolators import DipDipDirectionInterpolator

Expand All @@ -15,11 +19,13 @@

# external imports
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, List
from collections import defaultdict
import scipy.interpolate
import beartype
import numpy
import scipy
from scipy.spatial import cKDTree
import pandas
import geopandas
import shapely
Expand Down Expand Up @@ -756,3 +762,314 @@ def compute(
self._check_thickness_percentage_calculations(output_units)

return output_units


class AlongSection(ThicknessCalculator):
"""Thickness calculator that estimates true thicknesses along supplied section lines."""

def __init__(
self,
sections: geopandas.GeoDataFrame,
dtm_data: Optional[gdal.Dataset] = None,
bounding_box: Optional[dict] = None,
max_line_length: Optional[float] = None,
is_strike: Optional[bool] = False,
):
super().__init__(dtm_data, bounding_box, max_line_length, is_strike) # initialise base calculator bits
self.thickness_calculator_label = "AlongSection" # label used externally to identify this strategy
self.sections = (
sections.copy() # keep a copy so editing outside does not mutate our internal state
if sections is not None
else geopandas.GeoDataFrame({"geometry": []}, geometry="geometry") # ensure predictable structure when no sections supplied
)
self.section_thickness_records: geopandas.GeoDataFrame = geopandas.GeoDataFrame() # populated as GeoDataFrame during compute
self.section_intersection_points: dict = {}

def type(self):
"""Return the calculator label."""
return self.thickness_calculator_label

@beartype.beartype
def compute(
self,
units: pandas.DataFrame,
stratigraphic_order: list,
basal_contacts: geopandas.GeoDataFrame,
structure_data: pandas.DataFrame,
geology_data: geopandas.GeoDataFrame,
sampled_contacts: pandas.DataFrame,
) -> pandas.DataFrame:
"""Estimate unit thicknesses along cross sections.

Args:
units (pandas.DataFrame): Table of stratigraphic units that will be annotated with
thickness statistics. Must expose a ``name`` column used to map aggregated
thickness values back to each unit.
stratigraphic_order (list): Accepted for API compatibility but not used by this
calculator. Retained so the method signature matches the other calculators.
basal_contacts (geopandas.GeoDataFrame): Unused placeholder maintained for parity
with the other calculators.
structure_data (pandas.DataFrame): Orientation measurements containing X/Y columns
and one dip column (``DIP``, ``dip`` or ``Dip``). A KD-tree is built from these
points so each split section segment can grab the nearest dip when converting its
length to true thickness.
geology_data (geopandas.GeoDataFrame): Mandatory polygon dataset describing map
units. Each section is intersected with these polygons to generate unit-aware
line segments whose lengths underpin the thickness calculation.
sampled_contacts (pandas.DataFrame): Part of the public interface but not used in
this implementation.

Returns:
pandas.DataFrame: Copy of ``units`` with three new columns – ``ThicknessMean``,
``ThicknessMedian`` and ``ThicknessStdDev`` – populated from the accumulated segment
thickness samples. Units that never receive a measurement retain the sentinel value
``-1`` in each column.

Workflow:
1. Validate the presence of section geometries, geology polygons, and a recognizable
unit-name column. Reproject sections to match the geology CRS when required.
2. Pre-build helpers: a spatial index over geology polygons for fast section/polygon
queries, and (if possible) a KD-tree of orientation points so the closest dip to a
split segment can be queried in O(log n).
3. For every section, reduce complex/multi-part geometries to a single `LineString`,
intersect it with candidate geology polygons, and collect the resulting split line
segments along with their length and measure positions along the parent section.
4. Walk the ordered segments and keep those bounded by two distinct neighbouring units
(i.e., the segment sits between different units on either side). Fetch the nearest
dip (fallback to 90° once if no structures exist), convert the segment length to
true thickness using ``length * sin(dip)``, and store the sample plus provenance in
``self.section_thickness_records``.
5. Aggregate all collected samples per unit (mean/median/std) and return the enriched
``units`` table. The helper ``self.section_intersection_points`` captures the raw
split segments grouped by section to aid downstream inspection.

Notes:
- ``stratigraphic_order``, ``basal_contacts`` and ``sampled_contacts`` are retained for
interface compatibility but do not influence this algorithm.
- When no nearby orientation measurements exist, the method emits a single warning and
assumes a vertical dip (90°) for affected segments to avoid silently dropping units.
- ``self.section_thickness_records`` is materialised as a ``GeoDataFrame`` so the split
segments (geometry column) can be visualised directly in notebooks or GIS clients.
"""

if self.sections is None or self.sections.empty:
logger.warning("AlongSection: No sections provided; skipping thickness calculation.")
return units

if geology_data is None or geology_data.empty:
logger.warning(
"AlongSection: Geology polygons are required to split sections; skipping thickness calculation."
)
return units

unit_column_candidates = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not already consistent in map2loop? I think the thickness calculators/samplers should all assume the data is appropriately formatted.

"UNITNAME",
"unitname",
"UNIT_NAME",
"UnitName",
"unit",
"Unit",
"UNIT",
"name",
"Name",
]
unit_column = next((col for col in unit_column_candidates if col in geology_data.columns), None)
if unit_column is None:
logger.warning(
"AlongSection: Unable to identify a unit-name column in geology data; expected one of %s.",
unit_column_candidates,
)
return units

sections = self.sections.copy()
geology = geology_data.copy()

if geology.crs is not None:
if sections.crs is None:
sections = sections.set_crs(geology.crs)
elif sections.crs != geology.crs:
sections = sections.to_crs(geology.crs)
elif sections.crs is not None:
geology = geology.set_crs(sections.crs)

try:
geology_sindex = geology.sindex
except Exception as e:
logger.error("Failed to create spatial index for geology data: %s", e, exc_info=True)
geology_sindex = None

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth having a check to make sure the segments are in the same place as the polygons? Maybe just check that the extents overlap?

dip_column = next((col for col in ("DIP", "dip", "Dip") if col in structure_data.columns), None)
orientation_tree = None
orientation_dips = None
orientation_coords = None
if dip_column is not None and {"X", "Y"}.issubset(structure_data.columns):
orient_df = structure_data.dropna(subset=["X", "Y", dip_column]).copy()
if not orient_df.empty:
orient_df["_dip_value"] = pandas.to_numeric(orient_df[dip_column], errors="coerce")
orient_df = orient_df.dropna(subset=["_dip_value"])
if not orient_df.empty:
try:
orientation_coords = orient_df[["X", "Y"]].astype(float).to_numpy()
except ValueError:
logger.debug(
"Failed to convert orientation coordinates to float for %d rows. Data quality issue likely. Example rows: %s",
len(orient_df),
orient_df[["X", "Y"]].head(3).to_dict(orient="records")
)
orientation_coords = numpy.empty((0, 2))
if orientation_coords.size:
orientation_dips = orient_df["_dip_value"].astype(float).to_numpy()
try:
orientation_tree = cKDTree(orientation_coords)
except (ValueError, TypeError) as e:
logger.error(f"Failed to construct cKDTree for orientation data: {e}")
orientation_tree = None
default_dip_warning_emitted = False

units_lookup = dict(zip(units["name"], units.index))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are allowing different colum names this need to also change

thickness_by_unit = {name: [] for name in units_lookup.keys()}
thickness_records: List[dict] = []
split_segments_by_section: dict = defaultdict(list)

for section_idx, section_row in sections.iterrows():
line = clean_line_geometry(section_row.geometry)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to merge multi-linestrings in this case or treat them as separate lines? I am not sure

if line is None or line.length == 0:
continue
section_id = section_row.get("ID", section_idx)

candidate_idx = (
list(geology_sindex.intersection(line.bounds))
if geology_sindex is not None
else list(range(len(geology)))
)
if not candidate_idx:
continue

split_segments = []
for idx in candidate_idx:
poly = geology.iloc[idx]
polygon_geom = poly.geometry
if polygon_geom is None or polygon_geom.is_empty:
continue
intersection = line.intersection(polygon_geom)
if intersection.is_empty:
continue
for segment in iter_line_segments(intersection):
seg_length = segment.length
if seg_length <= 0:
continue
start_measure, end_measure = segment_measure_range(line, segment)
segment_record = {
"section_id": section_id,
"geometry": segment,
"unit": poly[unit_column],
"length": seg_length,
"start_measure": start_measure,
"end_measure": end_measure,
}
split_segments.append(segment_record)
split_segments_by_section[section_id].append(segment_record)

if len(split_segments) < 1:
continue

split_segments.sort(key=lambda item: (item["start_measure"], item["end_measure"]))

for idx_segment, segment in enumerate(split_segments):
prev_unit = split_segments[idx_segment - 1]["unit"] if idx_segment > 0 else None
next_unit = (
split_segments[idx_segment + 1]["unit"]
if idx_segment + 1 < len(split_segments)
else None
)

if prev_unit is None or next_unit is None:
continue
if prev_unit == segment["unit"] or next_unit == segment["unit"]:
continue
if prev_unit == next_unit:
continue
Comment on lines +986 to +991
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The segment filtering logic may not correctly identify all segments that represent a unit boundary. The current conditions (lines 979-984) skip segments where:

  1. Either neighbor is None (edge segments)
  2. The segment's unit matches either neighbor
  3. Both neighbors are the same unit

However, this logic assumes that a valid thickness measurement requires the segment to be sandwiched between two different neighboring units, neither of which is the segment's own unit. This means segments where the unit itself is flanked by two different units (segment unit != prev_unit and segment unit != next_unit and prev_unit != next_unit) are kept. Consider documenting this assumption more clearly in the code or the docstring, as it represents a specific geological interpretation.

Copilot uses AI. Check for mistakes.

dip_value, orientation_idx, orientation_distance = nearest_orientation_to_line(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be buffered? in case there is nothing nearby?

orientation_tree,
orientation_dips,
orientation_coords,
segment["geometry"]
)
if numpy.isnan(dip_value):
dip_value = 90.0
orientation_idx = None
orientation_distance = None
if not default_dip_warning_emitted:
logger.warning(
"AlongSection: Missing structure measurements near some sections; assuming vertical dip (90°) for those segments."
)
default_dip_warning_emitted = True

thickness = segment["length"] * abs(math.sin(math.radians(dip_value)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to calculate apparent dip?

if thickness <= 0:
continue

unit_name = segment["unit"]
if unit_name not in thickness_by_unit:
thickness_by_unit[unit_name] = []
thickness_by_unit[unit_name].append(thickness)

thickness_records.append(
{
"section_id": section_id,
"unit_id": unit_name,
"thickness": thickness,
"segment_length": segment["length"],
"dip_used_deg": dip_value,
"prev_unit": prev_unit,
"next_unit": next_unit,
"orientation_index": orientation_idx,
"orientation_distance": orientation_distance,
"geometry": segment["geometry"],
}
)

output_units = units.copy()
output_units["ThicknessMean"] = -1.0
output_units["ThicknessMedian"] = -1.0
output_units["ThicknessStdDev"] = -1.0

for unit_name, values in thickness_by_unit.items():
if not values:
continue
idx = units_lookup.get(unit_name)
if idx is None:
continue
arr = numpy.asarray(values, dtype=numpy.float64)
output_units.at[idx, "ThicknessMean"] = float(numpy.nanmean(arr))
output_units.at[idx, "ThicknessMedian"] = float(numpy.nanmedian(arr))
output_units.at[idx, "ThicknessStdDev"] = float(numpy.nanstd(arr))

if thickness_records:
self.section_thickness_records = geopandas.GeoDataFrame(
thickness_records,
geometry="geometry",
crs=sections.crs,
)
else:
self.section_thickness_records = geopandas.GeoDataFrame(
columns=[
"section_id",
"unit_id",
"thickness",
"segment_length",
"dip_used_deg",
"prev_unit",
"next_unit",
"orientation_index",
"orientation_distance",
"geometry",
],
geometry="geometry",
crs=sections.crs,
)
self.section_intersection_points = dict(split_segments_by_section)
self._check_thickness_percentage_calculations(output_units)

return output_units
Loading