diff --git a/.gitignore b/.gitignore index 61e63c2..d88b503 100644 --- a/.gitignore +++ b/.gitignore @@ -161,7 +161,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ ### Python Patch ### # Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration diff --git a/cpg_flow_test/jobs/parse_pyramid.py b/cpg_flow_test/jobs/parse_pyramid.py new file mode 100644 index 0000000..82cee0e --- /dev/null +++ b/cpg_flow_test/jobs/parse_pyramid.py @@ -0,0 +1,19 @@ +""" +simple python job, with no further imports +""" + + +def parse_pyramid_job(input_file: str) -> str: + """ + This is a simple example of a job that prints the contents of a file. + + Args: + input_file (str): the path to the file to print + """ + + with open(input_file) as f: + contents = f.read() + + print(f'Contents of {input_file}:') + print(contents) + return contents diff --git a/cpg_flow_test/run-test-workflow.sh b/cpg_flow_test/run-test-workflow.sh index dafa303..e98cd4d 100755 --- a/cpg_flow_test/run-test-workflow.sh +++ b/cpg_flow_test/run-test-workflow.sh @@ -1,7 +1,7 @@ #!/bin/bash DEFAULT_IMAGE_REPOSITORY="australia-southeast1-docker.pkg.dev/cpg-common/images" -IMAGE_TAG="cpg_flow:0.1.0-alpha.14" +IMAGE_TAG="cpg_flow:0.1.2" IMAGE_PATH="$DEFAULT_IMAGE_REPOSITORY/$IMAGE_TAG" PATH_OVERRIDE=0 diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index cf36781..b84d582 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -6,7 +6,8 @@ from cpg_flow.targets.sequencing_group import SequencingGroup from cpg_utils import Path from cpg_utils.hail_batch import get_batch -from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi +from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi, parse_pyramid_job +from jobs.parse_pyramid import parse_pyramid_job """ Here's a fun programming task with four interdependent steps, using the concept of **prime numbers** and their relationships: @@ -193,3 +194,24 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu data=self.expected_outputs(multicohort), jobs=job_pyramid, ) + + +@stage(required_stages=[BuildAPrimePyramid]) +class ParsePyramid(MultiCohortStage): + + def expected_outputs(self, multicohort: MultiCohort) -> Path: + return self.prefix / f'{multicohort.name}_parsed.txt' + + def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutput | None: + + expected_output = self.expected_outputs(multicohort) + + input_file = get_batch().read_input(inputs.as_str(multicohort, BuildAPrimePyramid, 'pyramid')) + + # new python job + job = get_batch().new_python_job(name=f'parse_pyramid_{multicohort.name}') + result = job.call(parse_pyramid_job, input_file) + + get_batch().write_output(result.as_str(), str(expected_output)) + + return self.make_outputs(multicohort, data=expected_output, jobs=job) diff --git a/cpg_flow_test/workflow.py b/cpg_flow_test/workflow.py index 3386c0e..3428c3b 100644 --- a/cpg_flow_test/workflow.py +++ b/cpg_flow_test/workflow.py @@ -5,7 +5,7 @@ from cpg_flow.workflow import run_workflow from cpg_utils.config import set_config_paths -from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi +from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi, ParsePyramid TMP_DIR = os.getenv('TMP_DIR') CONFIG_FILE = str(Path(__file__).parent / 'config.toml') @@ -14,7 +14,7 @@ def run_cpg_flow(dry_run=False): - workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi] + workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi, ParsePyramid] config_paths = os.environ['CPG_CONFIG_PATH'].split(',')