
Commit 22f815b

craig[bot], kyle-a-wong, and dt committed

151806: log: add support for migrating logs to different channels r=kyle-a-wong a=kyle-a-wong

Adds a safe and backwards-compatible means of moving logs from one channel to another. The new log.Migrator and log.StructuredEventMigrator provide engineers with a way of safely migrating logs from one channel to another with minimal interruption to customers.

Both of these migrators require a `shouldMigrate` callback, which is used to determine whether to log to the new or the old logging channel. The StructuredEventMigrator only requires a new channel to be specified, which will be used instead of the channel defined on the event itself. Note that the event proto doc string should still be updated to document that it may be logged to a different channel. The Migrator struct requires both an old and a new channel to write to. Both migrators write to the "new" channel if `shouldMigrate` returns true; otherwise they write to the "old" channel.

Part of: CRDB-53412
Epic: CRDB-53410
Release note: None

151950: backup: only flush per-node progress every 15s r=dt a=dt

Previously this could queue up a large number of small changes to flush, particularly if flushing was slower than updates. Now all the updates that arrive over a 15s window are rolled up before being saved. If the channel becomes full while saving, additional messages may be dropped.

Release note: none.
Epic: none.

Co-authored-by: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
3 parents: 63f8071 + 1d9b563 + 90eb30f
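A usage sketch (not part of this commit) of the two migrators. It assumes the channel constants from pkg/util/log/channel, a caller-supplied event payload, and compilation inside the CockroachDB tree; the hypothetical function name emitWithMigration is illustrative only:

    package example

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
        "github.com/cockroachdb/cockroach/pkg/util/log"
        "github.com/cockroachdb/cockroach/pkg/util/log/channel"
        "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
        "github.com/cockroachdb/cockroach/pkg/util/log/severity"
    )

    // emitWithMigration is a hypothetical caller demonstrating both migrators.
    func emitWithMigration(ctx context.Context, st *cluster.Settings, ev logpb.EventPayload) {
        shouldMigrate := func() bool { return log.ShouldMigrateEvent(&st.SV) }

        // Structured events only name the new channel; when shouldMigrate
        // returns false, the event's own LoggingChannel() is used instead.
        sem := log.NewStructuredEventMigrator(shouldMigrate, channel.HEALTH)
        sem.StructuredEvent(ctx, severity.INFO, ev)

        // Formatted logs name both the old and the new channel explicitly.
        m := log.NewMigrator(shouldMigrate, channel.DEV, channel.OPS)
        m.Infof(ctx, "emitted %d migrated log lines", 2)
    }

Here log.ShouldMigrateEvent inverts the log.channel_compatibility_mode.enabled setting added below, so call sites migrate only once an operator opts in.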

File tree

9 files changed (+438, −31 lines)

docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 0 deletions

@@ -106,6 +106,7 @@ kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, t
 kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
 kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined through Raft consensus application
 kv.transaction.write_pipelining.max_batch_size (alias: kv.transaction.write_pipelining_max_batch_size) integer 128 if non-zero, defines that maximum size batch that will be pipelined through Raft consensus application
+log.channel_compatibility_mode.enabled boolean true when true, logs will continue to log to the expected logging channels; when false, logs will be moved to new logging channels as part of a logging channel consolidation effort application
 obs.tablemetadata.automatic_updates.enabled boolean false enables automatic updates of the table metadata cache system.table_metadata application
 obs.tablemetadata.data_valid_duration duration 20m0s the duration for which the data in system.table_metadata is considered valid application
 schedules.backup.gc_protection.enabled boolean true enable chaining of GC protection across backups run as part of a schedule application

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions

@@ -137,6 +137,7 @@
 <tr><td><div id="setting-kv-transaction-write-pipelining-max-batch-size" class="anchored"><code>kv.transaction.write_pipelining.max_batch_size<br />(alias: kv.transaction.write_pipelining_max_batch_size)</code></div></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kvadmission-store-provisioned-bandwidth" class="anchored"><code>kvadmission.store.provisioned_bandwidth</code></div></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to true</td><td>Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kvadmission-store-snapshot-ingest-bandwidth-control-enabled" class="anchored"><code>kvadmission.store.snapshot_ingest_bandwidth_control.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set to true, snapshot ingests will be subject to disk write control in AC</td><td>Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-log-channel-compatibility-mode-enabled" class="anchored"><code>log.channel_compatibility_mode.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when true, logs will continue to log to the expected logging channels; when false, logs will be moved to new logging channels as part of a logging channel consolidation effort</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-obs-tablemetadata-automatic-updates-enabled" class="anchored"><code>obs.tablemetadata.automatic_updates.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables automatic updates of the table metadata cache system.table_metadata</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-obs-tablemetadata-data-valid-duration" class="anchored"><code>obs.tablemetadata.data_valid_duration</code></div></td><td>duration</td><td><code>20m0s</code></td><td>the duration for which the data in system.table_metadata is considered valid</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-schedules-backup-gc-protection-enabled" class="anchored"><code>schedules.backup.gc_protection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/backup/backup_job.go

Lines changed: 23 additions & 8 deletions

@@ -226,17 +226,35 @@ func backup(
 		}
 	}
 
-	// Create a channel that is large enough that it does not block.
-	perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, numTotalSpans)
+	// Create a channel with a little buffering, but plan on dropping if blocked.
+	perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, len(backupSpecs))
 	storePerNodeProgressLoop := func(ctx context.Context) error {
+		// track the last progress per component, periodically flushing those that
+		// have changed to info rows.
+		current, persisted := make(map[execinfrapb.ComponentID]float32), make(map[execinfrapb.ComponentID]float32)
+		lastSaved := timeutil.Now()
+
 		for {
 			select {
 			case prog, ok := <-perNodeProgressCh:
 				if !ok {
 					return nil
 				}
-				jobsprofiler.StorePerNodeProcessorProgressFraction(
-					ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog)
+				for k, v := range prog {
+					current[k] = v
+				}
+				if timeutil.Since(lastSaved) > time.Second*15 {
+					lastSaved = timeutil.Now()
+					updates := make(map[execinfrapb.ComponentID]float32)
+					for k := range current {
+						if current[k] != persisted[k] {
+							persisted[k] = current[k]
+							updates[k] = current[k]
+						}
+					}
+					jobsprofiler.StorePerNodeProcessorProgressFraction(
+						ctx, execCtx.ExecCfg().InternalDB, job.ID(), updates)
+				}
 			case <-ctx.Done():
 				return ctx.Err()
 			}
@@ -285,11 +303,8 @@ func backup(
 			perComponentProgress[component] = fraction
 		}
 		select {
-		// This send to a buffered channel should never block but incase it does
-		// we will fallthrough to the default case.
 		case perNodeProgressCh <- perComponentProgress:
-		default:
-			log.Warningf(ctx, "skipping persisting per component progress as buffered channel was full")
+		default: // discard the update if the flusher is backed up.
 		}
 
 		// Check if we should persist a checkpoint backup manifest.
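For reference, here is the coalesce-and-flush pattern from this diff in isolation, as a self-contained, runnable sketch; the names (flushLoop, persist) and the shortened interval are illustrative, not from the backup package:

    package main

    import (
        "fmt"
        "time"
    )

    // flushLoop drains updates, merging them into `current`, and at most once
    // per `interval` persists only the entries that changed since the last flush.
    func flushLoop(updates <-chan map[string]float32, interval time.Duration, persist func(map[string]float32)) {
        current := map[string]float32{}
        persisted := map[string]float32{}
        lastSaved := time.Now()
        for prog := range updates {
            for k, v := range prog {
                current[k] = v
            }
            if time.Since(lastSaved) > interval {
                lastSaved = time.Now()
                delta := map[string]float32{}
                for k, v := range current {
                    if persisted[k] != v {
                        persisted[k] = v
                        delta[k] = v
                    }
                }
                persist(delta)
            }
        }
    }

    func main() {
        // Small buffer, analogous to len(backupSpecs); producers drop when full.
        ch := make(chan map[string]float32, 4)
        done := make(chan struct{})
        go func() {
            defer close(done)
            flushLoop(ch, 50*time.Millisecond, func(d map[string]float32) {
                fmt.Println("flushed:", d)
            })
        }()

        for i := 0; i < 10; i++ {
            update := map[string]float32{"node1": float32(i) / 10}
            select {
            case ch <- update: // delivered to the flusher
            default: // flusher backed up: drop the update rather than block
            }
            time.Sleep(20 * time.Millisecond)
        }
        close(ch)
        <-done
    }

The producer's select-with-default mirrors the diff above: under backpressure, dropping a progress update is preferred to blocking the backup itself.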

pkg/testutils/lint/passes/fmtsafe/functions.go

Lines changed: 7 additions & 0 deletions

@@ -177,6 +177,13 @@ var requireConstFmt = map[string]bool{
 
 	"(*github.com/cockroachdb/cockroach/pkg/cloud/amazon.awsLogAdapter).Logf": true,
 
+	"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).logfDepth": true,
+	"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Infof": true,
+	"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Warningf": true,
+	"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Errorf": true,
+	"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Fatalf": true,
+	"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).VEventf": true,
+
 	// Error things are populated in the init() message.
 }
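Listing the Migrator methods in requireConstFmt makes the fmtsafe lint demand constant format strings at their call sites, matching the rest of the log API. An illustrative (hypothetical, not from this commit) pair of call sites:

    m.Infof(ctx, "migrated %d ranges", n) // ok: constant format string

    msg := fmt.Sprintf("migrated %d ranges", n)
    m.Infof(ctx, msg) // would be flagged by fmtsafe: non-constant format string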

pkg/util/log/BUILD.bazel

Lines changed: 4 additions & 0 deletions

@@ -34,6 +34,7 @@ go_library(
         "log_entry.go",
         "log_flush.go",
         "metric.go",
+        "migrator.go",
         "otlp_client.go",
         "redact.go",
         "registry.go",
@@ -76,6 +77,7 @@ go_library(
         "//pkg/util/log/logflags",
         "//pkg/util/log/logpb",
         "//pkg/util/log/severity",
+        "//pkg/util/metamorphic",
         "//pkg/util/syncutil",
         "//pkg/util/sysutil",
         "//pkg/util/timeutil",
@@ -178,6 +180,7 @@ go_test(
         "intercept_test.go",
         "log_decoder_test.go",
         "main_test.go",
+        "migrator_test.go",
         "otlp_client_test.go",
         "redact_test.go",
         "registry_test.go",
@@ -194,6 +197,7 @@ go_test(
         "//pkg/build",
         "//pkg/cli/exit",
         "//pkg/settings/cluster",
+        "//pkg/testutils/datapathutils",
         "//pkg/util/caller",
         "//pkg/util/ctxgroup",
         "//pkg/util/encoding",

pkg/util/log/event_log.go

Lines changed: 29 additions & 23 deletions

@@ -96,29 +96,7 @@ func StructuredEvent(ctx context.Context, sev logpb.Severity, event logpb.EventP
 func StructuredEventDepth(
 	ctx context.Context, sev logpb.Severity, depth int, event logpb.EventPayload,
 ) {
-	// Populate the missing common fields.
-	common := event.CommonDetails()
-	if common.Timestamp == 0 {
-		common.Timestamp = timeutil.Now().UnixNano()
-	}
-	if len(common.EventType) == 0 {
-		common.EventType = logpb.GetEventTypeName(event)
-	}
-
-	entry := makeStructuredEntry(ctx,
-		sev,
-		event.LoggingChannel(),
-		depth+1,
-		event)
-
-	if sp := getSpan(ctx); sp != nil {
-		// Prevent `entry` from moving to the heap when this branch is not taken.
-		heapEntry := entry
-		eventInternal(sp, entry.sev >= severity.ERROR, &heapEntry)
-	}
-
-	logger := logging.getLogger(entry.ch)
-	logger.outputLogEntry(entry)
+	structuredEventDepth(ctx, sev, depth+1, event.LoggingChannel(), event)
 }
 
 // EventLog emits a structured event log and writes it to the system.eventlog
@@ -187,3 +165,31 @@ func WriteAsync() StructuredEventSettingsFunc {
 		return o
 	}
 }
+
+func structuredEventDepth(
+	ctx context.Context, sev logpb.Severity, depth int, ch Channel, event logpb.EventPayload,
+) {
+	// Populate the missing common fields.
+	common := event.CommonDetails()
+	if common.Timestamp == 0 {
+		common.Timestamp = timeutil.Now().UnixNano()
+	}
+	if len(common.EventType) == 0 {
+		common.EventType = logpb.GetEventTypeName(event)
+	}
+
+	entry := makeStructuredEntry(ctx,
+		sev,
+		ch,
+		depth+1,
+		event)
+
+	if sp := getSpan(ctx); sp != nil {
+		// Prevent `entry` from moving to the heap when this branch is not taken.
+		heapEntry := entry
+		eventInternal(sp, entry.sev >= severity.ERROR, &heapEntry)
+	}
+
+	logger := logging.getLogger(entry.ch)
+	logger.outputLogEntry(entry)
+}

pkg/util/log/migrator.go

Lines changed: 132 additions & 0 deletions

@@ -0,0 +1,132 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package log
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/log/severity"
+	"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
+)
+
+// ChannelCompatibilityModeEnabled controls whether certain logs are routed
+// to newly defined logging channels or continue to use their original ones.
+var ChannelCompatibilityModeEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"log.channel_compatibility_mode.enabled",
+	"when true, logs will continue to log to the expected logging channels; "+
+		"when false, logs will be moved to new logging channels as part of a "+
+		"logging channel consolidation effort",
+	metamorphic.ConstantWithTestBool("log.channel_compatibility_mode.enabled", true),
+	settings.WithPublic,
+)
+
+func ShouldMigrateEvent(sv *settings.Values) bool {
+	return !ChannelCompatibilityModeEnabled.Get(sv)
+}
+
+// StructuredEventMigrator handles conditional routing of structured events
+// between old and new logging channels based on a shouldMigrate function.
+type StructuredEventMigrator struct {
+	shouldMigrate func() bool
+	newChannel    Channel
+}
+
+// NewStructuredEventMigrator creates a new StructuredEventMigrator that routes
+// structured events to either the specified new channel (when shouldMigrate
+// returns true) or to the event's original channel (when shouldMigrate
+// returns false).
+func NewStructuredEventMigrator(
+	shouldMigrate func() bool, newChannel Channel,
+) StructuredEventMigrator {
+	return StructuredEventMigrator{
+		shouldMigrate: shouldMigrate,
+		newChannel:    newChannel,
+	}
+}
+
+// StructuredEvent logs a structured event using the migrator's routing logic.
+func (sem StructuredEventMigrator) StructuredEvent(
+	ctx context.Context, sev logpb.Severity, event logpb.EventPayload,
+) {
+	sem.structuredEventDepth(ctx, sev, 1, event)
+}
+
+// StructuredEventDepth logs a structured event with a custom stack depth
+// for accurate caller identification in logs.
+func (sem StructuredEventMigrator) StructuredEventDepth(
+	ctx context.Context, sev logpb.Severity, depth int, event logpb.EventPayload,
+) {
+	sem.structuredEventDepth(ctx, sev, depth+1, event)
+}
+
+// structuredEventDepth is the internal implementation that performs the actual
+// channel routing based on the shouldMigrate function value.
+func (sem StructuredEventMigrator) structuredEventDepth(
+	ctx context.Context, sev logpb.Severity, depth int, event logpb.EventPayload,
+) {
+	if sem.shouldMigrate() {
+		structuredEventDepth(ctx, sev, depth+1, sem.newChannel, event)
+	} else {
+		structuredEventDepth(ctx, sev, depth+1, event.LoggingChannel(), event)
+	}
+}
+
+// Migrator handles conditional routing of formatted log messages between
+// deprecated and new logging channels based on a migration setting.
+type Migrator struct {
+	shouldMigrate func() bool
+	oldChannel    Channel
+	newChannel    Channel
+}
+
+// NewMigrator creates a new Migrator that routes log messages between old and
+// new channels based on the shouldMigrate function.
+func NewMigrator(shouldMigrate func() bool, oldChannel Channel, newChannel Channel) Migrator {
+	return Migrator{shouldMigrate: shouldMigrate, oldChannel: oldChannel, newChannel: newChannel}
+}
+
+// logfDepth is the internal helper that routes log messages to either the
+// new channel (when shouldMigrate returns true) or the old channel (when
+// shouldMigrate returns false).
+func (m Migrator) logfDepth(
	ctx context.Context, sev Severity, depth int, format string, args ...interface{},
+) {
+	if m.shouldMigrate() {
+		logfDepth(ctx, depth+1, sev, m.newChannel, format, args...)
+	} else {
+		logfDepth(ctx, depth+1, sev, m.oldChannel, format, args...)
+	}
+}
+
+// Infof logs an info-level message using the migrator's routing logic.
+func (m Migrator) Infof(ctx context.Context, format string, args ...interface{}) {
+	m.logfDepth(ctx, severity.INFO, 1, format, args...)
+}
+
+// Warningf logs a warning-level message using the migrator's routing logic.
+func (m Migrator) Warningf(ctx context.Context, format string, args ...interface{}) {
+	m.logfDepth(ctx, severity.WARNING, 1, format, args...)
+}
+
+// Errorf logs an error-level message using the migrator's routing logic.
+func (m Migrator) Errorf(ctx context.Context, format string, args ...interface{}) {
+	m.logfDepth(ctx, severity.ERROR, 1, format, args...)
+}
+
+// Fatalf logs a fatal-level message using the migrator's routing logic.
+func (m Migrator) Fatalf(ctx context.Context, format string, args ...interface{}) {
+	m.logfDepth(ctx, severity.FATAL, 1, format, args...)
+}
+
+func (m Migrator) VEventf(ctx context.Context, level Level, format string, args ...interface{}) {
+	selectedChannel := m.oldChannel
+	if m.shouldMigrate() {
+		selectedChannel = m.newChannel
+	}
+	vEventf(ctx, false /* isErr */, 1, level, selectedChannel, format, args...)
+}
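Since log.channel_compatibility_mode.enabled defaults to true, nothing moves until it is switched off. A minimal sketch of the toggle's effect, with the same assumed imports as the earlier usage sketch and settings.BoolSetting's Override helper (the function name demoCompatibilityToggle is hypothetical):

    func demoCompatibilityToggle(ctx context.Context, st *cluster.Settings) {
        m := log.NewMigrator(
            func() bool { return log.ShouldMigrateEvent(&st.SV) },
            channel.DEV, // old
            channel.OPS, // new
        )

        // Default: compatibility mode is on, ShouldMigrateEvent returns false,
        // and the message lands on the old channel (DEV).
        m.Warningf(ctx, "compatibility mode demo, attempt %d", 1)

        // Flipping the cluster setting moves the same call site to OPS
        // without touching the caller.
        log.ChannelCompatibilityModeEnabled.Override(ctx, &st.SV, false)
        m.Warningf(ctx, "compatibility mode demo, attempt %d", 2)
    }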
