BioDepot/temporal-scheduler

Biodepot Scheduler

Description

This scheduler is based on the Temporal.io workflow framework with key modifications to optimize bioinformatics workflows. We support a hybrid architecture spanning cloud, HPC, and local servers. Asynchronous execution is also supported to reduce execution time. The scheduler has been tested using bulk RNA-seq datasets.

Getting started

Prerequisites.

See the singularity docs for installation instructions. I used version 3.5.3 during development, so if other versions fail, open a GitHub issue and try 3.5.3 in the meantime.

Having the NVIDIA Container Toolkit for docker is advised but not strictly required; without it, workflows requiring GPUs will fail.

Setup poetry env / install dependencies.

Create a virtual environment using python3.11.

poetry env use python3.11

To install the poetry dependencies, run the following in the project root directory.

poetry install

Be sure to activate the resulting environment by running poetry shell before running any of the commands described below. (Depending on the poetry version, you may need to first add the poetry shell plugin with poetry self add poetry-plugin-shell).

Usage.

Option 1. run_bwb_workflow.py script

This is a simple approach meant mainly for development. The main merit of this approach is that it runs all workers required by a given workflow / config locally and terminates them once the workflow completes, eliminating complicated worker management.

  1. Run a temporal instance, either through docker compose (see the temporal docker-compose repo) or by installing temporal and running temporal server start-dev. If you use the docker compose option, use the postgres configuration, since the default configuration is buggy.

  2. Optionally, create a MinIO instance to coordinate file transfers between worker processes. Note that this is only needed if the use_local_storage option is set to false for a given workflow.

  3. Create a .env file for the BWB scheduler with the following keys:

    • SCHED_STORAGE_DIR - This is the directory where the scheduler will store its outputs. If you are using an S3 instance to coordinate file transfer between workers (i.e. use_local_storage=false in your workflow definition), the scheduler will upload from and download to this directory.
    • MINIO_ENDPOINT_URL - Optional. This must begin with “http://”, otherwise fsspec will fail to connect.
    • MINIO_ACCESS_KEY - Optional.
    • MINIO_SECRET_KEY - Optional.
  4. Inside the poetry environment, run a workflow with python3 bwb/scheduling_service/run_bwb_workflow.py [WORKFLOW_JSON_PATH] [WORKFLOW_CONFIG_PATH], with the latter parameter being optional. For example, the following runs the salmon-demo workflow locally:

python3 bwb/scheduling_service/run_bwb_workflow.py bwb/scheduling_service/test_workflows/test_scheme.json

(NOTE: Salmon quant sometimes gets OOM-killed when running with singularity on low-RAM hardware. This is not an issue with docker or when running on a server with a lot of RAM. I suspect it has to do with singularity processes being run inside the same process group as the process that spawned them, but I'm not entirely sure.)
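For reference, a .env for a local run might look like the following sketch. The paths and MinIO values here are placeholders, not project defaults, and the three MINIO_* keys only matter when use_local_storage is false:

```
# Directory where the scheduler stores workflow inputs/outputs
SCHED_STORAGE_DIR=/home/user/sched_storage
# Only needed when use_local_storage=false; must begin with http://
MINIO_ENDPOINT_URL=http://localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
```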

Option 2. Using the server with docker compose.

The deployment directory contains a docker-compose file which sets up all necessary dependencies plus a single local temporal worker; instructions on deployment can be found in that directory's README. (Be sure to correctly configure the .env within that directory before running, since this is necessary to set up worker RAM / CPU / GPUs.) The API presents the following endpoints:

  • /start_workflow (POST) - Takes a request whose body is a JSON specification of a scheme graph as generated by the BWB client. On success, returns a JSON object with keys workflow_id and run_id, which uniquely identify the started temporal workflow. Gives a 4** code on failure.
  • /stop_workflow (POST) - Takes a JSON object with keys "workflow_id" and "run_id", corresponding to those returned by the /start_workflow endpoint. Gives a 4** code on failure.
  • /workflow_status (POST) - Takes a JSON object with keys "workflow_id" and "run_id", corresponding to those returned by the /start_workflow endpoint. On success, returns a JSON object with keys workflow_status (Started | Finished | Failed) and node_statuses, which has inner keys outputs, logs, and status. outputs maps node IDs to a dictionary keyed by cmd set ID (identifying each run of a node that produces a distinct set of outputs), whose values are the key -> value pairs returned by the node container as outputs (i.e. the /tmp/output contents, as in BWB). logs has the same structure, but the innermost values are the logs of container runs identified by node / cmd set ID pairs. status shows how many node cmds have run and how many remain.
  • /add_worker_to_workflow (POST) - Takes a JSON request with keys worker_queue (see "Worker Deployment"), workflow_id (returned by /start_workflow), worker_mem_mb, worker_cpus, and worker_gpus. worker_queue is a queue unique to a particular worker; this request allows the identified workflow to begin scheduling commands to that worker. (The motivations for this approach, as opposed to the standard temporal publish-subscribe model, will be described in my thesis.)
For example, to start the test workflow:

curl -H "Content-Type: application/json" --data @bwb/scheduling_service/test_workflows/test_scheme_req.json http://localhost:8000/start_workflow && echo

You'll get a response with key workflow_id. Now add a worker to process the workflow. Be sure that the worker resource values given here (RAM, CPUs, GPUs) match those in the docker-compose .env file.

curl -H "Content-Type: application/json" --data '{"workflow_id": "[WORKFLOW_ID]", "worker_queue": "worker1", "worker_cpus": [WORKER_CPUS], "worker_gpus": [WORKER_GPUS], "worker_mem_mb": [WORKER_MEM_MB]}' http://localhost:8000/add_worker_to_workflow && echo

I am aware that the workflow_status endpoint should probably be broken up, and that the API in general is fairly limited. If you have any endpoints you want to suggest, please let me know.
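As a sketch of consuming the /workflow_status response shape described above, the helper below counts how many cmd sets have produced outputs. Both the function and the sample payload are hypothetical illustrations of the documented structure, not part of the codebase:

```python
# Client-side helper for the /workflow_status response shape described
# above. Field names follow the README; the sample payload is illustrative.

def summarize_status(resp: dict) -> str:
    """Render a one-line summary of a /workflow_status response."""
    overall = resp["workflow_status"]  # Started | Finished | Failed
    nodes = resp["node_statuses"]
    # outputs maps node ID -> {cmd set ID -> {output key -> value}}
    n_outputs = sum(len(cmd_sets) for cmd_sets in nodes["outputs"].values())
    return f"{overall}: {n_outputs} cmd set(s) have produced outputs"

sample = {
    "workflow_status": "Started",
    "node_statuses": {
        "outputs": {"3": {"0": {"trimmed_reads": "trimmed_1.fq"}}},
        "logs": {"3": {"0": "trim_galore finished"}},
        "status": {"3": "1/4 cmds complete"},
    },
}
print(summarize_status(sample))
```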

Worker Deployment.

This is the major pain point of the existing architecture, because I am unsure whether responsibility for worker management should fall on the user, the API, or some other service that will be contacted by the UI. The run_bwb_workflow.py script will create workers as needed and delete them once the given workflow is done, and the docker-compose sets up a single worker with resources given in the deployment/.env file.

To deploy a regular worker, run the following:

python bwb/scheduling_service/worker.py regular --queue [UNIQUE_QUEUE_NAME] --ram [WORKER_RAM] --cpu_cores [WORKER_CPUS] --gpus [WORKER_GPUS]

Then run the following to assign this worker to a particular workflow:

curl -H "Content-Type: application/json" --data '{"workflow_id": "[WORKFLOW_ID]", "worker_queue": "worker1", "worker_cpus": [WORKER_CPUS], "worker_gpus": [WORKER_GPUS], "worker_mem_mb": [WORKER_MEM_MB]}' http://localhost:8000/add_worker_to_workflow && echo

Workers for SLURM require special treatment, since they must manage an SSH connection to the SLURM server. To run a SLURM worker connecting to a given SLURM server, run the following, where CONFIG_PATH is the path to a valid configuration file for a workflow specifying a SLURM executor (one example is given in bwb/scheduling_service/test_workflows/nsf_bulkrna_slurm_config.json):

python bwb/scheduling_service/worker.py slurm --config [CONFIG_PATH]

The SLURM config may also include optional port and transfer_port keys if the SSH endpoint is not on port 22.
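As a rough, hypothetical sketch, the connection-related part of a SLURM executor config might look like the following. Only keys named in this README are shown, and the username and storage_dir are placeholders; the per-job-type resource keys (mem, time, cpus_per_task, partition) described under "Running on SLURM clusters" are omitted because the bundled nsf_bulkrna_slurm_config.json is the authoritative example of their nesting:

```
{
  "executors": {
    "slurm": {
      "user": "your_bridges2_username",
      "storage_dir": "/ocean/projects/your_allocation/workdir",
      "port": 22,
      "transfer_port": 22
    }
  }
}
```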

For a same-machine local Slurm test target, use:

bash scripts/local_slurm_test_env_up.sh
python3 -m bwb_scheduler.benchmark_harness benchmarks/test_scheme_local_docker_slurm_manifest.json --wait-for-api-seconds 120 --poll-seconds 10 --timeout-seconds 7200
bash scripts/local_slurm_test_env_down.sh

Or run the whole thing in one shot:

bash scripts/run_local_slurm_e2e.sh

To run the full salmon path from Orange .ows IR through scheduler JSON decode, Temporal submission, and execution on the local Docker Slurm cluster:

bash scripts/run_salmon_ir_local_slurm_e2e.sh

This wrapper:

  • decodes star_salmon_short/star_salmon_short.ows with python3 -m bwb_scheduler.ir_to_scheduler
  • writes generated files under benchmarks/generated/
  • brings up the local Slurm + Temporal test environment
  • submits the decoded workflow and waits for completion
  • tears the environment back down unless KEEP_LOCAL_SLURM_TEST_ENV_UP=true

Useful options:

  • pass a different .ows file as the first argument: bash scripts/run_salmon_ir_local_slurm_e2e.sh /path/to/workflow.ows
  • set RUN_ID=my-run to override the decoded workflow run_id
  • set KEEP_LOCAL_SLURM_TEST_ENV_UP=true to leave the environment running after the benchmark finishes

Useful scripts:

  • scripts/setup_local_slurm_host.sh
  • scripts/run_local_docker_slurm_worker.sh --detach
  • scripts/local_slurm_test_env_up.sh
  • scripts/local_slurm_test_env_down.sh
  • scripts/run_local_slurm_e2e.sh
  • scripts/run_salmon_ir_local_slurm_e2e.sh
  • scripts/run_arabidopsis_conditional_local_slurm_e2e.sh

To run the Arabidopsis conditional path from the upstream Nextflow IR round-trip through scheduler JSON decode, Temporal submission, and local Docker Slurm execution:

bash scripts/run_arabidopsis_conditional_local_slurm_e2e.sh

This wrapper:

  • generates the Arabidopsis IR with /mnt/pikachu/bwb-nextflow-utils/scripts/run_rnaseq_arabidopsis_roundtrip.py
  • decodes that IR with python3 -m bwb_scheduler.nextflow_ir_to_scheduler
  • stages the Arabidopsis FASTQs, transcript FASTA, and prebuilt index into scheduler storage
  • starts the local Temporal + Slurm harness
  • submits the workflow and waits for completion
  • tears the environment back down unless KEEP_LOCAL_SLURM_TEST_ENV_UP=true

Useful options:

  • set MAKE_INDEX=true or MAKE_INDEX=false to exercise the conditional and prebuilt-index modes
  • set RUN_ID=my-run to control the generated scheduler JSON and manifest names
  • set ROUNDTRIP_WORK_DIR=/tmp/custom_roundtrip to keep upstream IR/BWB outputs in a custom location
  • set USE_LOCAL_STORAGE=true for the local shared-filesystem harness; this is the default and recommended mode

The latest successful Arabidopsis conditional validation artifact is:

  • benchmarks/results/benchmark_run_20260328T074511Z.json

Running the Bulk RNA workflow.

Below, we give steps for running the bulk RNA-seq workflow using the run_bwb_workflow.py script.

Running locally.

  1. Stage the reads into the container filesystem. Because the run_id for the bulk RNA workflow is bulkrna, the path [SCHED_STORAGE_DIR]/bulkrna/singularity_data will be mounted as “/data” in every container run for this workflow. Copy the FASTQ directory into this path.
  2. Edit “bwb/scheduling_service/test_workflows/bulk_rna_async.json” to give the correct value for the FASTQ input dir by updating the value of s3downloaddir on the "Start" widget. For example, if the FASTQ files are stored in [SCHED_STORAGE_DIR]/bulkrna/singularity_data/fastq_files, then s3downloaddir should have the value /data/fastq_files, since that will be its path inside the containers.
  3. Start the workflow execution with
python3 bwb/scheduling_service/run_bwb_workflow.py bwb/scheduling_service/test_workflows/bulk_rna_async.json
  4. If you want, you can monitor the execution via the temporal UI at localhost:8080. Note that, if the workflow fails for some reason, the default behavior is to retry it any time you start a new worker. If you don’t want this to happen, terminate the running workflow from the Temporal UI.
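The host-to-container path mapping in steps 1 and 2 can be sketched as follows; the helper name is hypothetical, but the mapping ([SCHED_STORAGE_DIR]/[run_id]/singularity_data on the host appearing as /data inside containers) follows the README:

```python
import os

def container_path(sched_storage_dir: str, run_id: str, host_path: str) -> str:
    """Translate a host path under the run's singularity_data dir to its
    in-container path, given that singularity_data is mounted as /data."""
    mount_root = os.path.join(sched_storage_dir, run_id, "singularity_data")
    rel = os.path.relpath(host_path, mount_root)
    return os.path.join("/data", rel)

# The value to put in s3downloaddir for FASTQs staged on the host:
print(container_path("/srv/sched", "bulkrna",
                     "/srv/sched/bulkrna/singularity_data/fastq_files"))
# -> /data/fastq_files
```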

Running on SLURM clusters.

  1. Follow all of the above steps to setup the bulk RNA workflow, but don’t run it yet.
  2. The configuration file for hybrid execution is located at bwb/scheduling_service/test_workflows/nsf_bulkrna_slurm_config.json. All of the keys in this file will work as-is for the Bridges2 supercomputer, except for executors->slurm->user, which you need to set to your username on the Bridges2 login node. You will also want to edit executors->slurm->storage_dir to change the working directory used on the supercomputer. I think the rest of the format is fairly self-explanatory, but, to be clear: mem is the amount of memory requested by the sbatch script for a job type, time is the walltime, and cpus_per_task is the number of CPUs requested per task. “Partition” is the partition jobs are submitted to. The only field you may want to change for the bulk RNA workflow is the walltime, since a reasonable choice varies with the size of the input files.
  3. If you are intending to use the Bridges2 supercomputer, ensure that you have an SSH key to the supercomputer. Follow the instructions on the Bridges2 user guide to get a key.
  4. Run the workflow with
python3 bwb/scheduling_service/run_bwb_workflow.py bwb/scheduling_service/test_workflows/bulk_rna_async.json bwb/scheduling_service/test_workflows/nsf_bulkrna_slurm_config.json

Workflow Format.

The workflow definition language is inspired very directly by Orange’s format. The code to create a workflow in this format can be found within my fork of BWB and just serializes a number of fields from the BWB Scheme object, including some that I have added. (This is a horrible, hacky approach and will soon be replaced by Varun’s UI. I will also make a standalone script to do this at some point.) The workflow format has the following relevant keys:

  • run_id - Essentially used to determine where outputs will be stored. When the workflow is run, all outputs related to it will be stored in [SCHED_STORAGE_DIR]/[run_id], and all inputs / outputs will be uploaded to a bucket called [run_id]. (I created this field before starting temporal integration, and it has no relation to the temporal run_id. It should probably be renamed.)
  • use_local_storage - If set to false, the scheduler will upload (download) file dependencies to (from) a minio instance specified in the .env. If you are running workers on different machines which share a filesystem, you can use this field and set SCHED_STORAGE_DIR to a directory within the shared mount in order to elide file transfers.
  • links - An array of objects in which each object is essentially the same as a BWB channel, transmitting an output from a source to a sink node. source_channel (sink_channel) gives the name of this parameter in the source (sink) node.
    • condition_ref - Optional string naming a top-level condition record that guards this edge.
    • condition - Optional object carrying inline condition metadata when a reference is not enough.
  • nodes - An array of objects, each specifying a node (basically a BWB widget) in a format very close to OWS. Each object therein has a uniquely-identifying integer key id. Each node has the following keys, among others:
    • arg_types - str -> object map. Key is a parameter name, value is an object specifying how that parameter’s value should be incorporated into the command for this node (widget). (I.e. if “argument” is true, the corresponding parameter will be passed as a direct argument to the command; if “flag” has a value, it will be passed as “{flag} {value}”; if “env” has a value, it will be passed as “{env} {value}”.) This is taken directly from BWB.
      • NOTE: If you are specifying an output for a node, you annoyingly need to create a “corresponding” key in “arg_types”, so that the scheduler knows the type of the output. This is a holdover from BWB and should be fixed at some point.
    • parameters - In theory, these are the user-input values for each widget in the OWS graph. If there is a key in “arg_types”, then there should be a corresponding key giving its user-input value in “parameters”. However, because the BWB application shoves a bunch of other information into the “parameters” field, this field also includes a bunch of extraneous information.
    • iterate_settings - Also taken directly from Orange / BWB format. If a given parameter is a list of values and is listed in “iterate_settings.iterableAttrs”, then the widget will run a command for each value of that parameter. (You can also specify a “group_size” n, such that it will split the array into groups of size n and run a command for each group.)
    • command - Same as BWB. Base command that will be run in the widget. Parameters will be appended to this according to the format specified in “arg_types”. As in BWB, this can be a list of commands; in this case, parameters will be appended to only the last command in the array. To reference parameters in other commands, use _bwb{PARAM_NAME} within the command string. (This is awful and should be replaced with a better system.)
    • image_name - Name of an image on docker hub that will be run for this node (widget). (This is also awful and should be edited so that local containers can be used.)
    • async - Boolean. If the node is iterable and async is true, then each command run on this node (widget) will individually generate outputs and trigger successors. E.g. In the bulk RNA async workflow, we set trimgalore as an async node. This means that, as soon as a pair of fastq files is processed, we immediately trigger STAR to process them, without waiting for every single pair of fastq files to complete, as in BWB.
    • end_async - Boolean. True if this node is a barrier. (I think this is redundant now because of “barrier_for”, I need to check.)
    • barrier_for - Either null or the ID of an asynchronous node. Say that node A is a barrier for node B; then A must be a successor of B in the graph, and A will wait until B has processed all inputs (and until all intermediate nodes between B and A have processed all inputs of B) before it begins execution. E.g. In the Bulk RNA async workflow, “Make Counts Table” is a barrier for Trimgalore, because each sample must be trimmed, aligned, and quantified before we make the counts table.
    • input_files - A list of parameter names whose values will be container-relative file paths needed for the command to run. This is a recent addition which allows us to only upload / download the files we need to shared storage.
    • output_files - Output files using same format as above.
    • cores - The number of CPU cores required by an instance of a node to run.
    • mem_mb - Amount of RAM in megabytes required by an instance of a node to run.
    • gpus - The number of GPUs required by an instance of a node to run.
  • conditions - Optional array of condition records. The scheduler mirrors the Workflow IR vocabulary used by bwb-nextflow-utils: each record should have an id, and may also include fields like type, expr, source_language, and status.
  • condition_context - Optional object providing runtime values used to evaluate conditions, for example { "make_index": true }.
  • Nodes may additionally carry condition_ref (an optional string naming the condition that guards execution of that node) and condition (an optional object with inline condition metadata), mirroring the per-link keys above.
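To make the arg_types semantics concrete, here is a hypothetical sketch of assembling a command from arg_types and parameters. This is not the scheduler's actual implementation, and I read “env” here as naming an environment variable, which is an assumption:

```python
# Illustrative sketch of the documented arg_types semantics:
# "argument" -> positional argument, "flag" -> "{flag} {value}",
# "env" -> exported as an environment variable (an assumption).

def build_command(base_cmd, arg_types, parameters):
    """Return (command string, env dict) for one node run."""
    parts = [base_cmd]
    env = {}
    for name, spec in arg_types.items():
        if name not in parameters:
            continue
        value = parameters[name]
        if spec.get("argument"):           # direct positional argument
            parts.append(str(value))
        elif spec.get("flag"):             # passed as "{flag} {value}"
            parts.append(f"{spec['flag']} {value}")
        elif spec.get("env"):              # exposed to the container's env
            env[spec["env"]] = str(value)
    return " ".join(parts), env

cmd, env = build_command(
    "salmon quant",
    {"threads": {"flag": "-p"}, "index": {"env": "SALMON_INDEX"}},
    {"threads": 8, "index": "/data/index"},
)
print(cmd)   # salmon quant -p 8
print(env)   # {'SALMON_INDEX': '/data/index'}
```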

The current scheduler evaluates a small boolean subset of these conditions at workflow start:

  • variable names such as make_index
  • negation such as !make_index
  • true
  • false
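A minimal sketch of evaluating this boolean subset against condition_context (the function name is illustrative, not the scheduler's actual code):

```python
# Evaluate the documented condition subset: a bare variable name, a
# negation ("!name"), or the literals "true" / "false", looked up in the
# condition_context mapping. Unknown variables default to False here,
# which is an assumption about the scheduler's behavior.

def eval_condition(expr: str, context: dict) -> bool:
    expr = expr.strip()
    if expr == "true":
        return True
    if expr == "false":
        return False
    if expr.startswith("!"):
        return not bool(context.get(expr[1:], False))
    return bool(context.get(expr, False))

print(eval_condition("make_index", {"make_index": True}))   # True
print(eval_condition("!make_index", {"make_index": True}))  # False
```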

Execution behavior:

  • nodes with a false condition are retained in status output and marked skipped
  • links with a false condition are pruned before graph execution
  • downstream nodes can still use static fallback parameters when a conditioned link is pruned

Conditional implementation and Arabidopsis local-harness plan:

  • docs/conditional_execution_runbook.md

Bulk RNA-seq Datasets

The datasets supporting the results of this article are generated as part of the NIH MorPhiC (Molecular Phenotypes of Null Alleles in Cells) program, which is supported by NIH grant UM1HG012651. These datasets are publicly available in the NCBI Gene Expression Omnibus (GEO) repository with accession numbers: GSE287843 and GSE288288.

Acknowledgement

This work is supported by the National Institutes of Health (NIH) grants U24HG012674, R03AI159286 and R21CA280520. This work used Bridges-2 at the Pittsburgh Supercomputing Center through allocation BIO230124: Exploring conducting bioinformatics analysis on HPC for the NIH Morphic project from the Advanced Cyberinfrastructure Coordination Ecosystem: Services & Support (ACCESS) program, which is supported by National Science Foundation grants #2138259, #2138286, #2138307, #2137603, and #2138296.
