From 6e52e0cb3caa0a93bf5d30f9e3452c54dec45218 Mon Sep 17 00:00:00 2001
From: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com>
Date: Fri, 15 Aug 2025 12:33:53 -0500
Subject: [PATCH] sql: prep migration for sampled_query and sampled_transaction
 events

In v26.1, sampled_query and sampled_transaction events will be moved
from the TELEMETRY logging channel to the SQL_EXEC logging channel.
This commit gates the migration behind the cluster setting
`log.channel_compatibility_mode.enabled`: when the setting is set to
false, these events are logged to the SQL_EXEC channel instead. Users
can set it to false in their clusters to validate, test, and identify
potential downstream impacts on their logging setups and pipelines.

Epic: CRDB-53410
Part of: CRDB-53412

Release note (ops change): sampled_query and sampled_transaction
events will be moved to the SQL_EXEC channel in 26.1. To test the
impact of this change ahead of time, set the cluster setting
`log.channel_compatibility_mode.enabled` to false. Note that doing so
immediately starts logging these events to the SQL_EXEC channel, so
this should not be done in a production environment.
---
 docs/generated/eventlog.md                    |  10 +
 pkg/ccl/telemetryccl/BUILD.bazel              |   1 +
 .../telemetryccl/telemetry_logging_test.go    |  30 ++-
 pkg/sql/exec_log.go                           |  13 +-
 pkg/sql/telemetry_datadriven_test.go          |   9 +-
 pkg/sql/telemetry_logging_test.go             | 213 ++++++++----------
 pkg/util/log/eventpb/telemetry.proto          |  21 ++
 .../log/logtestutils/structured_log_spy.go    |   5 +
 8 files changed, 166 insertions(+), 136 deletions(-)

diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md
index 80dfd84e2da9..72279cca7ba6 100644
--- a/docs/generated/eventlog.md
+++ b/docs/generated/eventlog.md
@@ -3364,6 +3364,11 @@ Fields in this struct should be updated in sync with apps_stats.proto.
 An event of type `sampled_query` is the SQL query event logged to the telemetry
 channel. It contains common SQL event/execution details.
 
+Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel.
+To test compatibility before this change, set the cluster setting
+`log.channel_compatibility_mode.enabled` to false. This will send the
+events to `SQL_EXEC` instead of `TELEMETRY`.
+
 
 | Field | Description | Sensitive |
 |--|--|--|
@@ -3472,6 +3477,11 @@ contains common SQL event/execution details.
 An event of type `sampled_transaction` is the event logged to telemetry at the
 end of transaction execution.
 
+Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel.
+To test compatibility before this change, set the cluster setting
+`log.channel_compatibility_mode.enabled` to false. This will send the
+events to `SQL_EXEC` instead of `TELEMETRY`.
+

 | Field | Description | Sensitive |
 |--|--|--|
diff --git a/pkg/ccl/telemetryccl/BUILD.bazel b/pkg/ccl/telemetryccl/BUILD.bazel
index c1c160d00b42..8052f9250f52 100644
--- a/pkg/ccl/telemetryccl/BUILD.bazel
+++ b/pkg/ccl/telemetryccl/BUILD.bazel
@@ -20,6 +20,7 @@ go_test(
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
+        "//pkg/settings",
         "//pkg/sql",
         "//pkg/sql/sem/tree",
         "//pkg/sql/sqltestutils",
diff --git a/pkg/ccl/telemetryccl/telemetry_logging_test.go b/pkg/ccl/telemetryccl/telemetry_logging_test.go
index 52568a46f997..9a2c09a5162b 100644
--- a/pkg/ccl/telemetryccl/telemetry_logging_test.go
+++ b/pkg/ccl/telemetryccl/telemetry_logging_test.go
@@ -13,6 +13,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"regexp"
+	"slices"
 	"strings"
 	"testing"
 
@@ -21,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -155,20 +157,29 @@ type expectedSampleQueryEvent struct {
 }
 
 type telemetrySpy struct {
-	t *testing.T
+	t  *testing.T
+	sv *settings.Values
 
 	sampledQueries    []eventpb.SampledQuery
 	sampledQueriesRaw []logpb.Entry
 	recoveryEvents    []eventpb.RecoveryEvent
 }
 
+func (l *telemetrySpy) channelsToIntercept() []log.Channel {
+	if log.ShouldMigrateEvent(l.sv) {
+		return []log.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}
+	}
+
+	return []log.Channel{logpb.Channel_TELEMETRY}
+}
+
 func (l *telemetrySpy) Intercept(entry []byte) {
 	var rawLog logpb.Entry
 	if err := json.Unmarshal(entry, &rawLog); err != nil {
 		l.t.Errorf("failed unmarshaling %s: %s", entry, err)
 	}
 
-	if rawLog.Channel != logpb.Channel_TELEMETRY {
+	if !slices.Contains(l.channelsToIntercept(), rawLog.Channel) {
 		return
 	}
 
@@ -204,12 +215,6 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
 
 	ctx := context.Background()
 
-	spy := &telemetrySpy{
-		t: t,
-	}
-	cleanup := log.InterceptWith(ctx, spy)
-	defer cleanup()
-
 	st := logtestutils.StubTime{}
 	sqm := logtestutils.StubQueryStats{}
 	sts := logtestutils.StubTracingStatus{}
@@ -229,6 +234,15 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
 			ExternalIODir: dir,
 		},
 	})
+
+	spy := &telemetrySpy{
+		t:  t,
+		sv: &testCluster.Server(0).ClusterSettings().SV,
+	}
+
+	cleanup := log.InterceptWith(ctx, spy)
+	defer cleanup()
+
 	sqlDB := testCluster.ServerConn(0)
 	defer func() {
 		testCluster.Stopper().Stop(context.Background())
diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go
index 31394da2d8d7..33f6924bb34b 100644
--- a/pkg/sql/exec_log.go
+++ b/pkg/sql/exec_log.go
@@ -358,6 +358,7 @@ func (p *planner) maybeLogStatementInternal(
 
 		*sampledQuery = eventpb.SampledQuery{
 			CommonSQLExecDetails:  execDetails,
+			CommonSQLEventDetails: p.getCommonSQLEventDetails(),
 			SkippedQueries:        skippedQueries,
 			CostEstimate:          p.curPlan.instrumentation.costEstimate,
 			Distribution:          p.curPlan.instrumentation.distribution.String(),
@@ -436,7 +437,11 @@ func (p *planner) maybeLogStatementInternal(
 			SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(),
 		}
 
-		p.logEventsOnlyExternally(ctx, sampledQuery)
+		migrator := log.NewStructuredEventMigrator(func() bool {
+			return log.ShouldMigrateEvent(p.ExecCfg().SV())
+		}, logpb.Channel_SQL_EXEC)
+
+		migrator.StructuredEvent(ctx, severity.INFO, sampledQuery)
 	}
 }
 
@@ -521,7 +526,11 @@ func (p *planner) logTransaction(
 		}
 	}
 
-	log.StructuredEvent(ctx, severity.INFO, sampledTxn)
+	migrator := log.NewStructuredEventMigrator(func() bool {
+		return log.ShouldMigrateEvent(p.ExecCfg().SV())
+	}, logpb.Channel_SQL_EXEC)
+
+	migrator.StructuredEvent(ctx, severity.INFO, sampledTxn)
 }
 
 func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) {
diff --git a/pkg/sql/telemetry_datadriven_test.go b/pkg/sql/telemetry_datadriven_test.go
index 7ebe9b8afb93..1a1836b85d8f 100644
--- a/pkg/sql/telemetry_datadriven_test.go
+++ b/pkg/sql/telemetry_datadriven_test.go
@@ -60,13 +60,12 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
 	sc := log.Scope(t)
 	defer sc.Close(t)
 
-	appName := "telemetry-logging-datadriven"
 	ignoredAppname := "telemetry-datadriven-ignored-appname"
 	ctx := context.Background()
 
 	stmtSpy := logtestutils.NewStructuredLogSpy(
 		t,
-		[]logpb.Channel{logpb.Channel_TELEMETRY},
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
 		[]string{"sampled_query"},
 		logtestutils.FormatEntryAsJSON,
 		func(_ logpb.Entry, logStr string) bool {
@@ -79,7 +78,7 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
 
 	txnsSpy := logtestutils.NewStructuredLogSpy(
 		t,
-		[]logpb.Channel{logpb.Channel_TELEMETRY},
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
 		[]string{"sampled_transaction"},
 		logtestutils.FormatEntryAsJSON,
 		func(_ logpb.Entry, logStr string) bool {
@@ -209,13 +208,13 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
 			}
 
 			newStmtLogCount := stmtSpy.Count()
-			sb.WriteString(strings.Join(stmtSpy.GetLastNLogs(logpb.Channel_TELEMETRY, newStmtLogCount-stmtLogCount), "\n"))
+			sb.WriteString(strings.Join(stmtSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), newStmtLogCount-stmtLogCount), "\n"))
 			if newStmtLogCount > stmtLogCount {
 				sb.WriteString("\n")
 			}
 
 			newTxnLogCount := txnsSpy.Count()
-			sb.WriteString(strings.Join(txnsSpy.GetLastNLogs(logpb.Channel_TELEMETRY, newTxnLogCount-txnLogCount), "\n"))
+			sb.WriteString(strings.Join(txnsSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), newTxnLogCount-txnLogCount), "\n"))
 			return sb.String()
 		case "reset-last-sampled":
 			telemetryLogging.resetLastSampledTime()
diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go
index de6fcf4044ab..2b7ac3aa1734 100644
--- a/pkg/sql/telemetry_logging_test.go
+++ b/pkg/sql/telemetry_logging_test.go
@@ -9,13 +9,13 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"math"
 	"regexp"
 	"strings"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execstats"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal"
@@ -40,9 +40,25 @@ func TestTelemetryLogging(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
+	txnSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_transaction"},
+		logtestutils.AsLogEntry,
+	)
+	txnCleanup := log.InterceptWith(ctx, txnSpy)
+	defer txnCleanup()
+
 	st := logtestutils.StubTime{}
 	sqm := logtestutils.StubQueryStats{}
 	sts := logtestutils.StubTracingStatus{}
@@ -432,27 +448,10 @@ func TestTelemetryLogging(t *testing.T) {
 			// We should not see any transaction events in statement
 			// telemetry mode.
-			txnEntries, err := log.FetchEntriesFromFiles(
-				0,
-				math.MaxInt64,
-				10000,
-				regexp.MustCompile(`"EventType":"sampled_transaction"`),
-				log.WithMarkedSensitiveData,
-			)
-			require.NoError(t, err)
+			txnEntries := txnSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), txnSpy.Count())
 			require.Emptyf(t, txnEntries,
 				"found unexpected transaction telemetry events: %v", txnEntries)
 
-			entries, err := log.FetchEntriesFromFiles(
-				0,
-				math.MaxInt64,
-				10000,
-				regexp.MustCompile(`"EventType":"sampled_query"`),
-				log.WithMarkedSensitiveData,
-			)
-
-			if err != nil {
-				t.Fatal(err)
-			}
+			entries := stmtSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), stmtSpy.Count())
 
 			if len(entries) == 0 {
 				t.Fatal(errors.Newf("no entries found"))
@@ -471,9 +470,7 @@ func TestTelemetryLogging(t *testing.T) {
 			})
 			logCount := 0
 			expectedLogCount := len(tc.expectedSkipped)
-			// NB: FetchEntriesFromFiles delivers entries in reverse order.
-			for i := len(entries) - 1; i >= 0; i-- {
-				e := entries[i]
+			for _, e := range entries {
 
 				if strings.Contains(e.Message, tc.expectedLogStatement+"\"") {
 					if logCount == expectedLogCount {
@@ -482,7 +479,7 @@ func TestTelemetryLogging(t *testing.T) {
 					}
 
 					var sampledQueryFromLog eventpb.SampledQuery
-					err = json.Unmarshal([]byte(e.Message), &sampledQueryFromLog)
+					err := json.Unmarshal([]byte(e.Message), &sampledQueryFromLog)
 					require.NoError(t, err)
 					require.Equal(t, tc.expectedSkipped[logCount], sampledQueryFromLog.SkippedQueries, "%v", e.Message)
 
@@ -707,8 +704,14 @@ func TestTelemetryLoggingInternalEnabled(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
-
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	st := logtestutils.StubTime{}
@@ -748,19 +751,7 @@ func TestTelemetryLoggingInternalEnabled(t *testing.T) {
 		`TRUNCATE TABLE system.public.transaction_statistics`,
 	}
 
-	log.FlushFiles()
-
-	entries, err := log.FetchEntriesFromFiles(
-		0,
-		math.MaxInt64,
-		10000,
-		regexp.MustCompile(`"EventType":"sampled_query"`),
-		log.WithMarkedSensitiveData,
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	entries := stmtSpy.GetLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 
 	if len(entries) == 0 {
 		t.Fatal(errors.Newf("no entries found"))
@@ -797,7 +788,14 @@ func TestTelemetryLoggingInternalConsoleEnabled(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	st := logtestutils.StubTime{}
@@ -862,16 +860,7 @@ func TestTelemetryLoggingInternalConsoleEnabled(t *testing.T) {
 			db.Exec(t, query)
 
 			log.FlushFiles()
-			entries, err := log.FetchEntriesFromFiles(
-				0,
-				math.MaxInt64,
-				10000,
-				regexp.MustCompile(`"EventType":"sampled_query"`),
-				log.WithMarkedSensitiveData,
-			)
-			if err != nil {
-				t.Fatal(err)
-			}
+			entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 			if len(entries) == 0 {
 				t.Fatal(errors.Newf("no entries found"))
 			}
@@ -895,7 +884,14 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	st := logtestutils.StubTime{}
@@ -963,19 +959,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) {
 		db.Exec(t, tc.query)
 	}
 
-	log.FlushFiles()
-
-	entries, err := log.FetchEntriesFromFiles(
-		0,
-		math.MaxInt64,
-		10000,
-		regexp.MustCompile(`"EventType":"sampled_query"`),
-		log.WithMarkedSensitiveData,
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 
 	if len(entries) == 0 {
 		t.Fatal(errors.Newf("no entries found"))
@@ -1004,7 +988,14 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
@@ -1167,19 +1158,7 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) {
 		db.Exec(t, tc.query)
 	}
 
-	log.FlushFiles()
-
-	entries, err := log.FetchEntriesFromFiles(
-		0,
-		math.MaxInt64,
-		10000,
-		regexp.MustCompile(`"EventType":"sampled_query"`),
-		log.WithMarkedSensitiveData,
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 
 	if len(entries) == 0 {
 		t.Fatal(errors.Newf("no entries found"))
@@ -1243,7 +1222,14 @@ func TestTelemetryScanCounts(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
@@ -1422,19 +1408,7 @@ func TestTelemetryScanCounts(t *testing.T) {
 		db.Exec(t, tc.query)
 	}
 
-	log.FlushFiles()
-
-	entries, err := log.FetchEntriesFromFiles(
-		0,
-		math.MaxInt64,
-		10000,
-		regexp.MustCompile(`"EventType":"sampled_query"`),
-		log.WithMarkedSensitiveData,
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 
 	if len(entries) == 0 {
 		t.Fatal(errors.Newf("no entries found"))
@@ -1513,7 +1487,14 @@ func TestFunctionBodyRedacted(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
@@ -1536,19 +1517,7 @@ $$`
 
 	db.Exec(t, stmt)
 
-	log.FlushFiles()
-
-	entries, err := log.FetchEntriesFromFiles(
-		0,
-		math.MaxInt64,
-		10000,
-		regexp.MustCompile(`"EventType":"sampled_query"`),
-		log.WithMarkedSensitiveData,
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	entries := stmtSpy.GetLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 
 	if len(entries) == 0 {
 		t.Fatal(errors.Newf("no entries found"))
@@ -1574,7 +1543,14 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) {
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	ctx := context.Background()
+	stmtSpy := logtestutils.NewStructuredLogSpy(
+		t,
+		[]logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC},
+		[]string{"sampled_query"},
+		logtestutils.AsLogEntry,
+	)
+	cleanup := log.InterceptWith(ctx, stmtSpy)
 	defer cleanup()
 
 	st := logtestutils.StubTime{}
@@ -1617,19 +1593,7 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) {
 		`BEGIN`, `SELECT ‹1›`, `SELECT ‹2›`, `SELECT ‹3›`, `COMMIT`,
 	}
 
-	log.FlushFiles()
-
-	entries, err := log.FetchEntriesFromFiles(
-		0,
-		math.MaxInt64,
-		10000,
-		regexp.MustCompile(`"EventType":"sampled_query"`),
-		log.WithMarkedSensitiveData,
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	entries := stmtSpy.GetLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV))
 
 	require.NotEmpty(t, entries)
 	var expectedTxnID string
@@ -1663,3 +1627,10 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) {
 		}
 	}
 }
+
+func getSampleQueryLoggingChannel(sv *settings.Values) logpb.Channel {
+	if log.ShouldMigrateEvent(sv) {
+		return logpb.Channel_SQL_EXEC
+	}
+	return logpb.Channel_TELEMETRY
+}
diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto
index 2346d75b287b..2a39a1900bd1 100644
--- a/pkg/util/log/eventpb/telemetry.proto
+++ b/pkg/util/log/eventpb/telemetry.proto
@@ -22,9 +22,19 @@ import "util/log/logpb/event.proto";
 
 // The comment at the top has a specific format for the doc generator.
 // *Really look at doc.go before modifying this file.*
 
+// TODO (#151948): Move this event definition to
+// `pkg/util/log/eventpb/sql_audit_events.proto` so that it lives with the
+// `SQL Execution Log` channel once the `log.channel_compatibility_mode.enabled`
+// cluster setting defaults to false and the setting is slated for removal.
+//
 // SampledQuery is the SQL query event logged to the telemetry channel. It
 // contains common SQL event/execution details.
+//
+// Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel.
+// To test compatibility before this change, set the cluster setting
+// `log.channel_compatibility_mode.enabled` to false. This will send the
+// events to `SQL_EXEC` instead of `TELEMETRY`.
 message SampledQuery {
   CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
   CommonSQLEventDetails sql = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
@@ -406,7 +416,18 @@ message MVCCIteratorStats {
   int64 range_key_skipped_points = 13 [(gogoproto.jsontag) = ",includeempty"];
 }
 
+// TODO (#151948): Move this event definition to
+// `pkg/util/log/eventpb/sql_audit_events.proto` so that it lives with the
+// `SQL Execution Log` channel once the `log.channel_compatibility_mode.enabled`
+// cluster setting defaults to false and the setting is slated for removal.
+//
 // SampledTransaction is the event logged to telemetry at the end of transaction execution.
+//
+// Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel.
+// To test compatibility before this change, set the cluster setting
+// `log.channel_compatibility_mode.enabled` to false. This will send the
+// events to `SQL_EXEC` instead of `TELEMETRY`.
 message SampledTransaction {
   // Common contains common event details shared by all log events.
   CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
 
diff --git a/pkg/util/log/logtestutils/structured_log_spy.go b/pkg/util/log/logtestutils/structured_log_spy.go
index 9cd56d643be7..6e55fb9cd306 100644
--- a/pkg/util/log/logtestutils/structured_log_spy.go
+++ b/pkg/util/log/logtestutils/structured_log_spy.go
@@ -110,6 +110,13 @@ func FromLogEntry[T any](entry logpb.Entry) (T, error) {
 	return payload, err
 }
 
+// AsLogEntry returns the entry unchanged except that its message is narrowed
+// to the structured payload, so tests can unmarshal the event JSON directly.
+func AsLogEntry(entry logpb.Entry) (logpb.Entry, error) {
+	entry.Message = entry.Message[entry.StructuredStart:entry.StructuredEnd]
+	return entry, nil
+}
+
 // StructuredLogSpy is a test utility that intercepts structured log entries
 // and stores them in memory. It can be used to verify the contents of log
 // entries in tests.
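
Reviewer note: below is a minimal, self-contained sketch of the routing behavior this patch wires up, for anyone validating a downstream logging pipeline against it. The `structuredEventMigrator` type, its fields, and `main` are illustrative stand-ins, not the real `pkg/util/log` API; the pieces grounded in the patch are the predicate-plus-target-channel shape of `log.NewStructuredEventMigrator` and the fact that `log.ShouldMigrateEvent` keys off the `log.channel_compatibility_mode.enabled` cluster setting, which operators toggle with `SET CLUSTER SETTING log.channel_compatibility_mode.enabled = false;`.

package main

import "fmt"

// Channel is a stand-in for logpb.Channel.
type Channel string

const (
	Telemetry Channel = "TELEMETRY"
	SQLExec   Channel = "SQL_EXEC"
)

// structuredEventMigrator mirrors the shape of log.NewStructuredEventMigrator:
// the predicate is evaluated at emit time, so flipping the cluster setting
// reroutes subsequent events without restarting the node.
type structuredEventMigrator struct {
	shouldMigrate func() bool
	newChannel    Channel
}

// structuredEvent sends an event to the new channel when the predicate says
// to migrate, and to the event's legacy channel otherwise.
func (m structuredEventMigrator) structuredEvent(legacy Channel, eventType string) {
	ch := legacy
	if m.shouldMigrate() {
		ch = m.newChannel
	}
	fmt.Printf("%s -> %s\n", eventType, ch)
}

func main() {
	// Stand-in for log.channel_compatibility_mode.enabled, which per the
	// commit message currently defaults to keeping events on TELEMETRY.
	compatibilityMode := true

	m := structuredEventMigrator{
		// Mirrors log.ShouldMigrateEvent: migrate once compatibility mode is off.
		shouldMigrate: func() bool { return !compatibilityMode },
		newChannel:    SQLExec,
	}

	m.structuredEvent(Telemetry, "sampled_query") // sampled_query -> TELEMETRY

	// Operator opts in: SET CLUSTER SETTING log.channel_compatibility_mode.enabled = false;
	compatibilityMode = false
	m.structuredEvent(Telemetry, "sampled_transaction") // sampled_transaction -> SQL_EXEC
}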