
feat: implement event bus subscription and dispatch pipeline #279

Open
niuchong0523 wants to merge 1 commit into larksuite:main from niuchong0523:feat/event-bus-subscription-dispatch

Conversation

Collaborator

niuchong0523 commented Apr 6, 2026

Summary

  • implement the event +subscribe event bus pipeline, including inbound envelope normalization, event matching, domain resolution, deduplication, dispatch, and output routing
  • wire the existing IM event processors into the new handler-based dispatch flow while preserving raw and compact output modes, catch-all subscription behavior, and routed/JSON output behavior
  • switch embedded registry loading to depend only on meta_data_default.json, while keeping remote metadata overlay available at runtime

Test Plan

  • go test ./shortcuts/event/...
  • go test ./internal/registry/...
  • go test ./...

Summary by CodeRabbit

  • New Features

    • Added Calendar API baseline (calendar service with calendars/events operations).
    • Introduced a comprehensive event subsystem: inbound event normalization, domain resolution, handler registry & dispatch, processing pipeline, routing/record writing, and a deduper to suppress duplicates.
  • Chores

    • Updated ignore rules to exclude an internal registry metadata file.
  • Tests

    • Extensive unit, integration, concurrency, and end-to-end tests covering normalization, dispatch, dedupe, routing, and subscription flows.

github-actions bot added the size/XL (Architecture-level or global-impact change) label Apr 6, 2026

coderabbitai bot commented Apr 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough


Refactors the event pipeline to a handler/dispatcher model: adds inbound envelope normalization, domain resolution, TTL deduper, HandlerRegistry/Dispatcher, handler-aware processors, routing/output writers, a calendar service in default registry metadata, and adjusts embedded registry embedding and related tests.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Repo ignore & embedded registry: .gitignore, internal/registry/loader.go, internal/registry/loader_embedded.go, internal/registry/meta_data_default.json, internal/registry/remote_test.go | Ignore internal/registry/meta_data.json; remove embed.FS fallback and inline initialization; relocate/streamline embeddedMetaJSON usage and comments; add calendar service to default metadata; add helper to detect embedded baseline in tests. |
| Envelope & normalization: shortcuts/event/envelope.go, shortcuts/event/normalizer.go, shortcuts/event/normalizer_test.go | Introduce InboundEnvelope, normalized Event model, MalformedEventError, NormalizeEnvelope (schema/event_type validation, idempotency key), and comprehensive tests for success and error cases. |
| Domain matching & raw-type matching: shortcuts/event/domain_resolver.go, shortcuts/event/domain_resolver_test.go, shortcuts/event/matcher.go | Add ResolveDomain prefix→domain mapping and MatchRawEventType/RawEventMatch utilities with unit tests for known, aliased, and unknown prefixes. |
| Deduper & tests: shortcuts/event/deduper.go, shortcuts/event/deduper_test.go | Add TTL-based Deduper using sync.Map and an atomic call counter, periodic cleanup heuristic, constructor and Seen(key, now) semantics, plus eviction/steady-state tests. |
| Handler contracts & registry: shortcuts/event/handlers.go, shortcuts/event/registry.go | Define HandlerStatus, HandlerResult, Dispatch types and EventHandler interface; add HandlerRegistry with event/domain handler maps, fallback handler, ID uniqueness checks, and builtin registry constructor. |
| Dispatcher & tests: shortcuts/event/dispatcher.go, shortcuts/event/dispatcher_test.go | Add Dispatcher that queries registry for event then domain handlers, optionally fallback, invokes handlers in order, aggregates DispatchResult; tests cover ordering, fallback behavior, error isolation, and registry invariants. |
| Pipeline & processing flow: shortcuts/event/pipeline.go, shortcuts/event/processor.go, shortcuts/event/processor_*, shortcuts/event/processor_test.go, shortcuts/event/processor_im_*.go | Refactor pipeline to normalize→match→resolve→filter→dedupe→dispatch; Process now accepts InboundEnvelope; introduce PrettyJSON flag; bridge helpers (transformViaHandler, legacyEventFromRaw); migrate many processors to EventHandler API (ID/Domain/Handle) and centralize compact-output builders; adjust many tests to handler-driven flows. |
| Routing & record writers: shortcuts/event/router.go, shortcuts/event/router_permissions_test.go, shortcuts/event/concurrency_test.go | Add OutputRecordWriter interface, outputRouter and dirRecordWriter with deterministic filenames, atomic writes, private dir/file permissions, a concurrent-safe writer cache, and tests for permissions and concurrency. |
| Subscribe CLI & integration tests: shortcuts/event/subscribe.go, shortcuts/event/subscribe_test.go, shortcuts/event/subscribe_e2e_test.go | Switch WebSocket handling to BuildWebSocketEnvelope, introduce wsClient abstraction, new pipeline wiring with optional outputRouter/recordWriter, configurable subscribed event types; add unit and E2E tests for subscription, routing, and CLI flags. |
| Tests: concurrency & assorted: shortcuts/event/*_test.go | Large set of new and refactored tests: normalization, deduper eviction, dispatcher ordering, pipeline concurrency, router concurrency and permissions, subscribe integration/E2E, and many processor→handler-focused unit tests. |

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Pipeline as Pipeline
    participant Normalizer as Normalizer
    participant Deduper as Deduper
    participant Dispatcher as Dispatcher
    participant Registry as HandlerRegistry
    participant Handler as EventHandler

    Client->>Pipeline: Process(InboundEnvelope)
    Pipeline->>Normalizer: NormalizeEnvelope(env)
    Normalizer->>Normalizer: validate schema & event_type
    Normalizer->>Normalizer: resolve Domain
    Normalizer-->>Pipeline: *Event
    Pipeline->>Deduper: Seen(IdempotencyKey, ReceivedAt)
    alt duplicate
        Deduper-->>Pipeline: true (skip)
    else new
        Deduper-->>Pipeline: false
        Pipeline->>Dispatcher: Dispatch(ctx, *Event)
        Dispatcher->>Registry: EventHandlers(eventType)
        Registry-->>Dispatcher: []EventHandler
        Dispatcher->>Registry: DomainHandlers(domain)
        Registry-->>Dispatcher: []EventHandler
        loop for each handler
            Dispatcher->>Handler: Handle(ctx, *Event)
            Handler-->>Dispatcher: HandlerResult
            Dispatcher->>Dispatcher: append DispatchRecord
        end
        Dispatcher-->>Pipeline: DispatchResult
        Pipeline->>Pipeline: format/write output via OutputRecordWriter / stdout
    end

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested labels

domain/im, domain/calendar

Suggested reviewers

  • liangshuo-1
  • tuxedomm

Poem

🐰 Hopping through envelopes and queues,
Handlers line up to chase the news,
Deduper nibbles repeats away,
Dispatcher sends each hop on its stray,
A calendar service hops in to play — hooray!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 25.34% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the main change: implementing an event bus subscription and dispatch pipeline as the primary feature of this PR. |
| Description check | ✅ Passed | The PR description covers the main objectives and changes, but the Test Plan section uses checkmarks for incomplete items (all marked completed) without explicit test results or evidence. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Comment @coderabbitai help to get the list of available commands and usage tips.


github-actions bot commented Apr 6, 2026

🚀 PR Preview Install Guide

🧰 CLI update

npm i -g https://pkg.pr.new/larksuite/cli/@larksuite/cli@5c27be2a0b93a2aea2022d39ca53a2616f69f95a

🧩 Skill update

npx skills add niuchong0523/cli#feat/event-bus-subscription-dispatch -y -g


greptile-apps bot commented Apr 6, 2026

Greptile Summary

This PR introduces the full event +subscribe pipeline — inbound envelope normalization, domain resolution, handler/processor dispatch, TTL-based deduplication, regex routing, and output record writing — and wires the existing IM event processors into the new handler-based flow. The subsystem is well-structured and thoroughly tested. Two minor points remain: EnsureDirs() is exported but never invoked (directories are created lazily already), and the Deduper has a narrow concurrent-expiry window where two goroutines seeing the same stale key can both pass deduplication simultaneously.

Confidence Score: 5/5

Safe to merge; all remaining findings are P2 style/quality issues that do not block correctness.

All P0/P1 issues from previous review threads have been addressed (quiet mode now respects errOut, deduper has periodic cleanup). The two remaining observations — an unused exported function and a narrow concurrent-expiry race in the deduper — are P2. Per the confidence guidance, a PR with only P2 findings scores 5.

shortcuts/event/pipeline.go (EnsureDirs unused) and shortcuts/event/deduper.go (expiry race) are worth a second look, but neither blocks merge.

Important Files Changed

| Filename | Overview |
| --- | --- |
| shortcuts/event/pipeline.go | Core processing pipeline wiring normalize→filter→dedupe→dispatch; EnsureDirs exported but never called |
| shortcuts/event/deduper.go | TTL-based deduplicator with periodic cleanup; narrow concurrent-expiry race allows a duplicate through |
| shortcuts/event/dispatcher.go | Routes events to registered event-type, domain, and fallback handlers cleanly |
| shortcuts/event/router.go | Regex-based event routing to output directories with atomic file writes and safe path validation |
| shortcuts/event/subscribe.go | WebSocket subscribe command wiring filters, pipeline, and SDK dispatcher with single-instance lock |
| shortcuts/event/normalizer.go | Converts raw envelopes to normalized Event model with schema validation and SHA-256 fingerprint fallback |
| shortcuts/event/registry.go | Dual-registry (ProcessorRegistry + HandlerRegistry) with duplicate-ID guard and builtin runtime wiring |
| shortcuts/event/processor_im_message.go | im.message.receive_v1 handler with compact conversion via convertlib and interactive-message fallback |
| shortcuts/event/processor_im_chat.go | im.chat.updated/disbanded handlers with compact before/after-change field extraction |
| shortcuts/event/processor_im_chat_member.go | Bot and user member add/remove handlers sharing a single payload struct |
| shortcuts/event/processor_im_message_reaction.go | Reaction created/deleted handler mapping emoji and operator to compact output |
| shortcuts/event/processor_im_message_read.go | Message read event handler extracting reader_id and message_id_list |
| shortcuts/event/processor_generic.go | Generic fallback handler producing compact or raw output for unregistered event types |
| shortcuts/event/domain_resolver.go | Prefix-based domain resolution mapping event_type to routing domain |
| shortcuts/event/envelope.go | Core event data types: InboundEnvelope, Event, EventHeader, MalformedEventError |
| shortcuts/event/matcher.go | Thin wrapper that validates and extracts event_type from a normalized Event |
| shortcuts/event/handlers.go | Handler interfaces and HandlerResult/DispatchRecord/DispatchResult types |
| shortcuts/event/processor.go | EventProcessor interface and legacy transformViaHandler bridge for backward compatibility |
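
The normalizer row mentions a SHA-256 fingerprint fallback. A plausible reading is that the idempotency key is the event ID when one is present and a hash of the raw body otherwise; the sketch below illustrates that reading only. The function name `idempotencyKey`, its parameters, and the `sha256:` prefix are hypothetical and not taken from the PR.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// idempotencyKey prefers the event ID from the envelope header and falls back
// to a SHA-256 fingerprint of the raw body when the ID is missing.
// The "sha256:" prefix is an illustrative choice, not taken from the PR.
func idempotencyKey(eventID string, rawBody []byte) string {
	if eventID != "" {
		return eventID
	}
	sum := sha256.Sum256(rawBody)
	return "sha256:" + hex.EncodeToString(sum[:])
}

func main() {
	body := []byte(`{"header":{"event_type":"im.message.receive_v1"}}`)
	fmt.Println(idempotencyKey("evt-123", body)) // ID present: used as-is
	fmt.Println(idempotencyKey("", body))        // ID missing: content fingerprint
}
```

A content fingerprint only deduplicates byte-identical redeliveries, so two deliveries of the same logical event with different formatting would still be treated as distinct.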

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    WS[WebSocket SDK] -->|raw body bytes| ENV[BuildWebSocketEnvelope]
    ENV --> NORM[NormalizeEnvelope]
    NORM -->|MalformedEventError| FALL[malformedFallbackEvent]
    NORM -->|*Event| MATCH[MatchRawEventType]
    FALL --> MATCH
    MATCH -->|ok=false| DISP
    MATCH -->|ok=true| FILTER[FilterChain.Allow]
    FILTER -->|blocked| DROP1[drop]
    FILTER -->|allowed| DEDUP[Deduper.Seen]
    DEDUP -->|duplicate| DROP2[log dedup + drop]
    DEDUP -->|new| DISP[Dispatcher.Dispatch]
    DISP -->|EventHandlers + DomainHandlers| HREG[HandlerRegistry]
    HREG -->|no match| FB[FallbackHandler]
    DISP --> RESULTS[DispatchResult.Results]
    RESULTS --> MODE{TransformMode}
    MODE -->|Raw| RAW[rawModeRecord]
    MODE -->|Compact| COMPACT[compactModeRecord]
    RAW --> WRITER{recordWriter?}
    COMPACT --> WRITER
    WRITER -->|yes| ROUTER[outputRouter]
    WRITER -->|no| STDOUT[writeRecord stdout]
    ROUTER -->|route match| DIR[dirRecordWriter → .json file]
    ROUTER -->|no match| FALLBACK[ndjson fallback → stdout]

Reviews (6): Last reviewed commit: "feat: implement event bus subscription a..." | Re-trigger Greptile


coderabbitai bot left a comment


Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
shortcuts/event/processor_generic.go (1)

40-54: ⚠️ Potential issue | 🟠 Major

Handle null payloads before writing into eventMap.

json.Unmarshal([]byte("null"), &eventMap) returns nil with eventMap == nil, so the first assignment here panics. event: null is already treated as valid elsewhere in this PR; the generic compact path should initialize an empty map first.

🛠️ Minimal guard
 func genericCompactMap(raw *RawEvent) interface{} {
 	// Compact: parse event as flat map, inject envelope metadata so AI
 	// can always identify the event type regardless of which processor ran.
 	var eventMap map[string]interface{}
 	if err := json.Unmarshal(raw.Event, &eventMap); err != nil {
 		return raw
 	}
+	if eventMap == nil {
+		eventMap = map[string]interface{}{}
+	}
 	eventMap["type"] = raw.Header.EventType
 	if raw.Header.EventID != "" {
 		eventMap["event_id"] = raw.Header.EventID
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_generic.go` around lines 40 - 54, genericCompactMap
can panic when raw.Event is the JSON literal null because json.Unmarshal leaves
eventMap as nil; update genericCompactMap (and use RawEvent) to initialize
eventMap to an empty map before/after unmarshalling if nil (e.g., set eventMap =
map[string]interface{}{} when json.Unmarshal returns nil and eventMap == nil) so
subsequent assignments to eventMap["type"], ["event_id"], and ["timestamp"]
never panic; keep the rest of the logic intact.
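
The failure mode described above can be reproduced in isolation. The following sketch uses a hypothetical `compactMap` helper (not the PR's `genericCompactMap`) to show that `json.Unmarshal` accepts the literal `null`, leaves the map nil, and that the guard makes the subsequent write safe.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// compactMap is a hypothetical stand-in for the PR's genericCompactMap: it
// decodes the payload into a map and injects the event type.
func compactMap(payload []byte, eventType string) (map[string]interface{}, error) {
	var m map[string]interface{}
	if err := json.Unmarshal(payload, &m); err != nil {
		return nil, err
	}
	// json.Unmarshal accepts the literal null and leaves m nil. Writing to a
	// nil map panics, so allocate before the first assignment.
	if m == nil {
		m = map[string]interface{}{}
	}
	m["type"] = eventType
	return m, nil
}

func main() {
	m, err := compactMap([]byte("null"), "im.message.receive_v1")
	fmt.Println(m, err)
}
```

Without the `m == nil` check, the `m["type"] = eventType` line panics with "assignment to entry in nil map" for a `null` payload.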
🧹 Nitpick comments (5)
shortcuts/event/normalizer.go (1)

40-50: Minor: ResolveDomain call creates intermediate Event object.

Line 44 creates a temporary Event just to resolve the domain. If ResolveDomain accepted a string eventType parameter directly, this allocation could be avoided. This is a minor optimization opportunity for a hot path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/normalizer.go` around lines 40 - 50, The ResolveDomain call
is creating a temporary Event just to read EventType; change ResolveDomain to
accept an eventType string (e.g., ResolveDomain(eventType string)) and update
this call in normalizer.go to pass raw.Header.EventType directly instead of
constructing &Event{EventType: ...}; then update the ResolveDomain
implementation and any other callers (or provide an overload/helper
ResolveDomainFromEvent(e *Event) that delegates to the new
ResolveDomain(eventType string)) so all call sites compile and no extra Event
allocation occurs on the hot path.
shortcuts/event/subscribe_test.go (1)

27-40: Test relies on exact SDK error message format.

The assertion on line 37 depends on the exact error message string from the SDK. If the SDK updates its error format, this test will fail. Consider using strings.Contains for the key parts or documenting that this test intentionally validates SDK behavior.

♻️ More resilient assertion
+import "strings"
+
 func TestCatchAllUsesSDKDefaultCustomizedHandler(t *testing.T) {
 	eventDispatcher := dispatcher.NewEventDispatcher("", "")
 	eventDispatcher.OnCustomizedEvent("", func(_ context.Context, _ *larkevent.EventReq) error {
 		return nil
 	})

 	_, err := eventDispatcher.Do(context.Background(), []byte(`{"header":{"event_type":"contact.user.created_v3"}}`))
 	if err == nil {
 		t.Fatal("dispatcher.Do() error = nil, want not found because parse() still routes by concrete event type")
 	}
-	if err.Error() != "event type: contact.user.created_v3, not found handler" {
-		t.Fatalf("dispatcher.Do() error = %v, want concrete event type not found", err)
-	}
+	if !strings.Contains(err.Error(), "not found handler") {
+		t.Fatalf("dispatcher.Do() error = %v, want error containing 'not found handler'", err)
+	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/subscribe_test.go` around lines 27 - 40, Update
TestCatchAllUsesSDKDefaultCustomizedHandler to avoid relying on the exact SDK
error string: instead of comparing err.Error() for exact equality, assert that
err is non-nil and that err.Error() contains the key substrings (e.g.,
"contact.user.created_v3" and "not found") using strings.Contains; update
imports to include "strings" if needed. This keeps the test resilient if the SDK
tweaks formatting while still verifying the concrete event type was not found
when calling dispatcher.Do.
shortcuts/event/domain_resolver.go (1)

10-24: Add a comment documenting the order-sensitivity of prefix matching.

The ResolveDomain function uses first-match semantics. The current ordering is correct (e.g., bitable.base works because it's listed explicitly), but reordering could break alias mappings. A brief comment would help prevent future regressions.

📝 Suggested documentation
+// domainPrefixes maps event type prefixes to routing domains.
+// Order matters: first matching prefix wins. Aliases (e.g., bitable → base)
+// must be listed explicitly since they don't share a common prefix with their target domain.
 var domainPrefixes = []struct {
 	prefix string
 	domain string
 }{
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/domain_resolver.go` around lines 10 - 24, The domainPrefixes
slice relies on first-match, order-sensitive prefix matching used by
ResolveDomain; add a brief comment above the domainPrefixes declaration stating
that entries are checked in order and that longer/specific prefixes must come
before shorter/ambiguous ones (e.g., "bitable." must be listed before "base."),
so future reordering won't break alias mappings; reference domainPrefixes and
ResolveDomain in the comment for clarity.
shortcuts/event/matcher.go (1)

6-22: Consider removing redundant Matched field.

The Matched field in RawEventMatch appears redundant with the ok return value. Both convey whether a match occurred. If Matched is intended for future use cases or API consistency, a brief comment explaining its purpose would help.

♻️ Simplified alternative (if Matched is not needed elsewhere)
 // RawEventMatch captures the exact raw event type extracted from a normalized event.
 type RawEventMatch struct {
 	EventType string
-	Matched   bool
 }

 // MatchRawEventType returns the exact event type when present on the normalized event.
 func MatchRawEventType(evt *Event) (RawEventMatch, bool) {
 	if evt == nil || evt.EventType == "" {
 		return RawEventMatch{}, false
 	}

 	return RawEventMatch{
 		EventType: evt.EventType,
-		Matched:   true,
 	}, true
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/matcher.go` around lines 6 - 22, The struct RawEventMatch
contains a redundant Matched field because MatchRawEventType already returns a
boolean ok to indicate a match; remove the Matched field from RawEventMatch and
update MatchRawEventType to return only EventType (or return RawEventMatch
without Matched) so the match presence is conveyed by the boolean return value
from MatchRawEventType; if you want to keep Matched for API consistency, add a
short comment above Matched explaining its purpose and when it differs from the
boolean return value so callers of RawEventMatch and the MatchRawEventType
function understand the intent.
shortcuts/event/subscribe.go (1)

52-75: Generate the catch-all subscription list from the builtin registry.

subscribedEventTypes duplicates the handler set in shortcuts/event/registry.go:79-97. If a new builtin handler is added without touching this slice, catch-all mode will silently never receive that event from the SDK. Deriving this list from the registry removes that drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/subscribe.go` around lines 52 - 75, Replace the hard-coded
subscribedEventTypes slice with a derived list built from the builtin handler
registry: call NewBuiltinHandlerRegistry (or the appropriate registry
constructor in shortcuts/event/registry.go), iterate its registered handler
types (or use its exported method that returns event types) to produce the
catch-all subscription list, and return that list from subscribedEventTypesFor
(or expose a new function that returns it). Ensure you remove the duplicate
literal slice, keep the original comment about alignment, and preserve the same
ordering/format expected by the SDK when constructing the returned []string.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/registry/loader_embedded.go`:
- Around line 8-9: The embedded build currently only includes
meta_data_default.json via the embeddedMetaJSON variable so
loadEmbeddedIntoMerged() only sees the tiny baseline; update the build/fetch
pipeline or embedding so the fetched catalog is used as the cold-start source:
either have scripts/fetch_meta.py regenerate meta_data_default.json with the
full fetched catalog, or change the embedding to include the fetched
internal/registry/meta_data.json (or embed both and modify
loadEmbeddedIntoMerged() to prefer meta_data.json when present); reference
embeddedMetaJSON, loadEmbeddedIntoMerged(), and scripts/fetch_meta.py when
making the change.

In `@internal/registry/remote_test.go`:
- Around line 78-82: TestColdStart_UsesEmbedded calls resetInit() while a
background refresh goroutine may still be executing remoteMetaURL(), causing
races on refreshOnce, configuredBrand, and testMetaURL; change the test and
resetInit() usage to wait deterministically for any prior refresh goroutine to
finish instead of proceeding immediately. Add or use a test hook (e.g., a
waitForRefresh()/waitCh/WG) that the refresh goroutine signals when it
completes, update the test to call that hook before invoking resetInit(), and
ensure resetInit() only mutates the package globals (refreshOnce,
configuredBrand, testMetaURL) after the wait completes so races are eliminated.

In `@shortcuts/event/deduper.go`:
- Around line 11-34: Deduper currently retains keys indefinitely in the sync.Map
(seen) causing memory growth; add a method func (d *Deduper) Cleanup(now
time.Time) that ranges over d.seen and deletes entries whose timestamp is older
than d.ttl, and ensure Deduper.NewDeduper or the subscriber setup starts a
background goroutine that calls d.Cleanup(time.Now()) periodically (e.g., every
d.ttl or a fraction thereof) and stops when the pipeline/subscription shuts down
so stale entries are evicted and memory is bounded; reference Deduper, seen,
Cleanup and the subscription/event loop setup in subscribe.go to wire the
periodic call and shutdown.

In `@shortcuts/event/dispatcher.go`:
- Around line 37-45: In Dispatch's loop over matched handlers, wrap the call to
handler.Handle(ctx, evt) in a panic recovery so any panic is converted into a
DispatchRecord with HandlerID: handler.ID(), Status: HandlerStatusFailed, Reason
set to a short message like "panic during handle", Err set to the recovered
panic (and/or an error created from it), and Output empty; then append that
record and continue to the next handler instead of letting the panic escape.
Ensure the existing successful handlerResult path remains unchanged and only the
panic path synthesizes a DispatchRecord; reference the Dispatch function,
handler.Handle, DispatchRecord, and HandlerStatusFailed when making the change.

In `@shortcuts/event/pipeline.go`:
- Around line 58-63: NewDeduper currently stores keys indefinitely so
EventPipeline's deduper grows without bound; update the Deduper implementation
used by NewDeduper so that Deduper.Seen checks an entry's timestamp and treats
expired entries as missing (deleting them), and add either a background janitor
goroutine to periodically purge entries older than dedupTTL or switch to a
TTL-capable cache implementation; ensure NewDeduper accepts the dedupTTL and
Deduper.Seen both returns true for recent keys and removes/ignores expired keys
so memory is bounded to the configured window.
- Around line 170-191: rawModeRecord currently omits evt.Metadata so
malformedFallbackEvent's metadata (e.g., malformed_reason/normalization_error)
is lost in --raw output; update rawModeRecord to include evt.Metadata under the
same "metadata" key used by the routed writer (only add the field when
evt.Metadata is non-nil/empty) so the raw-mode map produced by rawModeRecord
matches the routed raw output and includes normalization/malformed info.

In `@shortcuts/event/processor_im_message_reaction.go`:
- Around line 53-55: The compact output helper currently swallows
marshal/unmarshal problems causing dropped fields; change
imMessageReactionCompactOutput to return an error (or an ok flag) when decoding
fails, and update ImMessageReactionProcessor.Handle to call the compact helper
and, on any error or missing required fields (e.g., message_id, emoji_type,
operator_id), fall back to the full/raw/generic output (e.g.,
imMessageReactionFullOutput or the raw evt payload) instead of silently
returning the compact result; apply the same change to the other handler usage
referenced around the 87-95 region so decoding failures always produce a
fallback output.

In `@shortcuts/event/processor_im_message_read.go`:
- Around line 30-32: The Handle method in ImMessageReadProcessor always returns
HandlerStatusHandled and calls imMessageReadCompactOutput which swallows
marshal/unmarshal errors, so decoding failures are silently dropped; modify
ImMessageReadProcessor.Handle to detect decode errors from
imMessageReadCompactOutput (or make imMessageReadCompactOutput return (output,
error)), and on error return a non-success HandlerResult (e.g.,
HandlerStatusFailed or the same fallback used by Transform) and include the
raw/generic output instead of the compact output; update any call sites to
handle the new error return and ensure HandlerResult construction uses the
appropriate status and payload when imMessageReadCompactOutput fails.

In `@shortcuts/event/processor_im_message.go`:
- Around line 95-100: The builder write to os.Stderr in
buildIMMessageCompactOutput bypasses EventPipeline.errOut/Quiet; remove the
fmt.Fprintf(os.Stderr...) call and stop emitting the hint inside
buildIMMessageCompactOutput so the function simply returns nil, false for
interactive messages, and update the caller(s) that invoke
buildIMMessageCompactOutput to detect the false return and emit the hint via
EventPipeline.errOut (so the pipeline's Quiet setting is respected) instead of
letting the builder print directly.

In `@shortcuts/event/router.go`:
- Around line 108-116: Protect concurrent access to the r.writers map by adding
a mutex (e.g., writersMu sync.Mutex) to the router struct and use it in the loop
that iterates dirs: lock before reading r.writers[dir], if nil create the
dirRecordWriter while still holding the lock and assign it to r.writers[dir],
then unlock before calling writer.WriteRecord so the I/O isn't done under the
lock. Also avoid sharing the same seq pointer between writers: when
instantiating dirRecordWriter in the creation branch, copy the seq value from
r.seq (or otherwise produce a distinct sequence owner) instead of assigning the
shared pointer so file-name collisions cannot occur. Ensure references:
r.writers, writersMu, dirRecordWriter, r.seq, and writer.WriteRecord are updated
accordingly.

In `@shortcuts/event/subscribe.go`:
- Around line 197-209: The fallback writer for routed output is hard-coded to
ndjsonRecordWriter which bypasses PipelineConfig.PrettyJSON; update the
construction of recordWriter in the branch that builds &outputRouter so the
fallback honors PrettyJSON: determine the appropriate fallback writer based on
PipelineConfig.PrettyJSON (or the flag that drives it) instead of always using
ndjsonRecordWriter — e.g., choose a pretty/json record writer when
PipelineConfig.PrettyJSON is true and pass that as fallback to outputRouter
(symbols: recordWriter, outputRouter, ndjsonRecordWriter,
PipelineConfig.PrettyJSON, writeRecord).

---

Outside diff comments:
In `@shortcuts/event/processor_generic.go`:
- Around line 40-54: genericCompactMap can panic when raw.Event is the JSON
literal null because json.Unmarshal leaves eventMap as nil; update
genericCompactMap (and use RawEvent) to initialize eventMap to an empty map
before/after unmarshalling if nil (e.g., set eventMap = map[string]interface{}{}
when json.Unmarshal returns nil and eventMap == nil) so subsequent assignments
to eventMap["type"], ["event_id"], and ["timestamp"] never panic; keep the rest
of the logic intact.

---

Nitpick comments:
In `@shortcuts/event/domain_resolver.go`:
- Around line 10-24: The domainPrefixes slice relies on first-match,
order-sensitive prefix matching used by ResolveDomain; add a brief comment above
the domainPrefixes declaration stating that entries are checked in order and
that longer/specific prefixes must come before shorter/ambiguous ones (e.g.,
"bitable." must be listed before "base."), so future reordering won't break
alias mappings; reference domainPrefixes and ResolveDomain in the comment for
clarity.

In `@shortcuts/event/matcher.go`:
- Around line 6-22: The struct RawEventMatch contains a redundant Matched field
because MatchRawEventType already returns a boolean ok to indicate a match;
remove the Matched field from RawEventMatch and update MatchRawEventType to
return only EventType (or return RawEventMatch without Matched) so the match
presence is conveyed by the boolean return value from MatchRawEventType; if you
want to keep Matched for API consistency, add a short comment above Matched
explaining its purpose and when it differs from the boolean return value so
callers of RawEventMatch and the MatchRawEventType function understand the
intent.

In `@shortcuts/event/normalizer.go`:
- Around line 40-50: The ResolveDomain call is creating a temporary Event just
to read EventType; change ResolveDomain to accept an eventType string (e.g.,
ResolveDomain(eventType string)) and update this call in normalizer.go to pass
raw.Header.EventType directly instead of constructing &Event{EventType: ...};
then update the ResolveDomain implementation and any other callers (or provide
an overload/helper ResolveDomainFromEvent(e *Event) that delegates to the new
ResolveDomain(eventType string)) so all call sites compile and no extra Event
allocation occurs on the hot path.

In `@shortcuts/event/subscribe_test.go`:
- Around line 27-40: Update TestCatchAllUsesSDKDefaultCustomizedHandler to avoid
relying on the exact SDK error string: instead of comparing err.Error() for
exact equality, assert that err is non-nil and that err.Error() contains the key
substrings (e.g., "contact.user.created_v3" and "not found") using
strings.Contains; update imports to include "strings" if needed. This keeps the
test resilient if the SDK tweaks formatting while still verifying the concrete
event type was not found when calling dispatcher.Do.
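
A substring-based assertion of this kind might look like the following sketch. The error text is a stand-in, since the SDK's real wording is not quoted in the review, and containsAll is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// containsAll reports whether msg contains every one of parts.
func containsAll(msg string, parts ...string) bool {
	for _, p := range parts {
		if !strings.Contains(msg, p) {
			return false
		}
	}
	return true
}

func main() {
	// Stand-in error; the SDK's actual message format is an assumption here.
	err := errors.New("event type: contact.user.created_v3, not found")
	ok := containsAll(err.Error(), "contact.user.created_v3", "not found")
	fmt.Println(ok)
}
```

In the real test the check would feed t.Fatalf rather than fmt.Println, but the matching logic is the same.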

In `@shortcuts/event/subscribe.go`:
- Around line 52-75: Replace the hard-coded subscribedEventTypes slice with a
derived list built from the builtin handler registry: call
NewBuiltinHandlerRegistry (or the appropriate registry constructor in
shortcuts/event/registry.go), iterate its registered handler types (or use its
exported method that returns event types) to produce the catch-all subscription
list, and return that list from subscribedEventTypesFor (or expose a new
function that returns it). Ensure you remove the duplicate literal slice, keep
the original comment about alignment, and preserve the same ordering/format
expected by the SDK when constructing the returned []string.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8d65d1ba-154c-431f-8fd8-c3c678853af3

📥 Commits

Reviewing files that changed from the base of the PR and between 0c77c95 and eae1b70.

📒 Files selected for processing (28)
  • .gitignore
  • internal/registry/loader.go
  • internal/registry/loader_embedded.go
  • internal/registry/meta_data_default.json
  • internal/registry/remote_test.go
  • shortcuts/event/deduper.go
  • shortcuts/event/dispatcher.go
  • shortcuts/event/dispatcher_test.go
  • shortcuts/event/domain_resolver.go
  • shortcuts/event/domain_resolver_test.go
  • shortcuts/event/envelope.go
  • shortcuts/event/handlers.go
  • shortcuts/event/matcher.go
  • shortcuts/event/normalizer.go
  • shortcuts/event/normalizer_test.go
  • shortcuts/event/pipeline.go
  • shortcuts/event/processor_generic.go
  • shortcuts/event/processor_im_chat.go
  • shortcuts/event/processor_im_chat_member.go
  • shortcuts/event/processor_im_message.go
  • shortcuts/event/processor_im_message_reaction.go
  • shortcuts/event/processor_im_message_read.go
  • shortcuts/event/processor_im_test.go
  • shortcuts/event/processor_test.go
  • shortcuts/event/registry.go
  • shortcuts/event/router.go
  • shortcuts/event/subscribe.go
  • shortcuts/event/subscribe_test.go

Comment on lines 78 to 82
func TestColdStart_UsesEmbedded(t *testing.T) {
 	if !hasEmbeddedData() {
-		t.Skip("no embedded from_meta data")
+		t.Skip("no embedded default registry baseline")
 	}
 	resetInit()

⚠️ Potential issue | 🔴 Critical

Stop resetting package globals while prior refresh goroutines are still running.

resetInit() here can execute while a background refresh from an earlier test is still inside remoteMetaURL(). The race detector is already catching that in CI, so these tests need a deterministic wait/test hook before mutating refreshOnce, configuredBrand, and testMetaURL instead of relying on sleeps.

🧰 Tools
🪛 GitHub Actions: Coverage

[error] 82-82: TestColdStart_UsesEmbedded failed due to Go data race detected during execution

🪛 GitHub Actions: Tests

[error] 82-82: TestColdStart_UsesEmbedded failed due to race detected during execution when running with -race

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/registry/remote_test.go` around lines 78 - 82,
TestColdStart_UsesEmbedded calls resetInit() while a background refresh
goroutine may still be executing remoteMetaURL(), causing races on refreshOnce,
configuredBrand, and testMetaURL; change the test and resetInit() usage to wait
deterministically for any prior refresh goroutine to finish instead of
proceeding immediately. Add or use a test hook (e.g., a
waitForRefresh()/waitCh/WG) that the refresh goroutine signals when it
completes, update the test to call that hook before invoking resetInit(), and
ensure resetInit() only mutates the package globals (refreshOnce,
configuredBrand, testMetaURL) after the wait completes so races are eliminated.
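
One way to get such a deterministic wait is a sync.WaitGroup that tracks in-flight refreshes. The sketch below uses hypothetical names (refreshWG, startRefresh, resetForTest) and a single stand-in global; it is not the registry package's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the package globals named in the review.
var (
	refreshWG   sync.WaitGroup
	testMetaURL string
)

// startRefresh launches a background refresh and tracks it in refreshWG.
// The global is read on the caller's goroutine, before the goroutine starts.
func startRefresh(fetch func(url string)) {
	url := testMetaURL
	refreshWG.Add(1)
	go func() {
		defer refreshWG.Done()
		fetch(url)
	}()
}

// resetForTest waits for every in-flight refresh, then mutates the global.
func resetForTest(url string) {
	refreshWG.Wait()
	testMetaURL = url
}

func main() {
	resetForTest("https://example.invalid/first")
	startRefresh(func(url string) { fmt.Println("refreshing from", url) })
	resetForTest("https://example.invalid/second") // blocks until the refresh returns
	fmt.Println("now using", testMetaURL)
}
```

Because the reset blocks on Wait, no sleep is needed and the race detector sees a happens-before edge between the refresh and the mutation.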

Comment on lines +11 to +34
// Deduper suppresses repeated keys seen within a TTL window.
type Deduper struct {
	ttl  time.Duration
	seen sync.Map // key -> time.Time
}

// NewDeduper creates a deduper with the provided TTL.
func NewDeduper(ttl time.Duration) *Deduper {
	return &Deduper{ttl: ttl}
}

// Seen reports whether key has already been seen within ttl and records now.
func (d *Deduper) Seen(key string, now time.Time) bool {
	if d == nil || key == "" || d.ttl <= 0 {
		return false
	}
	if v, loaded := d.seen.LoadOrStore(key, now); loaded {
		if ts, ok := v.(time.Time); ok && now.Sub(ts) < d.ttl {
			return true
		}
		d.seen.Store(key, now)
	}
	return false
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if Deduper cleanup is implemented elsewhere or if there's a bounded lifetime
rg -nP 'Deduper|Cleanup|cleanup' --type=go -C3

Repository: larksuite/cli

Length of output: 20673


🏁 Script executed:

cat -n shortcuts/event/pipeline.go | head -100

Repository: larksuite/cli

Length of output: 3059


🏁 Script executed:

# Check how Pipeline is created and used - search for NewPipeline and Pipeline instantiation
rg -n 'NewPipeline|Pipeline{' --type=go -B2 -A2 | head -60

Repository: larksuite/cli

Length of output: 162


🏁 Script executed:

# Check event processing context - is it short-lived or long-running?
rg -n 'event\..*Pipeline\|pipeline.*Run\|pipeline.*Process' --type=go | head -30

Repository: larksuite/cli

Length of output: 39


🏁 Script executed:

# Search for where EventPipeline is created and used
rg -n 'NewEventPipeline|\.Process\(' shortcuts/event/ --type=go -B2 -A2

Repository: larksuite/cli

Length of output: 8121


🏁 Script executed:

# Check the broader context - where is event processing called from?
rg -n 'EventPipeline|event\.New' --type=go | grep -v test | head -40

Repository: larksuite/cli

Length of output: 1299


🏁 Script executed:

# Look at the entire deduper.go to understand the implementation fully
cat -n shortcuts/event/deduper.go

Repository: larksuite/cli

Length of output: 1065


Memory accumulation in sync.Map for long-running subscriptions.

In the event subscription context (shortcuts/event/subscribe.go), the EventPipeline is created once and processes events continuously via WebSocket. The Deduper stores event identifiers indefinitely—entries older than the 5-minute TTL are never evicted, only overwritten if the same key reappears. For high-volume event streams with many unique identifiers, this can accumulate significant memory over time.

Consider implementing periodic cleanup:

♻️ Cleanup approach
// Cleanup removes entries older than TTL. Call periodically from a background goroutine.
func (d *Deduper) Cleanup(now time.Time) {
	d.seen.Range(func(key, value interface{}) bool {
		if ts, ok := value.(time.Time); ok && now.Sub(ts) >= d.ttl {
			d.seen.Delete(key)
		}
		return true
	})
}

Then invoke it periodically (e.g., every ttl interval) from the event loop or a dedicated goroutine in subscribe.go.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/deduper.go` around lines 11 - 34, Deduper currently retains
keys indefinitely in the sync.Map (seen) causing memory growth; add a method
func (d *Deduper) Cleanup(now time.Time) that ranges over d.seen and deletes
entries whose timestamp is older than d.ttl, and ensure NewDeduper or
the subscriber setup starts a background goroutine that calls
d.Cleanup(time.Now()) periodically (e.g., every d.ttl or a fraction thereof) and
stops when the pipeline/subscription shuts down so stale entries are evicted and
memory is bounded; reference Deduper, seen, Cleanup and the subscription/event
loop setup in subscribe.go to wire the periodic call and shutdown.

Comment on lines +37 to +45
for _, handler := range matched {
	handlerResult := handler.Handle(ctx, evt)
	result.Results = append(result.Results, DispatchRecord{
		HandlerID: handler.ID(),
		Status:    handlerResult.Status,
		Reason:    handlerResult.Reason,
		Err:       handlerResult.Err,
		Output:    handlerResult.Output,
	})

⚠️ Potential issue | 🟠 Major

Recover handler panics inside the dispatch loop.

Handle is invoked on network-derived payloads, but a panic here will abort Dispatch and stop later handlers from running. Convert panics into HandlerStatusFailed records so one bad handler or payload does not take down the whole subscriber.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/dispatcher.go` around lines 37 - 45, In Dispatch's loop over
matched handlers, wrap the call to handler.Handle(ctx, evt) in a panic recovery
so any panic is converted into a DispatchRecord with HandlerID: handler.ID(),
Status: HandlerStatusFailed, Reason set to a short message like "panic during
handle", Err set to the recovered panic (and/or an error created from it), and
Output empty; then append that record and continue to the next handler instead
of letting the panic escape. Ensure the existing successful handlerResult path
remains unchanged and only the panic path synthesizes a DispatchRecord;
reference the Dispatch function, handler.Handle, DispatchRecord, and
HandlerStatusFailed when making the change.
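
A minimal sketch of the recovery wrapper, assuming simplified stand-ins for the PR's types: HandlerResult here carries only three fields, and the status values are placeholder strings rather than the PR's actual constants.

```go
package main

import (
	"fmt"
)

// Simplified stand-ins for the PR's types; only the fields used here exist.
type HandlerResult struct {
	Status string
	Reason string
	Err    error
}

// Placeholder status values; the PR's real constants may differ.
const (
	HandlerStatusHandled = "handled"
	HandlerStatusFailed  = "failed"
)

// safeHandle runs handle and converts a panic into a failed result, so the
// caller's loop can record it and continue with the next handler.
func safeHandle(handle func() HandlerResult) (res HandlerResult) {
	defer func() {
		if r := recover(); r != nil {
			res = HandlerResult{
				Status: HandlerStatusFailed,
				Reason: "panic during handle",
				Err:    fmt.Errorf("handler panic: %v", r),
			}
		}
	}()
	return handle()
}

func main() {
	res := safeHandle(func() HandlerResult { panic("bad payload") })
	fmt.Println(res.Status, res.Err)
}
```

The named return value is what lets the deferred function overwrite the result after a panic; the non-panicking path returns the handler's result untouched.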

Comment on lines +53 to +55
func (p *ImMessageReactionProcessor) Handle(_ context.Context, evt *Event) HandlerResult {
	return HandlerResult{Status: HandlerStatusHandled, Output: imMessageReactionCompactOutput(evt, p.eventType)}
}

⚠️ Potential issue | 🟠 Major

Preserve a fallback when reaction payload decoding fails.

This helper also ignores marshal/unmarshal errors while Handle reports the event as handled. On unexpected reaction payload shapes, compact mode will silently drop fields like message_id, emoji_type, and operator_id instead of surfacing a failed dispatch or falling back to raw/generic output.

Also applies to: 87-95

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_im_message_reaction.go` around lines 53 - 55, The
compact output helper currently swallows marshal/unmarshal problems causing
dropped fields; change imMessageReactionCompactOutput to return an error (or an
ok flag) when decoding fails, and update ImMessageReactionProcessor.Handle to
call the compact helper and, on any error or missing required fields (e.g.,
message_id, emoji_type, operator_id), fall back to the full/raw/generic output
(e.g., imMessageReactionFullOutput or the raw evt payload) instead of silently
returning the compact result; apply the same change to the other handler usage
referenced around the 87-95 region so decoding failures always produce a
fallback output.

Comment on lines +30 to +32
func (p *ImMessageReadProcessor) Handle(_ context.Context, evt *Event) HandlerResult {
	return HandlerResult{Status: HandlerStatusHandled, Output: imMessageReadCompactOutput(evt)}
}

⚠️ Potential issue | 🟠 Major

Surface message_read decode failures instead of reporting success.

The compact-output helper swallows marshal/unmarshal errors, and Handle always returns HandlerStatusHandled. A schema drift or malformed payload will quietly lose reader_id, read_time, and message_ids instead of falling back to raw/generic output like Transform does.

Also applies to: 60-67

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_im_message_read.go` around lines 30 - 32, The
Handle method in ImMessageReadProcessor always returns HandlerStatusHandled and
calls imMessageReadCompactOutput which swallows marshal/unmarshal errors, so
decoding failures are silently dropped; modify ImMessageReadProcessor.Handle to
detect decode errors from imMessageReadCompactOutput (or make
imMessageReadCompactOutput return (output, error)), and on error return a
non-success HandlerResult (e.g., HandlerStatusFailed or the same fallback used
by Transform) and include the raw/generic output instead of the compact output;
update any call sites to handle the new error return and ensure HandlerResult
construction uses the appropriate status and payload when
imMessageReadCompactOutput fails.

Comment on lines +108 to +116
for _, dir := range dirs {
	writer := r.writers[dir]
	if writer == nil {
		writer = &dirRecordWriter{dir: dir, seq: r.seq}
		r.writers[dir] = writer
	}
	if err := writer.WriteRecord(eventType, record); err != nil {
		return err
	}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if outputRouter.WriteRecord is called from concurrent contexts
rg -nP 'WriteRecord|outputRouter' --type=go -C5

Repository: larksuite/cli

Length of output: 8660


🏁 Script executed:

#!/bin/bash
# Check if Process is called concurrently (spawns goroutines or called from multiple goroutines)
rg -n "go\s+(p\.Process|Process\()" --type=go -C3

# Also check the newEventPipeline function signature and usage
rg -n "newEventPipeline|func.*Process" --type=go -B2 -A5

Repository: larksuite/cli

Length of output: 44306


🏁 Script executed:

#!/bin/bash
# Check the subscribe.go context around where rawHandler is used
sed -n '215,240p' shortcuts/event/subscribe.go

# Also search for how larkevent dispatcher is used
rg "larkevent|rawHandler" shortcuts/event/subscribe.go -B2 -A2

Repository: larksuite/cli

Length of output: 1640


🏁 Script executed:

#!/bin/bash
# Check processor_test.go to see if tests use goroutines
rg "go\s+\(|goroutine|concurrent" shortcuts/event/processor_test.go

# Also check if outputRouter or pipeline is documented as thread-safe
rg -A5 "type outputRouter|type EventPipeline" shortcuts/event/router.go shortcuts/event/pipeline.go | head -30

Repository: larksuite/cli

Length of output: 709


🏁 Script executed:

#!/bin/bash
# Check if sync.Mutex or sync.Map is used anywhere in router.go or related files
rg "sync\.|Mutex|RWMutex|atomic" shortcuts/event/router.go shortcuts/event/pipeline.go

# Also check the complete dirRecordWriter implementation
sed -n '135,160p' shortcuts/event/router.go

# Check if there are any comments about thread safety or concurrency assumptions
rg -i "thread|concurrent|race|safe|goroutine" shortcuts/event/router.go shortcuts/event/pipeline.go

Repository: larksuite/cli

Length of output: 1044


Add synchronization to protect concurrent map access on writers.

The r.writers map is read and written without synchronization in lines 109-112. If WriteRecord is called concurrently from the event dispatcher, those unsynchronized map accesses are a data race. In addition, multiple goroutines could each create a separate dirRecordWriter for the same directory, all sharing the same seq pointer, which can cause file name collisions.

Use sync.Mutex to protect map reads/writes, or refactor to use sync.Map for thread-safe concurrent access.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/router.go` around lines 108 - 116, Protect concurrent access
to the r.writers map by adding a mutex (e.g., writersMu sync.Mutex) to the
router struct and use it in the loop that iterates dirs: lock before reading
r.writers[dir], if nil create the dirRecordWriter while still holding the lock
and assign it to r.writers[dir], then unlock before calling writer.WriteRecord
so the I/O isn't done under the lock. Also avoid sharing the same seq pointer
between writers: when instantiating dirRecordWriter in the creation branch, copy
the seq value from r.seq (or otherwise produce a distinct sequence owner)
instead of assigning the shared pointer so file-name collisions cannot occur.
Ensure references: r.writers, writersMu, dirRecordWriter, r.seq, and
writer.WriteRecord are updated accordingly.

@niuchong0523 niuchong0523 force-pushed the feat/event-bus-subscription-dispatch branch from eae1b70 to c6a4e59 Compare April 6, 2026 15:14

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/registry/loader.go (1)

25-25: ⚠️ Potential issue | 🟡 Minor

Inconsistent comment: still references meta_data.json.

Line 25 still says // version from embedded meta_data.json while lines 43 and 69-70 were updated to use "default registry baseline" / "default registry JSON" terminology. This should be updated for consistency.

✏️ Suggested fix
-	embeddedVersion   string                                    // version from embedded meta_data.json
+	embeddedVersion   string                                    // version from embedded default registry baseline
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/registry/loader.go` at line 25, Update the stale comment on the
embeddedVersion variable (embeddedVersion string) to match the newer terminology
used elsewhere: replace the reference to "meta_data.json" with "default registry
JSON" or "default registry baseline" so the comment reads something like "//
version from embedded default registry JSON" ensuring consistency with comments
at lines referencing the default registry baseline and default registry JSON.
shortcuts/event/router.go (1)

73-80: ⚠️ Potential issue | 🟡 Minor

Deduplicate matching route targets before fan-out.

If two patterns both resolve to the same directory, Match() returns that directory twice and WriteRecord() emits two files for one event.

💡 Proposed fix
func (r *EventRouter) Match(eventType string) []string {
 	var dirs []string
+	seen := make(map[string]struct{}, len(r.routes))
 	for _, route := range r.routes {
 		if route.pattern.MatchString(eventType) {
+			if _, ok := seen[route.dir]; ok {
+				continue
+			}
+			seen[route.dir] = struct{}{}
 			dirs = append(dirs, route.dir)
 		}
 	}
 	return dirs
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/router.go` around lines 73 - 80, EventRouter.Match currently
appends route.dir for every matching route, causing duplicate dirs when multiple
patterns resolve to the same directory and leading WriteRecord to emit duplicate
files; modify EventRouter.Match to deduplicate directory targets before
returning by tracking seen route.dir values (e.g., with a map[string]bool) while
iterating r.routes and only appending unseen dirs to the result slice so order
is preserved and duplicates are removed.
♻️ Duplicate comments (3)
shortcuts/event/processor_im_message.go (1)

95-100: ⚠️ Potential issue | 🟠 Major

Remove direct os.Stderr logging from compact builder.

Writing hints inside buildIMMessageCompactOutput bypasses pipeline-controlled logging (errOut / --quiet) and leaks output in quiet mode.

🔧 Proposed fix
 import (
 	"context"
 	"encoding/json"
-	"fmt"
-	"os"
@@
 func buildIMMessageCompactOutput(eventType, headerCreateTime string, ev imMessagePayload) (map[string]interface{}, bool) {
@@
 	if ev.Message.MessageType == "interactive" {
-		fmt.Fprintf(os.Stderr, "%s[hint]%s card message (interactive) compact conversion is not yet supported, returning raw event data\n", output.Dim, output.Reset)
 		return nil, false
 	}

Emit the hint at the pipeline layer instead, where quiet/routing policies are already enforced.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_im_message.go` around lines 95 - 100, The function
buildIMMessageCompactOutput currently writes a hint directly to os.Stderr;
remove the fmt.Fprintf(os.Stderr...) call inside buildIMMessageCompactOutput and
stop emitting any logs from that function, keep returning (nil, false) for
interactive/card messages, and update the caller (the pipeline layer that
invokes buildIMMessageCompactOutput) to detect the false result and emit the
hint via the pipeline's errOut/quiet-aware logging path so routing/quiet
policies are respected.
shortcuts/event/processor_im_message_read.go (1)

30-32: ⚠️ Potential issue | 🟠 Major

Do not silently mark decode failures as handled.

Handle always returns HandlerStatusHandled, while imMessageReadCompactOutput suppresses marshal/unmarshal errors. Malformed/schema-drift payloads can lose compact fields without fallback visibility.

🔧 Proposed fix
-func (p *ImMessageReadProcessor) Handle(_ context.Context, evt *Event) HandlerResult {
-	return HandlerResult{Status: HandlerStatusHandled, Output: imMessageReadCompactOutput(evt)}
-}
+func (p *ImMessageReadProcessor) Handle(_ context.Context, evt *Event) HandlerResult {
+	out, err := imMessageReadCompactOutput(evt)
+	if err != nil {
+		return HandlerResult{
+			Status: HandlerStatusHandled,
+			Output: genericCompactOutput(evt),
+			Reason: "decode_fallback",
+		}
+	}
+	return HandlerResult{Status: HandlerStatusHandled, Output: out}
+}
@@
-func imMessageReadCompactOutput(evt *Event) map[string]interface{} {
+func imMessageReadCompactOutput(evt *Event) (map[string]interface{}, error) {
 	if evt == nil {
-		return map[string]interface{}{"type": ""}
+		return map[string]interface{}{"type": ""}, nil
 	}
-	data, _ := json.Marshal(evt.Payload.Data)
+	data, err := json.Marshal(evt.Payload.Data)
+	if err != nil {
+		return nil, err
+	}
 	var payload imMessageReadPayload
-	_ = json.Unmarshal(data, &payload)
-	return buildIMMessageReadCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, payload)
+	if err := json.Unmarshal(data, &payload); err != nil {
+		return nil, err
+	}
+	return buildIMMessageReadCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, payload), nil
 }

Also applies to: 60-67

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_im_message_read.go` around lines 30 - 32,
ImMessageReadProcessor.Handle currently always returns HandlerStatusHandled
while imMessageReadCompactOutput swallows marshal/unmarshal errors, causing
decode failures to be hidden; modify imMessageReadCompactOutput to surface
decoding errors (return an error or a result with an error) and update
ImMessageReadProcessor.Handle to check that error and return a non-handled/error
status (e.g., HandlerStatusUnhandled or HandlerStatusError) and log the decode
failure, ensuring malformed/schema-drift payloads are not silently treated as
handled and compact fields remain visible for fallback handling.
shortcuts/event/pipeline.go (1)

119-123: ⚠️ Potential issue | 🟠 Major

Keep raw-mode metadata in the shared serializer.

rawModeRecord() still drops evt.Metadata, so whether raw stdout includes malformed/normalization details currently depends on whether routing was configured. The routed branch patches it back in, the plain stdout path does not.

💡 Proposed fix
func rawModeRecord(evt *Event, record DispatchRecord) map[string]interface{} {
	entry := map[string]interface{}{
		"event_type":      evt.EventType,
		"status":          record.Status,
		"payload":         evt.Payload.Data,
		"raw_payload":     string(evt.RawPayload),
		"source":          evt.Source,
		"idempotency_key": evt.IdempotencyKey,
	}
+	if len(evt.Metadata) > 0 {
+		entry["metadata"] = evt.Metadata
+	}
 	if evt.Domain != "" {
 		entry["domain"] = evt.Domain
 	}

Also applies to: 172-193

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/pipeline.go` around lines 119 - 123, The raw-mode path
currently drops evt.Metadata because rawModeRecord() doesn't include it and only
the routed branch re-attaches metadata; update rawModeRecord (or the place where
entry is constructed when p.config.Mode == TransformRaw and p.recordWriter !=
nil) to preserve and include evt.Metadata into the returned entry (e.g., ensure
entry["metadata"] = evt.Metadata when len(evt.Metadata)>0), and mirror the same
fix for the other raw-mode spot around the code covering the alternate block
(the code noted as also applying to lines 172-193) so the shared serializer
always receives metadata in raw mode regardless of routing.
🧹 Nitpick comments (1)
shortcuts/event/processor_im_message.go (1)

32-39: Differentiate fallback reasons from decode failures vs interactive fallback.

Handle sets Reason: "interactive_fallback" for every ok=false, including marshal/unmarshal failures in imMessageCompactOutput. That makes diagnostics/metrics misleading.

Also applies to: 80-93

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_im_message.go` around lines 32 - 39, The current
Handle method always sets HandlerResult.Reason to "interactive_fallback"
whenever imMessageCompactOutput(evt) returns ok=false, which conflates
decode/marshal errors with true interactive fallbacks; update Handle to
distinguish causes by inspecting the error/ok result from imMessageCompactOutput
(or change imMessageCompactOutput to return (out, ok, err)), and when the
failure is a decode/marshal error set Reason to a distinct value like
"decode_failure" (and include any error info in logs), otherwise keep
"interactive_fallback"; update any other branches (e.g., the same pattern in
lines 80-93) to follow the same logic and ensure HandlerStatus/Output handling
remains unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@shortcuts/event/pipeline.go`:
- Around line 251-261: The ndjsonRecordWriter and writeRecord functions
currently write directly to the shared io.Writer (ndjsonRecordWriter.w) and can
race when Process() is called concurrently; modify ndjsonRecordWriter to embed
or reference a sync.Mutex (or a shared sync.Mutex pointer) and lock/unlock
around the WriteRecord body (marshal + w.Write) to serialize writes, and apply
the same guard to the writeRecord helper so all writes to the shared io.Writer
are protected by the same mutex to prevent interleaved/corrupted NDJSON output.

In `@shortcuts/event/processor_im_chat_member.go`:
- Around line 50-52: The handlers currently always return HandlerStatusHandled
and call compact builders that swallow decode errors (e.g.,
imChatBotCompactOutput and imChatMemberUserCompactOutput); update each Handle
method to capture any error returned by these compact output functions, and when
an error occurs return a HandlerResult whose Output is genericCompactOutput(evt)
and whose Reason is "decode_fallback" instead of silently proceeding;
specifically modify ImChatBotProcessor.Handle (and the other chat-member Handle
implementations that call imChatMemberUserCompactOutput) to check builder errors
and apply that fallback so missing or malformed fields (chat_id, operator_id,
user_ids) are not dropped silently.

In `@shortcuts/event/registry.go`:
- Around line 79-97: The current NewBuiltinHandlerRegistry swallows errors from
HandlerRegistry.RegisterEventHandler and HandlerRegistry.SetFallbackHandler
which hides configuration mistakes; update NewBuiltinHandlerRegistry to fail
fast by checking each RegisterEventHandler(h) and SetFallbackHandler(...) return
value and immediately panic or log.Fatalf with a clear message (including the
handler ID/type and the returned error) when an error occurs; reference the
RegisterEventHandler calls inside NewBuiltinHandlerRegistry and the final
SetFallbackHandler(NewGenericFallbackHandler()) so you validate and propagate
their errors instead of assigning to `_`.
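
The fail-fast idea can be sketched with a small "must" wrapper. The registry type and its Register method below are hypothetical and much simpler than the PR's HandlerRegistry; only the error-propagation pattern is the point.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical, minimal stand-in for the PR's HandlerRegistry.
type registry struct {
	ids map[string]bool
}

// Register rejects empty and duplicate handler IDs.
func (r *registry) Register(id string) error {
	if id == "" {
		return errors.New("handler ID is empty")
	}
	if r.ids[id] {
		return fmt.Errorf("handler %q already registered", id)
	}
	if r.ids == nil {
		r.ids = make(map[string]bool)
	}
	r.ids[id] = true
	return nil
}

// mustRegister panics on a registration error so a misconfigured builtin
// registry fails at startup instead of silently dropping a handler.
func mustRegister(r *registry, id string) {
	if err := r.Register(id); err != nil {
		panic(fmt.Sprintf("builtin handler %q: %v", id, err))
	}
}

func main() {
	r := &registry{}
	mustRegister(r, "im.message")
	fmt.Println(len(r.ids))
}
```

Panicking is reasonable here only because the builtin set is fixed at compile time; registrations driven by user input would be better served by returning the error.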

---

Outside diff comments:
In `@internal/registry/loader.go`:
- Line 25: Update the stale comment on the embeddedVersion variable
(embeddedVersion string) to match the newer terminology used elsewhere: replace
the reference to "meta_data.json" with "default registry JSON" or "default
registry baseline" so the comment reads something like "// version from embedded
default registry JSON" ensuring consistency with comments at lines referencing
the default registry baseline and default registry JSON.

In `@shortcuts/event/router.go`:
- Around line 73-80: EventRouter.Match currently appends route.dir for every
matching route, causing duplicate dirs when multiple patterns resolve to the
same directory and leading WriteRecord to emit duplicate files; modify
EventRouter.Match to deduplicate directory targets before returning by tracking
seen route.dir values (e.g., with a map[string]bool) while iterating r.routes
and only appending unseen dirs to the result slice so order is preserved and
duplicates are removed.

---

Duplicate comments:
In `@shortcuts/event/pipeline.go`:
- Around line 119-123: The raw-mode path currently drops evt.Metadata because
rawModeRecord() doesn't include it and only the routed branch re-attaches
metadata; update rawModeRecord (or the place where entry is constructed when
p.config.Mode == TransformRaw and p.recordWriter != nil) to preserve and include
evt.Metadata into the returned entry (e.g., ensure entry["metadata"] =
evt.Metadata when len(evt.Metadata)>0), and mirror the same fix for the other
raw-mode spot around the code covering the alternate block (the code noted as
also applying to lines 172-193) so the shared serializer always receives
metadata in raw mode regardless of routing.

In `@shortcuts/event/processor_im_message_read.go`:
- Around line 30-32: ImMessageReadProcessor.Handle currently always returns
HandlerStatusHandled while imMessageReadCompactOutput swallows marshal/unmarshal
errors, causing decode failures to be hidden; modify imMessageReadCompactOutput
to surface decoding errors (return an error or a result with an error) and
update ImMessageReadProcessor.Handle to check that error and return a
non-handled/error status (e.g., HandlerStatusUnhandled or HandlerStatusError)
and log the decode failure, ensuring malformed/schema-drift payloads are not
silently treated as handled and compact fields remain visible for fallback
handling.

In `@shortcuts/event/processor_im_message.go`:
- Around line 95-100: The function buildIMMessageCompactOutput currently writes
a hint directly to os.Stderr; remove the fmt.Fprintf(os.Stderr...) call inside
buildIMMessageCompactOutput and stop emitting any logs from that function, keep
returning (nil, false) for interactive/card messages, and update the caller (the
pipeline layer that invokes buildIMMessageCompactOutput) to detect the false
result and emit the hint via the pipeline's errOut/quiet-aware logging path so
routing/quiet policies are respected.

---

Nitpick comments:
In `@shortcuts/event/processor_im_message.go`:
- Around line 32-39: The current Handle method always sets HandlerResult.Reason
to "interactive_fallback" whenever imMessageCompactOutput(evt) returns ok=false,
which conflates decode/marshal errors with true interactive fallbacks; update
Handle to distinguish causes by inspecting the error/ok result from
imMessageCompactOutput (or change imMessageCompactOutput to return (out, ok,
err)), and when the failure is a decode/marshal error set Reason to a distinct
value like "decode_failure" (and include any error info in logs), otherwise keep
"interactive_fallback"; update any other branches (e.g., the same pattern in
lines 80-93) to follow the same logic and ensure HandlerStatus/Output handling
remains unchanged.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 46e6ca60-aec8-4e9a-9c40-626154d7737a

📥 Commits

Reviewing files that changed from the base of the PR and between eae1b70 and c6a4e59.

📒 Files selected for processing (31)
  • .gitignore
  • internal/registry/loader.go
  • internal/registry/loader_embedded.go
  • internal/registry/meta_data_default.json
  • internal/registry/remote_test.go
  • shortcuts/event/concurrency_test.go
  • shortcuts/event/deduper.go
  • shortcuts/event/deduper_test.go
  • shortcuts/event/dispatcher.go
  • shortcuts/event/dispatcher_test.go
  • shortcuts/event/domain_resolver.go
  • shortcuts/event/domain_resolver_test.go
  • shortcuts/event/envelope.go
  • shortcuts/event/handlers.go
  • shortcuts/event/matcher.go
  • shortcuts/event/normalizer.go
  • shortcuts/event/normalizer_test.go
  • shortcuts/event/pipeline.go
  • shortcuts/event/processor_generic.go
  • shortcuts/event/processor_im_chat.go
  • shortcuts/event/processor_im_chat_member.go
  • shortcuts/event/processor_im_message.go
  • shortcuts/event/processor_im_message_reaction.go
  • shortcuts/event/processor_im_message_read.go
  • shortcuts/event/processor_im_test.go
  • shortcuts/event/processor_test.go
  • shortcuts/event/registry.go
  • shortcuts/event/router.go
  • shortcuts/event/router_permissions_test.go
  • shortcuts/event/subscribe.go
  • shortcuts/event/subscribe_test.go
✅ Files skipped from review due to trivial changes (9)
  • internal/registry/remote_test.go
  • .gitignore
  • shortcuts/event/domain_resolver.go
  • shortcuts/event/matcher.go
  • shortcuts/event/domain_resolver_test.go
  • shortcuts/event/envelope.go
  • shortcuts/event/normalizer_test.go
  • shortcuts/event/handlers.go
  • shortcuts/event/dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (7)
  • internal/registry/loader_embedded.go
  • shortcuts/event/dispatcher.go
  • shortcuts/event/processor_im_test.go
  • shortcuts/event/subscribe.go
  • internal/registry/meta_data_default.json
  • shortcuts/event/processor_im_message_reaction.go
  • shortcuts/event/processor_generic.go

Comment on lines +251 to +261
+type ndjsonRecordWriter struct {
+	w io.Writer
+}
+
-	jsonData, err := json.MarshalIndent(data, "", " ")
+func (w ndjsonRecordWriter) WriteRecord(_ string, value map[string]interface{}) error {
+	data, err := json.Marshal(value)
 	if err != nil {
-		return "", err
+		return err
 	}
+	_, err = w.w.Write(append(data, '\n'))
+	return err

⚠️ Potential issue | 🟠 Major

Serialize shared writer access in the concurrent pipeline.

The package already exercises Process() from multiple goroutines, but both ndjsonRecordWriter and writeRecord() write straight to the shared io.Writer. That can race on bytes.Buffer and interleave larger stdout/pipe writes, corrupting NDJSON output.

Wrap the shared writers with a mutex, or make the single-threaded contract explicit and stop signaling concurrency support elsewhere.

Also applies to: 264-284

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/pipeline.go` around lines 251 - 261, The ndjsonRecordWriter
and writeRecord functions currently write directly to the shared io.Writer
(ndjsonRecordWriter.w) and can race when Process() is called concurrently;
modify ndjsonRecordWriter to embed or reference a sync.Mutex (or a shared
sync.Mutex pointer) and lock/unlock around the WriteRecord body (marshal +
w.Write) to serialize writes, and apply the same guard to the writeRecord helper
so all writes to the shared io.Writer are protected by the same mutex to prevent
interleaved/corrupted NDJSON output.
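
One way the mutex guard could look, as a sketch only. `lockedNDJSONWriter` and `demoConcurrentWrites` are hypothetical names; the real `ndjsonRecordWriter` may need the mutex shared with `writeRecord` as well.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"strings"
	"sync"
)

// lockedNDJSONWriter is a hypothetical variant of ndjsonRecordWriter that
// guards the shared io.Writer with a mutex.
type lockedNDJSONWriter struct {
	mu *sync.Mutex // shared by every writer that targets the same io.Writer
	w  io.Writer
}

// WriteRecord marshals outside the lock and writes one full line under it.
func (lw lockedNDJSONWriter) WriteRecord(_ string, value map[string]interface{}) error {
	data, err := json.Marshal(value)
	if err != nil {
		return err
	}
	lw.mu.Lock()
	defer lw.mu.Unlock()
	_, err = lw.w.Write(append(data, '\n'))
	return err
}

// demoConcurrentWrites writes n records from n goroutines and returns the lines.
func demoConcurrentWrites(n int) []string {
	var buf bytes.Buffer
	lw := lockedNDJSONWriter{mu: &sync.Mutex{}, w: &buf}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = lw.WriteRecord("", map[string]interface{}{"seq": i})
		}(i)
	}
	wg.Wait()
	return strings.Split(strings.TrimSpace(buf.String()), "\n")
}
```

Holding the mutex by pointer lets the value-receiver method and any other helper that writes to the same destination share one lock. Record order across goroutines is still unspecified; only line integrity is protected.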

Comment on lines +50 to +52
func (p *ImChatBotProcessor) Handle(_ context.Context, evt *Event) HandlerResult {
return HandlerResult{Status: HandlerStatusHandled, Output: imChatBotCompactOutput(evt, p.eventType)}
}

⚠️ Potential issue | 🟠 Major

Avoid swallowing payload decode errors in chat-member handlers.

Both handler paths always report handled and compact builders ignore decode errors. Schema drift/malformed payloads can silently drop chat_id, operator_id, and user_ids instead of emitting a clear fallback.

🔧 Proposed fix pattern
-func imChatBotCompactOutput(evt *Event, processorEventType string) map[string]interface{} {
+func imChatBotCompactOutput(evt *Event, processorEventType string) (map[string]interface{}, error) {
 	if evt == nil {
-		return map[string]interface{}{"type": processorEventType}
+		return map[string]interface{}{"type": processorEventType}, nil
 	}
-	data, _ := json.Marshal(evt.Payload.Data)
+	data, err := json.Marshal(evt.Payload.Data)
+	if err != nil {
+		return nil, err
+	}
 	var payload imChatMemberPayload
-	_ = json.Unmarshal(data, &payload)
-	return buildIMChatBotCompactOutput(...)
+	if err := json.Unmarshal(data, &payload); err != nil {
+		return nil, err
+	}
+	return buildIMChatBotCompactOutput(...), nil
 }

Apply the same contract to imChatMemberUserCompactOutput, then in both Handle methods fallback to genericCompactOutput(evt) with a reason like "decode_fallback" when error is returned.

Also applies to: 124-126, 153-160, 186-193

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_im_chat_member.go` around lines 50 - 52, The
handlers currently always return HandlerStatusHandled and call compact builders
that swallow decode errors (e.g., imChatBotCompactOutput and
imChatMemberUserCompactOutput); update each Handle method to capture any error
returned by these compact output functions, and when an error occurs return
HandlerResult using genericCompactOutput(evt) with a reason string like
"decode_fallback" instead of silently proceeding; specifically modify
ImChatBotProcessor.Handle (and the other chat-member Handle implementations that
call imChatMemberUserCompactOutput) to check builder errors and fall back to
genericCompactOutput(evt) with a Reason of "decode_fallback" so missing or
malformed fields (chat_id, operator_id, user_ids) are not dropped silently.
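
The fallback contract could be sketched roughly as follows. All types and helpers here (`handlerResult`, `chatMemberPayload`, `compactOutput`, `genericOutput`, `handle`) are simplified, hypothetical stand-ins for the PR's `HandlerResult`, payload struct, and compact builders, and the status string is a placeholder.

```go
package main

import "encoding/json"

// handlerResult is a reduced stand-in for HandlerResult.
type handlerResult struct {
	Status string
	Reason string
	Output map[string]interface{}
}

// chatMemberPayload keeps a single field for illustration.
type chatMemberPayload struct {
	ChatID string `json:"chat_id"`
}

// compactOutput decodes the payload and reports decode errors to the caller.
func compactOutput(eventType string, raw []byte) (map[string]interface{}, error) {
	var p chatMemberPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return nil, err
	}
	return map[string]interface{}{"type": eventType, "chat_id": p.ChatID}, nil
}

// genericOutput stands in for genericCompactOutput(evt).
func genericOutput(eventType string, raw []byte) map[string]interface{} {
	return map[string]interface{}{"type": eventType, "raw_payload": string(raw)}
}

// handle falls back to the generic output and records why when decoding fails.
func handle(eventType string, raw []byte) handlerResult {
	out, err := compactOutput(eventType, raw)
	if err != nil {
		return handlerResult{
			Status: "handled",
			Reason: "decode_fallback",
			Output: genericOutput(eventType, raw),
		}
	}
	return handlerResult{Status: "handled", Output: out}
}
```

A payload whose `chat_id` arrives as a number instead of a string then surfaces as a `decode_fallback` result carrying the raw payload, rather than a compact record with an empty `chat_id`.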

Comment on lines +79 to +97
func NewBuiltinHandlerRegistry() *HandlerRegistry {
r := NewHandlerRegistry()
for _, h := range []EventHandler{
NewIMMessageReceiveHandler(),
NewIMMessageReadHandler(),
NewIMReactionCreatedHandler(),
NewIMReactionDeletedHandler(),
NewIMChatUpdatedHandler(),
NewIMChatDisbandedHandler(),
NewIMChatMemberBotAddedHandler(),
NewIMChatMemberBotDeletedHandler(),
NewIMChatMemberUserAddedHandler(),
NewIMChatMemberUserWithdrawnHandler(),
NewIMChatMemberUserDeletedHandler(),
} {
_ = r.RegisterEventHandler(h)
}
_ = r.SetFallbackHandler(NewGenericFallbackHandler())
return r

🛠️ Refactor suggestion | 🟠 Major

Fail fast when builtin handler registration breaks.

Every _ = r.RegisterEventHandler(...) / _ = r.SetFallbackHandler(...) here swallows configuration errors. If a builtin ships with a duplicate ID or invalid scope, dispatch just loses that handler silently.

Treat these as programmer errors here instead of ignoring them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/registry.go` around lines 79 - 97, The current
NewBuiltinHandlerRegistry swallows errors from
HandlerRegistry.RegisterEventHandler and HandlerRegistry.SetFallbackHandler
which hides configuration mistakes; update NewBuiltinHandlerRegistry to fail
fast by checking each RegisterEventHandler(h) and SetFallbackHandler(...) return
value and immediately panic or log.Fatalf with a clear message (including the
handler ID/type and the returned error) when an error occurs; reference the
RegisterEventHandler calls inside NewBuiltinHandlerRegistry and the final
SetFallbackHandler(NewGenericFallbackHandler()) so you validate and propagate
their errors instead of assigning to `_`.
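
A fail-fast variant might look like the sketch below. The registry, handler interface, and `mustRegister` helper are hypothetical and only mimic the shape of `HandlerRegistry.RegisterEventHandler`; whether to panic or return an error from the constructor is a design choice for the author.

```go
package main

import "fmt"

// eventHandler is a reduced stand-in for the PR's EventHandler interface.
type eventHandler interface{ ID() string }

type namedHandler string

func (h namedHandler) ID() string { return string(h) }

// handlerRegistry rejects duplicate IDs, like the real registry is assumed to.
type handlerRegistry struct{ byID map[string]eventHandler }

func (r *handlerRegistry) RegisterEventHandler(h eventHandler) error {
	if _, dup := r.byID[h.ID()]; dup {
		return fmt.Errorf("duplicate handler id %q", h.ID())
	}
	r.byID[h.ID()] = h
	return nil
}

// mustRegister treats a registration error as a programmer error.
func mustRegister(r *handlerRegistry, h eventHandler) {
	if err := r.RegisterEventHandler(h); err != nil {
		panic(fmt.Sprintf("builtin handler %q: %v", h.ID(), err))
	}
}

// newBuiltinRegistry registers every builtin or panics on the first failure.
func newBuiltinRegistry(handlers ...eventHandler) *handlerRegistry {
	r := &handlerRegistry{byID: map[string]eventHandler{}}
	for _, h := range handlers {
		mustRegister(r, h)
	}
	return r
}

// registrationPanics reports whether building the registry panicked.
func registrationPanics(handlers ...eventHandler) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	newBuiltinRegistry(handlers...)
	return false
}
```

Because the builtin list is fixed at compile time, a panic here would show up in the first test run instead of as a silently missing handler at dispatch time.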

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
shortcuts/event/processor_test.go (1)

72-91: Consider accepting *testing.T for clearer test failure messages.

Using panic(err) in test helpers produces stack traces that are harder to read than using t.Helper() + t.Fatalf(). This is a minor improvement for test maintainability.

♻️ Optional refactor to improve test failure output
-func makeTestRegistry(handler EventHandler, fallback EventHandler) *HandlerRegistry {
+func makeTestRegistry(t *testing.T, handler EventHandler, fallback EventHandler) *HandlerRegistry {
+	t.Helper()
 	registry := NewHandlerRegistry()
 	if handler != nil {
 		if handler.EventType() != "" {
 			if err := registry.RegisterEventHandler(handler); err != nil {
-				panic(err)
+				t.Fatalf("RegisterEventHandler() error = %v", err)
 			}
 		} else if handler.Domain() != "" {
 			if err := registry.RegisterDomainHandler(handler); err != nil {
-				panic(err)
+				t.Fatalf("RegisterDomainHandler() error = %v", err)
 			}
 		}
 	}
 	if fallback != nil {
 		if err := registry.SetFallbackHandler(fallback); err != nil {
-			panic(err)
+			t.Fatalf("SetFallbackHandler() error = %v", err)
 		}
 	}
 	return registry
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_test.go` around lines 72 - 91, Update the test
helper makeTestRegistry to accept a *testing.T (e.g., func makeTestRegistry(t
*testing.T, handler EventHandler, fallback EventHandler) *HandlerRegistry), call
t.Helper() at the top, and replace panic(err) calls with t.Fatalf including the
error (e.g., when calling registry.RegisterEventHandler,
registry.RegisterDomainHandler, and registry.SetFallbackHandler) so failures
produce clearer test output instead of panics.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 32cbd243-50a1-4e12-b31e-edd21f8abc45

📥 Commits

Reviewing files that changed from the base of the PR and between c6a4e59 and 6b18a6e.

📒 Files selected for processing (3)
  • shortcuts/event/concurrency_test.go
  • shortcuts/event/dispatcher_test.go
  • shortcuts/event/processor_test.go
✅ Files skipped from review due to trivial changes (1)
  • shortcuts/event/dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • shortcuts/event/concurrency_test.go

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
shortcuts/event/pipeline.go (1)

273-306: ⚠️ Potential issue | 🟠 Major

Serialize writes to shared writers.

Process() is already modeled for concurrent use, but ndjsonRecordWriter.WriteRecord and writeRecord still write straight to shared io.Writers. With bytes.Buffer, pipes, or stdout, that can race and interleave JSON records.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/pipeline.go` around lines 273 - 306, Concurrent calls to
ndjsonRecordWriter.WriteRecord and EventPipeline.writeRecord can race when
writing to shared io.Writers (e.g., bytes.Buffer, pipes, stdout); fix by
serializing writes: add a sync.Mutex (or sync.RWMutex) field to
ndjsonRecordWriter and to EventPipeline (or a shared writer wrapper) and
lock/unlock around the actual Write() calls inside
ndjsonRecordWriter.WriteRecord and EventPipeline.writeRecord so each JSON blob +
newline is written atomically; ensure the lock covers
marshaling-to-bytes-to-Write sequence or at minimum the Write() call to avoid
interleaving.
🧹 Nitpick comments (2)
shortcuts/event/processor_generic.go (1)

54-84: Consider making raw_payload inclusion opt-in.

The generic fallback always includes raw_payload as a stringified JSON blob (lines 78-82). For large events, this can significantly increase output size and may duplicate information already in the flattened fields. Consider making this configurable or only including it when Payload.Data was empty.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_generic.go` around lines 54 - 84, The
genericCompactOutput function always appends a stringified raw_payload from
evt.RawPayload which can bloat outputs; modify genericCompactOutput to make
raw_payload opt-in by either checking a new boolean flag (e.g.,
includeRawPayload) passed into or available to the function/context or only
adding raw_payload when Payload.Data is empty (i.e., when out was populated from
rawMap fallback). Update callers or the surrounding struct to provide the flag
and ensure the existing logic that loads rawMap into out remains unchanged;
ensure you still avoid overwriting an existing raw_payload key.
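
As an illustration of the opt-in idea, here is a reduced sketch. The `genericCompact` function and its `includeRaw` parameter are hypothetical; the real `genericCompactOutput` takes an `*Event` and also loads `rawMap`, which is omitted here.

```go
package main

// genericCompact copies decoded fields into the output and attaches
// raw_payload only when asked to, or when no decoded data is available.
// includeRaw is a hypothetical opt-in flag, not an existing option.
func genericCompact(eventType string, data map[string]interface{}, raw []byte, includeRaw bool) map[string]interface{} {
	out := map[string]interface{}{"type": eventType}
	for k, v := range data {
		out[k] = v
	}
	if len(raw) == 0 {
		return out
	}
	// Never overwrite a raw_payload key that came from the decoded data.
	if _, exists := out["raw_payload"]; exists {
		return out
	}
	if includeRaw || len(data) == 0 {
		out["raw_payload"] = string(raw)
	}
	return out
}
```

With this shape, events that decode cleanly stay small by default, while events with no decoded data still carry the raw payload for debugging.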
shortcuts/event/subscribe_e2e_test.go (1)

131-139: Global os.Stderr manipulation is fragile.

Directly reassigning os.Stderr affects all goroutines and can cause flaky behavior in parallel tests. The pipe-based capture also won't catch writes that occur after w.Close().

Consider capturing stderr through the test factory's buffer instead, or explicitly keep this test out of parallel execution (no t.Parallel()).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/subscribe_e2e_test.go` around lines 131 - 139, The test
currently replaces the global os.Stderr via origStderr/os.Stderr and os.Pipe()
(r, w) which is fragile for concurrent tests; instead stop mutating os.Stderr
globally and capture stderr output via the test's local buffer or test factory
logger used in this package (avoid using os.Pipe() and assigning os.Stderr), or
explicitly serialize the test by ensuring it does not run in parallel (remove
any t.Parallel() or add a guard) — update the subscribe_e2e_test.go test that
creates origStderr/r,w to use a local bytes.Buffer or the provided test logger
for capturing error output, and ensure any writer is closed only after the
test's logging is complete (avoid relying on w.Close() to catch late writes).
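
A small sketch of the injected-writer approach, assuming diagnostics can be routed through an `io.Writer` owned by the caller. `emitHint` and `captureHint` are hypothetical helpers, and the `hint:` prefix is illustrative.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// emitHint writes a diagnostic to the injected writer instead of os.Stderr,
// and stays silent when quiet is set.
func emitHint(errOut io.Writer, quiet bool, msg string) {
	if quiet || errOut == nil {
		return
	}
	fmt.Fprintf(errOut, "hint: %s\n", msg)
}

// captureHint shows how a test can read the output from a local buffer.
func captureHint(quiet bool, msg string) string {
	var errOut bytes.Buffer
	emitHint(&errOut, quiet, msg)
	return errOut.String()
}
```

Since the buffer is local to the test, nothing global is replaced and there is no pipe to close before late writes arrive.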
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@shortcuts/event/pipeline.go`:
- Around line 98-123: The unmatched-envelope fast-path currently returns before
dedupe and fallback can take effect; update the !MatchRawEventType(evt) branch
to call malformedFallbackEvent(env, err) (ensuring evt.IdempotencyKey is set),
then run p.deduper.Seen(evt.IdempotencyKey, env.ReceivedAt) and return if true,
set evt.EventType/evt.Domain as needed (using ResolveDomain if empty) so the
dispatcher will route to the fallback handler, and finally call p.dispatch(ctx,
evt); this ensures NormalizeEnvelope/malformedFallbackEvent, p.deduper.Seen, and
Dispatcher.Dispatch/FallbackHandler behavior remain authoritative for
malformed/unknown envelopes.

In `@shortcuts/event/processor_test.go`:
- Around line 479-533: Test mutates process CWD via os.Chdir to make dir:./...
routes work; remove the Chdir/Getwd/defer block in
TestPipeline_RawModeCanWriteViaOutputRouter and instead build routes that target
the t.TempDir() directly (use tmpDir when calling ParseRoutes so the
outputRouter uses an absolute temp path), and replace direct os.* assertions
with the package vfs helpers (e.g., vfs.ReadDir/vfs.ReadFile/vfs.Stat) used by
the project; update references around ParseRoutes, outputRouter (router: router,
defaultDir: fallbackDir), newEventPipeline, and the assertions that read files
so they use vfs and tmpDir rather than changing CWD. Also apply the same change
pattern to the other two tests mentioned (lines 536-588 and 590-627).

In `@shortcuts/event/processor.go`:
- Around line 82-98: legacyEventFromRaw builds an InboundEnvelope without
setting ReceivedAt, causing NormalizeEnvelope to record the zero time; update
legacyEventFromRaw to set InboundEnvelope.ReceivedAt to the current time (e.g.,
time.Now() or time.Now().UTC()) when constructing the envelope before calling
NormalizeEnvelope so the resulting Event metadata has a correct received_at
timestamp.

In `@shortcuts/event/subscribe_e2e_test.go`:
- Around line 173-182: The test currently calls os.Getwd and os.Chdir (variables
cwd and tmpDir) which mutates process-global state; remove the
os.Getwd/os.Chdir/defer restore calls and instead pass tmpDir (or construct file
paths using tmpDir) directly into the route spec and any file operations so the
test does not change the working directory; replace deferred global cleanup with
t.Cleanup where appropriate and apply the same change to the other occurrence
that uses os.Chdir at the later block.

In `@shortcuts/event/subscribe.go`:
- Around line 215-230: The fallback writer for outputRouter currently hardcodes
ndjsonRecordWriter and ignores the jsonFlag when routes are present; update the
fallback to respect the --json/pretty setting by either (A) implementing a
jsonRecordWriter that honors jsonFlag and replacing ndjsonRecordWriter with
jsonRecordWriter{w: out, pretty: jsonFlag} when building recordWriter, or (B)
wrapping ndjsonRecordWriter with a small adapter that accepts jsonFlag and
delegates to either ndjsonRecordWriter or the existing pretty JSON writer based
on jsonFlag; change the recordWriter construction in the block that creates
outputRouter (the recordWriter variable and outputRouter.fallback) so the
fallback uses the new writer that reads jsonFlag instead of the hard-coded
ndjsonRecordWriter.
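
Option (A) could be sketched as below, under the assumption that the flag selects indented output. `jsonRecordWriter`, its `pretty` field, and `render` are hypothetical; what `jsonFlag` means in `subscribe.go` should be checked before adopting this.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
)

// jsonRecordWriter is a hypothetical fallback writer that honors a pretty flag.
type jsonRecordWriter struct {
	w      io.Writer
	pretty bool
}

// WriteRecord emits one record, indented when pretty is set, compact otherwise.
func (jw jsonRecordWriter) WriteRecord(_ string, value map[string]interface{}) error {
	var (
		data []byte
		err  error
	)
	if jw.pretty {
		data, err = json.MarshalIndent(value, "", "  ")
	} else {
		data, err = json.Marshal(value)
	}
	if err != nil {
		return err
	}
	_, err = jw.w.Write(append(data, '\n'))
	return err
}

// render writes a single record and returns the bytes produced.
func render(pretty bool) string {
	var buf bytes.Buffer
	_ = jsonRecordWriter{w: &buf, pretty: pretty}.WriteRecord("", map[string]interface{}{"type": "x"})
	return buf.String()
}
```

Keeping both modes behind one writer means the router fallback and the unrouted path can share the same construction code.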
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6a6b35d0-f07a-469c-a723-cd46e9011277

📥 Commits

Reviewing files that changed from the base of the PR and between 6b18a6e and 0784bdc.

📒 Files selected for processing (12)
  • shortcuts/event/pipeline.go
  • shortcuts/event/processor.go
  • shortcuts/event/processor_generic.go
  • shortcuts/event/processor_im_chat.go
  • shortcuts/event/processor_im_chat_member.go
  • shortcuts/event/processor_im_message.go
  • shortcuts/event/processor_im_message_reaction.go
  • shortcuts/event/processor_im_message_read.go
  • shortcuts/event/processor_test.go
  • shortcuts/event/registry.go
  • shortcuts/event/subscribe.go
  • shortcuts/event/subscribe_e2e_test.go
✅ Files skipped from review due to trivial changes (1)
  • shortcuts/event/processor_im_chat.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • shortcuts/event/processor_im_message_reaction.go

Comment on lines +479 to 533
+func TestPipeline_RawModeCanWriteViaOutputRouter(t *testing.T) {
+	tmpDir := t.TempDir()
+	cwd, err := os.Getwd()
+	if err != nil {
+		t.Fatalf("Getwd() error = %v", err)
+	}
+	if err := os.Chdir(tmpDir); err != nil {
+		t.Fatalf("Chdir() error = %v", err)
+	}
+	defer func() {
+		_ = os.Chdir(cwd)
+	}()
+	router, err := ParseRoutes([]string{"^im\\.message=dir:./im"})
+	if err != nil {
+		t.Fatalf("ParseRoutes() error = %v", err)
+	}
+	fallbackDir := filepath.Join(tmpDir, "default")
 	var out, errOut bytes.Buffer
-	p := NewEventPipeline(DefaultRegistry(), filters,
-		PipelineConfig{Mode: TransformCompact, OutputDir: dir}, &out, &errOut)
-	if err := p.EnsureDirs(); err != nil {
-		t.Fatal(err)
+	registry := NewHandlerRegistry()
+	if err := registry.RegisterEventHandler(handlerFuncWith{id: genericHandlerID, eventType: "im.message.receive_v1", fn: func(_ context.Context, evt *Event) HandlerResult {
+		return HandlerResult{Status: HandlerStatusHandled}
+	}}); err != nil {
+		t.Fatalf("RegisterEventHandler() error = %v", err)
 	}
+	p := newEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw}, &out, &errOut, &outputRouter{router: router, defaultDir: fallbackDir, fallback: ndjsonRecordWriter{w: &out}, seq: new(uint64), writers: map[string]*dirRecordWriter{}})

-	eventJSON := `{
-		"message": {
-			"message_id": "msg_file", "chat_id": "oc_001",
-			"chat_type": "group", "message_type": "text",
-			"content": "{\"text\":\"file test\"}", "create_time": "1700000000"
-		},
-		"sender": {"sender_id": {"open_id": "ou_001"}}
-	}`
-	raw := makeRawEvent("im.message.receive_v1", eventJSON)
-	raw.Header.EventID = "ev_file"
-	raw.Header.CreateTime = "1700000000"
-	p.Process(context.Background(), raw)
+	p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`))

-	// stdout should be empty (output goes to file)
 	if out.Len() != 0 {
-		t.Error("OutputDir mode should not write to stdout")
+		t.Fatalf("stdout output = %q, want empty when routed to files", out.String())
 	}

-	// Verify file was created
-	entries, err := os.ReadDir(dir)
+	entries, err := os.ReadDir(filepath.Join(tmpDir, "im"))
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("ReadDir() error = %v", err)
 	}
 	if len(entries) != 1 {
-		t.Fatalf("expected 1 file, got %d", len(entries))
+		t.Fatalf("routed files = %d, want 1", len(entries))
 	}

-	// Verify file content is valid JSON
-	data, err := os.ReadFile(filepath.Join(dir, entries[0].Name()))
+	data, err := os.ReadFile(filepath.Join(tmpDir, "im", entries[0].Name()))
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("ReadFile() error = %v", err)
 	}
-	var m map[string]interface{}
-	if err := json.Unmarshal(data, &m); err != nil {
-		t.Fatalf("file content is not valid JSON: %v", err)
+	var record map[string]interface{}
+	if err := json.Unmarshal(bytes.TrimSpace(data), &record); err != nil {
+		t.Fatalf("invalid routed JSON: %v", err)
 	}
-	if m["type"] != "im.message.receive_v1" {
-		t.Errorf("type = %v", m["type"])
+	if got, want := record["event_type"], "im.message.receive_v1"; got != want {
+		t.Fatalf("event_type = %v, want %v", got, want)
 	}
+	if got, want := record["raw_payload"], `{"schema":"2.0","header":{"event_id":"ev_test","event_type":"im.message.receive_v1"},"event":{"message":{"message_id":"om_123"}}}`; got != want {
+		t.Fatalf("raw_payload = %v, want %v", got, want)
+	}
+	if _, err := os.Stat(fallbackDir); !os.IsNotExist(err) {
+		t.Fatalf("fallback dir stat err = %v, want not exists", err)
 	}

⚠️ Potential issue | 🟡 Minor

Stop mutating the process CWD in these routing tests.

These cases change global working-directory state just to make relative dir:./... routes work, and they go straight through os.* for the assertions. That makes the package more brittle if these tests ever run in parallel, and it bypasses the repo's filesystem abstraction. Prefer routing directly into t.TempDir() via the vfs helpers instead of os.Chdir(...).

As per coding guidelines **/*.go: Use vfs.* instead of os.* for all filesystem access.

Also applies to: 536-588, 590-627

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor_test.go` around lines 479 - 533, Test mutates
process CWD via os.Chdir to make dir:./... routes work; remove the
Chdir/Getwd/defer block in TestPipeline_RawModeCanWriteViaOutputRouter and
instead build routes that target the t.TempDir() directly (use tmpDir when
calling ParseRoutes so the outputRouter uses an absolute temp path), and replace
direct os.* assertions with the package vfs helpers (e.g.,
vfs.ReadDir/vfs.ReadFile/vfs.Stat) used by the project; update references around
ParseRoutes, outputRouter (router: router, defaultDir: fallbackDir),
newEventPipeline, and the assertions that read files so they use vfs and tmpDir
rather than changing CWD. Also apply the same change pattern to the other two
tests mentioned (lines 536-588 and 590-627).

Comment on lines +82 to +98
func legacyEventFromRaw(raw *RawEvent) (*Event, bool) {
if raw == nil {
return nil, false
}
body, err := json.Marshal(raw)
if err != nil {
return nil, false
}
evt, err := NormalizeEnvelope(InboundEnvelope{
Source: SourceWebSocket,
RawPayload: body,
})
if err != nil {
return nil, false
}
return evt, true
}

⚠️ Potential issue | 🟡 Minor

Missing ReceivedAt in legacy envelope conversion.

The InboundEnvelope is constructed without setting ReceivedAt, which causes NormalizeEnvelope to format a zero time.Time as "0001-01-01T00:00:00Z" in the event metadata. This affects observability/debugging since the received_at field won't reflect actual receipt time.

🔧 Proposed fix
 func legacyEventFromRaw(raw *RawEvent) (*Event, bool) {
 	if raw == nil {
 		return nil, false
 	}
 	body, err := json.Marshal(raw)
 	if err != nil {
 		return nil, false
 	}
 	evt, err := NormalizeEnvelope(InboundEnvelope{
-		Source:     SourceWebSocket,
-		RawPayload: body,
+		Source:     SourceWebSocket,
+		ReceivedAt: time.Now(),
+		RawPayload: body,
 	})
 	if err != nil {
 		return nil, false
 	}
 	return evt, true
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/processor.go` around lines 82 - 98, legacyEventFromRaw builds
an InboundEnvelope without setting ReceivedAt, causing NormalizeEnvelope to
record the zero time; update legacyEventFromRaw to set
InboundEnvelope.ReceivedAt to the current time (e.g., time.Now() or
time.Now().UTC()) when constructing the envelope before calling
NormalizeEnvelope so the resulting Event metadata has a correct received_at
timestamp.

Comment on lines +173 to +182
cwd, err := os.Getwd()
if err != nil {
t.Fatalf("Getwd() error = %v", err)
}
if err := os.Chdir(tmpDir); err != nil {
t.Fatalf("Chdir() error = %v", err)
}
defer func() {
_ = os.Chdir(cwd)
}()

⚠️ Potential issue | 🟡 Minor

os.Chdir breaks test isolation and parallel execution.

Changing the working directory is process-global and will interfere with parallel tests. Until the deferred restore runs, every other goroutine in the process sees the temporary directory as its working directory.

🔧 Suggested approach

Instead of changing the working directory, modify the route spec to use the temp directory path directly:

-	if err := os.Chdir(tmpDir); err != nil {
-		t.Fatalf("Chdir() error = %v", err)
-	}
-	defer func() {
-		_ = os.Chdir(cwd)
-	}()
 ...
-		"--route", `^im\.message=dir:./im`,
+		"--route", fmt.Sprintf(`^im\.message=dir:%s/im`, tmpDir),

Also applies to: 247-256

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/subscribe_e2e_test.go` around lines 173 - 182, The test
currently calls os.Getwd and os.Chdir (variables cwd and tmpDir) which mutates
process-global state; remove the os.Getwd/os.Chdir/defer restore calls and
instead pass tmpDir (or construct file paths using tmpDir) directly into the
route spec and any file operations so the test does not change the working
directory; replace deferred global cleanup with t.Cleanup where appropriate and
apply the same change to the other occurrence that uses os.Chdir at the later
block.

@niuchong0523 niuchong0523 force-pushed the feat/event-bus-subscription-dispatch branch from 5c27be2 to a65ffb1 Compare April 7, 2026 10:28
Labels

size/XL Architecture-level or global-impact change

1 participant