From d87b54ae6ca2733a7904e7be90ab2fad44536fad Mon Sep 17 00:00:00 2001 From: onorabil <7421752+onorabil@users.noreply.github.com> Date: Sat, 1 Mar 2025 17:43:59 +0200 Subject: [PATCH 1/5] Fix for text/byte streams Fix for RuntimeWarning: coroutine 'RoomManager.handle_text_stream' was never awaited This occurs when a message is received from a text stream. --- livekit-rtc/livekit/rtc/room.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index db0f92bf..8e12945a 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -753,7 +753,11 @@ def _handle_stream_header( text_reader = TextStreamReader(header) self._text_stream_readers[header.stream_id] = text_reader - text_stream_handler(text_reader, participant_identity) + task = asyncio.create_task( + text_stream_handler(text_reader, participant_identity, self) + ) + self._data_stream_tasks.add(task) + task.add_done_callback(self._data_stream_tasks.discard) elif stream_type == "byte_header": byte_stream_handler = self._byte_stream_handlers.get(header.topic) if byte_stream_handler is None: @@ -765,7 +769,11 @@ def _handle_stream_header( byte_reader = ByteStreamReader(header) self._byte_stream_readers[header.stream_id] = byte_reader - byte_stream_handler(byte_reader, participant_identity) + task = asyncio.create_task( + byte_stream_handler(byte_reader, participant_identity) + ) + self._data_stream_tasks.add(task) + task.add_done_callback(self._data_stream_tasks.discard) else: logging.warning("received unknown header type, %s", stream_type) pass From b03258611e1eead3c7e7a2f2ac99fedc8582dbd0 Mon Sep 17 00:00:00 2001 From: onorabil <7421752+onorabil@users.noreply.github.com> Date: Sat, 1 Mar 2025 18:50:11 +0200 Subject: [PATCH 2/5] original call signature --- livekit-rtc/livekit/rtc/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 8e12945a..fae53776 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -754,7 +754,7 @@ def _handle_stream_header( text_reader = TextStreamReader(header) self._text_stream_readers[header.stream_id] = text_reader task = asyncio.create_task( - text_stream_handler(text_reader, participant_identity, self) + text_stream_handler(text_reader, participant_identity) ) self._data_stream_tasks.add(task) task.add_done_callback(self._data_stream_tasks.discard) From 8f955c556f6a6c7756e50e0990c5f1b487b8b4c3 Mon Sep 17 00:00:00 2001 From: onorabil <7421752+onorabil@users.noreply.github.com> Date: Sun, 2 Mar 2025 01:23:57 +0200 Subject: [PATCH 3/5] CI fix attempt 0 --- livekit-rtc/livekit/rtc/room.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index fae53776..d4375813 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -17,7 +17,7 @@ import ctypes import logging from dataclasses import dataclass, field -from typing import Callable, Dict, Literal, Optional, cast, Mapping +from typing import Callable, Dict, Literal, Optional, cast, Mapping, Any from .event_emitter import EventEmitter from ._ffi_client import FfiClient, FfiHandle @@ -753,8 +753,8 @@ def _handle_stream_header( text_reader = TextStreamReader(header) self._text_stream_readers[header.stream_id] = text_reader - task = asyncio.create_task( - text_stream_handler(text_reader, participant_identity) + task: asyncio.Task[Any] = asyncio.create_task( + text_stream_handler(text_reader, participant_identity) # type: ignore ) self._data_stream_tasks.add(task) task.add_done_callback(self._data_stream_tasks.discard) @@ -769,8 +769,8 @@ def _handle_stream_header( byte_reader = ByteStreamReader(header) self._byte_stream_readers[header.stream_id] = byte_reader - task = asyncio.create_task( - byte_stream_handler(byte_reader, participant_identity) + task: asyncio.Task[Any] = asyncio.create_task( + byte_stream_handler(byte_reader, participant_identity) # type: ignore ) self._data_stream_tasks.add(task) task.add_done_callback(self._data_stream_tasks.discard) From 1e19e1195239998760aa5814615b1b24b4ec1907 Mon Sep 17 00:00:00 2001 From: onorabil <7421752+onorabil@users.noreply.github.com> Date: Sun, 2 Mar 2025 01:26:45 +0200 Subject: [PATCH 4/5] CI fix attempt 1 --- livekit-rtc/livekit/rtc/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index d4375813..d8766309 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -769,7 +769,7 @@ def _handle_stream_header( byte_reader = ByteStreamReader(header) self._byte_stream_readers[header.stream_id] = byte_reader - task: asyncio.Task[Any] = asyncio.create_task( + task = asyncio.create_task( byte_stream_handler(byte_reader, participant_identity) # type: ignore ) self._data_stream_tasks.add(task) From 8d170a61f05163f77f097d2697d842b9880e06a2 Mon Sep 17 00:00:00 2001 From: Dragos Costea Date: Sun, 2 Mar 2025 02:09:29 +0200 Subject: [PATCH 5/5] CI fix attempt 2