diff --git a/.gitignore b/.gitignore index ec525ba8..e035898b 100644 --- a/.gitignore +++ b/.gitignore @@ -28,5 +28,6 @@ vendor/ #test test_scripts/ tests/mail/reports/ +internal/registry/meta_data.json /log/ diff --git a/internal/registry/loader.go b/internal/registry/loader.go index a310326d..76869caa 100644 --- a/internal/registry/loader.go +++ b/internal/registry/loader.go @@ -19,9 +19,6 @@ import ( //go:embed scope_priorities.json scope_overrides.json var registryFS embed.FS -// embeddedMetaJSON is set by loader_embedded.go when meta_data.json is compiled in. -var embeddedMetaJSON []byte - var ( mergedServices = make(map[string]map[string]interface{}) // project name → parsed spec mergedProjectList []string // sorted project names @@ -43,7 +40,7 @@ func Init() { func InitWithBrand(brand core.LarkBrand) { initOnce.Do(func() { configuredBrand = brand - // 1. Load embedded meta_data.json as baseline (no-op if not compiled in) + // 1. Load embedded default registry baseline loadEmbeddedIntoMerged() // 2. Remote overlay if remoteEnabled() && cacheWritable() { @@ -69,8 +66,8 @@ func InitWithBrand(brand core.LarkBrand) { }) } -// loadEmbeddedIntoMerged parses the embedded meta_data.json and populates -// mergedServices. No-op if meta_data.json is not compiled in. +// loadEmbeddedIntoMerged parses the embedded default registry JSON and populates +// mergedServices. No-op if no embedded baseline is compiled in. func loadEmbeddedIntoMerged() { if len(embeddedMetaJSON) == 0 { return diff --git a/internal/registry/loader_embedded.go b/internal/registry/loader_embedded.go index da41e079..dc454d8c 100644 --- a/internal/registry/loader_embedded.go +++ b/internal/registry/loader_embedded.go @@ -3,18 +3,7 @@ package registry -import "embed" - -//go:embed meta_data*.json -var metaFS embed.FS +import _ "embed" //go:embed meta_data_default.json -var embeddedMetaDataDefaultJSON []byte - -func init() { - if data, err := metaFS.ReadFile("meta_data.json"); err == nil && len(data) > 0 { - embeddedMetaJSON = data - } else { - embeddedMetaJSON = embeddedMetaDataDefaultJSON - } -} +var embeddedMetaJSON []byte diff --git a/internal/registry/meta_data_default.json b/internal/registry/meta_data_default.json index a070ff22..04501024 100644 --- a/internal/registry/meta_data_default.json +++ b/internal/registry/meta_data_default.json @@ -1 +1 @@ -{"version":"0.0.0","services":[]} +{"version":"0.0.0","services":[{"name":"calendar","version":"v1","title":"calendar API","servicePath":"/open-apis/calendar/v1","resources":{"calendars":{"methods":{"get":{"id":"calendar.calendars.get","httpMethod":"GET","accessTokens":["user"],"scopes":["calendar:calendar:readonly","calendar:calendar:read"]},"create":{"id":"calendar.calendars.create","httpMethod":"POST","accessTokens":["user"],"scopes":["calendar:calendar:create"]}}},"events":{"methods":{"get":{"id":"calendar.events.get","httpMethod":"GET","accessTokens":["user"],"scopes":["calendar:calendar.event:read"]},"create":{"id":"calendar.events.create","httpMethod":"POST","accessTokens":["user"],"scopes":["calendar:calendar.event:create"]}}}}}]} diff --git a/internal/registry/remote_test.go b/internal/registry/remote_test.go index 1aa0f51a..dd5a0694 100644 --- a/internal/registry/remote_test.go +++ b/internal/registry/remote_test.go @@ -29,7 +29,7 @@ func resetInit() { testMetaURL = "" } -// hasEmbeddedData returns true if meta_data.json is compiled in. +// hasEmbeddedData returns true if the default embedded registry baseline is compiled in. func hasEmbeddedData() bool { return len(embeddedMetaJSON) > 0 } @@ -77,7 +77,7 @@ func testEnvelopeNotModifiedJSON() []byte { func TestColdStart_UsesEmbedded(t *testing.T) { if !hasEmbeddedData() { - t.Skip("no embedded from_meta data") + t.Skip("no embedded default registry baseline") } resetInit() tmp := t.TempDir() diff --git a/shortcuts/event/deduper.go b/shortcuts/event/deduper.go new file mode 100644 index 00000000..0148d7ae --- /dev/null +++ b/shortcuts/event/deduper.go @@ -0,0 +1,34 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import ( + "sync" + "time" +) + +// Deduper suppresses repeated keys seen within a TTL window. +type Deduper struct { + ttl time.Duration + seen sync.Map // key -> time.Time +} + +// NewDeduper creates a deduper with the provided TTL. +func NewDeduper(ttl time.Duration) *Deduper { + return &Deduper{ttl: ttl} +} + +// Seen reports whether key has already been seen within ttl and records now. +func (d *Deduper) Seen(key string, now time.Time) bool { + if d == nil || key == "" || d.ttl <= 0 { + return false + } + if v, loaded := d.seen.LoadOrStore(key, now); loaded { + if ts, ok := v.(time.Time); ok && now.Sub(ts) < d.ttl { + return true + } + d.seen.Store(key, now) + } + return false +} diff --git a/shortcuts/event/dispatcher.go b/shortcuts/event/dispatcher.go new file mode 100644 index 00000000..7c3e76dc --- /dev/null +++ b/shortcuts/event/dispatcher.go @@ -0,0 +1,48 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import "context" + +// Dispatcher routes normalized events to registered handlers. +type Dispatcher struct { + registry *HandlerRegistry +} + +// NewDispatcher creates a dispatcher backed by the provided registry. +func NewDispatcher(registry *HandlerRegistry) *Dispatcher { + if registry == nil { + registry = NewHandlerRegistry() + } + return &Dispatcher{registry: registry} +} + +// Dispatch runs matching event handlers first, then matching domain handlers. +// Fallback is only used when no direct handlers matched. +func (d *Dispatcher) Dispatch(ctx context.Context, evt *Event) DispatchResult { + if d == nil || d.registry == nil || evt == nil { + return DispatchResult{} + } + + matched := append([]EventHandler{}, d.registry.EventHandlers(evt.EventType)...) + matched = append(matched, d.registry.DomainHandlers(evt.Domain)...) + if len(matched) == 0 { + if fallback := d.registry.FallbackHandler(); fallback != nil { + matched = append(matched, fallback) + } + } + + result := DispatchResult{Results: make([]DispatchRecord, 0, len(matched))} + for _, handler := range matched { + handlerResult := handler.Handle(ctx, evt) + result.Results = append(result.Results, DispatchRecord{ + HandlerID: handler.ID(), + Status: handlerResult.Status, + Reason: handlerResult.Reason, + Err: handlerResult.Err, + Output: handlerResult.Output, + }) + } + return result +} diff --git a/shortcuts/event/dispatcher_test.go b/shortcuts/event/dispatcher_test.go new file mode 100644 index 00000000..8c6a6ba8 --- /dev/null +++ b/shortcuts/event/dispatcher_test.go @@ -0,0 +1,210 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import ( + "context" + "errors" + "reflect" + "testing" +) + +type testEventHandler struct { + id string + eventType string + domain string + result HandlerResult + called *[]string +} + +func (h *testEventHandler) ID() string { return h.id } + +func (h *testEventHandler) EventType() string { return h.eventType } + +func (h *testEventHandler) Domain() string { return h.domain } + +func (h *testEventHandler) Handle(_ context.Context, _ *Event) HandlerResult { + if h.called != nil { + *h.called = append(*h.called, h.id) + } + return h.result +} + +func TestDispatcher_EventHandlerThenDomainHandlerOrder(t *testing.T) { + registry := NewHandlerRegistry() + var calls []string + + eventHandler := &testEventHandler{ + id: "event-handler", + eventType: "im.message.receive_v1", + result: HandlerResult{Status: HandlerStatusHandled}, + called: &calls, + } + domainHandler := &testEventHandler{ + id: "domain-handler", + domain: "im", + result: HandlerResult{Status: HandlerStatusHandled}, + called: &calls, + } + + if err := registry.RegisterEventHandler(eventHandler); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) + } + if err := registry.RegisterDomainHandler(domainHandler); err != nil { + t.Fatalf("RegisterDomainHandler() error = %v", err) + } + + result := NewDispatcher(registry).Dispatch(context.Background(), &Event{ + EventType: "im.message.receive_v1", + Domain: "im", + }) + + if got, want := calls, []string{"event-handler", "domain-handler"}; !reflect.DeepEqual(got, want) { + t.Fatalf("call order = %v, want %v", got, want) + } + if len(result.Results) != 2 { + t.Fatalf("len(result.Results) = %d, want 2", len(result.Results)) + } + if result.Results[0].HandlerID != "event-handler" || result.Results[1].HandlerID != "domain-handler" { + t.Fatalf("dispatch results = %+v", result.Results) + } +} + +func TestNewBuiltinHandlerRegistry_RegistersRequiredIMHandlers(t *testing.T) { + registry := NewBuiltinHandlerRegistry() + + if got, want := subscribedEventTypes, []string{ + "im.message.receive_v1", + "im.message.message_read_v1", + "im.message.reaction.created_v1", + "im.message.reaction.deleted_v1", + "im.chat.member.bot.added_v1", + "im.chat.member.bot.deleted_v1", + "im.chat.member.user.added_v1", + "im.chat.member.user.withdrawn_v1", + "im.chat.member.user.deleted_v1", + "im.chat.updated_v1", + "im.chat.disbanded_v1", + }; !reflect.DeepEqual(got, want) { + t.Fatalf("subscribedEventTypes = %v, want %v", got, want) + } + + for _, eventType := range subscribedEventTypes { + handlers := registry.EventHandlers(eventType) + if len(handlers) != 1 { + t.Fatalf("EventHandlers(%q) len = %d, want 1", eventType, len(handlers)) + } + if handlers[0].EventType() != eventType { + t.Fatalf("EventHandlers(%q)[0].EventType() = %q", eventType, handlers[0].EventType()) + } + if handlers[0].Domain() != "im" { + t.Fatalf("EventHandlers(%q)[0].Domain() = %q, want im", eventType, handlers[0].Domain()) + } + } + + fallback := registry.FallbackHandler() + if fallback == nil { + t.Fatal("FallbackHandler() = nil") + } + if fallback.ID() != genericHandlerID { + t.Fatalf("fallback ID = %q, want %q", fallback.ID(), genericHandlerID) + } +} + +func TestDispatcher_UsesFallbackWhenNoHandlersMatch(t *testing.T) { + registry := NewHandlerRegistry() + var calls []string + fallback := &testEventHandler{ + id: "fallback", + result: HandlerResult{Status: HandlerStatusSkipped, Reason: "no route"}, + called: &calls, + } + registry.SetFallbackHandler(fallback) + + result := NewDispatcher(registry).Dispatch(context.Background(), &Event{ + EventType: "unknown.event", + Domain: "unknown", + }) + + if got, want := calls, []string{"fallback"}; !reflect.DeepEqual(got, want) { + t.Fatalf("calls = %v, want %v", got, want) + } + if len(result.Results) != 1 { + t.Fatalf("len(result.Results) = %d, want 1", len(result.Results)) + } + if result.Results[0].HandlerID != "fallback" { + t.Fatalf("fallback handler ID = %q, want fallback", result.Results[0].HandlerID) + } + if result.Results[0].Status != HandlerStatusSkipped { + t.Fatalf("fallback status = %q, want %q", result.Results[0].Status, HandlerStatusSkipped) + } +} + +func TestHandlerRegistry_RejectsDuplicateHandlerID(t *testing.T) { + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(&testEventHandler{ + id: "dup", + eventType: "im.message.receive_v1", + result: HandlerResult{Status: HandlerStatusHandled}, + }); err != nil { + t.Fatalf("first registration error = %v", err) + } + + err := registry.RegisterDomainHandler(&testEventHandler{ + id: "dup", + domain: "im", + result: HandlerResult{Status: HandlerStatusHandled}, + }) + if err == nil { + t.Fatal("expected duplicate handler ID error") + } +} + +func TestDispatcher_FailedHandlerDoesNotStopNextHandler(t *testing.T) { + registry := NewHandlerRegistry() + var calls []string + boom := errors.New("boom") + + failed := &testEventHandler{ + id: "failed-handler", + eventType: "im.message.receive_v1", + result: HandlerResult{ + Status: HandlerStatusFailed, + Reason: "failed", + Err: boom, + }, + called: &calls, + } + next := &testEventHandler{ + id: "next-handler", + eventType: "im.message.receive_v1", + result: HandlerResult{Status: HandlerStatusHandled}, + called: &calls, + } + + if err := registry.RegisterEventHandler(failed); err != nil { + t.Fatalf("RegisterEventHandler(failed) error = %v", err) + } + if err := registry.RegisterEventHandler(next); err != nil { + t.Fatalf("RegisterEventHandler(next) error = %v", err) + } + + result := NewDispatcher(registry).Dispatch(context.Background(), &Event{EventType: "im.message.receive_v1"}) + + if got, want := calls, []string{"failed-handler", "next-handler"}; !reflect.DeepEqual(got, want) { + t.Fatalf("calls = %v, want %v", got, want) + } + if len(result.Results) != 2 { + t.Fatalf("len(result.Results) = %d, want 2", len(result.Results)) + } + if result.Results[0].Status != HandlerStatusFailed { + t.Fatalf("first status = %q, want %q", result.Results[0].Status, HandlerStatusFailed) + } + if !errors.Is(result.Results[0].Err, boom) { + t.Fatalf("first error = %v, want boom", result.Results[0].Err) + } + if result.Results[1].Status != HandlerStatusHandled { + t.Fatalf("second status = %q, want %q", result.Results[1].Status, HandlerStatusHandled) + } +} diff --git a/shortcuts/event/domain_resolver.go b/shortcuts/event/domain_resolver.go new file mode 100644 index 00000000..63e8cdcf --- /dev/null +++ b/shortcuts/event/domain_resolver.go @@ -0,0 +1,39 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import "strings" + +const DomainUnknown = "unknown" + +var domainPrefixes = []struct { + prefix string + domain string +}{ + {prefix: "im.", domain: "im"}, + {prefix: "base.", domain: "base"}, + {prefix: "bitable.", domain: "base"}, + {prefix: "docs.", domain: "docs"}, + {prefix: "docx.", domain: "docs"}, + {prefix: "drive.", domain: "docs"}, + {prefix: "calendar.", domain: "calendar"}, + {prefix: "task.", domain: "task"}, + {prefix: "contact.", domain: "contact"}, + {prefix: "vc.", domain: "vc"}, +} + +// ResolveDomain maps a normalized event to a routing domain based on its raw event type prefix. +func ResolveDomain(evt *Event) string { + if evt == nil || evt.EventType == "" { + return DomainUnknown + } + + for _, mapping := range domainPrefixes { + if strings.HasPrefix(evt.EventType, mapping.prefix) { + return mapping.domain + } + } + + return DomainUnknown +} diff --git a/shortcuts/event/domain_resolver_test.go b/shortcuts/event/domain_resolver_test.go new file mode 100644 index 00000000..391d05fa --- /dev/null +++ b/shortcuts/event/domain_resolver_test.go @@ -0,0 +1,90 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import "testing" + +func TestMatchRawEventTypeReturnsExactEventType(t *testing.T) { + evt := &Event{EventType: "im.message.receive_v1"} + + match, ok := MatchRawEventType(evt) + if !ok { + t.Fatal("MatchRawEventType returned ok=false, want true") + } + if !match.Matched { + t.Fatal("Matched = false, want true") + } + if match.EventType != "im.message.receive_v1" { + t.Fatalf("EventType = %q, want im.message.receive_v1", match.EventType) + } +} + +func TestMatchRawEventTypeReturnsFalseForNilOrEmptyEventType(t *testing.T) { + tests := []struct { + name string + evt *Event + }{ + {name: "nil event", evt: nil}, + {name: "empty event type", evt: &Event{}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + match, ok := MatchRawEventType(tt.evt) + if ok { + t.Fatal("MatchRawEventType returned ok=true, want false") + } + if match != (RawEventMatch{}) { + t.Fatalf("match = %+v, want zero value", match) + } + }) + } +} + +func TestResolveDomainKnownMappings(t *testing.T) { + tests := []struct { + name string + eventType string + want string + }{ + {name: "im", eventType: "im.message.receive_v1", want: "im"}, + {name: "base", eventType: "base.record.created_v1", want: "base"}, + {name: "bitable aliases to base", eventType: "bitable.record.updated_v1", want: "base"}, + {name: "docs", eventType: "docs.document.created_v1", want: "docs"}, + {name: "docx aliases to docs", eventType: "docx.document.updated_v1", want: "docs"}, + {name: "drive aliases to docs", eventType: "drive.file.created_v1", want: "docs"}, + {name: "calendar", eventType: "calendar.event.created_v4", want: "calendar"}, + {name: "task", eventType: "task.task.updated_v1", want: "task"}, + {name: "contact", eventType: "contact.user.created_v3", want: "contact"}, + {name: "vc", eventType: "vc.meeting.started_v1", want: "vc"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ResolveDomain(&Event{EventType: tt.eventType}) + if got != tt.want { + t.Fatalf("ResolveDomain(%q) = %q, want %q", tt.eventType, got, tt.want) + } + }) + } +} + +func TestResolveDomainReturnsUnknownForNilOrUnknownEventType(t *testing.T) { + tests := []struct { + name string + evt *Event + }{ + {name: "nil event", evt: nil}, + {name: "empty event type", evt: &Event{}}, + {name: "unknown prefix", evt: &Event{EventType: "approval.instance.created_v1"}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ResolveDomain(tt.evt); got != DomainUnknown { + t.Fatalf("ResolveDomain() = %q, want %q", got, DomainUnknown) + } + }) + } +} diff --git a/shortcuts/event/envelope.go b/shortcuts/event/envelope.go new file mode 100644 index 00000000..2381cc64 --- /dev/null +++ b/shortcuts/event/envelope.go @@ -0,0 +1,81 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import "time" + +// Source identifies the adapter that delivered an inbound event. +type Source string + +const ( + SourceWebSocket Source = "websocket" + SourceWebhook Source = "webhook" +) + +// BuildWebSocketEnvelope adapts a raw WebSocket body into the inbound pipeline shape. +func BuildWebSocketEnvelope(body []byte) InboundEnvelope { + return InboundEnvelope{ + Source: SourceWebSocket, + ReceivedAt: time.Now(), + RawPayload: append([]byte(nil), body...), + } +} + +// InboundEnvelope is the raw input shape for the routing layer. +type InboundEnvelope struct { + Source Source `json:"source"` + ReceivedAt time.Time `json:"received_at"` + Headers map[string]string `json:"headers,omitempty"` + RawPayload []byte `json:"raw_payload"` +} + +// EventHeader is the normalized event header shared across source adapters. +type EventHeader struct { + EventID string `json:"event_id,omitempty"` + EventType string `json:"event_type,omitempty"` + CreateTime string `json:"create_time,omitempty"` + TenantKey string `json:"tenant_key,omitempty"` + AppID string `json:"app_id,omitempty"` +} + +// NormalizedPayload stores events in header + data form. +type NormalizedPayload struct { + Header EventHeader `json:"header"` + Data map[string]interface{} `json:"data"` +} + +// Event is the normalized routing-layer event model. +type Event struct { + Source Source `json:"source"` + EventID string `json:"event_id,omitempty"` + EventType string `json:"event_type"` + Domain string `json:"domain,omitempty"` + Payload NormalizedPayload `json:"payload"` + RawPayload []byte `json:"raw_payload"` + Metadata map[string]interface{} `json:"metadata,omitempty"` + IdempotencyKey string `json:"idempotency_key"` +} + +// MalformedEventError reports why an inbound payload could not be normalized. +type MalformedEventError struct { + Reason string + Err error +} + +func (e *MalformedEventError) Error() string { + if e == nil { + return "" + } + if e.Err == nil { + return e.Reason + } + return e.Reason + ": " + e.Err.Error() +} + +func (e *MalformedEventError) Unwrap() error { + if e == nil { + return nil + } + return e.Err +} diff --git a/shortcuts/event/handlers.go b/shortcuts/event/handlers.go new file mode 100644 index 00000000..46586b4f --- /dev/null +++ b/shortcuts/event/handlers.go @@ -0,0 +1,45 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import "context" + +// HandlerStatus reports how a handler processed an event. +type HandlerStatus string + +const ( + HandlerStatusHandled HandlerStatus = "handled" + HandlerStatusSkipped HandlerStatus = "skipped" + HandlerStatusFailed HandlerStatus = "failed" +) + +// HandlerResult captures the outcome from a handler invocation. +type HandlerResult struct { + Status HandlerStatus + Reason string + Err error + Output interface{} +} + +// DispatchRecord captures the recorded outcome for a single handler. +type DispatchRecord struct { + HandlerID string + Status HandlerStatus + Reason string + Err error + Output interface{} +} + +// DispatchResult stores the collected outcomes for all dispatched handlers. +type DispatchResult struct { + Results []DispatchRecord +} + +// EventHandler routes normalized events by event type or domain. +type EventHandler interface { + ID() string + EventType() string + Domain() string + Handle(ctx context.Context, evt *Event) HandlerResult +} diff --git a/shortcuts/event/matcher.go b/shortcuts/event/matcher.go new file mode 100644 index 00000000..4740520b --- /dev/null +++ b/shortcuts/event/matcher.go @@ -0,0 +1,22 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +// RawEventMatch captures the exact raw event type extracted from a normalized event. +type RawEventMatch struct { + EventType string + Matched bool +} + +// MatchRawEventType returns the exact event type when present on the normalized event. +func MatchRawEventType(evt *Event) (RawEventMatch, bool) { + if evt == nil || evt.EventType == "" { + return RawEventMatch{}, false + } + + return RawEventMatch{ + EventType: evt.EventType, + Matched: true, + }, true +} diff --git a/shortcuts/event/normalizer.go b/shortcuts/event/normalizer.go new file mode 100644 index 00000000..e34a2264 --- /dev/null +++ b/shortcuts/event/normalizer.go @@ -0,0 +1,63 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" +) + +type rawEnvelope struct { + Schema string `json:"schema"` + Header EventHeader `json:"header"` + Event map[string]interface{} `json:"event"` +} + +// NormalizeEnvelope converts a source-specific inbound envelope into the routing-layer event model. +func NormalizeEnvelope(env InboundEnvelope) (*Event, error) { + var raw rawEnvelope + if err := json.Unmarshal(env.RawPayload, &raw); err != nil { + return nil, &MalformedEventError{Reason: "invalid_json", Err: err} + } + if raw.Schema != "2.0" { + return nil, &MalformedEventError{Reason: "unsupported_schema"} + } + if raw.Header.EventType == "" { + return nil, &MalformedEventError{Reason: "missing_event_type"} + } + if raw.Event == nil { + raw.Event = map[string]interface{}{} + } + + payload := NormalizedPayload{ + Header: raw.Header, + Data: raw.Event, + } + + rawPayload := append([]byte(nil), env.RawPayload...) + event := &Event{ + Source: env.Source, + EventID: raw.Header.EventID, + EventType: raw.Header.EventType, + Domain: ResolveDomain(&Event{EventType: raw.Header.EventType}), + Payload: payload, + RawPayload: rawPayload, + Metadata: map[string]interface{}{ + "received_at": env.ReceivedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), + }, + } + event.IdempotencyKey = buildIdempotencyKey(env.Source, event.EventID, rawPayload) + + return event, nil +} + +func buildIdempotencyKey(source Source, eventID string, rawPayload []byte) string { + if eventID != "" { + return string(source) + ":" + eventID + } + + sum := sha256.Sum256(rawPayload) + return string(source) + ":sha256:" + hex.EncodeToString(sum[:]) +} diff --git a/shortcuts/event/normalizer_test.go b/shortcuts/event/normalizer_test.go new file mode 100644 index 00000000..ed9eada6 --- /dev/null +++ b/shortcuts/event/normalizer_test.go @@ -0,0 +1,271 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package event + +import ( + "bytes" + "encoding/json" + "errors" + "testing" + "time" +) + +func TestNormalizeEnvelopePreservesRawPayloadAndExtractsFields(t *testing.T) { + receivedAt := time.Unix(1712345678, 0).UTC() + rawPayload := []byte(`{"schema":"2.0","header":{"event_id":"evt_123","event_type":"im.message.receive_v1","create_time":"1712345600","tenant_key":"tenant_1","app_id":"cli_app"},"event":{"message":{"message_id":"om_123"},"sender":{"sender_id":{"open_id":"ou_1"}}}}`) + env := InboundEnvelope{ + Source: SourceWebhook, + ReceivedAt: receivedAt, + Headers: map[string]string{ + "X-Lark-Request-Timestamp": "1712345678", + }, + RawPayload: rawPayload, + } + + event, err := NormalizeEnvelope(env) + if err != nil { + t.Fatalf("NormalizeEnvelope returned error: %v", err) + } + + if event.Source != SourceWebhook { + t.Fatalf("Source = %q, want %q", event.Source, SourceWebhook) + } + if event.EventID != "evt_123" { + t.Fatalf("EventID = %q, want evt_123", event.EventID) + } + if event.EventType != "im.message.receive_v1" { + t.Fatalf("EventType = %q, want im.message.receive_v1", event.EventType) + } + if event.Domain != "im" { + t.Fatalf("Domain = %q, want im", event.Domain) + } + if event.IdempotencyKey != "webhook:evt_123" { + t.Fatalf("IdempotencyKey = %q, want webhook:evt_123", event.IdempotencyKey) + } + if !bytes.Equal(event.RawPayload, rawPayload) { + t.Fatal("RawPayload bytes differ from input") + } + if &event.RawPayload[0] == &rawPayload[0] { + t.Fatal("RawPayload should be copied, but shares backing array with input") + } + + if got := event.Metadata["received_at"]; got != receivedAt.Format(time.RFC3339Nano) { + t.Fatalf("Metadata[received_at] = %v, want %q", got, receivedAt.Format(time.RFC3339Nano)) + } + + if event.Payload.Header.EventID != "evt_123" || event.Payload.Header.EventType != "im.message.receive_v1" { + t.Fatalf("Payload.Header = %+v", event.Payload.Header) + } + if event.Payload.Header.CreateTime != "1712345600" { + t.Fatalf("Payload.Header.CreateTime = %q, want 1712345600", event.Payload.Header.CreateTime) + } + if event.Payload.Header.TenantKey != "tenant_1" { + t.Fatalf("Payload.Header.TenantKey = %q, want tenant_1", event.Payload.Header.TenantKey) + } + if event.Payload.Header.AppID != "cli_app" { + t.Fatalf("Payload.Header.AppID = %q, want cli_app", event.Payload.Header.AppID) + } + + message, ok := event.Payload.Data["message"].(map[string]interface{}) + if !ok { + t.Fatalf("Payload.Data[message] has unexpected type %T", event.Payload.Data["message"]) + } + if message["message_id"] != "om_123" { + t.Fatalf("Payload.Data[message][message_id] = %v, want om_123", message["message_id"]) + } + + sender, ok := event.Payload.Data["sender"].(map[string]interface{}) + if !ok { + t.Fatalf("Payload.Data[sender] has unexpected type %T", event.Payload.Data["sender"]) + } + senderID, ok := sender["sender_id"].(map[string]interface{}) + if !ok { + t.Fatalf("Payload.Data[sender][sender_id] has unexpected type %T", sender["sender_id"]) + } + if senderID["open_id"] != "ou_1" { + t.Fatalf("Payload.Data[sender][sender_id][open_id] = %v, want ou_1", senderID["open_id"]) + } +} + +func TestNormalizeEnvelopeMissingEventTypeReturnsMalformedError(t *testing.T) { + env := InboundEnvelope{ + Source: SourceWebSocket, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: []byte(`{"schema":"2.0","header":{"event_id":"evt_123"},"event":{"message":{}}}`), + } + + _, err := NormalizeEnvelope(env) + if err == nil { + t.Fatal("expected error, got nil") + } + + var malformed *MalformedEventError + if !errors.As(err, &malformed) { + t.Fatalf("expected MalformedEventError, got %T", err) + } + if malformed.Reason != "missing_event_type" { + t.Fatalf("Reason = %q, want missing_event_type", malformed.Reason) + } +} + +func TestNormalizeEnvelopeMissingOrUnsupportedSchemaReturnsMalformedError(t *testing.T) { + tests := []struct { + name string + payload string + want string + }{ + { + name: "missing schema", + payload: `{"header":{"event_type":"im.message.receive_v1"},"event":{}}`, + want: "unsupported_schema", + }, + { + name: "unsupported schema", + payload: `{"schema":"1.0","header":{"event_type":"im.message.receive_v1"},"event":{}}`, + want: "unsupported_schema", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NormalizeEnvelope(InboundEnvelope{ + Source: SourceWebhook, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: []byte(tt.payload), + }) + if err == nil { + t.Fatal("expected error, got nil") + } + + var malformed *MalformedEventError + if !errors.As(err, &malformed) { + t.Fatalf("expected MalformedEventError, got %T", err) + } + if malformed.Reason != tt.want { + t.Fatalf("Reason = %q, want %q", malformed.Reason, tt.want) + } + }) + } +} + +func TestNormalizeEnvelopeUsesRawPayloadFingerprintWhenEventIDMissing(t *testing.T) { + firstPayload := []byte(`{"schema":"2.0","header":{"event_type":"contact.user.created_v3","tenant_key":"tenant_1"},"event":{"user":{"user_id":"ou_123"}}}`) + secondPayload := []byte(`{"schema":"2.0","header":{"tenant_key":"tenant_1","event_type":"contact.user.created_v3"},"event":{"user":{"user_id":"ou_123"}}}`) + + first, err := NormalizeEnvelope(InboundEnvelope{ + Source: SourceWebSocket, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: firstPayload, + }) + if err != nil { + t.Fatalf("first NormalizeEnvelope returned error: %v", err) + } + second, err := NormalizeEnvelope(InboundEnvelope{ + Source: SourceWebSocket, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: secondPayload, + }) + if err != nil { + t.Fatalf("second NormalizeEnvelope returned error: %v", err) + } + + if first.EventID != "" || second.EventID != "" { + t.Fatalf("EventID values = %q, %q; want both empty", first.EventID, second.EventID) + } + if first.IdempotencyKey == "" || second.IdempotencyKey == "" { + t.Fatal("IdempotencyKey should not be empty") + } + if first.IdempotencyKey == string(SourceWebSocket)+":" || second.IdempotencyKey == string(SourceWebSocket)+":" { + t.Fatal("IdempotencyKey should not fall back to empty event_id format") + } + if first.IdempotencyKey == second.IdempotencyKey { + t.Fatalf("IdempotencyKey should differ for distinct raw payloads, got %q", first.IdempotencyKey) + } + + replay, err := NormalizeEnvelope(InboundEnvelope{ + Source: SourceWebSocket, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: firstPayload, + }) + if err != nil { + t.Fatalf("replay NormalizeEnvelope returned error: %v", err) + } + if first.IdempotencyKey != replay.IdempotencyKey { + t.Fatalf("IdempotencyKey should be deterministic, got %q and %q", first.IdempotencyKey, replay.IdempotencyKey) + } +} + +func TestNormalizeEnvelopeInvalidJSONReturnsMalformedError(t *testing.T) { + _, err := NormalizeEnvelope(InboundEnvelope{ + Source: SourceWebhook, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: []byte(`{"schema":`), + }) + if err == nil { + t.Fatal("expected error, got nil") + } + + var malformed *MalformedEventError + if !errors.As(err, &malformed) { + t.Fatalf("expected MalformedEventError, got %T", err) + } + if malformed.Reason != "invalid_json" { + t.Fatalf("Reason = %q, want invalid_json", malformed.Reason) + } + if malformed.Err == nil { + t.Fatal("Err should be populated") + } +} + +func TestNormalizeEnvelopeNilEventNormalizesToEmptyDataObject(t *testing.T) { + event, err := NormalizeEnvelope(InboundEnvelope{ + Source: SourceWebhook, + ReceivedAt: time.Unix(1712345678, 0).UTC(), + RawPayload: []byte(`{"schema":"2.0","header":{"event_id":"evt_nil","event_type":"im.message.receive_v1"},"event":null}`), + }) + if err != nil { + t.Fatalf("NormalizeEnvelope returned error: %v", err) + } + + if event.Payload.Data == nil { + t.Fatal("Payload.Data = nil, want empty map") + } + if len(event.Payload.Data) != 0 { + t.Fatalf("len(Payload.Data) = %d, want 0", len(event.Payload.Data)) + } + + body, err := json.Marshal(event.Payload) + if err != nil { + t.Fatalf("Marshal failed: %v", err) + } + if string(body) != `{"header":{"event_id":"evt_nil","event_type":"im.message.receive_v1"},"data":{}}` { + t.Fatalf("payload JSON = %s, want data to serialize as empty object", body) + } +} + +func TestNormalizedPayloadJSONShape(t *testing.T) { + payload := NormalizedPayload{ + Header: EventHeader{ + EventID: "evt_1", + EventType: "im.message.receive_v1", + CreateTime: "1712345600", + TenantKey: "tenant_1", + AppID: "cli_app", + }, + Data: map[string]interface{}{ + "message": map[string]interface{}{"message_id": "om_1"}, + "sender": map[string]interface{}{"sender_id": map[string]interface{}{"open_id": "ou_1"}}, + }, + } + + body, err := json.Marshal(payload) + if err != nil { + t.Fatalf("Marshal failed: %v", err) + } + + const want = `{"header":{"event_id":"evt_1","event_type":"im.message.receive_v1","create_time":"1712345600","tenant_key":"tenant_1","app_id":"cli_app"},"data":{"message":{"message_id":"om_1"},"sender":{"sender_id":{"open_id":"ou_1"}}}}` + if string(body) != want { + t.Fatalf("payload JSON = %s, want %s", body, want) + } +} diff --git a/shortcuts/event/pipeline.go b/shortcuts/event/pipeline.go index 28d552ff..9a249131 100644 --- a/shortcuts/event/pipeline.go +++ b/shortcuts/event/pipeline.go @@ -8,76 +8,63 @@ import ( "encoding/json" "fmt" "io" - "os" - "path/filepath" - "regexp" - "sync" - "sync/atomic" "time" "github.com/larksuite/cli/internal/output" - "github.com/larksuite/cli/internal/validate" - larkevent "github.com/larksuite/oapi-sdk-go/v3/event" ) const dedupTTL = 5 * time.Minute // PipelineConfig configures the event processing pipeline. type PipelineConfig struct { - Mode TransformMode // determined by --compact flag - JsonFlag bool // --json: pretty JSON instead of NDJSON - OutputDir string // --output-dir: write events to files - Quiet bool // --quiet: suppress stderr status messages - Router *EventRouter // --route: regex-based output routing + Mode TransformMode // determined by --compact flag + Quiet bool // --quiet: suppress stderr status messages + PrettyJSON bool } -// EventPipeline chains filter → dedup → transform → emit. +// EventPipeline chains normalize -> match -> resolve -> filter -> dedupe -> dispatch. type EventPipeline struct { - registry *ProcessorRegistry - filters *FilterChain - config PipelineConfig - eventCount atomic.Int64 - seen sync.Map // key → time.Time (first-seen timestamp) - out io.Writer - errOut io.Writer + registry *HandlerRegistry + filters *FilterChain + config PipelineConfig + deduper *Deduper + dispatcher *Dispatcher + dispatchedN int64 + out io.Writer + errOut io.Writer + recordWriter OutputRecordWriter } // NewEventPipeline builds an event processing pipeline. func NewEventPipeline( - registry *ProcessorRegistry, + registry *HandlerRegistry, filters *FilterChain, config PipelineConfig, out, errOut io.Writer, ) *EventPipeline { - return &EventPipeline{ - registry: registry, - filters: filters, - config: config, - out: out, - errOut: errOut, - } + return newEventPipeline(registry, filters, config, out, errOut, nil) } -// EnsureDirs creates all configured output directories once at startup. -func (p *EventPipeline) EnsureDirs() error { - if p.config.OutputDir != "" { - if err := os.MkdirAll(p.config.OutputDir, 0700); err != nil { - return fmt.Errorf("create output dir: %w", err) - } +func newEventPipeline( + registry *HandlerRegistry, + filters *FilterChain, + config PipelineConfig, + out, errOut io.Writer, + recordWriter OutputRecordWriter, +) *EventPipeline { + if registry == nil { + registry = NewHandlerRegistry() } - if p.config.Router != nil { - for _, route := range p.config.Router.routes { - if err := os.MkdirAll(route.dir, 0700); err != nil { - return fmt.Errorf("create route dir %s: %w", route.dir, err) - } - } + return &EventPipeline{ + registry: registry, + filters: filters, + config: config, + deduper: NewDeduper(dedupTTL), + dispatcher: NewDispatcher(registry), + out: out, + errOut: errOut, + recordWriter: recordWriter, } - return nil -} - -// EventCount returns the number of processed events. -func (p *EventPipeline) EventCount() int64 { - return p.eventCount.Load() } func (p *EventPipeline) infof(format string, args ...interface{}) { @@ -86,113 +73,211 @@ func (p *EventPipeline) infof(format string, args ...interface{}) { } } -// isDuplicate returns true if key was seen within dedupTTL. -func (p *EventPipeline) isDuplicate(key string) bool { - now := time.Now() - if v, loaded := p.seen.LoadOrStore(key, now); loaded { - if ts, ok := v.(time.Time); ok && now.Sub(ts) < dedupTTL { - return true - } - p.seen.Store(key, now) +// EventCount returns the number of dispatch records written by the pipeline. +func (p *EventPipeline) EventCount() int64 { + if p == nil { + return 0 } - return false -} - -func (p *EventPipeline) cleanupSeen(now time.Time) { - p.seen.Range(func(k, v any) bool { - if ts, ok := v.(time.Time); ok && now.Sub(ts) >= dedupTTL { - p.seen.Delete(k) - } - return true - }) + return p.dispatchedN } -// Process is the pipeline entry point, called by the WebSocket callback. -func (p *EventPipeline) Process(ctx context.Context, raw *RawEvent) { - eventType := raw.Header.EventType +// Process is the pipeline entry point. +func (p *EventPipeline) Process(ctx context.Context, env InboundEnvelope) { + evt, err := NormalizeEnvelope(env) + if err != nil { + evt = malformedFallbackEvent(env, err) + } - // 1. Filter - if !p.filters.Allow(eventType) { + match, ok := MatchRawEventType(evt) + if !ok { + p.dispatch(ctx, evt) return } - // 2. Lookup processor - processor := p.registry.Lookup(eventType) + evt.EventType = match.EventType + if evt.Domain == "" { + evt.Domain = ResolveDomain(evt) + } - // 3. Dedup - if key := processor.DeduplicateKey(raw); key != "" && p.isDuplicate(key) { - p.infof("%s[dedup]%s %s (key=%s)", output.Dim, output.Reset, eventType, key) + if !p.filters.Allow(evt.EventType) { return } - n := p.eventCount.Add(1) - if n%100 == 0 { - p.cleanupSeen(time.Now()) + if p.deduper.Seen(evt.IdempotencyKey, env.ReceivedAt) { + p.infof("%s[dedup]%s %s (key=%s)", output.Dim, output.Reset, evt.EventType, evt.IdempotencyKey) + return } - // 4. Transform — processor returns the final serializable value - data := processor.Transform(ctx, raw, p.config.Mode) + p.dispatch(ctx, evt) +} - // 5. Output routing (framework-controlled) - // 5a. Route-based output — matched events go to route dirs - if p.config.Router != nil { - if dirs := p.config.Router.Match(eventType); len(dirs) > 0 { - for _, dir := range dirs { - p.writeAndLog(dir, n, eventType, data, raw.Header) +func (p *EventPipeline) dispatch(ctx context.Context, evt *Event) { + result := p.dispatcher.Dispatch(ctx, evt) + for _, record := range result.Results { + p.dispatchedN++ + var entry map[string]interface{} + if p.config.Mode == TransformRaw && p.recordWriter != nil { + entry = rawModeRecord(evt, record) + if len(evt.Metadata) > 0 { + entry["metadata"] = evt.Metadata + } + if reason := summarizeDispatchReason(result); reason != "" { + entry["reason"] = reason } + if err := p.recordWriter.WriteRecord(evt.EventType, entry); err != nil { + output.PrintError(p.errOut, fmt.Sprintf("write failed: %v", err)) + return + } + continue + } + if p.config.Mode == TransformRaw { + entry = rawModeRecord(evt, record) + } else { + entry = compactModeRecord(evt, record) + } + if err := p.writeRecord(entry); err != nil { + output.PrintError(p.errOut, fmt.Sprintf("write failed: %v", err)) return } } +} - // 5b. --output-dir - if p.config.OutputDir != "" { - p.writeAndLog(p.config.OutputDir, n, eventType, data, raw.Header) - return +func compactModeRecord(evt *Event, record DispatchRecord) map[string]interface{} { + entry := map[string]interface{}{ + "event_type": evt.EventType, + "handler_id": record.HandlerID, + "status": record.Status, + } + mergeHandlerOutput(entry, record.Output) + if evt.Domain != "" { + entry["domain"] = evt.Domain + } + if evt.EventID != "" { + entry["event_id"] = evt.EventID + } + if evt.IdempotencyKey != "" { + entry["idempotency_key"] = evt.IdempotencyKey } + if record.Reason != "" { + entry["reason"] = record.Reason + } + if record.Err != nil { + entry["error"] = record.Err.Error() + } + return entry +} - // 5c. Stdout - if p.config.JsonFlag { - output.PrintJson(p.out, data) - } else { - output.PrintNdjson(p.out, data) +func rawModeRecord(evt *Event, record DispatchRecord) map[string]interface{} { + entry := map[string]interface{}{ + "event_type": evt.EventType, + "status": record.Status, + "payload": evt.Payload.Data, + "raw_payload": string(evt.RawPayload), + "source": evt.Source, + "idempotency_key": evt.IdempotencyKey, + } + if evt.Domain != "" { + entry["domain"] = evt.Domain + } + if evt.EventID != "" { + entry["event_id"] = evt.EventID + } + if record.Reason != "" { + entry["reason"] = record.Reason + } + if record.Err != nil { + entry["error"] = record.Err.Error() } - p.infof("%s[%d]%s %s", output.Dim, n, output.Reset, eventType) + return entry } -// writeAndLog writes an event to a directory and logs the result. -func (p *EventPipeline) writeAndLog(dir string, n int64, eventType string, data interface{}, header larkevent.EventHeader) { - fp, err := writeEventFile(dir, data, header) +func malformedFallbackEvent(env InboundEnvelope, err error) *Event { + reason := "malformed" + if malformed, ok := err.(*MalformedEventError); ok && malformed.Reason != "" { + reason = malformed.Reason + } + + metadata := map[string]interface{}{ + "received_at": env.ReceivedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), + "malformed_reason": reason, + } if err != nil { - output.PrintError(p.errOut, fmt.Sprintf("write failed (%s): %v", dir, err)) - } else { - p.infof("%s[%d]%s %s → %s", output.Dim, n, output.Reset, eventType, fp) + metadata["normalization_error"] = err.Error() + } + + payload := NormalizedPayload{Data: map[string]interface{}{"raw_payload": string(env.RawPayload)}} + return &Event{ + Source: env.Source, + EventType: "malformed", + Domain: DomainUnknown, + Payload: payload, + RawPayload: append([]byte(nil), env.RawPayload...), + Metadata: metadata, + IdempotencyKey: buildIdempotencyKey(env.Source, "", env.RawPayload), } } -var filenameSanitizer = regexp.MustCompile(`[^a-zA-Z0-9._-]`) +func mergeHandlerOutput(entry map[string]interface{}, outputValue interface{}) { + if entry == nil || outputValue == nil { + return + } + if compact, ok := outputValue.(map[string]interface{}); ok { + for k, v := range compact { + entry[k] = v + } + return + } + entry["output"] = outputValue +} -func writeEventFile(dir string, data interface{}, header larkevent.EventHeader) (string, error) { - eventID := header.EventID - if eventID == "" { - eventID = "unknown" +func summarizeDispatchReason(result DispatchResult) string { + if len(result.Results) == 0 { + return "" } - ts := header.CreateTime - if ts == "" { - ts = fmt.Sprintf("%d", os.Getpid()) + reason := result.Results[0].Reason + if reason == "" { + return "" } + for _, record := range result.Results[1:] { + if record.Reason != reason { + return "" + } + } + return reason +} - safeName := filenameSanitizer.ReplaceAllString(header.EventType, "_") - filename := fmt.Sprintf("%s_%s_%s.json", safeName, eventID, ts) - outPath := filepath.Join(dir, filename) +type ndjsonRecordWriter struct { + w io.Writer +} - jsonData, err := json.MarshalIndent(data, "", " ") +func (w ndjsonRecordWriter) WriteRecord(_ string, value map[string]interface{}) error { + data, err := json.Marshal(value) if err != nil { - return "", err + return err } + _, err = w.w.Write(append(data, '\n')) + return err +} - if err := validate.AtomicWrite(outPath, append(jsonData, '\n'), 0600); err != nil { - return "", err +func (p *EventPipeline) writeRecord(value interface{}) error { + var ( + data []byte + err error + ) + if p.config.PrettyJSON { + data, err = json.MarshalIndent(value, "", " ") + if err == nil { + data = append(data, '\n') + } + } else { + data, err = json.Marshal(value) + if err == nil { + data = append(data, '\n') + } } - - return outPath, nil + if err != nil { + return err + } + _, err = p.out.Write(data) + return err } diff --git a/shortcuts/event/processor_generic.go b/shortcuts/event/processor_generic.go index 793a79e0..5d7bca8c 100644 --- a/shortcuts/event/processor_generic.go +++ b/shortcuts/event/processor_generic.go @@ -8,16 +8,36 @@ import ( "encoding/json" ) +const genericHandlerID = "builtin.generic.fallback" + // GenericProcessor is the fallback for unregistered event types. // Compact mode parses the event payload as a map; Raw mode passes through raw.Event. type GenericProcessor struct{} +func NewGenericFallbackHandler() *GenericProcessor { return &GenericProcessor{} } + +func (p *GenericProcessor) ID() string { return genericHandlerID } func (p *GenericProcessor) EventType() string { return "" } +func (p *GenericProcessor) Domain() string { return "" } + +func (p *GenericProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{ + Status: HandlerStatusHandled, + Output: genericCompactOutput(evt), + } +} func (p *GenericProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } + return genericCompactMap(raw) +} + +func (p *GenericProcessor) DeduplicateKey(raw *RawEvent) string { return raw.Header.EventID } +func (p *GenericProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } + +func genericCompactMap(raw *RawEvent) interface{} { // Compact: parse event as flat map, inject envelope metadata so AI // can always identify the event type regardless of which processor ran. var eventMap map[string]interface{} @@ -34,5 +54,34 @@ func (p *GenericProcessor) Transform(_ context.Context, raw *RawEvent, mode Tran return eventMap } -func (p *GenericProcessor) DeduplicateKey(raw *RawEvent) string { return raw.Header.EventID } -func (p *GenericProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } +func genericCompactOutput(evt *Event) interface{} { + if evt == nil { + return map[string]interface{}{"type": ""} + } + + out := map[string]interface{}{} + for k, v := range evt.Payload.Data { + out[k] = v + } + if len(out) == 0 && len(evt.RawPayload) > 0 { + var rawMap map[string]interface{} + if err := json.Unmarshal(evt.RawPayload, &rawMap); err == nil { + for k, v := range rawMap { + out[k] = v + } + } + } + out["type"] = evt.EventType + if evt.EventID != "" { + out["event_id"] = evt.EventID + } + if evt.Payload.Header.CreateTime != "" { + out["timestamp"] = evt.Payload.Header.CreateTime + } + if len(evt.RawPayload) > 0 { + if _, ok := out["raw_payload"]; !ok { + out["raw_payload"] = string(evt.RawPayload) + } + } + return out +} diff --git a/shortcuts/event/processor_im_chat.go b/shortcuts/event/processor_im_chat.go index 585f3f0b..b1a9b958 100644 --- a/shortcuts/event/processor_im_chat.go +++ b/shortcuts/event/processor_im_chat.go @@ -21,37 +21,25 @@ import ( // - after_change: chat properties after the update type ImChatUpdatedProcessor struct{} +func NewIMChatUpdatedHandler() *ImChatUpdatedProcessor { return &ImChatUpdatedProcessor{} } + +func (p *ImChatUpdatedProcessor) ID() string { return "builtin.im.chat.updated" } func (p *ImChatUpdatedProcessor) EventType() string { return "im.chat.updated_v1" } +func (p *ImChatUpdatedProcessor) Domain() string { return "im" } + +func (p *ImChatUpdatedProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled, Output: imChatUpdatedCompactOutput(evt)} +} func (p *ImChatUpdatedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - var ev struct { - ChatID string `json:"chat_id"` - OperatorID interface{} `json:"operator_id"` - External bool `json:"external"` - AfterChange interface{} `json:"after_change"` - BeforeChange interface{} `json:"before_change"` - } + var ev imChatPayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } - out := compactBase(raw) - if ev.ChatID != "" { - out["chat_id"] = ev.ChatID - } - if id := openID(ev.OperatorID); id != "" { - out["operator_id"] = id - } - out["external"] = ev.External - if ev.AfterChange != nil { - out["after_change"] = ev.AfterChange - } - if ev.BeforeChange != nil { - out["before_change"] = ev.BeforeChange - } - return out + return buildIMChatCompactOutput(raw.Header.EventType, raw.Header.EventID, raw.Header.CreateTime, ev, true) } func (p *ImChatUpdatedProcessor) DeduplicateKey(raw *RawEvent) string { @@ -70,21 +58,66 @@ func (p *ImChatUpdatedProcessor) WindowStrategy() WindowConfig { return WindowCo // - external: whether this is an external (cross-tenant) chat type ImChatDisbandedProcessor struct{} +func NewIMChatDisbandedHandler() *ImChatDisbandedProcessor { return &ImChatDisbandedProcessor{} } + +func (p *ImChatDisbandedProcessor) ID() string { return "builtin.im.chat.disbanded" } func (p *ImChatDisbandedProcessor) EventType() string { return "im.chat.disbanded_v1" } +func (p *ImChatDisbandedProcessor) Domain() string { return "im" } + +func (p *ImChatDisbandedProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled, Output: imChatDisbandedCompactOutput(evt)} +} func (p *ImChatDisbandedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - var ev struct { - ChatID string `json:"chat_id"` - OperatorID interface{} `json:"operator_id"` - External bool `json:"external"` - } + var ev imChatPayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } - out := compactBase(raw) + return buildIMChatCompactOutput(raw.Header.EventType, raw.Header.EventID, raw.Header.CreateTime, ev, false) +} + +func (p *ImChatDisbandedProcessor) DeduplicateKey(raw *RawEvent) string { + return raw.Header.EventID +} +func (p *ImChatDisbandedProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } + +type imChatPayload struct { + ChatID string `json:"chat_id"` + OperatorID interface{} `json:"operator_id"` + External bool `json:"external"` + AfterChange interface{} `json:"after_change"` + BeforeChange interface{} `json:"before_change"` +} + +func imChatUpdatedCompactOutput(evt *Event) map[string]interface{} { + return imChatCompactOutput(evt, true) +} + +func imChatDisbandedCompactOutput(evt *Event) map[string]interface{} { + return imChatCompactOutput(evt, false) +} + +func imChatCompactOutput(evt *Event, includeChanges bool) map[string]interface{} { + if evt == nil { + return map[string]interface{}{"type": ""} + } + data, _ := json.Marshal(evt.Payload.Data) + var payload imChatPayload + _ = json.Unmarshal(data, &payload) + return buildIMChatCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, payload, includeChanges) +} + +func buildIMChatCompactOutput(eventType, eventID, createTime string, ev imChatPayload, includeChanges bool) map[string]interface{} { + out := map[string]interface{}{"type": eventType} + if eventID != "" { + out["event_id"] = eventID + } + if createTime != "" { + out["timestamp"] = createTime + } if ev.ChatID != "" { out["chat_id"] = ev.ChatID } @@ -92,10 +125,13 @@ func (p *ImChatDisbandedProcessor) Transform(_ context.Context, raw *RawEvent, m out["operator_id"] = id } out["external"] = ev.External + if includeChanges { + if ev.AfterChange != nil { + out["after_change"] = ev.AfterChange + } + if ev.BeforeChange != nil { + out["before_change"] = ev.BeforeChange + } + } return out } - -func (p *ImChatDisbandedProcessor) DeduplicateKey(raw *RawEvent) string { - return raw.Header.EventID -} -func (p *ImChatDisbandedProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } diff --git a/shortcuts/event/processor_im_chat_member.go b/shortcuts/event/processor_im_chat_member.go index e0c209a6..49e85e2f 100644 --- a/shortcuts/event/processor_im_chat_member.go +++ b/shortcuts/event/processor_im_chat_member.go @@ -29,39 +29,37 @@ func NewImChatBotAddedProcessor() *ImChatBotProcessor { return &ImChatBotProcessor{eventType: "im.chat.member.bot.added_v1"} } +func NewIMChatMemberBotAddedHandler() *ImChatBotProcessor { return NewImChatBotAddedProcessor() } + // NewImChatBotDeletedProcessor creates a processor for im.chat.member.bot.deleted_v1. func NewImChatBotDeletedProcessor() *ImChatBotProcessor { return &ImChatBotProcessor{eventType: "im.chat.member.bot.deleted_v1"} } +func NewIMChatMemberBotDeletedHandler() *ImChatBotProcessor { return NewImChatBotDeletedProcessor() } + +func (p *ImChatBotProcessor) ID() string { + if strings.Contains(p.eventType, "deleted") { + return "builtin.im.chat.member.bot.deleted" + } + return "builtin.im.chat.member.bot.added" +} func (p *ImChatBotProcessor) EventType() string { return p.eventType } +func (p *ImChatBotProcessor) Domain() string { return "im" } + +func (p *ImChatBotProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled, Output: imChatBotCompactOutput(evt, p.eventType)} +} func (p *ImChatBotProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - var ev struct { - ChatID string `json:"chat_id"` - OperatorID interface{} `json:"operator_id"` - External bool `json:"external"` - } + var ev imChatMemberPayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } - out := compactBase(raw) - action := "added" - if strings.Contains(p.eventType, "deleted") { - action = "removed" - } - out["action"] = action - if ev.ChatID != "" { - out["chat_id"] = ev.ChatID - } - if id := openID(ev.OperatorID); id != "" { - out["operator_id"] = id - } - out["external"] = ev.External - return out + return buildIMChatBotCompactOutput(raw.Header.EventType, raw.Header.EventID, raw.Header.CreateTime, p.eventType, ev) } func (p *ImChatBotProcessor) DeduplicateKey(raw *RawEvent) string { return raw.Header.EventID } @@ -88,39 +86,127 @@ func NewImChatMemberUserAddedProcessor() *ImChatMemberUserProcessor { return &ImChatMemberUserProcessor{eventType: "im.chat.member.user.added_v1"} } +func NewIMChatMemberUserAddedHandler() *ImChatMemberUserProcessor { + return NewImChatMemberUserAddedProcessor() +} + // NewImChatMemberUserWithdrawnProcessor creates a processor for im.chat.member.user.withdrawn_v1. func NewImChatMemberUserWithdrawnProcessor() *ImChatMemberUserProcessor { return &ImChatMemberUserProcessor{eventType: "im.chat.member.user.withdrawn_v1"} } +func NewIMChatMemberUserWithdrawnHandler() *ImChatMemberUserProcessor { + return NewImChatMemberUserWithdrawnProcessor() +} + // NewImChatMemberUserDeletedProcessor creates a processor for im.chat.member.user.deleted_v1. func NewImChatMemberUserDeletedProcessor() *ImChatMemberUserProcessor { return &ImChatMemberUserProcessor{eventType: "im.chat.member.user.deleted_v1"} } +func NewIMChatMemberUserDeletedHandler() *ImChatMemberUserProcessor { + return NewImChatMemberUserDeletedProcessor() +} + +func (p *ImChatMemberUserProcessor) ID() string { + switch { + case strings.Contains(p.eventType, "withdrawn"): + return "builtin.im.chat.member.user.withdrawn" + case strings.Contains(p.eventType, "deleted"): + return "builtin.im.chat.member.user.deleted" + default: + return "builtin.im.chat.member.user.added" + } +} func (p *ImChatMemberUserProcessor) EventType() string { return p.eventType } +func (p *ImChatMemberUserProcessor) Domain() string { return "im" } + +func (p *ImChatMemberUserProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled, Output: imChatMemberUserCompactOutput(evt, p.eventType)} +} func (p *ImChatMemberUserProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - var ev struct { - ChatID string `json:"chat_id"` - OperatorID interface{} `json:"operator_id"` - External bool `json:"external"` - Users []interface{} `json:"users"` - } + var ev imChatMemberPayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } - out := compactBase(raw) - // Derive action from event type suffix + return buildIMChatMemberUserCompactOutput(raw.Header.EventType, raw.Header.EventID, raw.Header.CreateTime, p.eventType, ev) +} + +func (p *ImChatMemberUserProcessor) DeduplicateKey(raw *RawEvent) string { + return raw.Header.EventID +} +func (p *ImChatMemberUserProcessor) WindowStrategy() WindowConfig { + return WindowConfig{} +} + +type imChatMemberPayload struct { + ChatID string `json:"chat_id"` + OperatorID interface{} `json:"operator_id"` + External bool `json:"external"` + Users []interface{} `json:"users"` +} + +func imChatBotCompactOutput(evt *Event, processorEventType string) map[string]interface{} { + if evt == nil { + return map[string]interface{}{"type": processorEventType} + } + data, _ := json.Marshal(evt.Payload.Data) + var payload imChatMemberPayload + _ = json.Unmarshal(data, &payload) + return buildIMChatBotCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, processorEventType, payload) +} + +func buildIMChatBotCompactOutput(eventType, eventID, createTime, processorEventType string, ev imChatMemberPayload) map[string]interface{} { + out := map[string]interface{}{"type": eventType} + if eventID != "" { + out["event_id"] = eventID + } + if createTime != "" { + out["timestamp"] = createTime + } + action := "added" + if strings.Contains(processorEventType, "deleted") { + action = "removed" + } + out["action"] = action + if ev.ChatID != "" { + out["chat_id"] = ev.ChatID + } + if id := openID(ev.OperatorID); id != "" { + out["operator_id"] = id + } + out["external"] = ev.External + return out +} + +func imChatMemberUserCompactOutput(evt *Event, processorEventType string) map[string]interface{} { + if evt == nil { + return map[string]interface{}{"type": processorEventType} + } + data, _ := json.Marshal(evt.Payload.Data) + var payload imChatMemberPayload + _ = json.Unmarshal(data, &payload) + return buildIMChatMemberUserCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, processorEventType, payload) +} + +func buildIMChatMemberUserCompactOutput(eventType, eventID, createTime, processorEventType string, ev imChatMemberPayload) map[string]interface{} { + out := map[string]interface{}{"type": eventType} + if eventID != "" { + out["event_id"] = eventID + } + if createTime != "" { + out["timestamp"] = createTime + } switch { - case strings.Contains(p.eventType, "added"): + case strings.Contains(processorEventType, "added"): out["action"] = "added" - case strings.Contains(p.eventType, "withdrawn"): + case strings.Contains(processorEventType, "withdrawn"): out["action"] = "withdrawn" - case strings.Contains(p.eventType, "deleted"): + case strings.Contains(processorEventType, "deleted"): out["action"] = "removed" } if ev.ChatID != "" { @@ -135,10 +221,3 @@ func (p *ImChatMemberUserProcessor) Transform(_ context.Context, raw *RawEvent, out["external"] = ev.External return out } - -func (p *ImChatMemberUserProcessor) DeduplicateKey(raw *RawEvent) string { - return raw.Header.EventID -} -func (p *ImChatMemberUserProcessor) WindowStrategy() WindowConfig { - return WindowConfig{} -} diff --git a/shortcuts/event/processor_im_message.go b/shortcuts/event/processor_im_message.go index 68433c8b..f6cbbce2 100644 --- a/shortcuts/event/processor_im_message.go +++ b/shortcuts/event/processor_im_message.go @@ -13,6 +13,8 @@ import ( convertlib "github.com/larksuite/cli/shortcuts/im/convert_lib" ) +const imMessageHandlerID = "builtin.im.message.receive" + // ImMessageProcessor handles im.message.receive_v1 events. // // Compact output fields: @@ -21,39 +23,81 @@ import ( // - content: human-readable text converted via convertlib (supports text, post, image, file, card, etc.) type ImMessageProcessor struct{} +func NewIMMessageReceiveHandler() *ImMessageProcessor { return &ImMessageProcessor{} } + +func (p *ImMessageProcessor) ID() string { return imMessageHandlerID } func (p *ImMessageProcessor) EventType() string { return "im.message.receive_v1" } +func (p *ImMessageProcessor) Domain() string { return "im" } + +func (p *ImMessageProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + out, ok := imMessageCompactOutput(evt) + if !ok { + return HandlerResult{ + Status: HandlerStatusHandled, + Output: genericCompactOutput(evt), + Reason: "interactive_fallback", + } + } + return HandlerResult{Status: HandlerStatusHandled, Output: out} +} func (p *ImMessageProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - // Compact: unmarshal event portion into IM message structure - var ev struct { - Message struct { - MessageID string `json:"message_id"` - ChatID string `json:"chat_id"` - ChatType string `json:"chat_type"` - MessageType string `json:"message_type"` - Content string `json:"content"` - CreateTime string `json:"create_time"` - Mentions []interface{} `json:"mentions"` - } `json:"message"` - Sender struct { - SenderID struct { - OpenID string `json:"open_id"` - } `json:"sender_id"` - } `json:"sender"` - } + var ev imMessagePayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } + out, ok := buildIMMessageCompactOutput(raw.Header.EventType, raw.Header.CreateTime, ev) + if !ok { + return raw + } + return out +} + +func (p *ImMessageProcessor) DeduplicateKey(raw *RawEvent) string { return raw.Header.EventID } +func (p *ImMessageProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } +type imMessagePayload struct { + Message struct { + MessageID string `json:"message_id"` + ChatID string `json:"chat_id"` + ChatType string `json:"chat_type"` + MessageType string `json:"message_type"` + Content string `json:"content"` + CreateTime string `json:"create_time"` + Mentions []interface{} `json:"mentions"` + } `json:"message"` + Sender struct { + SenderID struct { + OpenID string `json:"open_id"` + } `json:"sender_id"` + } `json:"sender"` +} + +func imMessageCompactOutput(evt *Event) (map[string]interface{}, bool) { + if evt == nil { + return nil, false + } + data, err := json.Marshal(evt.Payload.Data) + if err != nil { + return nil, false + } + var payload imMessagePayload + if err := json.Unmarshal(data, &payload); err != nil { + return nil, false + } + return buildIMMessageCompactOutput(evt.EventType, evt.Payload.Header.CreateTime, payload) +} + +func buildIMMessageCompactOutput(eventType, headerCreateTime string, ev imMessagePayload) (map[string]interface{}, bool) { // Card messages (interactive) are not yet supported for compact conversion; // return raw event data directly. if ev.Message.MessageType == "interactive" { fmt.Fprintf(os.Stderr, "%s[hint]%s card message (interactive) compact conversion is not yet supported, returning raw event data\n", output.Dim, output.Reset) - return raw + return nil, false } // Use convertlib to convert raw content JSON into human-readable text. @@ -63,9 +107,8 @@ func (p *ImMessageProcessor) Transform(_ context.Context, raw *RawEvent, mode Tr MentionMap: convertlib.BuildMentionKeyMap(ev.Message.Mentions), }) - // Build compact output with core message metadata out := map[string]interface{}{ - "type": raw.Header.EventType, + "type": eventType, } if ev.Message.MessageID != "" { out["id"] = ev.Message.MessageID @@ -74,9 +117,8 @@ func (p *ImMessageProcessor) Transform(_ context.Context, raw *RawEvent, mode Tr if ev.Message.CreateTime != "" { out["create_time"] = ev.Message.CreateTime } - // Prefer header-level timestamp; fall back to message create_time - if raw.Header.CreateTime != "" { - out["timestamp"] = raw.Header.CreateTime + if headerCreateTime != "" { + out["timestamp"] = headerCreateTime } else if ev.Message.CreateTime != "" { out["timestamp"] = ev.Message.CreateTime } @@ -95,8 +137,5 @@ func (p *ImMessageProcessor) Transform(_ context.Context, raw *RawEvent, mode Tr if content != "" { out["content"] = content } - return out + return out, true } - -func (p *ImMessageProcessor) DeduplicateKey(raw *RawEvent) string { return raw.Header.EventID } -func (p *ImMessageProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } diff --git a/shortcuts/event/processor_im_message_reaction.go b/shortcuts/event/processor_im_message_reaction.go index 3c56d83c..b83be625 100644 --- a/shortcuts/event/processor_im_message_reaction.go +++ b/shortcuts/event/processor_im_message_reaction.go @@ -28,34 +28,82 @@ func NewImReactionCreatedProcessor() *ImMessageReactionProcessor { return &ImMessageReactionProcessor{eventType: "im.message.reaction.created_v1"} } +func NewIMReactionCreatedHandler() *ImMessageReactionProcessor { + return NewImReactionCreatedProcessor() +} + // NewImReactionDeletedProcessor creates a processor for im.message.reaction.deleted_v1. func NewImReactionDeletedProcessor() *ImMessageReactionProcessor { return &ImMessageReactionProcessor{eventType: "im.message.reaction.deleted_v1"} } +func NewIMReactionDeletedHandler() *ImMessageReactionProcessor { + return NewImReactionDeletedProcessor() +} + +func (p *ImMessageReactionProcessor) ID() string { + if strings.Contains(p.eventType, "deleted") { + return "builtin.im.message.reaction.deleted" + } + return "builtin.im.message.reaction.created" +} func (p *ImMessageReactionProcessor) EventType() string { return p.eventType } +func (p *ImMessageReactionProcessor) Domain() string { return "im" } + +func (p *ImMessageReactionProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled, Output: imMessageReactionCompactOutput(evt, p.eventType)} +} func (p *ImMessageReactionProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - var ev struct { - MessageID string `json:"message_id"` - ReactionType struct { - EmojiType string `json:"emoji_type"` - } `json:"reaction_type"` - OperatorType string `json:"operator_type"` - UserID struct { - OpenID string `json:"open_id"` - } `json:"user_id"` - ActionTime string `json:"action_time"` - } + var ev imMessageReactionPayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } - out := compactBase(raw) + return buildIMMessageReactionCompactOutput(raw.Header.EventType, raw.Header.EventID, raw.Header.CreateTime, p.eventType, ev) +} + +func (p *ImMessageReactionProcessor) DeduplicateKey(raw *RawEvent) string { + return raw.Header.EventID +} +func (p *ImMessageReactionProcessor) WindowStrategy() WindowConfig { + return WindowConfig{} +} + +type imMessageReactionPayload struct { + MessageID string `json:"message_id"` + ReactionType struct { + EmojiType string `json:"emoji_type"` + } `json:"reaction_type"` + OperatorType string `json:"operator_type"` + UserID struct { + OpenID string `json:"open_id"` + } `json:"user_id"` + ActionTime string `json:"action_time"` +} + +func imMessageReactionCompactOutput(evt *Event, processorEventType string) map[string]interface{} { + if evt == nil { + return map[string]interface{}{"type": processorEventType} + } + data, _ := json.Marshal(evt.Payload.Data) + var payload imMessageReactionPayload + _ = json.Unmarshal(data, &payload) + return buildIMMessageReactionCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, processorEventType, payload) +} + +func buildIMMessageReactionCompactOutput(eventType, eventID, createTime, processorEventType string, ev imMessageReactionPayload) map[string]interface{} { + out := map[string]interface{}{"type": eventType} + if eventID != "" { + out["event_id"] = eventID + } + if createTime != "" { + out["timestamp"] = createTime + } action := "added" - if strings.Contains(p.eventType, "deleted") { + if strings.Contains(processorEventType, "deleted") { action = "removed" } out["action"] = action @@ -73,10 +121,3 @@ func (p *ImMessageReactionProcessor) Transform(_ context.Context, raw *RawEvent, } return out } - -func (p *ImMessageReactionProcessor) DeduplicateKey(raw *RawEvent) string { - return raw.Header.EventID -} -func (p *ImMessageReactionProcessor) WindowStrategy() WindowConfig { - return WindowConfig{} -} diff --git a/shortcuts/event/processor_im_message_read.go b/shortcuts/event/processor_im_message_read.go index da7bbce6..b1ea7853 100644 --- a/shortcuts/event/processor_im_message_read.go +++ b/shortcuts/event/processor_im_message_read.go @@ -8,7 +8,9 @@ import ( "encoding/json" ) -// ── im.message.message_read_v1 ────────────────────────────────────────────── +const imMessageReadHandlerID = "builtin.im.message.read" + +// ── im.message.message_read_v1 ─────────────────────────────────────────────── // ImMessageReadProcessor handles im.message.message_read_v1 events. // @@ -19,25 +21,60 @@ import ( // - message_ids: list of message IDs that were read type ImMessageReadProcessor struct{} +func NewIMMessageReadHandler() *ImMessageReadProcessor { return &ImMessageReadProcessor{} } + +func (p *ImMessageReadProcessor) ID() string { return imMessageReadHandlerID } func (p *ImMessageReadProcessor) EventType() string { return "im.message.message_read_v1" } +func (p *ImMessageReadProcessor) Domain() string { return "im" } + +func (p *ImMessageReadProcessor) Handle(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled, Output: imMessageReadCompactOutput(evt)} +} func (p *ImMessageReadProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{} { if mode == TransformRaw { return raw } - var ev struct { - Reader struct { - ReaderID struct { - OpenID string `json:"open_id"` - } `json:"reader_id"` - ReadTime string `json:"read_time"` - } `json:"reader"` - MessageIDList []string `json:"message_id_list"` - } + var ev imMessageReadPayload if err := json.Unmarshal(raw.Event, &ev); err != nil { return raw } - out := compactBase(raw) + return buildIMMessageReadCompactOutput(raw.Header.EventType, raw.Header.EventID, raw.Header.CreateTime, ev) +} + +func (p *ImMessageReadProcessor) DeduplicateKey(raw *RawEvent) string { + return raw.Header.EventID +} +func (p *ImMessageReadProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } + +type imMessageReadPayload struct { + Reader struct { + ReaderID struct { + OpenID string `json:"open_id"` + } `json:"reader_id"` + ReadTime string `json:"read_time"` + } `json:"reader"` + MessageIDList []string `json:"message_id_list"` +} + +func imMessageReadCompactOutput(evt *Event) map[string]interface{} { + if evt == nil { + return map[string]interface{}{"type": ""} + } + data, _ := json.Marshal(evt.Payload.Data) + var payload imMessageReadPayload + _ = json.Unmarshal(data, &payload) + return buildIMMessageReadCompactOutput(evt.EventType, evt.EventID, evt.Payload.Header.CreateTime, payload) +} + +func buildIMMessageReadCompactOutput(eventType, eventID, createTime string, ev imMessageReadPayload) map[string]interface{} { + out := map[string]interface{}{"type": eventType} + if eventID != "" { + out["event_id"] = eventID + } + if createTime != "" { + out["timestamp"] = createTime + } if ev.Reader.ReaderID.OpenID != "" { out["reader_id"] = ev.Reader.ReaderID.OpenID } @@ -49,8 +86,3 @@ func (p *ImMessageReadProcessor) Transform(_ context.Context, raw *RawEvent, mod } return out } - -func (p *ImMessageReadProcessor) DeduplicateKey(raw *RawEvent) string { - return raw.Header.EventID -} -func (p *ImMessageReadProcessor) WindowStrategy() WindowConfig { return WindowConfig{} } diff --git a/shortcuts/event/processor_im_test.go b/shortcuts/event/processor_im_test.go index 63765f56..b2e9169c 100644 --- a/shortcuts/event/processor_im_test.go +++ b/shortcuts/event/processor_im_test.go @@ -8,36 +8,96 @@ import ( "testing" ) -// --- im.message.message_read_v1 --- +func makeNormalizedEvent(eventType string, data map[string]interface{}) *Event { + return &Event{ + Source: SourceWebSocket, + EventID: "ev_test", + EventType: eventType, + Domain: "im", + Payload: NormalizedPayload{ + Header: EventHeader{ + EventID: "ev_test", + EventType: eventType, + CreateTime: "1700000000", + }, + Data: data, + }, + IdempotencyKey: "ws:ev_test", + } +} -func TestImMessageReadProcessor_Compact(t *testing.T) { - p := &ImMessageReadProcessor{} - if p.EventType() != "im.message.message_read_v1" { - t.Fatalf("EventType = %q", p.EventType()) +// --- im.message.receive_v1 handler --- + +func TestIMMessageHandler_Handle(t *testing.T) { + h := NewIMMessageReceiveHandler() + evt := makeNormalizedEvent("im.message.receive_v1", map[string]interface{}{ + "message": map[string]interface{}{ + "message_id": "om_123", + "chat_id": "oc_456", + "chat_type": "p2p", + "message_type": "text", + "content": `{"text":"hello"}`, + "create_time": "1700000001", + }, + "sender": map[string]interface{}{ + "sender_id": map[string]interface{}{"open_id": "ou_sender"}, + }, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) + if !ok { + t.Fatal("output should be compact map") + } + if out["message_id"] != "om_123" || out["id"] != "om_123" { + t.Fatalf("message identifiers = %v", out) + } + if out["chat_id"] != "oc_456" { + t.Fatalf("chat_id = %v", out["chat_id"]) + } + if out["sender_id"] != "ou_sender" { + t.Fatalf("sender_id = %v", out["sender_id"]) + } + if out["content"] != "hello" { + t.Fatalf("content = %v", out["content"]) } - raw := makeRawEvent("im.message.message_read_v1", `{ - "reader": { - "reader_id": {"open_id": "ou_reader"}, - "read_time": "1700000001" +} + +// --- im.message.message_read_v1 --- + +func TestIMMessageReadHandler_Handle(t *testing.T) { + h := NewIMMessageReadHandler() + evt := makeNormalizedEvent("im.message.message_read_v1", map[string]interface{}{ + "reader": map[string]interface{}{ + "reader_id": map[string]interface{}{"open_id": "ou_reader"}, + "read_time": "1700000001", }, - "message_id_list": ["msg_001", "msg_002"] - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) + "message_id_list": []interface{}{"msg_001", "msg_002"}, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["type"] != "im.message.message_read_v1" { - t.Errorf("type = %v", result["type"]) + if out["type"] != "im.message.message_read_v1" { + t.Errorf("type = %v", out["type"]) } - if result["reader_id"] != "ou_reader" { - t.Errorf("reader_id = %v", result["reader_id"]) + if out["reader_id"] != "ou_reader" { + t.Errorf("reader_id = %v", out["reader_id"]) } - if result["read_time"] != "1700000001" { - t.Errorf("read_time = %v", result["read_time"]) + if out["read_time"] != "1700000001" { + t.Errorf("read_time = %v", out["read_time"]) } - ids, ok := result["message_ids"].([]string) + ids, ok := out["message_ids"].([]string) if !ok || len(ids) != 2 { - t.Errorf("message_ids = %v", result["message_ids"]) + t.Errorf("message_ids = %v", out["message_ids"]) } } @@ -75,56 +135,60 @@ func TestImMessageReadProcessor_Dedup(t *testing.T) { // --- im.message.reaction.created_v1 / deleted_v1 --- -func TestImReactionCreatedProcessor_Compact(t *testing.T) { - p := NewImReactionCreatedProcessor() - if p.EventType() != "im.message.reaction.created_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.message.reaction.created_v1", `{ - "message_id": "msg_react", - "reaction_type": {"emoji_type": "THUMBSUP"}, +func TestIMReactionCreatedHandler_Handle(t *testing.T) { + h := NewIMReactionCreatedHandler() + evt := makeNormalizedEvent("im.message.reaction.created_v1", map[string]interface{}{ + "message_id": "msg_react", + "reaction_type": map[string]interface{}{"emoji_type": "THUMBSUP"}, "operator_type": "user", - "user_id": {"open_id": "ou_reactor"}, - "action_time": "1700000002" - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) + "user_id": map[string]interface{}{"open_id": "ou_reactor"}, + "action_time": "1700000002", + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "added" { - t.Errorf("action = %v, want added", result["action"]) + if out["action"] != "added" { + t.Errorf("action = %v, want added", out["action"]) } - if result["message_id"] != "msg_react" { - t.Errorf("message_id = %v", result["message_id"]) + if out["message_id"] != "msg_react" { + t.Errorf("message_id = %v", out["message_id"]) } - if result["emoji_type"] != "THUMBSUP" { - t.Errorf("emoji_type = %v", result["emoji_type"]) + if out["emoji_type"] != "THUMBSUP" { + t.Errorf("emoji_type = %v", out["emoji_type"]) } - if result["operator_id"] != "ou_reactor" { - t.Errorf("operator_id = %v", result["operator_id"]) + if out["operator_id"] != "ou_reactor" { + t.Errorf("operator_id = %v", out["operator_id"]) } - if result["action_time"] != "1700000002" { - t.Errorf("action_time = %v", result["action_time"]) + if out["action_time"] != "1700000002" { + t.Errorf("action_time = %v", out["action_time"]) } } -func TestImReactionDeletedProcessor_Compact(t *testing.T) { - p := NewImReactionDeletedProcessor() - if p.EventType() != "im.message.reaction.deleted_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.message.reaction.deleted_v1", `{ - "message_id": "msg_unreact", - "reaction_type": {"emoji_type": "THUMBSUP"}, - "user_id": {"open_id": "ou_reactor"}, - "action_time": "1700000003" - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMReactionDeletedHandler_Handle(t *testing.T) { + h := NewIMReactionDeletedHandler() + evt := makeNormalizedEvent("im.message.reaction.deleted_v1", map[string]interface{}{ + "message_id": "msg_unreact", + "reaction_type": map[string]interface{}{"emoji_type": "THUMBSUP"}, + "user_id": map[string]interface{}{"open_id": "ou_reactor"}, + "action_time": "1700000003", + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "removed" { - t.Errorf("action = %v, want removed", result["action"]) + if out["action"] != "removed" { + t.Errorf("action = %v, want removed", out["action"]) } } @@ -146,53 +210,57 @@ func TestImReactionProcessor_UnmarshalError(t *testing.T) { // --- im.chat.member.bot.added_v1 / deleted_v1 --- -func TestImChatBotAddedProcessor_Compact(t *testing.T) { - p := NewImChatBotAddedProcessor() - if p.EventType() != "im.chat.member.bot.added_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.member.bot.added_v1", `{ - "chat_id": "oc_bot", - "operator_id": {"open_id": "ou_operator"}, - "external": false - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatMemberBotAddedHandler_Handle(t *testing.T) { + h := NewIMChatMemberBotAddedHandler() + evt := makeNormalizedEvent("im.chat.member.bot.added_v1", map[string]interface{}{ + "chat_id": "oc_bot", + "operator_id": map[string]interface{}{"open_id": "ou_operator"}, + "external": false, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "added" { - t.Errorf("action = %v", result["action"]) + if out["action"] != "added" { + t.Errorf("action = %v", out["action"]) } - if result["chat_id"] != "oc_bot" { - t.Errorf("chat_id = %v", result["chat_id"]) + if out["chat_id"] != "oc_bot" { + t.Errorf("chat_id = %v", out["chat_id"]) } - if result["operator_id"] != "ou_operator" { - t.Errorf("operator_id = %v", result["operator_id"]) + if out["operator_id"] != "ou_operator" { + t.Errorf("operator_id = %v", out["operator_id"]) } - if result["external"] != false { - t.Errorf("external = %v", result["external"]) + if out["external"] != false { + t.Errorf("external = %v", out["external"]) } } -func TestImChatBotDeletedProcessor_Compact(t *testing.T) { - p := NewImChatBotDeletedProcessor() - if p.EventType() != "im.chat.member.bot.deleted_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.member.bot.deleted_v1", `{ - "chat_id": "oc_bot2", - "operator_id": {"open_id": "ou_op2"}, - "external": true - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatMemberBotDeletedHandler_Handle(t *testing.T) { + h := NewIMChatMemberBotDeletedHandler() + evt := makeNormalizedEvent("im.chat.member.bot.deleted_v1", map[string]interface{}{ + "chat_id": "oc_bot2", + "operator_id": map[string]interface{}{"open_id": "ou_op2"}, + "external": true, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "removed" { - t.Errorf("action = %v, want removed", result["action"]) + if out["action"] != "removed" { + t.Errorf("action = %v, want removed", out["action"]) } - if result["external"] != true { - t.Errorf("external = %v, want true", result["external"]) + if out["external"] != true { + t.Errorf("external = %v, want true", out["external"]) } } @@ -214,78 +282,84 @@ func TestImChatBotProcessor_UnmarshalError(t *testing.T) { // --- im.chat.member.user.added_v1 / withdrawn_v1 / deleted_v1 --- -func TestImChatMemberUserAddedProcessor_Compact(t *testing.T) { - p := NewImChatMemberUserAddedProcessor() - if p.EventType() != "im.chat.member.user.added_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.member.user.added_v1", `{ - "chat_id": "oc_members", - "operator_id": {"open_id": "ou_admin"}, - "external": false, - "users": [ - {"user_id": {"open_id": "ou_user1"}, "name": "Alice"}, - {"user_id": {"open_id": "ou_user2"}, "name": "Bob"} - ] - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatMemberUserAddedHandler_Handle(t *testing.T) { + h := NewIMChatMemberUserAddedHandler() + evt := makeNormalizedEvent("im.chat.member.user.added_v1", map[string]interface{}{ + "chat_id": "oc_members", + "operator_id": map[string]interface{}{"open_id": "ou_admin"}, + "external": false, + "users": []interface{}{ + map[string]interface{}{"user_id": map[string]interface{}{"open_id": "ou_user1"}, "name": "Alice"}, + map[string]interface{}{"user_id": map[string]interface{}{"open_id": "ou_user2"}, "name": "Bob"}, + }, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "added" { - t.Errorf("action = %v", result["action"]) + if out["action"] != "added" { + t.Errorf("action = %v", out["action"]) } - if result["chat_id"] != "oc_members" { - t.Errorf("chat_id = %v", result["chat_id"]) + if out["chat_id"] != "oc_members" { + t.Errorf("chat_id = %v", out["chat_id"]) } - if result["operator_id"] != "ou_admin" { - t.Errorf("operator_id = %v", result["operator_id"]) + if out["operator_id"] != "ou_admin" { + t.Errorf("operator_id = %v", out["operator_id"]) } - userIDs, ok := result["user_ids"].([]string) + userIDs, ok := out["user_ids"].([]string) if !ok || len(userIDs) != 2 { - t.Fatalf("user_ids = %v", result["user_ids"]) + t.Fatalf("user_ids = %v", out["user_ids"]) } if userIDs[0] != "ou_user1" || userIDs[1] != "ou_user2" { t.Errorf("user_ids = %v", userIDs) } } -func TestImChatMemberUserWithdrawnProcessor_Compact(t *testing.T) { - p := NewImChatMemberUserWithdrawnProcessor() - if p.EventType() != "im.chat.member.user.withdrawn_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.member.user.withdrawn_v1", `{ - "chat_id": "oc_w", - "operator_id": {"open_id": "ou_self"}, - "external": false, - "users": [{"user_id": {"open_id": "ou_self"}, "name": "Self"}] - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatMemberUserWithdrawnHandler_Handle(t *testing.T) { + h := NewIMChatMemberUserWithdrawnHandler() + evt := makeNormalizedEvent("im.chat.member.user.withdrawn_v1", map[string]interface{}{ + "chat_id": "oc_w", + "operator_id": map[string]interface{}{"open_id": "ou_self"}, + "external": false, + "users": []interface{}{map[string]interface{}{"user_id": map[string]interface{}{"open_id": "ou_self"}, "name": "Self"}}, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "withdrawn" { - t.Errorf("action = %v, want withdrawn", result["action"]) + if out["action"] != "withdrawn" { + t.Errorf("action = %v, want withdrawn", out["action"]) } } -func TestImChatMemberUserDeletedProcessor_Compact(t *testing.T) { - p := NewImChatMemberUserDeletedProcessor() - if p.EventType() != "im.chat.member.user.deleted_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.member.user.deleted_v1", `{ - "chat_id": "oc_del", - "operator_id": {"open_id": "ou_admin"}, - "users": [{"user_id": {"open_id": "ou_kicked"}}] - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatMemberUserDeletedHandler_Handle(t *testing.T) { + h := NewIMChatMemberUserDeletedHandler() + evt := makeNormalizedEvent("im.chat.member.user.deleted_v1", map[string]interface{}{ + "chat_id": "oc_del", + "operator_id": map[string]interface{}{"open_id": "ou_admin"}, + "users": []interface{}{map[string]interface{}{"user_id": map[string]interface{}{"open_id": "ou_kicked"}}}, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["action"] != "removed" { - t.Errorf("action = %v, want removed", result["action"]) + if out["action"] != "removed" { + t.Errorf("action = %v, want removed", out["action"]) } } @@ -307,39 +381,41 @@ func TestImChatMemberUserProcessor_UnmarshalError(t *testing.T) { // --- im.chat.updated_v1 --- -func TestImChatUpdatedProcessor_Compact(t *testing.T) { - p := &ImChatUpdatedProcessor{} - if p.EventType() != "im.chat.updated_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.updated_v1", `{ - "chat_id": "oc_updated", - "operator_id": {"open_id": "ou_updater"}, - "external": false, - "after_change": {"name": "New Name"}, - "before_change": {"name": "Old Name"} - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatUpdatedHandler_Handle(t *testing.T) { + h := NewIMChatUpdatedHandler() + evt := makeNormalizedEvent("im.chat.updated_v1", map[string]interface{}{ + "chat_id": "oc_updated", + "operator_id": map[string]interface{}{"open_id": "ou_updater"}, + "external": false, + "after_change": map[string]interface{}{"name": "New Name"}, + "before_change": map[string]interface{}{"name": "Old Name"}, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["type"] != "im.chat.updated_v1" { - t.Errorf("type = %v", result["type"]) + if out["type"] != "im.chat.updated_v1" { + t.Errorf("type = %v", out["type"]) } - if result["chat_id"] != "oc_updated" { - t.Errorf("chat_id = %v", result["chat_id"]) + if out["chat_id"] != "oc_updated" { + t.Errorf("chat_id = %v", out["chat_id"]) } - if result["operator_id"] != "ou_updater" { - t.Errorf("operator_id = %v", result["operator_id"]) + if out["operator_id"] != "ou_updater" { + t.Errorf("operator_id = %v", out["operator_id"]) } - after, ok := result["after_change"].(map[string]interface{}) + after, ok := out["after_change"].(map[string]interface{}) if !ok { t.Fatal("after_change should be a map") } if after["name"] != "New Name" { t.Errorf("after_change.name = %v", after["name"]) } - before, ok := result["before_change"].(map[string]interface{}) + before, ok := out["before_change"].(map[string]interface{}) if !ok { t.Fatal("before_change should be a map") } @@ -366,31 +442,33 @@ func TestImChatUpdatedProcessor_UnmarshalError(t *testing.T) { // --- im.chat.disbanded_v1 --- -func TestImChatDisbandedProcessor_Compact(t *testing.T) { - p := &ImChatDisbandedProcessor{} - if p.EventType() != "im.chat.disbanded_v1" { - t.Fatalf("EventType = %q", p.EventType()) - } - raw := makeRawEvent("im.chat.disbanded_v1", `{ - "chat_id": "oc_disbanded", - "operator_id": {"open_id": "ou_disbander"}, - "external": true - }`) - result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{}) +func TestIMChatDisbandedHandler_Handle(t *testing.T) { + h := NewIMChatDisbandedHandler() + evt := makeNormalizedEvent("im.chat.disbanded_v1", map[string]interface{}{ + "chat_id": "oc_disbanded", + "operator_id": map[string]interface{}{"open_id": "ou_disbander"}, + "external": true, + }) + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) if !ok { - t.Fatal("compact should return map") + t.Fatal("output should be compact map") } - if result["type"] != "im.chat.disbanded_v1" { - t.Errorf("type = %v", result["type"]) + if out["type"] != "im.chat.disbanded_v1" { + t.Errorf("type = %v", out["type"]) } - if result["chat_id"] != "oc_disbanded" { - t.Errorf("chat_id = %v", result["chat_id"]) + if out["chat_id"] != "oc_disbanded" { + t.Errorf("chat_id = %v", out["chat_id"]) } - if result["operator_id"] != "ou_disbander" { - t.Errorf("operator_id = %v", result["operator_id"]) + if out["operator_id"] != "ou_disbander" { + t.Errorf("operator_id = %v", out["operator_id"]) } - if result["external"] != true { - t.Errorf("external = %v, want true", result["external"]) + if out["external"] != true { + t.Errorf("external = %v, want true", out["external"]) } } @@ -410,6 +488,44 @@ func TestImChatDisbandedProcessor_UnmarshalError(t *testing.T) { } } +// --- generic fallback handler --- + +func TestGenericFallbackHandler_HandleUnknownEvent(t *testing.T) { + h := NewGenericFallbackHandler() + evt := &Event{ + Source: SourceWebhook, + EventType: "unknown.event", + EventID: "ev_unknown", + Domain: DomainUnknown, + Payload: NormalizedPayload{ + Header: EventHeader{EventID: "ev_unknown", EventType: "unknown.event", CreateTime: "1700000009"}, + Data: map[string]interface{}{"foo": "bar"}, + }, + RawPayload: []byte(`{"foo":"bar","nested":{"x":1}}`), + } + + result := h.Handle(context.Background(), evt) + if result.Status != HandlerStatusHandled { + t.Fatalf("status = %q", result.Status) + } + out, ok := result.Output.(map[string]interface{}) + if !ok { + t.Fatal("output should be a map") + } + if out["type"] != "unknown.event" { + t.Fatalf("type = %v", out["type"]) + } + if out["event_id"] != "ev_unknown" { + t.Fatalf("event_id = %v", out["event_id"]) + } + if out["foo"] != "bar" { + t.Fatalf("foo = %v", out["foo"]) + } + if out["raw_payload"] != `{"foo":"bar","nested":{"x":1}}` { + t.Fatalf("raw_payload = %v", out["raw_payload"]) + } +} + // --- Registry: all IM processors registered --- func TestRegistryAllIMProcessors(t *testing.T) { diff --git a/shortcuts/event/processor_test.go b/shortcuts/event/processor_test.go index 464cda88..9f6e27ec 100644 --- a/shortcuts/event/processor_test.go +++ b/shortcuts/event/processor_test.go @@ -18,20 +18,6 @@ import ( larkevent "github.com/larksuite/oapi-sdk-go/v3/event" ) -// chdirTemp changes cwd to a fresh temp dir for the test duration. -func chdirTemp(t *testing.T) { - t.Helper() - orig, err := os.Getwd() - if err != nil { - t.Fatal(err) - } - dir := t.TempDir() - if err := os.Chdir(dir); err != nil { - t.Fatal(err) - } - t.Cleanup(func() { os.Chdir(orig) }) -} - // helper to build a RawEvent from event-level JSON and header fields. func makeRawEvent(eventType string, eventJSON string) *RawEvent { return &RawEvent{ @@ -44,6 +30,66 @@ func makeRawEvent(eventType string, eventJSON string) *RawEvent { } } +func makeInboundEnvelope(eventType, eventJSON string) InboundEnvelope { + body := `{"schema":"2.0","header":{"event_id":"ev_test","event_type":"` + eventType + `"},"event":` + eventJSON + `}` + return InboundEnvelope{ + Source: SourceWebSocket, + ReceivedAt: time.Unix(1700000000, 0).UTC(), + RawPayload: []byte(body), + } +} + +func TestBuildWebSocketEnvelope(t *testing.T) { + body := []byte(`{"schema":"2.0","header":{"event_id":"ev_test","event_type":"im.message.receive_v1"},"event":{"message":{"message_id":"om_123"}}}`) + before := time.Now() + env := BuildWebSocketEnvelope(body) + after := time.Now() + + if env.Source != SourceWebSocket { + t.Fatalf("Source = %q, want %q", env.Source, SourceWebSocket) + } + if env.ReceivedAt.Before(before) || env.ReceivedAt.After(after) { + t.Fatalf("ReceivedAt = %v, want between %v and %v", env.ReceivedAt, before, after) + } + if !bytes.Equal(env.RawPayload, body) { + t.Fatalf("RawPayload = %s, want %s", env.RawPayload, body) + } + if &env.RawPayload[0] == &body[0] { + t.Fatal("RawPayload should copy input bytes") + } +} + +func TestBuildWebSocketEnvelopePreservesValidJSON(t *testing.T) { + body := []byte(`{"schema":"2.0","header":{"event_id":"ev_test","event_type":"im.message.receive_v1"},"event":{"message":{"message_id":"om_123"}}}`) + env := BuildWebSocketEnvelope(body) + + var decoded map[string]interface{} + if err := json.Unmarshal(env.RawPayload, &decoded); err != nil { + t.Fatalf("RawPayload should remain valid JSON: %v", err) + } +} + +func makeTestRegistry(handler EventHandler, fallback EventHandler) *HandlerRegistry { + registry := NewHandlerRegistry() + if handler != nil { + if handler.EventType() != "" { + if err := registry.RegisterEventHandler(handler); err != nil { + panic(err) + } + } else if handler.Domain() != "" { + if err := registry.RegisterDomainHandler(handler); err != nil { + panic(err) + } + } + } + if fallback != nil { + if err := registry.SetFallbackHandler(fallback); err != nil { + panic(err) + } + } + return registry +} + // --- Registry --- func TestRegistryLookup(t *testing.T) { @@ -190,237 +236,493 @@ func TestGenericProcessor_Raw(t *testing.T) { // --- Pipeline --- -func TestPipeline_Raw(t *testing.T) { +func TestPipeline_NormalizesAndDispatchesEventHandler(t *testing.T) { + var calls []string + handler := &testEventHandler{ + id: "event-handler", + eventType: "im.message.receive_v1", + result: HandlerResult{Status: HandlerStatusHandled}, + called: &calls, + } + registry := makeTestRegistry(handler, nil) filters := NewFilterChain() var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformRaw}, &out, &errOut) + p := NewEventPipeline(registry, filters, PipelineConfig{Mode: TransformCompact}, &out, &errOut) - eventJSON := `{"file_token":"xxx"}` - raw := makeRawEvent("drive.file.edit_v1", eventJSON) - raw.Header.EventID = "ev_raw" - raw.Header.CreateTime = "1700000000" - raw.Header.AppID = "cli_test" - p.Process(context.Background(), raw) + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"id":"1"}}`)) - // Raw output should be the complete original event (schema + header + event) - var outputMap map[string]interface{} - if err := json.Unmarshal(out.Bytes(), &outputMap); err != nil { - t.Fatalf("failed to parse output: %v", err) + if got, want := calls, []string{"event-handler"}; !reflect.DeepEqual(got, want) { + t.Fatalf("calls = %v, want %v", got, want) } - if outputMap["schema"] != "2.0" { - t.Errorf("schema = %v, want 2.0", outputMap["schema"]) + lines := strings.Split(strings.TrimSpace(out.String()), "\n") + if len(lines) != 1 { + t.Fatalf("got %d output lines, want 1", len(lines)) } - header, ok := outputMap["header"].(map[string]interface{}) - if !ok { - t.Fatal("raw output should contain header object") + var record map[string]interface{} + if err := json.Unmarshal([]byte(lines[0]), &record); err != nil { + t.Fatalf("invalid NDJSON line: %v", err) } - if header["event_type"] != "drive.file.edit_v1" { - t.Errorf("header.event_type = %v", header["event_type"]) + if record["event_type"] != "im.message.receive_v1" { + t.Fatalf("event_type = %v", record["event_type"]) } - if header["app_id"] != "cli_test" { - t.Errorf("header.app_id = %v, want cli_test", header["app_id"]) + if record["handler_id"] != "event-handler" { + t.Fatalf("handler_id = %v", record["handler_id"]) + } + if record["domain"] != "im" { + t.Fatalf("domain = %v", record["domain"]) } } func TestPipeline_Filtered(t *testing.T) { filters := NewFilterChain(NewEventTypeFilter("im.message.receive_v1")) var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{}, &out, &errOut) + p := NewEventPipeline(NewHandlerRegistry(), filters, PipelineConfig{}, &out, &errOut) - raw := makeRawEvent("drive.file.edit_v1", `{}`) - p.Process(context.Background(), raw) + p.Process(context.Background(), makeInboundEnvelope("drive.file.edit_v1", `{}`)) - if p.EventCount() != 0 { - t.Errorf("filtered event should not be counted") - } if out.Len() != 0 { t.Error("filtered event should produce no output") } } -func TestDeduplicateKey(t *testing.T) { - raw := makeRawEvent("im.message.receive_v1", `{}`) - if k := (&ImMessageProcessor{}).DeduplicateKey(raw); k != "ev_test" { - t.Errorf("ImMessageProcessor got %q, want ev_test", k) +func TestNewBuiltinHandlerRegistryRegistersBuiltins(t *testing.T) { + registry := NewBuiltinHandlerRegistry() + if got := registry.EventHandlers("im.message.receive_v1"); len(got) != 1 || got[0].ID() != imMessageHandlerID { + t.Fatalf("EventHandlers(im.message.receive_v1) = %#v, want built-in IM handler", got) } - if k := (&GenericProcessor{}).DeduplicateKey(raw); k != "ev_test" { - t.Errorf("GenericProcessor got %q, want ev_test", k) + if got := registry.FallbackHandler(); got == nil || got.ID() != genericHandlerID { + t.Fatalf("FallbackHandler() = %#v, want built-in generic fallback", got) } } -func TestPipeline_Dedup(t *testing.T) { - filters := NewFilterChain() +func TestPipeline_PreservesHandlerCompactOutput(t *testing.T) { var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformRaw}, &out, &errOut) + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{id: "compact", eventType: "im.message.receive_v1", fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{ + Status: HandlerStatusHandled, + Output: map[string]interface{}{ + "type": evt.EventType, + "message_id": "om_123", + "content": "hello", + }, + } + }}); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) + } + p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformCompact}, &out, &errOut) - raw := makeRawEvent("im.message.receive_v1", `{"message":{"id":"1"}}`) + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`)) - // First event should pass - p.Process(context.Background(), raw) - if p.EventCount() != 1 { - t.Fatalf("EventCount = %d, want 1", p.EventCount()) + var record map[string]interface{} + if err := json.Unmarshal(bytes.TrimSpace(out.Bytes()), &record); err != nil { + t.Fatalf("invalid NDJSON line: %v", err) } - firstLen := out.Len() - if firstLen == 0 { - t.Fatal("expected output from first event") + if record["handler_id"] != "compact" { + t.Fatalf("handler_id = %v, want compact", record["handler_id"]) } - - // Same event_id again should be deduped - p.Process(context.Background(), raw) - if p.EventCount() != 1 { - t.Errorf("EventCount = %d, want 1 (deduped)", p.EventCount()) + if record["type"] != "im.message.receive_v1" { + t.Fatalf("type = %v, want im.message.receive_v1", record["type"]) + } + if record["message_id"] != "om_123" { + t.Fatalf("message_id = %v, want om_123", record["message_id"]) } - if out.Len() != firstLen { - t.Error("duplicate event should produce no additional output") + if record["content"] != "hello" { + t.Fatalf("content = %v, want hello", record["content"]) } +} - // Different event_id should pass - raw2 := makeRawEvent("im.message.receive_v1", `{"message":{"id":"2"}}`) - raw2.Header.EventID = "ev_other" - p.Process(context.Background(), raw2) - if p.EventCount() != 2 { - t.Errorf("EventCount = %d, want 2", p.EventCount()) +func TestPipeline_CompactModeWritesDispatchRecord(t *testing.T) { + var out, errOut bytes.Buffer + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{ + id: "compact-handler", + eventType: "im.message.receive_v1", + fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{ + Status: HandlerStatusHandled, + Output: map[string]interface{}{ + "type": evt.EventType, + "message_id": "om_123", + }, + } + }, + }); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) + } + + p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformCompact}, &out, &errOut) + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`)) + + var record map[string]interface{} + if err := json.Unmarshal(bytes.TrimSpace(out.Bytes()), &record); err != nil { + t.Fatalf("invalid compact-mode output: %v", err) + } + if record["handler_id"] != "compact-handler" { + t.Fatalf("handler_id = %v", record["handler_id"]) + } + if record["message_id"] != "om_123" { + t.Fatalf("message_id = %v", record["message_id"]) + } + if record["status"] != "handled" { + t.Fatalf("status = %v", record["status"]) } } -// --- Pipeline: OutputDir --- +func TestPipeline_RawModeWritesEventRecord(t *testing.T) { + var out, errOut bytes.Buffer + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{id: genericHandlerID, eventType: "im.message.receive_v1", fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{ + Status: HandlerStatusHandled, + Output: map[string]interface{}{ + "content": "handler-output-should-not-leak", + }, + } + }}); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) + } + p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw}, &out, &errOut) + rawPayload := `{"message":{"message_id":"om_123","message_type":"text","content":"{\"text\":\"hello\"}"}}` -func TestPipeline_OutputDir(t *testing.T) { - dir := t.TempDir() - filters := NewFilterChain() + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", rawPayload)) + + var record map[string]interface{} + if err := json.Unmarshal(bytes.TrimSpace(out.Bytes()), &record); err != nil { + t.Fatalf("invalid NDJSON line: %v", err) + } + if got, want := record["event_type"], "im.message.receive_v1"; got != want { + t.Fatalf("event_type = %v, want %v", got, want) + } + if got, want := record["status"], string(HandlerStatusHandled); got != want { + t.Fatalf("status = %v, want %v", got, want) + } + if got, want := record["source"], string(SourceWebSocket); got != want { + t.Fatalf("source = %v, want %v", got, want) + } + if got := record["idempotency_key"]; got == nil || got == "" { + t.Fatalf("idempotency_key = %v, want non-empty", got) + } + if got, want := record["raw_payload"], `{"schema":"2.0","header":{"event_id":"ev_test","event_type":"im.message.receive_v1"},"event":{"message":{"message_id":"om_123","message_type":"text","content":"{\"text\":\"hello\"}"}}}`; got != want { + t.Fatalf("raw_payload = %v, want %v", got, want) + } + payload, ok := record["payload"].(map[string]interface{}) + if !ok { + t.Fatalf("payload = %T, want object", record["payload"]) + } + message, ok := payload["message"].(map[string]interface{}) + if !ok { + t.Fatalf("payload.message = %T, want object", payload["message"]) + } + if got, want := message["message_id"], "om_123"; got != want { + t.Fatalf("payload.message.message_id = %v, want %v", got, want) + } + if got, want := message["content"], `{"text":"hello"}`; got != want { + t.Fatalf("payload.message.content = %v, want %v", got, want) + } + if got, want := record["domain"], "im"; got != want { + t.Fatalf("domain = %v, want %v", got, want) + } + if got, want := record["event_id"], "ev_test"; got != want { + t.Fatalf("event_id = %v, want %v", got, want) + } + if _, exists := record["handler_id"]; exists { + t.Fatalf("handler_id should be absent in raw mode: %v", record["handler_id"]) + } + if _, exists := record["content"]; exists { + t.Fatalf("content should be absent in raw mode: %v", record["content"]) + } +} + +func TestPipeline_RawModeCanWriteViaOutputRouter(t *testing.T) { + tmpDir := t.TempDir() + cwd, err := os.Getwd() + if err != nil { + t.Fatalf("Getwd() error = %v", err) + } + if err := os.Chdir(tmpDir); err != nil { + t.Fatalf("Chdir() error = %v", err) + } + defer func() { + _ = os.Chdir(cwd) + }() + router, err := ParseRoutes([]string{"^im\\.message=dir:./im"}) + if err != nil { + t.Fatalf("ParseRoutes() error = %v", err) + } + fallbackDir := filepath.Join(tmpDir, "default") var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformCompact, OutputDir: dir}, &out, &errOut) - if err := p.EnsureDirs(); err != nil { - t.Fatal(err) + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{id: genericHandlerID, eventType: "im.message.receive_v1", fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} + }}); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) } + p := newEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw}, &out, &errOut, &outputRouter{router: router, defaultDir: fallbackDir, fallback: ndjsonRecordWriter{w: &out}, seq: new(uint64), writers: map[string]*dirRecordWriter{}}) - eventJSON := `{ - "message": { - "message_id": "msg_file", "chat_id": "oc_001", - "chat_type": "group", "message_type": "text", - "content": "{\"text\":\"file test\"}", "create_time": "1700000000" - }, - "sender": {"sender_id": {"open_id": "ou_001"}} - }` - raw := makeRawEvent("im.message.receive_v1", eventJSON) - raw.Header.EventID = "ev_file" - raw.Header.CreateTime = "1700000000" - p.Process(context.Background(), raw) + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`)) - // stdout should be empty (output goes to file) if out.Len() != 0 { - t.Error("OutputDir mode should not write to stdout") + t.Fatalf("stdout output = %q, want empty when routed to files", out.String()) } - - // Verify file was created - entries, err := os.ReadDir(dir) + entries, err := os.ReadDir(filepath.Join(tmpDir, "im")) if err != nil { - t.Fatal(err) + t.Fatalf("ReadDir() error = %v", err) } if len(entries) != 1 { - t.Fatalf("expected 1 file, got %d", len(entries)) + t.Fatalf("routed files = %d, want 1", len(entries)) } - - // Verify file content is valid JSON - data, err := os.ReadFile(filepath.Join(dir, entries[0].Name())) + data, err := os.ReadFile(filepath.Join(tmpDir, "im", entries[0].Name())) if err != nil { - t.Fatal(err) + t.Fatalf("ReadFile() error = %v", err) + } + var record map[string]interface{} + if err := json.Unmarshal(bytes.TrimSpace(data), &record); err != nil { + t.Fatalf("invalid routed JSON: %v", err) + } + if got, want := record["event_type"], "im.message.receive_v1"; got != want { + t.Fatalf("event_type = %v, want %v", got, want) } - var m map[string]interface{} - if err := json.Unmarshal(data, &m); err != nil { - t.Fatalf("file content is not valid JSON: %v", err) + if got, want := record["raw_payload"], `{"schema":"2.0","header":{"event_id":"ev_test","event_type":"im.message.receive_v1"},"event":{"message":{"message_id":"om_123"}}}`; got != want { + t.Fatalf("raw_payload = %v, want %v", got, want) } - if m["type"] != "im.message.receive_v1" { - t.Errorf("type = %v", m["type"]) + if _, err := os.Stat(fallbackDir); !os.IsNotExist(err) { + t.Fatalf("fallback dir stat err = %v, want not exists", err) } } -// --- Pipeline: JsonFlag --- - -func TestPipeline_JsonFlag(t *testing.T) { - filters := NewFilterChain() +func TestPipeline_RawModePrettyJSONStillWritesCompactFileJSONWhenRouted(t *testing.T) { + tmpDir := t.TempDir() + cwd, err := os.Getwd() + if err != nil { + t.Fatalf("Getwd() error = %v", err) + } + if err := os.Chdir(tmpDir); err != nil { + t.Fatalf("Chdir() error = %v", err) + } + defer func() { + _ = os.Chdir(cwd) + }() + router, err := ParseRoutes([]string{"^im\\.message=dir:./im"}) + if err != nil { + t.Fatalf("ParseRoutes() error = %v", err) + } var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformRaw, JsonFlag: true}, &out, &errOut) + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{id: genericHandlerID, eventType: "im.message.receive_v1", fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} + }}); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) + } + p := newEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw, PrettyJSON: true}, &out, &errOut, &outputRouter{router: router, fallback: ndjsonRecordWriter{w: &out}, seq: new(uint64), writers: map[string]*dirRecordWriter{}}) - raw := makeRawEvent("drive.file.edit_v1", `{"key":"val"}`) - p.Process(context.Background(), raw) + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`)) - // --json output should be pretty-printed (contain newlines + indentation) - output := out.String() - if !strings.Contains(output, "\n") { - t.Error("--json output should be pretty-printed") + if out.Len() != 0 { + t.Fatalf("stdout output = %q, want empty when routed to files", out.String()) + } + entries, err := os.ReadDir(filepath.Join(tmpDir, "im")) + if err != nil { + t.Fatalf("ReadDir() error = %v", err) + } + if len(entries) != 1 { + t.Fatalf("routed files = %d, want 1", len(entries)) + } + data, err := os.ReadFile(filepath.Join(tmpDir, "im", entries[0].Name())) + if err != nil { + t.Fatalf("ReadFile() error = %v", err) + } + trimmed := bytes.TrimSpace(data) + if bytes.Contains(trimmed, []byte("\n \"event_type\"")) { + t.Fatalf("routed file unexpectedly used pretty JSON: %q", string(data)) + } + var record map[string]interface{} + if err := json.Unmarshal(trimmed, &record); err != nil { + t.Fatalf("invalid routed JSON: %v", err) + } + if got, want := record["event_type"], "im.message.receive_v1"; got != want { + t.Fatalf("event_type = %v, want %v", got, want) + } +} + +func TestPipeline_RawModePrettyJSONFallsBackToPrettyStdoutWhenUnmatched(t *testing.T) { + tmpDir := t.TempDir() + cwd, err := os.Getwd() + if err != nil { + t.Fatalf("Getwd() error = %v", err) + } + if err := os.Chdir(tmpDir); err != nil { + t.Fatalf("Chdir() error = %v", err) + } + defer func() { + _ = os.Chdir(cwd) + }() + router, err := ParseRoutes([]string{"^contact\\.=dir:./contacts"}) + if err != nil { + t.Fatalf("ParseRoutes() error = %v", err) + } + var out, errOut bytes.Buffer + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{id: genericHandlerID, eventType: "im.message.receive_v1", fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} + }}); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) } + p := newEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw, PrettyJSON: true}, &out, &errOut, &outputRouter{router: router, fallback: ndjsonRecordWriter{w: &out}, seq: new(uint64), writers: map[string]*dirRecordWriter{}}) + + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`)) - var m map[string]interface{} - if err := json.Unmarshal([]byte(output), &m); err != nil { - t.Fatalf("output is not valid JSON: %v", err) + got := out.String() + if !strings.Contains(got, `"event_type":"im.message.receive_v1"`) { + t.Fatalf("stdout output = %q, want compact fallback NDJSON", got) + } + if strings.Contains(got, "\n \"event_type\"") { + t.Fatalf("stdout output = %q, want fallback writer to ignore PrettyJSON", got) + } + if _, err := os.Stat(filepath.Join(tmpDir, "contacts")); !os.IsNotExist(err) { + t.Fatalf("contacts dir stat err = %v, want not exists", err) } } -// --- Pipeline: Quiet --- +func TestDeduplicateKey(t *testing.T) { + raw := makeRawEvent("im.message.receive_v1", `{}`) + if k := (&ImMessageProcessor{}).DeduplicateKey(raw); k != "ev_test" { + t.Errorf("ImMessageProcessor got %q, want ev_test", k) + } + if k := (&GenericProcessor{}).DeduplicateKey(raw); k != "ev_test" { + t.Errorf("GenericProcessor got %q, want ev_test", k) + } +} -func TestPipeline_Quiet(t *testing.T) { +func TestPipeline_DedupePreventsDuplicateDispatch(t *testing.T) { + var calls []string + handler := &testEventHandler{ + id: "event-handler", + eventType: "im.message.receive_v1", + result: HandlerResult{Status: HandlerStatusHandled}, + called: &calls, + } + registry := makeTestRegistry(handler, nil) filters := NewFilterChain() var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformRaw, Quiet: true}, &out, &errOut) + p := NewEventPipeline(registry, filters, PipelineConfig{}, &out, &errOut) + env := makeInboundEnvelope("im.message.receive_v1", `{"message":{"id":"1"}}`) - raw := makeRawEvent("im.message.receive_v1", `{}`) - p.Process(context.Background(), raw) + p.Process(context.Background(), env) + p.Process(context.Background(), env) - if errOut.Len() != 0 { - t.Errorf("quiet mode should suppress stderr, got: %s", errOut.String()) + if got, want := calls, []string{"event-handler"}; !reflect.DeepEqual(got, want) { + t.Fatalf("calls = %v, want %v", got, want) + } + lines := strings.Split(strings.TrimSpace(out.String()), "\n") + if len(lines) != 1 { + t.Fatalf("got %d output lines, want 1", len(lines)) } } -// --- writeEventFile --- - -func TestWriteEventFile(t *testing.T) { - dir := t.TempDir() - header := larkevent.EventHeader{ - EventType: "im.message.receive_v1", - EventID: "ev_write", - CreateTime: "1700000000", +func TestPipeline_MalformedEventUsesFallback(t *testing.T) { + var captured *Event + registry := NewHandlerRegistry() + if err := registry.SetFallbackHandler(handlerFuncWith{id: "fallback", fn: func(_ context.Context, evt *Event) HandlerResult { + captured = evt + return HandlerResult{Status: HandlerStatusHandled} + }}); err != nil { + t.Fatalf("SetFallbackHandler() error = %v", err) } - data := map[string]string{"hello": "world"} + var out, errOut bytes.Buffer + p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{}, &out, &errOut) - path, err := writeEventFile(dir, data, header) - if err != nil { - t.Fatal(err) + p.Process(context.Background(), InboundEnvelope{ + Source: SourceWebhook, + ReceivedAt: time.Unix(1700000001, 0).UTC(), + RawPayload: []byte("not-json"), + }) + + if captured == nil { + t.Fatal("expected fallback to capture malformed event") } - if !strings.Contains(path, "ev_write") { - t.Errorf("path should contain event ID, got: %s", path) + if captured.EventType != "malformed" { + t.Fatalf("fallback event type = %q, want malformed", captured.EventType) } + if captured.Domain != DomainUnknown { + t.Fatalf("fallback domain = %q, want %q", captured.Domain, DomainUnknown) + } + if captured.Metadata["malformed_reason"] != "invalid_json" { + t.Fatalf("malformed_reason = %v, want invalid_json", captured.Metadata["malformed_reason"]) + } + if payload, ok := captured.Payload.Data["raw_payload"].(string); !ok || payload != "not-json" { + t.Fatalf("raw payload = %v, want not-json", captured.Payload.Data["raw_payload"]) + } +} - content, err := os.ReadFile(path) - if err != nil { - t.Fatal(err) +func TestPipeline_RawModeMalformedEventPreservesRawPayload(t *testing.T) { + var out, errOut bytes.Buffer + registry := NewHandlerRegistry() + if err := registry.SetFallbackHandler(handlerFuncWith{ + id: "fallback", + fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} + }, + }); err != nil { + t.Fatalf("SetFallbackHandler() error = %v", err) + } + + p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw}, &out, &errOut) + p.Process(context.Background(), InboundEnvelope{ + Source: SourceWebhook, + ReceivedAt: time.Unix(1700000001, 0).UTC(), + RawPayload: []byte("not-json"), + }) + + var record map[string]interface{} + if err := json.Unmarshal(bytes.TrimSpace(out.Bytes()), &record); err != nil { + t.Fatalf("invalid raw-mode malformed output: %v", err) + } + if record["event_type"] != "malformed" { + t.Fatalf("event_type = %v", record["event_type"]) } - if !strings.Contains(string(content), `"hello"`) { - t.Error("file should contain data") + if record["raw_payload"] != "not-json" { + t.Fatalf("raw_payload = %v", record["raw_payload"]) + } + if record["status"] != "handled" { + t.Fatalf("status = %v", record["status"]) } } -func TestWriteEventFile_EmptyFields(t *testing.T) { - dir := t.TempDir() - header := larkevent.EventHeader{EventType: "test.type"} - _, err := writeEventFile(dir, "data", header) - if err != nil { - t.Fatal(err) +func TestPipeline_RawModePrettyJSONWritesIndentedObject(t *testing.T) { + var out, errOut bytes.Buffer + registry := NewHandlerRegistry() + if err := registry.RegisterEventHandler(handlerFuncWith{ + id: "raw-handler", + eventType: "im.message.receive_v1", + fn: func(_ context.Context, evt *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} + }, + }); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) } - entries, _ := os.ReadDir(dir) - if len(entries) != 1 { - t.Fatal("expected 1 file") + p := NewEventPipeline(registry, NewFilterChain(), PipelineConfig{Mode: TransformRaw, PrettyJSON: true}, &out, &errOut) + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{"message":{"message_id":"om_123"}}`)) + + got := out.String() + if !strings.Contains(got, "\n \"event_type\"") { + t.Fatalf("pretty JSON output = %q, want indented object", got) } - name := entries[0].Name() - if !strings.Contains(name, "unknown") { - t.Errorf("empty EventID should fallback to 'unknown', got: %s", name) +} + +// --- Pipeline: Quiet --- + +func TestPipeline_Quiet(t *testing.T) { + filters := NewFilterChain() + var out, errOut bytes.Buffer + p := NewEventPipeline(NewHandlerRegistry(), filters, + PipelineConfig{Quiet: true}, &out, &errOut) + + p.Process(context.Background(), makeInboundEnvelope("im.message.receive_v1", `{}`)) + + if errOut.Len() != 0 { + t.Errorf("quiet mode should suppress stderr, got: %s", errOut.String()) } } @@ -567,361 +869,68 @@ func TestGenericProcessor_CompactUnmarshalError(t *testing.T) { } } -// --- Router --- - -func TestParseRoutes(t *testing.T) { - routes, err := ParseRoutes([]string{ - `^im\.message=dir:./messages/`, - `^contact\.=dir:./contacts/`, - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if routes == nil { - t.Fatal("expected non-nil router") - } - if len(routes.routes) != 2 { - t.Errorf("expected 2 routes, got %d", len(routes.routes)) - } -} - -func TestParseRoutes_Empty(t *testing.T) { - routes, err := ParseRoutes(nil) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if routes != nil { - t.Error("expected nil router for empty input") - } - - routes2, err2 := ParseRoutes([]string{}) - if err2 != nil { - t.Fatalf("unexpected error: %v", err2) - } - if routes2 != nil { - t.Error("expected nil router for empty slice") - } -} - -func TestParseRoutes_MissingEquals(t *testing.T) { - _, err := ParseRoutes([]string{"no-equals-sign"}) - if err == nil { - t.Error("expected error for missing =") - } -} - -func TestParseRoutes_InvalidRegex(t *testing.T) { - _, err := ParseRoutes([]string{"[invalid=dir:./foo/"}) - if err == nil { - t.Error("expected error for invalid regex") - } -} - -func TestParseRoutes_MissingPrefix(t *testing.T) { - _, err := ParseRoutes([]string{`^im\.message=./messages/`}) - if err == nil { - t.Error("expected error for missing dir: prefix") - } - if !strings.Contains(err.Error(), "dir:") { - t.Errorf("error should mention dir: prefix, got: %v", err) - } -} - -func TestParseRoutes_EmptyPath(t *testing.T) { - _, err := ParseRoutes([]string{`^im\.message=dir:`}) - if err == nil { - t.Error("expected error for empty path") - } +type testHandler struct { + id string + eventType string + domain string } -func TestParseRoutes_RejectsAbsolutePath(t *testing.T) { - _, err := ParseRoutes([]string{`^test=dir:/tmp/evil`}) - if err == nil { - t.Error("expected error for absolute path in route") - } +func (h testHandler) ID() string { return h.id } +func (h testHandler) EventType() string { return h.eventType } +func (h testHandler) Domain() string { return h.domain } +func (h testHandler) Handle(context.Context, *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} } -func TestParseRoutes_RejectsTraversal(t *testing.T) { - _, err := ParseRoutes([]string{`^test=dir:../../etc/evil`}) - if err == nil { - t.Error("expected error for path traversal in route") - } -} +type handlerFunc string -func TestParseRoutes_PathSafety(t *testing.T) { - routes, err := ParseRoutes([]string{`^test=dir:./foo/../bar/`}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - dir := routes.routes[0].dir - if !filepath.IsAbs(dir) { - t.Errorf("expected absolute path, got %s", dir) - } - if strings.Contains(dir, "..") { - t.Errorf("expected cleaned path without .., got %s", dir) - } +func (h handlerFunc) ID() string { return string(h) } +func (h handlerFunc) EventType() string { return "" } +func (h handlerFunc) Domain() string { return "" } +func (h handlerFunc) Handle(context.Context, *Event) HandlerResult { + return HandlerResult{Status: HandlerStatusHandled} } -func TestEventRouter_Match(t *testing.T) { - chdirTemp(t) - - router, err := ParseRoutes([]string{ - `^im\.message=dir:./test_messages`, - `^contact\.=dir:./test_contacts`, - }) - if err != nil { - t.Fatal(err) - } - - // Single match - dirs := router.Match("im.message.receive_v1") - if len(dirs) != 1 { - t.Errorf("expected 1 match, got %v", dirs) - } - - dirs = router.Match("contact.user.created_v3") - if len(dirs) != 1 { - t.Errorf("expected 1 match, got %v", dirs) - } - - // No match - dirs = router.Match("drive.file.edit_v1") - if len(dirs) != 0 { - t.Errorf("expected no match, got %v", dirs) - } -} - -func TestEventRouter_Match_FanOut(t *testing.T) { - chdirTemp(t) - - router, err := ParseRoutes([]string{ - `^im\.=dir:./test_im`, - `message=dir:./test_msg`, - }) - if err != nil { - t.Fatal(err) - } - - // "im.message.receive_v1" matches both patterns - dirs := router.Match("im.message.receive_v1") - if len(dirs) != 2 { - t.Errorf("expected 2 matches (fan-out), got %d: %v", len(dirs), dirs) - } -} - -// --- Pipeline: Route --- - -func TestPipeline_Route(t *testing.T) { - chdirTemp(t) - router, err := ParseRoutes([]string{ - `^im\.message=dir:./route_out`, - }) - if err != nil { - t.Fatal(err) - } - dir := router.routes[0].dir - - filters := NewFilterChain() - var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformCompact, Router: router}, &out, &errOut) - if err := p.EnsureDirs(); err != nil { - t.Fatal(err) - } - - eventJSON := `{ - "message": { - "message_id": "msg_route", "chat_id": "oc_001", - "chat_type": "group", "message_type": "text", - "content": "{\"text\":\"routed\"}", "create_time": "1700000000" - }, - "sender": {"sender_id": {"open_id": "ou_001"}} - }` - raw := makeRawEvent("im.message.receive_v1", eventJSON) - raw.Header.EventID = "ev_route" - raw.Header.CreateTime = "1700000000" - p.Process(context.Background(), raw) - - // stdout should be empty — output goes to route dir - if out.Len() != 0 { - t.Errorf("routed event should not appear on stdout, got: %s", out.String()) - } - - // Verify file was created in route dir - entries, err := os.ReadDir(dir) - if err != nil { - t.Fatal(err) - } - if len(entries) != 1 { - t.Fatalf("expected 1 file in route dir, got %d", len(entries)) - } - - data, err := os.ReadFile(filepath.Join(dir, entries[0].Name())) - if err != nil { - t.Fatal(err) - } - var m map[string]interface{} - if err := json.Unmarshal(data, &m); err != nil { - t.Fatalf("file content is not valid JSON: %v", err) - } - if m["type"] != "im.message.receive_v1" { - t.Errorf("type = %v", m["type"]) - } +func (h handlerFunc) with(fn func(context.Context, *Event) HandlerResult) EventHandler { + return handlerFuncWith{id: string(h), fn: fn} } -func TestPipeline_Route_NoMatch(t *testing.T) { - chdirTemp(t) - fallbackDir := t.TempDir() - - router, err := ParseRoutes([]string{ - `^im\.message=dir:./route_dir`, - }) - if err != nil { - t.Fatal(err) - } - routeDir := router.routes[0].dir - - filters := NewFilterChain() - var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformCompact, Router: router, OutputDir: fallbackDir}, &out, &errOut) - if err := p.EnsureDirs(); err != nil { - t.Fatal(err) - } - - // Send an event that does NOT match the route - raw := makeRawEvent("drive.file.edit_v1", `{"file_token":"xxx"}`) - raw.Header.EventID = "ev_nomatch" - raw.Header.CreateTime = "1700000000" - p.Process(context.Background(), raw) - - // stdout should be empty - if out.Len() != 0 { - t.Errorf("should not appear on stdout, got: %s", out.String()) - } - - // Route dir should be empty - routeEntries, _ := os.ReadDir(routeDir) - if len(routeEntries) != 0 { - t.Errorf("route dir should be empty, got %d files", len(routeEntries)) - } - - // Fallback dir should have the file - fallbackEntries, _ := os.ReadDir(fallbackDir) - if len(fallbackEntries) != 1 { - t.Fatalf("fallback dir should have 1 file, got %d", len(fallbackEntries)) - } +type handlerFuncWith struct { + id string + eventType string + domain string + fn func(context.Context, *Event) HandlerResult } -func TestPipeline_Route_NoMatch_Stdout(t *testing.T) { - chdirTemp(t) - - router, err := ParseRoutes([]string{ - `^im\.message=dir:./route_dir`, - }) - if err != nil { - t.Fatal(err) - } - routeDir := router.routes[0].dir - - filters := NewFilterChain() - var out, errOut bytes.Buffer - // No OutputDir — unmatched events should go to stdout - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformRaw, Router: router}, &out, &errOut) - if err := p.EnsureDirs(); err != nil { - t.Fatal(err) - } - - raw := makeRawEvent("drive.file.edit_v1", `{"file_token":"xxx"}`) - raw.Header.EventID = "ev_stdout" - raw.Header.CreateTime = "1700000000" - p.Process(context.Background(), raw) - - // Route dir should be empty - routeEntries, _ := os.ReadDir(routeDir) - if len(routeEntries) != 0 { - t.Errorf("route dir should be empty, got %d files", len(routeEntries)) - } - - // stdout should have the event - if out.Len() == 0 { - t.Error("unmatched event should fall through to stdout") - } - var m map[string]interface{} - if err := json.Unmarshal(out.Bytes(), &m); err != nil { - t.Fatalf("stdout is not valid JSON: %v", err) - } +func (h handlerFuncWith) ID() string { return h.id } +func (h handlerFuncWith) EventType() string { return h.eventType } +func (h handlerFuncWith) Domain() string { return h.domain } +func (h handlerFuncWith) Handle(ctx context.Context, evt *Event) HandlerResult { + return h.fn(ctx, evt) } -func TestPipeline_Route_FanOut(t *testing.T) { - chdirTemp(t) - - router, err := ParseRoutes([]string{ - `^im\.=dir:./fanout1`, - `message=dir:./fanout2`, - }) - if err != nil { - t.Fatal(err) - } - dir1 := router.routes[0].dir - dir2 := router.routes[1].dir - - filters := NewFilterChain() - var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformCompact, Router: router}, &out, &errOut) - if err := p.EnsureDirs(); err != nil { - t.Fatal(err) - } - - eventJSON := `{ - "message": { - "message_id": "msg_fanout", "chat_id": "oc_001", - "chat_type": "group", "message_type": "text", - "content": "{\"text\":\"fanout\"}", "create_time": "1700000000" - }, - "sender": {"sender_id": {"open_id": "ou_001"}} - }` - raw := makeRawEvent("im.message.receive_v1", eventJSON) - raw.Header.EventID = "ev_fanout" - raw.Header.CreateTime = "1700000000" - p.Process(context.Background(), raw) - - // stdout should be empty - if out.Len() != 0 { - t.Errorf("fan-out event should not appear on stdout, got: %s", out.String()) +func TestHandlerRegistryRejectsDuplicateFallbackHandlerID(t *testing.T) { + r := NewHandlerRegistry() + if err := r.RegisterEventHandler(testHandler{id: "dup", eventType: "im.message.receive_v1"}); err != nil { + t.Fatalf("RegisterEventHandler() error = %v", err) } - // Both dirs should have a file - entries1, _ := os.ReadDir(dir1) - entries2, _ := os.ReadDir(dir2) - if len(entries1) != 1 { - t.Errorf("dir1 should have 1 file, got %d", len(entries1)) + err := r.SetFallbackHandler(testHandler{id: "dup", eventType: "fallback", domain: "fallback"}) + if err == nil { + t.Fatal("expected duplicate fallback handler ID to be rejected") } - if len(entries2) != 1 { - t.Errorf("dir2 should have 1 file, got %d", len(entries2)) + if !strings.Contains(err.Error(), "duplicate handler ID: dup") { + t.Fatalf("error = %v, want duplicate handler ID", err) } } -// --- cleanupSeen --- - -func TestCleanupSeen(t *testing.T) { - filters := NewFilterChain() - var out, errOut bytes.Buffer - p := NewEventPipeline(DefaultRegistry(), filters, - PipelineConfig{Mode: TransformRaw}, &out, &errOut) - - // Insert an expired entry directly - p.seen.Store("old_key", time.Now().Add(-10*time.Minute)) - p.seen.Store("fresh_key", time.Now()) - - p.cleanupSeen(time.Now()) - - if _, ok := p.seen.Load("old_key"); ok { - t.Error("expired key should be cleaned up") +func TestHandlerRegistrySetFallbackHandlerStoresHandler(t *testing.T) { + r := NewHandlerRegistry() + h := testHandler{id: "fallback", eventType: "fallback", domain: "fallback"} + if err := r.SetFallbackHandler(h); err != nil { + t.Fatalf("SetFallbackHandler() error = %v", err) } - if _, ok := p.seen.Load("fresh_key"); !ok { - t.Error("fresh key should be kept") + if got := r.FallbackHandler(); got == nil || got.ID() != h.ID() { + t.Fatalf("FallbackHandler() = %v, want handler %q", got, h.ID()) } } diff --git a/shortcuts/event/registry.go b/shortcuts/event/registry.go index e51ef4f4..5d703ae0 100644 --- a/shortcuts/event/registry.go +++ b/shortcuts/event/registry.go @@ -11,6 +11,14 @@ type ProcessorRegistry struct { fallback EventProcessor } +// HandlerRegistry manages event and domain scoped EventHandler registrations. +type HandlerRegistry struct { + eventHandlers map[string][]EventHandler + domainHandlers map[string][]EventHandler + fallback EventHandler + ids map[string]struct{} +} + // NewProcessorRegistry creates a registry with a fallback for unregistered event types. func NewProcessorRegistry(fallback EventProcessor) *ProcessorRegistry { return &ProcessorRegistry{ @@ -57,3 +65,105 @@ func DefaultRegistry() *ProcessorRegistry { _ = r.Register(&ImChatDisbandedProcessor{}) return r } + +// NewHandlerRegistry creates an empty handler registry. +func NewHandlerRegistry() *HandlerRegistry { + return &HandlerRegistry{ + eventHandlers: make(map[string][]EventHandler), + domainHandlers: make(map[string][]EventHandler), + ids: make(map[string]struct{}), + } +} + +// NewBuiltinHandlerRegistry creates a handler registry with the built-in runtime handlers. +func NewBuiltinHandlerRegistry() *HandlerRegistry { + r := NewHandlerRegistry() + for _, h := range []EventHandler{ + NewIMMessageReceiveHandler(), + NewIMMessageReadHandler(), + NewIMReactionCreatedHandler(), + NewIMReactionDeletedHandler(), + NewIMChatUpdatedHandler(), + NewIMChatDisbandedHandler(), + NewIMChatMemberBotAddedHandler(), + NewIMChatMemberBotDeletedHandler(), + NewIMChatMemberUserAddedHandler(), + NewIMChatMemberUserWithdrawnHandler(), + NewIMChatMemberUserDeletedHandler(), + } { + _ = r.RegisterEventHandler(h) + } + _ = r.SetFallbackHandler(NewGenericFallbackHandler()) + return r +} + +// RegisterEventHandler registers a handler for an exact event type. +func (r *HandlerRegistry) RegisterEventHandler(h EventHandler) error { + if err := r.validateHandler(h, h.EventType(), "event type"); err != nil { + return err + } + r.eventHandlers[h.EventType()] = append(r.eventHandlers[h.EventType()], h) + return nil +} + +// RegisterDomainHandler registers a handler for an exact domain. +func (r *HandlerRegistry) RegisterDomainHandler(h EventHandler) error { + if err := r.validateHandler(h, h.Domain(), "domain"); err != nil { + return err + } + r.domainHandlers[h.Domain()] = append(r.domainHandlers[h.Domain()], h) + return nil +} + +// SetFallbackHandler sets the fallback handler used when no handlers match. +func (r *HandlerRegistry) SetFallbackHandler(h EventHandler) error { + if err := r.validateHandler(h, h.ID(), "fallback handler"); err != nil { + return err + } + r.fallback = h + return nil +} + +// EventHandlers returns handlers registered for the exact event type. +func (r *HandlerRegistry) EventHandlers(eventType string) []EventHandler { + if r == nil { + return nil + } + return r.eventHandlers[eventType] +} + +// DomainHandlers returns handlers registered for the exact domain. +func (r *HandlerRegistry) DomainHandlers(domain string) []EventHandler { + if r == nil { + return nil + } + return r.domainHandlers[domain] +} + +// FallbackHandler returns the configured fallback handler. +func (r *HandlerRegistry) FallbackHandler() EventHandler { + if r == nil { + return nil + } + return r.fallback +} + +func (r *HandlerRegistry) validateHandler(h EventHandler, key string, scope string) error { + if r == nil { + return fmt.Errorf("nil handler registry") + } + if h == nil { + return fmt.Errorf("nil handler") + } + if h.ID() == "" { + return fmt.Errorf("handler ID is required") + } + if key == "" { + return fmt.Errorf("handler %q %s is required", h.ID(), scope) + } + if _, exists := r.ids[h.ID()]; exists { + return fmt.Errorf("duplicate handler ID: %s", h.ID()) + } + r.ids[h.ID()] = struct{}{} + return nil +} diff --git a/shortcuts/event/router.go b/shortcuts/event/router.go index 07991647..16f18cdb 100644 --- a/shortcuts/event/router.go +++ b/shortcuts/event/router.go @@ -4,9 +4,13 @@ package event import ( + "encoding/json" "fmt" + "os" + "path/filepath" "regexp" "strings" + "sync/atomic" "github.com/larksuite/cli/internal/validate" ) @@ -74,3 +78,112 @@ func (r *EventRouter) Match(eventType string) []string { } return dirs } + +// OutputRecordWriter writes a fully serialized pipeline record. +type OutputRecordWriter interface { + WriteRecord(eventType string, record map[string]interface{}) error +} + +type outputRouter struct { + router *EventRouter + defaultDir string + fallback OutputRecordWriter + seq *uint64 + writers map[string]*dirRecordWriter +} + +func (r *outputRouter) WriteRecord(eventType string, record map[string]interface{}) error { + if r == nil { + return nil + } + + dirs := r.matchDirs(eventType) + if len(dirs) == 0 { + if r.fallback == nil { + return nil + } + return r.fallback.WriteRecord(eventType, record) + } + + for _, dir := range dirs { + writer := r.writers[dir] + if writer == nil { + writer = &dirRecordWriter{dir: dir, seq: r.seq} + r.writers[dir] = writer + } + if err := writer.WriteRecord(eventType, record); err != nil { + return err + } + } + return nil +} + +func (r *outputRouter) matchDirs(eventType string) []string { + if r == nil { + return nil + } + var dirs []string + if r.router != nil { + dirs = append(dirs, r.router.Match(eventType)...) + } + if len(dirs) == 0 && r.defaultDir != "" { + dirs = append(dirs, r.defaultDir) + } + return dirs +} + +type dirRecordWriter struct { + dir string + seq *uint64 +} + +func (w *dirRecordWriter) WriteRecord(_ string, record map[string]interface{}) error { + if err := os.MkdirAll(w.dir, 0o755); err != nil { + return err + } + data, err := json.Marshal(record) + if err != nil { + return err + } + name := w.nextFileName(record) + return os.WriteFile(filepath.Join(w.dir, name), append(data, '\n'), 0o644) +} + +func (w *dirRecordWriter) nextFileName(record map[string]interface{}) string { + seq := atomic.AddUint64(w.seq, 1) + eventType, _ := record["event_type"].(string) + eventID, _ := record["event_id"].(string) + if eventID == "" { + eventID, _ = record["idempotency_key"].(string) + } + base := sanitizeRouteFilePart(eventType) + if base == "" { + base = "event" + } + id := sanitizeRouteFilePart(eventID) + if id == "" { + return fmt.Sprintf("%06d-%s.json", seq, base) + } + return fmt.Sprintf("%06d-%s-%s.json", seq, base, id) +} + +func sanitizeRouteFilePart(value string) string { + value = strings.TrimSpace(value) + if value == "" { + return "" + } + value = strings.ReplaceAll(value, string(filepath.Separator), "-") + value = strings.ReplaceAll(value, "/", "-") + var b strings.Builder + for _, r := range value { + switch { + case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9': + b.WriteRune(r) + case r == '.', r == '-', r == '_': + b.WriteRune(r) + default: + b.WriteByte('-') + } + } + return strings.Trim(b.String(), "-.") +} diff --git a/shortcuts/event/subscribe.go b/shortcuts/event/subscribe.go index 5b3022e6..0cbdd8f8 100644 --- a/shortcuts/event/subscribe.go +++ b/shortcuts/event/subscribe.go @@ -5,7 +5,6 @@ package event import ( "context" - "encoding/json" "fmt" "io" "os" @@ -50,8 +49,10 @@ func (l *stderrLogger) Error(_ context.Context, args ...interface{}) { var _ larkcore.Logger = (*stderrLogger)(nil) -// commonEventTypes are well-known event types registered in catch-all mode. -var commonEventTypes = []string{ +// subscribedEventTypes are the event types wired into the SDK dispatcher. +// Keep this list aligned with NewBuiltinHandlerRegistry because the current SDK +// only delivers websocket events to explicitly registered event types here. +var subscribedEventTypes = []string{ "im.message.receive_v1", "im.message.message_read_v1", "im.message.reaction.created_v1", @@ -63,19 +64,25 @@ var commonEventTypes = []string{ "im.chat.member.user.deleted_v1", "im.chat.updated_v1", "im.chat.disbanded_v1", - "contact.user.created_v3", - "contact.user.updated_v3", - "contact.user.deleted_v3", - "contact.department.created_v3", - "contact.department.updated_v3", - "contact.department.deleted_v3", - "calendar.calendar.acl.created_v4", - "calendar.calendar.event.changed_v4", - "approval.approval.updated", - "application.application.visibility.added_v6", - "task.task.update_tenant_v1", - "task.task.comment_updated_v1", - "drive.notice.comment_add_v1", +} + +func subscribedEventTypesFor(eventTypesStr string) []string { + filter := NewEventTypeFilter(eventTypesStr) + if filter == nil { + return nil + } + return filter.Types() +} + +func pipelineConfigFor(jsonFlag, compactFlag bool) PipelineConfig { + mode := TransformRaw + if compactFlag { + mode = TransformCompact + } + return PipelineConfig{ + Mode: mode, + PrettyJSON: jsonFlag, + } } var EventSubscribe = common.Shortcut{ @@ -170,6 +177,7 @@ var EventSubscribe = common.Shortcut{ } // --- Build filter chain --- + explicitTypes := subscribedEventTypesFor(eventTypesStr) eventTypeFilter := NewEventTypeFilter(eventTypesStr) regexFilter, err := NewRegexFilter(filterStr) if err != nil { @@ -184,40 +192,34 @@ var EventSubscribe = common.Shortcut{ } filters := NewFilterChain(filterList...) - // --- Parse route --- - router, err := ParseRoutes(routeSpecs) - if err != nil { - return output.ErrValidation("invalid --route: %v", err) + _ = jsonFlag + hasRoutes := len(routeSpecs) > 0 + var recordWriter OutputRecordWriter + if hasRoutes || outputDir != "" { + router, err := ParseRoutes(routeSpecs) + if err != nil { + return output.ErrValidation("invalid --route: %v", err) + } + recordWriter = &outputRouter{ + router: router, + defaultDir: outputDir, + fallback: ndjsonRecordWriter{w: out}, + seq: new(uint64), + writers: map[string]*dirRecordWriter{}, + } } // --- Build pipeline --- - mode := TransformRaw - if compactFlag { - mode = TransformCompact - } - pipeline := NewEventPipeline(DefaultRegistry(), filters, PipelineConfig{ - Mode: mode, - JsonFlag: jsonFlag, - OutputDir: outputDir, - Quiet: quietFlag, - Router: router, - }, out, errOut) - - if err := pipeline.EnsureDirs(); err != nil { - return err - } + config := pipelineConfigFor(jsonFlag, compactFlag) + config.Quiet = quietFlag + pipeline := newEventPipeline(NewBuiltinHandlerRegistry(), filters, config, out, errOut, recordWriter) // --- Build SDK event dispatcher --- rawHandler := func(ctx context.Context, event *larkevent.EventReq) error { if event.Body == nil { return nil } - var raw RawEvent - if err := json.Unmarshal(event.Body, &raw); err != nil { - output.PrintError(errOut, fmt.Sprintf("failed to parse event: %v", err)) - return nil - } - pipeline.Process(ctx, &raw) + pipeline.Process(ctx, BuildWebSocketEnvelope(event.Body)) return nil } @@ -225,12 +227,12 @@ var EventSubscribe = common.Shortcut{ eventDispatcher := dispatcher.NewEventDispatcher("", "") eventDispatcher.InitConfig(larkevent.WithLogger(sdkLogger)) - if eventTypeFilter != nil { - for _, et := range eventTypeFilter.Types() { + if explicitTypes != nil { + for _, et := range explicitTypes { eventDispatcher.OnCustomizedEvent(et, rawHandler) } } else { - for _, et := range commonEventTypes { + for _, et := range subscribedEventTypes { eventDispatcher.OnCustomizedEvent(et, rawHandler) } } @@ -242,16 +244,16 @@ var EventSubscribe = common.Shortcut{ } info(fmt.Sprintf("%sConnecting to Lark event WebSocket...%s", output.Cyan, output.Reset)) - if eventTypeFilter != nil { - info(fmt.Sprintf("Listening for: %s%s%s", output.Green, strings.Join(eventTypeFilter.Types(), ", "), output.Reset)) + if explicitTypes != nil { + info(fmt.Sprintf("Listening for: %s%s%s", output.Green, strings.Join(explicitTypes, ", "), output.Reset)) } else { - info(fmt.Sprintf("Listening for %s%d common event types%s (catch-all mode)", output.Green, len(commonEventTypes), output.Reset)) - info(fmt.Sprintf("%sTip:%s use --event-types to listen for specific event types", output.Dim, output.Reset)) + info(fmt.Sprintf("Listening in %scatch-all%s mode for the SDK-supported event types registered by this command", output.Green, output.Reset)) + info(fmt.Sprintf("%sTip:%s use --event-types to narrow subscription", output.Dim, output.Reset)) } if regexFilter != nil { info(fmt.Sprintf("Filter: %s%s%s", output.Yellow, regexFilter.String(), output.Reset)) } - if router != nil { + if hasRoutes { for _, spec := range routeSpecs { info(fmt.Sprintf(" Route: %s%s%s", output.Green, spec, output.Reset)) } diff --git a/shortcuts/event/subscribe_test.go b/shortcuts/event/subscribe_test.go new file mode 100644 index 00000000..1c85c3f9 --- /dev/null +++ b/shortcuts/event/subscribe_test.go @@ -0,0 +1,60 @@ +package event + +import ( + "context" + "reflect" + "testing" + + larkevent "github.com/larksuite/oapi-sdk-go/v3/event" + "github.com/larksuite/oapi-sdk-go/v3/event/dispatcher" +) + +func TestSubscribedEventTypesForCatchAllReturnsNil(t *testing.T) { + got := subscribedEventTypesFor("") + if got != nil { + t.Fatalf("subscribedEventTypesFor(\"\") = %v, want nil for catch-all", got) + } +} + +func TestSubscribedEventTypesForExplicitListSortsValues(t *testing.T) { + got := subscribedEventTypesFor("calendar.event.updated_v1,im.message.receive_v1") + want := []string{"calendar.event.updated_v1", "im.message.receive_v1"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("subscribedEventTypesFor(explicit) = %v, want %v", got, want) + } +} + +func TestCatchAllUsesSDKDefaultCustomizedHandler(t *testing.T) { + eventDispatcher := dispatcher.NewEventDispatcher("", "") + eventDispatcher.OnCustomizedEvent("", func(_ context.Context, _ *larkevent.EventReq) error { + return nil + }) + + _, err := eventDispatcher.Do(context.Background(), []byte(`{"header":{"event_type":"contact.user.created_v3"}}`)) + if err == nil { + t.Fatal("dispatcher.Do() error = nil, want not found because parse() still routes by concrete event type") + } + if err.Error() != "event type: contact.user.created_v3, not found handler" { + t.Fatalf("dispatcher.Do() error = %v, want concrete event type not found", err) + } +} + +func TestSubscribePipelineConfigUsesCompactFlag(t *testing.T) { + config := pipelineConfigFor(false, true) + if config.Mode != TransformCompact { + t.Fatalf("Mode = %v, want TransformCompact", config.Mode) + } + if config.PrettyJSON { + t.Fatalf("PrettyJSON = true, want false") + } +} + +func TestSubscribePipelineConfigUsesJSONFlag(t *testing.T) { + config := pipelineConfigFor(true, false) + if config.Mode != TransformRaw { + t.Fatalf("Mode = %v, want TransformRaw", config.Mode) + } + if !config.PrettyJSON { + t.Fatalf("PrettyJSON = false, want true") + } +}