From 4bc1099c3a96054580873dd7f52498b2f59b8861 Mon Sep 17 00:00:00 2001 From: Soubhagya Mohapatra Date: Fri, 21 Mar 2025 00:57:35 +0530 Subject: [PATCH 1/3] queue worker added to topp workflow --- src/Workflow.py | 23 +++++++++++++++-------- src/workflow/CommandExecutor.py | 3 ++- src/workflow/WorkflowManager.py | 6 +++++- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/Workflow.py b/src/Workflow.py index 7523a0cbf..41f096bab 100644 --- a/src/Workflow.py +++ b/src/Workflow.py @@ -7,7 +7,6 @@ import plotly.express as px from src.common.common import show_fig - class Workflow(WorkflowManager): # Setup pages for upload, parameter, execution and results. # For layout use any streamlit components such as tabs (as shown in example), columns, or even expanders. @@ -70,8 +69,10 @@ def execution(self) -> None: # Run FeatureFinderMetabo tool with input and output files. self.logger.log("Detecting features...") - self.executor.run_topp( - "FeatureFinderMetabo", input_output={"in": in_mzML, "out": out_ffm} + featureFinderMetaboJob = self.queue.enqueue( + self.executor.run_topp, + "FeatureFinderMetabo", + input_output={"in": in_mzML, "out": out_ffm} ) # Prepare input and output files for feature linking @@ -82,17 +83,23 @@ def execution(self) -> None: # Run FeatureLinkerUnlabaeledKD with all feature maps passed at once self.logger.log("Linking features...") - self.executor.run_topp( - "FeatureLinkerUnlabeledKD", input_output={"in": in_fl, "out": out_fl} + featureLinkerUnlabeledKDJob = self.queue.enqueue( + self.executor.run_topp, + "FeatureLinkerUnlabeledKD", + input_output={"in": in_fl, "out": out_fl}, + depends_on=featureFinderMetaboJob ) self.logger.log("Exporting consensus features to pandas DataFrame...") - self.executor.run_python( - "export_consensus_feature_df", input_output={"in": out_fl[0]} + self.queue.enqueue( + self.executor.run_python, + "export_consensus_feature_df", + input_output={"in": out_fl[0]}, + depends_on=featureLinkerUnlabeledKDJob ) # Check if adduct detection should be run. if self.params["run-python-script"]: # Example for a custom Python tool, which is located in src/python-tools. - self.executor.run_python("example", {"in": in_mzML}) + self.queue.enqueue(self.executor.run_python, "example", {"in": in_mzML}) @st.fragment def results(self) -> None: diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 6cc493014..603df134c 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -90,7 +90,8 @@ def run_command(self, command: list[str]) -> None: stdout, stderr = process.communicate() # Cleanup PID file - pid_file_path.unlink() + if pid_file_path.exists(): + pid_file_path.unlink() end_time = time.time() execution_time = end_time - start_time diff --git a/src/workflow/WorkflowManager.py b/src/workflow/WorkflowManager.py index a299f5c2a..defc4a74a 100644 --- a/src/workflow/WorkflowManager.py +++ b/src/workflow/WorkflowManager.py @@ -8,11 +8,14 @@ import streamlit as st import shutil import time +from rq import Queue +from redis import Redis class WorkflowManager: # Core workflow logic using the above classes def __init__(self, name: str, workspace: str): self.name = name + self.queue = Queue(connection=Redis()) self.workflow_dir = Path(workspace, name.replace(" ", "-").lower()) self.file_manager = FileManager(self.workflow_dir) self.logger = Logger(self.workflow_dir) @@ -52,7 +55,8 @@ def workflow_process(self) -> None: except Exception as e: self.logger.log(f"ERROR: {e}") # Delete pid dir path to indicate workflow is done - shutil.rmtree(self.executor.pid_dir, ignore_errors=True) + # TODO: since jobs are in queue, deleting pids directory causes issue, need to find a workaround to delete once all three jobs are over + # shutil.rmtree(self.executor.pid_dir, ignore_errors=True) def show_file_upload_section(self) -> None: """ From 0fcb9252d86489540fd91bd46530d9b6b0eec0d0 Mon Sep 17 00:00:00 2001 From: Soubhagya Mohapatra Date: Fri, 21 Mar 2025 01:03:38 +0530 Subject: [PATCH 2/3] add docker, requirements and docs --- .github/workflows/build-windows-executable-app.yaml | 4 +++- Dockerfile | 2 ++ Dockerfile_simple | 2 ++ docs/installation.md | 4 +++- environment.yml | 4 +++- requirements.txt | 4 +++- test_gui.py | 1 - 7 files changed, 16 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build-windows-executable-app.yaml b/.github/workflows/build-windows-executable-app.yaml index 3284c10e5..99522942a 100644 --- a/.github/workflows/build-windows-executable-app.yaml +++ b/.github/workflows/build-windows-executable-app.yaml @@ -259,7 +259,9 @@ jobs: - name: Create .bat file run: | - echo " start /min .\python-${{ env.PYTHON_VERSION }}\python -m streamlit run app.py local" > ${{ env.APP_NAME }}.bat + echo "start /min .\python-${{ env.PYTHON_VERSION }}\python -m redis-server" > ${{ env.APP_NAME }}.bat + echo "start /min .\python-${{ env.PYTHON_VERSION }}\python -m rq worker --with-scheduler" >> ${{ env.APP_NAME }}.bat + echo "start /min .\python-${{ env.PYTHON_VERSION }}\python -m streamlit run app.py local" >> ${{ env.APP_NAME }}.bat - name: Create All-in-one executable folder run: | diff --git a/Dockerfile b/Dockerfile index 9153e87d7..f7a6b91a4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -141,6 +141,8 @@ RUN echo "0 3 * * * /root/miniforge3/envs/streamlit-env/bin/python /app/clean-up RUN echo "#!/bin/bash" > /app/entrypoint.sh && \ echo "source /root/miniforge3/bin/activate streamlit-env" >> /app/entrypoint.sh && \ echo "service cron start" >> /app/entrypoint.sh && \ + echo "redis-server" >> /app/entrypoint.sh && \ + echo "rq worker --with-scheduler" >> /app/entrypoint.sh && \ echo "streamlit run app.py" >> /app/entrypoint.sh # make the script executable RUN chmod +x /app/entrypoint.sh diff --git a/Dockerfile_simple b/Dockerfile_simple index 64c24d16f..07da643b0 100644 --- a/Dockerfile_simple +++ b/Dockerfile_simple @@ -87,6 +87,8 @@ RUN echo "0 3 * * * /root/miniforge3/envs/streamlit-env/bin/python /app/clean-up RUN echo "#!/bin/bash" > /app/entrypoint.sh RUN echo "source /root/miniforge3/bin/activate streamlit-env" >> /app/entrypoint.sh && \ echo "service cron start" >> /app/entrypoint.sh && \ + echo "redis-server" >> /app/entrypoint.sh && \ + echo "rq worker --with-scheduler" >> /app/entrypoint.sh && \ echo "streamlit run app.py" >> /app/entrypoint.sh # make the script executable RUN chmod +x /app/entrypoint.sh diff --git a/docs/installation.md b/docs/installation.md index f3e6c88af..22b06716c 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -43,7 +43,9 @@ Create and activate the conda environment: Run the app via streamlit command in the terminal. *local* and *online* mode can be toggled in the settings.json. Learn more about *local* and *online* mode in the documentation page 📖 **OpenMS Template App**. -`streamlit run app.py` +1. Open a new terminal and run `redis-server` +2. On another terminal run `rq worker --with-scheduler` (multiple workers can be spawned) +3. Finally start the application `streamlit run app.py` ## Docker diff --git a/environment.yml b/environment.yml index 188d5c118..5fc05d763 100644 --- a/environment.yml +++ b/environment.yml @@ -15,4 +15,6 @@ dependencies: - captcha==0.5.0 - pyopenms_viz==1.0.0 - streamlit-js-eval - - psutil==7.0.0 \ No newline at end of file + - psutil==7.0.0 + - redis==5.2.1 + - rq==2.1.0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 95615786e..f5a56027a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,6 @@ plotly==5.22.0 captcha==0.5.0 pyopenms_viz==1.0.0 streamlit-js-eval -psutil==7.0.0 \ No newline at end of file +psutil==7.0.0 +redis==5.2.1 +rq==2.1.0 \ No newline at end of file diff --git a/test_gui.py b/test_gui.py index 101865cfc..0d56fe8aa 100644 --- a/test_gui.py +++ b/test_gui.py @@ -5,7 +5,6 @@ from pathlib import Path import shutil - @pytest.fixture def launch(request): test = AppTest.from_file(request.param) From 05cfa3d45c94382f4094fc12c53d648a88ce4eae Mon Sep 17 00:00:00 2001 From: Soubhagya Mohapatra Date: Sun, 23 Mar 2025 10:09:58 +0530 Subject: [PATCH 3/3] Use pid deletion code --- src/Workflow.py | 7 ++++++- src/workflow/CommandExecutor.py | 6 ++++++ src/workflow/WorkflowManager.py | 3 --- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/Workflow.py b/src/Workflow.py index 41f096bab..8d70d9209 100644 --- a/src/Workflow.py +++ b/src/Workflow.py @@ -90,12 +90,17 @@ def execution(self) -> None: depends_on=featureFinderMetaboJob ) self.logger.log("Exporting consensus features to pandas DataFrame...") - self.queue.enqueue( + exportConsensusFeatureJob = self.queue.enqueue( self.executor.run_python, "export_consensus_feature_df", input_output={"in": out_fl[0]}, depends_on=featureLinkerUnlabeledKDJob ) + # Delete pid dir path to indicate workflow is done + self.queue.enqueue( + self.executor.end_run, + depends_on=exportConsensusFeatureJob + ) # Check if adduct detection should be run. if self.params["run-python-script"]: # Example for a custom Python tool, which is located in src/python-tools. diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 603df134c..51d599ca4 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -211,6 +211,12 @@ def stop(self) -> None: shutil.rmtree(self.pid_dir, ignore_errors=True) self.logger.log("Workflow stopped.") + def end_run(self) -> None: + """ + Cleans up the PID directory by removing all PID files. + """ + shutil.rmtree(self.pid_dir, ignore_errors=True) + def run_python(self, script_file: str, input_output: dict = {}) -> None: """ Executes a specified Python script with dynamic input and output parameters, diff --git a/src/workflow/WorkflowManager.py b/src/workflow/WorkflowManager.py index defc4a74a..56378877d 100644 --- a/src/workflow/WorkflowManager.py +++ b/src/workflow/WorkflowManager.py @@ -54,9 +54,6 @@ def workflow_process(self) -> None: self.logger.log("WORKFLOW FINISHED") except Exception as e: self.logger.log(f"ERROR: {e}") - # Delete pid dir path to indicate workflow is done - # TODO: since jobs are in queue, deleting pids directory causes issue, need to find a workaround to delete once all three jobs are over - # shutil.rmtree(self.executor.pid_dir, ignore_errors=True) def show_file_upload_section(self) -> None: """