From aba58639f9cedc1536d52892739f891a6624fe35 Mon Sep 17 00:00:00 2001 From: lahdirakram Date: Thu, 30 Oct 2025 11:58:21 +0100 Subject: [PATCH 1/5] adds _planetscale_metadata.vgtid_position object into all table streams, for event ordering issues --- cmd/internal/planetscale_edge_database.go | 39 +++++++++++++++++------ cmd/internal/planetscale_edge_mysql.go | 3 +- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 760c1cb..7daf4dc 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -350,7 +350,11 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } } - var rows []*query.QueryResult + type rowWithPosition struct { + Result *query.QueryResult + Position string + } + var rows []rowWithPosition for _, event := range res.Events { switch event.Type { case binlogdata.VEventType_VGTID: @@ -386,19 +390,21 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * fields = event.FieldEvent.Fields p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sFIELD event found, setting fields to %+v", preamble, fields)) case binlogdata.VEventType_ROW: - // Collect rows for processing for _, change := range event.RowEvent.RowChanges { if change.After != nil { - rows = append(rows, &query.QueryResult{ - Fields: fields, - Rows: []*query.Row{change.After}, + rows = append(rows, rowWithPosition{ + Result: &query.QueryResult{ + Fields: fields, + Rows: []*query.Row{change.After}, + }, + Position: tc.Position, }) } } case binlogdata.VEventType_COPY_COMPLETED: p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCOPY_COMPLETED event found, copy phase finished", preamble)) copyCompletedSeen = true - case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT, + case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT, binlogdata.VEventType_DDL, binlogdata.VEventType_DELETE, 
binlogdata.VEventType_GTID, binlogdata.VEventType_HEARTBEAT, binlogdata.VEventType_INSERT, binlogdata.VEventType_JOURNAL, @@ -422,8 +428,8 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } if len(rows) > 0 { - for _, result := range rows { - qr := sqltypes.Proto3ToResult(result) + for _, rwp := range rows { + qr := sqltypes.Proto3ToResult(rwp.Result) for _, row := range qr.Rows { resultCount += 1 sqlResult := &sqltypes.Result{ @@ -431,7 +437,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } sqlResult.Rows = append(sqlResult.Rows, row) // Results queued to Airbyte here, and flushed at the end of sync() - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps) + p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position) } } } @@ -529,10 +535,23 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps // printQueryResult will pretty-print an AirbyteRecordMessage to the logger. 
// Copied from vtctl/query.go -func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableNamespace, tableName string, ps *PlanetScaleSource) { +func (p PlanetScaleEdgeDatabase) printQueryResult( qr *sqltypes.Result, tableNamespace, tableName string, ps *PlanetScaleSource, position string) { data := QueryResultToRecords(qr, ps) for _, record := range data { + if record == nil { + continue + } + + metadata, ok := record["_planetscale_metadata"].(map[string]interface{}) + if !ok || metadata == nil { + metadata = map[string]interface{}{} + } + + // Attach the VGTID position inside _metadata + metadata["vgtid_position"] = position + record["_planetscale_metadata"] = metadata + p.Logger.Record(tableNamespace, tableName, record) } } diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go index bb3fffc..1840909 100644 --- a/cmd/internal/planetscale_edge_mysql.go +++ b/cmd/internal/planetscale_edge_mysql.go @@ -157,7 +157,8 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan columnNamesQR, err := p.db.QueryContext( ctx, - "select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=?;", + "select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=? 
" + + "union all select '_planetscale_metadata' as column_name, 'JSON' as column_type, false as is_nullable;", tableName, psc.Database, ) if err != nil { From 656e55f667673f509313897daf3f03613b0b6468 Mon Sep 17 00:00:00 2001 From: lahdirakram Date: Thu, 6 Nov 2025 11:14:17 +0100 Subject: [PATCH 2/5] adds extracted_at nanosecond timestamp and a per sync sequence number --- cmd/internal/planetscale_edge_database.go | 34 ++++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 7daf4dc..2faaf20 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -390,6 +390,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * fields = event.FieldEvent.Fields p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sFIELD event found, setting fields to %+v", preamble, fields)) case binlogdata.VEventType_ROW: + // Collect rows for processing for _, change := range event.RowEvent.RowChanges { if change.After != nil { rows = append(rows, rowWithPosition{ @@ -404,7 +405,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * case binlogdata.VEventType_COPY_COMPLETED: p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCOPY_COMPLETED event found, copy phase finished", preamble)) copyCompletedSeen = true - case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT, + case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT, binlogdata.VEventType_DDL, binlogdata.VEventType_DELETE, binlogdata.VEventType_GTID, binlogdata.VEventType_HEARTBEAT, binlogdata.VEventType_INSERT, binlogdata.VEventType_JOURNAL, @@ -437,7 +438,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } sqlResult.Rows = append(sqlResult.Rows, row) // Results queued to Airbyte here, and flushed at the end of sync() - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, 
&ps, tc.Position) + p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount) } } } @@ -535,7 +536,13 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps // printQueryResult will pretty-print an AirbyteRecordMessage to the logger. // Copied from vtctl/query.go -func (p PlanetScaleEdgeDatabase) printQueryResult( qr *sqltypes.Result, tableNamespace, tableName string, ps *PlanetScaleSource, position string) { +func (p PlanetScaleEdgeDatabase) printQueryResult( + qr *sqltypes.Result, + tableNamespace, tableName string, + ps *PlanetScaleSource, + position string, + resultCounter int, +) { data := QueryResultToRecords(qr, ps) for _, record := range data { @@ -543,15 +550,22 @@ func (p PlanetScaleEdgeDatabase) printQueryResult( qr *sqltypes.Result, tableNam continue } - metadata, ok := record["_planetscale_metadata"].(map[string]interface{}) - if !ok || metadata == nil { - metadata = map[string]interface{}{} + if ps.IncludeMetadata { + // Ensure there's a _metadata field (map[string]interface{}) + metadata, ok := record["_planetscale_metadata"].(map[string]interface{}) + if !ok || metadata == nil { + metadata = map[string]interface{}{} + } + + // Attach the VGTID position inside _metadata + metadata["vgtid_position"] = position + // Attach the extraction timestamp inside _metadata + metadata["extracted_at"] = time.Now().UnixNano() + // Attach a per sync sequence number inside _metadata + metadata["sequence_number"] = resultCounter + record["_planetscale_metadata"] = metadata } - // Attach the VGTID position inside _metadata - metadata["vgtid_position"] = position - record["_planetscale_metadata"] = metadata - p.Logger.Record(tableNamespace, tableName, record) } } From 6cfbfe713cc19936ea67db866f37800086124410 Mon Sep 17 00:00:00 2001 From: lahdirakram Date: Thu, 6 Nov 2025 11:15:04 +0100 Subject: [PATCH 3/5] makes planetscale metadata field optional --- cmd/airbyte-source/spec.json | 15 +++++++++++---- 
cmd/internal/planetscale_connection.go | 1 + cmd/internal/planetscale_edge_mysql.go | 11 +++++++++-- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index e4eb0ca..3d4a8d1 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -59,19 +59,26 @@ "default": false, "order": 5 }, + "include_metadata": { + "description": "Include PlanetScale metadata in the sync. When enabled, this adds a column named `_planetscale_metadata` to each table containing a JSON object with the following fields: `vgtid_position` (the Vitess VGTID replication position), `extracted_at` (the extraction timestamp in nanoseconds), and `sequence_number` (a monotonically increasing per-sync sequence number).", + "title": "Include metadata?", + "type": "boolean", + "default": false, + "order": 7 + }, "starting_gtids": { "type": "string", "title": "Starting GTIDs", "default": "", "description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }", - "order": 7 + "order": 8 }, "max_retries": { "type": "integer", "title": "Max retries", "default": 3, "description": "The max number of times we continue syncing after potential errors", - "order": 8 + "order": 9 }, "timeout_seconds": { "type": "integer", @@ -79,14 +86,14 @@ "default": 300, "minimum": 300, "description": "Timeout in seconds for a sync attempt", - "order": 9 + "order": 10 }, "use_gtid_with_table_pks": { "type": "boolean", "title": "Use GTID with table primary keys", "default": false, "description": "Use GTID position together with table primary keys", - "order": 10 + "order": 11 }, "options": { "type": "object", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index 8fd5266..249381f 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -19,6 +19,7 @@ type PlanetScaleSource struct { Shards string `json:"shards"` UseReplica 
bool `json:"use_replica"` UseRdonly bool `json:"use_rdonly"` + IncludeMetadata bool `json:"include_metadata"` StartingGtids string `json:"starting_gtids"` Options CustomSourceOptions `json:"options"` MaxRetries uint `json:"max_retries"` diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go index 1840909..eb47bf5 100644 --- a/cmd/internal/planetscale_edge_mysql.go +++ b/cmd/internal/planetscale_edge_mysql.go @@ -157,8 +157,7 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan columnNamesQR, err := p.db.QueryContext( ctx, - "select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=? " + - "union all select '_planetscale_metadata' as column_name, 'JSON' as column_type, false as is_nullable;", + "select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=?;" tableName, psc.Database, ) if err != nil { @@ -182,6 +181,14 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan return properties, errors.Wrapf(err, "unable to iterate columns for table %s", tableName) } + // Inject metadata column when include_metadata is true. 
+ if psc.IncludeMetadata { + properties["_planetscale_metadata"] = PropertyType{ + Type: []string{"object"}, + AirbyteType: "object", + } + } + return properties, nil } From 6c43a61e92c893e2a9d23cbab8d62f7ccb1df994 Mon Sep 17 00:00:00 2001 From: lahdirakram Date: Fri, 14 Nov 2025 17:39:21 +0100 Subject: [PATCH 4/5] adds tests --- .../planetscale_edge_database_test.go | 246 ++++++++++++++++++ cmd/internal/planetscale_edge_mysql.go | 2 +- 2 files changed, 247 insertions(+), 1 deletion(-) diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index afbbebe..d0d6a4f 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -638,6 +638,252 @@ func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) { assert.False(t, tma.GetVitessTabletsFnInvoked) } +// CanIncludesMetadata tests returning the metadata columns when requested +func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "products" + startVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-2,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + middleVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-3,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + stopVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-4,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + // First sync to get stop position + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: 
binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // 1st recv() of second sync for rows to get start VGtid + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // 2nd recv() of second sync for rows to get intermediate GTID + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: middleVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // 3rd recv() for second sync for rows to get fields + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: []*query.Field{ + { + Name: "pid", + Type: query.Type_INT64, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 20, + Charset: 63, + ColumnType: "bigint", + }, + { + Name: "description", + Type: query.Type_VARCHAR, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 1024, + Charset: 255, + ColumnType: "varchar(256)", + }, + }, + }, + }, + }, + }, + }, + // 4th recv() of second sync for rows to get records + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: &query.Row{ + Lengths: []int64{1, 8}, + Values: []byte("1keyboard"), + }, + }, + { + After: &query.Row{ + Lengths: []int64{1, 7}, + Values: []byte("2monitor"), + }, + }, + }, + }, + 
}, + }, + }, + }, + // 5th recv() of second sync for rows to advance GTid to stop position + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // Will not reach this event since stop position passed + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: &query.Row{ + Lengths: []int64{1, 8}, + Values: []byte("1keyboard"), + }, + }, + { + After: &query.Row{ + Lengths: []int64{1, 7}, + Values: []byte("2monitor"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType) + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + IncludeMetadata: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: "products", + Namespace: "connect-test", + }, + } + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + if testing.Verbose() { + for _, entry := range tal.logMessages { + t.Logf("airbyte log entry: [%s] %s", entry.level, entry.message) + } + } + assert.NoError(t, err) + assert.NotNil(t, sc) + assert.Equal(t, 2, len(tal.records["connect-test.products"])) + records := tal.records["connect-test.products"] + + for _, r := range records { + metadataRaw, ok := r["_planetscale_metadata"] + assert.True(t, ok, "metadata must be present when 
IncludeMetadata=true") + + metadata, ok := metadataRaw.(map[string]interface{}) + assert.True(t, ok, "metadata must be a map[string]interface{}") + + _, hasPos := metadata["vgtid_position"] + assert.True(t, hasPos, "missing vgtid_position") + + _, hasExtractedAt := metadata["extracted_at"] + assert.True(t, hasExtractedAt, "missing extracted_at") + + _, hasSeq := metadata["sequence_number"] + assert.True(t, hasSeq, "missing sequence_number") + } +} + // CanReturnNewCursorIfNewFound tests returning the same GTID as stop position func TestRead_IncrementalSync_CanReturnOriginalCursorIfNoNewFound(t *testing.T) { tma := getTestMysqlAccess() diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go index eb47bf5..893743e 100644 --- a/cmd/internal/planetscale_edge_mysql.go +++ b/cmd/internal/planetscale_edge_mysql.go @@ -157,7 +157,7 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan columnNamesQR, err := p.db.QueryContext( ctx, - "select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=?;" + "select column_name, column_type, is_nullable from information_schema.columns where table_name=? 
AND table_schema=?;", tableName, psc.Database, ) if err != nil { From a6c6c54386e9a3617eb27c96cf3d490e9fae5797 Mon Sep 17 00:00:00 2001 From: lahdirakram Date: Mon, 17 Nov 2025 11:27:44 +0100 Subject: [PATCH 5/5] adds test on vgtid position value --- cmd/internal/planetscale_edge_database_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index d0d6a4f..55f3e50 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -873,8 +873,9 @@ func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) { metadata, ok := metadataRaw.(map[string]interface{}) assert.True(t, ok, "metadata must be a map[string]interface{}") - _, hasPos := metadata["vgtid_position"] + pos, hasPos := metadata["vgtid_position"] assert.True(t, hasPos, "missing vgtid_position") + assert.Equal(t, middleVGtid, pos, "incorrect vgtid_position") _, hasExtractedAt := metadata["extracted_at"] assert.True(t, hasExtractedAt, "missing extracted_at")