Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dimos/robot/all_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"unitree-go2-ros": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_ros:unitree_go2_ros",
"unitree-go2-spatial": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_spatial:unitree_go2_spatial",
"unitree-go2-temporal-memory": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_temporal_memory:unitree_go2_temporal_memory",
"unitree-go2-twitch": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_twitch:unitree_go2_twitch",
"unitree-go2-vlm-stream-test": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_vlm_stream_test:unitree_go2_vlm_stream_test",
"unitree-go2-webrtc-keyboard-teleop": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_webrtc_keyboard_teleop:unitree_go2_webrtc_keyboard_teleop",
"unity-sim": "dimos.simulation.unity.blueprint:unity_sim",
Expand Down Expand Up @@ -163,6 +164,8 @@
"speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill",
"temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory",
"twist-teleop-module": "dimos.teleop.quest.quest_extensions.TwistTeleopModule",
"twitch-chat": "dimos.stream.twitch.module.TwitchChat",
"twitch-votes": "dimos.stream.twitch.votes.TwitchVotes",
"unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container.UnitreeG1SkillContainer",
"unitree-skill-container": "dimos.robot.unitree.unitree_skill_container.UnitreeSkillContainer",
"unity-bridge-module": "dimos.simulation.unity.module.UnityBridgeModule",
Expand Down
83 changes: 83 additions & 0 deletions dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""unitree-go2-twitch — Twitch Plays Go2.

Usage::

export DIMOS_TWITCH_TOKEN=oauth:your_token
export DIMOS_CHANNEL_NAME=your_channel
dimos run unitree-go2-twitch --robot-ip 192.168.123.161
"""

from __future__ import annotations

import time

from dimos.core.blueprints import autoconnect
from dimos.core.core import rpc
from dimos.core.module import Module, ModuleConfig
from dimos.core.stream import In, Out
from dimos.msgs.geometry_msgs.Twist import Twist
from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic
from dimos.stream.twitch.votes import TwitchChoice, TwitchVotes
from dimos.utils.logging_config import setup_logger

logger = setup_logger()


class _ChoiceToCmdVel(Module["ModuleConfig"]):
default_config = ModuleConfig
command_duration: float = 1.0

chat_vote_choice: In[TwitchChoice]
cmd_vel: Out[Twist]

@rpc
def start(self) -> None:
super().start()
self.chat_vote_choice.subscribe(self._on_choice)

def _on_choice(self, choice: TwitchChoice) -> None:
t = Twist()
if choice.winner == "forward":
t.linear.x = 0.3
elif choice.winner == "back":
t.linear.x = -0.3
elif choice.winner == "left":
t.angular.z = 0.5
elif choice.winner == "right":
t.angular.z = -0.5

logger.info("[TwitchPlays] Executing: %s", choice.winner)

end = time.time() + self.command_duration
while time.time() < end:
self.cmd_vel.publish(t)
time.sleep(0.1)

self.cmd_vel.publish(Twist())


unitree_go2_twitch = autoconnect(
unitree_go2_basic,
TwitchVotes.blueprint(
choices=["forward", "back", "left", "right", "stop"],
vote_window_seconds=5.0,
vote_mode="plurality",
),
_ChoiceToCmdVel.blueprint(),
).global_config(n_workers=4, robot_model="unitree_go2")

__all__ = ["unitree_go2_twitch"]
265 changes: 265 additions & 0 deletions dimos/stream/twitch/module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""TwitchChat: connects to a Twitch channel and publishes chat messages.

Publishes all messages on ``raw_messages``, and a subset matching regex
patterns on ``filtered_messages``.
"""

from __future__ import annotations

import asyncio
from collections.abc import Callable
from dataclasses import dataclass, field
import os
import re
import threading
import time
from typing import Any

from dimos.core.core import rpc
from dimos.core.module import Module, ModuleConfig
from dimos.core.stream import Out
from dimos.utils.logging_config import setup_logger

logger = setup_logger()


@dataclass
class TwitchMessage:
author: str = ""
content: str = ""
channel: str = ""
timestamp: float = 0.0
is_subscriber: bool = False
is_mod: bool = False
badges: dict[str, str] = field(default_factory=dict)

@property
def text(self) -> str:
return self.content

def find_one(self, options: list[str] | set[str] | frozenset[str]) -> str | None:
"""Return the first option found as a whole word in content (case-insensitive), or None."""
lower = self.content.lower()
for opt in options:
if re.search(rf"\b{re.escape(opt.lower())}\b", lower):
return opt
return None

def __repr__(self) -> str:
return f"TwitchMessage({self.author}: {self.content!r})"


class TwitchChatConfig(ModuleConfig):
# OAuth token (oauth:xxx). Falls back to DIMOS_TWITCH_TOKEN env var.
twitch_token: str = ""
# Falls back to DIMOS_CHANNEL_NAME env var.
channel_name: str = ""
bot_prefix: str = "!"
# Regex patterns for filtered_messages. If empty, all messages pass through.
patterns: list[str] = []
# Only pass messages where is_mod matches this value.
filter_is_mod: bool | None = None
# Only pass messages where is_subscriber matches this value.
filter_is_subscriber: bool | None = None
filter_content: Callable[[str], bool] | None = None
filter_author: Callable[[str], bool] | None = None


class TwitchChat(Module["TwitchChatConfig"]):
"""Connects to a Twitch channel and publishes chat messages.

- ``raw_messages`` — every chat message
- ``filtered_messages`` — messages matching configured regex patterns
"""

default_config = TwitchChatConfig

raw_messages: Out[TwitchMessage]
filtered_messages: Out[TwitchMessage]

def __init__(self, **kwargs: Any) -> None:
self._bot: _TwitchBot | None = None
self._bot_thread: threading.Thread | None = None
self._bot_loop: asyncio.AbstractEventLoop | None = None
self._compiled_patterns: list[re.Pattern[str]] = []
super().__init__(**kwargs)

@rpc
def start(self) -> None:
super().start()

token = self.config.twitch_token or os.getenv("DIMOS_TWITCH_TOKEN", "")
channel = self.config.channel_name or os.getenv("DIMOS_CHANNEL_NAME", "")

self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.config.patterns]

if not token or not channel:
logger.warning("[TwitchChat] No token/channel — running in local-only mode")
return

self._bot_loop = asyncio.new_event_loop()
self._bot_thread = threading.Thread(
target=self._run_bot_loop,
args=(token, channel),
daemon=True,
name="twitch-bot",
)
self._bot_thread.start()
logger.info("[TwitchChat] Started", channel=channel)

def _run_bot_loop(self, token: str, channel: str) -> None:
assert self._bot_loop is not None
asyncio.set_event_loop(self._bot_loop)
try:
self._bot = _TwitchBot(
token=token,
channel=channel,
prefix=self.config.bot_prefix,
on_message_cb=self._handle_message,
on_ready_cb=self._handle_ready,
)
self._bot.run()
except ImportError:
logger.error("[TwitchChat] twitchio is not installed — run: uv pip install twitchio")
except Exception:
logger.exception("[TwitchChat] Bot crashed")

@rpc
def stop(self) -> None:
if self._bot is not None and self._bot_loop is not None:
try:

async def _close() -> None:
assert self._bot is not None
await self._bot.close()

asyncio.run_coroutine_threadsafe(_close(), self._bot_loop).result(timeout=5)
except Exception:
logger.warning("[TwitchChat] Error closing bot", exc_info=True)

if self._bot_loop is not None:
self._bot_loop.call_soon_threadsafe(self._bot_loop.stop)

if self._bot_thread is not None:
self._bot_thread.join(timeout=5)

self._bot = None
self._bot_thread = None
self._bot_loop = None
super().stop()

def _handle_ready(self) -> None:
logger.info("[TwitchChat] Ready")

def _build_twitch_message(self, message: Any) -> TwitchMessage:
"""Convert a raw twitchio Message into a TwitchMessage."""
badges: dict[str, str] = {}
if message.tags and "badges" in message.tags:
raw = message.tags["badges"]
if raw:
for badge in raw.split(","):
parts = badge.split("/", 1)
if len(parts) == 2:
badges[parts[0]] = parts[1]

return TwitchMessage(
author=message.author.name if message.author else "",
content=message.content or "",
channel=message.channel.name if message.channel else "",
timestamp=time.time(),
is_subscriber="subscriber" in badges,
is_mod="moderator" in badges,
badges=badges,
)

def _handle_message(self, message: Any) -> None:
msg = self._build_twitch_message(message)
self.raw_messages.publish(msg)
self._publish_if_matched(msg)
self._on_message_received(msg)

def _on_message_received(self, msg: TwitchMessage) -> None:
"""Hook for subclasses to process messages after publishing."""

def _publish_if_matched(self, msg: TwitchMessage) -> None:
"""Publish to filtered_messages if msg passes all configured filters."""
cfg = self.config

if cfg.filter_is_mod is not None and msg.is_mod != cfg.filter_is_mod:
return
if cfg.filter_is_subscriber is not None and msg.is_subscriber != cfg.filter_is_subscriber:
return
if cfg.filter_author is not None and not cfg.filter_author(msg.author):
return
if cfg.filter_content is not None and not cfg.filter_content(msg.content):
return

if self._compiled_patterns:
for pat in self._compiled_patterns:
if pat.search(msg.content):
self.filtered_messages.publish(msg)
return
else:
self.filtered_messages.publish(msg)

def inject_message(self, content: str, author: str = "anonymous") -> None:
"""Inject a chat message programmatically (for testing or local-only mode)."""
msg = TwitchMessage(author=author, content=content, channel="local", timestamp=time.time())
self.raw_messages.publish(msg)
self._publish_if_matched(msg)
self._on_message_received(msg)


class _TwitchBot:
"""Thin twitchio wrapper that forwards messages via callbacks."""

def __init__(
self,
token: str,
channel: str,
prefix: str,
on_message_cb: Any,
on_ready_cb: Any,
) -> None:
from twitchio.ext import ( # type: ignore[import-not-found]
commands as twitch_commands, # type: ignore[import-untyped]
)

cb_message = on_message_cb
cb_ready = on_ready_cb
chan = channel

class _Bot(twitch_commands.Bot): # type: ignore[misc]
def __init__(inner_self) -> None: # noqa: N805
super().__init__(token=token, prefix=prefix, initial_channels=[chan])

async def event_ready(inner_self) -> None: # noqa: N805
logger.info("[TwitchChat] Bot connected as %s to #%s", inner_self.nick, chan)
cb_ready()

async def event_message(inner_self, message: Any) -> None: # noqa: N805
if message.echo:
return
cb_message(message)

self._bot = _Bot()

def run(self) -> None:
self._bot.run()

async def close(self) -> None:
await self._bot.close()
Loading