From 1665060ef192ee1721ba4a3a38b9ba41a6a6ce76 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 29 Jan 2026 21:03:27 +0000 Subject: [PATCH 01/10] feat: add configurable max threads for local and online deployments - Add max_threads configuration to settings.json with separate values for local (4) and online (2) deployments - Add get_max_threads() helper function in common.py that returns appropriate thread count based on deployment mode - Add Threads number input in parameter_section for local mode UI - Update CommandExecutor.run_multiple_commands() to use semaphore limiting based on max_threads setting - Update CommandExecutor.run_topp() to pass -threads parameter to TOPP tools with intelligently distributed thread allocation Thread distribution algorithm: - parallel_commands = min(num_files, max_threads) - threads_per_command = max(1, max_threads // parallel_commands) This ensures optimal resource usage: with 4 max_threads and 2 files, runs 2 parallel commands with 2 threads each; with 4 files, runs 4 parallel commands with 1 thread each. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- settings.json | 4 ++++ src/common/common.py | 23 +++++++++++++++++++++++ src/workflow/CommandExecutor.py | 32 +++++++++++++++++++++++++------- src/workflow/StreamlitUI.py | 12 ++++++++++++ 4 files changed, 64 insertions(+), 7 deletions(-) diff --git a/settings.json b/settings.json index 96bf9ac0..34189b8c 100644 --- a/settings.json +++ b/settings.json @@ -26,5 +26,9 @@ "source_dirs": [ "example-data/workspaces" ] + }, + "max_threads": { + "local": 4, + "online": 2 } } \ No newline at end of file diff --git a/src/common/common.py b/src/common/common.py index 4d24e765..8effd1e5 100644 --- a/src/common/common.py +++ b/src/common/common.py @@ -32,6 +32,29 @@ OS_PLATFORM = sys.platform +def get_max_threads() -> int: + """ + Get max threads for current deployment mode. + + In local mode, checks for UI override in session state. + In online mode, uses the configured value directly. + + Returns: + int: Maximum number of threads to use for parallel processing. + """ + settings = st.session_state.get("settings", {}) + max_threads_config = settings.get("max_threads", {"local": 4, "online": 2}) + + if settings.get("online_deployment", False): + return max_threads_config.get("online", 2) + else: + # Local mode: check for UI override, fallback to config default + return st.session_state.get( + "max_threads_override", + max_threads_config.get("local", 4) + ) + + def is_safe_workspace_name(name: str) -> bool: """ Check if a workspace name is safe (no path traversal characters). diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 6869f647..7efd5989 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -32,8 +32,9 @@ def run_multiple_commands( Executes multiple shell commands concurrently in separate threads. This method leverages threading to run each command in parallel, improving - efficiency for batch command execution. Execution time and command results are - logged if specified. + efficiency for batch command execution. The number of concurrent commands + is limited by the max_threads setting, which is distributed between + parallel command execution and per-tool thread allocation. Args: commands (list[str]): A list where each element is a list representing @@ -42,17 +43,26 @@ def run_multiple_commands( Returns: bool: True if all commands succeeded, False if any failed. """ + from src.common.common import get_max_threads + + # Get thread settings and calculate distribution + max_threads = get_max_threads() + num_commands = len(commands) + parallel_commands = min(num_commands, max_threads) + # Log the start of command execution - self.logger.log(f"Running {len(commands)} commands in parallel...", 1) + self.logger.log(f"Running {num_commands} commands (max {parallel_commands} parallel, {max_threads} total threads)...", 1) start_time = time.time() results = [] lock = threading.Lock() + semaphore = threading.Semaphore(parallel_commands) def run_and_track(cmd): - success = self.run_command(cmd) - with lock: - results.append(success) + with semaphore: + success = self.run_command(cmd) + with lock: + results.append(success) # Initialize a list to keep track of threads threads = [] @@ -69,7 +79,7 @@ def run_and_track(cmd): # Calculate and log the total execution time end_time = time.time() - self.logger.log(f"Total time to run {len(commands)} commands: {end_time - start_time:.2f} seconds", 1) + self.logger.log(f"Total time to run {num_commands} commands: {end_time - start_time:.2f} seconds", 1) return all(results) @@ -210,6 +220,12 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b else: n_processes = max(io_lengths) + # Calculate threads per command based on max_threads setting + from src.common.common import get_max_threads + max_threads = get_max_threads() + parallel_commands = min(n_processes, max_threads) + threads_per_command = max(1, max_threads // parallel_commands) + commands = [] # Load parameters for non-defaults @@ -253,6 +269,8 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b command += [str(x) for x in v] else: command += [str(v)] + # Add threads parameter for TOPP tools + command += ["-threads", str(threads_per_command)] commands.append(command) # check if a ini file has been written, if yes use it (contains custom defaults) diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index a7e632e0..46fa7d5d 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -1108,6 +1108,18 @@ def file_upload_section(self, custom_upload_function) -> None: def parameter_section(self, custom_parameter_function) -> None: st.toggle("Show advanced parameters", value=False, key="advanced") + # Display threads configuration for local mode only + if not st.session_state.settings.get("online_deployment", False): + max_threads_config = st.session_state.settings.get("max_threads", {}) + default_threads = max_threads_config.get("local", 4) + st.number_input( + "Threads", + min_value=1, + value=default_threads, + key="max_threads_override", + help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation." + ) + # Display preset buttons if presets are available for this workflow self.preset_buttons() From 19e0aa04cb326107f1a83a4190423765779e90ff Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 30 Jan 2026 08:59:24 +0000 Subject: [PATCH 02/10] fix: properly initialize session state for Threads number input Pre-initialize max_threads_override in session state before rendering the widget, and remove the value parameter to let Streamlit read from session state. This ensures user changes are properly tracked. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/StreamlitUI.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 46fa7d5d..3182639c 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -1110,12 +1110,13 @@ def parameter_section(self, custom_parameter_function) -> None: # Display threads configuration for local mode only if not st.session_state.settings.get("online_deployment", False): - max_threads_config = st.session_state.settings.get("max_threads", {}) - default_threads = max_threads_config.get("local", 4) + # Initialize session state with default if not set + if "max_threads_override" not in st.session_state: + max_threads_config = st.session_state.settings.get("max_threads", {}) + st.session_state.max_threads_override = max_threads_config.get("local", 4) st.number_input( "Threads", min_value=1, - value=default_threads, key="max_threads_override", help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation." ) From e653f77ff4d943877c11c7fc20fed989a89837c0 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:40:03 +0000 Subject: [PATCH 03/10] fix: persist Threads parameter across page navigation Use input_widget() which integrates with the parameter manager to persist the value to params.json. Also copy value to session state for get_max_threads() to access. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/StreamlitUI.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 3182639c..278d7d92 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -1110,16 +1110,20 @@ def parameter_section(self, custom_parameter_function) -> None: # Display threads configuration for local mode only if not st.session_state.settings.get("online_deployment", False): - # Initialize session state with default if not set - if "max_threads_override" not in st.session_state: - max_threads_config = st.session_state.settings.get("max_threads", {}) - st.session_state.max_threads_override = max_threads_config.get("local", 4) - st.number_input( - "Threads", + max_threads_config = st.session_state.settings.get("max_threads", {}) + default_threads = max_threads_config.get("local", 4) + self.input_widget( + key="max_threads", + default=default_threads, + name="Threads", + widget_type="number", min_value=1, - key="max_threads_override", help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation." ) + # Copy to session state for get_max_threads() to access + prefixed_key = f"{self.parameter_manager.param_prefix}max_threads" + if prefixed_key in st.session_state: + st.session_state.max_threads_override = st.session_state[prefixed_key] # Display preset buttons if presets are available for this workflow self.preset_buttons() From 5a9a870d12be4b823fb9eef0a3bc8088592bb192 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:46:24 +0000 Subject: [PATCH 04/10] refactor: use parameter manager directly in get_max_threads() - Pass parameter_manager to get_max_threads() to read persisted value - Remove session state copying workaround in StreamlitUI - Cleaner design that reads directly from params.json https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/common/common.py | 20 ++++++++++++-------- src/workflow/CommandExecutor.py | 4 ++-- src/workflow/StreamlitUI.py | 4 ---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/common/common.py b/src/common/common.py index 8effd1e5..10ef19ae 100644 --- a/src/common/common.py +++ b/src/common/common.py @@ -32,12 +32,15 @@ OS_PLATFORM = sys.platform -def get_max_threads() -> int: +def get_max_threads(parameter_manager=None) -> int: """ Get max threads for current deployment mode. - In local mode, checks for UI override in session state. - In online mode, uses the configured value directly. + In local mode, reads from parameter manager (persisted params.json). + In online mode, uses the configured value directly from settings. + + Args: + parameter_manager: Optional ParameterManager instance for reading persisted params. Returns: int: Maximum number of threads to use for parallel processing. @@ -48,11 +51,12 @@ def get_max_threads() -> int: if settings.get("online_deployment", False): return max_threads_config.get("online", 2) else: - # Local mode: check for UI override, fallback to config default - return st.session_state.get( - "max_threads_override", - max_threads_config.get("local", 4) - ) + # Local mode: read from parameter manager if available + default = max_threads_config.get("local", 4) + if parameter_manager is not None: + params = parameter_manager.get_parameters_from_json() + return params.get("max_threads", default) + return default def is_safe_workspace_name(name: str) -> bool: diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 6053e244..8ff791e9 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -46,7 +46,7 @@ def run_multiple_commands( from src.common.common import get_max_threads # Get thread settings and calculate distribution - max_threads = get_max_threads() + max_threads = get_max_threads(self.parameter_manager) num_commands = len(commands) parallel_commands = min(num_commands, max_threads) @@ -235,7 +235,7 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b # Calculate threads per command based on max_threads setting from src.common.common import get_max_threads - max_threads = get_max_threads() + max_threads = get_max_threads(self.parameter_manager) parallel_commands = min(n_processes, max_threads) threads_per_command = max(1, max_threads // parallel_commands) diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py index 278d7d92..3cca528b 100644 --- a/src/workflow/StreamlitUI.py +++ b/src/workflow/StreamlitUI.py @@ -1120,10 +1120,6 @@ def parameter_section(self, custom_parameter_function) -> None: min_value=1, help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation." ) - # Copy to session state for get_max_threads() to access - prefixed_key = f"{self.parameter_manager.param_prefix}max_threads" - if prefixed_key in st.session_state: - st.session_state.max_threads_override = st.session_state[prefixed_key] # Display preset buttons if presets are available for this workflow self.preset_buttons() From 949a9bd18d182593e505490c4c8295e744344cdf Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:50:19 +0000 Subject: [PATCH 05/10] refactor: move _get_max_threads() to CommandExecutor - Move function from common.py to CommandExecutor as private method - Simplifies code by using self.parameter_manager directly - Remove unnecessary import and parameter passing https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/common/common.py | 27 --------------------------- src/workflow/CommandExecutor.py | 28 +++++++++++++++++++++++----- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/src/common/common.py b/src/common/common.py index 10ef19ae..4d24e765 100644 --- a/src/common/common.py +++ b/src/common/common.py @@ -32,33 +32,6 @@ OS_PLATFORM = sys.platform -def get_max_threads(parameter_manager=None) -> int: - """ - Get max threads for current deployment mode. - - In local mode, reads from parameter manager (persisted params.json). - In online mode, uses the configured value directly from settings. - - Args: - parameter_manager: Optional ParameterManager instance for reading persisted params. - - Returns: - int: Maximum number of threads to use for parallel processing. - """ - settings = st.session_state.get("settings", {}) - max_threads_config = settings.get("max_threads", {"local": 4, "online": 2}) - - if settings.get("online_deployment", False): - return max_threads_config.get("online", 2) - else: - # Local mode: read from parameter manager if available - default = max_threads_config.get("local", 4) - if parameter_manager is not None: - params = parameter_manager.get_parameters_from_json() - return params.get("max_threads", default) - return default - - def is_safe_workspace_name(name: str) -> bool: """ Check if a workspace name is safe (no path traversal characters). diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 8ff791e9..29bb351d 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -25,6 +25,27 @@ def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: Parame self.logger = logger self.parameter_manager = parameter_manager + def _get_max_threads(self) -> int: + """ + Get max threads for current deployment mode. + + In local mode, reads from parameter manager (persisted params.json). + In online mode, uses the configured value directly from settings. + + Returns: + int: Maximum number of threads to use for parallel processing. + """ + import streamlit as st + settings = st.session_state.get("settings", {}) + max_threads_config = settings.get("max_threads", {"local": 4, "online": 2}) + + if settings.get("online_deployment", False): + return max_threads_config.get("online", 2) + else: + default = max_threads_config.get("local", 4) + params = self.parameter_manager.get_parameters_from_json() + return params.get("max_threads", default) + def run_multiple_commands( self, commands: list[str] ) -> bool: @@ -43,10 +64,8 @@ def run_multiple_commands( Returns: bool: True if all commands succeeded, False if any failed. """ - from src.common.common import get_max_threads - # Get thread settings and calculate distribution - max_threads = get_max_threads(self.parameter_manager) + max_threads = self._get_max_threads() num_commands = len(commands) parallel_commands = min(num_commands, max_threads) @@ -234,8 +253,7 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b n_processes = max(io_lengths) # Calculate threads per command based on max_threads setting - from src.common.common import get_max_threads - max_threads = get_max_threads(self.parameter_manager) + max_threads = self._get_max_threads() parallel_commands = min(n_processes, max_threads) threads_per_command = max(1, max_threads // parallel_commands) From 9c756e7ff645e1d789f7f57bf44538f77f9145d4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:51:34 +0000 Subject: [PATCH 06/10] fix: validate _get_max_threads() return value Coerce to int and clamp to minimum of 1 to prevent: - Semaphore(0) deadlocks - Divide-by-zero in threads_per_command calculation - Issues from non-numeric config values https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/CommandExecutor.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 29bb351d..154f8586 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -33,18 +33,25 @@ def _get_max_threads(self) -> int: In online mode, uses the configured value directly from settings. Returns: - int: Maximum number of threads to use for parallel processing. + int: Maximum number of threads to use for parallel processing (minimum 1). """ import streamlit as st settings = st.session_state.get("settings", {}) max_threads_config = settings.get("max_threads", {"local": 4, "online": 2}) if settings.get("online_deployment", False): - return max_threads_config.get("online", 2) + value = max_threads_config.get("online", 2) else: default = max_threads_config.get("local", 4) params = self.parameter_manager.get_parameters_from_json() - return params.get("max_threads", default) + value = params.get("max_threads", default) + + # Validate: coerce to int and clamp to minimum of 1 + try: + value = int(value) + except (TypeError, ValueError): + value = 1 + return max(1, value) def run_multiple_commands( self, commands: list[str] From 321d85ecf378dd88c4432d853d4055d6baa3e5a6 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:52:40 +0000 Subject: [PATCH 07/10] style: move streamlit import to top of file https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/CommandExecutor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 154f8586..3427332e 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -9,6 +9,7 @@ import sys import importlib.util import json +import streamlit as st class CommandExecutor: """ @@ -35,7 +36,6 @@ def _get_max_threads(self) -> int: Returns: int: Maximum number of threads to use for parallel processing (minimum 1). """ - import streamlit as st settings = st.session_state.get("settings", {}) max_threads_config = settings.get("max_threads", {"local": 4, "online": 2}) From 68d2cce630a145ca2e068d607d64ed1fde399633 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:54:20 +0000 Subject: [PATCH 08/10] feat: log warning for invalid max_threads values Logs to minimal log (level 0) when max_threads is non-numeric or < 1. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/CommandExecutor.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 3427332e..63f2d920 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -49,9 +49,13 @@ def _get_max_threads(self) -> int: # Validate: coerce to int and clamp to minimum of 1 try: value = int(value) + if value < 1: + self.logger.log(f"WARNING: Invalid max_threads value ({value}), using 1", 0) + value = 1 except (TypeError, ValueError): + self.logger.log(f"WARNING: Invalid max_threads value ({value}), using 1", 0) value = 1 - return max(1, value) + return value def run_multiple_commands( self, commands: list[str] From 8a3db1fb8bd3796bbca9aaad3dc71fd6e2a146e8 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 20:58:31 +0000 Subject: [PATCH 09/10] simplify: remove warning logging from _get_max_threads() UI already enforces min_value=1, keep just the validation. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/CommandExecutor.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 63f2d920..ed1ed756 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -48,12 +48,8 @@ def _get_max_threads(self) -> int: # Validate: coerce to int and clamp to minimum of 1 try: - value = int(value) - if value < 1: - self.logger.log(f"WARNING: Invalid max_threads value ({value}), using 1", 0) - value = 1 + value = max(1, int(value)) except (TypeError, ValueError): - self.logger.log(f"WARNING: Invalid max_threads value ({value}), using 1", 0) value = 1 return value From 356d5f96dc6d873c09092621cf4597f509620874 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 31 Jan 2026 21:00:45 +0000 Subject: [PATCH 10/10] simplify: remove exception handling, fail loud on invalid config https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --- src/workflow/CommandExecutor.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index ed1ed756..86f265b2 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -46,12 +46,7 @@ def _get_max_threads(self) -> int: params = self.parameter_manager.get_parameters_from_json() value = params.get("max_threads", default) - # Validate: coerce to int and clamp to minimum of 1 - try: - value = max(1, int(value)) - except (TypeError, ValueError): - value = 1 - return value + return max(1, int(value)) def run_multiple_commands( self, commands: list[str]