diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index e4eb0ca..3d4a8d1 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -59,19 +59,26 @@ "default": false, "order": 5 }, + "include_metadata": { + "description": "Include PlanetScale metadata in the sync. When enabled, this adds a column named `_planetscale_metadata` to each table containing a JSON object with the following fields: `vgtid_position` (the Vitess VGTID replication position), `extracted_at` (the extraction timestamp in nanoseconds), and `sequence_number` (a monotonically increasing per-sync sequence number).", + "title": "Include metadata?", + "type": "boolean", + "default": false, + "order": 7 + }, "starting_gtids": { "type": "string", "title": "Starting GTIDs", "default": "", "description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }", - "order": 7 + "order": 8 }, "max_retries": { "type": "integer", "title": "Max retries", "default": 3, "description": "The max number of times we continue syncing after potential errors", - "order": 8 + "order": 9 }, "timeout_seconds": { "type": "integer", @@ -79,14 +86,14 @@ "default": 300, "minimum": 300, "description": "Timeout in seconds for a sync attempt", - "order": 9 + "order": 10 }, "use_gtid_with_table_pks": { "type": "boolean", "title": "Use GTID with table primary keys", "default": false, "description": "Use GTID position together with table primary keys", - "order": 10 + "order": 11 }, "options": { "type": "object", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index 8fd5266..249381f 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -19,6 +19,7 @@ type PlanetScaleSource struct { Shards string `json:"shards"` UseReplica bool `json:"use_replica"` UseRdonly bool `json:"use_rdonly"` + IncludeMetadata bool `json:"include_metadata"` StartingGtids string `json:"starting_gtids"` Options CustomSourceOptions `json:"options"` MaxRetries uint `json:"max_retries"` diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 760c1cb..2faaf20 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -350,7 +350,11 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } } - var rows []*query.QueryResult + type rowWithPosition struct { + Result *query.QueryResult + Position string + } + var rows []rowWithPosition for _, event := range res.Events { switch event.Type { case binlogdata.VEventType_VGTID: @@ -389,9 +393,12 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * // Collect rows for processing for _, change := range event.RowEvent.RowChanges { if change.After != nil { - rows = append(rows, &query.QueryResult{ - Fields: fields, - Rows: []*query.Row{change.After}, + rows = append(rows, rowWithPosition{ + Result: &query.QueryResult{ + Fields: fields, + Rows: []*query.Row{change.After}, + }, + Position: tc.Position, }) } } @@ -422,8 +429,8 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } if len(rows) > 0 { - for _, result := range rows { - qr := sqltypes.Proto3ToResult(result) + for _, rwp := range rows { + qr := sqltypes.Proto3ToResult(rwp.Result) for _, row := range qr.Rows { resultCount += 1 sqlResult := &sqltypes.Result{ @@ -431,7 +438,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } sqlResult.Rows = append(sqlResult.Rows, row) // Results queued to Airbyte here, and flushed at the end of sync() - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps) + p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount) } } } @@ -529,10 +536,36 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps // printQueryResult will pretty-print an AirbyteRecordMessage to the logger. // Copied from vtctl/query.go -func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableNamespace, tableName string, ps *PlanetScaleSource) { +func (p PlanetScaleEdgeDatabase) printQueryResult( + qr *sqltypes.Result, + tableNamespace, tableName string, + ps *PlanetScaleSource, + position string, + resultCounter int, +) { data := QueryResultToRecords(qr, ps) for _, record := range data { + if record == nil { + continue + } + + if ps.IncludeMetadata { + // Ensure there's a _metadata field (map[string]interface{}) + metadata, ok := record["_planetscale_metadata"].(map[string]interface{}) + if !ok || metadata == nil { + metadata = map[string]interface{}{} + } + + // Attach the VGTID position inside _metadata + metadata["vgtid_position"] = position + // Attach the extraction timestamp inside _metadata + metadata["extracted_at"] = time.Now().UnixNano() + // Attach a per sync sequence number inside _metadata + metadata["sequence_number"] = resultCounter + record["_planetscale_metadata"] = metadata + } + p.Logger.Record(tableNamespace, tableName, record) } } diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index afbbebe..55f3e50 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -638,6 +638,253 @@ func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) { assert.False(t, tma.GetVitessTabletsFnInvoked) } +// CanIncludesMetadata tests returning the metadata columns when requested +func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "products" + startVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-2,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + middleVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-3,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + stopVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-4,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + // First sync to get stop position + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // 1st recv() of second sync for rows to get start VGtid + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // 2nd recv() of second sync for rows to get intermediate GTID + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: middleVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // 3rd recv() for second sync for rows to get fields + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: []*query.Field{ + { + Name: "pid", + Type: query.Type_INT64, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 20, + Charset: 63, + ColumnType: "bigint", + }, + { + Name: "description", + Type: query.Type_VARCHAR, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 1024, + Charset: 255, + ColumnType: "varchar(256)", + }, + }, + }, + }, + }, + }, + }, + // 4th recv() of second sync for rows to get records + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: &query.Row{ + Lengths: []int64{1, 8}, + Values: []byte("1keyboard"), + }, + }, + { + After: &query.Row{ + Lengths: []int64{1, 7}, + Values: []byte("2monitor"), + }, + }, + }, + }, + }, + }, + }, + }, + // 5th recv() of second sync for rows to advance GTid to stop position + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + // Will not reach this event since stop position passed + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: &query.Row{ + Lengths: []int64{1, 8}, + Values: []byte("1keyboard"), + }, + }, + { + After: &query.Row{ + Lengths: []int64{1, 7}, + Values: []byte("2monitor"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType) + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + IncludeMetadata: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: "products", + Namespace: "connect-test", + }, + } + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + if testing.Verbose() { + for _, entry := range tal.logMessages { + t.Logf("airbyte log entry: [%s] %s", entry.level, entry.message) + } + } + assert.NoError(t, err) + assert.NotNil(t, sc) + assert.Equal(t, 2, len(tal.records["connect-test.products"])) + records := tal.records["connect-test.products"] + + for _, r := range records { + metadataRaw, ok := r["_planetscale_metadata"] + assert.True(t, ok, "metadata must be present when IncludeMetadata=true") + + metadata, ok := metadataRaw.(map[string]interface{}) + assert.True(t, ok, "metadata must be a map[string]interface{}") + + pos, hasPos := metadata["vgtid_position"] + assert.True(t, hasPos, "missing vgtid_position") + assert.Equal(t, middleVGtid, pos, "incorrect vgtid_position") + + _, hasExtractedAt := metadata["extracted_at"] + assert.True(t, hasExtractedAt, "missing extracted_at") + + _, hasSeq := metadata["sequence_number"] + assert.True(t, hasSeq, "missing sequence_number") + } +} + // CanReturnNewCursorIfNewFound tests returning the same GTID as stop position func TestRead_IncrementalSync_CanReturnOriginalCursorIfNoNewFound(t *testing.T) { tma := getTestMysqlAccess() diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go index 2f318a0..df0ebea 100644 --- a/cmd/internal/planetscale_edge_mysql.go +++ b/cmd/internal/planetscale_edge_mysql.go @@ -181,6 +181,14 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan return properties, errors.Wrapf(err, "unable to iterate columns for table %s", tableName) } + // Inject metadata column when include_metadata is true. + if psc.IncludeMetadata { + properties["_planetscale_metadata"] = PropertyType{ + Type: []string{"object"}, + AirbyteType: "object", + } + } + return properties, nil }