diff --git a/live_data_processor/main.py b/live_data_processor/main.py
index f130124..10f4173 100644
--- a/live_data_processor/main.py
+++ b/live_data_processor/main.py
@@ -8,6 +8,7 @@
 
 import contextlib
 import datetime
+import io
 import logging
 import os
 import signal
@@ -85,6 +86,48 @@
 }
 
 
+class TeeStream(io.TextIOBase):
+    """
+    Stream wrapper that duplicates output to two text streams.
+
+    This class simulates a text stream and takes any writes and flushes,
+    piping the result uniformly to both provided destination streams.
+    """
+
+    def __init__(self, stream1: Any, stream2: Any) -> None:
+        """
+        Initialize the TeeStream with two target streams.
+
+        :param stream1: The primary stream to duplicate output to (e.g. sys.stdout).
+        :param stream2: The secondary stream to duplicate output to (e.g. io.StringIO).
+        """
+        super().__init__()
+        self.stream1 = stream1
+        self.stream2 = stream2
+
+    def write(self, data: str) -> int:
+        """
+        Write the string data to both output streams.
+
+        :param data: The string data to write out.
+        :return: The number of characters written.
+        """
+        self.stream1.write(data)
+        self.stream2.write(data)
+        return len(data)
+
+    def flush(self) -> None:
+        """Flush the output buffers of both underlying streams."""
+        self.stream1.flush()
+        self.stream2.flush()
+
+    def __getattr__(self, name: str) -> Any:
+        """
+        Delegate remaining unfound stream properties or methods to the primary stream.
+        """
+        return getattr(self.stream1, name)
+
+
 def process_events(events: EventMessage) -> None:
     """
     Process event data by adding events to the live workspace.
@@ -252,6 +295,7 @@ def start_live_reduction(  # noqa: C901, PLR0912, PLR0915
 
         # Execute reduction function periodically
         if (now - script_last_executed_time).total_seconds() > SCRIPT_EXECUTION_INTERVAL:
+            output_stream = io.StringIO()
             try:
                 if not kafka_sample_log_streaming:
                     with Path(epics_log_file).open("r", encoding="utf-8") as f:
@@ -267,16 +311,23 @@ def start_live_reduction(  # noqa: C901, PLR0912, PLR0915
                         RemoveWorkspaceHistory(ws)
 
                 logger.info("Executing reduction script")
-                reduction_function()
+                tee_stdout = TeeStream(sys.stdout, output_stream)
+                tee_stderr = TeeStream(sys.stderr, output_stream)
+                with contextlib.redirect_stdout(tee_stdout), contextlib.redirect_stderr(tee_stderr):
+                    reduction_function()
                 logger.info("Reduction script executed")
             except Exception as exc:
                 logger.warning("Error occurred in reduction", exc_info=exc)
+                tb_str = "".join(traceback.format_exception(exc))
+                if output_stream.getvalue():
+                    output_stream.write("\n")
+                output_stream.write(tb_str)
+            finally:
                 try:
-                    tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
-                    valkey_client.set(f"live_data:{INSTRUMENT}:traceback", tb_str)
+                    valkey_client.set(f"live_data:{INSTRUMENT}:traceback", output_stream.getvalue())
                 except Exception as redis_exc:
-                    logger.warning("Failed to store traceback in Valkey", exc_info=redis_exc)
-                finally:
+                    logger.warning("Failed to store output in Valkey", exc_info=redis_exc)
+
                 script_last_executed_time = datetime.datetime.now(tz=datetime.UTC)
             logger.info(
                 "Script will execute again in %s seconds",