def main():
    """Run the POSM pipeline end to end.

    Phases: (1) ingest OSM buildings + Mapillary imagery metadata,
    (2) extrude footprints into a combined GLB scene, (3) raycast street
    imagery onto mesh faces, (4) compute and persist the KPI report.

    CLI: positional lat/lon, optional --buffer (degrees) and --output-dir.
    """
    load_dotenv()
    parser = argparse.ArgumentParser(
        description="POSM: Python OpenStreetMap to 3D Mesh Pipeline",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Required arguments
    parser.add_argument("lat", type=float, help="Latitude of the center point")
    parser.add_argument("lon", type=float, help="Longitude of the center point")

    # Optional arguments
    parser.add_argument(
        "--buffer", type=float, default=0.001, help="Bounding box buffer size in degrees"
    )
    parser.add_argument(
        "--output-dir", type=str, default="output", help="Directory to save meshes and data"
    )

    args = parser.parse_args()

    output_dir = Path(args.output_dir)
    # FIX: create the output directory up front; previously the final
    # `open(kpi_path, "w")` could fail with FileNotFoundError when the
    # directory did not already exist.
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"=== Starting POSM Pipeline for ({args.lat}, {args.lon}) ===")

    # Step 1: Ingestion
    print("\n--- Phase 1: Ingestion ---")
    data = run_ingestion(args.lat, args.lon, args.buffer, str(output_dir / "mapillary"))

    if not data.get("buildings_joined"):
        print("Pipeline aborted: No buildings found in this area.")
        return

    # Step 2: Mesh Generation
    print("\n--- Phase 2: Mesh Generation ---")
    combined_mesh_path = str(output_dir / "meshes" / "combined.glb")
    build_scene(data, output_dir=str(output_dir / "meshes"))

    # Step 3: Texture Mapping (Raycasting)
    print("\n--- Phase 3: Texture Raycasting ---")
    raycast_results = run_raycaster(data, combined_mesh_path)

    # Step 4: KPI Evaluation
    print("\n--- Phase 4: KPI Evaluation ---")
    kpis = generate_kpi_report(**raycast_results)

    # Save KPIs to disk
    kpi_path = output_dir / "kpis.json"
    with open(kpi_path, "w") as f:
        json.dump(kpis, f, indent=4)

    print("\n=== POSM Pipeline Complete! ===")
    print(f"Overall Model Quality Score: {kpis['overall_model_quality_score']}/100")
    print(f"Results saved to: {args.output_dir}/")
def calculate_texture_correlation(
    image_colors: List[List[float]], mesh_colors: List[List[float]]
) -> float:
    """Score the 'Overall 3D model to texture correlation' on a [0, 1] scale.

    Each sampled image colour is paired with the mesh colour struck by the
    same ray; their cosine similarities are averaged and the mean is rescaled
    from [-1, 1] to [0, 1]. Empty or length-mismatched inputs score 0.0.
    """
    if not image_colors or not mesh_colors or len(image_colors) != len(mesh_colors):
        return 0.0

    similarities = []
    for sample_rgb, surface_rgb in zip(image_colors, mesh_colors):
        a = np.asarray(sample_rgb[:3], dtype=float)
        b = np.asarray(surface_rgb[:3], dtype=float)

        a_len = np.linalg.norm(a)
        b_len = np.linalg.norm(b)

        # A zero-length colour vector has no direction; skip the pair.
        if a_len == 0 or b_len == 0:
            continue

        cos_theta = np.dot(a / a_len, b / b_len)
        # Clamp away tiny floating-point excursions outside [-1, 1].
        similarities.append(np.clip(cos_theta, -1.0, 1.0))

    mean_sim = float(np.mean(similarities)) if similarities else 0.0

    # Rescale the average from [-1, 1] to [0, 1] and round for reporting.
    return round((mean_sim + 1) / 2, 4)
+ final_score = coverage_score * correlation_score + + return round(final_score, 2) diff --git a/src/evals/texture_quality.py b/src/evals/texture_quality.py new file mode 100644 index 0000000..f77d250 --- /dev/null +++ b/src/evals/texture_quality.py @@ -0,0 +1,46 @@ +from typing import Any, Dict, List + + +def evaluate_face_confidence(face_hits: List[Dict[str, Any]]) -> str: + """ + Evaluates a single mesh polygon (face) and categorizes its texture confidence. + Categories match the Phase I criteria: High confidence, Low confidence, Occluded, Missing. + """ + # 4. Missing: No photograph of area + if not face_hits: + return "Missing" + + # 3. Occluded: Unseen area blocked by objects (simulated by hit distance vs expected) + if any(hit.get("is_occluded", False) for hit in face_hits): + return "Occluded" + + unique_viewpoints = set(hit.get("image_id") for hit in face_hits) + + # Calculate average visibility/goodness score for the hits on this face + avg_score = sum(hit.get("visibility_score", 0) for hit in face_hits) / len(face_hits) + + # 1. High confidence: Good, detailed imagery available from multiple viewpoints + if len(unique_viewpoints) >= 2 and avg_score >= 1.5: + return "High confidence" + + # 2. Low confidence: Out-of-focus, grainy, or inconsistent imagery + return "Low confidence" + + +def aggregate_texture_quality( + faces_data: Dict[int, List[Dict[str, Any]]], total_faces: int +) -> Dict[str, Any]: + """ + Aggregates the per-area evaluations into an overall texture quality distribution. 
+ """ + distribution = {"High confidence": 0, "Low confidence": 0, "Occluded": 0, "Missing": 0} + + for face_id in range(total_faces): + hits = faces_data.get(face_id, []) + confidence = evaluate_face_confidence(hits) + distribution[confidence] += 1 + + # Calculate percentages + percentages = {k: round((v / total_faces) * 100, 2) for k, v in distribution.items()} + + return {"raw_counts": distribution, "percentages": percentages} diff --git a/src/ingestion/__init__.py b/src/ingestion/__init__.py index 83fa292..d19bc0e 100644 --- a/src/ingestion/__init__.py +++ b/src/ingestion/__init__.py @@ -1,9 +1,8 @@ from pathlib import Path from typing import Any, Dict -from posm.src.ingestion.spatial_joiner import spatial_join_data - from .api_client import fetch_mapillary_metadata, fetch_osm_buildings +from .spatial_joiner import spatial_join_data def run_ingestion( diff --git a/src/mesh/__init__.py b/src/mesh/__init__.py index e69de29..cb52b05 100644 --- a/src/mesh/__init__.py +++ b/src/mesh/__init__.py @@ -0,0 +1,3 @@ +from .generator import build_scene + +__all__ = ["build_scene"] diff --git a/src/mesh/generator.py b/src/mesh/generator.py new file mode 100644 index 0000000..b41d0af --- /dev/null +++ b/src/mesh/generator.py @@ -0,0 +1,115 @@ +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np +import trimesh + +SCALE = 5 + + +def generate_plane(height: float, width: float): + corners = [ + [0, 0, 0], + [0, width, 0], + [height, width, 0], + [height, 0, 0], + ] + faces = np.array([[0, 1, 2, 3]]) + plane = trimesh.Trimesh(vertices=corners, faces=faces) + return plane, corners, faces + + +def initialize_plane(min_lat: float, min_lon: float, max_lat: float, max_lon: float): + max_lat_scaled = int(max_lat * (10**SCALE)) + min_lat_scaled = int(min_lat * (10**SCALE)) + max_lon_scaled = int(max_lon * (10**SCALE)) + min_lon_scaled = int(min_lon * (10**SCALE)) + + delta_lat = abs(max_lat_scaled - min_lat_scaled) + delta_long = abs(max_lon_scaled - 
min_lon_scaled) + + plane, corners, faces = generate_plane(delta_lat, delta_long) + return plane, corners, faces + + +def get_corners(footprint_latlon: List, min_lat: float, min_lon: float): + corners = [] + for point in footprint_latlon: + latitude = int(float(point[0]) * (10**SCALE)) + longitude = int(float(point[1]) * (10**SCALE)) + + local_i = abs(latitude - int(min_lat * (10**SCALE))) + local_j = abs(longitude - int(min_lon * (10**SCALE))) + + corners.append([local_i, local_j]) + return corners + + +def get_lines(corners: List, loop: bool = True): + lines = [] + start = 0 + end = 1 + lines.append(trimesh.path.entities.Line([start, end])) + + for i in range(len(corners) - 2): + start += 1 + end += 1 + lines.append(trimesh.path.entities.Line([start, end])) + + if loop: + lines.append(trimesh.path.entities.Line([end, 0])) + return lines + + +def build_scene( + ingestion_data: Dict[str, Any], output_dir: str = "output_meshes" +) -> trimesh.Trimesh: + """Takes ingested OSM data and builds 3D extruded building meshes.""" + bbox = ingestion_data.get("bbox_south_west_north_east", [0, 0, 0, 0]) + min_lat, min_lon, max_lat, max_lon = bbox[0], bbox[1], bbox[2], bbox[3] + + plane, plane_vertices, plane_faces = initialize_plane(min_lat, min_lon, max_lat, max_lon) + buildings = [plane] + + out_path = Path(output_dir) + out_path.mkdir(parents=True, exist_ok=True) + + buildings_data = ingestion_data.get("buildings_joined", []) + print(f"Extruding {len(buildings_data)} buildings...") + + for b in buildings_data: + osm_id = b.get("osm_id", "unknown").replace("/", "_") + corners = get_corners(b.get("footprint_latlon", []), min_lat, min_lon) + lines = get_lines(corners) + + path = trimesh.path.path.Path2D( + entities=lines, + vertices=corners, + ) + + polys = path.polygons_closed + if not polys or not polys[0]: + continue + + height = b.get("height_m") or 3.0 + height = -1 * height + mesh = path.extrude(height=height) + + if isinstance(mesh, list): + mesh = 
def shoot_rays_for_image(
    CAMERA_LOC: Tuple[float, float],
    HEADING: float,
    INPUT_IMG: str,
    image_id: str,
    bbox: List[float],
    street_mesh: trimesh.Trimesh,
    visibility_score: float,
) -> Tuple[List[Dict[str, Any]], List[List[float]], List[List[float]]]:
    """
    Casts rays from the camera's perspective into the scene and records the hits.
    Returns: (list_of_face_hits, list_of_image_colors, list_of_mesh_colors)
    """
    # 1. Fetch and decode the Mapillary image.
    # FIX: add a timeout so a stalled CDN request cannot hang the pipeline,
    # and treat network errors as "no data" rather than crashing.
    try:
        res = requests.get(INPUT_IMG, timeout=30)
    except requests.RequestException:
        return [], [], []
    if res.status_code != 200:
        return [], [], []

    img_array = np.frombuffer(res.content, np.uint8)
    splatter_img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    # FIX: imdecode returns None on undecodable payloads; the cvtColor call
    # below would otherwise raise.
    if splatter_img is None:
        return [], [], []
    splatter_img = cv2.cvtColor(splatter_img, cv2.COLOR_BGR2RGB)

    height, width, _ = splatter_img.shape

    MIN_LAT, MIN_LON = bbox[0], bbox[1]

    # 2. Convert coordinates to local metric space
    converted_min_lon = int(MIN_LON * 10**SCALE)
    converted_min_lat = int(MIN_LAT * 10**SCALE)
    converted_cam_lat = int(CAMERA_LOC[0] * 10**SCALE)
    converted_cam_lon = int(CAMERA_LOC[1] * 10**SCALE)

    local_cam_lat = converted_cam_lat - converted_min_lat
    local_cam_lon = converted_cam_lon - converted_min_lon

    # Camera sits `altitude` metres above the ground plane (mesh is extruded
    # downward, hence the negative z).
    ray_origin = np.array([[local_cam_lat, local_cam_lon, -1 * altitude]])

    # 3. Calculate camera FOV and headings
    HOR_FOV = math.atan((width / 2) / FOCAL_LENGTH)
    VERT_FOV = math.atan((height / 2) / FOCAL_LENGTH)

    MIN_HEADING = math.radians(HEADING) - HOR_FOV

    delta_heading = (HOR_FOV * 2) / int(width / interval)
    delta_tilt = (VERT_FOV * 2) / int(height / interval)
    height_center = int(height / 2)

    rays = []
    colors = []
    heading = MIN_HEADING

    # 4. Sweep the rays across the image, one column per `interval` pixels.
    for i in range(0, width, interval):
        x = math.cos(heading)
        y = math.sin(heading)
        tilt = 0

        column_colors = []
        column_rays = []

        height_offset = 0
        # Effective focal length grows towards the image edges.
        focal_length_adj = abs(FOCAL_LENGTH / math.cos(abs(math.radians(HEADING) - heading)))

        while height_offset <= int(height_center):
            tilt_sin = math.sin(tilt)
            # Symmetric rays above and below the horizon.
            column_rays.append(np.array([[x, y, tilt_sin]]))
            column_rays.append(np.array([[x, y, -1 * tilt_sin]]))

            if height_center + height_offset < height and i < width:
                column_colors.append(splatter_img[height_center + height_offset][i].tolist())
                column_colors.append(splatter_img[height_center - height_offset][i].tolist())
            else:
                column_colors.append([255, 255, 255])
                column_colors.append([255, 255, 255])

            tilt += delta_tilt
            denom = math.sin((math.pi / 2) - tilt)
            # Project the tilt back to a pixel offset; guard near-vertical tilts.
            height_offset = abs(
                int((focal_length_adj * math.sin(tilt)) / (denom if denom != 0 else 0.0001))
            )

        colors.append(column_colors)
        rays.append(column_rays)
        heading += delta_heading

    # 5. Calculate Intersections and Record Data
    face_hits = []
    image_colors = []
    mesh_colors = []

    for i, column_ray in enumerate(rays):
        for j, ray in enumerate(column_ray):
            locations, index_ray, index_tri = street_mesh.ray.intersects_location(
                ray_origins=ray_origin, ray_directions=ray, multiple_hits=False
            )

            if len(locations) != 0:
                tri_index = int(index_tri[0])
                hit_loc = locations[0]

                # Calculate distance from camera to hit
                distance = float(np.linalg.norm(hit_loc - ray_origin[0]))

                # Basic occlusion check (if ray hits something much closer than expected)
                is_occluded = distance < 2.0

                # Record the hit for the evaluation module
                face_hits.append(
                    {
                        "face_id": tri_index,
                        "image_id": image_id,
                        "distance": distance,
                        "is_occluded": is_occluded,
                        "visibility_score": visibility_score,
                    }
                )

                # Record the colors for correlation scoring
                image_colors.append(colors[i][j])

                # Extract the mesh color at this triangle
                if (
                    hasattr(street_mesh.visual, "vertex_colors")
                    and street_mesh.visual.vertex_colors is not None
                ):
                    triangle_vertex_indices = street_mesh.faces[tri_index]
                    tri_vertex_colors = street_mesh.visual.vertex_colors[triangle_vertex_indices]
                    avg_color = tri_vertex_colors.mean(axis=0).tolist()
                    mesh_colors.append(avg_color)
                else:
                    mesh_colors.append([128, 128, 128, 255])  # Fallback gray

    return face_hits, image_colors, mesh_colors
+ """ + print(f"Loading mesh from {combined_mesh_path}...") + street_mesh = trimesh.load_mesh(combined_mesh_path) + + bbox = ingestion_data.get("bbox_south_west_north_east", [0, 0, 0, 0]) + mapillary_kept = ingestion_data.get("mapillary_kept", []) + + all_image_colors = [] + all_mesh_colors = [] + faces_data = {} # Maps face_id to a list of hits + + print(f"Raycasting {len(mapillary_kept)} images...") + + for entry in mapillary_kept: + pose = entry.get("pose", {}) + CAMERA_LOC = (pose.get("lat"), pose.get("lon")) + HEADING = pose.get("heading") + INPUT_IMG = entry.get("url") + image_id = entry.get("image_id", "unknown") + + # Grab the visibility score calculated during ingestion + visibility_score = entry.get("best_candidate", {}).get("visibility_score", 0.0) + + if CAMERA_LOC[0] and CAMERA_LOC[1] and HEADING and INPUT_IMG: + hits, img_colors, msh_colors = shoot_rays_for_image( + CAMERA_LOC, HEADING, INPUT_IMG, image_id, bbox, street_mesh, visibility_score + ) + + # Aggregate colors + all_image_colors.extend(img_colors) + all_mesh_colors.extend(msh_colors) + + # Group hits by the mesh face they hit + for hit in hits: + face_id = hit["face_id"] + if face_id not in faces_data: + faces_data[face_id] = [] + faces_data[face_id].append(hit) + + print("Raycasting complete! Packaging data for evaluation...") + + return { + "image_colors": all_image_colors, + "mesh_colors": all_mesh_colors, + "faces_data": faces_data, + "total_faces": len(street_mesh.faces), + }