Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ann_arbor_test/kpis.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"overall_model_quality_score": 0.0,
"overall_texture_quality": {
"raw_counts": {
"High confidence": 0,
"Low confidence": 0,
"Occluded": 0,
"Missing": 198
},
"percentages": {
"High confidence": 0.0,
"Low confidence": 0.0,
"Occluded": 0.0,
"Missing": 100.0
}
},
"overall_3d_model_to_texture_correlation": 0.0
}
Binary file added ann_arbor_test/meshes/combined.glb
Binary file not shown.
Binary file added ann_arbor_test/meshes/relation_16068956.glb
Binary file not shown.
9 changes: 8 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
ruff
pytest
geopandas
python-dotenv
trimesh
shapely
# (removed duplicate "dotenv" entry: python-dotenv above already provides the `dotenv` module)
scipy
opencv-python
Pillow
requests
networkx
mapbox-earcut
3 changes: 3 additions & 0 deletions src/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .main import main

__all__ = ["main"]
68 changes: 68 additions & 0 deletions src/cli/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import argparse
import json
from pathlib import Path

from dotenv import load_dotenv

from src.evals import generate_kpi_report
from src.ingestion import run_ingestion
from src.mesh import build_scene
from src.texture import run_raycaster


def main():
    """Entry point for the POSM CLI.

    Runs the four pipeline phases — ingestion, mesh generation, texture
    raycasting, and KPI evaluation — for a lat/lon center point, and writes
    all artifacts (meshes, imagery, kpis.json) under ``--output-dir``.
    """
    # Load API credentials (e.g. Mapillary token) from a local .env, if present.
    load_dotenv()
    parser = argparse.ArgumentParser(
        description="POSM: Python OpenStreetMap to 3D Mesh Pipeline",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Required Arguments
    parser.add_argument("lat", type=float, help="Latitude of the center point")
    parser.add_argument("lon", type=float, help="Longitude of the center point")

    # Optional Arguments
    parser.add_argument(
        "--buffer", type=float, default=0.001, help="Bounding box buffer size in degrees"
    )
    parser.add_argument(
        "--output-dir", type=str, default="output", help="Directory to save meshes and data"
    )

    args = parser.parse_args()

    # FIX: guarantee the output root exists up front. Previously kpis.json was
    # written assuming an earlier phase had created args.output_dir as a side
    # effect, which fails if those phases write nothing.
    output_root = Path(args.output_dir)
    output_root.mkdir(parents=True, exist_ok=True)

    print(f"=== Starting POSM Pipeline for ({args.lat}, {args.lon}) ===")

    # Step 1: Ingestion — fetch OSM footprints and Mapillary image metadata.
    print("\n--- Phase 1: Ingestion ---")
    data = run_ingestion(args.lat, args.lon, args.buffer, f"{args.output_dir}/mapillary")

    # Nothing to mesh or texture without building footprints.
    if not data.get("buildings_joined"):
        print("Pipeline aborted: No buildings found in this area.")
        return

    # Step 2: Mesh Generation — build_scene exports combined.glb into this dir,
    # so the path below must stay in sync with build_scene's output convention.
    print("\n--- Phase 2: Mesh Generation ---")
    combined_mesh_path = f"{args.output_dir}/meshes/combined.glb"
    build_scene(data, output_dir=f"{args.output_dir}/meshes")

    # Step 3: Texture Mapping (Raycasting)
    print("\n--- Phase 3: Texture Raycasting ---")
    raycast_results = run_raycaster(data, combined_mesh_path)

    # Step 4: KPI Evaluation — raycast_results keys must match the keyword
    # parameters of generate_kpi_report.
    print("\n--- Phase 4: KPI Evaluation ---")
    kpis = generate_kpi_report(**raycast_results)

    # Save KPIs to disk
    kpi_path = output_root / "kpis.json"
    with open(kpi_path, "w") as f:
        json.dump(kpis, f, indent=4)

    print("\n=== POSM Pipeline Complete! ===")
    print(f"Overall Model Quality Score: {kpis['overall_model_quality_score']}/100")
    print(f"Results saved to: {args.output_dir}/")


if __name__ == "__main__":
main()
30 changes: 30 additions & 0 deletions src/evals/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from .model_quality import calculate_overall_model_quality, calculate_texture_correlation
from .texture_quality import aggregate_texture_quality, evaluate_face_confidence


def generate_kpi_report(
    image_colors: list, mesh_colors: list, faces_data: dict, total_faces: int
) -> dict:
    """Generates the full suite of Model Quality Indicators required by Phase I.

    Combines the per-face texture-quality distribution with the image-to-mesh
    color correlation into a single report dict.
    """
    texture_quality = aggregate_texture_quality(faces_data, total_faces)
    correlation = calculate_texture_correlation(image_colors, mesh_colors)

    # The overall score scales texture coverage by how well colors correlate.
    quality_score = calculate_overall_model_quality(
        correlation_score=correlation,
        texture_percentages=texture_quality["percentages"],
    )

    report = {
        "overall_model_quality_score": quality_score,
        "overall_texture_quality": texture_quality,
        "overall_3d_model_to_texture_correlation": correlation,
    }
    return report


__all__ = [
"evaluate_face_confidence",
"aggregate_texture_quality",
"calculate_texture_correlation",
"calculate_overall_model_quality",
"generate_kpi_report",
]
58 changes: 58 additions & 0 deletions src/evals/model_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Dict, List

import numpy as np


def calculate_texture_correlation(
    image_colors: List[List[float]], mesh_colors: List[List[float]]
) -> float:
    """
    Calculates the 'Overall 3D model to texture correlation' using cosine similarity.

    Measures how well the input 3D models and generated texture are aligned by
    averaging the per-sample cosine similarity of paired RGB color vectors and
    rescaling the mean from [-1, 1] to [0, 1].

    Args:
        image_colors: Per-sample colors from source imagery; only the first 3
            channels of each entry are used.
        mesh_colors: Per-sample rendered-mesh colors, index-aligned with
            ``image_colors``.

    Returns:
        Mean similarity rounded to 4 decimals, or 0.0 when there is no usable
        data (empty inputs, mismatched lengths, or only zero-length vectors).
    """
    # len()-based guards (instead of truthiness) also accept numpy arrays.
    if len(image_colors) == 0 or len(image_colors) != len(mesh_colors):
        return 0.0

    cos_sims = []
    for img_c, mesh_c in zip(image_colors, mesh_colors):
        img_vec = np.asarray(img_c[:3], dtype=float)
        mesh_vec = np.asarray(mesh_c[:3], dtype=float)

        norm_img = np.linalg.norm(img_vec)
        norm_mesh = np.linalg.norm(mesh_vec)

        # A zero-length color vector has no direction; skip rather than divide by 0.
        if norm_img == 0 or norm_mesh == 0:
            continue

        sim = np.dot(img_vec / norm_img, mesh_vec / norm_mesh)
        # Clip to handle minor floating point inaccuracies.
        cos_sims.append(np.clip(sim, -1.0, 1.0))

    # BUG FIX: previously, when every pair was degenerate (all-zero vectors),
    # the empty-average path produced (0 + 1) / 2 = 0.5. "No usable data" now
    # reports 0.0, consistent with the empty-input guard above.
    if not cos_sims:
        return 0.0

    # Convert the average from [-1, 1] scale to [0, 1] scale.
    avg_sim = float(np.mean(cos_sims))
    return round((avg_sim + 1) / 2, 4)


def calculate_overall_model_quality(
    correlation_score: float, texture_percentages: Dict[str, float]
) -> float:
    """
    Calculates 'Overall model quality' on a scale from 0 to 100.

    A weighted texture-coverage score (full credit for high-confidence faces,
    partial for low-confidence) is scaled by the 3D-to-texture correlation, so
    perfect coverage (100) with perfect correlation (1.0) scores 100.
    """
    # Weighted coverage: high confidence counts fully, low confidence at 40%.
    confidence_weights = {"High confidence": 1.0, "Low confidence": 0.4}
    coverage_score = sum(
        texture_percentages.get(category, 0) * weight
        for category, weight in confidence_weights.items()
    )

    return round(coverage_score * correlation_score, 2)
46 changes: 46 additions & 0 deletions src/evals/texture_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Any, Dict, List


def evaluate_face_confidence(face_hits: List[Dict[str, Any]]) -> str:
    """
    Evaluates a single mesh polygon (face) and categorizes its texture confidence.

    Categories match the Phase I criteria, checked in priority order:
    "Missing" (no imagery at all), "Occluded" (a hit was blocked by an
    object), "High confidence" (>=2 distinct viewpoints with good average
    visibility), otherwise "Low confidence".
    """
    # 4. Missing: no photograph covers this area.
    if len(face_hits) == 0:
        return "Missing"

    # 3. Occluded: any hit flagged as blocked by an intervening object.
    for hit in face_hits:
        if hit.get("is_occluded", False):
            return "Occluded"

    # 1. High confidence: detailed imagery from multiple distinct viewpoints.
    viewpoints = {hit.get("image_id") for hit in face_hits}
    mean_visibility = sum(hit.get("visibility_score", 0) for hit in face_hits) / len(face_hits)
    if len(viewpoints) >= 2 and mean_visibility >= 1.5:
        return "High confidence"

    # 2. Low confidence: imagery exists but is sparse, grainy, or single-view.
    return "Low confidence"


def aggregate_texture_quality(
    faces_data: Dict[int, List[Dict[str, Any]]], total_faces: int
) -> Dict[str, Any]:
    """
    Aggregates the per-area evaluations into an overall texture quality distribution.

    Args:
        faces_data: Mapping of face index -> list of raycast hit records;
            faces absent from the mapping count as "Missing".
        total_faces: Total number of mesh faces (indices 0..total_faces-1).

    Returns:
        Dict with "raw_counts" (faces per confidence category) and
        "percentages" (same distribution as percentages of ``total_faces``;
        all 0.0 when the mesh has no faces).
    """
    distribution = {"High confidence": 0, "Low confidence": 0, "Occluded": 0, "Missing": 0}

    for face_id in range(total_faces):
        hits = faces_data.get(face_id, [])
        confidence = evaluate_face_confidence(hits)
        distribution[confidence] += 1

    # FIX: guard against an empty mesh — previously total_faces == 0 raised
    # ZeroDivisionError; report 0% for every category instead.
    if total_faces <= 0:
        percentages = {k: 0.0 for k in distribution}
    else:
        percentages = {k: round((v / total_faces) * 100, 2) for k, v in distribution.items()}

    return {"raw_counts": distribution, "percentages": percentages}
3 changes: 1 addition & 2 deletions src/ingestion/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from pathlib import Path
from typing import Any, Dict

from posm.src.ingestion.spatial_joiner import spatial_join_data

from .api_client import fetch_mapillary_metadata, fetch_osm_buildings
from .spatial_joiner import spatial_join_data


def run_ingestion(
Expand Down
3 changes: 3 additions & 0 deletions src/mesh/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .generator import build_scene

__all__ = ["build_scene"]
115 changes: 115 additions & 0 deletions src/mesh/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from pathlib import Path
from typing import Any, Dict, List

import numpy as np
import trimesh

SCALE = 5


def generate_plane(height: float, width: float):
    """Build a flat rectangular quad of the given extents in the z=0 plane.

    Returns the trimesh.Trimesh plane together with its raw corner vertices
    and the single quad face array, so callers can reuse the geometry.
    """
    vertices = [
        [0, 0, 0],
        [0, width, 0],
        [height, width, 0],
        [height, 0, 0],
    ]
    quad = np.array([[0, 1, 2, 3]])
    mesh = trimesh.Trimesh(vertices=vertices, faces=quad)
    return mesh, vertices, quad


def initialize_plane(min_lat: float, min_lon: float, max_lat: float, max_lon: float):
    """Create the ground plane spanning the scaled lat/lon bounding box.

    Coordinates are converted to the pipeline's scaled-integer space
    (degrees * 10**SCALE) before the plane extents are computed.
    """
    def to_scaled(degrees: float) -> int:
        return int(degrees * (10**SCALE))

    lat_span = abs(to_scaled(max_lat) - to_scaled(min_lat))
    lon_span = abs(to_scaled(max_lon) - to_scaled(min_lon))

    return generate_plane(lat_span, lon_span)


def get_corners(footprint_latlon: List, min_lat: float, min_lon: float):
    """Convert a lat/lon footprint into local scaled-integer coordinates.

    Each point is multiplied by 10**SCALE, truncated to int, and expressed as
    an absolute offset from the scaled (min_lat, min_lon) origin.
    """
    factor = 10**SCALE
    # Origin is loop-invariant; compute it once.
    origin_i = int(min_lat * factor)
    origin_j = int(min_lon * factor)

    corners = []
    for point in footprint_latlon:
        i = int(float(point[0]) * factor)
        j = int(float(point[1]) * factor)
        corners.append([abs(i - origin_i), abs(j - origin_j)])
    return corners


def get_lines(corners: List, loop: bool = True):
    """Build trimesh Line entities chaining consecutive corner indices.

    Produces edges (0, 1), (1, 2), ..., (n-2, n-1) and, when ``loop`` is
    True, a closing edge (n-1, 0).

    FIX: the original unconditionally emitted a (0, 1) edge, so footprints
    with fewer than two corners produced Line entities referencing
    nonexistent vertex indices; such degenerate inputs now yield no edges.
    """
    count = len(corners)
    if count < 2:
        return []

    edges = [trimesh.path.entities.Line([i, i + 1]) for i in range(count - 1)]
    if loop:
        edges.append(trimesh.path.entities.Line([count - 1, 0]))
    return edges


def build_scene(
    ingestion_data: Dict[str, Any], output_dir: str = "output_meshes"
) -> trimesh.Trimesh:
    """Takes ingested OSM data and builds 3D extruded building meshes.

    Reads "bbox_south_west_north_east" ([min_lat, min_lon, max_lat, max_lon])
    and "buildings_joined" (per-building dicts with "osm_id",
    "footprint_latlon", and optional "height_m") from ``ingestion_data``.
    Exports one .glb per building plus a combined.glb into ``output_dir``
    (created if missing) and returns the combined mesh.
    """
    bbox = ingestion_data.get("bbox_south_west_north_east", [0, 0, 0, 0])
    min_lat, min_lon, max_lat, max_lon = bbox[0], bbox[1], bbox[2], bbox[3]

    # Ground plane spanning the bounding box is the first element of the scene.
    plane, plane_vertices, plane_faces = initialize_plane(min_lat, min_lon, max_lat, max_lon)
    buildings = [plane]

    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    buildings_data = ingestion_data.get("buildings_joined", [])
    print(f"Extruding {len(buildings_data)} buildings...")

    for b in buildings_data:
        # osm_id may contain "/" (e.g. "way/123"); sanitize it for use as a filename.
        # NOTE(review): assumes osm_id is a string — a non-str value would raise on .replace.
        osm_id = b.get("osm_id", "unknown").replace("/", "_")
        corners = get_corners(b.get("footprint_latlon", []), min_lat, min_lon)
        lines = get_lines(corners)

        # 2D outline of the footprint in local scaled-integer coordinates.
        path = trimesh.path.path.Path2D(
            entities=lines,
            vertices=corners,
        )

        # Skip footprints whose edges do not form at least one closed polygon.
        polys = path.polygons_closed
        if not polys or not polys[0]:
            continue

        # Default to 3.0 when height_m is absent or falsy (None/0); negated so
        # extrusion goes in -z — presumably to match the scene's orientation
        # convention, TODO confirm.
        height = b.get("height_m") or 3.0
        height = -1 * height
        mesh = path.extrude(height=height)

        # extrude() may return a list (multi-polygon footprints) or a single
        # object; normalize everything to plain Trimesh via to_mesh() when available.
        if isinstance(mesh, list):
            mesh = trimesh.util.concatenate(
                [m.to_mesh() if hasattr(m, "to_mesh") else m for m in mesh]
            )
        else:
            if hasattr(mesh, "to_mesh"):
                mesh = mesh.to_mesh()

        # Persist each building individually, then keep it for the combined scene.
        mesh.export(str(out_path / f"{osm_id}.glb"), file_type="glb")
        buildings.append(mesh)

    combined_mesh = trimesh.util.concatenate(buildings)

    # "combined.glb" must match the path the CLI passes to the raycaster.
    combined_mesh_path = out_path / "combined.glb"
    combined_mesh.export(str(combined_mesh_path))
    print(f"Saved combined building mesh to {combined_mesh_path}!")

    return combined_mesh
3 changes: 3 additions & 0 deletions src/texture/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .raycaster import run_raycaster

__all__ = ["run_raycaster"]
Loading