Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/

### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
Expand Down
19 changes: 19 additions & 0 deletions cpg_flow_test/jobs/parse_pyramid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
simple python job, with no further imports
"""


def parse_pyramid_job(input_file: str) -> str:
    """
    Read a text file, print its contents to stdout, and return them.

    The file is decoded explicitly as UTF-8 so the result does not depend on
    the default locale encoding of the environment the job runs in.

    Args:
        input_file (str): the path to the file to print

    Returns:
        str: the complete contents of the file
    """

    with open(input_file, encoding='utf-8') as f:
        contents = f.read()

    # Echo the contents so they show up in the job log.
    print(f'Contents of {input_file}:')
    print(contents)
    return contents
2 changes: 1 addition & 1 deletion cpg_flow_test/run-test-workflow.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

DEFAULT_IMAGE_REPOSITORY="australia-southeast1-docker.pkg.dev/cpg-common/images"
IMAGE_TAG="cpg_flow:0.1.0-alpha.14"
IMAGE_TAG="cpg_flow:0.1.2"
IMAGE_PATH="$DEFAULT_IMAGE_REPOSITORY/$IMAGE_TAG"

PATH_OVERRIDE=0
Expand Down
24 changes: 23 additions & 1 deletion cpg_flow_test/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path
from cpg_utils.hail_batch import get_batch
from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi
from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi, parse_pyramid_job
from jobs.parse_pyramid import parse_pyramid_job

"""
Here's a fun programming task with four interdependent steps, using the concept of **prime numbers** and their relationships:
Expand Down Expand Up @@ -193,3 +194,24 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu
data=self.expected_outputs(multicohort),
jobs=job_pyramid,
)


@stage(required_stages=[BuildAPrimePyramid])
class ParsePyramid(MultiCohortStage):
    """
    Stage that runs ``parse_pyramid_job`` as a Python job on the 'pyramid'
    output of ``BuildAPrimePyramid`` and writes the job's result to a text file.
    """

    def expected_outputs(self, multicohort: MultiCohort) -> Path:
        """
        Return the output path for this stage: ``<prefix>/<name>_parsed.txt``,
        where ``<name>`` is the multicohort's name.
        """
        output_name = f'{multicohort.name}_parsed.txt'
        return self.prefix / output_name

    def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutput | None:
        """
        Queue a single Python job that calls ``parse_pyramid_job`` on the
        'pyramid' output of ``BuildAPrimePyramid``, and register the job's
        string result to be written to this stage's expected output path.
        """
        parsed_path = self.expected_outputs(multicohort)
        batch = get_batch()

        # Localise the upstream stage's 'pyramid' output as a batch input.
        pyramid_path = inputs.as_str(multicohort, BuildAPrimePyramid, 'pyramid')
        pyramid_file = batch.read_input(pyramid_path)

        # new python job
        parse_job = batch.new_python_job(name=f'parse_pyramid_{multicohort.name}')
        parsed = parse_job.call(parse_pyramid_job, pyramid_file)

        # Persist the function's return value (as a string) to the output path.
        batch.write_output(parsed.as_str(), str(parsed_path))

        return self.make_outputs(multicohort, data=parsed_path, jobs=parse_job)
4 changes: 2 additions & 2 deletions cpg_flow_test/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from cpg_flow.workflow import run_workflow
from cpg_utils.config import set_config_paths
from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi
from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi, ParsePyramid

TMP_DIR = os.getenv('TMP_DIR')
CONFIG_FILE = str(Path(__file__).parent / 'config.toml')
Expand All @@ -14,7 +14,7 @@


def run_cpg_flow(dry_run=False):
workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi]
workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi, ParsePyramid]

config_paths = os.environ['CPG_CONFIG_PATH'].split(',')

Expand Down
Loading