Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions live_data_processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import contextlib
import datetime
import io
import logging
import os
import signal
Expand Down Expand Up @@ -85,6 +86,48 @@
}


class TeeStream(io.TextIOBase):
"""
Stream wrapper that duplicates output to two text streams.

This class simulates a text stream and takes any writes and flushes,
piping the result uniformly to both provided destination streams.
"""

def __init__(self, stream1: Any, stream2: Any) -> None:
"""
Initialize the TeeStream with two target streams.

:param stream1: The primary stream to duplicate output to (e.g. sys.stdout).
:param stream2: The secondary stream to duplicate output to (e.g. io.StringIO).
"""
super().__init__()
self.stream1 = stream1
self.stream2 = stream2

def write(self, data: str) -> int:
"""
Write the string data to both output streams.

:param data: The string data to write out.
:return: The number of characters written.
"""
self.stream1.write(data)
self.stream2.write(data)
return len(data)

def flush(self) -> None:
"""Flush the output buffers of both underlying streams."""
self.stream1.flush()
self.stream2.flush()

def __getattr__(self, name: str) -> Any:
"""
Delegate remaining unfound stream properties or methods to the primary stream.
"""
return getattr(self.stream1, name)


def process_events(events: EventMessage) -> None:
"""
Process event data by adding events to the live workspace.
Expand Down Expand Up @@ -252,6 +295,7 @@ def start_live_reduction( # noqa: C901, PLR0912, PLR0915

# Execute reduction function periodically
if (now - script_last_executed_time).total_seconds() > SCRIPT_EXECUTION_INTERVAL:
output_stream = io.StringIO()
try:
if not kafka_sample_log_streaming:
with Path(epics_log_file).open("r", encoding="utf-8") as f:
Expand All @@ -267,16 +311,23 @@ def start_live_reduction( # noqa: C901, PLR0912, PLR0915
RemoveWorkspaceHistory(ws)

logger.info("Executing reduction script")
reduction_function()
tee_stdout = TeeStream(sys.stdout, output_stream)
tee_stderr = TeeStream(sys.stderr, output_stream)
with contextlib.redirect_stdout(tee_stdout), contextlib.redirect_stderr(tee_stderr):
reduction_function()
logger.info("Reduction script executed")
except Exception as exc:
logger.warning("Error occurred in reduction", exc_info=exc)
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
if output_stream.getvalue():
output_stream.write("\n")
output_stream.write(tb_str)
finally:
try:
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
valkey_client.set(f"live_data:{INSTRUMENT}:traceback", tb_str)
valkey_client.set(f"live_data:{INSTRUMENT}:traceback", output_stream.getvalue())
except Exception as redis_exc:
logger.warning("Failed to store traceback in Valkey", exc_info=redis_exc)
finally:
logger.warning("Failed to store output in Valkey", exc_info=redis_exc)

script_last_executed_time = datetime.datetime.now(tz=datetime.UTC)
logger.info(
"Script will execute again in %s seconds",
Expand Down