Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,41 @@ func ReadCommand(ch *Helper) *cobra.Command {
streamState, ok := syncState.Streams[streamStateKey]
if !ok {
ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey))
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
os.Exit(1)
}

ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_STARTED)

streamFailed := false
for shardName, shardState := range streamState.Shards {
var tc *psdbconnectv1alpha1.TableCursor

tc, err = shardState.SerializedCursorToTableCursor(configuredStream)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
streamFailed = true
break
}

sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, configuredStream, tc)
if err != nil {
ch.Logger.Error(err.Error())
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
streamFailed = true
break
}

if sc != nil {
// if we get any new state, we assign it here.
// otherwise, the older state is round-tripped back to Airbyte.
syncState.Streams[streamStateKey].Shards[shardName] = sc
}
ch.Logger.State(syncState)
}

if !streamFailed {
ch.Logger.StreamState(keyspaceOrDatabase, configuredStream.Stream.Name, syncState.Streams[streamStateKey])
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_COMPLETE)
}
}
},
Expand All @@ -153,9 +163,26 @@ func readState(state string, psc internal.PlanetScaleSource, streams []internal.
Streams: map[string]internal.ShardStates{},
}
if state != "" {
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
// Try parsing as Airbyte v2 per-stream state array first
var perStreamStates []internal.AirbyteState
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && len(perStreamStates) > 0 && perStreamStates[0].Type == internal.STATE_TYPE_STREAM {
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Parsing Airbyte v2 per-stream state (%d streams)", len(perStreamStates)))
for _, s := range perStreamStates {
if s.Stream != nil && s.Stream.StreamState != nil {
ns := s.Stream.StreamDescriptor.Namespace
if ns == "" {
ns = psc.Database
}
key := ns + ":" + s.Stream.StreamDescriptor.Name
syncState.Streams[key] = *s.Stream.StreamState
}
}
} else {
// Fall back to legacy global state format
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
}
}
}

Expand Down
Loading