2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
@@ -18,7 +18,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
python-version: '3.11'

- name: Install dependencies
run: |
2 changes: 1 addition & 1 deletion .github/workflows/security.yaml
@@ -14,7 +14,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
python-version: '3.11'

- name: Install dependencies
run: |
15 changes: 8 additions & 7 deletions .pre-commit-config.yaml
@@ -1,8 +1,8 @@
default_language_version:
python: python3.10
python: python3.11
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
rev: v6.0.0
hooks:
- id: check-yaml
exclude: '\.*conda/.*'
@@ -16,21 +16,22 @@ repos:
- id: check-added-large-files

- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.43.0
rev: v0.47.0
hooks:
- id: markdownlint
args: [-s, .markdownlint.json]

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.8.3
rev: v0.14.13
hooks:
- id: ruff
- id: ruff-format
name: ruff (format)
args: [--fix]
- id: ruff
name: ruff (lint)

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.13.0
rev: v1.19.1
hooks:
- id: mypy
args:
3 changes: 3 additions & 0 deletions cpg_flow_test/configs/default_config.toml
@@ -107,3 +107,6 @@ sequencing_type = 'genome'

[resource_overrides]
# Override the default resources for a stage.

[images]
ubuntu = 'australia-southeast1-docker.pkg.dev/cpg-common/images/sv/ubuntu1804:latest'
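A minimal sketch of how this new [images] entry is consumed, mirroring the config_retrieve and job.image calls in the job modules below; the batch setup, job name, and command here are illustrative assumptions, not part of this change:

# Sketch: read the ubuntu image from the [images] config section and apply it to a job.
# Assumes default_config.toml has been loaded by cpg_utils.config and a Hail Batch
# backend is configured; the job name and command are placeholders.
from cpg_utils import config, hail_batch

b = hail_batch.get_batch()
job = b.new_bash_job(name='example: run in ubuntu image')
job.image(config.config_retrieve(['images', 'ubuntu']))
job.command('echo "running inside the configured ubuntu image"')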
6 changes: 0 additions & 6 deletions cpg_flow_test/jobs/__init__.py
@@ -1,6 +0,0 @@
from jobs.build_pyramid import build_pyramid
from jobs.cumulative_calc import cumulative_calc
from jobs.filter_evens import filter_evens
from jobs.first_n_primes import first_n_primes
from jobs.iterative_digit_sum import iterative_digit_sum
from jobs.say_hi import say_hi
45 changes: 23 additions & 22 deletions cpg_flow_test/jobs/build_pyramid.py
@@ -1,33 +1,36 @@
from typing import Any

from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, config, hail_batch

def build_pyramid(
b: Batch,

def build_pyramid_job(
sequencing_groups: list[SequencingGroup],
input_files: dict[str, Any],
job_attrs: dict[str, str],
output_file_path: str,
output_file_path: Path,
) -> list[Job]:
b = hail_batch.get_batch()

title = 'Build A Pyramid'
# Build a pyramid for each sequencing group
sg_jobs = []
sg_output_files = []
for sg in sequencing_groups: # type: ignore
job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id})
no_evens_input_file_path = input_files[sg.id]['no_evens']
no_evens_input_file = b.read_input(no_evens_input_file_path)
sg_jobs: list[Job] = []
sg_output_files: list[Path] = []
for sg in sequencing_groups:
job = b.new_bash_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id})
job.image(config.config_retrieve(['images', 'ubuntu']))

id_sum_input_file_path = input_files[sg.id]['id_sum']
id_sum_input_file = b.read_input(id_sum_input_file_path)
no_evens_input_file = b.read_input(input_files[sg.id]['no_evens'])

pyramid_output_file_path = str(sg.dataset.prefix() / f'{sg.id}_pyramid.txt')
id_sum_input_file = b.read_input(input_files[sg.id]['id_sum'])

pyramid_output_file_path = sg.dataset.prefix() / f'{sg.id}_pyramid.txt'
sg_output_files.append(pyramid_output_file_path)
cmd = f"""
job.command(f"""
pyramid=()
max_row_size=$(cat {no_evens_input_file} | rev | cut -d' ' -f1 | rev)
rows=($(cat {no_evens_input_file} | cut -d' ' -f2-))
@@ -44,14 +47,14 @@ def build_pyramid(
done

printf "%s\\n" "${{pyramid[@]}}" > {job.pyramid_file}
"""
""")

job.command(cmd)
b.write_output(job.pyramid_file, pyramid_output_file_path)
sg_jobs.append(job)

# Merge the per-sequencing-group pyramids into a single file
job = b.new_job(name=title, attributes=job_attrs | {'tool': 'cat'})
job = b.new_bash_job(name=title, attributes=job_attrs | {'tool': 'cat'})
job.image(config.config_retrieve(['images', 'ubuntu']))
job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.pyramid}')
@@ -60,6 +63,4 @@ def build_pyramid(
logger.info('-----PRINT PYRAMID-----')
logger.info(output_file_path)

all_jobs = [job, *sg_jobs]

return all_jobs
return [job, *sg_jobs]
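For orientation, a hedged call-site sketch of the renamed build_pyramid_job: the batch is now fetched internally via hail_batch.get_batch(), so callers pass only targets, per-group inputs, attributes, and an output Path. The wrapper function, stage label, and input dict below are placeholders, not code from this PR:

# Hypothetical call-site sketch for the refactored build_pyramid_job.
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path

from jobs.build_pyramid import build_pyramid_job


def queue_build_pyramid(
    sequencing_groups: list[SequencingGroup],
    input_files: dict,
    output_file_path: Path,
):
    # input_files maps each sequencing group id to its 'no_evens' and 'id_sum' paths,
    # as read by build_pyramid_job above; the 'BuildAPyramid' attribute is assumed.
    return build_pyramid_job(
        sequencing_groups=sequencing_groups,
        input_files=input_files,
        job_attrs={'stage': 'BuildAPyramid'},
        output_file_path=output_file_path,
    )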
20 changes: 11 additions & 9 deletions cpg_flow_test/jobs/cumulative_calc.py
@@ -1,18 +1,20 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, config, hail_batch


def cumulative_calc(
b: Batch,
def cumulative_calc_job(
sequencing_group: SequencingGroup,
input_file_path: str,
input_file_path: Path,
job_attrs: dict[str, str],
output_file_path: str,
output_file_path: Path,
) -> list[Job]:
title = f'Cumulative Calc: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
b = hail_batch.get_batch()
job = b.new_job(name=f'Cumulative Calc: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))
primes_path = b.read_input(input_file_path)

cmd = f"""
48 changes: 22 additions & 26 deletions cpg_flow_test/jobs/filter_evens.py
@@ -1,57 +1,53 @@
from typing import Any
from loguru import logger

from cpg_flow.stage import Stage, StageInput
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, config, hail_batch

def filter_evens(
b: Batch,

def filter_evens_job(
sequencing_groups: list[SequencingGroup],
input_files: dict[str, dict[str, Any]],
input_files: dict[str, dict[str, Path]],
job_attrs: dict[str, str],
sg_outputs: dict[str, dict[str, Any]],
output_file_path: str,
sg_outputs: dict[str, dict[str, Path] | Path],
) -> list[Job]:
b = hail_batch.get_batch()
title = 'Filter Evens'

# Compute the no evens list for each sequencing group
sg_jobs = []
sg_output_files = []
for sg in sequencing_groups: # type: ignore
job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs)
for sg in sequencing_groups:
job = b.new_bash_job(name=f'{title}: {sg.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))
input_file_path = input_files[sg.id]['cumulative']
no_evens_input_file = b.read_input(input_file_path)
no_evens_output_file_path = str(sg_outputs[sg.id])
sg_output_files.append(no_evens_output_file_path)
sg_output_files.append(sg_outputs[sg.id])

cmd = f"""
numbers=($(cat {no_evens_input_file}))
job.command(f"""
numbers=($(cat {b.read_input(input_file_path)}))
result=()
for num in "${{numbers[@]}}"; do
if (( num % 2 != 0 )); then
result+=("$num")
fi
done
echo "{sg.id}: ${{result[@]}}" > {job.sg_no_evens_file}
"""
""")

job.command(cmd)
b.write_output(job.sg_no_evens_file, no_evens_output_file_path)
b.write_output(job.sg_no_evens_file, sg_outputs[sg.id])
sg_jobs.append(job)

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_job(name=title, attributes=job_attrs)
job = b.new_bash_job(name=title, attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))

job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.no_evens_file}')
b.write_output(job.no_evens_file, output_file_path)
b.write_output(job.no_evens_file, sg_outputs['no_evens'])

logger.info('-----PRINT NO EVENS-----')
logger.info(output_file_path)

all_jobs = [job, *sg_jobs]
logger.info(sg_outputs['no_evens'])

return all_jobs
return [job, *sg_jobs]
32 changes: 14 additions & 18 deletions cpg_flow_test/jobs/first_n_primes.py
@@ -1,25 +1,23 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, config, hail_batch


def first_n_primes(
b: Batch,
def first_n_primes_job(
sequencing_group: SequencingGroup,
input_file_path: str,
input_file_path: Path,
job_attrs: dict[str, str],
output_file_path: str,
depends_on: Job,
) -> list[Job]:
title = f'First N Primes: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
output_file_path: Path,
) -> Job:
b = hail_batch.get_batch()
job = b.new_job(name=f'First N Primes: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))
id_sum_path = b.read_input(input_file_path)

if depends_on:
job.depends_on(depends_on)

cmd = f"""
job.command(f"""
is_prime() {{
local num=$1
if [ $num -lt 2 ]; then
@@ -46,9 +44,7 @@ def first_n_primes(
done

echo ${{primes[@]}} > {job.primes}
"""

job.command(cmd)
""")

logger.info('-----PRINT PRIMES-----')
logger.info(output_file_path)
25 changes: 13 additions & 12 deletions cpg_flow_test/jobs/iterative_digit_sum.py
@@ -1,19 +1,21 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, config, hail_batch


def iterative_digit_sum(
b: Batch,
def iterative_digit_sum_job(
sequencing_group: SequencingGroup,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
title = f'Iterative Digit Sum: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
output_file_path: Path,
) -> Job:
b = hail_batch.get_batch()
job = b.new_job(name=f'Iterative Digit Sum: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))

cmd = f"""\
job.command(f"""\
#!/bin/bash

# Function to calculate the iterative digit sum
@@ -49,8 +51,7 @@ def iterative_digit_sum(
result=$(extract_digits_and_sum {sequencing_group.id})
echo "Result: $result\n"
echo $result > {job.id_sum}
"""
job.command(cmd)
""")

logger.info('-----PRINT ID_SUM-----')
logger.info(output_file_path)
26 changes: 13 additions & 13 deletions cpg_flow_test/jobs/say_hi.py
@@ -1,23 +1,23 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

def say_hi(
b: Batch,
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, config, hail_batch


def say_hi_job(
sequencing_group: SequencingGroup,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
title = f'Say Hi: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
output_file_path: Path,
) -> Job:
b = hail_batch.get_batch()
job = b.new_job(name=f'Say Hi: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))

cmd = f"""
job.command(f"""
echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi}
"""

job.command(cmd)
""")

logger.info('-----PRINT SAY HI-----')
logger.info(output_file_path)
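Finally, a hedged sketch of driving the refactored say_hi_job for several sequencing groups, building output paths from the dataset prefix as build_pyramid does above; the wrapper function, '_hi.txt' suffix, and 'SayHi' attribute are assumptions:

# Hypothetical driver sketch: queue one say_hi_job per sequencing group.
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch.job import Job

from jobs.say_hi import say_hi_job


def queue_say_hi(sequencing_groups: list[SequencingGroup]) -> list[Job]:
    jobs: list[Job] = []
    for sg in sequencing_groups:
        # Output path derived from the dataset prefix, mirroring the other job modules;
        # the filename pattern and stage label are illustrative only.
        output_file_path = sg.dataset.prefix() / f'{sg.id}_hi.txt'
        jobs.append(
            say_hi_job(
                sequencing_group=sg,
                job_attrs={'stage': 'SayHi'},
                output_file_path=output_file_path,
            ),
        )
    return jobs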