diff --git a/cmd/airbyte-source/helper.go b/cmd/airbyte-source/helper.go index 3e01f9f..0ee1153 100644 --- a/cmd/airbyte-source/helper.go +++ b/cmd/airbyte-source/helper.go @@ -1,9 +1,10 @@ package airbyte_source import ( - "github.com/planetscale/airbyte-source/cmd/internal" "io" "os" + + "github.com/planetscale/airbyte-source/cmd/internal" ) type Helper struct { diff --git a/cmd/airbyte-source/read.go b/cmd/airbyte-source/read.go index 383340e..09b8031 100644 --- a/cmd/airbyte-source/read.go +++ b/cmd/airbyte-source/read.go @@ -38,14 +38,13 @@ func ReadCommand(ch *Helper) *cobra.Command { os.Exit(1) } - ch.Logger.Log(internal.LOGLEVEL_INFO, "Checking connection") - psc, err := parseSource(ch.FileReader, readSourceConfigFilePath) if err != nil { fmt.Fprintln(cmd.OutOrStdout(), "Please provide path to a valid configuration file") return } + ch.Logger.Log(internal.LOGLEVEL_INFO, "Ensure database") if err := ch.EnsureDB(psc); err != nil { fmt.Fprintln(cmd.OutOrStdout(), "Unable to connect to PlanetScale Database") return @@ -57,12 +56,14 @@ func ReadCommand(ch *Helper) *cobra.Command { } }() + ch.Logger.Log(internal.LOGLEVEL_INFO, "Checking connection") cs, err := checkConnectionStatus(ctx, ch.Database, psc) if err != nil { ch.Logger.ConnectionStatus(cs) return } + ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading catalog") catalog, err := readCatalog(readSourceCatalogPath) if err != nil { ch.Logger.Error(fmt.Sprintf("Unable to read catalog: %+v", err)) @@ -84,24 +85,27 @@ func ReadCommand(ch *Helper) *cobra.Command { } state = string(b) } + + ch.Logger.Log(internal.LOGLEVEL_INFO, "Listing shards") shards, err := ch.Database.ListShards(ctx, psc) if err != nil { ch.Logger.Error(fmt.Sprintf("Unable to list shards : %v", err)) os.Exit(1) } + ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading state") syncState, err := readState(state, psc, catalog.Streams, shards, ch.Logger) if err != nil { ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err)) os.Exit(1) } - for _, table := range catalog.Streams { - keyspaceOrDatabase := table.Stream.Namespace + for _, configuredStream := range catalog.Streams { + keyspaceOrDatabase := configuredStream.Stream.Namespace if keyspaceOrDatabase == "" { keyspaceOrDatabase = psc.Database } - streamStateKey := keyspaceOrDatabase + ":" + table.Stream.Name + streamStateKey := keyspaceOrDatabase + ":" + configuredStream.Stream.Name streamState, ok := syncState.Streams[streamStateKey] if !ok { ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey)) @@ -111,14 +115,14 @@ func ReadCommand(ch *Helper) *cobra.Command { for shardName, shardState := range streamState.Shards { var tc *psdbconnectv1alpha1.TableCursor - tc, err = shardState.SerializedCursorToTableCursor(table) + tc, err = shardState.SerializedCursorToTableCursor(configuredStream) ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey)) if err != nil { ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err)) os.Exit(1) } - sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, table, tc) + sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, configuredStream, tc) if err != nil { ch.Logger.Error(err.Error()) os.Exit(1)