From dae88cceca7001f3451afed7f036a30d8ecd9151 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 5 Apr 2025 16:09:03 -0400 Subject: [PATCH 1/4] add option for specifying timeout in seconds Signed-off-by: Max Englander --- cmd/airbyte-source/spec.json | 8 ++++++++ cmd/internal/planetscale_connection.go | 1 + cmd/internal/planetscale_edge_database.go | 10 +++++----- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index 8a1ed7d..3449fed 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -73,6 +73,14 @@ "description": "The max number of times we continue syncing after potential errors", "order": 8 }, + "timeout_seconds": { + "type": "integer", + "title": "Timeout (in seconds)", + "default": 300, + "minimum": 300, + "description": "Timeout in seconds for a sync attempt", + "order": 9 + }, "options": { "type": "object", "title": "Customize serialization", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index ed5e273..c8168f8 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -22,6 +22,7 @@ type PlanetScaleSource struct { StartingGtids string `json:"starting_gtids"` Options CustomSourceOptions `json:"options"` MaxRetries uint `json:"max_retries"` + TimeoutSeonds int `json:"timeout_seconds"` } type CustomSourceOptions struct { diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 383ea55..ddd1c8e 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -224,7 +224,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } table := s.Stream - readDuration := 5 * time.Minute + timeout := time.Duration(ps.TimeoutSeonds) * time.Second maxRetries := ps.MaxRetries preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard) @@ -245,7 +245,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane p.Logger.Log(LOGLEVEL_INFO, preamble+"No new GTIDs found, exiting") return TableCursorToSerializedCursor(currentPosition) } - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New GTIDs found, syncing for %v", readDuration)) + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New GTIDs found, syncing for %v", timeout)) var syncCount uint = 0 totalRecordCount := 0 @@ -253,7 +253,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane for { syncCount += 1 p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sStarting sync #%v", preamble, syncCount)) - newPosition, recordCount, err := p.sync(ctx, syncMode, currentPosition, stopPosition, table, ps, tabletType, readDuration) + newPosition, recordCount, err := p.sync(ctx, syncMode, currentPosition, stopPosition, table, ps, tabletType, timeout) totalRecordCount += recordCount currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition) if sErr != nil { @@ -279,11 +279,11 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } } -func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, int, error) { +func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, timeout time.Duration) (*psdbconnect.TableCursor, int, error) { preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", s.Namespace, TabletTypeToString(tabletType), s.Name, tc.Shard) defer p.Logger.Flush() - ctx, cancel := context.WithTimeout(ctx, readDuration) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() var ( From ec6f4ac861475ea91cf55d6386bdf2eacdfbd6dd Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 5 Apr 2025 16:41:38 -0400 Subject: [PATCH 2/4] fix typo Signed-off-by: Max Englander --- cmd/internal/planetscale_connection.go | 22 +++++++++++----------- cmd/internal/planetscale_edge_database.go | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index c8168f8..b35ca2e 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -12,17 +12,17 @@ import ( // PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database type PlanetScaleSource struct { - Host string `json:"host"` - Database string `json:"database"` - Username string `json:"username"` - Password string `json:"password"` - Shards string `json:"shards"` - UseReplica bool `json:"use_replica"` - UseRdonly bool `json:"use_rdonly"` - StartingGtids string `json:"starting_gtids"` - Options CustomSourceOptions `json:"options"` - MaxRetries uint `json:"max_retries"` - TimeoutSeonds int `json:"timeout_seconds"` + Host string `json:"host"` + Database string `json:"database"` + Username string `json:"username"` + Password string `json:"password"` + Shards string `json:"shards"` + UseReplica bool `json:"use_replica"` + UseRdonly bool `json:"use_rdonly"` + StartingGtids string `json:"starting_gtids"` + Options CustomSourceOptions `json:"options"` + MaxRetries uint `json:"max_retries"` + TimeoutSeconds int `json:"timeout_seconds"` } type CustomSourceOptions struct { diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index ddd1c8e..f3d2807 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -224,7 +224,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } table := s.Stream - timeout := time.Duration(ps.TimeoutSeonds) * time.Second + timeout := time.Duration(ps.TimeoutSeconds) * time.Second maxRetries := ps.MaxRetries preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard) From d3d0904da8d826ac6cd68e2dce0101731ef14661 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 5 Apr 2025 17:16:19 -0400 Subject: [PATCH 3/4] default to 5 minutes if source opt is undefined Signed-off-by: Max Englander --- cmd/internal/planetscale_connection.go | 2 +- cmd/internal/planetscale_edge_database.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index b35ca2e..904801a 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -22,7 +22,7 @@ type PlanetScaleSource struct { StartingGtids string `json:"starting_gtids"` Options CustomSourceOptions `json:"options"` MaxRetries uint `json:"max_retries"` - TimeoutSeconds int `json:"timeout_seconds"` + TimeoutSeconds *int `json:"timeout_seconds"` } type CustomSourceOptions struct { diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index f3d2807..d57c8aa 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -224,7 +224,10 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } table := s.Stream - timeout := time.Duration(ps.TimeoutSeconds) * time.Second + timeout := 5 * time.Minute + if timeoutSeconds := ps.TimeoutSeconds; timeoutSeconds != nil { + timeout = time.Duration(*timeoutSeconds) * time.Second + } maxRetries := ps.MaxRetries preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard) From aa659f2bcf8b1d68f3b9a00b36f4efff1274c59a Mon Sep 17 00:00:00 2001 From: Max Englander Date: Mon, 7 Apr 2025 10:44:27 -0400 Subject: [PATCH 4/4] add option to use gtid position together with last known pk (#140) Currently when airbyte-source syncs it wipes out `gtid` information from the shard gtid before passing it along in the VStream request. This means that, upon retry, it's possible for the connector to lose rows that are inserted between the `gtid` of the last `vgtid` event it received in one attempt and the `@@gtid_executed` of the next sync attempt. This new option changes the current behavior so that `gtid` is always passed along in the VStream request, if it is available. This makes for a safer retry. However, it does mean that it is possible for the connector to received duplicates of the same row. How it will look in Airbyte UI after upgrading to a version with this change. image Signed-off-by: Max Englander --- cmd/airbyte-source/spec.json | 7 +++++++ cmd/internal/planetscale_connection.go | 23 ++++++++++++----------- cmd/internal/planetscale_edge_database.go | 2 +- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index 3449fed..e4eb0ca 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -81,6 +81,13 @@ "description": "Timeout in seconds for a sync attempt", "order": 9 }, + "use_gtid_with_table_pks": { + "type": "boolean", + "title": "Use GTID with table primary keys", + "default": false, + "description": "Use GTID position together with table primary keys", + "order": 10 + }, "options": { "type": "object", "title": "Customize serialization", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index 904801a..8fd5266 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -12,17 +12,18 @@ import ( // PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database type PlanetScaleSource struct { - Host string `json:"host"` - Database string `json:"database"` - Username string `json:"username"` - Password string `json:"password"` - Shards string `json:"shards"` - UseReplica bool `json:"use_replica"` - UseRdonly bool `json:"use_rdonly"` - StartingGtids string `json:"starting_gtids"` - Options CustomSourceOptions `json:"options"` - MaxRetries uint `json:"max_retries"` - TimeoutSeconds *int `json:"timeout_seconds"` + Host string `json:"host"` + Database string `json:"database"` + Username string `json:"username"` + Password string `json:"password"` + Shards string `json:"shards"` + UseReplica bool `json:"use_replica"` + UseRdonly bool `json:"use_rdonly"` + StartingGtids string `json:"starting_gtids"` + Options CustomSourceOptions `json:"options"` + MaxRetries uint `json:"max_retries"` + TimeoutSeconds *int `json:"timeout_seconds"` + UseGTIDWithTablePKs bool `json:"use_gtid_with_table_pks"` } type CustomSourceOptions struct { diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index d57c8aa..760c1cb 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -303,7 +303,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * defer conn.Close() } - if tc.LastKnownPk != nil { + if tc.LastKnownPk != nil && !ps.UseGTIDWithTablePKs { tc.Position = "" }