Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pytrickle/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,15 @@ async def send_data(self, data: str):
logger.error(f"Error sending data: {e}")
return False

async def send_monitoring_event(self, event: dict, event_type: str = "custom_monitoring_event"):
"""Send a monitoring event to the server."""
if self.server.current_client is None:
logger.warning("No active client connection, cannot send monitoring event")
return False

# Check if client is in error state or stopping
await self.server.current_client.protocol.emit_monitoring_event(event, event_type)

async def send_input_frame(self, frame: Union[VideoFrame, AudioFrame]):
"""Send a video or audio frame to the input processing pipeline."""
if self.server.current_client is None:
Expand Down
57 changes: 57 additions & 0 deletions tests/test_stream_processor_handlers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Dict, Optional
from unittest.mock import AsyncMock, MagicMock

import pytest
import torch
Expand Down Expand Up @@ -133,3 +134,59 @@ async def test_internal_processor_audio_wrong_type_passthrough():
assert out.timestamp == frame.timestamp
assert out.rate == frame.rate
assert out.samples.shape == frame.samples.shape


@pytest.mark.asyncio
async def test_send_monitoring_event_with_no_client():
"""Test send_monitoring_event returns False when no client is connected."""
inst = MyHandlers()
sp = StreamProcessor.from_handlers(inst)

# Ensure current_client is None
sp.server.current_client = None

result = await sp.send_monitoring_event({"test": "data"})
assert result is False


@pytest.mark.asyncio
async def test_send_monitoring_event_with_client():
"""Test send_monitoring_event calls protocol.emit_monitoring_event when client exists."""
inst = MyHandlers()
sp = StreamProcessor.from_handlers(inst)

# Mock the current client and protocol
mock_client = MagicMock()
mock_protocol = MagicMock()
mock_protocol.emit_monitoring_event = AsyncMock()
mock_client.protocol = mock_protocol
sp.server.current_client = mock_client

event = {"type": "test_event", "data": "test_data"}
event_type = "custom_monitoring_event"

await sp.send_monitoring_event(event, event_type)

# Verify emit_monitoring_event was called with correct arguments
mock_protocol.emit_monitoring_event.assert_called_once_with(event, event_type)


@pytest.mark.asyncio
async def test_send_monitoring_event_with_client_default_event_type():
"""Test send_monitoring_event uses default event_type when not specified."""
inst = MyHandlers()
sp = StreamProcessor.from_handlers(inst)

# Mock the current client and protocol
mock_client = MagicMock()
mock_protocol = MagicMock()
mock_protocol.emit_monitoring_event = AsyncMock()
mock_client.protocol = mock_protocol
sp.server.current_client = mock_client

event = {"type": "test_event"}

await sp.send_monitoring_event(event)

# Verify emit_monitoring_event was called with default event_type
mock_protocol.emit_monitoring_event.assert_called_once_with(event, "custom_monitoring_event")