diff --git a/internal/logbuf/logbuf.go b/internal/logbuf/logbuf.go index ef814de..0de031e 100644 --- a/internal/logbuf/logbuf.go +++ b/internal/logbuf/logbuf.go @@ -7,9 +7,24 @@ // lines; the buffer splits them. Lines exceeding the capacity are // silently truncated to the capacity — that is, the buffer stays // bounded in memory regardless of input. +// +// Cisco-style dedup: consecutive identical lines are collapsed. The +// first occurrence lands in the ring as-is; subsequent repeats bump +// an in-memory counter without consuming a slot. When a different +// line arrives, the buffer synthesizes a "last message repeated N +// times" summary before recording the new line. Tail() also +// synthesizes a "so far" summary for any still-active run so an +// operator querying in the middle of a burst sees it. +// +// Motivated by aae-orc-1d2: Skippy's desk poll loop filed one "ssh: +// client connected" line at 0.5 Hz, saturating the ring and pushing +// every interesting event out of the 5.5-hour window. Dedup turns +// a run of 1000 identical lines into 2 ring slots (one "real" + one +// summary) without hiding the rate. package logbuf import ( + "fmt" "strings" "sync" ) @@ -20,6 +35,13 @@ type Buffer struct { mu sync.Mutex lines []string max int + + // Dedup state. When a write records a line identical to lastLine, + // the buffer bumps runCount instead of appending. When a different + // line arrives (or Flush is called), a summary line is appended + // before the new content. lastLine == "" means no run is active. + lastLine string + runCount int // 1 after initial write, 2+ when active run is accumulating } // New returns a Buffer that retains the most recent max lines. @@ -30,10 +52,11 @@ func New(max int) *Buffer { return &Buffer{max: max} } -// Write implements io.Writer. It splits the input on newlines and -// appends each non-empty line to the ring. Partial trailing lines -// (no newline) are still recorded — this matches the "tee stderr" -// use case where writes often arrive one log line at a time. +// Write implements io.Writer. It splits the input on newlines, +// deduplicates adjacent identical lines (see package doc), and +// appends the result to the ring. Partial trailing lines (no +// newline) are still recorded — this matches the "tee stderr" use +// case where writes often arrive one log line at a time. func (b *Buffer) Write(p []byte) (int, error) { n := len(p) s := string(p) @@ -54,33 +77,103 @@ func (b *Buffer) Write(p []byte) (int, error) { for _, line := range parts { // Strip carriage returns from CRLF inputs. line = strings.TrimRight(line, "\r") - b.lines = append(b.lines, line) + b.appendLocked(line) + } + b.trimLocked() + return n, nil +} + +// appendLocked records one line, applying dedup. Caller holds b.mu. +func (b *Buffer) appendLocked(line string) { + if b.runCount > 0 && line == b.lastLine { + b.runCount++ + return + } + // Run broke: emit a summary for the prior run before appending the + // new line. runCount == 1 means the prior line occurred exactly + // once — no summary needed. + if b.runCount > 1 { + b.lines = append(b.lines, summarize(b.runCount)) } + b.lines = append(b.lines, line) + b.lastLine = line + b.runCount = 1 +} + +// trimLocked enforces the capacity bound. Caller holds b.mu. +func (b *Buffer) trimLocked() { if len(b.lines) > b.max { - // Drop from the head; keep the tail. b.lines = b.lines[len(b.lines)-b.max:] } - return n, nil } -// Tail returns the most recent n lines (chronological order). If n -// exceeds the buffered count, returns everything. n<=0 returns empty. +// summarize returns the Cisco-style "last message repeated N times" +// summary line for a run of count identical lines. +func summarize(count int) string { + return fmt.Sprintf("last message repeated %d times", count-1) +} + +// Flush forces any active dedup run to emit its summary line into +// the ring. Useful for periodic flushing (a long-running identical +// message otherwise only surfaces a summary when a different line +// arrives) and for clean shutdown. +// +// Flush is idempotent: calling it twice in a row without an +// intervening Write appends nothing on the second call. +func (b *Buffer) Flush() { + b.mu.Lock() + defer b.mu.Unlock() + if b.runCount > 1 { + b.lines = append(b.lines, summarize(b.runCount)) + b.trimLocked() + // Reset the run so subsequent identical writes start fresh — + // the next occurrence re-records the line as a new run of 1, + // which is the right answer: the summary's "N times" covered + // the history up to now, and the clock starts over. + b.lastLine = "" + b.runCount = 0 + } +} + +// Tail returns the most recent n lines (chronological order). If +// a dedup run is currently active (runCount > 1 but no summary has +// been flushed), Tail synthesizes a "... so far" summary line at +// the end so the operator sees the run without having to wait for +// a Flush. The synthesized line is read-only — it doesn't touch +// the stored buffer. +// +// If n exceeds the buffered count, returns everything. n<=0 returns empty. func (b *Buffer) Tail(n int) []string { if n <= 0 { return nil } b.mu.Lock() defer b.mu.Unlock() + activeSummary := "" + if b.runCount > 1 { + activeSummary = summarize(b.runCount) + " so far" + } + total := len(b.lines) + if activeSummary != "" { + total++ + } start := 0 - if len(b.lines) > n { - start = len(b.lines) - n + if total > n { + start = total - n + } + out := make([]string, 0, total-start) + for i := start; i < len(b.lines); i++ { + out = append(out, b.lines[i]) + } + if activeSummary != "" && (start <= len(b.lines)) { + out = append(out, activeSummary) } - out := make([]string, len(b.lines)-start) - copy(out, b.lines[start:]) return out } -// Len returns the number of lines currently buffered. +// Len returns the number of lines currently stored. Does not include +// the synthesized active-run summary that Tail may append — this is +// the raw stored count. func (b *Buffer) Len() int { b.mu.Lock() defer b.mu.Unlock() diff --git a/internal/logbuf/logbuf_test.go b/internal/logbuf/logbuf_test.go index d3b6ea0..8a67494 100644 --- a/internal/logbuf/logbuf_test.go +++ b/internal/logbuf/logbuf_test.go @@ -89,10 +89,109 @@ func TestConcurrentWrites(t *testing.T) { } func TestWrite_LargeSinglePayload(t *testing.T) { + // Dedup collapses 100 identical "x" lines into 1 stored line plus + // an active run counter. Cap is irrelevant here — the win of dedup + // is precisely that an unbounded run consumes a single ring slot. b := New(3) big := strings.Repeat("x\n", 100) _, _ = b.Write([]byte(big)) - if got := b.Len(); got != 3 { - t.Errorf("ring cap violated: got %d, want 3", got) + if got := b.Len(); got != 1 { + t.Errorf("expected dedup to collapse 100 identical lines to 1 stored line, got %d", got) + } + tail := b.Tail(10) + if len(tail) != 2 || tail[0] != "x" || tail[1] != "last message repeated 99 times so far" { + t.Errorf("tail = %v, want [x, last message repeated 99 times so far]", tail) + } +} + +// TestDedup_RunBrokenBySummary: when a different line arrives, the +// prior run's summary is emitted before the new line. Cisco-style +// "last message repeated N times" — N is the count of repeats after +// the first occurrence. +func TestDedup_RunBrokenBySummary(t *testing.T) { + b := New(100) + _, _ = b.Write([]byte("A\nA\nA\nB\n")) + got := b.Tail(10) + want := []string{"A", "last message repeated 2 times", "B"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Errorf("got %v, want %v", got, want) + } +} + +// TestDedup_NonAdjacentNotDeduped: previous-line-only matching means +// A B A B doesn't collapse — each adjacent pair differs. Documented +// design choice (see _kos/ideas/log-rrd-deduplication.md). +func TestDedup_NonAdjacentNotDeduped(t *testing.T) { + b := New(100) + _, _ = b.Write([]byte("A\nB\nA\nB\n")) + got := b.Tail(10) + want := []string{"A", "B", "A", "B"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Errorf("got %v, want %v", got, want) + } +} + +// TestDedup_TailSynthesizesActiveSummary: when an operator queries +// in the middle of an active run, Tail appends a "... so far" +// summary so the run is visible without waiting for a different line. +func TestDedup_TailSynthesizesActiveSummary(t *testing.T) { + b := New(100) + _, _ = b.Write([]byte("A\nA\nA\nA\n")) + got := b.Tail(10) + want := []string{"A", "last message repeated 3 times so far"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Errorf("got %v, want %v", got, want) + } + // Tail must NOT mutate stored state — the synthesized line is + // query-time only. + if b.Len() != 1 { + t.Errorf("Tail should not store synthesized summary; Len = %d, want 1", b.Len()) + } +} + +// TestDedup_FlushEmitsSummary: Flush is the periodic cut for very +// long runs. After Flush, the summary is in the ring and the run is +// cleared so the next identical write starts a fresh run. +func TestDedup_FlushEmitsSummary(t *testing.T) { + b := New(100) + _, _ = b.Write([]byte("A\nA\nA\n")) + b.Flush() + got := b.Tail(10) + want := []string{"A", "last message repeated 2 times"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Errorf("after Flush: got %v, want %v", got, want) + } + // Subsequent identical write starts a fresh run, not continuing + // the cleared one. + _, _ = b.Write([]byte("A\n")) + got = b.Tail(10) + want = []string{"A", "last message repeated 2 times", "A"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Errorf("after Flush + new A: got %v, want %v", got, want) + } +} + +// TestDedup_FlushIdempotent: a second Flush with no intervening +// Write must not append a second summary. +func TestDedup_FlushIdempotent(t *testing.T) { + b := New(100) + _, _ = b.Write([]byte("A\nA\n")) + b.Flush() + before := b.Len() + b.Flush() + if b.Len() != before { + t.Errorf("Flush not idempotent: %d -> %d", before, b.Len()) + } +} + +// TestDedup_SingleOccurrenceNoSummary: a one-shot line followed by +// a different one must NOT emit a "repeated 0 times" summary. +func TestDedup_SingleOccurrenceNoSummary(t *testing.T) { + b := New(100) + _, _ = b.Write([]byte("A\nB\n")) + got := b.Tail(10) + want := []string{"A", "B"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Errorf("got %v, want %v", got, want) } }