Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/desktop/src-sidecar/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ require (
github.com/coder/websocket v1.8.14
github.com/rs/zerolog v1.35.0
golang.org/x/sys v0.43.0
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
)

require (
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
)
36 changes: 36 additions & 0 deletions apps/desktop/src-sidecar/go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,47 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/rs/zerolog v1.35.0 h1:VD0ykx7HMiMJytqINBsKcbLS+BJ4WYjz+05us+LRTdI=
github.com/rs/zerolog v1.35.0/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
9 changes: 7 additions & 2 deletions apps/desktop/src-sidecar/internal/control/control.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package control

// Platform tag bytes prepended to each message written to the out channel.
// The Rust host reads the first byte to dispatch to the correct parser.
// Platform tag bytes prepended to each ring-buffer payload so the Rust host
// can dispatch to the correct parser without inspecting the JSON body.
const (
TagTwitch byte = 0x01
TagKick byte = 0x02
Expand Down Expand Up @@ -54,6 +54,11 @@ type Command struct {
DurationSeconds int `json:"duration_seconds,omitempty"`
Reason string `json:"reason,omitempty"`
MessageID string `json:"message_id,omitempty"`

// YouTube fields
VideoID string `json:"video_id,omitempty"`
LiveChatID string `json:"live_chat_id,omitempty"`
APIKey string `json:"api_key,omitempty"`
}

// Message is a notification the sidecar writes to stdout for the Rust host.
Expand Down
61 changes: 61 additions & 0 deletions apps/desktop/src-sidecar/internal/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ImpulseB23/Prismoid/sidecar/internal/kick"
"github.com/ImpulseB23/Prismoid/sidecar/internal/ringbuf"
"github.com/ImpulseB23/Prismoid/sidecar/internal/twitch"
"github.com/ImpulseB23/Prismoid/sidecar/internal/youtube"
)

const (
Expand Down Expand Up @@ -271,6 +272,10 @@ func DispatchCommand(ctx context.Context, cmd control.Command, clients map[strin
HandleTwitchConnect(ctx, cmd, clients, out, notify, logger)
case "twitch_disconnect":
HandleTwitchDisconnect(cmd, clients, logger)
case "youtube_connect":
HandleYouTubeConnect(ctx, cmd, clients, out, notify, logger)
case "youtube_disconnect":
HandleYouTubeDisconnect(cmd, clients, logger)
case "kick_connect":
HandleKickConnect(ctx, cmd, clients, out, logger)
case "kick_disconnect":
Expand Down Expand Up @@ -505,6 +510,62 @@ func HandleTwitchDisconnect(cmd control.Command, clients map[string]context.Canc
logger.Info().Str("broadcaster", cmd.BroadcasterID).Msg("twitch client disconnected")
}

// HandleYouTubeConnect spawns a YouTube gRPC streamList client for the given
// live chat ID. The client writes tagged JSON payloads to `out`, which the
// writer goroutine drains into the ring buffer.
func HandleYouTubeConnect(ctx context.Context, cmd control.Command, clients map[string]context.CancelFunc, out chan<- []byte, notify twitch.Notify, logger zerolog.Logger) {
chatID := cmd.LiveChatID
if chatID == "" {
logger.Warn().Msg("youtube_connect missing live_chat_id")
return
}

key := "yt:" + chatID
if _, exists := clients[key]; exists {
logger.Warn().Str("chat_id", chatID).Msg("already connected to youtube chat, ignoring")
return
}

clientCtx, clientCancel := context.WithCancel(ctx)

client := &youtube.Client{
LiveChatID: chatID,
APIKey: cmd.APIKey,
AccessToken: cmd.Token,
Out: out,
Log: logger.With().Str("youtube_chat", chatID).Logger(),
Notify: notify,
Comment thread
ImpulseB23 marked this conversation as resolved.
}

clients[key] = clientCancel

go func() {
if err := client.Run(clientCtx); err != nil && !errors.Is(err, context.Canceled) {
logger.Error().Err(err).Str("chat_id", chatID).Msg("youtube client exited")
}
}()

logger.Info().Str("chat_id", chatID).Msg("youtube client started")
}

// HandleYouTubeDisconnect cancels and removes a previously-connected YouTube client.
func HandleYouTubeDisconnect(cmd control.Command, clients map[string]context.CancelFunc, logger zerolog.Logger) {
chatID := cmd.LiveChatID
if chatID == "" {
logger.Warn().Msg("youtube_disconnect missing live_chat_id")
return
}
key := "yt:" + chatID
cancelFn, exists := clients[key]
if !exists {
logger.Warn().Str("chat_id", chatID).Msg("no active youtube connection to disconnect")
return
}
cancelFn()
delete(clients, key)
logger.Info().Str("chat_id", chatID).Msg("youtube client disconnected")
}

// HandleKickConnect spawns a Kick Pusher WebSocket client for the chatroom in
// cmd if there isn't already one running. Uses the chatroom ID as the client
// registry key (prefixed with "kick:" to avoid collisions with Twitch IDs).
Expand Down
109 changes: 109 additions & 0 deletions apps/desktop/src-sidecar/internal/sidecar/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,3 +1042,112 @@ func TestDispatchCommand_RoutesKickDisconnect(t *testing.T) {
t.Fatal("expected kick_disconnect to cancel client")
}
}

func TestHandleYouTubeConnect_AddsClient(t *testing.T) {
clients := make(map[string]context.CancelFunc)
out := make(chan []byte, 1)
cmd := control.Command{
Cmd: "youtube_connect",
LiveChatID: "abc",
APIKey: "key",
}

HandleYouTubeConnect(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop())

if _, ok := clients["yt:abc"]; !ok {
t.Fatal("expected youtube client to be registered")
}
clients["yt:abc"]()
}

func TestHandleYouTubeConnect_RejectsMissingChatID(t *testing.T) {
clients := make(map[string]context.CancelFunc)
out := make(chan []byte, 1)
cmd := control.Command{Cmd: "youtube_connect"}

HandleYouTubeConnect(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop())

if len(clients) != 0 {
t.Fatalf("expected no clients for missing chat_id, got %d", len(clients))
}
}

func TestHandleYouTubeConnect_RejectsDuplicate(t *testing.T) {
clients := make(map[string]context.CancelFunc)
out := make(chan []byte, 1)

var cancelled atomic.Bool
clients["yt:abc"] = func() { cancelled.Store(true) }

cmd := control.Command{Cmd: "youtube_connect", LiveChatID: "abc", APIKey: "key"}
HandleYouTubeConnect(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop())

if cancelled.Load() {
t.Fatal("existing youtube client cancel was overwritten")
}
if len(clients) != 1 {
t.Fatalf("expected 1 client, got %d", len(clients))
}
}

func TestHandleYouTubeDisconnect_CancelsAndRemoves(t *testing.T) {
clients := make(map[string]context.CancelFunc)
var cancelled atomic.Bool
clients["yt:abc"] = func() { cancelled.Store(true) }

cmd := control.Command{Cmd: "youtube_disconnect", LiveChatID: "abc"}
HandleYouTubeDisconnect(cmd, clients, zerolog.Nop())

if !cancelled.Load() {
t.Fatal("expected youtube client to be cancelled")
}
if _, ok := clients["yt:abc"]; ok {
t.Fatal("expected youtube client to be removed from registry")
}
}

func TestHandleYouTubeDisconnect_NoOpForUnknown(t *testing.T) {
clients := make(map[string]context.CancelFunc)
cmd := control.Command{Cmd: "youtube_disconnect", LiveChatID: "missing"}

HandleYouTubeDisconnect(cmd, clients, zerolog.Nop())
}

func TestHandleYouTubeDisconnect_RejectsMissingChatID(t *testing.T) {
clients := make(map[string]context.CancelFunc)
cmd := control.Command{Cmd: "youtube_disconnect"}

HandleYouTubeDisconnect(cmd, clients, zerolog.Nop())

if len(clients) != 0 {
t.Fatalf("expected no client mutations, got %d", len(clients))
}
}

func TestDispatchCommand_RoutesYouTubeConnect(t *testing.T) {
clients := make(map[string]context.CancelFunc)
out := make(chan []byte, 1)
cmd := control.Command{Cmd: "youtube_connect", LiveChatID: "xyz", APIKey: "k"}

DispatchCommand(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop())

if _, ok := clients["yt:xyz"]; !ok {
t.Fatal("expected youtube_connect to register client")
}
clients["yt:xyz"]()
}

func TestDispatchCommand_RoutesYouTubeDisconnect(t *testing.T) {
var cancelled atomic.Bool
clients := map[string]context.CancelFunc{
"yt:xyz": func() { cancelled.Store(true) },
}
out := make(chan []byte, 1)
cmd := control.Command{Cmd: "youtube_disconnect", LiveChatID: "xyz"}

DispatchCommand(context.Background(), cmd, clients, out, func(string, any) {}, zerolog.Nop())

if !cancelled.Load() {
t.Fatal("expected youtube_disconnect to cancel client")
}
}
1 change: 1 addition & 0 deletions apps/desktop/src-sidecar/internal/twitch/eventsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (c *Client) listenLoop(ctx context.Context, conn *websocket.Conn, keepalive
tagged := make([]byte, 1+len(data))
tagged[0] = control.TagTwitch
copy(tagged[1:], data)

select {
case c.Out <- tagged:
default:
Expand Down
Loading
Loading