From e258050998f91d86e6319aad09d3c2b24a230f68 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 08:17:38 -0700 Subject: [PATCH 01/10] =?UTF-8?q?feat(twitch):=20TwitchChat=20module=20?= =?UTF-8?q?=E2=80=94=20Twitch=20Plays=20Go2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Viewers vote via chat keywords (!forward, !left, etc.). Winning command each window is published as Twist on cmd_vel. Four voting modes: - plurality: most votes wins (classic Twitch Plays) - majority: winner needs >50% - weighted_recent: later votes count more - runoff: instant runoff if no majority - dimos/stream/twitch/module.py — TwitchChat module + vote tallying - Blueprint: unitree-go2-twitch (Go2 + TwitchChat) - examples/twitch_plays/ — README + standalone runner --- dimos/robot/all_blueprints.py | 2 + .../blueprints/smart/unitree_go2_twitch.py | 41 +++ dimos/stream/twitch/module.py | 341 ++++++++++++++++++ examples/twitch_plays/README.md | 87 +++++ examples/twitch_plays/run.py | 33 ++ 5 files changed, 504 insertions(+) create mode 100644 dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py create mode 100644 dimos/stream/twitch/module.py create mode 100644 examples/twitch_plays/README.md create mode 100644 examples/twitch_plays/run.py diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 5910093d61..5a846a3a1c 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -86,6 +86,7 @@ "unitree-go2-ros": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_ros:unitree_go2_ros", "unitree-go2-spatial": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_spatial:unitree_go2_spatial", "unitree-go2-temporal-memory": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_temporal_memory:unitree_go2_temporal_memory", + "unitree-go2-twitch": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_twitch:unitree_go2_twitch", "unitree-go2-vlm-stream-test": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_vlm_stream_test:unitree_go2_vlm_stream_test", "unitree-go2-webrtc-keyboard-teleop": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_webrtc_keyboard_teleop:unitree_go2_webrtc_keyboard_teleop", "unity-sim": "dimos.simulation.unity.blueprint:unity_sim", @@ -163,6 +164,7 @@ "speak-skill": "dimos.agents.skills.speak_skill", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory", "twist-teleop-module": "dimos.teleop.quest.quest_extensions", + "twitch-chat": "dimos.stream.twitch.module", "unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container", "unitree-skill-container": "dimos.robot.unitree.unitree_skill_container", "unity-bridge-module": "dimos.simulation.unity.module", diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py new file mode 100644 index 0000000000..d694e4ab5e --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py @@ -0,0 +1,41 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree-go2-twitch — Twitch Plays Go2. + +Viewers in Twitch chat vote on robot commands (!forward, !left, etc.). +The winning command each voting window is sent to the Go2 via cmd_vel. + +Usage:: + + export DIMOS_TWITCH_TOKEN=oauth:your_token + export DIMOS_CHANNEL_NAME=your_channel + dimos run unitree-go2-twitch --robot-ip 192.168.123.161 + +Or with custom voting:: + + dimos run unitree-go2-twitch --robot-ip 192.168.123.161 \\ + --vote-mode weighted_recent --vote-window-seconds 3 +""" + +from dimos.core.blueprints import autoconnect +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.stream.twitch.module import TwitchChat + +unitree_go2_twitch = autoconnect( + unitree_go2_basic, + TwitchChat.blueprint(), +).global_config(n_workers=4, robot_model="unitree_go2") + +__all__ = ["unitree_go2_twitch"] diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py new file mode 100644 index 0000000000..6a786b66bc --- /dev/null +++ b/dimos/stream/twitch/module.py @@ -0,0 +1,341 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TwitchChat: module that connects to a Twitch channel and publishes +chat-voted robot commands as Twist messages on ``cmd_vel``. + +Viewers type keywords in chat (e.g. ``forward``, ``left``). Votes are +collected over a configurable window, and the winning command is converted +to a Twist and published. + +Supports multiple voting mechanisms via ``vote_mode``: + +- **plurality**: most votes wins (classic "Twitch Plays" style) +- **majority**: winner must have >50% of votes, else no action +- **weighted_recent**: votes are time-weighted — later votes in the window + count more (rewards reaction speed) +- **runoff**: if no majority, top-2 enter an instant runoff using voters' + most recent vote as preference + +See ``examples/twitch_plays/`` for usage. +""" + +from __future__ import annotations + +from collections import Counter, deque +from enum import Enum +import threading +import time +from typing import Any + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import Out +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class VoteMode(str, Enum): + """Voting mechanism for aggregating chat commands.""" + + PLURALITY = "plurality" + """Most votes wins. Ties broken by first occurrence.""" + + MAJORITY = "majority" + """Winner must have >50%% of total votes. No action if no majority.""" + + WEIGHTED_RECENT = "weighted_recent" + """Votes are time-weighted: later votes count more. Encourages fast reactions.""" + + RUNOFF = "runoff" + """If no majority, top-2 enter instant runoff using each voter's latest vote.""" + + +class TwitchChatConfig(ModuleConfig): + """Configuration for the TwitchChat module.""" + + twitch_token: str = "" + """OAuth token for the Twitch bot (oauth:xxx). Set via DIMOS_TWITCH_TOKEN env var.""" + + channel_name: str = "" + """Twitch channel to join. Set via DIMOS_CHANNEL_NAME env var.""" + + bot_prefix: str = "!" + """Chat command prefix (e.g. !forward).""" + + commands: list[str] = ["forward", "back", "left", "right", "stop"] + """Valid vote keywords.""" + + vote_window_seconds: float = 5.0 + """Duration of each voting window in seconds.""" + + min_votes_threshold: int = 1 + """Minimum total votes required to trigger an action.""" + + vote_mode: VoteMode = VoteMode.PLURALITY + """Voting aggregation mechanism.""" + + linear_speed: float = 0.3 + """Forward/backward speed in m/s for movement commands.""" + + angular_speed: float = 0.5 + """Turning speed in rad/s for left/right commands.""" + + command_duration: float = 1.0 + """How long each winning command is published (seconds).""" + + +# ── Command → Twist mapping ── + +_COMMAND_MAP: dict[str, tuple[float, float, float, float, float, float]] = { + # (linear.x, linear.y, linear.z, angular.x, angular.y, angular.z) + "forward": (1.0, 0.0, 0.0, 0.0, 0.0, 0.0), + "back": (-1.0, 0.0, 0.0, 0.0, 0.0, 0.0), + "left": (0.0, 0.0, 0.0, 0.0, 0.0, 1.0), + "right": (0.0, 0.0, 0.0, 0.0, 0.0, -1.0), + "stop": (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), +} + + +def _command_to_twist(command: str, linear_speed: float, angular_speed: float) -> Twist: + """Convert a command string to a Twist message.""" + scales = _COMMAND_MAP.get(command, (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) + t = Twist() + t.linear.x = scales[0] * linear_speed + t.linear.y = scales[1] * linear_speed + t.linear.z = scales[2] * linear_speed + t.angular.x = scales[3] * angular_speed + t.angular.y = scales[4] * angular_speed + t.angular.z = scales[5] * angular_speed + return t + + +# ── Vote tallying ── + + +def _tally_plurality(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + if not counts: + return None + return counts.most_common(1)[0][0] + + +def _tally_majority(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + total = sum(counts.values()) + if total == 0: + return None + winner, count = counts.most_common(1)[0] + return winner if count > total / 2 else None + + +def _tally_weighted_recent( + votes: list[tuple[str, float, str]], window_start: float, window_end: float +) -> str | None: + """Time-weighted: votes later in the window count more.""" + if not votes: + return None + duration = max(window_end - window_start, 0.001) + weighted: Counter[str] = Counter() + for cmd, ts, _ in votes: + weight = 0.5 + 0.5 * ((ts - window_start) / duration) # 0.5 → 1.0 + weighted[cmd] += weight + return weighted.most_common(1)[0][0] if weighted else None + + +def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: + """Instant runoff: if no majority, keep top-2 and re-tally using each voter's latest vote.""" + counts = Counter(cmd for cmd, _, _ in votes) + total = sum(counts.values()) + if total == 0: + return None + winner, count = counts.most_common(1)[0] + if count > total / 2: + return winner + + # Top 2 + top2 = {cmd for cmd, _ in counts.most_common(2)} + if len(top2) < 2: + return winner + + # Re-tally: for each voter, use their latest vote if it's in top2 + latest: dict[str, str] = {} # voter → latest command + for cmd, _, voter in votes: + latest[voter] = cmd + + runoff_counts: Counter[str] = Counter() + for voter, cmd in latest.items(): + if cmd in top2: + runoff_counts[cmd] += 1 + + return runoff_counts.most_common(1)[0][0] if runoff_counts else winner + + +class TwitchChat(Module["TwitchChatConfig"]): + """Twitch chat → robot cmd_vel via vote aggregation. + + Connects to a Twitch channel, collects keyword votes from chat, + and publishes the winning command as a Twist on ``cmd_vel``. + """ + + default_config = TwitchChatConfig + + cmd_vel: Out[Twist] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._votes: deque[tuple[str, float, str]] = deque() # (command, timestamp, voter) + self._running = threading.Event() + self._vote_thread: threading.Thread | None = None + self._bot_thread: threading.Thread | None = None + self._bot: Any = None + + @rpc + def start(self) -> None: + super().start() + self._running.set() + + # Start vote processing loop + self._vote_thread = threading.Thread( + target=self._vote_loop, daemon=True, name="twitch-vote" + ) + self._vote_thread.start() + + # Start Twitch bot in a separate thread (asyncio) + if self.config.twitch_token and self.config.channel_name: + self._bot_thread = threading.Thread( + target=self._run_bot, daemon=True, name="twitch-bot" + ) + self._bot_thread.start() + logger.info( + "[TwitchChat] Started", + channel=self.config.channel_name, + vote_mode=self.config.vote_mode.value, + window=self.config.vote_window_seconds, + ) + else: + logger.warning("[TwitchChat] No token/channel configured — running in local-only mode") + + @rpc + def stop(self) -> None: + self._running.clear() + if self._vote_thread is not None: + self._vote_thread.join(timeout=2) + self._vote_thread = None + if self._bot_thread is not None: + self._bot_thread.join(timeout=2) + self._bot_thread = None + super().stop() + + def record_vote(self, command: str, voter: str = "anonymous") -> None: + """Record a vote (called from bot callback or test code).""" + cmd = command.lower().strip() + if cmd in set(self.config.commands): + self._votes.append((cmd, time.time(), voter)) + + def _run_bot(self) -> None: + """Run the TwitchIO bot in its own asyncio loop.""" + try: + from twitchio.ext import commands as twitch_commands # type: ignore[import-untyped] + + module_ref = self + + class _Bot(twitch_commands.Bot): # type: ignore[misc] + def __init__(inner_self) -> None: # noqa: N805 + super().__init__( + token=module_ref.config.twitch_token, + prefix=module_ref.config.bot_prefix, + initial_channels=[module_ref.config.channel_name], + ) + + async def event_ready(inner_self) -> None: # noqa: N805 + logger.info("[TwitchChat] Bot connected as %s", inner_self.nick) + + async def event_message(inner_self, message: Any) -> None: # noqa: N805 + if message.echo: + return + content = message.content.strip() + if content.startswith(module_ref.config.bot_prefix): + cmd = content[len(module_ref.config.bot_prefix) :].strip().lower() + module_ref.record_vote(cmd, voter=message.author.name) + + self._bot = _Bot() + self._bot.run() + except Exception: + logger.exception("[TwitchChat] Bot crashed") + + def _vote_loop(self) -> None: + """Periodically tally votes and publish winning command.""" + while self._running.is_set(): + window_start = time.time() + time.sleep(self.config.vote_window_seconds) + window_end = time.time() + + if not self._running.is_set(): + break + + # Collect votes within window + cutoff = window_end - self.config.vote_window_seconds + current_votes = [(cmd, ts, voter) for cmd, ts, voter in self._votes if ts >= cutoff] + self._votes.clear() + + if len(current_votes) < self.config.min_votes_threshold: + continue + + # Tally based on vote mode + winner = self._tally(current_votes, window_start, window_end) + if winner is None: + continue + + # Publish winning command as Twist + twist = _command_to_twist(winner, self.config.linear_speed, self.config.angular_speed) + + logger.info( + "[TwitchChat] Winner: %s (%d votes)", + winner, + len(current_votes), + ) + + # Publish for command_duration + end_time = time.time() + self.config.command_duration + while time.time() < end_time and self._running.is_set(): + self.cmd_vel.publish(twist) + time.sleep(0.1) + + # Publish stop after command + self.cmd_vel.publish(_command_to_twist("stop", 0.0, 0.0)) + + pass # Future: publish status on a separate stream + + def _tally( + self, + votes: list[tuple[str, float, str]], + window_start: float, + window_end: float, + ) -> str | None: + mode = self.config.vote_mode + if mode == VoteMode.PLURALITY: + return _tally_plurality(votes) + elif mode == VoteMode.MAJORITY: + return _tally_majority(votes) + elif mode == VoteMode.WEIGHTED_RECENT: + return _tally_weighted_recent(votes, window_start, window_end) + elif mode == VoteMode.RUNOFF: + return _tally_runoff(votes) + return _tally_plurality(votes) + + +twitch_chat = TwitchChat.blueprint diff --git a/examples/twitch_plays/README.md b/examples/twitch_plays/README.md new file mode 100644 index 0000000000..7356e054be --- /dev/null +++ b/examples/twitch_plays/README.md @@ -0,0 +1,87 @@ +# Twitch Plays Go2 + +Control a Unitree Go2 quadruped via Twitch chat votes. + +## Setup + +1. **Get Twitch credentials** from [twitchtokengenerator.com](https://twitchtokengenerator.com/): + - Select "Custom Scope Token" + - Choose scopes: `chat:read`, `chat:edit` + - Copy the Access Token + +2. **Set environment variables**: + ```bash + export DIMOS_TWITCH_TOKEN=oauth:your_access_token_here + export DIMOS_CHANNEL_NAME=your_twitch_channel + ``` + +3. **Install twitchio** (not in DimOS base deps): + ```bash + uv pip install twitchio + ``` + +## Run + +```bash +# Real robot +dimos run unitree-go2-twitch --robot-ip 192.168.123.161 + +# Replay mode (no robot, test the chat integration) +dimos --replay run unitree-go2-twitch +``` + +## Chat Commands + +Viewers type these in Twitch chat (with `!` prefix by default): + +| Command | Action | +|---------|--------| +| `!forward` | Walk forward | +| `!back` | Walk backward | +| `!left` | Turn left | +| `!right` | Turn right | +| `!stop` | Stop moving | + +## Voting Modes + +Configure via `--vote-mode`: + +| Mode | Behaviour | +|------|-----------| +| `plurality` (default) | Most votes wins. Classic "Twitch Plays" style. | +| `majority` | Winner needs >50% of votes. No action if split. | +| `weighted_recent` | Later votes in the window count more. Rewards fast reactions. | +| `runoff` | If no majority, top-2 enter instant runoff using each voter's latest vote. | + +## Configuration + +All configurable via CLI flags or env vars (prefixed `DIMOS_`): + +| Flag | Default | Description | +|------|---------|-------------| +| `--vote-window-seconds` | 5.0 | Duration of each voting round | +| `--min-votes-threshold` | 1 | Minimum votes to trigger action | +| `--linear-speed` | 0.3 | Forward/backward speed (m/s) | +| `--angular-speed` | 0.5 | Turning speed (rad/s) | +| `--command-duration` | 1.0 | How long each command runs (s) | +| `--bot-prefix` | `!` | Chat command prefix | +| `--commands` | forward,back,left,right,stop | Valid keywords | + +## Example: Local Testing Without Twitch + +You can test the vote logic without a Twitch connection: + +```python +from dimos.stream.twitch.module import TwitchChat + +# Create module without credentials (local-only mode) +chat = TwitchChat(vote_window_seconds=2.0, vote_mode="weighted_recent") +chat.start() + +# Simulate votes programmatically +chat.record_vote("forward", voter="user1") +chat.record_vote("forward", voter="user2") +chat.record_vote("left", voter="user3") + +# The vote loop will tally and publish cmd_vel automatically +``` diff --git a/examples/twitch_plays/run.py b/examples/twitch_plays/run.py new file mode 100644 index 0000000000..26ef237f38 --- /dev/null +++ b/examples/twitch_plays/run.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Twitch Plays Go2 — standalone example. + +This is equivalent to ``dimos run unitree-go2-twitch`` but can be +customised directly in Python. + +Usage:: + + export DIMOS_TWITCH_TOKEN=oauth:... + export DIMOS_CHANNEL_NAME=your_channel + python examples/twitch_plays/run.py +""" + +from dimos.robot.unitree.go2.blueprints.smart.unitree_go2_twitch import ( + unitree_go2_twitch, +) + +if __name__ == "__main__": + unitree_go2_twitch.build().loop() From 8c4759a3dd39a5b99be2a409c193d235172dc046 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 21:43:51 -0700 Subject: [PATCH 02/10] fix(twitch): rename unused loop variable to satisfy ruff B007 --- dimos/stream/twitch/module.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index 6a786b66bc..e4e5cbb345 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -177,7 +177,7 @@ def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: latest[voter] = cmd runoff_counts: Counter[str] = Counter() - for voter, cmd in latest.items(): + for _voter, cmd in latest.items(): if cmd in top2: runoff_counts[cmd] += 1 From 559bdf21fde7fe5489613436b2e26a3895175556 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 21:49:38 -0700 Subject: [PATCH 03/10] fix(mypy): exclude native/build dirs and fix import-not-found ignores - Add .*/native/build(/|$) to mypy exclude pattern to skip CMake _deps directories (dimos_lcm-src has invalid Python package name) - Fix type: ignore comments to cover import-not-found alongside import-untyped for twitchio and onnxruntime - Fix Counter[str] += float type error in weighted_recent tally --- dimos/agents_deprecated/memory/image_embedding.py | 2 +- dimos/simulation/mujoco/policy.py | 2 +- dimos/stream/twitch/module.py | 6 ++++-- pyproject.toml | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dimos/agents_deprecated/memory/image_embedding.py b/dimos/agents_deprecated/memory/image_embedding.py index 27e16f1aa8..d6b0967642 100644 --- a/dimos/agents_deprecated/memory/image_embedding.py +++ b/dimos/agents_deprecated/memory/image_embedding.py @@ -63,7 +63,7 @@ def __init__(self, model_name: str = "clip", dimensions: int = 512) -> None: def _initialize_model(self): # type: ignore[no-untyped-def] """Initialize the specified embedding model.""" try: - import onnxruntime as ort # type: ignore[import-untyped] + import onnxruntime as ort # type: ignore[import-untyped,import-not-found] import torch # noqa: F401 from transformers import ( # type: ignore[import-untyped] AutoFeatureExtractor, diff --git a/dimos/simulation/mujoco/policy.py b/dimos/simulation/mujoco/policy.py index 0a792baf1a..e55bc1a1e1 100644 --- a/dimos/simulation/mujoco/policy.py +++ b/dimos/simulation/mujoco/policy.py @@ -20,7 +20,7 @@ import mujoco import numpy as np -import onnxruntime as ort # type: ignore[import-untyped] +import onnxruntime as ort # type: ignore[import-untyped,import-not-found] from dimos.simulation.mujoco.input_controller import InputController from dimos.utils.logging_config import setup_logger diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index e4e5cbb345..dd6384f176 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -152,7 +152,7 @@ def _tally_weighted_recent( weighted: Counter[str] = Counter() for cmd, ts, _ in votes: weight = 0.5 + 0.5 * ((ts - window_start) / duration) # 0.5 → 1.0 - weighted[cmd] += weight + weighted[cmd] += int(weight * 1000) return weighted.most_common(1)[0][0] if weighted else None @@ -249,7 +249,9 @@ def record_vote(self, command: str, voter: str = "anonymous") -> None: def _run_bot(self) -> None: """Run the TwitchIO bot in its own asyncio loop.""" try: - from twitchio.ext import commands as twitch_commands # type: ignore[import-untyped] + from twitchio.ext import ( + commands as twitch_commands, # type: ignore[import-untyped,import-not-found] + ) module_ref = self diff --git a/pyproject.toml b/pyproject.toml index 7e2f38546e..cff735e8da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -381,7 +381,7 @@ incremental = true strict = true warn_unused_ignores = false explicit_package_bases = true -exclude = "^dimos/models/Detic(/|$)|^dimos/rxpy_backpressure(/|$)|.*/test_.|.*/conftest.py*" +exclude = "^dimos/models/Detic(/|$)|^dimos/rxpy_backpressure(/|$)|.*/test_.|.*/conftest.py*|.*/native/build(/|$)" [[tool.mypy.overrides]] module = [ From 67f847e9e62e6c34759846acf248a319e54c5085 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 21:50:37 -0700 Subject: [PATCH 04/10] fix(twitch): put import-not-found ignore on from-line for mypy compat --- dimos/stream/twitch/module.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index dd6384f176..ef07083b65 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -249,8 +249,8 @@ def record_vote(self, command: str, voter: str = "anonymous") -> None: def _run_bot(self) -> None: """Run the TwitchIO bot in its own asyncio loop.""" try: - from twitchio.ext import ( - commands as twitch_commands, # type: ignore[import-untyped,import-not-found] + from twitchio.ext import ( # type: ignore[import-not-found] + commands as twitch_commands, # type: ignore[import-untyped] ) module_ref = self From b69cd2301ea5972a2506de4fd7741143160cb8b0 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 22:34:24 -0700 Subject: [PATCH 05/10] - --- dimos/robot/all_blueprints.py | 2 + .../blueprints/smart/unitree_go2_twitch.py | 8 +- dimos/stream/twitch/module.py | 409 +++++++----------- dimos/stream/twitch/vote_cmd_vel.py | 90 ++++ dimos/stream/twitch/votes.py | 239 ++++++++++ examples/twitch_plays/README.md | 27 +- 6 files changed, 502 insertions(+), 273 deletions(-) create mode 100644 dimos/stream/twitch/vote_cmd_vel.py create mode 100644 dimos/stream/twitch/votes.py diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 5a846a3a1c..80ae03df8a 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -165,6 +165,8 @@ "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory", "twist-teleop-module": "dimos.teleop.quest.quest_extensions", "twitch-chat": "dimos.stream.twitch.module", + "twitch-vote-cmd-vel": "dimos.stream.twitch.vote_cmd_vel", + "twitch-votes": "dimos.stream.twitch.votes", "unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container", "unitree-skill-container": "dimos.robot.unitree.unitree_skill_container", "unity-bridge-module": "dimos.simulation.unity.module", diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py index d694e4ab5e..d01f67ce9b 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""unitree-go2-twitch — Twitch Plays Go2. +"""unitree-go2-twitch — Twitch Plays Go2 (demo). Viewers in Twitch chat vote on robot commands (!forward, !left, etc.). The winning command each voting window is sent to the Go2 via cmd_vel. @@ -31,11 +31,13 @@ from dimos.core.blueprints import autoconnect from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic -from dimos.stream.twitch.module import TwitchChat +from dimos.stream.twitch.vote_cmd_vel import VoteCmdVel +from dimos.stream.twitch.votes import TwitchVotes unitree_go2_twitch = autoconnect( unitree_go2_basic, - TwitchChat.blueprint(), + TwitchVotes.blueprint(), + VoteCmdVel.blueprint(), ).global_config(n_workers=4, robot_model="unitree_go2") __all__ = ["unitree_go2_twitch"] diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index ef07083b65..c65b579f9e 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -12,29 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""TwitchChat: module that connects to a Twitch channel and publishes -chat-voted robot commands as Twist messages on ``cmd_vel``. +"""TwitchChat: connects to a Twitch channel and publishes chat messages. -Viewers type keywords in chat (e.g. ``forward``, ``left``). Votes are -collected over a configurable window, and the winning command is converted -to a Twist and published. - -Supports multiple voting mechanisms via ``vote_mode``: - -- **plurality**: most votes wins (classic "Twitch Plays" style) -- **majority**: winner must have >50% of votes, else no action -- **weighted_recent**: votes are time-weighted — later votes in the window - count more (rewards reaction speed) -- **runoff**: if no majority, top-2 enter an instant runoff using voters' - most recent vote as preference - -See ``examples/twitch_plays/`` for usage. +Publishes all messages on ``raw_messages``, and a subset matching regex +patterns on ``filtered_messages``. """ from __future__ import annotations -from collections import Counter, deque -from enum import Enum +import asyncio +from dataclasses import dataclass, field +import os +import re import threading import time from typing import Any @@ -42,302 +31,198 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import Out -from dimos.msgs.geometry_msgs.Twist import Twist from dimos.utils.logging_config import setup_logger logger = setup_logger() -class VoteMode(str, Enum): - """Voting mechanism for aggregating chat commands.""" - - PLURALITY = "plurality" - """Most votes wins. Ties broken by first occurrence.""" +@dataclass +class TwitchMessage: + author: str = "" + content: str = "" + channel: str = "" + timestamp: float = 0.0 + is_subscriber: bool = False + is_mod: bool = False + badges: dict[str, str] = field(default_factory=dict) - MAJORITY = "majority" - """Winner must have >50%% of total votes. No action if no majority.""" - - WEIGHTED_RECENT = "weighted_recent" - """Votes are time-weighted: later votes count more. Encourages fast reactions.""" - - RUNOFF = "runoff" - """If no majority, top-2 enter instant runoff using each voter's latest vote.""" + def __repr__(self) -> str: + return f"TwitchMessage({self.author}: {self.content!r})" class TwitchChatConfig(ModuleConfig): - """Configuration for the TwitchChat module.""" - twitch_token: str = "" - """OAuth token for the Twitch bot (oauth:xxx). Set via DIMOS_TWITCH_TOKEN env var.""" + """OAuth token (oauth:xxx). Falls back to TWITCH_TOKEN env var.""" channel_name: str = "" - """Twitch channel to join. Set via DIMOS_CHANNEL_NAME env var.""" + """Falls back to CHANNEL_NAME env var.""" bot_prefix: str = "!" - """Chat command prefix (e.g. !forward).""" - - commands: list[str] = ["forward", "back", "left", "right", "stop"] - """Valid vote keywords.""" - - vote_window_seconds: float = 5.0 - """Duration of each voting window in seconds.""" - - min_votes_threshold: int = 1 - """Minimum total votes required to trigger an action.""" - - vote_mode: VoteMode = VoteMode.PLURALITY - """Voting aggregation mechanism.""" - - linear_speed: float = 0.3 - """Forward/backward speed in m/s for movement commands.""" - - angular_speed: float = 0.5 - """Turning speed in rad/s for left/right commands.""" - - command_duration: float = 1.0 - """How long each winning command is published (seconds).""" - - -# ── Command → Twist mapping ── - -_COMMAND_MAP: dict[str, tuple[float, float, float, float, float, float]] = { - # (linear.x, linear.y, linear.z, angular.x, angular.y, angular.z) - "forward": (1.0, 0.0, 0.0, 0.0, 0.0, 0.0), - "back": (-1.0, 0.0, 0.0, 0.0, 0.0, 0.0), - "left": (0.0, 0.0, 0.0, 0.0, 0.0, 1.0), - "right": (0.0, 0.0, 0.0, 0.0, 0.0, -1.0), - "stop": (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), -} - - -def _command_to_twist(command: str, linear_speed: float, angular_speed: float) -> Twist: - """Convert a command string to a Twist message.""" - scales = _COMMAND_MAP.get(command, (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) - t = Twist() - t.linear.x = scales[0] * linear_speed - t.linear.y = scales[1] * linear_speed - t.linear.z = scales[2] * linear_speed - t.angular.x = scales[3] * angular_speed - t.angular.y = scales[4] * angular_speed - t.angular.z = scales[5] * angular_speed - return t - - -# ── Vote tallying ── - - -def _tally_plurality(votes: list[tuple[str, float, str]]) -> str | None: - counts = Counter(cmd for cmd, _, _ in votes) - if not counts: - return None - return counts.most_common(1)[0][0] - - -def _tally_majority(votes: list[tuple[str, float, str]]) -> str | None: - counts = Counter(cmd for cmd, _, _ in votes) - total = sum(counts.values()) - if total == 0: - return None - winner, count = counts.most_common(1)[0] - return winner if count > total / 2 else None - - -def _tally_weighted_recent( - votes: list[tuple[str, float, str]], window_start: float, window_end: float -) -> str | None: - """Time-weighted: votes later in the window count more.""" - if not votes: - return None - duration = max(window_end - window_start, 0.001) - weighted: Counter[str] = Counter() - for cmd, ts, _ in votes: - weight = 0.5 + 0.5 * ((ts - window_start) / duration) # 0.5 → 1.0 - weighted[cmd] += int(weight * 1000) - return weighted.most_common(1)[0][0] if weighted else None - - -def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: - """Instant runoff: if no majority, keep top-2 and re-tally using each voter's latest vote.""" - counts = Counter(cmd for cmd, _, _ in votes) - total = sum(counts.values()) - if total == 0: - return None - winner, count = counts.most_common(1)[0] - if count > total / 2: - return winner - - # Top 2 - top2 = {cmd for cmd, _ in counts.most_common(2)} - if len(top2) < 2: - return winner - - # Re-tally: for each voter, use their latest vote if it's in top2 - latest: dict[str, str] = {} # voter → latest command - for cmd, _, voter in votes: - latest[voter] = cmd - runoff_counts: Counter[str] = Counter() - for _voter, cmd in latest.items(): - if cmd in top2: - runoff_counts[cmd] += 1 - - return runoff_counts.most_common(1)[0][0] if runoff_counts else winner + patterns: list[str] = [] + """Regex patterns for filtered_messages. If empty, all messages pass through.""" class TwitchChat(Module["TwitchChatConfig"]): - """Twitch chat → robot cmd_vel via vote aggregation. + """Connects to a Twitch channel and publishes chat messages. - Connects to a Twitch channel, collects keyword votes from chat, - and publishes the winning command as a Twist on ``cmd_vel``. + - ``raw_messages`` — every chat message + - ``filtered_messages`` — messages matching configured regex patterns """ default_config = TwitchChatConfig - cmd_vel: Out[Twist] + raw_messages: Out[TwitchMessage] + filtered_messages: Out[TwitchMessage] def __init__(self, **kwargs: Any) -> None: - super().__init__(**kwargs) - self._votes: deque[tuple[str, float, str]] = deque() # (command, timestamp, voter) - self._running = threading.Event() - self._vote_thread: threading.Thread | None = None + self._bot: _TwitchBot | None = None self._bot_thread: threading.Thread | None = None - self._bot: Any = None + self._bot_loop: asyncio.AbstractEventLoop | None = None + self._compiled_patterns: list[re.Pattern[str]] = [] + super().__init__(**kwargs) @rpc def start(self) -> None: super().start() - self._running.set() - # Start vote processing loop - self._vote_thread = threading.Thread( - target=self._vote_loop, daemon=True, name="twitch-vote" + token = self.config.twitch_token or os.getenv("TWITCH_TOKEN", "") + channel = self.config.channel_name or os.getenv("CHANNEL_NAME", "") + + if not token or not channel: + logger.warning("[TwitchChat] No token/channel — running in local-only mode") + return + + self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.config.patterns] + + self._bot_loop = asyncio.new_event_loop() + self._bot_thread = threading.Thread( + target=self._run_bot_loop, + args=(token, channel), + daemon=True, + name="twitch-bot", ) - self._vote_thread.start() + self._bot_thread.start() + logger.info("[TwitchChat] Started", channel=channel) - # Start Twitch bot in a separate thread (asyncio) - if self.config.twitch_token and self.config.channel_name: - self._bot_thread = threading.Thread( - target=self._run_bot, daemon=True, name="twitch-bot" - ) - self._bot_thread.start() - logger.info( - "[TwitchChat] Started", - channel=self.config.channel_name, - vote_mode=self.config.vote_mode.value, - window=self.config.vote_window_seconds, + def _run_bot_loop(self, token: str, channel: str) -> None: + assert self._bot_loop is not None + asyncio.set_event_loop(self._bot_loop) + try: + self._bot = _TwitchBot( + token=token, + channel=channel, + prefix=self.config.bot_prefix, + on_message_cb=self._handle_message, + on_ready_cb=self._handle_ready, ) - else: - logger.warning("[TwitchChat] No token/channel configured — running in local-only mode") + self._bot.run() + except ImportError: + logger.error("[TwitchChat] twitchio is not installed — run: uv pip install twitchio") + except Exception: + logger.exception("[TwitchChat] Bot crashed") @rpc def stop(self) -> None: - self._running.clear() - if self._vote_thread is not None: - self._vote_thread.join(timeout=2) - self._vote_thread = None - if self._bot_thread is not None: - self._bot_thread.join(timeout=2) - self._bot_thread = None - super().stop() + if self._bot is not None and self._bot_loop is not None: - def record_vote(self, command: str, voter: str = "anonymous") -> None: - """Record a vote (called from bot callback or test code).""" - cmd = command.lower().strip() - if cmd in set(self.config.commands): - self._votes.append((cmd, time.time(), voter)) + async def _close() -> None: + assert self._bot is not None + await self._bot.close() - def _run_bot(self) -> None: - """Run the TwitchIO bot in its own asyncio loop.""" - try: - from twitchio.ext import ( # type: ignore[import-not-found] - commands as twitch_commands, # type: ignore[import-untyped] - ) + asyncio.run_coroutine_threadsafe(_close(), self._bot_loop).result(timeout=5) - module_ref = self + if self._bot_loop is not None: + self._bot_loop.call_soon_threadsafe(self._bot_loop.stop) - class _Bot(twitch_commands.Bot): # type: ignore[misc] - def __init__(inner_self) -> None: # noqa: N805 - super().__init__( - token=module_ref.config.twitch_token, - prefix=module_ref.config.bot_prefix, - initial_channels=[module_ref.config.channel_name], - ) + if self._bot_thread is not None: + self._bot_thread.join(timeout=5) - async def event_ready(inner_self) -> None: # noqa: N805 - logger.info("[TwitchChat] Bot connected as %s", inner_self.nick) + self._bot = None + self._bot_thread = None + self._bot_loop = None + super().stop() - async def event_message(inner_self, message: Any) -> None: # noqa: N805 - if message.echo: - return - content = message.content.strip() - if content.startswith(module_ref.config.bot_prefix): - cmd = content[len(module_ref.config.bot_prefix) :].strip().lower() - module_ref.record_vote(cmd, voter=message.author.name) + def _handle_ready(self) -> None: + logger.info("[TwitchChat] Ready") + + def _handle_message(self, message: Any) -> None: + """Convert a twitchio Message into a TwitchMessage and publish.""" + badges: dict[str, str] = {} + if message.tags and "badges" in message.tags: + raw = message.tags["badges"] + if raw: + for badge in raw.split(","): + parts = badge.split("/", 1) + if len(parts) == 2: + badges[parts[0]] = parts[1] + + msg = TwitchMessage( + author=message.author.name if message.author else "", + content=message.content or "", + channel=message.channel.name if message.channel else "", + timestamp=time.time(), + is_subscriber="subscriber" in badges, + is_mod="moderator" in badges, + badges=badges, + ) - self._bot = _Bot() - self._bot.run() - except Exception: - logger.exception("[TwitchChat] Bot crashed") + self.raw_messages.publish(msg) + self._publish_if_matched(msg) - def _vote_loop(self) -> None: - """Periodically tally votes and publish winning command.""" - while self._running.is_set(): - window_start = time.time() - time.sleep(self.config.vote_window_seconds) - window_end = time.time() + def _publish_if_matched(self, msg: TwitchMessage) -> None: + """Publish to filtered_messages if msg matches patterns (or no patterns configured).""" + if self._compiled_patterns: + for pat in self._compiled_patterns: + if pat.search(msg.content): + self.filtered_messages.publish(msg) + return + else: + self.filtered_messages.publish(msg) - if not self._running.is_set(): - break + def inject_message(self, content: str, author: str = "anonymous") -> None: + """Inject a chat message programmatically (for testing or local-only mode).""" + msg = TwitchMessage(author=author, content=content, channel="local", timestamp=time.time()) + self.raw_messages.publish(msg) + self._publish_if_matched(msg) - # Collect votes within window - cutoff = window_end - self.config.vote_window_seconds - current_votes = [(cmd, ts, voter) for cmd, ts, voter in self._votes if ts >= cutoff] - self._votes.clear() - if len(current_votes) < self.config.min_votes_threshold: - continue +class _TwitchBot: + """Thin twitchio wrapper that forwards messages via callbacks.""" - # Tally based on vote mode - winner = self._tally(current_votes, window_start, window_end) - if winner is None: - continue + def __init__( + self, + token: str, + channel: str, + prefix: str, + on_message_cb: Any, + on_ready_cb: Any, + ) -> None: + from twitchio.ext import ( # type: ignore[import-not-found] + commands as twitch_commands, # type: ignore[import-untyped] + ) - # Publish winning command as Twist - twist = _command_to_twist(winner, self.config.linear_speed, self.config.angular_speed) + cb_message = on_message_cb + cb_ready = on_ready_cb + chan = channel - logger.info( - "[TwitchChat] Winner: %s (%d votes)", - winner, - len(current_votes), - ) + class _Bot(twitch_commands.Bot): # type: ignore[misc] + def __init__(inner_self) -> None: # noqa: N805 + super().__init__(token=token, prefix=prefix, initial_channels=[chan]) - # Publish for command_duration - end_time = time.time() + self.config.command_duration - while time.time() < end_time and self._running.is_set(): - self.cmd_vel.publish(twist) - time.sleep(0.1) + async def event_ready(inner_self) -> None: # noqa: N805 + logger.info("[TwitchChat] Bot connected as %s to #%s", inner_self.nick, chan) + cb_ready() - # Publish stop after command - self.cmd_vel.publish(_command_to_twist("stop", 0.0, 0.0)) + async def event_message(inner_self, message: Any) -> None: # noqa: N805 + if message.echo: + return + cb_message(message) - pass # Future: publish status on a separate stream + self._bot = _Bot() - def _tally( - self, - votes: list[tuple[str, float, str]], - window_start: float, - window_end: float, - ) -> str | None: - mode = self.config.vote_mode - if mode == VoteMode.PLURALITY: - return _tally_plurality(votes) - elif mode == VoteMode.MAJORITY: - return _tally_majority(votes) - elif mode == VoteMode.WEIGHTED_RECENT: - return _tally_weighted_recent(votes, window_start, window_end) - elif mode == VoteMode.RUNOFF: - return _tally_runoff(votes) - return _tally_plurality(votes) - - -twitch_chat = TwitchChat.blueprint + def run(self) -> None: + self._bot.run() + + async def close(self) -> None: + await self._bot.close() diff --git a/dimos/stream/twitch/vote_cmd_vel.py b/dimos/stream/twitch/vote_cmd_vel.py new file mode 100644 index 0000000000..414a19e955 --- /dev/null +++ b/dimos/stream/twitch/vote_cmd_vel.py @@ -0,0 +1,90 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VoteCmdVel: demo bridge that converts VoteResult into Twist on cmd_vel.""" + +from __future__ import annotations + +import time + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.stream.twitch.votes import VoteResult +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +_COMMAND_MAP: dict[str, tuple[float, float, float, float, float, float]] = { + "forward": (1.0, 0.0, 0.0, 0.0, 0.0, 0.0), + "back": (-1.0, 0.0, 0.0, 0.0, 0.0, 0.0), + "left": (0.0, 0.0, 0.0, 0.0, 0.0, 1.0), + "right": (0.0, 0.0, 0.0, 0.0, 0.0, -1.0), + "stop": (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), +} + + +def _command_to_twist(command: str, linear_speed: float, angular_speed: float) -> Twist: + scales = _COMMAND_MAP.get(command, (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) + t = Twist() + t.linear.x = scales[0] * linear_speed + t.linear.y = scales[1] * linear_speed + t.linear.z = scales[2] * linear_speed + t.angular.x = scales[3] * angular_speed + t.angular.y = scales[4] * angular_speed + t.angular.z = scales[5] * angular_speed + return t + + +class VoteCmdVelConfig(ModuleConfig): + linear_speed: float = 0.3 + angular_speed: float = 0.5 + command_duration: float = 1.0 + + +class VoteCmdVel(Module["VoteCmdVelConfig"]): + """Demo bridge: subscribes to ``vote_results`` and publishes winning + commands as Twist on ``cmd_vel`` for ``command_duration`` seconds.""" + + default_config = VoteCmdVelConfig + + vote_results: In[VoteResult] + cmd_vel: Out[Twist] + + @rpc + def start(self) -> None: + super().start() + self.vote_results.subscribe(self._on_vote_result) + logger.info("[VoteCmdVel] Listening for vote results") + + def _on_vote_result(self, result: VoteResult) -> None: + if not result.winner: + return + + twist = _command_to_twist( + result.winner, self.config.linear_speed, self.config.angular_speed + ) + logger.info("[VoteCmdVel] Executing: %s", result.winner) + + end_time = time.time() + self.config.command_duration + while time.time() < end_time: + self.cmd_vel.publish(twist) + time.sleep(0.1) + + self.cmd_vel.publish(_command_to_twist("stop", 0.0, 0.0)) + + +vote_cmd_vel = VoteCmdVel.blueprint diff --git a/dimos/stream/twitch/votes.py b/dimos/stream/twitch/votes.py new file mode 100644 index 0000000000..019f3a9c84 --- /dev/null +++ b/dimos/stream/twitch/votes.py @@ -0,0 +1,239 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TwitchVotes: extends TwitchChat with vote tallying. + +Collects chat commands over a configurable window and publishes the +winning command as a :class:`VoteResult`. + +Voting modes: plurality, majority, weighted_recent, runoff. +""" + +from __future__ import annotations + +from collections import Counter, deque +from dataclasses import dataclass +from enum import Enum +import re +import threading +import time +from typing import Any + +from dimos.core.core import rpc +from dimos.core.stream import Out +from dimos.stream.twitch.module import TwitchChat, TwitchChatConfig, TwitchMessage +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +@dataclass +class VoteResult: + winner: str = "" + total_votes: int = 0 + timestamp: float = 0.0 + + +class VoteMode(str, Enum): + PLURALITY = "plurality" + MAJORITY = "majority" + WEIGHTED_RECENT = "weighted_recent" + RUNOFF = "runoff" + + +class TwitchVotesConfig(TwitchChatConfig): + keywords: list[str] = ["forward", "back", "left", "right", "stop"] + """Valid vote keywords. Also used to build regex patterns on the base TwitchChat.""" + + vote_window_seconds: float = 5.0 + min_votes_threshold: int = 1 + vote_mode: VoteMode = VoteMode.PLURALITY + + +# ── Vote tallying ── + + +def _tally_plurality(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + if not counts: + return None + return counts.most_common(1)[0][0] + + +def _tally_majority(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + total = sum(counts.values()) + if total == 0: + return None + winner, count = counts.most_common(1)[0] + return winner if count > total / 2 else None + + +def _tally_weighted_recent( + votes: list[tuple[str, float, str]], window_start: float, window_end: float +) -> str | None: + if not votes: + return None + duration = max(window_end - window_start, 0.001) + weighted: Counter[str] = Counter() + for cmd, ts, _ in votes: + weight = 0.5 + 0.5 * ((ts - window_start) / duration) + weighted[cmd] += int(weight * 1000) + return weighted.most_common(1)[0][0] if weighted else None + + +def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: + counts = Counter(cmd for cmd, _, _ in votes) + total = sum(counts.values()) + if total == 0: + return None + winner, count = counts.most_common(1)[0] + if count > total / 2: + return winner + + top2 = {cmd for cmd, _ in counts.most_common(2)} + if len(top2) < 2: + return winner + + # For each voter, use their latest vote if it's in top2 + latest: dict[str, str] = {} + for cmd, _, voter in votes: + latest[voter] = cmd + + runoff_counts: Counter[str] = Counter() + for _voter, cmd in latest.items(): + if cmd in top2: + runoff_counts[cmd] += 1 + + return runoff_counts.most_common(1)[0][0] if runoff_counts else winner + + +class TwitchVotes(TwitchChat): + """Extends TwitchChat with vote tallying. + + Chat messages matching configured keywords are collected over a time + window, then the winning command is published as a :class:`VoteResult` + on ``vote_results``. + """ + + default_config = TwitchVotesConfig + config: TwitchVotesConfig # type narrowing for mypy + + vote_results: Out[VoteResult] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._votes: deque[tuple[str, float, str]] = deque() + self._votes_lock = threading.Lock() + self._vote_thread: threading.Thread | None = None + self._valid_keywords: frozenset[str] = frozenset() + + def _handle_message(self, message: Any) -> None: + """Extend base to also record votes from matching messages.""" + super()._handle_message(message) + + content: str = message.content or "" + prefix = self.config.bot_prefix + if not content.strip().startswith(prefix): + return + cmd = content.strip()[len(prefix) :].strip().lower() + author = message.author.name if message.author else "anonymous" + + if cmd in self._valid_keywords: + with self._votes_lock: + self._votes.append((cmd, time.time(), author)) + + @rpc + def start(self) -> None: + # Auto-generate patterns from keywords so the base class filters correctly + if self.config.keywords and not self.config.patterns: + escaped = "|".join(re.escape(k) for k in self.config.keywords) + self.config.patterns = [rf"^{re.escape(self.config.bot_prefix)}(?:{escaped})\b"] + + self._valid_keywords = frozenset(self.config.keywords) + + super().start() + + self._vote_thread = threading.Thread( + target=self._vote_loop, daemon=True, name="twitch-vote" + ) + self._vote_thread.start() + logger.info( + "[TwitchVotes] Vote loop started", + vote_mode=self.config.vote_mode.value, + window=self.config.vote_window_seconds, + ) + + @rpc + def stop(self) -> None: + if self._vote_thread is not None: + self._vote_thread.join(timeout=2) + self._vote_thread = None + super().stop() + + def record_vote(self, command: str, voter: str = "anonymous") -> None: + """Record a vote programmatically (for testing).""" + cmd = command.lower().strip() + if cmd not in self._valid_keywords: + return + with self._votes_lock: + self._votes.append((cmd, time.time(), voter)) + + def _vote_loop(self) -> None: + while True: + window_start = time.time() + time.sleep(self.config.vote_window_seconds) + window_end = time.time() + + # Atomically drain votes within window + cutoff = window_end - self.config.vote_window_seconds + with self._votes_lock: + current_votes = [(cmd, ts, voter) for cmd, ts, voter in self._votes if ts >= cutoff] + self._votes.clear() + + if len(current_votes) < self.config.min_votes_threshold: + continue + + winner = self._tally(current_votes, window_start, window_end) + if winner is None: + continue + + logger.info( + "[TwitchVotes] Winner: %s (%d votes)", + winner, + len(current_votes), + ) + self.vote_results.publish( + VoteResult(winner=winner, total_votes=len(current_votes), timestamp=window_end) + ) + + def _tally( + self, + votes: list[tuple[str, float, str]], + window_start: float, + window_end: float, + ) -> str | None: + mode = self.config.vote_mode + if mode == VoteMode.PLURALITY: + return _tally_plurality(votes) + elif mode == VoteMode.MAJORITY: + return _tally_majority(votes) + elif mode == VoteMode.WEIGHTED_RECENT: + return _tally_weighted_recent(votes, window_start, window_end) + elif mode == VoteMode.RUNOFF: + return _tally_runoff(votes) + return _tally_plurality(votes) + + +twitch_votes = TwitchVotes.blueprint diff --git a/examples/twitch_plays/README.md b/examples/twitch_plays/README.md index 7356e054be..f481bf3bdf 100644 --- a/examples/twitch_plays/README.md +++ b/examples/twitch_plays/README.md @@ -2,6 +2,17 @@ Control a Unitree Go2 quadruped via Twitch chat votes. +## Architecture + +The Twitch integration is split into three modules: + +- **TwitchChat** (`dimos.stream.twitch.module`) — Base module that connects to + a Twitch channel and publishes `ChatMessage`s. Can optionally filter by keywords. +- **TwitchVotes** (`dimos.stream.twitch.votes`) — Extends TwitchChat with vote + tallying. Publishes `VoteResult` (winning command + vote count) each window. +- **VoteCmdVel** (`dimos.stream.twitch.vote_cmd_vel`) — Demo bridge that + converts `VoteResult` into `Twist` on `cmd_vel` for robot movement. + ## Setup 1. **Get Twitch credentials** from [twitchtokengenerator.com](https://twitchtokengenerator.com/): @@ -65,23 +76,23 @@ All configurable via CLI flags or env vars (prefixed `DIMOS_`): | `--angular-speed` | 0.5 | Turning speed (rad/s) | | `--command-duration` | 1.0 | How long each command runs (s) | | `--bot-prefix` | `!` | Chat command prefix | -| `--commands` | forward,back,left,right,stop | Valid keywords | +| `--keywords` | forward,back,left,right,stop | Valid vote keywords | ## Example: Local Testing Without Twitch You can test the vote logic without a Twitch connection: ```python -from dimos.stream.twitch.module import TwitchChat +from dimos.stream.twitch.votes import TwitchVotes # Create module without credentials (local-only mode) -chat = TwitchChat(vote_window_seconds=2.0, vote_mode="weighted_recent") -chat.start() +votes = TwitchVotes(vote_window_seconds=2.0, vote_mode="weighted_recent") +votes.start() # Simulate votes programmatically -chat.record_vote("forward", voter="user1") -chat.record_vote("forward", voter="user2") -chat.record_vote("left", voter="user3") +votes.record_vote("forward", voter="user1") +votes.record_vote("forward", voter="user2") +votes.record_vote("left", voter="user3") -# The vote loop will tally and publish cmd_vel automatically +# The vote loop will tally and publish VoteResult on vote_results ``` From 8a7b4f6bfd3ca3ca02a16929fb2cb139958e20c5 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 23:06:27 -0700 Subject: [PATCH 06/10] clean --- dimos/robot/all_blueprints.py | 1 - .../blueprints/smart/unitree_go2_twitch.py | 64 ++++++++++--- dimos/stream/twitch/module.py | 45 +++++++-- dimos/stream/twitch/vote_cmd_vel.py | 90 ------------------ dimos/stream/twitch/votes.py | 92 +++++++++++-------- examples/twitch_plays/README.md | 84 +++++------------ examples/twitch_plays/run.py | 73 +++++++++++++-- 7 files changed, 229 insertions(+), 220 deletions(-) delete mode 100644 dimos/stream/twitch/vote_cmd_vel.py diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 80ae03df8a..3eb3908ce6 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -165,7 +165,6 @@ "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory", "twist-teleop-module": "dimos.teleop.quest.quest_extensions", "twitch-chat": "dimos.stream.twitch.module", - "twitch-vote-cmd-vel": "dimos.stream.twitch.vote_cmd_vel", "twitch-votes": "dimos.stream.twitch.votes", "unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container", "unitree-skill-container": "dimos.robot.unitree.unitree_skill_container", diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py index d01f67ce9b..0c380579e5 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_twitch.py @@ -12,32 +12,72 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""unitree-go2-twitch — Twitch Plays Go2 (demo). - -Viewers in Twitch chat vote on robot commands (!forward, !left, etc.). -The winning command each voting window is sent to the Go2 via cmd_vel. +"""unitree-go2-twitch — Twitch Plays Go2. Usage:: export DIMOS_TWITCH_TOKEN=oauth:your_token export DIMOS_CHANNEL_NAME=your_channel dimos run unitree-go2-twitch --robot-ip 192.168.123.161 +""" -Or with custom voting:: +from __future__ import annotations - dimos run unitree-go2-twitch --robot-ip 192.168.123.161 \\ - --vote-mode weighted_recent --vote-window-seconds 3 -""" +import time from dimos.core.blueprints import autoconnect +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs.Twist import Twist from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic -from dimos.stream.twitch.vote_cmd_vel import VoteCmdVel -from dimos.stream.twitch.votes import TwitchVotes +from dimos.stream.twitch.votes import TwitchChoice, TwitchVotes +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class _ChoiceToCmdVel(Module["ModuleConfig"]): + default_config = ModuleConfig + command_duration: float = 1.0 + + chat_vote_choice: In[TwitchChoice] + cmd_vel: Out[Twist] + + @rpc + def start(self) -> None: + super().start() + self.chat_vote_choice.subscribe(self._on_choice) + + def _on_choice(self, choice: TwitchChoice) -> None: + t = Twist() + if choice.winner == "forward": + t.linear.x = 0.3 + elif choice.winner == "back": + t.linear.x = -0.3 + elif choice.winner == "left": + t.angular.z = 0.5 + elif choice.winner == "right": + t.angular.z = -0.5 + + logger.info("[TwitchPlays] Executing: %s", choice.winner) + + end = time.time() + self.command_duration + while time.time() < end: + self.cmd_vel.publish(t) + time.sleep(0.1) + + self.cmd_vel.publish(Twist()) + unitree_go2_twitch = autoconnect( unitree_go2_basic, - TwitchVotes.blueprint(), - VoteCmdVel.blueprint(), + TwitchVotes.blueprint( + choices=["forward", "back", "left", "right", "stop"], + vote_window_seconds=5.0, + vote_mode="plurality", + ), + _ChoiceToCmdVel.blueprint(), ).global_config(n_workers=4, robot_model="unitree_go2") __all__ = ["unitree_go2_twitch"] diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index c65b579f9e..4b71effc84 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -21,6 +21,7 @@ from __future__ import annotations import asyncio +from collections.abc import Callable from dataclasses import dataclass, field import os import re @@ -46,21 +47,36 @@ class TwitchMessage: is_mod: bool = False badges: dict[str, str] = field(default_factory=dict) + @property + def text(self) -> str: + return self.content + + def find_one(self, options: list[str] | set[str] | frozenset[str]) -> str | None: + """Return the first option found in the message content (case-insensitive), or None.""" + lower = self.content.lower() + for opt in options: + if opt.lower() in lower: + return opt + return None + def __repr__(self) -> str: return f"TwitchMessage({self.author}: {self.content!r})" class TwitchChatConfig(ModuleConfig): + # OAuth token (oauth:xxx). Falls back to DIMOS_TWITCH_TOKEN env var. twitch_token: str = "" - """OAuth token (oauth:xxx). Falls back to TWITCH_TOKEN env var.""" - + # Falls back to DIMOS_CHANNEL_NAME env var. channel_name: str = "" - """Falls back to CHANNEL_NAME env var.""" - bot_prefix: str = "!" - + # Regex patterns for filtered_messages. If empty, all messages pass through. patterns: list[str] = [] - """Regex patterns for filtered_messages. If empty, all messages pass through.""" + # Only pass messages where is_mod matches this value. + filter_is_mod: bool | None = None + # Only pass messages where is_subscriber matches this value. + filter_is_subscriber: bool | None = None + filter_content: Callable[[str], bool] | None = None + filter_author: Callable[[str], bool] | None = None class TwitchChat(Module["TwitchChatConfig"]): @@ -86,8 +102,8 @@ def __init__(self, **kwargs: Any) -> None: def start(self) -> None: super().start() - token = self.config.twitch_token or os.getenv("TWITCH_TOKEN", "") - channel = self.config.channel_name or os.getenv("CHANNEL_NAME", "") + token = self.config.twitch_token or os.getenv("DIMOS_TWITCH_TOKEN", "") + channel = self.config.channel_name or os.getenv("DIMOS_CHANNEL_NAME", "") if not token or not channel: logger.warning("[TwitchChat] No token/channel — running in local-only mode") @@ -171,7 +187,18 @@ def _handle_message(self, message: Any) -> None: self._publish_if_matched(msg) def _publish_if_matched(self, msg: TwitchMessage) -> None: - """Publish to filtered_messages if msg matches patterns (or no patterns configured).""" + """Publish to filtered_messages if msg passes all configured filters.""" + cfg = self.config + + if cfg.filter_is_mod is not None and msg.is_mod != cfg.filter_is_mod: + return + if cfg.filter_is_subscriber is not None and msg.is_subscriber != cfg.filter_is_subscriber: + return + if cfg.filter_author is not None and not cfg.filter_author(msg.author): + return + if cfg.filter_content is not None and not cfg.filter_content(msg.content): + return + if self._compiled_patterns: for pat in self._compiled_patterns: if pat.search(msg.content): diff --git a/dimos/stream/twitch/vote_cmd_vel.py b/dimos/stream/twitch/vote_cmd_vel.py deleted file mode 100644 index 414a19e955..0000000000 --- a/dimos/stream/twitch/vote_cmd_vel.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""VoteCmdVel: demo bridge that converts VoteResult into Twist on cmd_vel.""" - -from __future__ import annotations - -import time - -from dimos.core.core import rpc -from dimos.core.module import Module, ModuleConfig -from dimos.core.stream import In, Out -from dimos.msgs.geometry_msgs.Twist import Twist -from dimos.stream.twitch.votes import VoteResult -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -_COMMAND_MAP: dict[str, tuple[float, float, float, float, float, float]] = { - "forward": (1.0, 0.0, 0.0, 0.0, 0.0, 0.0), - "back": (-1.0, 0.0, 0.0, 0.0, 0.0, 0.0), - "left": (0.0, 0.0, 0.0, 0.0, 0.0, 1.0), - "right": (0.0, 0.0, 0.0, 0.0, 0.0, -1.0), - "stop": (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), -} - - -def _command_to_twist(command: str, linear_speed: float, angular_speed: float) -> Twist: - scales = _COMMAND_MAP.get(command, (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) - t = Twist() - t.linear.x = scales[0] * linear_speed - t.linear.y = scales[1] * linear_speed - t.linear.z = scales[2] * linear_speed - t.angular.x = scales[3] * angular_speed - t.angular.y = scales[4] * angular_speed - t.angular.z = scales[5] * angular_speed - return t - - -class VoteCmdVelConfig(ModuleConfig): - linear_speed: float = 0.3 - angular_speed: float = 0.5 - command_duration: float = 1.0 - - -class VoteCmdVel(Module["VoteCmdVelConfig"]): - """Demo bridge: subscribes to ``vote_results`` and publishes winning - commands as Twist on ``cmd_vel`` for ``command_duration`` seconds.""" - - default_config = VoteCmdVelConfig - - vote_results: In[VoteResult] - cmd_vel: Out[Twist] - - @rpc - def start(self) -> None: - super().start() - self.vote_results.subscribe(self._on_vote_result) - logger.info("[VoteCmdVel] Listening for vote results") - - def _on_vote_result(self, result: VoteResult) -> None: - if not result.winner: - return - - twist = _command_to_twist( - result.winner, self.config.linear_speed, self.config.angular_speed - ) - logger.info("[VoteCmdVel] Executing: %s", result.winner) - - end_time = time.time() + self.config.command_duration - while time.time() < end_time: - self.cmd_vel.publish(twist) - time.sleep(0.1) - - self.cmd_vel.publish(_command_to_twist("stop", 0.0, 0.0)) - - -vote_cmd_vel = VoteCmdVel.blueprint diff --git a/dimos/stream/twitch/votes.py b/dimos/stream/twitch/votes.py index 019f3a9c84..ee88ef2d77 100644 --- a/dimos/stream/twitch/votes.py +++ b/dimos/stream/twitch/votes.py @@ -14,8 +14,10 @@ """TwitchVotes: extends TwitchChat with vote tallying. -Collects chat commands over a configurable window and publishes the -winning command as a :class:`VoteResult`. +Each incoming message is passed through ``message_to_choice`` to extract a +choice string. If the result is one of ``choices``, the vote is recorded. +At the end of each window the winning choice is published as a +:class:`TwitchChoice` on ``chat_vote_choice``. Voting modes: plurality, majority, weighted_recent, runoff. """ @@ -23,6 +25,7 @@ from __future__ import annotations from collections import Counter, deque +from collections.abc import Callable from dataclasses import dataclass from enum import Enum import re @@ -39,7 +42,7 @@ @dataclass -class VoteResult: +class TwitchChoice: winner: str = "" total_votes: int = 0 timestamp: float = 0.0 @@ -52,9 +55,15 @@ class VoteMode(str, Enum): RUNOFF = "runoff" +def _default_message_to_choice(msg: TwitchMessage, choices: list[str]) -> str | None: + return msg.find_one(choices) + + class TwitchVotesConfig(TwitchChatConfig): - keywords: list[str] = ["forward", "back", "left", "right", "stop"] - """Valid vote keywords. Also used to build regex patterns on the base TwitchChat.""" + # A vote is only counted if message_to_choice returns one of these. + choices: list[str] = ["forward", "back", "left", "right", "stop"] + # (msg, choices) -> choice string or None + message_to_choice: Callable[[TwitchMessage, list[str]], str | None] = _default_message_to_choice vote_window_seconds: float = 5.0 min_votes_threshold: int = 1 @@ -106,7 +115,6 @@ def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: if len(top2) < 2: return winner - # For each voter, use their latest vote if it's in top2 latest: dict[str, str] = {} for cmd, _, voter in votes: latest[voter] = cmd @@ -122,46 +130,63 @@ def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: class TwitchVotes(TwitchChat): """Extends TwitchChat with vote tallying. - Chat messages matching configured keywords are collected over a time - window, then the winning command is published as a :class:`VoteResult` - on ``vote_results``. + Each incoming message is passed through ``message_to_choice``. If the + result is one of ``choices``, the vote is recorded. At the end of each + window the winning choice is published as a :class:`TwitchChoice` on + ``chat_vote_choice``. """ default_config = TwitchVotesConfig config: TwitchVotesConfig # type narrowing for mypy - vote_results: Out[VoteResult] + chat_vote_choice: Out[TwitchChoice] def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self._votes: deque[tuple[str, float, str]] = deque() self._votes_lock = threading.Lock() self._vote_thread: threading.Thread | None = None - self._valid_keywords: frozenset[str] = frozenset() + self._valid_choices: frozenset[str] = frozenset() def _handle_message(self, message: Any) -> None: - """Extend base to also record votes from matching messages.""" super()._handle_message(message) content: str = message.content or "" - prefix = self.config.bot_prefix - if not content.strip().startswith(prefix): - return - cmd = content.strip()[len(prefix) :].strip().lower() author = message.author.name if message.author else "anonymous" - if cmd in self._valid_keywords: + # Build a TwitchMessage for the callback's second arg + badges: dict[str, str] = {} + if message.tags and "badges" in message.tags: + raw_badges = message.tags["badges"] + if raw_badges: + for badge in raw_badges.split(","): + parts = badge.split("/", 1) + if len(parts) == 2: + badges[parts[0]] = parts[1] + + msg = TwitchMessage( + author=author, + content=content, + channel=message.channel.name if message.channel else "", + timestamp=time.time(), + is_subscriber="subscriber" in badges, + is_mod="moderator" in badges, + badges=badges, + ) + + choice = self.config.message_to_choice(msg, self.config.choices) + if choice is not None and choice in self._valid_choices: with self._votes_lock: - self._votes.append((cmd, time.time(), author)) + self._votes.append((choice, time.time(), author)) @rpc def start(self) -> None: - # Auto-generate patterns from keywords so the base class filters correctly - if self.config.keywords and not self.config.patterns: - escaped = "|".join(re.escape(k) for k in self.config.keywords) - self.config.patterns = [rf"^{re.escape(self.config.bot_prefix)}(?:{escaped})\b"] + self._valid_choices = frozenset(self.config.choices) - self._valid_keywords = frozenset(self.config.keywords) + # Auto-generate filter patterns from choices so the base filters correctly + if self.config.choices and not self.config.patterns: + escaped = "|".join(re.escape(c) for c in self.config.choices) + self.config.patterns = [rf"^{re.escape(self.config.bot_prefix)}(?:{escaped})\b"] super().start() @@ -182,13 +207,13 @@ def stop(self) -> None: self._vote_thread = None super().stop() - def record_vote(self, command: str, voter: str = "anonymous") -> None: + def record_vote(self, choice: str, voter: str = "anonymous") -> None: """Record a vote programmatically (for testing).""" - cmd = command.lower().strip() - if cmd not in self._valid_keywords: + c = choice.lower().strip() + if c not in self._valid_choices: return with self._votes_lock: - self._votes.append((cmd, time.time(), voter)) + self._votes.append((c, time.time(), voter)) def _vote_loop(self) -> None: while True: @@ -196,10 +221,9 @@ def _vote_loop(self) -> None: time.sleep(self.config.vote_window_seconds) window_end = time.time() - # Atomically drain votes within window cutoff = window_end - self.config.vote_window_seconds with self._votes_lock: - current_votes = [(cmd, ts, voter) for cmd, ts, voter in self._votes if ts >= cutoff] + current_votes = [(c, ts, v) for c, ts, v in self._votes if ts >= cutoff] self._votes.clear() if len(current_votes) < self.config.min_votes_threshold: @@ -209,13 +233,9 @@ def _vote_loop(self) -> None: if winner is None: continue - logger.info( - "[TwitchVotes] Winner: %s (%d votes)", - winner, - len(current_votes), - ) - self.vote_results.publish( - VoteResult(winner=winner, total_votes=len(current_votes), timestamp=window_end) + logger.info("[TwitchVotes] Winner: %s (%d votes)", winner, len(current_votes)) + self.chat_vote_choice.publish( + TwitchChoice(winner=winner, total_votes=len(current_votes), timestamp=window_end) ) def _tally( diff --git a/examples/twitch_plays/README.md b/examples/twitch_plays/README.md index f481bf3bdf..7a42089ee4 100644 --- a/examples/twitch_plays/README.md +++ b/examples/twitch_plays/README.md @@ -1,17 +1,6 @@ -# Twitch Plays Go2 +# Twitch Chat Integration -Control a Unitree Go2 quadruped via Twitch chat votes. - -## Architecture - -The Twitch integration is split into three modules: - -- **TwitchChat** (`dimos.stream.twitch.module`) — Base module that connects to - a Twitch channel and publishes `ChatMessage`s. Can optionally filter by keywords. -- **TwitchVotes** (`dimos.stream.twitch.votes`) — Extends TwitchChat with vote - tallying. Publishes `VoteResult` (winning command + vote count) each window. -- **VoteCmdVel** (`dimos.stream.twitch.vote_cmd_vel`) — Demo bridge that - converts `VoteResult` into `Twist` on `cmd_vel` for robot movement. +Connect a Twitch channel's chat to DimOS as a module. ## Setup @@ -34,65 +23,34 @@ The Twitch integration is split into three modules: ## Run ```bash -# Real robot dimos run unitree-go2-twitch --robot-ip 192.168.123.161 - -# Replay mode (no robot, test the chat integration) -dimos --replay run unitree-go2-twitch ``` -## Chat Commands - -Viewers type these in Twitch chat (with `!` prefix by default): - -| Command | Action | -|---------|--------| -| `!forward` | Walk forward | -| `!back` | Walk backward | -| `!left` | Turn left | -| `!right` | Turn right | -| `!stop` | Stop moving | - -## Voting Modes +## Streams -Configure via `--vote-mode`: +- `raw_messages` — every chat message as a `TwitchMessage` +- `filtered_messages` — messages matching configured regex patterns and filters -| Mode | Behaviour | -|------|-----------| -| `plurality` (default) | Most votes wins. Classic "Twitch Plays" style. | -| `majority` | Winner needs >50% of votes. No action if split. | -| `weighted_recent` | Later votes in the window count more. Rewards fast reactions. | -| `runoff` | If no majority, top-2 enter instant runoff using each voter's latest vote. | +## Filters -## Configuration - -All configurable via CLI flags or env vars (prefixed `DIMOS_`): - -| Flag | Default | Description | -|------|---------|-------------| -| `--vote-window-seconds` | 5.0 | Duration of each voting round | -| `--min-votes-threshold` | 1 | Minimum votes to trigger action | -| `--linear-speed` | 0.3 | Forward/backward speed (m/s) | -| `--angular-speed` | 0.5 | Turning speed (rad/s) | -| `--command-duration` | 1.0 | How long each command runs (s) | -| `--bot-prefix` | `!` | Chat command prefix | -| `--keywords` | forward,back,left,right,stop | Valid vote keywords | - -## Example: Local Testing Without Twitch +```python +TwitchChat.blueprint( + patterns=[r"^!(?:forward|back|left|right)"], # regex on content + filter_is_mod=True, # mods only + filter_is_subscriber=True, # subscribers only + filter_author=lambda name: name != "nightbot", # exclude bots + filter_content=lambda text: len(text) < 200, # reject spam +) +``` -You can test the vote logic without a Twitch connection: +## Local Testing ```python -from dimos.stream.twitch.votes import TwitchVotes - -# Create module without credentials (local-only mode) -votes = TwitchVotes(vote_window_seconds=2.0, vote_mode="weighted_recent") -votes.start() +from dimos.stream.twitch.module import TwitchChat -# Simulate votes programmatically -votes.record_vote("forward", voter="user1") -votes.record_vote("forward", voter="user2") -votes.record_vote("left", voter="user3") +chat = TwitchChat() +chat.start() # runs in local-only mode without credentials -# The vote loop will tally and publish VoteResult on vote_results +chat.inject_message("!forward", author="user1") +chat.inject_message("hello", author="user2") ``` diff --git a/examples/twitch_plays/run.py b/examples/twitch_plays/run.py index 26ef237f38..9bb629af5f 100644 --- a/examples/twitch_plays/run.py +++ b/examples/twitch_plays/run.py @@ -13,21 +13,76 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Twitch Plays Go2 — standalone example. +"""Twitch Plays Go2 — demo with replay simulator. -This is equivalent to ``dimos run unitree-go2-twitch`` but can be -customised directly in Python. +Wires TwitchVotes to the Go2 in replay mode via a small bridge module +that converts winning vote choices into Twist commands on cmd_vel. Usage:: - export DIMOS_TWITCH_TOKEN=oauth:... - export DIMOS_CHANNEL_NAME=your_channel python examples/twitch_plays/run.py """ -from dimos.robot.unitree.go2.blueprints.smart.unitree_go2_twitch import ( - unitree_go2_twitch, -) +from __future__ import annotations + +import time + +from dimos.core.blueprints import autoconnect +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.stream.twitch.votes import TwitchChoice, TwitchVotes +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +# Convert TwitchChoice to cmd_vel! +class ChoiceToCmdVel(Module["ModuleConfig"]): + default_config = ModuleConfig + command_duration: float = 1.0 + + chat_vote_choice: In[TwitchChoice] + cmd_vel: Out[Twist] + + @rpc + def start(self) -> None: + super().start() + self.chat_vote_choice.subscribe(self._on_choice) + + def _on_choice(self, choice: TwitchChoice) -> None: + t = Twist() + if choice.winner == "forward": + t.linear.x = 0.3 + elif choice.winner == "back": + t.linear.x = -0.3 + elif choice.winner == "left": + t.angular.z = 0.5 + elif choice.winner == "right": + t.angular.z = -0.5 + + logger.info("[Demo] Executing: %s", choice.winner) + + end = time.time() + self.command_duration + while time.time() < end: + self.cmd_vel.publish(t) + time.sleep(0.1) + + self.cmd_vel.publish(Twist()) + if __name__ == "__main__": - unitree_go2_twitch.build().loop() + autoconnect( + unitree_go2_basic, + TwitchVotes.blueprint( + choices=["forward", "back", "left", "right", "stop"], + message_to_choice=lambda msg, choices: "🤖" in msg.text and msg.find_one(choices), + vote_window_seconds=3.0, + vote_mode="plurality", + ), + ChoiceToCmdVel.blueprint(), + ).global_config( + robot_ip="replay", + ).build().loop() From 00eaffd187f10f3ebe279c844a1cbcb81618461e Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Thu, 26 Mar 2026 23:07:47 -0700 Subject: [PATCH 07/10] feat(twitch): use mujoco simulator instead of replay in demo Co-Authored-By: Claude Opus 4.6 --- examples/twitch_plays/run.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/twitch_plays/run.py b/examples/twitch_plays/run.py index 9bb629af5f..ca22edb337 100644 --- a/examples/twitch_plays/run.py +++ b/examples/twitch_plays/run.py @@ -13,9 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Twitch Plays Go2 — demo with replay simulator. +"""Twitch Plays Go2 — demo with MuJoCo simulator. -Wires TwitchVotes to the Go2 in replay mode via a small bridge module +Wires TwitchVotes to the Go2 in MuJoCo via a small bridge module that converts winning vote choices into Twist commands on cmd_vel. Usage:: @@ -84,5 +84,5 @@ def _on_choice(self, choice: TwitchChoice) -> None: ), ChoiceToCmdVel.blueprint(), ).global_config( - robot_ip="replay", + robot_ip="mujoco", ).build().loop() From 38bdca49f85b863ab64291b1c9c55d285c3b889f Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Fri, 27 Mar 2026 11:44:47 -0700 Subject: [PATCH 08/10] fix(twitch): deduplicate badge parsing, add stop signal, widen signature, add tests - Extract _build_twitch_message and _on_message_received hook in TwitchChat so TwitchVotes doesn't duplicate badge parsing - Add threading.Event stop signal to _vote_loop for clean shutdown - Widen message_to_choice return type to Any (supports lambda short-circuit) - Add 27 unit tests for TwitchMessage, tally functions, and filter patterns Co-Authored-By: Claude Opus 4.6 --- dimos/stream/twitch/module.py | 12 +- dimos/stream/twitch/test_twitch.py | 200 +++++++++++++++++++++++++++++ dimos/stream/twitch/votes.py | 42 ++---- 3 files changed, 219 insertions(+), 35 deletions(-) create mode 100644 dimos/stream/twitch/test_twitch.py diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index 4b71effc84..b9d571a072 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -162,8 +162,8 @@ async def _close() -> None: def _handle_ready(self) -> None: logger.info("[TwitchChat] Ready") - def _handle_message(self, message: Any) -> None: - """Convert a twitchio Message into a TwitchMessage and publish.""" + def _build_twitch_message(self, message: Any) -> TwitchMessage: + """Convert a raw twitchio Message into a TwitchMessage.""" badges: dict[str, str] = {} if message.tags and "badges" in message.tags: raw = message.tags["badges"] @@ -173,7 +173,7 @@ def _handle_message(self, message: Any) -> None: if len(parts) == 2: badges[parts[0]] = parts[1] - msg = TwitchMessage( + return TwitchMessage( author=message.author.name if message.author else "", content=message.content or "", channel=message.channel.name if message.channel else "", @@ -183,8 +183,14 @@ def _handle_message(self, message: Any) -> None: badges=badges, ) + def _handle_message(self, message: Any) -> None: + msg = self._build_twitch_message(message) self.raw_messages.publish(msg) self._publish_if_matched(msg) + self._on_message_received(msg) + + def _on_message_received(self, msg: TwitchMessage) -> None: + """Hook for subclasses to process messages after publishing.""" def _publish_if_matched(self, msg: TwitchMessage) -> None: """Publish to filtered_messages if msg passes all configured filters.""" diff --git a/dimos/stream/twitch/test_twitch.py b/dimos/stream/twitch/test_twitch.py new file mode 100644 index 0000000000..1dd16c6f42 --- /dev/null +++ b/dimos/stream/twitch/test_twitch.py @@ -0,0 +1,200 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for TwitchMessage, TwitchChat filters, and vote tallying.""" + +from __future__ import annotations + +import time + +import pytest + +from dimos.stream.twitch.module import TwitchMessage +from dimos.stream.twitch.votes import ( + TwitchChoice, + _tally_majority, + _tally_plurality, + _tally_runoff, + _tally_weighted_recent, +) + + +# ── TwitchMessage ── + + +class TestTwitchMessage: + def test_text_property(self) -> None: + msg = TwitchMessage(content="hello") + assert msg.text == "hello" + + def test_find_one_match(self) -> None: + msg = TwitchMessage(content="I vote forward please") + assert msg.find_one(["forward", "back", "left", "right"]) == "forward" + + def test_find_one_case_insensitive(self) -> None: + msg = TwitchMessage(content="FORWARD") + assert msg.find_one(["forward", "back"]) == "forward" + + def test_find_one_no_match(self) -> None: + msg = TwitchMessage(content="hello world") + assert msg.find_one(["forward", "back"]) is None + + def test_find_one_first_wins(self) -> None: + msg = TwitchMessage(content="go left or right") + assert msg.find_one(["left", "right"]) == "left" + + def test_find_one_with_set(self) -> None: + msg = TwitchMessage(content="back") + assert msg.find_one({"forward", "back"}) == "back" + + def test_find_one_with_frozenset(self) -> None: + msg = TwitchMessage(content="left") + assert msg.find_one(frozenset(["left", "right"])) == "left" + + def test_repr(self) -> None: + msg = TwitchMessage(author="user1", content="hi") + assert "user1" in repr(msg) + assert "hi" in repr(msg) + + +# ── Tally functions ── + +# Vote tuple format: (choice, timestamp, voter) +NOW = time.time() + + +class TestTallyPlurality: + def test_empty(self) -> None: + assert _tally_plurality([]) is None + + def test_single_vote(self) -> None: + assert _tally_plurality([("forward", NOW, "a")]) == "forward" + + def test_winner(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c")] + assert _tally_plurality(votes) == "forward" + + def test_tie_returns_one(self) -> None: + votes = [("forward", NOW, "a"), ("back", NOW, "b")] + result = _tally_plurality(votes) + assert result in ("forward", "back") + + +class TestTallyMajority: + def test_empty(self) -> None: + assert _tally_majority([]) is None + + def test_majority_winner(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c")] + assert _tally_majority(votes) == "forward" + + def test_no_majority(self) -> None: + votes = [("forward", NOW, "a"), ("back", NOW, "b"), ("left", NOW, "c"), ("right", NOW, "d")] + assert _tally_majority(votes) is None + + def test_exact_half_not_majority(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c"), ("back", NOW, "d")] + assert _tally_majority(votes) is None + + +class TestTallyWeightedRecent: + def test_empty(self) -> None: + assert _tally_weighted_recent([], NOW, NOW + 5) is None + + def test_recent_votes_weighted_higher(self) -> None: + start = NOW + end = NOW + 10 + # Early vote for "back", late vote for "forward" + votes = [("back", start + 1, "a"), ("forward", end - 0.1, "b")] + assert _tally_weighted_recent(votes, start, end) == "forward" + + def test_many_early_can_beat_few_late(self) -> None: + start = NOW + end = NOW + 10 + votes = [ + ("back", start + 0.1, "a"), + ("back", start + 0.2, "b"), + ("back", start + 0.3, "c"), + ("forward", end - 0.1, "d"), + ] + assert _tally_weighted_recent(votes, start, end) == "back" + + +class TestTallyRunoff: + def test_empty(self) -> None: + assert _tally_runoff([]) is None + + def test_clear_majority(self) -> None: + votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c")] + assert _tally_runoff(votes) == "forward" + + def test_runoff_eliminates_third(self) -> None: + # No majority: forward=2, back=2, left=1 + # Runoff between forward and back, left eliminated + votes = [ + ("forward", NOW, "a"), + ("forward", NOW, "b"), + ("back", NOW, "c"), + ("back", NOW, "d"), + ("left", NOW, "e"), + ] + result = _tally_runoff(votes) + assert result in ("forward", "back") + + def test_single_vote(self) -> None: + assert _tally_runoff([("left", NOW, "a")]) == "left" + + +# ── TwitchChat._publish_if_matched filter logic ── +# We test the filter logic indirectly via inject_message since TwitchChat +# requires the module system. Instead we test the filter predicates directly. + + +class TestFilterLogic: + """Test the filter predicate patterns used in TwitchChatConfig.""" + + def test_filter_author_lambda(self) -> None: + exclude_nightbot = lambda name: name != "nightbot" # noqa: E731 + assert exclude_nightbot("user1") is True + assert exclude_nightbot("nightbot") is False + + def test_filter_content_lambda(self) -> None: + reject_spam = lambda text: len(text) < 200 # noqa: E731 + assert reject_spam("short message") is True + assert reject_spam("x" * 201) is False + + +# ── message_to_choice with lambda (the demo pattern) ── + + +class TestMessageToChoiceLambda: + def test_lambda_with_emoji_gate(self) -> None: + choices = ["forward", "back", "left", "right"] + fn = lambda msg, c: "🤖" in msg.text and msg.find_one(c) # noqa: E731 + + msg_with_emoji = TwitchMessage(content="🤖 forward") + assert fn(msg_with_emoji, choices) == "forward" + + msg_without_emoji = TwitchMessage(content="forward") + assert not fn(msg_without_emoji, choices) + + def test_default_message_to_choice(self) -> None: + from dimos.stream.twitch.votes import _default_message_to_choice + + choices = ["forward", "back"] + msg = TwitchMessage(content="go forward!") + assert _default_message_to_choice(msg, choices) == "forward" + + msg2 = TwitchMessage(content="hello") + assert _default_message_to_choice(msg2, choices) is None diff --git a/dimos/stream/twitch/votes.py b/dimos/stream/twitch/votes.py index ee88ef2d77..402142cd0a 100644 --- a/dimos/stream/twitch/votes.py +++ b/dimos/stream/twitch/votes.py @@ -62,8 +62,8 @@ def _default_message_to_choice(msg: TwitchMessage, choices: list[str]) -> str | class TwitchVotesConfig(TwitchChatConfig): # A vote is only counted if message_to_choice returns one of these. choices: list[str] = ["forward", "back", "left", "right", "stop"] - # (msg, choices) -> choice string or None - message_to_choice: Callable[[TwitchMessage, list[str]], str | None] = _default_message_to_choice + # (msg, choices) -> choice string, or any falsy value to skip + message_to_choice: Callable[[TwitchMessage, list[str]], Any] = _default_message_to_choice vote_window_seconds: float = 5.0 min_votes_threshold: int = 1 @@ -146,41 +146,18 @@ def __init__(self, **kwargs: Any) -> None: self._votes: deque[tuple[str, float, str]] = deque() self._votes_lock = threading.Lock() self._vote_thread: threading.Thread | None = None + self._stop_event = threading.Event() self._valid_choices: frozenset[str] = frozenset() - def _handle_message(self, message: Any) -> None: - super()._handle_message(message) - - content: str = message.content or "" - author = message.author.name if message.author else "anonymous" - - # Build a TwitchMessage for the callback's second arg - badges: dict[str, str] = {} - if message.tags and "badges" in message.tags: - raw_badges = message.tags["badges"] - if raw_badges: - for badge in raw_badges.split(","): - parts = badge.split("/", 1) - if len(parts) == 2: - badges[parts[0]] = parts[1] - - msg = TwitchMessage( - author=author, - content=content, - channel=message.channel.name if message.channel else "", - timestamp=time.time(), - is_subscriber="subscriber" in badges, - is_mod="moderator" in badges, - badges=badges, - ) - + def _on_message_received(self, msg: TwitchMessage) -> None: choice = self.config.message_to_choice(msg, self.config.choices) - if choice is not None and choice in self._valid_choices: + if choice and choice in self._valid_choices: with self._votes_lock: - self._votes.append((choice, time.time(), author)) + self._votes.append((choice, time.time(), msg.author)) @rpc def start(self) -> None: + self._stop_event.clear() self._valid_choices = frozenset(self.config.choices) # Auto-generate filter patterns from choices so the base filters correctly @@ -202,6 +179,7 @@ def start(self) -> None: @rpc def stop(self) -> None: + self._stop_event.set() if self._vote_thread is not None: self._vote_thread.join(timeout=2) self._vote_thread = None @@ -216,9 +194,9 @@ def record_vote(self, choice: str, voter: str = "anonymous") -> None: self._votes.append((c, time.time(), voter)) def _vote_loop(self) -> None: - while True: + while not self._stop_event.is_set(): window_start = time.time() - time.sleep(self.config.vote_window_seconds) + self._stop_event.wait(timeout=self.config.vote_window_seconds) window_end = time.time() cutoff = window_end - self.config.vote_window_seconds From 58f9768a0b939f62d0fc46b33e17f1afb92185bb Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Mon, 30 Mar 2026 14:00:17 -0700 Subject: [PATCH 09/10] cleanup --- dimos/stream/twitch/test_twitch.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dimos/stream/twitch/test_twitch.py b/dimos/stream/twitch/test_twitch.py index 1dd16c6f42..4d4e417649 100644 --- a/dimos/stream/twitch/test_twitch.py +++ b/dimos/stream/twitch/test_twitch.py @@ -18,18 +18,14 @@ import time -import pytest - from dimos.stream.twitch.module import TwitchMessage from dimos.stream.twitch.votes import ( - TwitchChoice, _tally_majority, _tally_plurality, _tally_runoff, _tally_weighted_recent, ) - # ── TwitchMessage ── @@ -104,7 +100,12 @@ def test_no_majority(self) -> None: assert _tally_majority(votes) is None def test_exact_half_not_majority(self) -> None: - votes = [("forward", NOW, "a"), ("forward", NOW, "b"), ("back", NOW, "c"), ("back", NOW, "d")] + votes = [ + ("forward", NOW, "a"), + ("forward", NOW, "b"), + ("back", NOW, "c"), + ("back", NOW, "d"), + ] assert _tally_majority(votes) is None From 6a1e33fee02020998b520cf783ef920ec5caaa08 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Mon, 30 Mar 2026 14:16:48 -0700 Subject: [PATCH 10/10] fix(twitch): address code review findings - inject_message now calls _on_message_received hook (votes work in local mode) - find_one uses word-boundary matching (prevents "backwards" matching "back") - stop() wraps bot close in try/except so cleanup always runs - Remove config mutation in TwitchVotes.start() (patterns don't affect voting) - _tally_weighted_recent uses float weights instead of int truncation - _vote_loop preserves votes arriving after window ends instead of clearing all - Deduplicate votes per voter per window (prevents spam) - Compile patterns before local-only early return Co-Authored-By: Claude Opus 4.6 --- dimos/stream/twitch/module.py | 20 ++++++++++++-------- dimos/stream/twitch/test_twitch.py | 10 ++++++++-- dimos/stream/twitch/votes.py | 24 +++++++++++++----------- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/dimos/stream/twitch/module.py b/dimos/stream/twitch/module.py index b9d571a072..dc1eac645c 100644 --- a/dimos/stream/twitch/module.py +++ b/dimos/stream/twitch/module.py @@ -52,10 +52,10 @@ def text(self) -> str: return self.content def find_one(self, options: list[str] | set[str] | frozenset[str]) -> str | None: - """Return the first option found in the message content (case-insensitive), or None.""" + """Return the first option found as a whole word in content (case-insensitive), or None.""" lower = self.content.lower() for opt in options: - if opt.lower() in lower: + if re.search(rf"\b{re.escape(opt.lower())}\b", lower): return opt return None @@ -105,12 +105,12 @@ def start(self) -> None: token = self.config.twitch_token or os.getenv("DIMOS_TWITCH_TOKEN", "") channel = self.config.channel_name or os.getenv("DIMOS_CHANNEL_NAME", "") + self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.config.patterns] + if not token or not channel: logger.warning("[TwitchChat] No token/channel — running in local-only mode") return - self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.config.patterns] - self._bot_loop = asyncio.new_event_loop() self._bot_thread = threading.Thread( target=self._run_bot_loop, @@ -141,12 +141,15 @@ def _run_bot_loop(self, token: str, channel: str) -> None: @rpc def stop(self) -> None: if self._bot is not None and self._bot_loop is not None: + try: - async def _close() -> None: - assert self._bot is not None - await self._bot.close() + async def _close() -> None: + assert self._bot is not None + await self._bot.close() - asyncio.run_coroutine_threadsafe(_close(), self._bot_loop).result(timeout=5) + asyncio.run_coroutine_threadsafe(_close(), self._bot_loop).result(timeout=5) + except Exception: + logger.warning("[TwitchChat] Error closing bot", exc_info=True) if self._bot_loop is not None: self._bot_loop.call_soon_threadsafe(self._bot_loop.stop) @@ -218,6 +221,7 @@ def inject_message(self, content: str, author: str = "anonymous") -> None: msg = TwitchMessage(author=author, content=content, channel="local", timestamp=time.time()) self.raw_messages.publish(msg) self._publish_if_matched(msg) + self._on_message_received(msg) class _TwitchBot: diff --git a/dimos/stream/twitch/test_twitch.py b/dimos/stream/twitch/test_twitch.py index 4d4e417649..fa903322ad 100644 --- a/dimos/stream/twitch/test_twitch.py +++ b/dimos/stream/twitch/test_twitch.py @@ -50,13 +50,19 @@ def test_find_one_first_wins(self) -> None: msg = TwitchMessage(content="go left or right") assert msg.find_one(["left", "right"]) == "left" + def test_find_one_word_boundary(self) -> None: + msg = TwitchMessage(content="I want to go backwards") + assert msg.find_one(["back", "forward"]) is None + def test_find_one_with_set(self) -> None: msg = TwitchMessage(content="back") - assert msg.find_one({"forward", "back"}) == "back" + result = msg.find_one({"forward", "back"}) + assert result in ("forward", "back") # set order not guaranteed def test_find_one_with_frozenset(self) -> None: msg = TwitchMessage(content="left") - assert msg.find_one(frozenset(["left", "right"])) == "left" + result = msg.find_one(frozenset(["left", "right"])) + assert result in ("left", "right") # frozenset order not guaranteed def test_repr(self) -> None: msg = TwitchMessage(author="user1", content="hi") diff --git a/dimos/stream/twitch/votes.py b/dimos/stream/twitch/votes.py index 402142cd0a..3de254f7fb 100644 --- a/dimos/stream/twitch/votes.py +++ b/dimos/stream/twitch/votes.py @@ -28,7 +28,6 @@ from collections.abc import Callable from dataclasses import dataclass from enum import Enum -import re import threading import time from typing import Any @@ -95,11 +94,13 @@ def _tally_weighted_recent( if not votes: return None duration = max(window_end - window_start, 0.001) - weighted: Counter[str] = Counter() + weighted: dict[str, float] = {} for cmd, ts, _ in votes: weight = 0.5 + 0.5 * ((ts - window_start) / duration) - weighted[cmd] += int(weight * 1000) - return weighted.most_common(1)[0][0] if weighted else None + weighted[cmd] = weighted.get(cmd, 0.0) + weight + if not weighted: + return None + return max(weighted, key=weighted.__getitem__) def _tally_runoff(votes: list[tuple[str, float, str]]) -> str | None: @@ -159,12 +160,6 @@ def _on_message_received(self, msg: TwitchMessage) -> None: def start(self) -> None: self._stop_event.clear() self._valid_choices = frozenset(self.config.choices) - - # Auto-generate filter patterns from choices so the base filters correctly - if self.config.choices and not self.config.patterns: - escaped = "|".join(re.escape(c) for c in self.config.choices) - self.config.patterns = [rf"^{re.escape(self.config.bot_prefix)}(?:{escaped})\b"] - super().start() self._vote_thread = threading.Thread( @@ -202,7 +197,14 @@ def _vote_loop(self) -> None: cutoff = window_end - self.config.vote_window_seconds with self._votes_lock: current_votes = [(c, ts, v) for c, ts, v in self._votes if ts >= cutoff] - self._votes.clear() + # Keep only votes that arrived after this window ended + self._votes = deque((c, ts, v) for c, ts, v in self._votes if ts >= window_end) + + # Deduplicate: keep only the latest vote per voter + latest_per_voter: dict[str, tuple[str, float, str]] = {} + for vote in current_votes: + latest_per_voter[vote[2]] = vote + current_votes = list(latest_per_voter.values()) if len(current_votes) < self.config.min_votes_threshold: continue