Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpg_flow_test/configs/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,6 @@ sequencing_type = 'genome'

[resource_overrides]
# Override the default resources for a stage.

[images]
ubuntu = 'australia-southeast1-docker.pkg.dev/cpg-common/images/sv/ubuntu1804:latest'
5 changes: 3 additions & 2 deletions cpg_flow_test/jobs/build_pyramid.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

from loguru import logger

from hailtop.batch import Batch
from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, hail_batch
from cpg_utils import Path, config, hail_batch


def build_pyramid_job(
Expand All @@ -23,6 +22,7 @@ def build_pyramid_job(
sg_output_files: list[Path] = []
for sg in sequencing_groups:
job = b.new_bash_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id})
job.image(config.config_retrieve(['images', 'ubuntu']))

no_evens_input_file = b.read_input(input_files[sg.id]['no_evens'])

Expand Down Expand Up @@ -54,6 +54,7 @@ def build_pyramid_job(

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_bash_job(name=title, attributes=job_attrs | {'tool': 'cat'})
job.image(config.config_retrieve(['images', 'ubuntu']))
job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.pyramid}')
Expand Down
4 changes: 2 additions & 2 deletions cpg_flow_test/jobs/cumulative_calc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from loguru import logger

from hailtop.batch import Batch
from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, hail_batch
from cpg_utils import Path, config, hail_batch


def cumulative_calc_job(
Expand All @@ -15,6 +14,7 @@ def cumulative_calc_job(
) -> list[Job]:
b = hail_batch.get_batch()
job = b.new_job(name=f'Cumulative Calc: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))
primes_path = b.read_input(input_file_path)

cmd = f"""
Expand Down
7 changes: 4 additions & 3 deletions cpg_flow_test/jobs/filter_evens.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Any

from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, hail_batch
from cpg_utils import Path, config, hail_batch


def filter_evens_job(
Expand All @@ -22,6 +20,7 @@ def filter_evens_job(
sg_output_files = []
for sg in sequencing_groups:
job = b.new_bash_job(name=f'{title}: {sg.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))
input_file_path = input_files[sg.id]['cumulative']
sg_output_files.append(sg_outputs[sg.id])

Expand All @@ -41,6 +40,8 @@ def filter_evens_job(

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_bash_job(name=title, attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))

job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.no_evens_file}')
Expand Down
3 changes: 2 additions & 1 deletion cpg_flow_test/jobs/first_n_primes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, hail_batch
from cpg_utils import Path, config, hail_batch


def first_n_primes_job(
Expand All @@ -14,6 +14,7 @@ def first_n_primes_job(
) -> Job:
b = hail_batch.get_batch()
job = b.new_job(name=f'First N Primes: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))
id_sum_path = b.read_input(input_file_path)

job.command(f"""
Expand Down
3 changes: 2 additions & 1 deletion cpg_flow_test/jobs/iterative_digit_sum.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, hail_batch
from cpg_utils import Path, config, hail_batch


def iterative_digit_sum_job(
Expand All @@ -13,6 +13,7 @@ def iterative_digit_sum_job(
) -> Job:
b = hail_batch.get_batch()
job = b.new_job(name=f'Iterative Digit Sum: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))

job.command(f"""\
#!/bin/bash
Expand Down
4 changes: 2 additions & 2 deletions cpg_flow_test/jobs/say_hi.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from loguru import logger

from hailtop.batch import Batch
from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils import Path, hail_batch
from cpg_utils import Path, config, hail_batch


def say_hi_job(
Expand All @@ -14,6 +13,7 @@ def say_hi_job(
) -> Job:
b = hail_batch.get_batch()
job = b.new_job(name=f'Say Hi: {sequencing_group.id}', attributes=job_attrs)
job.image(config.config_retrieve(['images', 'ubuntu']))

job.command(f"""
echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi}
Expand Down
8 changes: 4 additions & 4 deletions cpg_flow_test/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
---

### Task: Prime Pyramid
Write a program that builds a "Prime Pyramid" based on a given input number \( N \). The pyramid is built in four steps:
Write a program that builds a "Prime Pyramid" based on a given input number `N`. The pyramid is built in four steps:

#### Step 1: **Generate Prime Numbers**
Write a function to generate the first `N` prime numbers. For example, if `N=5`, the output would be `[2, 3, 5, 7, 11]`.
Expand Down Expand Up @@ -60,7 +60,7 @@ def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path]
'primes': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_primes.txt',
}

def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput:
def queue_jobs(self, sequencing_group: SequencingGroup, _inputs: StageInput) -> StageOutput:
# Print out alignment input for this sequencing group
logger.info('-----ALIGNMENT INPUT-----')
logger.info(sequencing_group.alignment_input)
Expand All @@ -75,13 +75,13 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S
)

# Generate first N primes
primes_output_path = str(self.expected_outputs(sequencing_group).get('primes', ''))
job_primes = first_n_primes.first_n_primes_job(
sequencing_group,
outputs['id_sum'],
self.get_job_attrs(sequencing_group),
outputs['primes'],
)

# set a dependency
job_primes.depends_on(job_id_sum)

Expand Down Expand Up @@ -122,7 +122,7 @@ def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path]
'hello': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_cumulative.txt',
}

def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
def queue_jobs(self, sequencing_group: SequencingGroup, _inputs: StageInput) -> StageOutput | None:
outputs = self.expected_outputs(sequencing_group)
return self.make_outputs(
sequencing_group,
Expand Down
127 changes: 1 addition & 126 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ requires-python = ">=3.10,<3.12"
dependencies = [
"analysis-runner>=3.2.2",
"cpg-flow~=1.3",
"cpg-utils>=5.1.1",
"cpg-utils>=5.5.0",
"hail~=0.2.137",
"loguru>=0.7.3",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -81,131 +80,7 @@ select = [
fixable = ["ALL"]

ignore = [
"ANN204", # Missing type annotation for special method `__init__`
"Q000", # Single quotes found but double quotes preferred
"S101", # Use of assert detected
"SLF001", # Private member accessed: `_preemptible`

"ARG001", # Unused function argument
"ARG002", # Unused method argument

"PLR2004", # Magic value used

"ANN001",
"ANN202",
"C408",
"TID252",
"RET504",
"ERA001",
"UP032",
"RUF100",
"ISC001",
"PIE804",
"F401",
"C901",
"W605",
"RET505",
"ANN003",
"RUF013",
"UP031",
"RUF010",
"B006",
"ANN002",
"B023",
"EXE001",
"G001",
"SIM108",
"RUF005",
"G002",
"PD901",
"N999",
"SIM118",
"SIM102",
"PLW2901",
"S603",
"ARG005",
"PGH003",
"B904",
"N802",
"ISC003",
"ANN205",
"S607",
"RUF015",
"E701",
"N818",
"PIE790",
"N803",
"A002",
"RUF012",
"W291",
"S113",
"S311",
"N806",
"PLR5501",
"F403",
"SIM115",
"B007",
"F841",
"C405",
"C419",
"SIM300",
"PD011",
"UP015",
"S602",
"Q002",
"ISC002",
"COM819",
"C416",
"DTZ005",
"G003",
"S608",
"PIE808",
"B008",
"S108",
"E402",
"S605",
"F821",
"RET507",
"RET503",
"UP030",
"UP026",
"PLR1714",
"C403",
"PLR1711",
"PIE810",
"DTZ011",
"S105",
"BLE001",
"C401",
"C400",
"PLR0402",
"SIM201",
"RET506",
"C417",
"PD010",
"PLW1510",
"A001",
"W292",
"PYI024",
"Q003",
"S301",
"RET501",
"PD003",
"SIM117",
"RUF002",
"SIM105",
"E713",
"S324",
"S310",
"Q001",
"UP020",
"S506",
"N805",
"E712",
"E401",
"SIM212",
"DTZ002",
"UP007",
]

[tool.ruff.lint.isort]
Expand Down
Loading