25 changes: 25 additions & 0 deletions docs/generated/eventlog.md
@@ -32,6 +32,11 @@ Events in this category are logged to the `TELEMETRY` channel.

An event of type `alter_changefeed` is an event for any ALTER CHANGEFEED statements that are run.

Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
To test compatibility ahead of that change, set the cluster setting
`log.channel_compatibility_mode.enabled` to false. This sends the
events to `CHANGEFEED` instead of `TELEMETRY`.


| Field | Description | Sensitive |
|--|--|--|
@@ -54,6 +59,11 @@ An event of type `alter_changefeed` is an event for any ALTER CHANGEFEED statements that are run.

An event of type `changefeed_canceled` is an event for any changefeed cancellations.

Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
To test compatibility ahead of that change, set the cluster setting
`log.channel_compatibility_mode.enabled` to false. This sends the
events to `CHANGEFEED` instead of `TELEMETRY`.




@@ -73,6 +83,11 @@ An event of type `changefeed_canceled` is an event for any changefeed cancellations.

An event of type `changefeed_emitted_bytes` is an event representing the bytes emitted by a changefeed over an interval.

Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
To test compatibility ahead of that change, set the cluster setting
`log.channel_compatibility_mode.enabled` to false. This sends the
events to `CHANGEFEED` instead of `TELEMETRY`.


| Field | Description | Sensitive |
|--|--|--|
@@ -99,6 +114,11 @@ An event of type `changefeed_emitted_bytes` is an event representing the bytes emitted by a changefeed over an interval.
An event of type `changefeed_failed` is an event for any changefeed failure since the plan hook
was triggered.

Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
To test compatibility ahead of that change, set the cluster setting
`log.channel_compatibility_mode.enabled` to false. This sends the
events to `CHANGEFEED` instead of `TELEMETRY`.


| Field | Description | Sensitive |
|--|--|--|
@@ -123,6 +143,11 @@ An event of type `create_changefeed` is an event for any CREATE CHANGEFEED query
successfully starts running. Failed CREATE statements will show up as
ChangefeedFailed events.

Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
To test compatibility ahead of that change, set the cluster setting
`log.channel_compatibility_mode.enabled` to false. This sends the
events to `CHANGEFEED` instead of `TELEMETRY`.


| Field | Description | Sensitive |
|--|--|--|
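The note added to each event above amounts to a one-statement opt-in. As a minimal sketch of flipping the setting from Go (the connection string and a locally running, insecure single node are assumptions; any SQL client issuing the same `SET CLUSTER SETTING` statement works equally well):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // PostgreSQL wire-protocol driver, which CockroachDB speaks
)

func main() {
	// Assumption: an insecure local node listening on the default port.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Opt in to the 26.1 behavior early: with compatibility mode disabled,
	// the changefeed events documented above are emitted to the CHANGEFEED
	// channel instead of TELEMETRY.
	if _, err := db.Exec(
		`SET CLUSTER SETTING log.channel_compatibility_mode.enabled = false`,
	); err != nil {
		log.Fatal(err)
	}
	log.Println("changefeed events will now be routed to the CHANGEFEED channel")
}
```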
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -144,6 +144,7 @@ go_library(
"//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/channel",
"//pkg/util/log/eventpb",
"//pkg/util/log/logcrash",
"//pkg/util/log/severity",
@@ -280,6 +281,7 @@ go_test(
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigjob",
@@ -337,6 +339,8 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/log/logpb",
"//pkg/util/log/logtestutils",
"//pkg/util/metamorphic",
"//pkg/util/mon",
"//pkg/util/parquet",
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
@@ -250,8 +251,8 @@ func alterChangefeedPlanHook(
}

telemetry.Count(telemetryPath)

logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size)
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size, shouldMigrate)

select {
case <-ctx.Done():
61 changes: 42 additions & 19 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -78,8 +79,8 @@ func init() {
sql.AddPlanHook("changefeed", changefeedPlanHook, changefeedTypeCheck)
jobs.RegisterConstructor(
jobspb.TypeChangefeed,
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
r := &changefeedResumer{job: job}
func(job *jobs.Job, s *cluster.Settings) jobs.Resumer {
r := &changefeedResumer{job: job, sv: &s.SV}
r.mu.perNodeAggregatorStats = make(bulk.ComponentAggregatorStats)
return r
},
@@ -282,7 +283,8 @@ func changefeedPlanHook(
p.ExtendedEvalContext().Descs.ReleaseAll(ctx)

telemetry.Count(`changefeed.create.core`)
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size)
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size, shouldMigrate)
if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil {
return err
}
@@ -384,8 +386,8 @@ func changefeedPlanHook(
if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil {
return err
}

logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size)
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size, shouldMigrate)

select {
case <-ctx.Done():
@@ -400,7 +402,9 @@ func changefeedPlanHook(
rowFnLogErrors := func(ctx context.Context, resultsCh chan<- tree.Datums) error {
err := rowFn(ctx, resultsCh)
if err != nil {
logChangefeedFailedTelemetryDuringStartup(ctx, description, failureTypeForStartupError(err))

shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
logChangefeedFailedTelemetryDuringStartup(ctx, description, failureTypeForStartupError(err), shouldMigrate)
var e *kvpb.BatchTimestampBeforeGCError
if errors.As(err, &e) && opts.HasStartCursor() {
err = errors.Wrapf(err,
@@ -1320,6 +1324,7 @@ func validateAndNormalizeChangefeedExpression(

type changefeedResumer struct {
job *jobs.Job
sv *settings.Values

mu struct {
syncutil.Mutex
@@ -1792,17 +1797,17 @@ func (b *changefeedResumer) OnFailOrCancel(
}
numTargets = targets.Size
}

shouldMigrate := log.ShouldMigrateEvent(b.sv)
// If this job has failed (not canceled), increment the counter.
if jobs.HasErrJobCanceled(
errors.DecodeError(ctx, *b.job.Payload().FinalResumeError),
) {
telemetry.Count(`changefeed.enterprise.cancel`)
logChangefeedCanceledTelemetry(ctx, b.job, numTargets)
logChangefeedCanceledTelemetry(ctx, b.job, numTargets, shouldMigrate)
} else {
telemetry.Count(`changefeed.enterprise.fail`)
exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1)
logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError, numTargets)
logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError, numTargets, shouldMigrate)
}
return nil
}
@@ -1890,7 +1895,7 @@ func getChangefeedTargetName(
}

func logCreateChangefeedTelemetry(
ctx context.Context, jr *jobs.Record, isTransformation bool, numTargets uint,
ctx context.Context, jr *jobs.Record, isTransformation bool, numTargets uint, migrateEvent bool,
) {
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if jr != nil {
@@ -1903,11 +1908,11 @@ func logCreateChangefeedTelemetry(
Transformation: isTransformation,
}

log.StructuredEvent(ctx, severity.INFO, createChangefeedEvent)
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, createChangefeedEvent)
}

func logAlterChangefeedTelemetry(
ctx context.Context, job *jobs.Job, prevDescription string, numTargets uint,
ctx context.Context, job *jobs.Job, prevDescription string, numTargets uint, migrateEvent bool,
) {
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if job != nil {
Expand All @@ -1921,11 +1926,15 @@ func logAlterChangefeedTelemetry(
PreviousDescription: prevDescription,
}

log.StructuredEvent(ctx, severity.INFO, alterChangefeedEvent)
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, alterChangefeedEvent)
}

func logChangefeedFailedTelemetry(
ctx context.Context, job *jobs.Job, failureType changefeedbase.FailureType, numTargets uint,
ctx context.Context,
job *jobs.Job,
failureType changefeedbase.FailureType,
numTargets uint,
migrateEvent bool,
) {
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if job != nil {
Expand All @@ -1938,21 +1947,26 @@ func logChangefeedFailedTelemetry(
FailureType: failureType,
}

log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
}

func logChangefeedFailedTelemetryDuringStartup(
ctx context.Context, description string, failureType changefeedbase.FailureType,
ctx context.Context,
description string,
failureType changefeedbase.FailureType,
migrateEvent bool,
) {
changefeedFailedEvent := &eventpb.ChangefeedFailed{
CommonChangefeedEventDetails: eventpb.CommonChangefeedEventDetails{Description: description},
FailureType: failureType,
}

log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
}

func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTargets uint) {
func logChangefeedCanceledTelemetry(
ctx context.Context, job *jobs.Job, numTargets uint, migrateEvent bool,
) {
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if job != nil {
changefeedDetails := job.Details().(jobspb.ChangefeedDetails)
Expand All @@ -1962,7 +1976,7 @@ func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTarge
changefeedCanceled := &eventpb.ChangefeedCanceled{
CommonChangefeedEventDetails: changefeedEventDetails,
}
log.StructuredEvent(ctx, severity.INFO, changefeedCanceled)
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedCanceled)
}

func makeCommonChangefeedEventDetails(
@@ -2101,3 +2115,12 @@
"Please see CDC documentation on the use of new cdc_prev tuple.",
tree.AsString(oldExpression), tree.AsString(newExpression))
}

func getChangefeedEventMigrator(migrateEvent bool) log.StructuredEventMigrator {
return log.NewStructuredEventMigrator(
func() bool {
return migrateEvent
},
channel.CHANGEFEED,
)
}
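The new helper above is the single point where the channel decision is made: each telemetry logger reads the setting via `log.ShouldMigrateEvent` at the call site and threads the resulting bool through to `getChangefeedEventMigrator`. As a rough, self-contained illustration of the pattern only (plain Go with stand-in types, not CockroachDB's actual `log.StructuredEventMigrator` API; the routing behavior is inferred from the call sites and the docs note, not verified against `pkg/util/log`):

```go
package main

import "fmt"

// channel stands in for a log channel identifier (illustrative only).
type channel string

const (
	telemetry  channel = "TELEMETRY"
	changefeed channel = "CHANGEFEED"
)

// structuredEventMigrator mimics the shape used in the diff: a predicate plus
// the channel that events move to when the predicate returns true.
type structuredEventMigrator struct {
	shouldMigrate func() bool
	target        channel
}

// structuredEvent sends the event to the target channel when migration is
// enabled, otherwise leaves it on the legacy channel.
func (m structuredEventMigrator) structuredEvent(legacy channel, event string) {
	ch := legacy
	if m.shouldMigrate() {
		ch = m.target
	}
	fmt.Printf("[%s] %s\n", ch, event)
}

func main() {
	// In the real code this bool comes from log.ShouldMigrateEvent; per the
	// docs note, events move to CHANGEFEED when the cluster setting
	// log.channel_compatibility_mode.enabled is set to false.
	migrateEvent := true
	m := structuredEventMigrator{
		shouldMigrate: func() bool { return migrateEvent },
		target:        changefeed,
	}
	m.structuredEvent(telemetry, "changefeed_canceled")
	// Output: [CHANGEFEED] changefeed_canceled
}
```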