From d84ceb48b20d46dfe262154ea8f011556ee6d699 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 19 Feb 2025 15:50:20 +1000 Subject: [PATCH 1/5] add a new python job --- cpg_flow_test/jobs/parse_pyramid.py | 19 +++++++++++++++++++ cpg_flow_test/stages.py | 23 +++++++++++++++++++++++ cpg_flow_test/workflow.py | 4 ++-- 3 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 cpg_flow_test/jobs/parse_pyramid.py diff --git a/cpg_flow_test/jobs/parse_pyramid.py b/cpg_flow_test/jobs/parse_pyramid.py new file mode 100644 index 0000000..82cee0e --- /dev/null +++ b/cpg_flow_test/jobs/parse_pyramid.py @@ -0,0 +1,19 @@ +""" +simple python job, with no further imports +""" + + +def parse_pyramid_job(input_file: str) -> str: + """ + This is a simple example of a job that prints the contents of a file. + + Args: + input_file (str): the path to the file to print + """ + + with open(input_file) as f: + contents = f.read() + + print(f'Contents of {input_file}:') + print(contents) + return contents diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index cf36781..530eb22 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -7,6 +7,7 @@ from cpg_utils import Path from cpg_utils.hail_batch import get_batch from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi +from jobs.parse_pyramid import parse_pyramid_job """ Here's a fun programming task with four interdependent steps, using the concept of **prime numbers** and their relationships: @@ -193,3 +194,25 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu data=self.expected_outputs(multicohort), jobs=job_pyramid, ) + + +@stage(required_stages=[BuildAPrimePyramid]) +class ParsePyramid(MultiCohortStage): + + def expected_outputs(self, multicohort: MultiCohort) -> Path: + return self.prefix() / f'{multicohort.name}_parsed.txt' + + def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutput | None: + + expected_output = self.expected_outputs(multicohort) + + input_file = inputs.as_str(multicohort, BuildAPrimePyramid, 'pyramid') + input_local = get_batch().read_input(input_file) + + # new python job + job = get_batch().new_python_job(name=f'parse_pyramid_{multicohort.name}') + result = job.call(parse_pyramid_job, input_local) + + get_batch().write_output(result.as_str(), str(expected_output)) + + return self.make_outputs(multicohort, data=expected_output, jobs=job) diff --git a/cpg_flow_test/workflow.py b/cpg_flow_test/workflow.py index 3386c0e..3428c3b 100644 --- a/cpg_flow_test/workflow.py +++ b/cpg_flow_test/workflow.py @@ -5,7 +5,7 @@ from cpg_flow.workflow import run_workflow from cpg_utils.config import set_config_paths -from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi +from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi, ParsePyramid TMP_DIR = os.getenv('TMP_DIR') CONFIG_FILE = str(Path(__file__).parent / 'config.toml') @@ -14,7 +14,7 @@ def run_cpg_flow(dry_run=False): - workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi] + workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi, ParsePyramid] config_paths = os.environ['CPG_CONFIG_PATH'].split(',') From 94f2a97b8379016e5796b98098ecf6929717d69a Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 19 Feb 2025 16:03:15 +1000 Subject: [PATCH 2/5] prefix is a property --- cpg_flow_test/stages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index 530eb22..b231f8c 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -200,7 +200,7 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu class ParsePyramid(MultiCohortStage): def expected_outputs(self, multicohort: MultiCohort) -> Path: - return self.prefix() / f'{multicohort.name}_parsed.txt' + return self.prefix / f'{multicohort.name}_parsed.txt' def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutput | None: From 962a6a721bc85bcd0ce5792e49baac0d80c2b319 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 19 Feb 2025 16:05:14 +1000 Subject: [PATCH 3/5] update gitignore for PyCharm --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 61e63c2..d88b503 100644 --- a/.gitignore +++ b/.gitignore @@ -161,7 +161,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ ### Python Patch ### # Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration From fe614afdc06f4788c36426bea9d9ecf2c080ca26 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 19 Feb 2025 16:12:03 +1000 Subject: [PATCH 4/5] irritating --- cpg_flow_test/stages.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index b231f8c..f1a0815 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -206,12 +206,11 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu expected_output = self.expected_outputs(multicohort) - input_file = inputs.as_str(multicohort, BuildAPrimePyramid, 'pyramid') - input_local = get_batch().read_input(input_file) + input_file = get_batch().read_input(str(inputs.as_path(multicohort, BuildAPrimePyramid, 'pyramid'))) # new python job job = get_batch().new_python_job(name=f'parse_pyramid_{multicohort.name}') - result = job.call(parse_pyramid_job, input_local) + result = job.call(parse_pyramid_job, input_file) get_batch().write_output(result.as_str(), str(expected_output)) From 6660bb3eaad2b440798c8a2d3b348b3b955df81f Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 19 Feb 2025 16:39:18 +1000 Subject: [PATCH 5/5] update default image and syntax --- cpg_flow_test/run-test-workflow.sh | 2 +- cpg_flow_test/stages.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpg_flow_test/run-test-workflow.sh b/cpg_flow_test/run-test-workflow.sh index dafa303..e98cd4d 100755 --- a/cpg_flow_test/run-test-workflow.sh +++ b/cpg_flow_test/run-test-workflow.sh @@ -1,7 +1,7 @@ #!/bin/bash DEFAULT_IMAGE_REPOSITORY="australia-southeast1-docker.pkg.dev/cpg-common/images" -IMAGE_TAG="cpg_flow:0.1.0-alpha.14" +IMAGE_TAG="cpg_flow:0.1.2" IMAGE_PATH="$DEFAULT_IMAGE_REPOSITORY/$IMAGE_TAG" PATH_OVERRIDE=0 diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index f1a0815..b84d582 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -6,7 +6,7 @@ from cpg_flow.targets.sequencing_group import SequencingGroup from cpg_utils import Path from cpg_utils.hail_batch import get_batch -from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi +from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi, parse_pyramid_job from jobs.parse_pyramid import parse_pyramid_job """ @@ -206,7 +206,7 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu expected_output = self.expected_outputs(multicohort) - input_file = get_batch().read_input(str(inputs.as_path(multicohort, BuildAPrimePyramid, 'pyramid'))) + input_file = get_batch().read_input(inputs.as_str(multicohort, BuildAPrimePyramid, 'pyramid')) # new python job job = get_batch().new_python_job(name=f'parse_pyramid_{multicohort.name}')