From cf8d22e994b4472563aa1fbe7405522899eac071 Mon Sep 17 00:00:00 2001 From: suhr25 Date: Mon, 2 Mar 2026 04:54:42 +0000 Subject: [PATCH 1/2] fix: propagate context into static GTFS HTTP download rawGtfsData used http.NewRequest (no context), so ForceUpdate's caller context was never linked to the HTTP request. Switch to http.NewRequestWithContext and thread ctx through loadGTFSData and rawGtfsData so cancellations and deadlines take effect. Signed-off-by: suhr25 --- internal/gtfs/gtfs_manager.go | 2 +- internal/gtfs/static.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/gtfs/gtfs_manager.go b/internal/gtfs/gtfs_manager.go index 411fc545..864723f5 100644 --- a/internal/gtfs/gtfs_manager.go +++ b/internal/gtfs/gtfs_manager.go @@ -81,7 +81,7 @@ func (manager *Manager) MarkReady() { func InitGTFSManager(config Config) (*Manager, error) { isLocalFile := !strings.HasPrefix(config.GtfsURL, "http://") && !strings.HasPrefix(config.GtfsURL, "https://") - staticData, err := loadGTFSData(config.GtfsURL, isLocalFile, config) + staticData, err := loadGTFSData(context.Background(), config.GtfsURL, isLocalFile, config) if err != nil { return nil, err } diff --git a/internal/gtfs/static.go b/internal/gtfs/static.go index 738044b2..8cdb48b2 100644 --- a/internal/gtfs/static.go +++ b/internal/gtfs/static.go @@ -15,7 +15,7 @@ import ( "maglev.onebusaway.org/internal/logging" ) -func rawGtfsData(source string, isLocalFile bool, config Config) ([]byte, error) { +func rawGtfsData(ctx context.Context, source string, isLocalFile bool, config Config) ([]byte, error) { var b []byte var err error @@ -27,7 +27,7 @@ func rawGtfsData(source string, isLocalFile bool, config Config) ([]byte, error) return nil, fmt.Errorf("error reading local GTFS file: %w", err) } } else { - req, err := http.NewRequest("GET", source, nil) + req, err := http.NewRequestWithContext(ctx, "GET", source, nil) if err != nil { return nil, fmt.Errorf("error creating GTFS request: %w", err) } @@ -115,8 +115,8 @@ func buildGtfsDB(config Config, isLocalFile bool, dbPath string) (*gtfsdb.Client } // loadGTFSData loads and parses GTFS data from either a URL or a local file -func loadGTFSData(source string, isLocalFile bool, config Config) (*gtfs.Static, error) { - b, err := rawGtfsData(source, isLocalFile, config) +func loadGTFSData(ctx context.Context, source string, isLocalFile bool, config Config) (*gtfs.Static, error) { + b, err := rawGtfsData(ctx, source, isLocalFile, config) if err != nil { return nil, fmt.Errorf("error reading GTFS data: %w", err) } @@ -190,7 +190,7 @@ func (manager *Manager) ForceUpdate(ctx context.Context) error { logger := slog.Default().With(slog.String("component", "gtfs_updater")) - newStaticData, err := loadGTFSData(manager.config.GtfsURL, manager.isLocalFile, manager.config) + newStaticData, err := loadGTFSData(ctx, manager.config.GtfsURL, manager.isLocalFile, manager.config) if err != nil { logging.LogError(logger, "Error updating GTFS data", err, slog.String("source", manager.config.GtfsURL)) From 9fe937d63b01b57001e4243efc3aaa792464d0fd Mon Sep 17 00:00:00 2001 From: Suhrid Marwah Date: Wed, 4 Mar 2026 12:19:59 +0000 Subject: [PATCH 2/2] fix: resolve merge conflict combining ctx propagation and startup retry logic - InitGTFSManager now accepts context.Context and propagates it through loadGTFSData and buildGtfsDB instead of using context.Background() - Preserve retry loop from main: configurable backoffs, cancellable sleeps, partial DB cleanup between attempts, and retry success logging - buildGtfsDB signature updated to accept context.Context; ForceUpdate call site updated accordingly - Add StartupRetries []time.Duration field to Config for configurable backoffs - Update all InitGTFSManager call sites to pass context.Background() --- cmd/api/app.go | 2 +- .../advanced_direction_calculator_test.go | 2 +- internal/gtfs/config.go | 9 +- internal/gtfs/global_cache_test.go | 6 +- internal/gtfs/gtfs_manager.go | 101 ++++++++++++++++-- internal/gtfs/gtfs_manager_test.go | 8 +- internal/gtfs/hot_swap_test.go | 10 +- internal/gtfs/shutdown_test.go | 7 +- internal/gtfs/static.go | 6 +- internal/restapi/http_test.go | 2 +- .../input_validation_integration_test.go | 3 +- .../vehicles_for_agency_handler_test.go | 2 +- 12 files changed, 120 insertions(+), 38 deletions(-) diff --git a/cmd/api/app.go b/cmd/api/app.go index c4063ed6..8ba307a1 100644 --- a/cmd/api/app.go +++ b/cmd/api/app.go @@ -70,7 +70,7 @@ func ParseAPIKeys(apiKeysFlag string) []string { func BuildApplication(cfg appconf.Config, gtfsCfg gtfs.Config) (*app.Application, error) { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - gtfsManager, err := gtfs.InitGTFSManager(gtfsCfg) + gtfsManager, err := gtfs.InitGTFSManager(context.Background(), gtfsCfg) if err != nil { return nil, fmt.Errorf("failed to initialize GTFS manager: %w", err) } diff --git a/internal/gtfs/advanced_direction_calculator_test.go b/internal/gtfs/advanced_direction_calculator_test.go index 8d5fcb35..57d0137a 100644 --- a/internal/gtfs/advanced_direction_calculator_test.go +++ b/internal/gtfs/advanced_direction_calculator_test.go @@ -32,7 +32,7 @@ func getSharedTestComponents(t *testing.T) (*Manager, *AdvancedDirectionCalculat } var err error - sharedManager, err = InitGTFSManager(gtfsConfig) + sharedManager, err = InitGTFSManager(context.Background(), gtfsConfig) if err != nil { panic("Failed to init shared GTFS manager: " + err.Error()) } diff --git a/internal/gtfs/config.go b/internal/gtfs/config.go index 7a71864b..4edd8e98 100644 --- a/internal/gtfs/config.go +++ b/internal/gtfs/config.go @@ -1,6 +1,8 @@ package gtfs import ( + "time" + "maglev.onebusaway.org/internal/appconf" ) @@ -23,9 +25,10 @@ type Config struct { StaticAuthHeaderValue string RTFeeds []RTFeedConfig GTFSDataPath string - Env appconf.Environment - Verbose bool - EnableGTFSTidy bool + Env appconf.Environment + Verbose bool + EnableGTFSTidy bool + StartupRetries []time.Duration // Backoff durations between startup load attempts; defaults to [5s,15s,30s,60s] } // enabledFeeds returns only the enabled feeds that have at least one URL configured. diff --git a/internal/gtfs/global_cache_test.go b/internal/gtfs/global_cache_test.go index 10774490..527a41ca 100644 --- a/internal/gtfs/global_cache_test.go +++ b/internal/gtfs/global_cache_test.go @@ -76,7 +76,7 @@ func TestInitializeGlobalCache_EmptyDatabase(t *testing.T) { GTFSDataPath: ":memory:", Env: appconf.Test, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } @@ -99,7 +99,7 @@ func TestInitializeGlobalCache_DatabaseError(t *testing.T) { GTFSDataPath: ":memory:", Env: appconf.Test, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } @@ -125,7 +125,7 @@ func TestInitializeGlobalCache_StopsWithoutShapes(t *testing.T) { GTFSDataPath: ":memory:", Env: appconf.Test, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } diff --git a/internal/gtfs/gtfs_manager.go b/internal/gtfs/gtfs_manager.go index 864723f5..759939ee 100644 --- a/internal/gtfs/gtfs_manager.go +++ b/internal/gtfs/gtfs_manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "os" "sort" "strings" "sync" @@ -78,12 +79,96 @@ func (manager *Manager) MarkReady() { // InitGTFSManager initializes the Manager with the GTFS data from the given source // The source can be either a URL or a local file path -func InitGTFSManager(config Config) (*Manager, error) { +func InitGTFSManager(ctx context.Context, config Config) (*Manager, error) { isLocalFile := !strings.HasPrefix(config.GtfsURL, "http://") && !strings.HasPrefix(config.GtfsURL, "https://") - staticData, err := loadGTFSData(context.Background(), config.GtfsURL, isLocalFile, config) - if err != nil { - return nil, err + logger := slog.Default().With(slog.String("component", "gtfs_manager")) + + var staticData *gtfs.Static + var gtfsDB *gtfsdb.Client + var err error + + // Use configurable backoffs or default to production values + backoffs := config.StartupRetries + if len(backoffs) == 0 { + backoffs = []time.Duration{5 * time.Second, 15 * time.Second, 30 * time.Second, 60 * time.Second} + } + maxAttempts := len(backoffs) + 1 + + // Skip retries for local files - they will fail identically every time + if isLocalFile { + maxAttempts = 1 + } + + var attemptsMade int + + for attempt := 1; attempt <= maxAttempts; attempt++ { + attemptsMade = attempt + // Attempt to load in-memory static data if we haven't already succeeded + if staticData == nil { + staticData, err = loadGTFSData(ctx, config.GtfsURL, isLocalFile, config) + if err != nil { + if attempt < maxAttempts { + delay := backoffs[attempt-1] + logging.LogError(logger, "Failed to load GTFS static data, retrying", err, + slog.Int("attempt", attempt), + slog.Int("max_attempts", maxAttempts), + slog.Duration("retry_delay", delay), + ) + + // Cancellable sleep + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(delay): + } + continue + } + return nil, fmt.Errorf("failed to load GTFS data after %d attempts: %w", maxAttempts, err) + } + } + + // Attempt to build the SQLite DB if we haven't already succeeded + if gtfsDB == nil { + // Clean up partial SQLite file from previous failed attempts + if attempt > 1 && config.GTFSDataPath != "" && config.GTFSDataPath != ":memory:" { + if removeErr := os.Remove(config.GTFSDataPath); removeErr != nil && !os.IsNotExist(removeErr) { + logging.LogError(logger, "Failed to clean up partial SQLite file before retry", removeErr, + slog.String("path", config.GTFSDataPath), + slog.Int("attempt", attempt), + ) + } + } + + gtfsDB, err = buildGtfsDB(ctx, config, isLocalFile, "") + if err != nil { + if attempt < maxAttempts { + delay := backoffs[attempt-1] + logging.LogError(logger, "Failed to build GTFS database, retrying", err, + slog.Int("attempt", attempt), + slog.Int("max_attempts", maxAttempts), + slog.Duration("retry_delay", delay), + ) + + // Cancellable sleep + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(delay): + } + continue + } + return nil, fmt.Errorf("failed to build GTFS database after %d attempts: %w", maxAttempts, err) + } + } + + // Both loads succeeded, break out of the retry loop + break + } + + // Log success if we recovered via retries + if attemptsMade > 1 { + logger.Info("GTFS data loaded after retry", slog.Int("attempts", attemptsMade)) } manager := &Manager{ @@ -100,21 +185,15 @@ func InitGTFSManager(config Config) (*Manager, error) { feedVehicleTimestamp: make(map[string]uint64), } manager.setStaticGTFS(staticData) - - gtfsDB, err := buildGtfsDB(config, isLocalFile, "") - if err != nil { - return nil, fmt.Errorf("error building GTFS database: %w", err) - } manager.GtfsDB = gtfsDB // Populate systemETag from import metadata - metadata, err := gtfsDB.Queries.GetImportMetadata(context.Background()) + metadata, err := gtfsDB.Queries.GetImportMetadata(ctx) if err == nil && metadata.FileHash != "" { manager.systemETag = fmt.Sprintf(`"%s"`, metadata.FileHash) } // Build spatial index for fast stop location queries - ctx := context.Background() spatialIndex, err := buildStopSpatialIndex(ctx, gtfsDB.Queries) if err != nil { _ = gtfsDB.Close() diff --git a/internal/gtfs/gtfs_manager_test.go b/internal/gtfs/gtfs_manager_test.go index 7652b589..f7a54043 100644 --- a/internal/gtfs/gtfs_manager_test.go +++ b/internal/gtfs/gtfs_manager_test.go @@ -268,7 +268,7 @@ func TestManager_GetVehicleForTrip(t *testing.T) { Env: appconf.Test, } // We use isolated GTFSManager here instead of shared test components because we want to control the real-time vehicles for this test. - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) assert.Nil(t, err) defer manager.Shutdown() @@ -368,7 +368,7 @@ func TestRoutesForAgencyID_MapOptimization(t *testing.T) { GTFSDataPath: ":memory:", Env: appconf.Test, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) require.NoError(t, err, "Failed to initialize manager") defer manager.Shutdown() @@ -403,7 +403,7 @@ func TestRoutesForAgencyID_ConcurrentAccess(t *testing.T) { GTFSDataPath: ":memory:", Env: appconf.Test, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) require.NoError(t, err) defer manager.Shutdown() @@ -472,7 +472,7 @@ func BenchmarkRoutesForAgencyID_MapLookup(b *testing.B) { GTFSDataPath: ":memory:", Env: appconf.Test, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { b.Fatalf("Failed to initialize: %v", err) } diff --git a/internal/gtfs/hot_swap_test.go b/internal/gtfs/hot_swap_test.go index 540b1ec6..bb0170a0 100644 --- a/internal/gtfs/hot_swap_test.go +++ b/internal/gtfs/hot_swap_test.go @@ -33,7 +33,7 @@ func TestHotSwap_QueriesCompleteDuringSwap(t *testing.T) { Env: appconf.Development, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } @@ -115,7 +115,7 @@ func TestHotSwap_FailureRecovery(t *testing.T) { Env: appconf.Development, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } @@ -166,7 +166,7 @@ func TestHotSwap_OldDatabaseCleanup(t *testing.T) { Env: appconf.Development, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } @@ -207,7 +207,7 @@ func TestHotSwap_MutexProtectedSwap(t *testing.T) { Env: appconf.Development, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to init manager: %v", err) } @@ -262,7 +262,7 @@ func TestHotSwap_ConcurrentForceUpdate(t *testing.T) { Env: appconf.Development, } - manager, err := InitGTFSManager(gtfsConfig) + manager, err := InitGTFSManager(context.Background(), gtfsConfig) require.NoError(t, err) defer manager.Shutdown() diff --git a/internal/gtfs/shutdown_test.go b/internal/gtfs/shutdown_test.go index 48dd62fb..f0870c76 100644 --- a/internal/gtfs/shutdown_test.go +++ b/internal/gtfs/shutdown_test.go @@ -1,6 +1,7 @@ package gtfs import ( + "context" "path/filepath" "testing" "time" @@ -23,7 +24,7 @@ func TestManagerShutdown(t *testing.T) { } // Initialize manager - manager, err := InitGTFSManager(config) + manager, err := InitGTFSManager(context.Background(), config) require.NoError(t, err, "Failed to initialize GTFS manager") require.NotNil(t, manager, "Manager should not be nil") @@ -69,7 +70,7 @@ func TestManagerShutdownWithRealtime(t *testing.T) { } // Initialize manager - manager, err := InitGTFSManager(config) + manager, err := InitGTFSManager(context.Background(), config) require.NoError(t, err, "Failed to initialize GTFS manager") require.NotNil(t, manager, "Manager should not be nil") @@ -105,7 +106,7 @@ func TestManagerShutdownIdempotent(t *testing.T) { } // Initialize manager - manager, err := InitGTFSManager(config) + manager, err := InitGTFSManager(context.Background(), config) require.NoError(t, err, "Failed to initialize GTFS manager") // Call shutdown multiple times - should not panic or hang diff --git a/internal/gtfs/static.go b/internal/gtfs/static.go index 8cdb48b2..7b25421c 100644 --- a/internal/gtfs/static.go +++ b/internal/gtfs/static.go @@ -80,7 +80,7 @@ func rawGtfsData(ctx context.Context, source string, isLocalFile bool, config Co return b, nil } -func buildGtfsDB(config Config, isLocalFile bool, dbPath string) (*gtfsdb.Client, error) { +func buildGtfsDB(ctx context.Context, config Config, isLocalFile bool, dbPath string) (*gtfsdb.Client, error) { // If no specific path is provided, use the one from config if dbPath == "" { dbPath = config.GTFSDataPath @@ -91,8 +91,6 @@ func buildGtfsDB(config Config, isLocalFile bool, dbPath string) (*gtfsdb.Client return nil, fmt.Errorf("failed to create GTFS database client: %w", err) } - ctx := context.Background() - if isLocalFile { err = client.ImportFromFile(ctx, config.GtfsURL) } else { @@ -208,7 +206,7 @@ func (manager *Manager) ForceUpdate(ctx context.Context) error { logging.LogError(logger, "Failed to remove existing temp DB", err) } - newGtfsDB, err := buildGtfsDB(manager.config, manager.isLocalFile, tempDBPath) + newGtfsDB, err := buildGtfsDB(ctx, manager.config, manager.isLocalFile, tempDBPath) if err != nil { logging.LogError(logger, "Error building new GTFS DB", err) return err diff --git a/internal/restapi/http_test.go b/internal/restapi/http_test.go index 6d45b074..44903823 100644 --- a/internal/restapi/http_test.go +++ b/internal/restapi/http_test.go @@ -57,7 +57,7 @@ func createTestApiWithClock(t testing.TB, c clock.Clock) *RestAPI { GTFSDataPath: testDbPath, } var err error - testGtfsManager, err = gtfs.InitGTFSManager(gtfsConfig) + testGtfsManager, err = gtfs.InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to initialize shared test GTFS manager: %v", err) } diff --git a/internal/restapi/input_validation_integration_test.go b/internal/restapi/input_validation_integration_test.go index 7d5c224f..76661a94 100644 --- a/internal/restapi/input_validation_integration_test.go +++ b/internal/restapi/input_validation_integration_test.go @@ -1,6 +1,7 @@ package restapi import ( + "context" "fmt" "io" "net/http" @@ -27,7 +28,7 @@ func createTestApiForValidationTests(t *testing.T) *RestAPI { GTFSDataPath: testDbPath, } var err error - testGtfsManager, err = gtfs.InitGTFSManager(gtfsConfig) + testGtfsManager, err = gtfs.InitGTFSManager(context.Background(), gtfsConfig) if err != nil { t.Fatalf("Failed to initialize shared test GTFS manager: %v", err) } diff --git a/internal/restapi/vehicles_for_agency_handler_test.go b/internal/restapi/vehicles_for_agency_handler_test.go index 1838595f..88a03bb4 100644 --- a/internal/restapi/vehicles_for_agency_handler_test.go +++ b/internal/restapi/vehicles_for_agency_handler_test.go @@ -345,7 +345,7 @@ func createTestApiWithRealTimeData(t *testing.T) (*RestAPI, func()) { }, } - gtfsManager, err := gtfs.InitGTFSManager(gtfsConfig) + gtfsManager, err := gtfs.InitGTFSManager(context.Background(), gtfsConfig) require.NoError(t, err) dirCalc := gtfs.NewAdvancedDirectionCalculator(gtfsManager.GtfsDB.Queries)