From 6cc9820cbc7e28299f646984597fdaf13b5ca377 Mon Sep 17 00:00:00 2001 From: ImpulseB23 Date: Sat, 18 Apr 2026 23:21:15 +0200 Subject: [PATCH 1/3] feat: kick chat read path via pusher websocket --- .../src-sidecar/internal/control/control.go | 12 + .../src-sidecar/internal/kick/client.go | 226 ++++++++++++++ .../src-sidecar/internal/kick/client_test.go | 281 ++++++++++++++++++ .../src-sidecar/internal/kick/types.go | 50 ++++ .../src-sidecar/internal/sidecar/sidecar.go | 53 ++++ .../src-sidecar/internal/twitch/eventsub.go | 6 +- .../internal/twitch/eventsub_test.go | 5 +- apps/desktop/src-tauri/src/host.rs | 107 ++++++- apps/desktop/src-tauri/src/message.rs | 167 ++++++++++- docs/adr.md | 1 + docs/platform-apis.md | 47 ++- 11 files changed, 937 insertions(+), 18 deletions(-) create mode 100644 apps/desktop/src-sidecar/internal/kick/client.go create mode 100644 apps/desktop/src-sidecar/internal/kick/client_test.go create mode 100644 apps/desktop/src-sidecar/internal/kick/types.go diff --git a/apps/desktop/src-sidecar/internal/control/control.go b/apps/desktop/src-sidecar/internal/control/control.go index 0ae1915..f35c29e 100644 --- a/apps/desktop/src-sidecar/internal/control/control.go +++ b/apps/desktop/src-sidecar/internal/control/control.go @@ -1,5 +1,13 @@ package control +// Platform tag bytes prepended to each message written to the out channel. +// The Rust host reads the first byte to dispatch to the correct parser. +const ( + TagTwitch byte = 0x01 + TagKick byte = 0x02 + TagYouTube byte = 0x03 +) + // Bootstrap is the first message the Rust host writes to the sidecar's stdin // at startup. It hands over the inherited shared memory section so the sidecar // can attach without having to know a name or open a kernel object by lookup. @@ -36,6 +44,10 @@ type Command struct { // require the moderator_id to match the token's authenticated user. UserID string `json:"user_id,omitempty"` + // ChatroomID is the Kick chatroom numeric ID. Used by kick_connect to + // subscribe to the Pusher channel for this chatroom. + ChatroomID int `json:"chatroom_id,omitempty"` + // Mod action fields. Only set by ban_user / unban_user / timeout_user / // delete_message commands. TargetUserID string `json:"target_user_id,omitempty"` diff --git a/apps/desktop/src-sidecar/internal/kick/client.go b/apps/desktop/src-sidecar/internal/kick/client.go new file mode 100644 index 0000000..2514dbd --- /dev/null +++ b/apps/desktop/src-sidecar/internal/kick/client.go @@ -0,0 +1,226 @@ +package kick + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "time" + + "github.com/coder/websocket" + "github.com/rs/zerolog" + + "github.com/ImpulseB23/Prismoid/sidecar/internal/backoff" + "github.com/ImpulseB23/Prismoid/sidecar/internal/control" +) + +const ( + defaultPusherURL = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0-rc2&flash=false" + channelPrefix = "chatrooms." + channelSuffix = ".v2" +) + +// Notify is called on control-plane events that the Rust host should know +// about. Mirrors twitch.Notify. +type Notify func(msgType string, payload any) + +// Client streams Kick chat messages from a Pusher WebSocket channel and writes +// raw Pusher event bytes (tagged with control.TagKick) to a shared channel. +// Follows the same lifecycle pattern as the Twitch EventSub client. +type Client struct { + ChatroomID int + WSURL string // override for testing; "" uses default + + Out chan<- []byte + Log zerolog.Logger + Notify Notify +} + +// Run connects to the Kick Pusher WebSocket and reads messages until ctx is +// cancelled. Reconnects automatically with exponential backoff on errors. +// Pusher close codes 4000-4099 are fatal (do not reconnect). +func (c *Client) Run(ctx context.Context) error { + bo := backoff.New(1*time.Second, 30*time.Second) + + for { + err := c.connectAndListen(ctx) + if err == nil || errors.Is(err, context.Canceled) { + return err + } + + if isFatalClose(err) { + c.Log.Error().Err(err).Msg("fatal pusher close, not reconnecting") + return err + } + + c.Log.Warn().Err(err).Msg("kick disconnected, reconnecting") + + delay := bo.Next() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } +} + +func (c *Client) wsURL() string { + if c.WSURL != "" { + return c.WSURL + } + return defaultPusherURL +} + +func (c *Client) connectAndListen(ctx context.Context) error { + conn, _, err := websocket.Dial(ctx, c.wsURL(), nil) + if err != nil { + return fmt.Errorf("dial %s: %w", c.wsURL(), err) + } + defer func() { _ = conn.CloseNow() }() + + activityTimeout, err := c.readConnectionEstablished(ctx, conn) + if err != nil { + return err + } + + c.Log.Info().Int("chatroom", c.ChatroomID).Int("activity_timeout", activityTimeout).Msg("connected to kick pusher") + + if err := c.subscribe(ctx, conn); err != nil { + return err + } + + return c.listenLoop(ctx, conn, activityTimeout) +} + +func (c *Client) readConnectionEstablished(ctx context.Context, conn *websocket.Conn) (int, error) { + readCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + + _, data, err := conn.Read(readCtx) + if err != nil { + return 0, fmt.Errorf("read connection_established: %w", err) + } + + var ev PusherEvent + if err := json.Unmarshal(data, &ev); err != nil { + return 0, fmt.Errorf("unmarshal connection_established: %w", err) + } + if ev.Event != "pusher:connection_established" { + return 0, fmt.Errorf("expected pusher:connection_established, got %s", ev.Event) + } + + var connData ConnectionData + if err := json.Unmarshal([]byte(ev.Data), &connData); err != nil { + return 0, fmt.Errorf("unmarshal connection data: %w", err) + } + + return connData.ActivityTimeout, nil +} + +func (c *Client) subscribe(ctx context.Context, conn *websocket.Conn) error { + channel := channelPrefix + strconv.Itoa(c.ChatroomID) + channelSuffix + sub := PusherEvent{ + Event: "pusher:subscribe", + Data: `{"channel":"` + channel + `"}`, + } + + msg, err := json.Marshal(sub) + if err != nil { + return fmt.Errorf("marshal subscribe: %w", err) + } + + writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + if err := conn.Write(writeCtx, websocket.MessageText, msg); err != nil { + return fmt.Errorf("write subscribe: %w", err) + } + + c.Log.Info().Str("channel", channel).Msg("subscribed to kick chatroom") + return nil +} + +func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, activityTimeoutSec int) error { + // Pusher recommends sending a ping after activity_timeout seconds of + // inactivity, and closing if no pong arrives within 30s. We use + // activityTimeout + 10s as the read deadline to account for ping/pong. + timeout := time.Duration(activityTimeoutSec)*time.Second + 10*time.Second + + lastActivity := time.Now() + pingInterval := time.Duration(activityTimeoutSec) * time.Second + + for { + readCtx, cancel := context.WithTimeout(ctx, timeout) + _, data, err := conn.Read(readCtx) + cancel() + if err != nil { + if errors.Is(err, context.Canceled) && ctx.Err() != nil { + _ = conn.Close(websocket.StatusNormalClosure, "shutting down") + return ctx.Err() + } + return fmt.Errorf("read: %w", err) + } + + var ev PusherEvent + if err := json.Unmarshal(data, &ev); err != nil { + c.Log.Error().Err(err).Msg("unmarshal pusher event failed") + continue + } + + now := time.Now() + + switch { + case ev.Event == "pusher:ping": + pong, _ := json.Marshal(PusherEvent{Event: "pusher:pong", Data: "{}"}) + writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + _ = conn.Write(writeCtx, websocket.MessageText, pong) + cancel() + lastActivity = now + + case ev.Event == "pusher:pong": + lastActivity = now + + case ev.Event == "pusher:error": + c.Log.Warn().Str("data", ev.Data).Msg("pusher error") + + case ev.Event == "pusher_internal:subscription_succeeded": + lastActivity = now + + case ev.Channel != "": + // Channel event (chat message, ban, etc). Forward the raw + // Pusher event bytes tagged for the Rust parser. + tagged := make([]byte, 1+len(data)) + tagged[0] = control.TagKick + copy(tagged[1:], data) + select { + case c.Out <- tagged: + default: + c.Log.Warn().Msg("output channel full, dropping message") + } + lastActivity = now + + default: + c.Log.Debug().Str("event", ev.Event).Msg("unhandled pusher event") + } + + if now.Sub(lastActivity) >= pingInterval { + ping, _ := json.Marshal(PusherEvent{Event: "pusher:ping", Data: "{}"}) + writeCtx, wCancel := context.WithTimeout(ctx, 5*time.Second) + _ = conn.Write(writeCtx, websocket.MessageText, ping) + wCancel() + lastActivity = now + } + } +} + +// isFatalClose checks if the error contains a Pusher close code in the +// 4000-4099 range, which means "do not reconnect". +func isFatalClose(err error) bool { + var closeErr websocket.CloseError + if errors.As(err, &closeErr) { + code := int(closeErr.Code) + return code >= 4000 && code <= 4099 + } + return false +} diff --git a/apps/desktop/src-sidecar/internal/kick/client_test.go b/apps/desktop/src-sidecar/internal/kick/client_test.go new file mode 100644 index 0000000..2caa8e6 --- /dev/null +++ b/apps/desktop/src-sidecar/internal/kick/client_test.go @@ -0,0 +1,281 @@ +package kick + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/coder/websocket" + "github.com/rs/zerolog" +) + +func connectionEstablishedMsg(timeout int) []byte { + data := fmt.Sprintf(`{"socket_id":"123.456","activity_timeout":%d}`, timeout) + ev := PusherEvent{Event: "pusher:connection_established", Data: data} + b, _ := json.Marshal(ev) + return b +} + +func subscriptionSucceededMsg(channel string) []byte { + ev := PusherEvent{ + Event: "pusher_internal:subscription_succeeded", + Data: "{}", + Channel: channel, + } + b, _ := json.Marshal(ev) + return b +} + +func chatMessageEvent(id, content, username string, chatroomID int) []byte { + inner := fmt.Sprintf(`{"id":"%s","chatroom_id":%d,"content":"%s","type":"message","created_at":"2025-06-01T12:00:00Z","sender":{"id":42,"username":"%s","slug":"%s","identity":{"color":"#FF0000","badges":[]}}}`, + id, chatroomID, content, username, username) + ev := PusherEvent{ + Event: `App\Events\ChatMessageEvent`, + Data: inner, + Channel: fmt.Sprintf("chatrooms.%d.v2", chatroomID), + } + b, _ := json.Marshal(ev) + return b +} + +func pusherPingMsg() []byte { + ev := PusherEvent{Event: "pusher:ping", Data: "{}"} + b, _ := json.Marshal(ev) + return b +} + +func newTestClient(wsURL string, chatroomID int, out chan<- []byte) *Client { + return &Client{ + ChatroomID: chatroomID, + WSURL: wsURL, + Out: out, + Log: zerolog.Nop(), + Notify: func(string, any) {}, + } +} + +func drainChan(ch <-chan []byte) [][]byte { + var msgs [][]byte + deadline := time.After(100 * time.Millisecond) + for { + select { + case msg := <-ch: + msgs = append(msgs, msg) + case <-deadline: + return msgs + } + } +} + +func TestClientReceivesChatMessages(t *testing.T) { + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + t.Errorf("accept: %v", err) + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(120)) + // read the subscribe message from client + _, _, _ = conn.Read(ctx) + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.100.v2")) + _ = conn.Write(ctx, websocket.MessageText, chatMessageEvent("msg-1", "hello kick", "viewer1", 100)) + _ = conn.Write(ctx, websocket.MessageText, chatMessageEvent("msg-2", "second msg", "viewer2", 100)) + + time.Sleep(100 * time.Millisecond) + _ = conn.Close(websocket.StatusNormalClosure, "done") + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := newTestClient( + "ws"+strings.TrimPrefix(wsSrv.URL, "http"), + 100, + out, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _ = client.Run(ctx) + + msgs := drainChan(out) + if len(msgs) < 2 { + t.Fatalf("expected at least 2 messages, got %d", len(msgs)) + } + + // verify tag byte + if msgs[0][0] != 0x02 { + t.Fatalf("expected tag 0x02, got %x", msgs[0][0]) + } + + // verify the inner Pusher event is valid JSON + var ev PusherEvent + if err := json.Unmarshal(msgs[0][1:], &ev); err != nil { + t.Fatalf("unmarshal first message: %v", err) + } + if ev.Event != `App\Events\ChatMessageEvent` { + t.Fatalf("expected ChatMessageEvent, got %s", ev.Event) + } + if ev.Channel != "chatrooms.100.v2" { + t.Fatalf("expected channel chatrooms.100.v2, got %s", ev.Channel) + } +} + +func TestClientHandlesPusherPing(t *testing.T) { + var receivedPong bool + + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(120)) + _, _, _ = conn.Read(ctx) // subscribe + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.200.v2")) + _ = conn.Write(ctx, websocket.MessageText, pusherPingMsg()) + + // read the pong response + readCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + _, data, err := conn.Read(readCtx) + if err == nil { + var ev PusherEvent + if json.Unmarshal(data, &ev) == nil && ev.Event == "pusher:pong" { + receivedPong = true + } + } + + _ = conn.Write(ctx, websocket.MessageText, chatMessageEvent("msg-1", "after ping", "user1", 200)) + time.Sleep(100 * time.Millisecond) + _ = conn.Close(websocket.StatusNormalClosure, "done") + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := newTestClient( + "ws"+strings.TrimPrefix(wsSrv.URL, "http"), + 200, + out, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _ = client.Run(ctx) + + if !receivedPong { + t.Fatal("expected client to respond to pusher:ping with pusher:pong") + } + + msgs := drainChan(out) + if len(msgs) < 1 { + t.Fatal("expected at least 1 message after ping/pong") + } +} + +func TestClientFatalCloseDoesNotReconnect(t *testing.T) { + connectCount := 0 + + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + connectCount++ + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(120)) + _, _, _ = conn.Read(ctx) // subscribe + + // 4001 = application only, do not reconnect + _ = conn.Close(websocket.StatusCode(4001), "app disabled") + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := newTestClient( + "ws"+strings.TrimPrefix(wsSrv.URL, "http"), + 300, + out, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + err := client.Run(ctx) + if err == nil { + t.Fatal("expected error on fatal close") + } + if connectCount != 1 { + t.Fatalf("expected exactly 1 connection attempt for fatal close, got %d", connectCount) + } +} + +func TestClientContextCancel(t *testing.T) { + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(120)) + _, _, _ = conn.Read(ctx) // subscribe + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.400.v2")) + + // keep connection open until test cancels + time.Sleep(5 * time.Second) + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := newTestClient( + "ws"+strings.TrimPrefix(wsSrv.URL, "http"), + 400, + out, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + err := client.Run(ctx) + if err != nil && err != context.DeadlineExceeded && !strings.Contains(err.Error(), "context") { + t.Fatalf("expected context error, got: %v", err) + } +} + +func TestIsFatalClose(t *testing.T) { + tests := []struct { + name string + code websocket.StatusCode + fatal bool + }{ + {"4000 fatal", websocket.StatusCode(4000), true}, + {"4099 fatal", websocket.StatusCode(4099), true}, + {"4100 not fatal", websocket.StatusCode(4100), false}, + {"4200 not fatal", websocket.StatusCode(4200), false}, + {"1000 not fatal", websocket.StatusNormalClosure, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := websocket.CloseError{Code: tt.code, Reason: "test"} + if got := isFatalClose(err); got != tt.fatal { + t.Errorf("isFatalClose(%d) = %v, want %v", tt.code, got, tt.fatal) + } + }) + } +} diff --git a/apps/desktop/src-sidecar/internal/kick/types.go b/apps/desktop/src-sidecar/internal/kick/types.go new file mode 100644 index 0000000..e40c3a8 --- /dev/null +++ b/apps/desktop/src-sidecar/internal/kick/types.go @@ -0,0 +1,50 @@ +package kick + +import "encoding/json" + +// PusherEvent is the top-level frame for all Pusher WebSocket messages. +// Protocol events (connection_established, pong, error) and channel events +// (ChatMessageEvent) share this shape. Data is always a JSON-encoded string. +type PusherEvent struct { + Event string `json:"event"` + Data string `json:"data"` + Channel string `json:"channel,omitempty"` +} + +// ConnectionData is the payload inside a pusher:connection_established event. +type ConnectionData struct { + SocketID string `json:"socket_id"` + ActivityTimeout int `json:"activity_timeout"` +} + +// ChatMessage is the inner payload of a ChatMessageEvent on a chatrooms.*.v2 +// Pusher channel. Field names follow the Pusher v2 format observed in the +// wild; the official webhook format uses slightly different names (message_id +// vs id, user_id vs id on sender, username_color vs color on identity). +type ChatMessage struct { + ID string `json:"id"` + ChatroomID int `json:"chatroom_id,omitempty"` + Content string `json:"content"` + Type string `json:"type,omitempty"` + CreatedAt string `json:"created_at"` + Sender Sender `json:"sender"` + Emotes json.RawMessage `json:"emotes,omitempty"` +} + +type Sender struct { + ID int `json:"id"` + Username string `json:"username"` + Slug string `json:"slug,omitempty"` + Identity Identity `json:"identity"` +} + +type Identity struct { + Color string `json:"color"` + Badges []Badge `json:"badges"` +} + +type Badge struct { + Type string `json:"type"` + Text string `json:"text"` + Count int `json:"count,omitempty"` +} diff --git a/apps/desktop/src-sidecar/internal/sidecar/sidecar.go b/apps/desktop/src-sidecar/internal/sidecar/sidecar.go index eed0f1b..0314ec0 100644 --- a/apps/desktop/src-sidecar/internal/sidecar/sidecar.go +++ b/apps/desktop/src-sidecar/internal/sidecar/sidecar.go @@ -13,6 +13,7 @@ import ( "io" "os" "os/signal" + "strconv" "sync" "syscall" "time" @@ -22,6 +23,7 @@ import ( "github.com/ImpulseB23/Prismoid/sidecar/internal/control" "github.com/ImpulseB23/Prismoid/sidecar/internal/emotes" + "github.com/ImpulseB23/Prismoid/sidecar/internal/kick" "github.com/ImpulseB23/Prismoid/sidecar/internal/ringbuf" "github.com/ImpulseB23/Prismoid/sidecar/internal/twitch" ) @@ -269,6 +271,10 @@ func DispatchCommand(ctx context.Context, cmd control.Command, clients map[strin HandleTwitchConnect(ctx, cmd, clients, out, notify, logger) case "twitch_disconnect": HandleTwitchDisconnect(cmd, clients, logger) + case "kick_connect": + HandleKickConnect(ctx, cmd, clients, out, logger) + case "kick_disconnect": + HandleKickDisconnect(cmd, clients, logger) case "ban_user": HandleBanUser(cmd, logger) case "unban_user": @@ -499,6 +505,53 @@ func HandleTwitchDisconnect(cmd control.Command, clients map[string]context.Canc logger.Info().Str("broadcaster", cmd.BroadcasterID).Msg("twitch client disconnected") } +// HandleKickConnect spawns a Kick Pusher WebSocket client for the chatroom in +// cmd if there isn't already one running. Uses the chatroom ID as the client +// registry key (prefixed with "kick:" to avoid collisions with Twitch IDs). +func HandleKickConnect(ctx context.Context, cmd control.Command, clients map[string]context.CancelFunc, out chan<- []byte, logger zerolog.Logger) { + key := kickClientKey(cmd.ChatroomID) + if _, exists := clients[key]; exists { + logger.Warn().Int("chatroom", cmd.ChatroomID).Msg("kick already connected, ignoring") + return + } + + clientCtx, clientCancel := context.WithCancel(ctx) + + client := &kick.Client{ + ChatroomID: cmd.ChatroomID, + Out: out, + Log: logger.With().Int("chatroom", cmd.ChatroomID).Logger(), + Notify: func(string, any) {}, + } + + clients[key] = clientCancel + + go func() { + if err := client.Run(clientCtx); err != nil && !errors.Is(err, context.Canceled) { + logger.Error().Err(err).Int("chatroom", cmd.ChatroomID).Msg("kick client exited") + } + }() + + logger.Info().Int("chatroom", cmd.ChatroomID).Msg("kick client started") +} + +// HandleKickDisconnect cancels and removes a previously-connected Kick client. +func HandleKickDisconnect(cmd control.Command, clients map[string]context.CancelFunc, logger zerolog.Logger) { + key := kickClientKey(cmd.ChatroomID) + cancelFn, exists := clients[key] + if !exists { + logger.Warn().Int("chatroom", cmd.ChatroomID).Msg("no active kick connection to disconnect") + return + } + cancelFn() + delete(clients, key) + logger.Info().Int("chatroom", cmd.ChatroomID).Msg("kick client disconnected") +} + +func kickClientKey(chatroomID int) string { + return "kick:" + strconv.Itoa(chatroomID) +} + // readerScanner is a small helper used by tests; production code constructs // its scanner directly from os.Stdin in Run. func readerScanner(r io.Reader) *bufio.Scanner { diff --git a/apps/desktop/src-sidecar/internal/twitch/eventsub.go b/apps/desktop/src-sidecar/internal/twitch/eventsub.go index 5fe2a01..d78f91f 100644 --- a/apps/desktop/src-sidecar/internal/twitch/eventsub.go +++ b/apps/desktop/src-sidecar/internal/twitch/eventsub.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog" "github.com/ImpulseB23/Prismoid/sidecar/internal/backoff" + "github.com/ImpulseB23/Prismoid/sidecar/internal/control" ) const defaultWSURL = "wss://eventsub.wss.twitch.tv/ws" @@ -169,8 +170,11 @@ func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, keepalive // SPSC primitive cannot evict already-written messages without a // reader-side cooperation we haven't built yet. Tracked // separately. + tagged := make([]byte, 1+len(data)) + tagged[0] = control.TagTwitch + copy(tagged[1:], data) select { - case c.Out <- data: + case c.Out <- tagged: default: c.Log.Warn().Msg("output channel full, dropping message") } diff --git a/apps/desktop/src-sidecar/internal/twitch/eventsub_test.go b/apps/desktop/src-sidecar/internal/twitch/eventsub_test.go index 957dc37..85689bc 100644 --- a/apps/desktop/src-sidecar/internal/twitch/eventsub_test.go +++ b/apps/desktop/src-sidecar/internal/twitch/eventsub_test.go @@ -107,7 +107,10 @@ func TestClientReceivesNotifications(t *testing.T) { } var env Envelope - if err := json.Unmarshal(msgs[0], &env); err != nil { + if msgs[0][0] != 0x01 { + t.Fatalf("expected tag 0x01, got %x", msgs[0][0]) + } + if err := json.Unmarshal(msgs[0][1:], &env); err != nil { t.Fatalf("unmarshal first message: %v", err) } if env.Metadata.MessageType != "notification" { diff --git a/apps/desktop/src-tauri/src/host.rs b/apps/desktop/src-tauri/src/host.rs index f39897b..7abc5c3 100644 --- a/apps/desktop/src-tauri/src/host.rs +++ b/apps/desktop/src-tauri/src/host.rs @@ -12,9 +12,13 @@ use std::time::Duration; use serde::Serialize; use crate::emote_index::{EmoteBundle, EmoteIndex}; -use crate::message::{parse_twitch_envelope, UnifiedMessage}; +use crate::message::{parse_kick_event, parse_twitch_envelope, UnifiedMessage}; use crate::ringbuf::RawHandle; +/// Platform tag bytes prepended by the Go sidecar. Must match control.go. +const TAG_TWITCH: u8 = 0x01; +const TAG_KICK: u8 = 0x02; + /// Timeout for [`ringbuf::RingBufReader::wait_for_signal`] in the host drain /// loop. In the happy path the sidecar signals the auto-reset event after /// each ring write and the drain wakes immediately; this value only bounds @@ -54,8 +58,19 @@ pub struct TwitchCreds { /// cannot kill the drain loop (`docs/stability.md` §Rust Panic Handling). pub fn parse_batch(raw: &[Vec], batch: &mut Vec, emote_index: &EmoteIndex) { for payload in raw { - let slice = payload.as_slice(); - let outcome = std::panic::catch_unwind(|| parse_twitch_envelope(slice)); + if payload.is_empty() { + continue; + } + let tag = payload[0]; + let data = &payload[1..]; + let outcome = std::panic::catch_unwind(|| match tag { + TAG_TWITCH => parse_twitch_envelope(data), + TAG_KICK => parse_kick_event(data), + _ => { + tracing::warn!(tag, "unknown platform tag, dropping message"); + Ok(None) + } + }); match outcome { Ok(Ok(Some(mut msg))) => { emote_index.scan_into(&msg.message_text, &mut msg.emote_spans); @@ -118,6 +133,23 @@ pub fn build_twitch_connect_line(creds: &TwitchCreds) -> serde_json::Result serde_json::Result> { + #[derive(Serialize)] + struct ConnectCmd { + cmd: &'static str, + chatroom_id: i64, + } + let cmd = ConnectCmd { + cmd: "kick_connect", + chatroom_id, + }; + let mut bytes = serde_json::to_vec(&cmd)?; + bytes.push(b'\n'); + Ok(bytes) +} + /// Marks a shared memory HANDLE inheritable just before spawning a child /// process. See ADR 18 for why this is necessary. #[cfg(windows)] @@ -260,7 +292,8 @@ mod tests { #[test] fn parse_batch_filters_non_chat_and_parse_errors() { - let viewer = br##"{ + let viewer = { + let json = br##"{ "metadata": {"message_id":"m","message_type":"notification","message_timestamp":"2023-11-06T18:11:47.492Z"}, "payload": { "subscription": {"type":"channel.chat.message"}, @@ -269,9 +302,22 @@ mod tests { "message_id":"mid","message":{"text":"hi"} } } - }"##.to_vec(); - let keepalive = br##"{"metadata":{"message_id":"ka","message_type":"session_keepalive","message_timestamp":"2023-11-06T18:11:49.000Z"},"payload":{}}"##.to_vec(); - let junk = b"not json".to_vec(); + }"##; + let mut tagged = vec![TAG_TWITCH]; + tagged.extend_from_slice(json); + tagged + }; + let keepalive = { + let json = br##"{"metadata":{"message_id":"ka","message_type":"session_keepalive","message_timestamp":"2023-11-06T18:11:49.000Z"},"payload":{}}"##; + let mut tagged = vec![TAG_TWITCH]; + tagged.extend_from_slice(json); + tagged + }; + let junk = { + let mut tagged = vec![TAG_TWITCH]; + tagged.extend_from_slice(b"not json"); + tagged + }; let raw = vec![viewer, keepalive, junk]; let mut batch = Vec::new(); @@ -295,7 +341,8 @@ mod tests { // Verifies that parse_batch appends rather than clearing. The drain // loop owns clearing between ticks; this lets the loop avoid any // allocation churn on the hot path. - let viewer = br##"{ + let viewer = { + let json = br##"{ "metadata": {"message_id":"m","message_type":"notification","message_timestamp":"2023-11-06T18:11:47.492Z"}, "payload": { "subscription": {"type":"channel.chat.message"}, @@ -304,11 +351,14 @@ mod tests { "message_id":"mid","message":{"text":"second"} } } - }"##.to_vec(); + }"##; + let mut tagged = vec![TAG_TWITCH]; + tagged.extend_from_slice(json); + tagged + }; let mut batch = Vec::new(); let idx = EmoteIndex::new(); - // Pretend a previous tick left one item in the scratch. parse_batch(std::slice::from_ref(&viewer), &mut batch, &idx); assert_eq!(batch.len(), 1); assert_eq!(batch[0].message_text, "second"); @@ -323,7 +373,8 @@ mod tests { fn parse_batch_attaches_emote_spans_from_index() { use crate::emote_index::{EmoteMeta, Provider}; - let viewer = br##"{ + let viewer = { + let json = br##"{ "metadata": {"message_id":"m","message_type":"notification","message_timestamp":"2023-11-06T18:11:47.492Z"}, "payload": { "subscription": {"type":"channel.chat.message"}, @@ -332,7 +383,11 @@ mod tests { "message_id":"mid","message":{"text":"hello Kappa world"} } } - }"##.to_vec(); + }"##; + let mut tagged = vec![TAG_TWITCH]; + tagged.extend_from_slice(json); + tagged + }; let idx = EmoteIndex::new(); idx.load([EmoteMeta { @@ -358,6 +413,34 @@ mod tests { assert_eq!(span.emote.code.as_ref(), "Kappa"); } + #[test] + fn parse_batch_handles_kick_messages() { + let kick_msg = { + let json = br##"{"event":"App\\Events\\ChatMessageEvent","data":"{\"id\":\"k1\",\"chatroom_id\":100,\"content\":\"hello from kick\",\"type\":\"message\",\"created_at\":\"2025-06-01T12:00:00Z\",\"sender\":{\"id\":42,\"username\":\"kuser\",\"slug\":\"kuser\",\"identity\":{\"color\":\"#00FF00\",\"badges\":[]}}}","channel":"chatrooms.100.v2"}"##; + let mut tagged = vec![TAG_KICK]; + tagged.extend_from_slice(json); + tagged + }; + + let mut batch = Vec::new(); + let idx = EmoteIndex::new(); + parse_batch(std::slice::from_ref(&kick_msg), &mut batch, &idx); + assert_eq!(batch.len(), 1); + assert_eq!(batch[0].message_text, "hello from kick"); + assert!(matches!(batch[0].platform, crate::message::Platform::Kick)); + } + + #[test] + fn parse_batch_skips_empty_and_unknown_tags() { + let empty: Vec = vec![]; + let unknown = vec![0xFF, b'{', b'}']; + let raw = vec![empty, unknown]; + let mut batch = Vec::new(); + let idx = EmoteIndex::new(); + parse_batch(&raw, &mut batch, &idx); + assert!(batch.is_empty()); + } + #[cfg(windows)] #[test] fn mark_and_unmark_handle_inheritance_round_trip() { diff --git a/apps/desktop/src-tauri/src/message.rs b/apps/desktop/src-tauri/src/message.rs index 68cec44..985d477 100644 --- a/apps/desktop/src-tauri/src/message.rs +++ b/apps/desktop/src-tauri/src/message.rs @@ -12,7 +12,6 @@ pub enum Platform { // so the frontend's discriminated union stays the single source of truth. #[allow(dead_code)] YouTube, - #[allow(dead_code)] Kick, } @@ -207,6 +206,115 @@ pub fn parse_twitch_envelope(bytes: &[u8]) -> Result, Par })) } +// --- Kick Pusher deserialization types (private, narrow) --- + +#[derive(Debug, Deserialize)] +struct KickPusherEvent { + event: String, + #[serde(default)] + data: Option, +} + +#[derive(Debug, Deserialize)] +struct KickChatMessage { + id: String, + #[serde(default)] + #[allow(dead_code)] + chatroom_id: Option, + content: String, + created_at: String, + sender: KickSender, +} + +#[derive(Debug, Deserialize)] +struct KickSender { + id: i64, + username: String, + #[serde(default)] + identity: Option, +} + +#[derive(Debug, Deserialize)] +struct KickIdentity { + #[serde(default)] + color: Option, + #[serde(default)] + badges: Vec, +} + +#[derive(Debug, Deserialize)] +struct KickBadge { + #[serde(rename = "type")] + badge_type: String, + #[serde(default)] + text: Option, +} + +/// Parses a raw Kick Pusher channel event into a [`UnifiedMessage`] when the +/// event is a `ChatMessageEvent`. +/// +/// Returns `Ok(None)` for non-chat events. The Pusher `data` field is a +/// double-encoded JSON string containing the actual chat message payload. +pub fn parse_kick_event(bytes: &[u8]) -> Result, ParseError> { + let event: KickPusherEvent = serde_json::from_slice(bytes)?; + + if !event.event.contains("ChatMessageEvent") { + return Ok(None); + } + + let Some(data_str) = event.data else { + return Ok(None); + }; + + let msg: KickChatMessage = serde_json::from_str(&data_str)?; + + let platform_ts = chrono::DateTime::parse_from_rfc3339(&msg.created_at) + .or_else(|_| { + // Kick sometimes sends timestamps without timezone offset + chrono::NaiveDateTime::parse_from_str(&msg.created_at, "%Y-%m-%dT%H:%M:%S") + .map(|naive| naive.and_utc().fixed_offset()) + }) + .map_err(ParseError::Timestamp)? + .timestamp_millis(); + let arrival_time = chrono::Utc::now().timestamp_millis(); + + let identity = msg.sender.identity.unwrap_or(KickIdentity { + color: None, + badges: Vec::new(), + }); + + let badges: Vec = identity + .badges + .iter() + .map(|b| Badge { + set_id: b.badge_type.clone(), + id: b.text.clone().unwrap_or_default(), + }) + .collect(); + + let is_broadcaster = badges.iter().any(|b| b.set_id == "broadcaster"); + let is_mod = is_broadcaster || badges.iter().any(|b| b.set_id == "moderator"); + let is_subscriber = badges.iter().any(|b| b.set_id == "subscriber"); + + Ok(Some(UnifiedMessage { + id: msg.id, + platform: Platform::Kick, + timestamp: platform_ts, + arrival_time, + username: msg.sender.username.clone(), + display_name: msg.sender.username, + platform_user_id: msg.sender.id.to_string(), + message_text: msg.content, + badges, + is_mod, + is_subscriber, + is_broadcaster, + color: identity.color, + reply_to: None, + emote_spans: Vec::new(), + })) +} + #[cfg(test)] mod tests { use super::*; @@ -419,4 +527,61 @@ mod tests { let _ = err.to_string(); assert!(std::error::Error::source(&err).is_some()); } + + // --- Kick parser tests --- + + const KICK_CHAT_EVENT: &[u8] = br##"{ + "event": "App\\Events\\ChatMessageEvent", + "data": "{\"id\":\"msg-k1\",\"chatroom_id\":100,\"content\":\"hello kick\",\"type\":\"message\",\"created_at\":\"2025-06-01T12:00:00Z\",\"sender\":{\"id\":42,\"username\":\"viewer1\",\"slug\":\"viewer1\",\"identity\":{\"color\":\"#FF5733\",\"badges\":[]}}}", + "channel": "chatrooms.100.v2" + }"##; + + const KICK_MOD_MSG: &[u8] = br##"{ + "event": "App\\Events\\ChatMessageEvent", + "data": "{\"id\":\"msg-k2\",\"chatroom_id\":100,\"content\":\"!ban spammer\",\"type\":\"message\",\"created_at\":\"2025-06-01T12:01:00Z\",\"sender\":{\"id\":99,\"username\":\"the_mod\",\"slug\":\"the_mod\",\"identity\":{\"color\":\"#FF0000\",\"badges\":[{\"type\":\"moderator\",\"text\":\"Moderator\"},{\"type\":\"subscriber\",\"text\":\"Subscriber\"}]}}}", + "channel": "chatrooms.100.v2" + }"##; + + const KICK_OTHER_EVENT: &[u8] = br##"{ + "event": "App\\Events\\UserBannedEvent", + "data": "{}", + "channel": "chatrooms.100.v2" + }"##; + + #[test] + fn parses_kick_chat_message() { + let msg = parse_kick_event(KICK_CHAT_EVENT).unwrap().unwrap(); + assert_eq!(msg.id, "msg-k1"); + assert_eq!(msg.username, "viewer1"); + assert_eq!(msg.display_name, "viewer1"); + assert_eq!(msg.platform_user_id, "42"); + assert_eq!(msg.message_text, "hello kick"); + assert_eq!(msg.color.as_deref(), Some("#FF5733")); + assert!(matches!(msg.platform, Platform::Kick)); + assert!(!msg.is_mod); + assert!(!msg.is_subscriber); + assert!(!msg.is_broadcaster); + assert!(msg.timestamp > 0); + } + + #[test] + fn parses_kick_moderator_flags() { + let msg = parse_kick_event(KICK_MOD_MSG).unwrap().unwrap(); + assert!(msg.is_mod); + assert!(msg.is_subscriber); + assert!(!msg.is_broadcaster); + assert_eq!(msg.badges.len(), 2); + } + + #[test] + fn kick_non_chat_event_returns_none() { + let result = parse_kick_event(KICK_OTHER_EVENT).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn kick_malformed_json_returns_err() { + let err = parse_kick_event(b"not json").unwrap_err(); + assert!(matches!(err, ParseError::Json(_))); + } } diff --git a/docs/adr.md b/docs/adr.md index 780e0fc..b9b3d20 100644 --- a/docs/adr.md +++ b/docs/adr.md @@ -58,3 +58,4 @@ Decisions that have been discussed, evaluated, and locked. Don't revisit unless | 35 | License: GPL v3 | MIT, Apache 2.0, GPL v3, BSL 1.1, AGPL | Open source, prevents closed-source forks. Can relicense later as sole copyright holder (with CLA for contributors) | | 36 | Windows-first development | All platforms equal, macOS first | Most streamers are on Windows. CI, tooling, and testing prioritize Windows | | 37 | Twitch OAuth via Device Code Grant (public client), tokens stored via `keyring-rs` (native credential store per OS) as a single serde-JSON blob per broadcaster. Browser launched to `verification_uri` via `tauri_plugin_shell::open`. Refresh rotation (Twitch rotates refresh tokens on every use) handled by persisting the new refresh token atomically. YouTube and Kick get their own ADRs when those platforms land; Twitch-specific constraints do not generalize. | Authorization Code Grant with client secret, Authorization Code + PKCE, Device Code Grant (public client), Implicit Grant. Storage: keyring-rs, direct Win32 CredMan/macOS Keychain/Secret-Service bindings, encrypted file on disk. | Twitch's Authorization Code Grant requires `client_secret` in the token exchange per their docs ("This flow is meant for apps that use a server, can securely store a client secret, and can make server-to-server requests to the Twitch API"), and a desktop binary cannot safely hold a client secret regardless of code signing. Twitch does not support PKCE as a client-secret substitute on their Authorization Code endpoint; the documented flow hard-requires `client_secret`. Twitch's docs explicitly recommend Device Code Grant with public client type "if your application is on a more open platform (such as windows)", which is exactly our primary target (ADR 36). DCF for public clients issues access tokens (4h expiry) plus refresh tokens (one-time use, 30-day inactive expiry) with no secret required, and matches ADR 29 (proactive 5-min refresh) and ADR 31 (re-auth UI on refresh failure) without modification. Implicit Grant is ruled out by RFC 9700 and issues no refresh token. On storage, `keyring-rs` is the de-facto Rust cross-platform credential store — it wraps Windows Credential Manager, macOS Keychain Services, Linux Secret Service / keyutils, and FreeBSD/OpenBSD, exposing a single `Entry::new(service, user)` API; every token lives in the OS's own secret store with OS-level access control, matching how 1Password, GitHub CLI, and git-credential-manager store secrets. Direct platform APIs were rejected as unnecessary reinvention. Encrypted-file was rejected because there is no key the app can hold that the OS cannot already hold better. Tokens are serialized as a single JSON blob `{access_token, refresh_token, expires_at, scopes}` keyed by `prismoid.twitch.`, not per-field entries — atomic read/write and one keychain prompt per operation. DCF user UX is "app opens browser to verification URL with the device code pre-filled in query string, user clicks Authorize, app polls token endpoint until approved" — Twitch's `verification_uri` already includes the `device-code=` query parameter, so the flow is effectively one click once the browser lands on the page. | +| 38 | Kick chat read via Pusher WebSocket (public channel, no auth for read-only). Official API (OAuth 2.1 Authorization Code + PKCE via `id.kick.com`) used for write and moderation operations only. Platform tag byte (`0x01` Twitch, `0x02` Kick, `0x03` YouTube) prepended to each message in the Go sidecar's out channel so the Rust host can dispatch to the correct parser without trying multiple deserializers. | Official webhooks for all events, Pusher WebSocket for all events, official API polling | Kick's official event system (docs.kick.com) is webhook-only and requires a publicly reachable URL, which a desktop app cannot provide without tunneling. Kick's own web client reads chat via Pusher WebSocket on `chatrooms.{id}.v2` channels, which are public (no auth required for subscribe). This is the only viable path for a desktop chat reader. Official OAuth 2.1 + PKCE is used for write/mod operations (`POST /public/v1/chat`, `POST/DELETE /public/v1/moderation/bans`, `DELETE /public/v1/chat/{message_id}`) since those endpoints require `chat:write` and `moderation:*` scopes. The platform tag byte approach was chosen over trying multiple JSON parsers (fragile, O(n) parsers per message) or wrapping in a JSON envelope (unnecessary allocation and re-encoding). One byte per message is zero-cost and extends cleanly to YouTube (tag `0x03`). | diff --git a/docs/platform-apis.md b/docs/platform-apis.md index f27ad76..407f2d8 100644 --- a/docs/platform-apis.md +++ b/docs/platform-apis.md @@ -80,11 +80,52 @@ YouTube Data API v3 has a daily quota (default 10,000 units). gRPC streaming kee --- -## Kick (Phase 5) +## Kick -Reverse-engineered Pusher WebSocket. No official API. +### Authentication + +OAuth 2.1 Authorization Code + PKCE via `id.kick.com`. Kick launched an official public API at `docs.kick.com` in 2025. + +Required scopes: + +- `chat:write` - send chat messages and allow bots to post +- `events:subscribe` - subscribe to channel events (chat, follows, subs) +- `moderation:ban` - ban/timeout/unban users +- `moderation:chat_message:manage` - delete chat messages +- `channel:read` - read channel information +- `user:read` - read user information + +Token storage: OS keychain, same as Twitch (one JSON blob per account). +Token refresh: proactive, 5 minutes before expiry. Token endpoint: `POST https://id.kick.com/oauth/token` with `grant_type=refresh_token`. + +### Chat (read) + +Pusher WebSocket, not the official webhook API. The official API delivers chat events via webhooks (`POST` to a public URL), which a desktop app cannot receive without a tunnel. Pusher is what Kick's own web client uses and is the standard approach for all third-party Kick clients. + +Connect to `wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0-rc2&flash=false`. Subscribe to `chatroom.{chatroom_id}` channel. Messages arrive as Pusher `ChatMessageEvent` events containing sender identity (username, color, badges), message content, emote positions, and reply context. + +Chatroom ID is looked up from the channel slug via `GET https://api.kick.com/public/v1/channels?slug={slug}`. + +Kick connection failures must never affect Twitch or YouTube connections. Separate goroutine, separate error handling, separate reconnection logic. + +### Chat (write) + +Official API `POST https://api.kick.com/public/v1/chat` with `broadcaster_user_id`, `content`, and `type` (`"user"` or `"bot"`). Requires `chat:write` scope. + +### Moderation + +| Action | Endpoint | +| --------------- | ----------------------------------------------------------- | +| Delete message | `DELETE /public/v1/chat/{message_id}` | +| Timeout | `POST /public/v1/moderation/bans` with `duration` (minutes) | +| Ban (permanent) | `POST /public/v1/moderation/bans` without `duration` | +| Unban | `DELETE /public/v1/moderation/bans` | + +Duration range for timeouts: 1 to 10,080 minutes (7 days). + +### Rate Limits -Connection is fragile and must be fully isolated. Kick connection failures must never affect Twitch or YouTube connections. Separate goroutine, separate error handling, separate reconnection logic. +Official API rate limits TBD (not yet documented). Token bucket rate limiter in Go, same pattern as Twitch. --- From af60f2f31faaf96f6ad63bf0b98d8619c4a9c2da Mon Sep 17 00:00:00 2001 From: ImpulseB23 Date: Sun, 19 Apr 2026 00:27:41 +0200 Subject: [PATCH 2/3] fix: kick client ping logic, write errors, badge cloning, flaky test --- .../src-sidecar/internal/kick/client.go | 70 ++++++++++++------- .../src-sidecar/internal/sidecar/sidecar.go | 4 ++ .../internal/sidecar/sidecar_test.go | 5 +- apps/desktop/src-tauri/src/message.rs | 6 +- 4 files changed, 54 insertions(+), 31 deletions(-) diff --git a/apps/desktop/src-sidecar/internal/kick/client.go b/apps/desktop/src-sidecar/internal/kick/client.go index 2514dbd..45be5f7 100644 --- a/apps/desktop/src-sidecar/internal/kick/client.go +++ b/apps/desktop/src-sidecar/internal/kick/client.go @@ -142,16 +142,19 @@ func (c *Client) subscribe(ctx context.Context, conn *websocket.Conn) error { } func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, activityTimeoutSec int) error { - // Pusher recommends sending a ping after activity_timeout seconds of - // inactivity, and closing if no pong arrives within 30s. We use - // activityTimeout + 10s as the read deadline to account for ping/pong. - timeout := time.Duration(activityTimeoutSec)*time.Second + 10*time.Second - - lastActivity := time.Now() pingInterval := time.Duration(activityTimeoutSec) * time.Second + pongTimeout := 30 * time.Second + waitingForPong := false + + readDeadline := func() time.Duration { + if waitingForPong { + return pongTimeout + } + return pingInterval + } for { - readCtx, cancel := context.WithTimeout(ctx, timeout) + readCtx, cancel := context.WithTimeout(ctx, readDeadline()) _, data, err := conn.Read(readCtx) cancel() if err != nil { @@ -159,6 +162,16 @@ func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, activityT _ = conn.Close(websocket.StatusNormalClosure, "shutting down") return ctx.Err() } + if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { + if waitingForPong { + return fmt.Errorf("pong timeout after %s", pongTimeout) + } + if err := c.sendPing(ctx, conn); err != nil { + return err + } + waitingForPong = true + continue + } return fmt.Errorf("read: %w", err) } @@ -168,28 +181,24 @@ func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, activityT continue } - now := time.Now() + waitingForPong = false switch { case ev.Event == "pusher:ping": - pong, _ := json.Marshal(PusherEvent{Event: "pusher:pong", Data: "{}"}) - writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - _ = conn.Write(writeCtx, websocket.MessageText, pong) - cancel() - lastActivity = now + if err := c.sendPong(ctx, conn); err != nil { + return err + } case ev.Event == "pusher:pong": - lastActivity = now + // pong received, waitingForPong already cleared above case ev.Event == "pusher:error": c.Log.Warn().Str("data", ev.Data).Msg("pusher error") case ev.Event == "pusher_internal:subscription_succeeded": - lastActivity = now + // no-op case ev.Channel != "": - // Channel event (chat message, ban, etc). Forward the raw - // Pusher event bytes tagged for the Rust parser. tagged := make([]byte, 1+len(data)) tagged[0] = control.TagKick copy(tagged[1:], data) @@ -198,20 +207,31 @@ func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, activityT default: c.Log.Warn().Msg("output channel full, dropping message") } - lastActivity = now default: c.Log.Debug().Str("event", ev.Event).Msg("unhandled pusher event") } + } +} - if now.Sub(lastActivity) >= pingInterval { - ping, _ := json.Marshal(PusherEvent{Event: "pusher:ping", Data: "{}"}) - writeCtx, wCancel := context.WithTimeout(ctx, 5*time.Second) - _ = conn.Write(writeCtx, websocket.MessageText, ping) - wCancel() - lastActivity = now - } +func (c *Client) sendPing(ctx context.Context, conn *websocket.Conn) error { + msg, _ := json.Marshal(PusherEvent{Event: "pusher:ping", Data: "{}"}) + writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := conn.Write(writeCtx, websocket.MessageText, msg); err != nil { + return fmt.Errorf("write ping: %w", err) + } + return nil +} + +func (c *Client) sendPong(ctx context.Context, conn *websocket.Conn) error { + msg, _ := json.Marshal(PusherEvent{Event: "pusher:pong", Data: "{}"}) + writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := conn.Write(writeCtx, websocket.MessageText, msg); err != nil { + return fmt.Errorf("write pong: %w", err) } + return nil } // isFatalClose checks if the error contains a Pusher close code in the diff --git a/apps/desktop/src-sidecar/internal/sidecar/sidecar.go b/apps/desktop/src-sidecar/internal/sidecar/sidecar.go index 0314ec0..406e64a 100644 --- a/apps/desktop/src-sidecar/internal/sidecar/sidecar.go +++ b/apps/desktop/src-sidecar/internal/sidecar/sidecar.go @@ -509,6 +509,10 @@ func HandleTwitchDisconnect(cmd control.Command, clients map[string]context.Canc // cmd if there isn't already one running. Uses the chatroom ID as the client // registry key (prefixed with "kick:" to avoid collisions with Twitch IDs). func HandleKickConnect(ctx context.Context, cmd control.Command, clients map[string]context.CancelFunc, out chan<- []byte, logger zerolog.Logger) { + if cmd.ChatroomID <= 0 { + logger.Warn().Int("chatroom", cmd.ChatroomID).Msg("kick_connect missing chatroom_id; ignoring") + return + } key := kickClientKey(cmd.ChatroomID) if _, exists := clients[key]; exists { logger.Warn().Int("chatroom", cmd.ChatroomID).Msg("kick already connected, ignoring") diff --git a/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go b/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go index 47676ea..bb10b7d 100644 --- a/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go +++ b/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go @@ -171,19 +171,18 @@ func TestRunWriter_SkipsSignalOnRejectedPayload(t *testing.T) { in := make(chan []byte, 2) in <- make([]byte, 32) in <- make([]byte, 8) + close(in) var signalCount atomic.Int32 signal := func() { signalCount.Add(1) } - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() done := make(chan struct{}) go func() { RunWriter(ctx, in, writer, signal) close(done) }() - time.Sleep(50 * time.Millisecond) - cancel() <-done // Only the second (valid) write should have signaled. diff --git a/apps/desktop/src-tauri/src/message.rs b/apps/desktop/src-tauri/src/message.rs index 985d477..f8fd57b 100644 --- a/apps/desktop/src-tauri/src/message.rs +++ b/apps/desktop/src-tauri/src/message.rs @@ -285,10 +285,10 @@ pub fn parse_kick_event(bytes: &[u8]) -> Result, ParseErr let badges: Vec = identity .badges - .iter() + .into_iter() .map(|b| Badge { - set_id: b.badge_type.clone(), - id: b.text.clone().unwrap_or_default(), + set_id: b.badge_type, + id: b.text.unwrap_or_default(), }) .collect(); From d005c9335cc9748f066484f9d8bb3e0f115d1bec Mon Sep 17 00:00:00 2001 From: ImpulseB23 Date: Sun, 19 Apr 2026 01:46:33 +0200 Subject: [PATCH 3/3] test: kick handler and client coverage for codecov patch threshold --- .../src-sidecar/internal/kick/client.go | 136 ++++++++----- .../src-sidecar/internal/kick/client_test.go | 187 +++++++++++++++++- .../internal/sidecar/sidecar_test.go | 109 ++++++++++ 3 files changed, 375 insertions(+), 57 deletions(-) diff --git a/apps/desktop/src-sidecar/internal/kick/client.go b/apps/desktop/src-sidecar/internal/kick/client.go index 45be5f7..465bfd0 100644 --- a/apps/desktop/src-sidecar/internal/kick/client.go +++ b/apps/desktop/src-sidecar/internal/kick/client.go @@ -29,8 +29,9 @@ type Notify func(msgType string, payload any) // raw Pusher event bytes (tagged with control.TagKick) to a shared channel. // Follows the same lifecycle pattern as the Twitch EventSub client. type Client struct { - ChatroomID int - WSURL string // override for testing; "" uses default + ChatroomID int + WSURL string // override for testing; "" uses default + PongTimeout time.Duration // override for testing; 0 uses default (30s) Out chan<- []byte Log zerolog.Logger @@ -143,73 +144,104 @@ func (c *Client) subscribe(ctx context.Context, conn *websocket.Conn) error { func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, activityTimeoutSec int) error { pingInterval := time.Duration(activityTimeoutSec) * time.Second - pongTimeout := 30 * time.Second - waitingForPong := false + pongTimeout := c.PongTimeout + if pongTimeout == 0 { + pongTimeout = 30 * time.Second + } - readDeadline := func() time.Duration { - if waitingForPong { - return pongTimeout - } - return pingInterval + type readResult struct { + data []byte + err error } - for { - readCtx, cancel := context.WithTimeout(ctx, readDeadline()) - _, data, err := conn.Read(readCtx) - cancel() - if err != nil { - if errors.Is(err, context.Canceled) && ctx.Err() != nil { - _ = conn.Close(websocket.StatusNormalClosure, "shutting down") - return ctx.Err() + // Read goroutine: reads messages and sends them to results. + // Exits when conn is closed (via deferred CloseNow) or ctx is cancelled. + results := make(chan readResult, 1) + loopDone := make(chan struct{}) + defer close(loopDone) + defer func() { _ = conn.CloseNow() }() + + go func() { + for { + _, data, err := conn.Read(ctx) + select { + case results <- readResult{data, err}: + case <-loopDone: + return } - if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { - if waitingForPong { - return fmt.Errorf("pong timeout after %s", pongTimeout) - } - if err := c.sendPing(ctx, conn); err != nil { - return err - } - waitingForPong = true - continue + if err != nil { + return } - return fmt.Errorf("read: %w", err) } + }() - var ev PusherEvent - if err := json.Unmarshal(data, &ev); err != nil { - c.Log.Error().Err(err).Msg("unmarshal pusher event failed") - continue - } + timer := time.NewTimer(pingInterval) + defer timer.Stop() + waitingForPong := false - waitingForPong = false + for { + select { + case r := <-results: + if r.err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + return fmt.Errorf("read: %w", r.err) + } - switch { - case ev.Event == "pusher:ping": - if err := c.sendPong(ctx, conn); err != nil { - return err + if !timer.Stop() { + select { + case <-timer.C: + default: + } } + timer.Reset(pingInterval) + waitingForPong = false - case ev.Event == "pusher:pong": - // pong received, waitingForPong already cleared above + var ev PusherEvent + if err := json.Unmarshal(r.data, &ev); err != nil { + c.Log.Error().Err(err).Msg("unmarshal pusher event failed") + continue + } - case ev.Event == "pusher:error": - c.Log.Warn().Str("data", ev.Data).Msg("pusher error") + switch { + case ev.Event == "pusher:ping": + if err := c.sendPong(ctx, conn); err != nil { + return err + } - case ev.Event == "pusher_internal:subscription_succeeded": - // no-op + case ev.Event == "pusher:pong": + // pong received, waitingForPong already cleared above + + case ev.Event == "pusher:error": + c.Log.Warn().Str("data", ev.Data).Msg("pusher error") + + case ev.Event == "pusher_internal:subscription_succeeded": + // no-op + + case ev.Channel != "": + tagged := make([]byte, 1+len(r.data)) + tagged[0] = control.TagKick + copy(tagged[1:], r.data) + select { + case c.Out <- tagged: + default: + c.Log.Warn().Msg("output channel full, dropping message") + } - case ev.Channel != "": - tagged := make([]byte, 1+len(data)) - tagged[0] = control.TagKick - copy(tagged[1:], data) - select { - case c.Out <- tagged: default: - c.Log.Warn().Msg("output channel full, dropping message") + c.Log.Debug().Str("event", ev.Event).Msg("unhandled pusher event") } - default: - c.Log.Debug().Str("event", ev.Event).Msg("unhandled pusher event") + case <-timer.C: + if waitingForPong { + return fmt.Errorf("pong timeout after %s", pongTimeout) + } + if err := c.sendPing(ctx, conn); err != nil { + return err + } + waitingForPong = true + timer.Reset(pongTimeout) } } } diff --git a/apps/desktop/src-sidecar/internal/kick/client_test.go b/apps/desktop/src-sidecar/internal/kick/client_test.go index 2caa8e6..6c6bcbf 100644 --- a/apps/desktop/src-sidecar/internal/kick/client_test.go +++ b/apps/desktop/src-sidecar/internal/kick/client_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "strings" + "sync/atomic" "testing" "time" @@ -51,11 +52,12 @@ func pusherPingMsg() []byte { func newTestClient(wsURL string, chatroomID int, out chan<- []byte) *Client { return &Client{ - ChatroomID: chatroomID, - WSURL: wsURL, - Out: out, - Log: zerolog.Nop(), - Notify: func(string, any) {}, + ChatroomID: chatroomID, + WSURL: wsURL, + PongTimeout: 0, // default 30s + Out: out, + Log: zerolog.Nop(), + Notify: func(string, any) {}, } } @@ -279,3 +281,178 @@ func TestIsFatalClose(t *testing.T) { }) } } + +func TestClientSendsPingOnIdle(t *testing.T) { + var receivedPing bool + + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + // activity_timeout=1 so the client pings after 1s of idle + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(1)) + _, _, _ = conn.Read(ctx) // subscribe + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.500.v2")) + + // wait for the client-initiated ping + readCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + _, data, err := conn.Read(readCtx) + if err == nil { + var ev PusherEvent + if json.Unmarshal(data, &ev) == nil && ev.Event == "pusher:ping" { + receivedPing = true + } + } + + // respond with pong then close + pong, _ := json.Marshal(PusherEvent{Event: "pusher:pong", Data: "{}"}) + _ = conn.Write(ctx, websocket.MessageText, pong) + time.Sleep(50 * time.Millisecond) + _ = conn.Close(websocket.StatusNormalClosure, "done") + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := newTestClient("ws"+strings.TrimPrefix(wsSrv.URL, "http"), 500, out) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _ = client.Run(ctx) + + if !receivedPing { + t.Fatal("expected client to send pusher:ping after idle timeout") + } +} + +func TestClientPongTimeout(t *testing.T) { + var connectCount atomic.Int32 + + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + connectCount.Add(1) + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + // activity_timeout=1 so client pings after 1s + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(1)) + _, _, _ = conn.Read(ctx) // subscribe + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.600.v2")) + + // read the ping but do NOT respond with pong + readCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + _, _, _ = conn.Read(readCtx) + + <-readCtx.Done() + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := &Client{ + ChatroomID: 600, + WSURL: "ws" + strings.TrimPrefix(wsSrv.URL, "http"), + PongTimeout: 1 * time.Second, + Out: out, + Log: zerolog.Nop(), + Notify: func(string, any) {}, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _ = client.Run(ctx) + + // pong timeout (1s) + activity_timeout (1s) + backoff = ~3s per cycle. + // With 5s context, we should see at least 2 connections. + if connectCount.Load() < 2 { + t.Fatalf("expected at least 2 connections (pong timeout triggers reconnect), got %d", connectCount.Load()) + } +} + +func TestClientPusherErrorEvent(t *testing.T) { + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(120)) + _, _, _ = conn.Read(ctx) // subscribe + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.700.v2")) + + errorEv, _ := json.Marshal(PusherEvent{Event: "pusher:error", Data: `{"code":4201,"message":"rate limit"}`}) + _ = conn.Write(ctx, websocket.MessageText, errorEv) + + // send a normal message after the error to verify the client keeps running + _ = conn.Write(ctx, websocket.MessageText, chatMessageEvent("msg-1", "after error", "user1", 700)) + + time.Sleep(100 * time.Millisecond) + _ = conn.Close(websocket.StatusNormalClosure, "done") + })) + defer wsSrv.Close() + + out := make(chan []byte, 16) + client := newTestClient("ws"+strings.TrimPrefix(wsSrv.URL, "http"), 700, out) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _ = client.Run(ctx) + + msgs := drainChan(out) + if len(msgs) < 1 { + t.Fatal("expected message after pusher:error event") + } +} + +func TestClientDropsMessageOnFullChannel(t *testing.T) { + wsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer func() { _ = conn.CloseNow() }() + + ctx := context.Background() + _ = conn.Write(ctx, websocket.MessageText, connectionEstablishedMsg(120)) + _, _, _ = conn.Read(ctx) // subscribe + _ = conn.Write(ctx, websocket.MessageText, subscriptionSucceededMsg("chatrooms.800.v2")) + + // send more messages than the channel can hold + for i := 0; i < 5; i++ { + _ = conn.Write(ctx, websocket.MessageText, chatMessageEvent( + fmt.Sprintf("msg-%d", i), "overflow", "user1", 800, + )) + } + + time.Sleep(100 * time.Millisecond) + _ = conn.Close(websocket.StatusNormalClosure, "done") + })) + defer wsSrv.Close() + + // channel with capacity 1 so messages get dropped + out := make(chan []byte, 1) + client := newTestClient("ws"+strings.TrimPrefix(wsSrv.URL, "http"), 800, out) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _ = client.Run(ctx) + + // at least one should have been received, but not all 5 + msgs := drainChan(out) + if len(msgs) == 0 { + t.Fatal("expected at least one message") + } +} diff --git a/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go b/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go index bb10b7d..d38bccb 100644 --- a/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go +++ b/apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go @@ -933,3 +933,112 @@ func TestProviderError_JSONIncludesErrorString(t *testing.T) { t.Errorf("unexpected JSON: %s", got) } } + +func TestHandleKickConnect_AddsClient(t *testing.T) { + clients := make(map[string]context.CancelFunc) + out := make(chan []byte, 1) + cmd := control.Command{ + Cmd: "kick_connect", + ChatroomID: 12345, + } + + HandleKickConnect(context.Background(), cmd, clients, out, zerolog.Nop()) + + if _, ok := clients["kick:12345"]; !ok { + t.Fatal("expected kick client to be registered") + } + clients["kick:12345"]() +} + +func TestHandleKickConnect_RejectsDuplicate(t *testing.T) { + clients := make(map[string]context.CancelFunc) + out := make(chan []byte, 1) + + var cancelled atomic.Bool + clients["kick:12345"] = func() { cancelled.Store(true) } + + cmd := control.Command{Cmd: "kick_connect", ChatroomID: 12345} + HandleKickConnect(context.Background(), cmd, clients, out, zerolog.Nop()) + + if cancelled.Load() { + t.Fatal("existing kick client cancel was overwritten") + } + if len(clients) != 1 { + t.Fatalf("expected 1 client, got %d", len(clients)) + } +} + +func TestHandleKickConnect_RejectsZeroChatroomID(t *testing.T) { + clients := make(map[string]context.CancelFunc) + out := make(chan []byte, 1) + cmd := control.Command{Cmd: "kick_connect", ChatroomID: 0} + + HandleKickConnect(context.Background(), cmd, clients, out, zerolog.Nop()) + + if len(clients) != 0 { + t.Fatalf("expected no clients for zero chatroom ID, got %d", len(clients)) + } +} + +func TestHandleKickConnect_RejectsNegativeChatroomID(t *testing.T) { + clients := make(map[string]context.CancelFunc) + out := make(chan []byte, 1) + cmd := control.Command{Cmd: "kick_connect", ChatroomID: -1} + + HandleKickConnect(context.Background(), cmd, clients, out, zerolog.Nop()) + + if len(clients) != 0 { + t.Fatalf("expected no clients for negative chatroom ID, got %d", len(clients)) + } +} + +func TestHandleKickDisconnect_CancelsAndRemoves(t *testing.T) { + clients := make(map[string]context.CancelFunc) + var cancelled atomic.Bool + clients["kick:12345"] = func() { cancelled.Store(true) } + + cmd := control.Command{Cmd: "kick_disconnect", ChatroomID: 12345} + HandleKickDisconnect(cmd, clients, zerolog.Nop()) + + if !cancelled.Load() { + t.Fatal("expected kick client to be cancelled") + } + if _, ok := clients["kick:12345"]; ok { + t.Fatal("expected kick client to be removed from registry") + } +} + +func TestHandleKickDisconnect_NoOpForUnknown(t *testing.T) { + clients := make(map[string]context.CancelFunc) + cmd := control.Command{Cmd: "kick_disconnect", ChatroomID: 99999} + + HandleKickDisconnect(cmd, clients, zerolog.Nop()) +} + +func TestDispatchCommand_RoutesKickConnect(t *testing.T) { + clients := make(map[string]context.CancelFunc) + out := make(chan []byte, 1) + cmd := control.Command{Cmd: "kick_connect", ChatroomID: 777} + + DispatchCommand(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop()) + + if _, ok := clients["kick:777"]; !ok { + t.Fatal("expected kick_connect to register client") + } + clients["kick:777"]() +} + +func TestDispatchCommand_RoutesKickDisconnect(t *testing.T) { + var cancelled atomic.Bool + clients := map[string]context.CancelFunc{ + "kick:777": func() { cancelled.Store(true) }, + } + out := make(chan []byte, 1) + cmd := control.Command{Cmd: "kick_disconnect", ChatroomID: 777} + + DispatchCommand(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop()) + + if !cancelled.Load() { + t.Fatal("expected kick_disconnect to cancel client") + } +}