Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions shortcuts/event/concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package event

import (
"context"
"fmt"
"io"
"path/filepath"
"sync"
"testing"
"time"
)

// makeInboundEnvelopeWithEventID builds a websocket-sourced envelope whose raw
// payload embeds the supplied event id, event type, and event JSON body.
func makeInboundEnvelopeWithEventID(eventID, eventType, eventJSON string) InboundEnvelope {
	payload := fmt.Sprintf(
		`{"schema":"2.0","header":{"event_id":"%s","event_type":"%s"},"event":%s}`,
		eventID, eventType, eventJSON,
	)
	return InboundEnvelope{
		Source:     SourceWebSocket,
		ReceivedAt: nowForTest(),
		RawPayload: []byte(payload),
	}
}

func nowForTest() time.Time {
return time.Unix(1700000000, 0).UTC()
}

// TestOutputRouterWriteRecordConcurrent hammers WriteRecord from many
// goroutines at once to surface data races in the router's writer bookkeeping
// (run with -race to get the full benefit).
func TestOutputRouterWriteRecordConcurrent(t *testing.T) {
	router := &outputRouter{
		defaultDir: filepath.Join(t.TempDir(), "events"),
		seq:        new(uint64),
		writers:    map[string]*dirRecordWriter{},
	}

	const goroutines = 64
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			record := map[string]interface{}{
				"event_type": "im.message.receive_v1",
				"event_id":   fmt.Sprintf("evt-%03d", n),
			}
			if err := router.WriteRecord("im.message.receive_v1", record); err != nil {
				t.Errorf("WriteRecord() error = %v", err)
			}
		}(i)
	}
	wg.Wait()
}

// TestPipelineConcurrentProcessCountsAllDispatches processes many envelopes in
// parallel and verifies the pipeline's event counter accounts for every one.
func TestPipelineConcurrentProcessCountsAllDispatches(t *testing.T) {
	registry := NewHandlerRegistry()
	handler := handlerFuncWith{
		id:        "counting-handler",
		eventType: "im.message.receive_v1",
		fn: func(_ context.Context, evt *Event) HandlerResult {
			return HandlerResult{Status: HandlerStatusHandled, Output: map[string]interface{}{"event_id": evt.EventID}}
		},
	}
	if err := registry.RegisterEventHandler(handler); err != nil {
		t.Fatalf("RegisterEventHandler() error = %v", err)
	}

	p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformCompact}, io.Discard, io.Discard)

	const goroutines = 64
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			envelope := makeInboundEnvelopeWithEventID(
				fmt.Sprintf("evt-%03d", n),
				"im.message.receive_v1",
				fmt.Sprintf(`{"message":{"message_id":"om_%03d"}}`, n),
			)
			p.Process(context.Background(), envelope)
		}(i)
	}
	wg.Wait()

	if got, want := p.EventCount(), int64(goroutines); got != want {
		t.Fatalf("EventCount() = %d, want %d", got, want)
	}
}
51 changes: 51 additions & 0 deletions shortcuts/event/deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package event

import (
"sync"
"sync/atomic"
"time"
)

// deduperCleanupInterval is how many Seen calls elapse between opportunistic
// sweeps of expired entries, amortizing cleanup cost across callers instead of
// requiring a background goroutine.
const deduperCleanupInterval = 64

// Deduper suppresses repeated keys seen within a TTL window.
type Deduper struct {
	ttl   time.Duration // window during which a repeated key is suppressed; <= 0 disables dedup
	seen  sync.Map      // key -> time.Time of the most recent observation
	calls atomic.Uint64 // Seen call counter used to trigger the periodic cleanup
}

// NewDeduper creates a deduper with the provided TTL. A non-positive TTL
// yields a deduper whose Seen always reports false.
func NewDeduper(ttl time.Duration) *Deduper {
	d := &Deduper{ttl: ttl}
	return d
}

// Seen reports whether key has already been seen within ttl and records now.
func (d *Deduper) Seen(key string, now time.Time) bool {
	// A nil deduper, empty key, or disabled TTL never dedupes.
	if d == nil || key == "" || d.ttl <= 0 {
		return false
	}
	// Every deduperCleanupInterval-th call sweeps expired entries so memory
	// stays bounded without a dedicated background goroutine.
	if d.calls.Add(1)%deduperCleanupInterval == 0 {
		d.cleanup(now)
	}
	prev, loaded := d.seen.LoadOrStore(key, now)
	if !loaded {
		// First observation: LoadOrStore already recorded the timestamp.
		return false
	}
	if ts, ok := prev.(time.Time); ok && now.Sub(ts) < d.ttl {
		return true
	}
	// The existing entry expired (or held an unexpected type): refresh it.
	d.seen.Store(key, now)
	return false
}
Comment on lines +14 to +41
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if Deduper cleanup is implemented elsewhere or if there's a bounded lifetime
rg -nP 'Deduper|Cleanup|cleanup' --type=go -C3

Repository: larksuite/cli

Length of output: 20673


🏁 Script executed:

cat -n shortcuts/event/pipeline.go | head -100

Repository: larksuite/cli

Length of output: 3059


🏁 Script executed:

# Check how Pipeline is created and used - search for NewPipeline and Pipeline instantiation
rg -n 'NewPipeline|Pipeline{' --type=go -B2 -A2 | head -60

Repository: larksuite/cli

Length of output: 162


🏁 Script executed:

# Check event processing context - is it short-lived or long-running?
rg -n 'event\..*Pipeline\|pipeline.*Run\|pipeline.*Process' --type=go | head -30

Repository: larksuite/cli

Length of output: 39


🏁 Script executed:

# Search for where EventPipeline is created and used
rg -n 'NewEventPipeline|\.Process\(' shortcuts/event/ --type=go -B2 -A2

Repository: larksuite/cli

Length of output: 8121


🏁 Script executed:

# Check the broader context - where is event processing called from?
rg -n 'EventPipeline|event\.New' --type=go | grep -v test | head -40

Repository: larksuite/cli

Length of output: 1299


🏁 Script executed:

# Look at the entire deduper.go to understand the implementation fully
cat -n shortcuts/event/deduper.go

Repository: larksuite/cli

Length of output: 1065


Memory accumulation in sync.Map for long-running subscriptions.

In the event subscription context (shortcuts/event/subscribe.go), the EventPipeline is created once and processes events continuously via WebSocket. The Deduper stores event identifiers indefinitely—entries older than the 5-minute TTL are never evicted, only overwritten if the same key reappears. For high-volume event streams with many unique identifiers, this can accumulate significant memory over time.

Consider implementing periodic cleanup:

♻️ Cleanup approach
// Cleanup removes entries older than TTL. Call periodically from a background goroutine.
func (d *Deduper) Cleanup(now time.Time) {
	d.seen.Range(func(key, value interface{}) bool {
		if ts, ok := value.(time.Time); ok && now.Sub(ts) >= d.ttl {
			d.seen.Delete(key)
		}
		return true
	})
}

Then invoke it periodically (e.g., every ttl interval) from the event loop or a dedicated goroutine in subscribe.go.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/deduper.go` around lines 11 - 34, Deduper currently retains
keys indefinitely in the sync.Map (seen) causing memory growth; add a method
func (d *Deduper) Cleanup(now time.Time) that ranges over d.seen and deletes
entries whose timestamp is older than d.ttl, and ensure Deduper.NewDeduper or
the subscriber setup starts a background goroutine that calls
d.Cleanup(time.Now()) periodically (e.g., every d.ttl or a fraction thereof) and
stops when the pipeline/subscription shuts down so stale entries are evicted and
memory is bounded; reference Deduper, seen, Cleanup and the subscription/event
loop setup in subscribe.go to wire the periodic call and shutdown.


// cleanup drops every tracked entry whose timestamp is at least ttl old,
// keeping the seen map bounded for long-running subscriptions.
func (d *Deduper) cleanup(now time.Time) {
	d.seen.Range(func(key, value any) bool {
		if ts, ok := value.(time.Time); ok && now.Sub(ts) >= d.ttl {
			d.seen.Delete(key)
		}
		return true
	})
}
44 changes: 44 additions & 0 deletions shortcuts/event/deduper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package event

import (
"fmt"
"testing"
"time"
)

// deduperHasKey reports whether the deduper still tracks key, by scanning the
// internal map directly (test-only helper).
func deduperHasKey(d *Deduper, key string) bool {
	if d == nil {
		return false
	}
	present := false
	d.seen.Range(func(k, _ any) bool {
		if k != key {
			return true // keep scanning
		}
		present = true
		return false // stop early once found
	})
	return present
}

// TestDeduperEvictsExpiredKeysDuringSteadyState verifies that the periodic
// sweep inside Seen removes entries older than the TTL without any explicit
// cleanup call from the caller.
func TestDeduperEvictsExpiredKeysDuringSteadyState(t *testing.T) {
	d := NewDeduper(time.Second)
	start := time.Unix(100, 0).UTC()
	if d.Seen("stale", start) {
		t.Fatal("first observation of stale key should not dedupe")
	}

	// Advance past the TTL, then push enough distinct keys through Seen to
	// trigger at least one periodic cleanup pass.
	afterTTL := start.Add(2 * time.Second)
	for i := 0; i < 128; i++ {
		key := fmt.Sprintf("fresh-%03d", i)
		if d.Seen(key, afterTTL) {
			t.Fatalf("fresh key %d should not dedupe on first observation", i)
		}
	}

	if deduperHasKey(d, "stale") {
		t.Fatal("stale key should be evicted after periodic cleanup")
	}
}
48 changes: 48 additions & 0 deletions shortcuts/event/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package event

import (
	"context"
	"fmt"
)

// Dispatcher routes normalized events to registered handlers.
type Dispatcher struct {
	// registry holds the event-type, domain, and fallback handlers that
	// Dispatch matches incoming events against.
	registry *HandlerRegistry
}

// NewDispatcher creates a dispatcher backed by the provided registry.
// A nil registry is replaced with a fresh empty one so the returned
// dispatcher is always safe to use.
func NewDispatcher(registry *HandlerRegistry) *Dispatcher {
	r := registry
	if r == nil {
		r = NewHandlerRegistry()
	}
	return &Dispatcher{registry: r}
}

// Dispatch runs matching event handlers first, then matching domain handlers.
// Fallback is only used when no direct handlers matched.
func (d *Dispatcher) Dispatch(ctx context.Context, evt *Event) DispatchResult {
if d == nil || d.registry == nil || evt == nil {
return DispatchResult{}
}

matched := append([]EventHandler{}, d.registry.EventHandlers(evt.EventType)...)
matched = append(matched, d.registry.DomainHandlers(evt.Domain)...)
if len(matched) == 0 {
if fallback := d.registry.FallbackHandler(); fallback != nil {
matched = append(matched, fallback)
}
}

result := DispatchResult{Results: make([]DispatchRecord, 0, len(matched))}
for _, handler := range matched {
handlerResult := handler.Handle(ctx, evt)
result.Results = append(result.Results, DispatchRecord{
HandlerID: handler.ID(),
Status: handlerResult.Status,
Reason: handlerResult.Reason,
Err: handlerResult.Err,
Output: handlerResult.Output,
})
Comment on lines +37 to +45
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Recover handler panics inside the dispatch loop.

Handle is invoked on network-derived payloads, but a panic here will abort Dispatch and stop later handlers from running. Convert panics into HandlerStatusFailed records so one bad handler or payload does not take down the whole subscriber.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/dispatcher.go` around lines 37 - 45, In Dispatch's loop over
matched handlers, wrap the call to handler.Handle(ctx, evt) in a panic recovery
so any panic is converted into a DispatchRecord with HandlerID: handler.ID(),
Status: HandlerStatusFailed, Reason set to a short message like "panic during
handle", Err set to the recovered panic (and/or an error created from it), and
Output empty; then append that record and continue to the next handler instead
of letting the panic escape. Ensure the existing successful handlerResult path
remains unchanged and only the panic path synthesizes a DispatchRecord;
reference the Dispatch function, handler.Handle, DispatchRecord, and
HandlerStatusFailed when making the change.

}
return result
}
Loading
Loading