Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions cmd/airbyte-source/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,34 +59,41 @@
"default": false,
"order": 5
},
"include_metadata": {
"description": "Include PlanetScale metadata in the sync. When enabled, this adds a column named `_planetscale_metadata` to each table containing a JSON object with the following fields: `vgtid_position` (the Vitess VGTID replication position), `extracted_at` (the extraction timestamp in nanoseconds), and `sequence_number` (a monotonically increasing per-sync sequence number).",
"title": "Include metadata?",
"type": "boolean",
"default": false,
"order": 7
},
"starting_gtids": {
"type": "string",
"title": "Starting GTIDs",
"default": "",
"description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }",
"order": 7
"order": 8
},
"max_retries": {
"type": "integer",
"title": "Max retries",
"default": 3,
"description": "The max number of times we continue syncing after potential errors",
"order": 8
"order": 9
},
"timeout_seconds": {
"type": "integer",
"title": "Timeout (in seconds)",
"default": 300,
"minimum": 300,
"description": "Timeout in seconds for a sync attempt",
"order": 9
"order": 10
},
"use_gtid_with_table_pks": {
"type": "boolean",
"title": "Use GTID with table primary keys",
"default": false,
"description": "Use GTID position together with table primary keys",
"order": 10
"order": 11
},
"options": {
"type": "object",
Expand Down
1 change: 1 addition & 0 deletions cmd/internal/planetscale_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type PlanetScaleSource struct {
Shards string `json:"shards"`
UseReplica bool `json:"use_replica"`
UseRdonly bool `json:"use_rdonly"`
IncludeMetadata bool `json:"include_metadata"`
StartingGtids string `json:"starting_gtids"`
Options CustomSourceOptions `json:"options"`
MaxRetries uint `json:"max_retries"`
Expand Down
49 changes: 41 additions & 8 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,11 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
}
}

var rows []*query.QueryResult
type rowWithPosition struct {
Result *query.QueryResult
Position string
}
var rows []rowWithPosition
for _, event := range res.Events {
switch event.Type {
case binlogdata.VEventType_VGTID:
Expand Down Expand Up @@ -389,9 +393,12 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
// Collect rows for processing
for _, change := range event.RowEvent.RowChanges {
if change.After != nil {
rows = append(rows, &query.QueryResult{
Fields: fields,
Rows: []*query.Row{change.After},
rows = append(rows, rowWithPosition{
Result: &query.QueryResult{
Fields: fields,
Rows: []*query.Row{change.After},
},
Position: tc.Position,
})
}
}
Expand Down Expand Up @@ -422,16 +429,16 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
}

if len(rows) > 0 {
for _, result := range rows {
qr := sqltypes.Proto3ToResult(result)
for _, rwp := range rows {
qr := sqltypes.Proto3ToResult(rwp.Result)
for _, row := range qr.Rows {
resultCount += 1
sqlResult := &sqltypes.Result{
Fields: fields,
}
sqlResult.Rows = append(sqlResult.Rows, row)
// Results queued to Airbyte here, and flushed at the end of sync()
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps)
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount)
}
}
}
Expand Down Expand Up @@ -529,10 +536,36 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps

// printQueryResult will pretty-print an AirbyteRecordMessage to the logger.
// Copied from vtctl/query.go
func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableNamespace, tableName string, ps *PlanetScaleSource) {
// printQueryResult converts a sqltypes.Result into Airbyte records and emits
// each one via p.Logger.Record. When ps.IncludeMetadata is set, every record
// additionally carries a "_planetscale_metadata" map holding the VGTID
// replication position, the extraction timestamp (nanoseconds), and a
// per-sync sequence number supplied by the caller.
//
// position is the VGTID position in effect when these rows were received;
// resultCounter is the caller-maintained running count of emitted rows.
func (p PlanetScaleEdgeDatabase) printQueryResult(
	qr *sqltypes.Result,
	tableNamespace, tableName string,
	ps *PlanetScaleSource,
	position string,
	resultCounter int,
) {
	data := QueryResultToRecords(qr, ps)

	for _, record := range data {
		if record == nil {
			continue
		}

		if ps.IncludeMetadata {
			// Reuse an existing "_planetscale_metadata" map if one is already
			// present on the record; otherwise start a fresh one.
			metadata, ok := record["_planetscale_metadata"].(map[string]interface{})
			if !ok || metadata == nil {
				metadata = map[string]interface{}{}
			}

			// VGTID replication position at the time these rows were received.
			metadata["vgtid_position"] = position
			// Extraction timestamp in nanoseconds since the Unix epoch.
			metadata["extracted_at"] = time.Now().UnixNano()
			// Monotonically increasing per-sync sequence number (caller-supplied).
			metadata["sequence_number"] = resultCounter
			record["_planetscale_metadata"] = metadata
		}

		p.Logger.Record(tableNamespace, tableName, record)
	}
}
Expand Down
247 changes: 247 additions & 0 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,253 @@ func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) {
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

// TestRead_IncrementalSync_CanIncludesMetadata verifies that when
// IncludeMetadata is enabled on the source, every emitted record carries a
// "_planetscale_metadata" map containing vgtid_position, extracted_at, and
// sequence_number, and that vgtid_position reflects the most recent VGTID
// event seen before the rows arrived (middleVGtid in this scenario).
func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) {
	tma := getTestMysqlAccess()
	tal := testAirbyteLogger{}
	ped := PlanetScaleEdgeDatabase{
		Logger: &tal,
		Mysql:  tma,
	}

	keyspace := "connect-test"
	shard := "-"
	table := "products"
	// Three VGTID positions: the sync starts at startVGtid, advances through
	// middleVGtid (the position in effect when the ROW event is received),
	// and stops at stopVGtid.
	startVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-2,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2"
	middleVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-3,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2"
	stopVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-4,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2"

	startCursor := &psdbconnect.TableCursor{
		Shard:    shard,
		Position: startVGtid,
		Keyspace: keyspace,
	}

	// Scripted VStream responses; the order of these recv() results drives
	// the sync's state machine, so it must not be rearranged.
	vstreamSyncClient := &vtgateVStreamClientMock{
		vstreamResponses: []*vstreamResponse{
			// First sync to get stop position
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_VGTID,
							Vgtid: &binlogdata.VGtid{
								ShardGtids: []*binlogdata.ShardGtid{
									{
										Shard:    shard,
										Gtid:     stopVGtid,
										Keyspace: keyspace,
									},
								},
							},
						},
					},
				},
			},
			// 1st recv() of second sync for rows to get start VGtid
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_VGTID,
							Vgtid: &binlogdata.VGtid{
								ShardGtids: []*binlogdata.ShardGtid{
									{
										Shard:    shard,
										Gtid:     startVGtid,
										Keyspace: keyspace,
									},
								},
							},
						},
					},
				},
			},
			// 2nd recv() of second sync for rows to get intermediate GTID
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_VGTID,
							Vgtid: &binlogdata.VGtid{
								ShardGtids: []*binlogdata.ShardGtid{
									{
										Shard:    shard,
										Gtid:     middleVGtid,
										Keyspace: keyspace,
									},
								},
							},
						},
					},
				},
			},
			// 3rd recv() for second sync for rows to get fields
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_FIELD,
							FieldEvent: &binlogdata.FieldEvent{
								TableName: table,
								Fields: []*query.Field{
									{
										Name:         "pid",
										Type:         query.Type_INT64,
										Table:        table,
										OrgTable:     table,
										Database:     keyspace,
										ColumnLength: 20,
										Charset:      63,
										ColumnType:   "bigint",
									},
									{
										Name:         "description",
										Type:         query.Type_VARCHAR,
										Table:        table,
										OrgTable:     table,
										Database:     keyspace,
										ColumnLength: 1024,
										Charset:      255,
										ColumnType:   "varchar(256)",
									},
								},
							},
						},
					},
				},
			},
			// 4th recv() of second sync for rows to get records
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_ROW,
							RowEvent: &binlogdata.RowEvent{
								TableName: table,
								Keyspace:  keyspace,
								Shard:     shard,
								RowChanges: []*binlogdata.RowChange{
									{
										After: &query.Row{
											Lengths: []int64{1, 8},
											Values:  []byte("1keyboard"),
										},
									},
									{
										After: &query.Row{
											Lengths: []int64{1, 7},
											Values:  []byte("2monitor"),
										},
									},
								},
							},
						},
					},
				},
			},
			// 5th recv() of second sync for rows to advance GTid to stop position
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_VGTID,
							Vgtid: &binlogdata.VGtid{
								ShardGtids: []*binlogdata.ShardGtid{
									{
										Shard:    shard,
										Gtid:     stopVGtid,
										Keyspace: keyspace,
									},
								},
							},
						},
					},
				},
			},
			// Will not reach this event since stop position passed
			{
				response: &vtgate.VStreamResponse{
					Events: []*binlogdata.VEvent{
						{
							Type: binlogdata.VEventType_ROW,
							RowEvent: &binlogdata.RowEvent{
								TableName: table,
								Keyspace:  keyspace,
								Shard:     shard,
								RowChanges: []*binlogdata.RowChange{
									{
										After: &query.Row{
											Lengths: []int64{1, 8},
											Values:  []byte("1keyboard"),
										},
									},
									{
										After: &query.Row{
											Lengths: []int64{1, 7},
											Values:  []byte("2monitor"),
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	vsc := vstreamClientMock{
		vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) {
			assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType)
			return vstreamSyncClient, nil
		},
	}

	ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) {
		return &vsc, nil
	}

	// IncludeMetadata is the flag under test.
	ps := PlanetScaleSource{
		Database:        "connect-test",
		IncludeMetadata: true,
	}
	cs := ConfiguredStream{
		Stream: Stream{
			Name:      "products",
			Namespace: "connect-test",
		},
	}
	sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor)
	if testing.Verbose() {
		for _, entry := range tal.logMessages {
			t.Logf("airbyte log entry: [%s] %s", entry.level, entry.message)
		}
	}
	assert.NoError(t, err)
	assert.NotNil(t, sc)
	assert.Equal(t, 2, len(tal.records["connect-test.products"]))
	records := tal.records["connect-test.products"]

	// Every record must carry the metadata map with all three fields; the
	// position must be the VGTID seen just before the ROW event (middleVGtid).
	for _, r := range records {
		metadataRaw, ok := r["_planetscale_metadata"]
		assert.True(t, ok, "metadata must be present when IncludeMetadata=true")

		metadata, ok := metadataRaw.(map[string]interface{})
		assert.True(t, ok, "metadata must be a map[string]interface{}")

		pos, hasPos := metadata["vgtid_position"]
		assert.True(t, hasPos, "missing vgtid_position")
		assert.Equal(t, middleVGtid, pos, "incorrect vgtid_position")

		_, hasExtractedAt := metadata["extracted_at"]
		assert.True(t, hasExtractedAt, "missing extracted_at")

		_, hasSeq := metadata["sequence_number"]
		assert.True(t, hasSeq, "missing sequence_number")
	}
}

// CanReturnNewCursorIfNewFound tests returning the same GTID as stop position
func TestRead_IncrementalSync_CanReturnOriginalCursorIfNoNewFound(t *testing.T) {
tma := getTestMysqlAccess()
Expand Down
Loading
Loading