diff --git a/internal/gtfs/advanced_direction_calculator.go b/internal/gtfs/advanced_direction_calculator.go index 84453c27..c9cc41aa 100644 --- a/internal/gtfs/advanced_direction_calculator.go +++ b/internal/gtfs/advanced_direction_calculator.go @@ -12,6 +12,7 @@ import ( "sync" "sync/atomic" + "golang.org/x/sync/singleflight" "maglev.onebusaway.org/gtfsdb" "maglev.onebusaway.org/internal/utils" ) @@ -29,7 +30,13 @@ type AdvancedDirectionCalculator struct { shapeCache map[string][]gtfsdb.GetShapePointsWithDistanceRow // Cache of all shape data for bulk operations initialized atomic.Bool // Tracks whether concurrent operations have started cacheMutex sync.RWMutex // Protects map access - directionResults sync.Map // Cached direction results (stopID -> string), includes negative cache + + // directionResults caches computed stop directions. + // Lifecycle note: This map grows indefinitely for the lifetime of the application. + // Unbounded growth is acceptable here because it is strictly bounded by the finite + // number of valid real-world stops, and computed directions remain stable across GTFS reloads. + directionResults sync.Map // Cached direction results (stopID -> string), includes negative cache + requestGroup singleflight.Group // Prevents duplicate concurrent computations for the same stop } // NewAdvancedDirectionCalculator creates a new advanced direction calculator @@ -54,8 +61,9 @@ func (adc *AdvancedDirectionCalculator) SetStandardDeviationThreshold(threshold return nil } -// SetShapeCache sets a pre-loaded cache of shape data to avoid database queries during bulk operations. -// This significantly improves performance when calculating directions for many stops. +// SetShapeCache is retained exclusively for use by the DirectionPrecomputer during startup. +// It sets a pre-loaded cache of shape data to avoid thousands of database queries during +// the precomputation phase, significantly improving startup performance. // IMPORTANT: This must be called before any concurrent operations begin. // Returns an error if called after CalculateStopDirection has been invoked. func (adc *AdvancedDirectionCalculator) SetShapeCache(cache map[string][]gtfsdb.GetShapePointsWithDistanceRow) error { @@ -69,20 +77,6 @@ func (adc *AdvancedDirectionCalculator) SetShapeCache(cache map[string][]gtfsdb. return nil } -// SetContextCache injects the bulk-loaded context data. -// IMPORTANT: This must be called before any concurrent calculation operations begin. -// Returns an error if called after CalculateStopDirection has been invoked. -func (adc *AdvancedDirectionCalculator) SetContextCache(cache map[string][]gtfsdb.GetStopsWithShapeContextRow) error { - adc.cacheMutex.Lock() - defer adc.cacheMutex.Unlock() - - if adc.initialized.Load() { - return errors.New("SetContextCache called after concurrent operations have started") - } - adc.contextCache = cache - return nil -} - // CalculateStopDirection computes the direction for a stop using the Java algorithm func (adc *AdvancedDirectionCalculator) CalculateStopDirection(ctx context.Context, stopID string, gtfsDirection ...sql.NullString) string { if len(gtfsDirection) > 0 && gtfsDirection[0].Valid && gtfsDirection[0].String != "" { @@ -99,12 +93,24 @@ func (adc *AdvancedDirectionCalculator) CalculateStopDirection(ctx context.Conte // Mark as initialized for concurrency safety adc.initialized.Store(true) - result := adc.computeFromShapes(ctx, stopID) + // Fall back to computing from shapes, protected by singleflight + // This ensures concurrent requests for the SAME stopID don't hit the DB multiple times. + v, _, _ := adc.requestGroup.Do(stopID, func() (interface{}, error) { + // Double-check cache inside the singleflight in case another goroutine just finished it + if cached, ok := adc.directionResults.Load(stopID); ok { + return cached.(string), nil + } + + // Actually compute it (Hits the DB) + computedDir := adc.computeFromShapes(ctx, stopID) + + // Store in sync.Map for all future requests + adc.directionResults.Store(stopID, computedDir) - // Cache the result (even empty strings) to avoid recomputation - adc.directionResults.Store(stopID, result) + return computedDir, nil + }) - return result + return v.(string) } // translateGtfsDirection converts GTFS direction field to compass direction diff --git a/internal/gtfs/advanced_direction_calculator_test.go b/internal/gtfs/advanced_direction_calculator_test.go index c34155d1..ad0ce289 100644 --- a/internal/gtfs/advanced_direction_calculator_test.go +++ b/internal/gtfs/advanced_direction_calculator_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "maglev.onebusaway.org/gtfsdb" "maglev.onebusaway.org/internal/models" ) @@ -105,16 +104,14 @@ func TestTranslateGtfsDirection(t *testing.T) { } func TestCalculateStopDirectionResultCache(t *testing.T) { - calc := &AdvancedDirectionCalculator{} - // Set an empty context cache so computeFromShapes doesn't try to hit the DB - _ = calc.SetContextCache(make(map[string][]gtfsdb.GetStopsWithShapeContextRow)) + _, calc := getSharedTestComponents(t) // First call: precomputed direction "NE" from DB should be recognized result := calc.CalculateStopDirection(context.Background(), "stop-1", sql.NullString{String: "NE", Valid: true}) assert.Equal(t, "NE", result, "should recognize compass abbreviation NE from precomputed direction") // Verify that a stop with no GTFS direction falls through to computeFromShapes, - // gets an empty result (no data in cache), and caches the empty result. + // gets an empty result (no data in cache for nonexistent stop), and caches the empty result. result = calc.CalculateStopDirection(context.Background(), "nonexistent-stop", sql.NullString{Valid: false}) assert.Equal(t, "", result, "should return empty for stop with no direction data") @@ -274,7 +271,7 @@ func TestStandardDeviationThreshold(t *testing.T) { func TestCalculateStopDirection_WithShapeData(t *testing.T) { ctx := context.Background() - // Optimization: Reuse shared DB and Cache + // Optimization: Reuse shared DB _, calc := getSharedTestComponents(t) // Test with a real stop from RABA data @@ -285,7 +282,7 @@ func TestCalculateStopDirection_WithShapeData(t *testing.T) { func TestComputeFromShapes_NoShapeData(t *testing.T) { ctx := context.Background() - // Optimization: Reuse shared DB and Cache + // Optimization: Reuse shared DB _, calc := getSharedTestComponents(t) // Test with a non-existent stop @@ -295,7 +292,7 @@ func TestComputeFromShapes_NoShapeData(t *testing.T) { func TestComputeFromShapes_SingleOrientation(t *testing.T) { ctx := context.Background() - // Optimization: Reuse shared DB and Cache + // Optimization: Reuse shared DB _, calc := getSharedTestComponents(t) // Test with actual stop data - single orientation path will be taken if only one trip @@ -442,43 +439,6 @@ func TestTranslateGtfsDirection_NumericEdgeCases(t *testing.T) { } } -func TestSetContextCache_HappyPath(t *testing.T) { - // Create a bare instance (no queries needed for this test) - adc := &AdvancedDirectionCalculator{} - - // Create dummy cache data - cache := make(map[string][]gtfsdb.GetStopsWithShapeContextRow) - cache["stop1"] = []gtfsdb.GetStopsWithShapeContextRow{ - { - ID: "stop1", - Lat: 40.7128, - Lon: -74.0060, - }, - } - - // Set the cache - err := adc.SetContextCache(cache) - assert.NoError(t, err) - - // Verify it was set correctly (accessing private field) - assert.Equal(t, 1, len(adc.contextCache)) - assert.Equal(t, "stop1", adc.contextCache["stop1"][0].ID) -} - -func TestSetContextCache_ReturnsErrorAfterInit(t *testing.T) { - // Create the instance - adc := &AdvancedDirectionCalculator{} - - // Simulate that concurrent operations have already started - // We manually toggle the atomic boolean to "true" - adc.initialized.Store(true) - - // This call MUST return an error now - err := adc.SetContextCache(make(map[string][]gtfsdb.GetStopsWithShapeContextRow)) - assert.Error(t, err) - assert.Equal(t, "SetContextCache called after concurrent operations have started", err.Error()) -} - func TestCalculateStopDirection_VariadicSignature(t *testing.T) { ctx := context.Background() _, calc := getSharedTestComponents(t) @@ -495,51 +455,6 @@ func TestCalculateStopDirection_VariadicSignature(t *testing.T) { assert.Equal(t, "", dirOmitted, "Should fall back gracefully when argument is omitted") } -func TestSetContextCache_ConcurrentAccess(t *testing.T) { - ctx := context.Background() - manager, _ := getSharedTestComponents(t) - // We use shared DB, but MUST use a fresh Calculator to test the race condition specifically on that instance. - calc := NewAdvancedDirectionCalculator(manager.GtfsDB.Queries) - - // Create dummy cache - cache := make(map[string][]gtfsdb.GetStopsWithShapeContextRow) - - // Channel to coordinate start - start := make(chan struct{}) - done := make(chan struct{}) - setErrCh := make(chan error, 1) - - // Launch a "Reader" Goroutine (Simulating a request coming in) - go func() { - <-start // Wait for signal - // This triggers 'initialized.Store(true)' internally - calc.CalculateStopDirection(ctx, "7000") - close(done) - }() - - // Launch a "Writer" (Simulating the bulk loader trying to set cache late) - // We want to verify this doesn't crash the program with a race condition, - // but correctly returns an error if it happens too late. - go func() { - <-start // Wait for signal - setErrCh <- calc.SetContextCache(cache) - }() - - // Start the race - close(start) - - // Wait for reader to finish - <-done - - // Wait for writer to finish - err := <-setErrCh - if err != nil { - assert.Equal(t, "SetContextCache called after concurrent operations have started", err.Error()) - } - - // If got here without the test binary crashing/deadlocking, the atomic guards did their job. -} - // TestBulkQuery_GetStopsWithShapeContextByIDs verifies the bulk optimization func TestBulkQuery_GetStopsWithShapeContextByIDs(t *testing.T) { ctx := context.Background() diff --git a/internal/gtfs/global_cache.go b/internal/gtfs/global_cache.go deleted file mode 100644 index c40667ba..00000000 --- a/internal/gtfs/global_cache.go +++ /dev/null @@ -1,74 +0,0 @@ -package gtfs - -import ( - "context" - "fmt" - "log/slog" - - "maglev.onebusaway.org/gtfsdb" -) - -func InitializeGlobalCache(ctx context.Context, queries *gtfsdb.Queries, adc *AdvancedDirectionCalculator) error { - slog.Info("starting global cache warmup...") - - allStopIDs, err := queries.GetAllStopIDs(ctx) - if err != nil { - return fmt.Errorf("failed to fetch all stop IDs: %w", err) - } - - contextRows, err := queries.GetStopsWithShapeContextByIDs(ctx, allStopIDs) - if err != nil { - return fmt.Errorf("failed to fetch stop context rows: %w", err) - } - - contextCache := make(map[string][]gtfsdb.GetStopsWithShapeContextRow) - shapeIDMap := make(map[string]bool) - var uniqueShapeIDs []string - - for _, row := range contextRows { - calcRow := gtfsdb.GetStopsWithShapeContextRow{ - ID: row.StopID, - ShapeID: row.ShapeID, - Lat: row.Lat, - Lon: row.Lon, - ShapeDistTraveled: row.ShapeDistTraveled, - } - contextCache[row.StopID] = append(contextCache[row.StopID], calcRow) - - if row.ShapeID.Valid && row.ShapeID.String != "" && !shapeIDMap[row.ShapeID.String] { - shapeIDMap[row.ShapeID.String] = true - uniqueShapeIDs = append(uniqueShapeIDs, row.ShapeID.String) - } - } - - shapeCache := make(map[string][]gtfsdb.GetShapePointsWithDistanceRow) - - if len(uniqueShapeIDs) > 0 { - shapePoints, err := queries.GetShapePointsByIDs(ctx, uniqueShapeIDs) - if err != nil { - return fmt.Errorf("failed to fetch shape points for global cache: %w", err) - } - - for _, p := range shapePoints { - shapeCache[p.ShapeID] = append(shapeCache[p.ShapeID], gtfsdb.GetShapePointsWithDistanceRow{ - Lat: p.Lat, - Lon: p.Lon, - ShapeDistTraveled: p.ShapeDistTraveled, - ShapePtSequence: p.ShapePtSequence, - }) - } - } - - if err := adc.SetShapeCache(shapeCache); err != nil { - return fmt.Errorf("failed to set shape cache: %w", err) - } - if err := adc.SetContextCache(contextCache); err != nil { - return fmt.Errorf("failed to set context cache: %w", err) - } - - slog.Info("global cache warmup complete", - slog.Int("stops_cached", len(contextCache)), - slog.Int("shapes_cached", len(shapeCache))) - - return nil -} diff --git a/internal/gtfs/global_cache_test.go b/internal/gtfs/global_cache_test.go deleted file mode 100644 index 7b984bdd..00000000 --- a/internal/gtfs/global_cache_test.go +++ /dev/null @@ -1,181 +0,0 @@ -package gtfs - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "maglev.onebusaway.org/gtfsdb" - "maglev.onebusaway.org/internal/appconf" - "maglev.onebusaway.org/internal/models" -) - -// wipeDatabase clears all tables in the correct order to respect Foreign Keys. -// We delete dependent ("child") tables before independent ("parent") tables. -func wipeDatabase(t *testing.T, client *gtfsdb.Client) { - ctx := context.Background() - // Order is critical to satisfy Foreign Key constraints: - // 1. block_trip_entry (depends on trips AND block_trip_index) - // 2. stop_times (depends on trips AND stops) - // 3. trips (depends on routes, shapes, calendar) - // 4. block_trip_index (referenced by block_trip_entry) - // 5. calendar_dates (depends on service_id) - // 6. calendar, shapes, stops, routes (now safe to delete) - // 7. agencies (root) - queries := []string{ - "DELETE FROM block_trip_entry;", - "DELETE FROM stop_times;", - "DELETE FROM trips;", - "DELETE FROM block_trip_index;", - "DELETE FROM calendar_dates;", - "DELETE FROM calendar;", - "DELETE FROM shapes;", - "DELETE FROM stops;", - "DELETE FROM routes;", - "DELETE FROM agencies;", - } - - for _, q := range queries { - _, err := client.DB.ExecContext(ctx, q) - if err != nil { - t.Fatalf("Failed to execute cleanup query %q: %v", q, err) - } - } -} - -// HAPPY PATH: Uses the Shared DB (Fast, Standard Data) -func TestInitializeGlobalCache_HappyPath(t *testing.T) { - manager, _ := getSharedTestComponents(t) - - calc := NewAdvancedDirectionCalculator(manager.GtfsDB.Queries) - - err := InitializeGlobalCache(context.Background(), manager.GtfsDB.Queries, calc) - assert.NoError(t, err) - - // Verify real data from raba.zip - assert.Greater(t, len(calc.contextCache), 0, "Context cache should be populated") - assert.Greater(t, len(calc.shapeCache), 0, "Shape cache should be populated") - - // Dynamic check: Pick any ID to verify - var sampleID string - for id := range calc.contextCache { - sampleID = id - break - } - assert.NotEmpty(t, sampleID) - - stops := calc.contextCache[sampleID] - assert.NotEmpty(t, stops) - assert.Equal(t, sampleID, stops[0].ID) -} - -// EDGE CASE: Empty Database -func TestInitializeGlobalCache_EmptyDatabase(t *testing.T) { - ctx := context.Background() - - gtfsConfig := Config{ - GtfsURL: models.GetFixturePath(t, "raba.zip"), - GTFSDataPath: ":memory:", - Env: appconf.Test, - } - manager, err := InitGTFSManager(ctx, gtfsConfig) - if err != nil { - t.Fatalf("Failed to init manager: %v", err) - } - defer manager.Shutdown() - - wipeDatabase(t, manager.GtfsDB) - - calc := NewAdvancedDirectionCalculator(manager.GtfsDB.Queries) - err = InitializeGlobalCache(context.Background(), manager.GtfsDB.Queries, calc) - - assert.NoError(t, err) - assert.Equal(t, 0, len(calc.contextCache)) - assert.Equal(t, 0, len(calc.shapeCache)) -} - -// FAILURE CASE: Database Error -func TestInitializeGlobalCache_DatabaseError(t *testing.T) { - ctx := context.Background() - - gtfsConfig := Config{ - GtfsURL: models.GetFixturePath(t, "raba.zip"), - GTFSDataPath: ":memory:", - Env: appconf.Test, - } - manager, err := InitGTFSManager(ctx, gtfsConfig) - if err != nil { - t.Fatalf("Failed to init manager: %v", err) - } - // Important: Defer shutdown to clean up background routines - defer manager.Shutdown() - - // SABOTAGE: Close DB to force errors - _ = manager.GtfsDB.DB.Close() - - calc := NewAdvancedDirectionCalculator(manager.GtfsDB.Queries) - err = InitializeGlobalCache(context.Background(), manager.GtfsDB.Queries, calc) - - assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to fetch") -} - -// EDGE CASE: Stops without Shapes -// Tests that the calculator handles active stops gracefully when the associated -// trip lacks shape geometry (e.g., shape_id is NULL). -func TestInitializeGlobalCache_StopsWithoutShapes(t *testing.T) { - ctx := context.Background() - - gtfsConfig := Config{ - GtfsURL: models.GetFixturePath(t, "raba.zip"), - GTFSDataPath: ":memory:", - Env: appconf.Test, - } - manager, err := InitGTFSManager(ctx, gtfsConfig) - if err != nil { - t.Fatalf("Failed to init manager: %v", err) - } - defer manager.Shutdown() - - // Clear any existing data - wipeDatabase(t, manager.GtfsDB) - - // Setup: Create a "Valid" Stop (Active, but no Shape) - // Link the stop to a trip, otherwise the cache loader will correctly - // ignore it as "unused/ghost" data. - queries := []string{ - // Hierarchy: Agency -> Route -> Calendar -> Trip - `INSERT INTO agencies (id, name, url, timezone) VALUES ('agency_1', 'Test Agency', 'http://example.com', 'UTC');`, - `INSERT INTO routes (id, agency_id, type) VALUES ('route_1', 'agency_1', 3);`, - `INSERT INTO calendar (id, monday, tuesday, wednesday, thursday, friday, saturday, sunday, start_date, end_date) - VALUES ('service_1', 1, 1, 1, 1, 1, 1, 1, '20240101', '20250101');`, - - // CRITICAL: Trip has NULL shape_id - `INSERT INTO trips (id, route_id, service_id, shape_id) VALUES ('trip_1', 'route_1', 'service_1', NULL);`, - - // The Stop itself - `INSERT INTO stops (id, code, name, lat, lon, location_type) VALUES ('orphan_stop', '999', 'Orphan Stop', 10.0, 20.0, 0);`, - - // Link: Stop -> Trip (Makes the stop "Active") - `INSERT INTO stop_times (trip_id, arrival_time, departure_time, stop_id, stop_sequence) - VALUES ('trip_1', 0, 0, 'orphan_stop', 1);`, - } - - for _, q := range queries { - _, err := manager.GtfsDB.DB.ExecContext(ctx, q) - assert.NoError(t, err, "Failed to execute setup query") - } - - // Run the Test - calc := NewAdvancedDirectionCalculator(manager.GtfsDB.Queries) - err = InitializeGlobalCache(ctx, manager.GtfsDB.Queries, calc) - - // Verify - assert.NoError(t, err) - - // Should be 1 because the stop is now active (linked to trip_1) - assert.Equal(t, 1, len(calc.contextCache), "Active stop should be cached") - - // Should be 0 because we explicitly set shape_id to NULL - assert.Equal(t, 0, len(calc.shapeCache), "No shapes should be cached") -}