Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 107 additions & 14 deletions internal/logbuf/logbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,24 @@
// lines; the buffer splits them. Lines exceeding the capacity are
// silently truncated to the capacity — that is, the buffer stays
// bounded in memory regardless of input.
//
// Cisco-style dedup: consecutive identical lines are collapsed. The
// first occurrence lands in the ring as-is; subsequent repeats bump
// an in-memory counter without consuming a slot. When a different
// line arrives, the buffer synthesizes a "last message repeated N
// times" summary before recording the new line. Tail() also
// synthesizes a "so far" summary for any still-active run so an
// operator querying in the middle of a burst sees it.
//
// Motivated by aae-orc-1d2: Skippy's desk poll loop filed one "ssh:
// client connected" line at 0.5 Hz, saturating the ring and pushing
// every interesting event out of the 5.5-hour window. Dedup turns
// a run of 1000 identical lines into 2 ring slots (one "real" + one
// summary) without hiding the rate.
package logbuf

import (
"fmt"
"strings"
"sync"
)
Expand All @@ -20,6 +35,13 @@ type Buffer struct {
mu sync.Mutex
lines []string
max int

// Dedup state. When a write records a line identical to lastLine,
// the buffer bumps runCount instead of appending. When a different
// line arrives (or Flush is called), a summary line is appended
// before the new content. lastLine == "" means no run is active.
lastLine string
runCount int // 1 after initial write, 2+ when active run is accumulating
}

// New returns a Buffer that retains the most recent max lines.
Expand All @@ -30,10 +52,11 @@ func New(max int) *Buffer {
return &Buffer{max: max}
}

// Write implements io.Writer. It splits the input on newlines and
// appends each non-empty line to the ring. Partial trailing lines
// (no newline) are still recorded — this matches the "tee stderr"
// use case where writes often arrive one log line at a time.
// Write implements io.Writer. It splits the input on newlines,
// deduplicates adjacent identical lines (see package doc), and
// appends the result to the ring. Partial trailing lines (no
// newline) are still recorded — this matches the "tee stderr" use
// case where writes often arrive one log line at a time.
func (b *Buffer) Write(p []byte) (int, error) {
n := len(p)
s := string(p)
Expand All @@ -54,33 +77,103 @@ func (b *Buffer) Write(p []byte) (int, error) {
for _, line := range parts {
// Strip carriage returns from CRLF inputs.
line = strings.TrimRight(line, "\r")
b.lines = append(b.lines, line)
b.appendLocked(line)
}
b.trimLocked()
return n, nil
}

// appendLocked records one line, applying dedup. Caller holds b.mu.
func (b *Buffer) appendLocked(line string) {
if b.runCount > 0 && line == b.lastLine {
b.runCount++
return
}
// Run broke: emit a summary for the prior run before appending the
// new line. runCount == 1 means the prior line occurred exactly
// once — no summary needed.
if b.runCount > 1 {
b.lines = append(b.lines, summarize(b.runCount))
}
b.lines = append(b.lines, line)
b.lastLine = line
b.runCount = 1
}

// trimLocked enforces the capacity bound. Caller holds b.mu.
func (b *Buffer) trimLocked() {
if len(b.lines) > b.max {
// Drop from the head; keep the tail.
b.lines = b.lines[len(b.lines)-b.max:]
}
return n, nil
}

// Tail returns the most recent n lines (chronological order). If n
// exceeds the buffered count, returns everything. n<=0 returns empty.
// summarize returns the Cisco-style "last message repeated N times"
// summary line for a run of count identical lines.
func summarize(count int) string {
return fmt.Sprintf("last message repeated %d times", count-1)
}

// Flush forces any active dedup run to emit its summary line into
// the ring. Useful for periodic flushing (a long-running identical
// message otherwise only surfaces a summary when a different line
// arrives) and for clean shutdown.
//
// Flush is idempotent: calling it twice in a row without an
// intervening Write appends nothing on the second call.
func (b *Buffer) Flush() {
b.mu.Lock()
defer b.mu.Unlock()
if b.runCount > 1 {
b.lines = append(b.lines, summarize(b.runCount))
b.trimLocked()
// Reset the run so subsequent identical writes start fresh —
// the next occurrence re-records the line as a new run of 1,
// which is the right answer: the summary's "N times" covered
// the history up to now, and the clock starts over.
b.lastLine = ""
b.runCount = 0
}
}

// Tail returns the most recent n lines (chronological order). If
// a dedup run is currently active (runCount > 1 but no summary has
// been flushed), Tail synthesizes a "... so far" summary line at
// the end so the operator sees the run without having to wait for
// a Flush. The synthesized line is read-only — it doesn't touch
// the stored buffer.
//
// If n exceeds the buffered count, returns everything. n<=0 returns empty.
func (b *Buffer) Tail(n int) []string {
if n <= 0 {
return nil
}
b.mu.Lock()
defer b.mu.Unlock()
activeSummary := ""
if b.runCount > 1 {
activeSummary = summarize(b.runCount) + " so far"
}
total := len(b.lines)
if activeSummary != "" {
total++
}
start := 0
if len(b.lines) > n {
start = len(b.lines) - n
if total > n {
start = total - n
}
out := make([]string, 0, total-start)
for i := start; i < len(b.lines); i++ {
out = append(out, b.lines[i])
}
if activeSummary != "" && (start <= len(b.lines)) {
out = append(out, activeSummary)
}
out := make([]string, len(b.lines)-start)
copy(out, b.lines[start:])
return out
}

// Len returns the number of lines currently buffered.
// Len returns the number of lines currently stored. Does not include
// the synthesized active-run summary that Tail may append — this is
// the raw stored count.
func (b *Buffer) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down
103 changes: 101 additions & 2 deletions internal/logbuf/logbuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,109 @@ func TestConcurrentWrites(t *testing.T) {
}

func TestWrite_LargeSinglePayload(t *testing.T) {
// Dedup collapses 100 identical "x" lines into 1 stored line plus
// an active run counter. Cap is irrelevant here — the win of dedup
// is precisely that an unbounded run consumes a single ring slot.
b := New(3)
big := strings.Repeat("x\n", 100)
_, _ = b.Write([]byte(big))
if got := b.Len(); got != 3 {
t.Errorf("ring cap violated: got %d, want 3", got)
if got := b.Len(); got != 1 {
t.Errorf("expected dedup to collapse 100 identical lines to 1 stored line, got %d", got)
}
tail := b.Tail(10)
if len(tail) != 2 || tail[0] != "x" || tail[1] != "last message repeated 99 times so far" {
t.Errorf("tail = %v, want [x, last message repeated 99 times so far]", tail)
}
}

// TestDedup_RunBrokenBySummary: when a different line arrives, the
// prior run's summary is emitted before the new line. Cisco-style
// "last message repeated N times" — N is the count of repeats after
// the first occurrence.
func TestDedup_RunBrokenBySummary(t *testing.T) {
b := New(100)
_, _ = b.Write([]byte("A\nA\nA\nB\n"))
got := b.Tail(10)
want := []string{"A", "last message repeated 2 times", "B"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
}

// TestDedup_NonAdjacentNotDeduped: previous-line-only matching means
// A B A B doesn't collapse — each adjacent pair differs. Documented
// design choice (see _kos/ideas/log-rrd-deduplication.md).
func TestDedup_NonAdjacentNotDeduped(t *testing.T) {
b := New(100)
_, _ = b.Write([]byte("A\nB\nA\nB\n"))
got := b.Tail(10)
want := []string{"A", "B", "A", "B"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
}

// TestDedup_TailSynthesizesActiveSummary: when an operator queries
// in the middle of an active run, Tail appends a "... so far"
// summary so the run is visible without waiting for a different line.
func TestDedup_TailSynthesizesActiveSummary(t *testing.T) {
b := New(100)
_, _ = b.Write([]byte("A\nA\nA\nA\n"))
got := b.Tail(10)
want := []string{"A", "last message repeated 3 times so far"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
// Tail must NOT mutate stored state — the synthesized line is
// query-time only.
if b.Len() != 1 {
t.Errorf("Tail should not store synthesized summary; Len = %d, want 1", b.Len())
}
}

// TestDedup_FlushEmitsSummary: Flush is the periodic cut for very
// long runs. After Flush, the summary is in the ring and the run is
// cleared so the next identical write starts a fresh run.
func TestDedup_FlushEmitsSummary(t *testing.T) {
b := New(100)
_, _ = b.Write([]byte("A\nA\nA\n"))
b.Flush()
got := b.Tail(10)
want := []string{"A", "last message repeated 2 times"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("after Flush: got %v, want %v", got, want)
}
// Subsequent identical write starts a fresh run, not continuing
// the cleared one.
_, _ = b.Write([]byte("A\n"))
got = b.Tail(10)
want = []string{"A", "last message repeated 2 times", "A"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("after Flush + new A: got %v, want %v", got, want)
}
}

// TestDedup_FlushIdempotent: a second Flush with no intervening
// Write must not append a second summary.
func TestDedup_FlushIdempotent(t *testing.T) {
b := New(100)
_, _ = b.Write([]byte("A\nA\n"))
b.Flush()
before := b.Len()
b.Flush()
if b.Len() != before {
t.Errorf("Flush not idempotent: %d -> %d", before, b.Len())
}
}

// TestDedup_SingleOccurrenceNoSummary: a one-shot line followed by
// a different one must NOT emit a "repeated 0 times" summary.
func TestDedup_SingleOccurrenceNoSummary(t *testing.T) {
b := New(100)
_, _ = b.Write([]byte("A\nB\n"))
got := b.Tail(10)
want := []string{"A", "B"}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
}
Loading