Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# syntax=docker/dockerfile:1

ARG GO_VERSION=1.22.2
FROM pscale.dev/wolfi-prod/go:${GO_VERSION} AS build
FROM golang:${GO_VERSION} AS build

WORKDIR /airbyte-source
COPY . .

RUN go mod download
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -trimpath -o /connect

FROM pscale.dev/wolfi-prod/base:latest
FROM alpine:latest

RUN apk add --no-cache ca-certificates
COPY --from=build /connect /usr/local/bin/
ENV AIRBYTE_ENTRYPOINT "/usr/local/bin/connect"
ENTRYPOINT ["/usr/local/bin/connect"]
20 changes: 15 additions & 5 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,41 @@ func ReadCommand(ch *Helper) *cobra.Command {
streamState, ok := syncState.Streams[streamStateKey]
if !ok {
ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey))
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
os.Exit(1)
}

ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_STARTED)

streamFailed := false
for shardName, shardState := range streamState.Shards {
var tc *psdbconnectv1alpha1.TableCursor

tc, err = shardState.SerializedCursorToTableCursor(configuredStream)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
streamFailed = true
break
}

sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, configuredStream, tc)
if err != nil {
ch.Logger.Error(err.Error())
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
streamFailed = true
break
}

if sc != nil {
// if we get any new state, we assign it here.
// otherwise, the older state is round-tripped back to Airbyte.
syncState.Streams[streamStateKey].Shards[shardName] = sc
}
ch.Logger.State(syncState)
}

if !streamFailed {
ch.Logger.StreamState(keyspaceOrDatabase, configuredStream.Stream.Name, syncState.Streams[streamStateKey])
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_COMPLETE)
}
}
},
Expand Down
Loading