diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 00690d514f..35e9e1490b 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -86,6 +86,7 @@ "unitree-go2-ros": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_ros:unitree_go2_ros", "unitree-go2-spatial": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_spatial:unitree_go2_spatial", "unitree-go2-temporal-memory": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_temporal_memory:unitree_go2_temporal_memory", + "unitree-go2-twitch": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_twitch:unitree_go2_twitch", "unitree-go2-vlm-stream-test": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_vlm_stream_test:unitree_go2_vlm_stream_test", "unitree-go2-webrtc-keyboard-teleop": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_webrtc_keyboard_teleop:unitree_go2_webrtc_keyboard_teleop", "unity-sim": "dimos.simulation.unity.blueprint:unity_sim", @@ -163,6 +164,8 @@ "speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory", "twist-teleop-module": "dimos.teleop.quest.quest_extensions.TwistTeleopModule", + "twitch-chat": "dimos.stream.twitch.module.TwitchChat", + "twitch-votes": "dimos.stream.twitch.votes.TwitchVotes", "unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container.UnitreeG1SkillContainer", "unitree-skill-container": "dimos.robot.unitree.unitree_skill_container.UnitreeSkillContainer", "unity-bridge-module": "dimos.simulation.unity.module.UnityBridgeModule", diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py new file mode 100644 index 0000000000..0c380579e5 --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py @@ -0,0 +1,83 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree-go2-twitch — Twitch Plays Go2. + +Usage:: + + export DIMOS_TWITCH_TOKEN=oauth:your_token + export DIMOS_CHANNEL_NAME=your_channel + dimos run unitree-go2-twitch --robot-ip 192.168.123.161 +""" + +from __future__ import annotations + +import time + +from dimos.core.blueprints import autoconnect +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.stream.twitch.votes import TwitchChoice, TwitchVotes +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class _ChoiceToCmdVel(Module["ModuleConfig"]): + default_config = ModuleConfig + command_duration: float = 1.0 + + chat_vote_choice: In[TwitchChoice] + cmd_vel: Out[Twist] + + @rpc + def start(self) -> None: + super().start() + self.chat_vote_choice.subscribe(self._on_choice) + + def _on_choice(self, choice: TwitchChoice) -> None: + t = Twist() + if choice.winner == "forward": + t.linear.x = 0.3 + elif choice.winner == "back": + t.linear.x = -0.3 + elif choice.winner == "left": + t.angular.z = 0.5 + elif choice.winner == "right": + t.angular.z = -0.5 + + logger.info("[TwitchPlays] Executing: %s", choice.winner) + + end = time.time() + self.command_duration + while time.time() < end: + self.cmd_vel.publish(t) + time.sleep(0.1) + + self.cmd_vel.publish(Twist()) + + +unitree_go2_twitch = autoconnect( + unitree_go2_basic, + TwitchVotes.blueprint( + choices=["forward", "back", "left", "right", "stop"], + vote_window_seconds=5.0, + vote_mode="plurality", + ), + _ChoiceToCmdVel.blueprint(), +).global_config(n_workers=4, robot_model="unitree_go2") + +__all__ = ["unitree_go2_twitch"] diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py new file mode 100644 index 0000000000..dc1eac645c --- /dev/null +++ b/dimos/stream/twitch/module.py @@ -0,0 +1,265 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TwitchChat: connects to a Twitch channel and publishes chat messages. + +Publishes all messages on ``raw_messages``, and a subset matching regex +patterns on ``filtered_messages``. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from dataclasses import dataclass, field +import os +import re +import threading +import time +from typing import Any + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import Out +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +@dataclass +class TwitchMessage: + author: str = "" + content: str = "" + channel: str = "" + timestamp: float = 0.0 + is_subscriber: bool = False + is_mod: bool = False + badges: dict[str, str] = field(default_factory=dict) + + @property + def text(self) -> str: + return self.content + + def find_one(self, options: list[str] | set[str] | frozenset[str]) -> str | None: + """Return the first option found as a whole word in content (case-insensitive), or None.""" + lower = self.content.lower() + for opt in options: + if re.search(rf"\b{re.escape(opt.lower())}\b", lower): + return opt + return None + + def __repr__(self) -> str: + return f"TwitchMessage({self.author}: {self.content!r})" + + +class TwitchChatConfig(ModuleConfig): + # OAuth token (oauth:xxx). Falls back to DIMOS_TWITCH_TOKEN env var. + twitch_token: str = "" + # Falls back to DIMOS_CHANNEL_NAME env var. + channel_name: str = "" + bot_prefix: str = "!" + # Regex patterns for filtered_messages. If empty, all messages pass through. + patterns: list[str] = [] + # Only pass messages where is_mod matches this value. + filter_is_mod: bool | None = None + # Only pass messages where is_subscriber matches this value. + filter_is_subscriber: bool | None = None + filter_content: Callable[[str], bool] | None = None + filter_author: Callable[[str], bool] | None = None + + +class TwitchChat(Module["TwitchChatConfig"]): + """Connects to a Twitch channel and publishes chat messages. + + - ``raw_messages`` — every chat message + - ``filtered_messages`` — messages matching configured regex patterns + """ + + default_config = TwitchChatConfig + + raw_messages: Out[TwitchMessage] + filtered_messages: Out[TwitchMessage] + + def __init__(self, **kwargs: Any) -> None: + self._bot: _TwitchBot | None = None + self._bot_thread: threading.Thread | None = None + self._bot_loop: asyncio.AbstractEventLoop | None = None + self._compiled_patterns: list[re.Pattern[str]] = [] + super().__init__(**kwargs) + + @rpc + def start(self) -> None: + super().start() + + token = self.config.twitch_token or os.getenv("DIMOS_TWITCH_TOKEN", "") + channel = self.config.channel_name or os.getenv("DIMOS_CHANNEL_NAME", "") + + self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.config.patterns] + + if not token or not channel: + logger.warning("[TwitchChat] No token/channel — running in local-only mode") + return + + self._bot_loop = asyncio.new_event_loop() + self._bot_thread = threading.Thread( + target=self._run_bot_loop, + args=(token, channel), + daemon=True, + name="twitch-bot", + ) + self._bot_thread.start() + logger.info("[TwitchChat] Started", channel=channel) + + def _run_bot_loop(self, token: str, channel: str) -> None: + assert self._bot_loop is not None + asyncio.set_event_loop(self._bot_loop) + try: + self._bot = _TwitchBot( + token=token, + channel=channel, + prefix=self.config.bot_prefix, + on_message_cb=self._handle_message, + on_ready_cb=self._handle_ready, + ) + self._bot.run() + except ImportError: + logger.error("[TwitchChat] twitchio is not installed — run: uv pip install twitchio") + except Exception: + logger.exception("[TwitchChat] Bot crashed") + + @rpc + def stop(self) -> None: + if self._bot is not None and self._bot_loop is not None: + try: + + async def _close() -> None: + assert self._bot is not None + await self._bot.close() + + asyncio.run_coroutine_threadsafe(_close(), self._bot_loop).result(timeout=5) + except Exception: + logger.warning("[TwitchChat] Error closing bot", exc_info=True) + + if self._bot_loop is not None: + self._bot_loop.call_soon_threadsafe(self._bot_loop.stop) + + if self._bot_thread is not None: + self._bot_thread.join(timeout=5) + + self._bot = None + self._bot_thread = None + self._bot_loop = None + super().stop() + + def _handle_ready(self) -> None: + logger.info("[TwitchChat] Ready") + + def _build_twitch_message(self, message: Any) -> TwitchMessage: + """Convert a raw twitchio Message into a TwitchMessage.""" + badges: dict[str, str] = {} + if message.tags and "badges" in message.tags: + raw = message.tags["badges"] + if raw: + for badge in raw.split(","): + parts = badge.split("/", 1) + if len(parts) == 2: + badges[parts[0]] = parts[1] + + return TwitchMessage( + author=message.author.name if message.author else "", + content=message.content or "", + channel=message.channel.name if message.channel else "", + timestamp=time.time(), + is_subscriber="subscriber" in badges, + is_mod="moderator" in badges, + badges=badges, + ) + + def _handle_message(self, message: Any) -> None: + msg = self._build_twitch_message(message) + self.raw_messages.publish(msg) + self._publish_if_matched(msg) + self._on_message_received(msg) + + def _on_message_received(self, msg: TwitchMessage) -> None: + """Hook for subclasses to process messages after publishing.""" + + def _publish_if_matched(self, msg: TwitchMessage) -> None: + """Publish to filtered_messages if msg passes all configured filters.""" + cfg = self.config + + if cfg.filter_is_mod is not None and msg.is_mod != cfg.filter_is_mod: + return + if cfg.filter_is_subscriber is not None and msg.is_subscriber != cfg.filter_is_subscriber: + return + if cfg.filter_author is not None and not cfg.filter_author(msg.author): + return + if cfg.filter_content is not None and not cfg.filter_content(msg.content): + return + + if self._compiled_patterns: + for pat in self._compiled_patterns: + if pat.search(msg.content): + self.filtered_messages.publish(msg) + return + else: + self.filtered_messages.publish(msg) + + def inject_message(self, content: str, author: str = "anonymous") -> None: + """Inject a chat message programmatically (for testing or local-only mode).""" + msg = TwitchMessage(author=author, content=content, channel="local", timestamp=time.time()) + self.raw_messages.publish(msg) + self._publish_if_matched(msg) + self._on_message_received(msg) + + +class _TwitchBot: + """Thin twitchio wrapper that forwards messages via callbacks.""" + + def __init__( + self, + token: str, + channel: str, + prefix: str, + on_message_cb: Any, + on_ready_cb: Any, + ) -> None: + from twitchio.ext import ( # type: ignore[import-not-found] + commands as twitch_commands, # type: ignore[import-untyped] + ) + + cb_message = on_message_cb + cb_ready = on_ready_cb + chan = channel + + class _Bot(twitch_commands.Bot): # type: ignore[misc] + def __init__(inner_self) -> None: # noqa: N805 + super().__init__(token=token, prefix=prefix, initial_channels=[chan]) + + async def event_ready(inner_self) -> None: # noqa: N805 + logger.info("[TwitchChat] Bot connected as %s to #%s", inner_self.nick, chan) + cb_ready() + + async def event_message(inner_self, message: Any) -> None: # noqa: N805 + if message.echo: + return + cb_message(message) + + self._bot = _Bot() + + def run(self) -> None: + self._bot.run() + + async def close(self) -> None: + await self._bot.close() diff --git a/dimos/stream/twitch/test_twitch.py b/dimos/stream/twitch/test_twitch.py new file mode 100644 index 0000000000..fa903322ad --- /dev/null +++ b/dimos/stream/twitch/test_twitch.py @@ -0,0 +1,207 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for TwitchMessage, TwitchChat filters, and vote tallying.""" + +from __future__ import annotations + +import time + +from dimos.stream.twitch.module import TwitchMessage +from dimos.stream.twitch.votes import ( + _tally_majority, + _tally_plurality, + _tally_runoff, + _tally_weighted_recent, +) + +# ── TwitchMessage ── + + +class TestTwitchMessage: + def test_text_property(self) -> None: + msg = TwitchMessage(content="hello") + assert msg.text == "hello" + + def test_find_one_match(self) -> None: + msg = TwitchMessage(content="I vote forward please") + assert msg.find_one(["forward", "back", "left", "right"]) == "forward" + + def test_find_one_case_insensitive(self) -> None: + msg = TwitchMessage(content="FORWARD") + assert msg.find_one(["forward", "back"]) == "forward" + + def test_find_one_no_match(self) -> None: + msg = TwitchMessage(content="hello world") + assert msg.find_one(["forward", "back"]) is None + + def test_find_one_first_wins(self) -> None: + msg = TwitchMessage(content="go left or right") + assert msg.find_one(["left", "right"]) == "left" + + def test_find_one_word_boundary(self) -> None: + msg = TwitchMessage(content="I want to go backwards") + assert msg.find_one(["back", "forward"]) is None + + def test_find_one_with_set(self) -> None: + msg = TwitchMessage(content="back") + result = msg.find_one({"forward", "back"}) + assert result in ("forward", "back") # set order not guaranteed + + def test_find_one_with_frozenset(self) -> None: + msg = TwitchMessage(content="left") + result = msg.find_one(frozenset(["left", "right"])) + assert result in ("left", "right") # frozenset order not guaranteed + + def test_repr(self) -> None: + msg = TwitchMessage(author="user1", content="hi") + assert "user1" in repr(msg) + assert "hi" in repr(msg) + + +# ── Tally functions ── + +# Vote tuple format: (choice, timestamp, voter) +NOW = time.time() + + +class TestTallyPlurality: + def test_empty(self) -> None: + assert _tally_plurality([]) is None + + def test_single_vote(self) -> None: + assert _tally_plurality([("forward", NOW, "a")]) == "forward" + + def test_winner(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c")] + assert _tally_plurality(votes) == "forward" + + def test_tie_returns_one(self) -> None: + votes = [("forward", NOW, "a"), ("back", NOW, "b")] + result = _tally_plurality(votes) + assert result in ("forward", "back") + + +class TestTallyMajority: + def test_empty(self) -> None: + assert _tally_majority([]) is None + + def test_majority_winner(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c")] + assert _tally_majority(votes) == "forward" + + def test_no_majority(self) -> None: + votes = [("forward", NOW, "a"), ("back", NOW, "b"), ("left", NOW, "c"), ("right", NOW, "d")] + assert _tally_majority(votes) is None + + def test_exact_half_not_majority(self) -> None: + votes = [ + ("forward", NOW, "a"), + ("forward", NOW, "b"), + ("back", NOW, "c"), + ("back", NOW, "d"), + ] + assert _tally_majority(votes) is None + + +class TestTallyWeightedRecent: + def test_empty(self) -> None: + assert _tally_weighted_recent([], NOW, NOW + 5) is None + + def test_recent_votes_weighted_higher(self) -> None: + start = NOW + end = NOW + 10 + # Early vote for "back", late vote for "forward" + votes = [("back", start + 1, "a"), ("forward", end - 0.1, "b")] + assert _tally_weighted_recent(votes, start, end) == "forward" + + def test_many_early_can_beat_few_late(self) -> None: + start = NOW + end = NOW + 10 + votes = [ + ("back", start + 0.1, "a"), + ("back", start + 0.2, "b"), + ("back", start + 0.3, "c"), + ("forward", end - 0.1, "d"), + ] + assert _tally_weighted_recent(votes, start, end) == "back" + + +class TestTallyRunoff: + def test_empty(self) -> None: + assert _tally_runoff([]) is None + + def test_clear_majority(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c")] + assert _tally_runoff(votes) == "forward" + + def test_runoff_eliminates_third(self) -> None: + # No majority: forward=2, back=2, left=1 + # Runoff between forward and back, left eliminated + votes = [ + ("forward", NOW, "a"), + ("forward", NOW, "b"), + ("back", NOW, "c"), + ("back", NOW, "d"), + ("left", NOW, "e"), + ] + result = _tally_runoff(votes) + assert result in ("forward", "back") + + def test_single_vote(self) -> None: + assert _tally_runoff([("left", NOW, "a")]) == "left" + + +# ── TwitchChat._publish_if_matched filter logic ── +# We test the filter logic indirectly via inject_message since TwitchChat +# requires the module system. Instead we test the filter predicates directly. + + +class TestFilterLogic: + """Test the filter predicate patterns used in TwitchChatConfig.""" + + def test_filter_author_lambda(self) -> None: + exclude_nightbot = lambda name: name != "nightbot" # noqa: E731 + assert exclude_nightbot("user1") is True + assert exclude_nightbot("nightbot") is False + + def test_filter_content_lambda(self) -> None: + reject_spam = lambda text: len(text) < 200 # noqa: E731 + assert reject_spam("short message") is True + assert reject_spam("x" * 201) is False + + +# ── message_to_choice with lambda (the demo pattern) ── + + +class TestMessageToChoiceLambda: + def test_lambda_with_emoji_gate(self) -> None: + choices = ["forward", "back", "left", "right"] + fn = lambda msg, c: "🤖" in msg.text and msg.find_one(c) # noqa: E731 + + msg_with_emoji = TwitchMessage(content="🤖 forward") + assert fn(msg_with_emoji, choices) == "forward" + + msg_without_emoji = TwitchMessage(content="forward") + assert not fn(msg_without_emoji, choices) + + def test_default_message_to_choice(self) -> None: + from dimos.stream.twitch.votes import _default_message_to_choice + + choices = ["forward", "back"] + msg = TwitchMessage(content="go forward!") + assert _default_message_to_choice(msg, choices) == "forward" + + msg2 = TwitchMessage(content="hello") + assert _default_message_to_choice(msg2, choices) is None diff --git a/dimos/stream/twitch/votes.py b/dimos/stream/twitch/votes.py new file mode 100644 index 0000000000..3de254f7fb --- /dev/null +++ b/dimos/stream/twitch/votes.py @@ -0,0 +1,239 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TwitchVotes: extends TwitchChat with vote tallying. + +Each incoming message is passed through ``message_to_choice`` to extract a +choice string. If the result is one of ``choices``, the vote is recorded. +At the end of each window the winning choice is published as a +:class:`TwitchChoice` on ``chat_vote_choice``. + +Voting modes: plurality, majority, weighted_recent, runoff. +""" + +from __future__ import annotations + +from collections import Counter, deque +from collections.abc import Callable +from dataclasses import dataclass +from enum import Enum +import threading +import time +from typing import Any + +from dimos.core.core import rpc +from dimos.core.stream import Out +from dimos.stream.twitch.module import TwitchChat, TwitchChatConfig, TwitchMessage +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +@dataclass +class TwitchChoice: + winner: str = "" + total_votes: int = 0 + timestamp: float = 0.0 + + +class VoteMode(str, Enum): + PLURALITY = "plurality" + MAJORITY = "majority" + WEIGHTED_RECENT = "weighted_recent" + RUNOFF = "runoff" + + +def _default_message_to_choice(msg: TwitchMessage, choices: list[str]) -> str | None: + return msg.find_one(choices) + + +class TwitchVotesConfig(TwitchChatConfig): + # A vote is only counted if message_to_choice returns one of these. + choices: list[str] = ["forward", "back", "left", "right", "stop"] + # (msg, choices) -> choice string, or any falsy value to skip + message_to_choice: Callable[[TwitchMessage, list[str]], Any] = _default_message_to_choice + + vote_window_seconds: float = 5.0 + min_votes_threshold: int = 1 + vote_mode: VoteMode = VoteMode.PLURALITY + + +# ── Vote tallying ── + + +def _tally_plurality(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + if not counts: + return None + return counts.most_common(1)[0][0] + + +def _tally_majority(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + total = sum(counts.values()) + if total == 0: + return None + winner, count = counts.most_common(1)[0] + return winner if count > total / 2 else None + + +def _tally_weighted_recent( + votes: list[tuple[str, float, str]], window_start: float, window_end: float +) -> str | None: + if not votes: + return None + duration = max(window_end - window_start, 0.001) + weighted: dict[str, float] = {} + for cmd, ts, _ in votes: + weight = 0.5 + 0.5 * ((ts - window_start) / duration) + weighted[cmd] = weighted.get(cmd, 0.0) + weight + if not weighted: + return None + return max(weighted, key=weighted.__getitem__) + + +def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + total = sum(counts.values()) + if total == 0: + return None + winner, count = counts.most_common(1)[0] + if count > total / 2: + return winner + + top2 = {cmd for cmd, _ in counts.most_common(2)} + if len(top2) < 2: + return winner + + latest: dict[str, str] = {} + for cmd, _, voter in votes: + latest[voter] = cmd + + runoff_counts: Counter[str] = Counter() + for _voter, cmd in latest.items(): + if cmd in top2: + runoff_counts[cmd] += 1 + + return runoff_counts.most_common(1)[0][0] if runoff_counts else winner + + +class TwitchVotes(TwitchChat): + """Extends TwitchChat with vote tallying. + + Each incoming message is passed through ``message_to_choice``. If the + result is one of ``choices``, the vote is recorded. At the end of each + window the winning choice is published as a :class:`TwitchChoice` on + ``chat_vote_choice``. + """ + + default_config = TwitchVotesConfig + config: TwitchVotesConfig # type narrowing for mypy + + chat_vote_choice: Out[TwitchChoice] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._votes: deque[tuple[str, float, str]] = deque() + self._votes_lock = threading.Lock() + self._vote_thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._valid_choices: frozenset[str] = frozenset() + + def _on_message_received(self, msg: TwitchMessage) -> None: + choice = self.config.message_to_choice(msg, self.config.choices) + if choice and choice in self._valid_choices: + with self._votes_lock: + self._votes.append((choice, time.time(), msg.author)) + + @rpc + def start(self) -> None: + self._stop_event.clear() + self._valid_choices = frozenset(self.config.choices) + super().start() + + self._vote_thread = threading.Thread( + target=self._vote_loop, daemon=True, name="twitch-vote" + ) + self._vote_thread.start() + logger.info( + "[TwitchVotes] Vote loop started", + vote_mode=self.config.vote_mode.value, + window=self.config.vote_window_seconds, + ) + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._vote_thread is not None: + self._vote_thread.join(timeout=2) + self._vote_thread = None + super().stop() + + def record_vote(self, choice: str, voter: str = "anonymous") -> None: + """Record a vote programmatically (for testing).""" + c = choice.lower().strip() + if c not in self._valid_choices: + return + with self._votes_lock: + self._votes.append((c, time.time(), voter)) + + def _vote_loop(self) -> None: + while not self._stop_event.is_set(): + window_start = time.time() + self._stop_event.wait(timeout=self.config.vote_window_seconds) + window_end = time.time() + + cutoff = window_end - self.config.vote_window_seconds + with self._votes_lock: + current_votes = [(c, ts, v) for c, ts, v in self._votes if ts >= cutoff] + # Keep only votes that arrived after this window ended + self._votes = deque((c, ts, v) for c, ts, v in self._votes if ts >= window_end) + + # Deduplicate: keep only the latest vote per voter + latest_per_voter: dict[str, tuple[str, float, str]] = {} + for vote in current_votes: + latest_per_voter[vote[2]] = vote + current_votes = list(latest_per_voter.values()) + + if len(current_votes) < self.config.min_votes_threshold: + continue + + winner = self._tally(current_votes, window_start, window_end) + if winner is None: + continue + + logger.info("[TwitchVotes] Winner: %s (%d votes)", winner, len(current_votes)) + self.chat_vote_choice.publish( + TwitchChoice(winner=winner, total_votes=len(current_votes), timestamp=window_end) + ) + + def _tally( + self, + votes: list[tuple[str, float, str]], + window_start: float, + window_end: float, + ) -> str | None: + mode = self.config.vote_mode + if mode == VoteMode.PLURALITY: + return _tally_plurality(votes) + elif mode == VoteMode.MAJORITY: + return _tally_majority(votes) + elif mode == VoteMode.WEIGHTED_RECENT: + return _tally_weighted_recent(votes, window_start, window_end) + elif mode == VoteMode.RUNOFF: + return _tally_runoff(votes) + return _tally_plurality(votes) + + +twitch_votes = TwitchVotes.blueprint diff --git a/examples/twitch_plays/README.md b/examples/twitch_plays/README.md new file mode 100644 index 0000000000..7a42089ee4 --- /dev/null +++ b/examples/twitch_plays/README.md @@ -0,0 +1,56 @@ +# Twitch Chat Integration + +Connect a Twitch channel's chat to DimOS as a module. + +## Setup + +1. **Get Twitch credentials** from [twitchtokengenerator.com](https://twitchtokengenerator.com/): + - Select "Custom Scope Token" + - Choose scopes: `chat:read`, `chat:edit` + - Copy the Access Token + +2. **Set environment variables**: + ```bash + export DIMOS_TWITCH_TOKEN=oauth:your_access_token_here + export DIMOS_CHANNEL_NAME=your_twitch_channel + ``` + +3. **Install twitchio** (not in DimOS base deps): + ```bash + uv pip install twitchio + ``` + +## Run + +```bash +dimos run unitree-go2-twitch --robot-ip 192.168.123.161 +``` + +## Streams + +- `raw_messages` — every chat message as a `TwitchMessage` +- `filtered_messages` — messages matching configured regex patterns and filters + +## Filters + +```python +TwitchChat.blueprint( + patterns=[r"^!(?:forward|back|left|right)"], # regex on content + filter_is_mod=True, # mods only + filter_is_subscriber=True, # subscribers only + filter_author=lambda name: name != "nightbot", # exclude bots + filter_content=lambda text: len(text) < 200, # reject spam +) +``` + +## Local Testing + +```python +from dimos.stream.twitch.module import TwitchChat + +chat = TwitchChat() +chat.start() # runs in local-only mode without credentials + +chat.inject_message("!forward", author="user1") +chat.inject_message("hello", author="user2") +``` diff --git a/examples/twitch_plays/run.py b/examples/twitch_plays/run.py new file mode 100644 index 0000000000..ca22edb337 --- /dev/null +++ b/examples/twitch_plays/run.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Twitch Plays Go2 — demo with MuJoCo simulator. + +Wires TwitchVotes to the Go2 in MuJoCo via a small bridge module +that converts winning vote choices into Twist commands on cmd_vel. + +Usage:: + + python examples/twitch_plays/run.py +""" + +from __future__ import annotations + +import time + +from dimos.core.blueprints import autoconnect +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.stream.twitch.votes import TwitchChoice, TwitchVotes +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +# Convert TwitchChoice to cmd_vel! +class ChoiceToCmdVel(Module["ModuleConfig"]): + default_config = ModuleConfig + command_duration: float = 1.0 + + chat_vote_choice: In[TwitchChoice] + cmd_vel: Out[Twist] + + @rpc + def start(self) -> None: + super().start() + self.chat_vote_choice.subscribe(self._on_choice) + + def _on_choice(self, choice: TwitchChoice) -> None: + t = Twist() + if choice.winner == "forward": + t.linear.x = 0.3 + elif choice.winner == "back": + t.linear.x = -0.3 + elif choice.winner == "left": + t.angular.z = 0.5 + elif choice.winner == "right": + t.angular.z = -0.5 + + logger.info("[Demo] Executing: %s", choice.winner) + + end = time.time() + self.command_duration + while time.time() < end: + self.cmd_vel.publish(t) + time.sleep(0.1) + + self.cmd_vel.publish(Twist()) + + +if __name__ == "__main__": + autoconnect( + unitree_go2_basic, + TwitchVotes.blueprint( + choices=["forward", "back", "left", "right", "stop"], + message_to_choice=lambda msg, choices: "🤖" in msg.text and msg.find_one(choices), + vote_window_seconds=3.0, + vote_mode="plurality", + ), + ChoiceToCmdVel.blueprint(), + ).global_config( + robot_ip="mujoco", + ).build().loop() diff --git a/pyproject.toml b/pyproject.toml index 7e2f38546e..cff735e8da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -381,7 +381,7 @@ incremental = true strict = true warn_unused_ignores = false explicit_package_bases = true -exclude = "^dimos/models/Detic(/|$)|^dimos/rxpy_backpressure(/|$)|.*/test_.|.*/conftest.py*" +exclude = "^dimos/models/Detic(/|$)|^dimos/rxpy_backpressure(/|$)|.*/test_.|.*/conftest.py*|.*/native/build(/|$)" [[tool.mypy.overrides]] module = [