From 43c3139639784cc16ec4c129c5e05ef780dcdce1 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Thu, 5 Mar 2026 14:27:33 +0000 Subject: [PATCH 1/3] feat: Add TeeStream for duplicating output and enhance traceback storage --- live_data_processor/main.py | 61 ++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/live_data_processor/main.py b/live_data_processor/main.py index f130124..10f4173 100644 --- a/live_data_processor/main.py +++ b/live_data_processor/main.py @@ -8,6 +8,7 @@ import contextlib import datetime +import io import logging import os import signal @@ -85,6 +86,48 @@ } +class TeeStream(io.TextIOBase): + """ + Stream wrapper that duplicates output to two text streams. + + This class simulates a text stream and takes any writes and flushes, + piping the result uniformly to both provided destination streams. + """ + + def __init__(self, stream1: Any, stream2: Any) -> None: + """ + Initialize the TeeStream with two target streams. + + :param stream1: The primary stream to duplicate output to (e.g. sys.stdout). + :param stream2: The secondary stream to duplicate output to (e.g. io.StringIO). + """ + super().__init__() + self.stream1 = stream1 + self.stream2 = stream2 + + def write(self, data: str) -> int: + """ + Write the string data to both output streams. + + :param data: The string data to write out. + :return: The number of characters written. + """ + self.stream1.write(data) + self.stream2.write(data) + return len(data) + + def flush(self) -> None: + """Flush the output buffers of both underlying streams.""" + self.stream1.flush() + self.stream2.flush() + + def __getattr__(self, name: str) -> Any: + """ + Delegate remaining unfound stream properties or methods to the primary stream. + """ + return getattr(self.stream1, name) + + def process_events(events: EventMessage) -> None: """ Process event data by adding events to the live workspace. @@ -252,6 +295,7 @@ def start_live_reduction( # noqa: C901, PLR0912, PLR0915 # Execute reduction function periodically if (now - script_last_executed_time).total_seconds() > SCRIPT_EXECUTION_INTERVAL: + output_stream = io.StringIO() try: if not kafka_sample_log_streaming: with Path(epics_log_file).open("r", encoding="utf-8") as f: @@ -267,16 +311,23 @@ def start_live_reduction( # noqa: C901, PLR0912, PLR0915 RemoveWorkspaceHistory(ws) logger.info("Executing reduction script") - reduction_function() + tee_stdout = TeeStream(sys.stdout, output_stream) + tee_stderr = TeeStream(sys.stderr, output_stream) + with contextlib.redirect_stdout(tee_stdout), contextlib.redirect_stderr(tee_stderr): + reduction_function() logger.info("Reduction script executed") except Exception as exc: logger.warning("Error occurred in reduction", exc_info=exc) + tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + if output_stream.getvalue(): + output_stream.write("\n") + output_stream.write(tb_str) + finally: try: - tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) - valkey_client.set(f"live_data:{INSTRUMENT}:traceback", tb_str) + valkey_client.set(f"live_data:{INSTRUMENT}:traceback", output_stream.getvalue()) except Exception as redis_exc: - logger.warning("Failed to store traceback in Valkey", exc_info=redis_exc) - finally: + logger.warning("Failed to store output in Valkey", exc_info=redis_exc) + script_last_executed_time = datetime.datetime.now(tz=datetime.UTC) logger.info( "Script will execute again in %s seconds", From 0d5d58d0d67e2e35698a567f469f12dd9d76f706 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Thu, 5 Mar 2026 14:27:56 +0000 Subject: [PATCH 2/3] temp build push --- .github/workflows/build-push.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index b61581f..231d1b1 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -1,9 +1,9 @@ name: Build and Push Docker Images -on: - push: - branches: - - main +on: push +# push: +# branches: +# - main env: From 2f1cbb8e4f1968b7caa6725ec044223e5fedbfed Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Mon, 9 Mar 2026 09:04:46 +0000 Subject: [PATCH 3/3] Restrict build-push workflow to main branch --- .github/workflows/build-push.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index 231d1b1..b61581f 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -1,9 +1,9 @@ name: Build and Push Docker Images -on: push -# push: -# branches: -# - main +on: + push: + branches: + - main env: