diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index 8a1ed7d..e4eb0ca 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -73,6 +73,21 @@ "description": "The max number of times we continue syncing after potential errors", "order": 8 }, + "timeout_seconds": { + "type": "integer", + "title": "Timeout (in seconds)", + "default": 300, + "minimum": 300, + "description": "Timeout in seconds for a sync attempt", + "order": 9 + }, + "use_gtid_with_table_pks": { + "type": "boolean", + "title": "Use GTID with table primary keys", + "default": false, + "description": "Use GTID position together with table primary keys", + "order": 10 + }, "options": { "type": "object", "title": "Customize serialization", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index ed5e273..8fd5266 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -12,16 +12,18 @@ import ( // PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database type PlanetScaleSource struct { - Host string `json:"host"` - Database string `json:"database"` - Username string `json:"username"` - Password string `json:"password"` - Shards string `json:"shards"` - UseReplica bool `json:"use_replica"` - UseRdonly bool `json:"use_rdonly"` - StartingGtids string `json:"starting_gtids"` - Options CustomSourceOptions `json:"options"` - MaxRetries uint `json:"max_retries"` + Host string `json:"host"` + Database string `json:"database"` + Username string `json:"username"` + Password string `json:"password"` + Shards string `json:"shards"` + UseReplica bool `json:"use_replica"` + UseRdonly bool `json:"use_rdonly"` + StartingGtids string `json:"starting_gtids"` + Options CustomSourceOptions `json:"options"` + MaxRetries uint `json:"max_retries"` + TimeoutSeconds *int `json:"timeout_seconds"` + UseGTIDWithTablePKs bool `json:"use_gtid_with_table_pks"` } type CustomSourceOptions struct { diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 383ea55..760c1cb 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -224,7 +224,10 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } table := s.Stream - readDuration := 5 * time.Minute + timeout := 5 * time.Minute + if timeoutSeconds := ps.TimeoutSeconds; timeoutSeconds != nil { + timeout = time.Duration(*timeoutSeconds) * time.Second + } maxRetries := ps.MaxRetries preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard) @@ -245,7 +248,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane p.Logger.Log(LOGLEVEL_INFO, preamble+"No new GTIDs found, exiting") return TableCursorToSerializedCursor(currentPosition) } - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New GTIDs found, syncing for %v", readDuration)) + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New GTIDs found, syncing for %v", timeout)) var syncCount uint = 0 totalRecordCount := 0 @@ -253,7 +256,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane for { syncCount += 1 p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sStarting sync #%v", preamble, syncCount)) - newPosition, recordCount, err := p.sync(ctx, syncMode, currentPosition, stopPosition, table, ps, tabletType, readDuration) + newPosition, recordCount, err := p.sync(ctx, syncMode, currentPosition, stopPosition, table, ps, tabletType, timeout) totalRecordCount += recordCount currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition) if sErr != nil { @@ -279,11 +282,11 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } } -func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, int, error) { +func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, timeout time.Duration) (*psdbconnect.TableCursor, int, error) { preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", s.Namespace, TabletTypeToString(tabletType), s.Name, tc.Shard) defer p.Logger.Flush() - ctx, cancel := context.WithTimeout(ctx, readDuration) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() var ( @@ -300,7 +303,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * defer conn.Close() } - if tc.LastKnownPk != nil { + if tc.LastKnownPk != nil && !ps.UseGTIDWithTablePKs { tc.Position = "" }