diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 6e3fef873f6b..c950030909e8 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -32,6 +32,11 @@ Events in this category are logged to the `TELEMETRY` channel. An event of type `alter_changefeed` is an event for any ALTER CHANGEFEED statements that are run. +Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +To test compatibility before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `CHANGEFEED` instead of `TELEMETRY`. + | Field | Description | Sensitive | |--|--|--| @@ -54,6 +59,11 @@ An event of type `alter_changefeed` is an event for any ALTER CHANGEFEED stateme An event of type `changefeed_canceled` is an event for any changefeed cancellations. +Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +To test compatibility before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `CHANGEFEED` instead of `TELEMETRY`. + @@ -73,6 +83,11 @@ An event of type `changefeed_canceled` is an event for any changefeed cancellati An event of type `changefeed_emitted_bytes` is an event representing the bytes emitted by a changefeed over an interval. +Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +To test compatibility before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `CHANGEFEED` instead of `TELEMETRY`. + | Field | Description | Sensitive | |--|--|--| @@ -99,6 +114,11 @@ An event of type `changefeed_emitted_bytes` is an event representing the bytes e An event of type `changefeed_failed` is an event for any changefeed failure since the plan hook was triggered. +Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. 
+To test compatibility before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `CHANGEFEED` instead of `TELEMETRY`. + | Field | Description | Sensitive | |--|--|--| @@ -123,6 +143,11 @@ An event of type `create_changefeed` is an event for any CREATE CHANGEFEED query successfully starts running. Failed CREATE statements will show up as ChangefeedFailed events. +Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +To test compatibility before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `CHANGEFEED` instead of `TELEMETRY`. + | Field | Description | Sensitive | |--|--|--| diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index ae2bb1933a04..5fa016f63a3a 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -144,6 +144,7 @@ go_library( "//pkg/util/intsets", "//pkg/util/json", "//pkg/util/log", + "//pkg/util/log/channel", "//pkg/util/log/eventpb", "//pkg/util/log/logcrash", "//pkg/util/log/severity", @@ -280,6 +281,7 @@ go_test( "//pkg/server", "//pkg/server/serverpb", "//pkg/server/telemetry", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/spanconfig/spanconfigjob", @@ -337,6 +339,8 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/log/eventpb", + "//pkg/util/log/logpb", + "//pkg/util/log/logtestutils", "//pkg/util/metamorphic", "//pkg/util/mon", "//pkg/util/parquet", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index 81b7968aabf3..2bbde400d0e2 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" + 
"github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -250,8 +251,8 @@ func alterChangefeedPlanHook( } telemetry.Count(telemetryPath) - - logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size) + shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV()) + logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size, shouldMigrate) select { case <-ctx.Done(): diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 31bdc80436a6..0386a3f90f1d 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -54,6 +54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/channel" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/severity" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -78,8 +79,8 @@ func init() { sql.AddPlanHook("changefeed", changefeedPlanHook, changefeedTypeCheck) jobs.RegisterConstructor( jobspb.TypeChangefeed, - func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { - r := &changefeedResumer{job: job} + func(job *jobs.Job, s *cluster.Settings) jobs.Resumer { + r := &changefeedResumer{job: job, sv: &s.SV} r.mu.perNodeAggregatorStats = make(bulk.ComponentAggregatorStats) return r }, @@ -282,7 +283,8 @@ func changefeedPlanHook( p.ExtendedEvalContext().Descs.ReleaseAll(ctx) telemetry.Count(`changefeed.create.core`) - logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size) + shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV()) + logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size, shouldMigrate) if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil { return err } @@ -384,8 
+386,8 @@ func changefeedPlanHook( if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil { return err } - - logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size) + shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV()) + logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size, shouldMigrate) select { case <-ctx.Done(): @@ -400,7 +402,9 @@ func changefeedPlanHook( rowFnLogErrors := func(ctx context.Context, resultsCh chan<- tree.Datums) error { err := rowFn(ctx, resultsCh) if err != nil { - logChangefeedFailedTelemetryDuringStartup(ctx, description, failureTypeForStartupError(err)) + + shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV()) + logChangefeedFailedTelemetryDuringStartup(ctx, description, failureTypeForStartupError(err), shouldMigrate) var e *kvpb.BatchTimestampBeforeGCError if errors.As(err, &e) && opts.HasStartCursor() { err = errors.Wrapf(err, @@ -1320,6 +1324,7 @@ func validateAndNormalizeChangefeedExpression( type changefeedResumer struct { job *jobs.Job + sv *settings.Values mu struct { syncutil.Mutex @@ -1792,17 +1797,17 @@ func (b *changefeedResumer) OnFailOrCancel( } numTargets = targets.Size } - + shouldMigrate := log.ShouldMigrateEvent(b.sv) // If this job has failed (not canceled), increment the counter. 
if jobs.HasErrJobCanceled( errors.DecodeError(ctx, *b.job.Payload().FinalResumeError), ) { telemetry.Count(`changefeed.enterprise.cancel`) - logChangefeedCanceledTelemetry(ctx, b.job, numTargets) + logChangefeedCanceledTelemetry(ctx, b.job, numTargets, shouldMigrate) } else { telemetry.Count(`changefeed.enterprise.fail`) exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1) - logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError, numTargets) + logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError, numTargets, shouldMigrate) } return nil } @@ -1890,7 +1895,7 @@ func getChangefeedTargetName( } func logCreateChangefeedTelemetry( - ctx context.Context, jr *jobs.Record, isTransformation bool, numTargets uint, + ctx context.Context, jr *jobs.Record, isTransformation bool, numTargets uint, migrateEvent bool, ) { var changefeedEventDetails eventpb.CommonChangefeedEventDetails if jr != nil { @@ -1903,11 +1908,11 @@ func logCreateChangefeedTelemetry( Transformation: isTransformation, } - log.StructuredEvent(ctx, severity.INFO, createChangefeedEvent) + getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, createChangefeedEvent) } func logAlterChangefeedTelemetry( - ctx context.Context, job *jobs.Job, prevDescription string, numTargets uint, + ctx context.Context, job *jobs.Job, prevDescription string, numTargets uint, migrateEvent bool, ) { var changefeedEventDetails eventpb.CommonChangefeedEventDetails if job != nil { @@ -1921,11 +1926,15 @@ func logAlterChangefeedTelemetry( PreviousDescription: prevDescription, } - log.StructuredEvent(ctx, severity.INFO, alterChangefeedEvent) + getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, alterChangefeedEvent) } func logChangefeedFailedTelemetry( - ctx context.Context, job *jobs.Job, failureType changefeedbase.FailureType, numTargets uint, + ctx context.Context, + job *jobs.Job, + failureType changefeedbase.FailureType, + 
numTargets uint, + migrateEvent bool, ) { var changefeedEventDetails eventpb.CommonChangefeedEventDetails if job != nil { @@ -1938,21 +1947,26 @@ func logChangefeedFailedTelemetry( FailureType: failureType, } - log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent) + getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedFailedEvent) } func logChangefeedFailedTelemetryDuringStartup( - ctx context.Context, description string, failureType changefeedbase.FailureType, + ctx context.Context, + description string, + failureType changefeedbase.FailureType, + migrateEvent bool, ) { changefeedFailedEvent := &eventpb.ChangefeedFailed{ CommonChangefeedEventDetails: eventpb.CommonChangefeedEventDetails{Description: description}, FailureType: failureType, } - log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent) + getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedFailedEvent) } -func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTargets uint) { +func logChangefeedCanceledTelemetry( + ctx context.Context, job *jobs.Job, numTargets uint, migrateEvent bool, +) { var changefeedEventDetails eventpb.CommonChangefeedEventDetails if job != nil { changefeedDetails := job.Details().(jobspb.ChangefeedDetails) @@ -1962,7 +1976,7 @@ func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTarge changefeedCanceled := &eventpb.ChangefeedCanceled{ CommonChangefeedEventDetails: changefeedEventDetails, } - log.StructuredEvent(ctx, severity.INFO, changefeedCanceled) + getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedCanceled) } func makeCommonChangefeedEventDetails( @@ -2101,3 +2115,12 @@ func maybeUpgradePreProductionReadyExpression( "Please see CDC documentation on the use of new cdc_prev tuple.", tree.AsString(oldExpression), tree.AsString(newExpression)) } + +func getChangefeedEventMigrator(migrateEvent bool) 
log.StructuredEventMigrator { + return log.NewStructuredEventMigrator( + func() bool { + return migrateEvent + }, + channel.CHANGEFEED, + ) +} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 8b7374007ce6..58a51ccdabfe 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -60,6 +60,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -95,6 +96,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randident" @@ -7945,6 +7948,16 @@ func TestChangefeedContinuousTelemetry(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"changefeed_emitted_bytes"}, + logtestutils.FromLogEntry[eventpb.ChangefeedEmittedBytes], + ) + + cleanup := log.InterceptWith(ctx, cfLogSpy) + defer cleanup() testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { // Hack: since setting a zero value disabled, set a negative value to ensure we always log. 
interval := -10 * time.Millisecond @@ -7964,9 +7977,8 @@ func TestChangefeedContinuousTelemetry(t *testing.T) { } for i := 0; i < 5; i++ { - beforeCreate := timeutil.Now() sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo VALUES (%d) RETURNING cluster_logical_timestamp()`, i)) - verifyLogsWithEmittedBytesAndMessages(t, jobID, beforeCreate.UnixNano(), interval.Nanoseconds(), false) + verifyLogsWithEmittedBytesAndMessages(t, jobID, cfLogSpy, &s.Server.ClusterSettings().SV, interval.Nanoseconds(), false) } } @@ -8001,7 +8013,16 @@ func (t *testTelemetryLogger) close() { func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"changefeed_emitted_bytes"}, + logtestutils.FromLogEntry[eventpb.ChangefeedEmittedBytes], + ) + cleanup := log.InterceptWith(ctx, cfLogSpy) + defer cleanup() testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { interval := 24 * time.Hour continuousTelemetryInterval.Override(context.Background(), &s.Server.ClusterSettings().SV, interval) @@ -8036,18 +8057,16 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) { } // Insert a row and wait for logs to be created. - beforeFirstLog := timeutil.Now() foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) var jobID jobspb.JobID if foo, ok := foo.(cdctest.EnterpriseTestFeed); ok { jobID = foo.JobID() } testutils.SucceedsSoon(t, waitForIncEmittedCounters) - verifyLogsWithEmittedBytesAndMessages(t, jobID, beforeFirstLog.UnixNano(), interval.Nanoseconds(), false /* closing */) + verifyLogsWithEmittedBytesAndMessages(t, jobID, cfLogSpy, &s.Server.ClusterSettings().SV, interval.Nanoseconds(), false /* closing */) // Insert more rows. No logs should be created for these since we recently // published them above and the interval is 24h. 
- afterFirstLog := timeutil.Now() seen.Store(false) sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`) @@ -8065,9 +8084,7 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) { t.Log("transient error") } - verifyLogsWithEmittedBytesAndMessages( - t, jobID, afterFirstLog.UnixNano(), interval.Nanoseconds(), true, /* closing */ - ) + verifyLogsWithEmittedBytesAndMessages(t, jobID, cfLogSpy, &s.Server.ClusterSettings().SV, interval.Nanoseconds(), true /* closing */) } cdcTest(t, testFn) @@ -8076,6 +8093,16 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) { func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"changefeed_emitted_bytes"}, + logtestutils.FromLogEntry[eventpb.ChangefeedEmittedBytes], + ) + + cleanup := log.InterceptWith(ctx, cfLogSpy) + defer cleanup() testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { // Hack: since setting a zero value disabled, set a negative value to ensure we always log. 
@@ -8090,16 +8117,14 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) { job1 := foo.(cdctest.EnterpriseTestFeed).JobID() job2 := foo2.(cdctest.EnterpriseTestFeed).JobID() - beforeInsert := timeutil.Now() sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) + verifyLogsWithEmittedBytesAndMessages(t, job1, cfLogSpy, &s.Server.ClusterSettings().SV, interval.Nanoseconds(), false) sqlDB.Exec(t, `INSERT INTO foo2 VALUES (1)`) - verifyLogsWithEmittedBytesAndMessages(t, job1, beforeInsert.UnixNano(), interval.Nanoseconds(), false) - verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false) + verifyLogsWithEmittedBytesAndMessages(t, job2, cfLogSpy, &s.Server.ClusterSettings().SV, interval.Nanoseconds(), false) require.NoError(t, foo.Close()) - beforeInsert = timeutil.Now() sqlDB.Exec(t, `INSERT INTO foo2 VALUES (2)`) - verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false) + verifyLogsWithEmittedBytesAndMessages(t, job2, cfLogSpy, &s.Server.ClusterSettings().SV, interval.Nanoseconds(), false) require.NoError(t, foo2.Close()) } @@ -10615,6 +10640,17 @@ func TestCreateChangefeedTelemetryLogs(t *testing.T) { s, stopServer := makeServer(t) defer stopServer() + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"create_changefeed"}, + logtestutils.FromLogEntry[eventpb.CreateChangefeed], + ) + + cleanup := log.InterceptWith(ctx, cfLogSpy) + defer cleanup() + sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) @@ -10625,22 +10661,20 @@ func TestCreateChangefeedTelemetryLogs(t *testing.T) { coreFeedFactory, cleanup := makeFeedFactory(t, "sinkless", s.Server, s.DB) defer cleanup() - beforeCreateSinkless := timeutil.Now() coreFeed := feed(t, coreFeedFactory, `CREATE 
CHANGEFEED FOR foo`) defer closeFeed(t, coreFeed) - createLogs := checkCreateChangefeedLogs(t, beforeCreateSinkless.UnixNano()) + createLogs := cfLogSpy.GetUnreadLogs(getChangefeedLoggingChannel(&s.Server.ClusterSettings().SV)) require.Equal(t, 1, len(createLogs)) require.Equal(t, "core", createLogs[0].SinkType) }) t.Run(`gcpubsub_sink_type_with_options`, func(t *testing.T) { pubsubFeedFactory := makePubsubFeedFactory(s.Server, s.DB) - beforeCreatePubsub := timeutil.Now() pubsubFeed := feed(t, pubsubFeedFactory, `CREATE CHANGEFEED FOR foo, bar WITH resolved="10s", no_initial_scan`) defer closeFeed(t, pubsubFeed) - createLogs := checkCreateChangefeedLogs(t, beforeCreatePubsub.UnixNano()) + createLogs := cfLogSpy.GetUnreadLogs(getChangefeedLoggingChannel(&s.Server.ClusterSettings().SV)) require.Equal(t, 1, len(createLogs)) require.Equal(t, `gcpubsub`, createLogs[0].SinkType) require.Equal(t, int32(2), createLogs[0].NumTables) @@ -10651,11 +10685,10 @@ func TestCreateChangefeedTelemetryLogs(t *testing.T) { t.Run(`with_transformation`, func(t *testing.T) { pubsubFeedFactory := makePubsubFeedFactory(s.Server, s.DB) - beforeCreateWithTransformation := timeutil.Now() pubsubFeed := feed(t, pubsubFeedFactory, `CREATE CHANGEFEED AS SELECT b FROM foo`) defer closeFeed(t, pubsubFeed) - createLogs := checkCreateChangefeedLogs(t, beforeCreateWithTransformation.UnixNano()) + createLogs := cfLogSpy.GetUnreadLogs(getChangefeedLoggingChannel(&s.Server.ClusterSettings().SV)) require.Equal(t, 1, len(createLogs)) require.Equal(t, true, createLogs[0].Transformation) }) @@ -10665,12 +10698,21 @@ func TestAlterChangefeedTelemetryLogs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"alter_changefeed"}, + logtestutils.FromLogEntry[eventpb.AlterChangefeed], + ) + + cleanup := log.InterceptWith(ctx, 
cfLogSpy) + defer cleanup() cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) - beforeCreate := timeutil.Now() testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -10682,7 +10724,7 @@ func TestAlterChangefeedTelemetryLogs(t *testing.T) { var logs []eventpb.AlterChangefeed testutils.SucceedsSoon(t, func() error { - logs = checkAlterChangefeedLogs(t, beforeCreate.UnixNano()) + logs = cfLogSpy.GetUnreadLogs(getChangefeedLoggingChannel(&s.Server.ClusterSettings().SV)) if len(logs) < 1 { return errors.New("no logs found") } @@ -10704,10 +10746,22 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - waitForLogs := func(t *testing.T, startTime time.Time) []eventpb.ChangefeedFailed { + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"changefeed_failed"}, + logtestutils.FromLogEntry[eventpb.ChangefeedFailed], + ) + + cleanup := log.InterceptWith(ctx, cfLogSpy) + defer cleanup() + + waitForLogs := func(t *testing.T, sv *settings.Values) []eventpb.ChangefeedFailed { + t.Helper() var logs []eventpb.ChangefeedFailed testutils.SucceedsSoon(t, func() error { - logs = checkChangefeedFailedLogs(t, startTime.UnixNano()) + logs = cfLogSpy.GetUnreadLogs(getChangefeedLoggingChannel(sv)) if len(logs) < 1 { return fmt.Errorf("no logs found") } @@ -10730,12 +10784,10 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { assertPayloads(t, coreFeed, []string{ `foo: [0]->{"after": {"a": 0, "b": "updated"}}`, }) - beforeCoreSinkClose := timeutil.Now() sinkCleanup() closeFeed(t, coreFeed) - - failLogs := waitForLogs(t, beforeCoreSinkClose) + 
failLogs := waitForLogs(t, &s.Server.ClusterSettings().SV) require.Equal(t, 1, len(failLogs)) require.Equal(t, failLogs[0].FailureType, changefeedbase.ConnectionClosed) }) @@ -10744,11 +10796,10 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - beforeCreate := timeutil.Now() _, err := f.Feed(`CREATE CHANGEFEED FOR foo, invalid_table`) require.Error(t, err) - failLogs := waitForLogs(t, beforeCreate) + failLogs := waitForLogs(t, &s.Server.ClusterSettings().SV) require.Equal(t, 1, len(failLogs)) require.Equal(t, failLogs[0].FailureType, changefeedbase.UserInput) }, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error")) @@ -10764,14 +10815,13 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { return changefeedbase.WithTerminalError(errors.New("should fail")) } - beforeCreate := timeutil.Now() foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error=FAIL`) sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'next')`) feedJob := foo.(cdctest.EnterpriseTestFeed) require.NoError(t, feedJob.WaitForState(func(s jobs.State) bool { return s == jobs.StateFailed })) closeFeed(t, foo) - failLogs := waitForLogs(t, beforeCreate) + failLogs := waitForLogs(t, &s.Server.ClusterSettings().SV) require.Equal(t, 1, len(failLogs)) require.Equal(t, failLogs[0].FailureType, changefeedbase.UnknownError) require.Contains(t, []string{`gcpubsub`, `external`}, failLogs[0].SinkType) @@ -10783,10 +10833,20 @@ func TestChangefeedCanceledTelemetryLogs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - waitForLogs := func(t *testing.T, startTime time.Time) []eventpb.ChangefeedCanceled { + ctx := context.Background() + cfLogSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_CHANGEFEED}, + []string{"changefeed_canceled"}, + logtestutils.FromLogEntry[eventpb.ChangefeedCanceled], + ) + + cleanup := 
log.InterceptWith(ctx, cfLogSpy) + defer cleanup() + waitForLogs := func(t *testing.T, sv *settings.Values) []eventpb.ChangefeedCanceled { var logs []eventpb.ChangefeedCanceled testutils.SucceedsSoon(t, func() error { - logs = checkChangefeedCanceledLogs(t, startTime.UnixNano()) + logs = cfLogSpy.GetUnreadLogs(getChangefeedLoggingChannel(sv)) if len(logs) < 1 { return fmt.Errorf("no logs found") } @@ -10799,17 +10859,15 @@ func TestChangefeedCanceledTelemetryLogs(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - beforeCreate := timeutil.Now() feed, err := f.Feed(`CREATE CHANGEFEED FOR foo`) require.NoError(t, err) enterpriseFeed := feed.(cdctest.EnterpriseTestFeed) sqlDB.Exec(t, `CANCEL JOB $1`, enterpriseFeed.JobID()) - canceledLogs := waitForLogs(t, beforeCreate) + canceledLogs := waitForLogs(t, &s.Server.ClusterSettings().SV) require.Equal(t, 1, len(canceledLogs)) require.Equal(t, enterpriseFeed.JobID().String(), strconv.FormatInt(canceledLogs[0].JobId, 10)) - require.Equal(t, "changefeed_canceled", canceledLogs[0].EventType) require.NoError(t, feed.Close()) }, feedTestEnterpriseSinks) } @@ -12547,3 +12605,10 @@ func TestTableRenameDuringDatabaseLevelChangefeed(t *testing.T) { } cdcTest(t, testFn) } + +func getChangefeedLoggingChannel(sv *settings.Values) logpb.Channel { + if log.ShouldMigrateEvent(sv) { + return logpb.Channel_CHANGEFEED + } + return logpb.Channel_TELEMETRY +} diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 9e8bca5d9c52..eb09b3023213 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -17,7 +17,6 @@ import ( "net/url" "os" "reflect" - "regexp" "slices" "sort" "strconv" @@ -43,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + 
"github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -58,6 +58,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -1724,68 +1725,20 @@ func forceTableGC( } } -// All structured logs should contain this property which stores the snake_cased -// version of the name of the message struct -type BaseEventStruct struct { - EventType string -} - -var cmLogRe = regexp.MustCompile(`event_log\.go`) - -func checkStructuredLogs(t *testing.T, eventType string, startTime int64) []string { - var matchingEntries []string - testutils.SucceedsSoon(t, func() error { - log.FlushFiles() - entries, err := log.FetchEntriesFromFiles(startTime, - math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) - if err != nil { - t.Fatal(err) - } - - for _, e := range entries { - jsonPayload := []byte(e.Message) - var baseStruct BaseEventStruct - if err := gojson.Unmarshal(jsonPayload, &baseStruct); err != nil { - continue - } - if baseStruct.EventType != eventType { - continue - } - - matchingEntries = append(matchingEntries, e.Message) - } - - return nil - }) - - return matchingEntries -} - -func checkContinuousChangefeedLogs(t *testing.T, startTime int64) []eventpb.ChangefeedEmittedBytes { - logs := checkStructuredLogs(t, "changefeed_emitted_bytes", startTime) - matchingEntries := make([]eventpb.ChangefeedEmittedBytes, len(logs)) - - for i, m := range logs { - jsonPayload := []byte(m) - var event eventpb.ChangefeedEmittedBytes - if err := gojson.Unmarshal(jsonPayload, &event); err != nil { - 
t.Errorf("unmarshalling %q: %v", m, err) - } - matchingEntries[i] = event - } - - return matchingEntries -} - // verifyLogsWithEmittedBytes fetches changefeed_emitted_bytes telemetry logs produced // after startTime for a particular job and asserts that at least one message has positive emitted bytes. // This function also asserts the LoggingInterval and Closing fields of // each message. func verifyLogsWithEmittedBytesAndMessages( - t *testing.T, jobID jobspb.JobID, startTime int64, interval int64, closing bool, + t *testing.T, + jobID jobspb.JobID, + logSpy *logtestutils.StructuredLogSpy[eventpb.ChangefeedEmittedBytes], + sv *settings.Values, + interval int64, + closing bool, ) { testutils.SucceedsSoon(t, func() error { - emittedBytesLogs := checkContinuousChangefeedLogs(t, startTime) + emittedBytesLogs := logSpy.GetUnreadLogs(getChangefeedLoggingChannel(sv)) if len(emittedBytesLogs) == 0 { return errors.New("no logs found") } @@ -1811,37 +1764,6 @@ func verifyLogsWithEmittedBytesAndMessages( }) } -func checkCreateChangefeedLogs(t *testing.T, startTime int64) []eventpb.CreateChangefeed { - return checkStructuredChangefeedLogs[eventpb.CreateChangefeed](t, `create_changefeed`, startTime) -} - -func checkAlterChangefeedLogs(t *testing.T, startTime int64) []eventpb.AlterChangefeed { - return checkStructuredChangefeedLogs[eventpb.AlterChangefeed](t, `alter_changefeed`, startTime) -} - -func checkChangefeedFailedLogs(t *testing.T, startTime int64) []eventpb.ChangefeedFailed { - return checkStructuredChangefeedLogs[eventpb.ChangefeedFailed](t, `changefeed_failed`, startTime) -} - -func checkChangefeedCanceledLogs(t *testing.T, startTime int64) []eventpb.ChangefeedCanceled { - return checkStructuredChangefeedLogs[eventpb.ChangefeedCanceled](t, `changefeed_canceled`, startTime) -} - -func checkStructuredChangefeedLogs[E any](t *testing.T, name string, startTime int64) []E { - var matchingEntries []E - - for _, m := range checkStructuredLogs(t, name, startTime) { - 
jsonPayload := []byte(m) - var event E - if err := gojson.Unmarshal(jsonPayload, &event); err != nil { - t.Errorf("unmarshalling %q: %v", m, err) - } - matchingEntries = append(matchingEntries, event) - } - - return matchingEntries -} - func checkS3Credentials(t *testing.T) (bucket string, accessKey string, secretKey string) { accessKey = os.Getenv("AWS_ACCESS_KEY_ID") if accessKey == "" { diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 4e032a533c26..86a0e0a1b071 100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -106,7 +106,9 @@ func (ptl *periodicTelemetryLogger) maybeFlushLogs() { EmittedMessages: ptl.resetEmittedMessages(), LoggingInterval: loggingInterval, } - log.StructuredEvent(ptl.ctx, severity.INFO, continuousTelemetryEvent) + + shouldMigrate := log.ShouldMigrateEvent(&ptl.settings.SV) + getChangefeedEventMigrator(shouldMigrate).StructuredEvent(ptl.ctx, severity.INFO, continuousTelemetryEvent) } func (ptl *periodicTelemetryLogger) close() { @@ -122,7 +124,9 @@ func (ptl *periodicTelemetryLogger) close() { LoggingInterval: loggingInterval, Closing: true, } - log.StructuredEvent(ptl.ctx, severity.INFO, continuousTelemetryEvent) + + shouldMigrate := log.ShouldMigrateEvent(&ptl.settings.SV) + getChangefeedEventMigrator(shouldMigrate).StructuredEvent(ptl.ctx, severity.INFO, continuousTelemetryEvent) } func wrapMetricsRecorderWithTelemetry( diff --git a/pkg/util/log/eventpb/changefeed_events.proto b/pkg/util/log/eventpb/changefeed_events.proto index 7823db23d493..6a32e2d6583c 100644 --- a/pkg/util/log/eventpb/changefeed_events.proto +++ b/pkg/util/log/eventpb/changefeed_events.proto @@ -10,6 +10,10 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/util/log/eventpb"; import "gogoproto/gogo.proto"; import "util/log/eventpb/events.proto"; +// TODO (#151948): Update channel from TELEMETRY to CHANGEFEED once the +// `log.channel_compatibility_mode.enabled` cluster setting 
is set to false +// by default. + // Category: Changefeed telemetry events // Channel: TELEMETRY // @@ -18,6 +22,11 @@ import "util/log/eventpb/events.proto"; // CreateChangefeed is an event for any CREATE CHANGEFEED query that // successfully starts running. Failed CREATE statements will show up as // ChangefeedFailed events. +// +// Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +// To test compatibility before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `CHANGEFEED` instead of `TELEMETRY`. message CreateChangefeed { CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; @@ -26,6 +35,11 @@ message CreateChangefeed { } // AlterChangefeed is an event for any ALTER CHANGEFEED statements that are run. +// +// Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +// To test compatibility before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `CHANGEFEED` instead of `TELEMETRY`. message AlterChangefeed { CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; @@ -38,6 +52,11 @@ message AlterChangefeed { // ChangefeedFailed is an event for any changefeed failure since the plan hook // was triggered. +// +// Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +// To test compatibility before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `CHANGEFEED` instead of `TELEMETRY`. message ChangefeedFailed { CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; @@ -47,11 +66,21 @@ message ChangefeedFailed { } // ChangefeedCanceled is an event for any changefeed cancellations. 
+// +// Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +// To test compatibility before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `CHANGEFEED` instead of `TELEMETRY`. message ChangefeedCanceled { CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; } // ChangefeedEmittedBytes is an event representing the bytes emitted by a changefeed over an interval. +// +// Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel. +// To test compatibility before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `CHANGEFEED` instead of `TELEMETRY`. message ChangefeedEmittedBytes { CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];