2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
@@ -18,7 +18,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
python-version: '3.11'

- name: Install dependencies
run: |
2 changes: 1 addition & 1 deletion .github/workflows/security.yaml
@@ -14,7 +14,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
python-version: '3.11'

- name: Install dependencies
run: |
15 changes: 8 additions & 7 deletions .pre-commit-config.yaml
@@ -1,8 +1,8 @@
default_language_version:
python: python3.10
python: python3.11
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
rev: v6.0.0
hooks:
- id: check-yaml
exclude: '\.*conda/.*'
@@ -16,21 +16,22 @@ repos:
- id: check-added-large-files

- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.43.0
rev: v0.47.0
hooks:
- id: markdownlint
args: [-s, .markdownlint.json]

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.8.3
rev: v0.14.13
hooks:
- id: ruff
- id: ruff-format
name: ruff (format)
args: [--fix]
- id: ruff
name: ruff (lint)

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.13.0
rev: v1.19.1
hooks:
- id: mypy
args:
6 changes: 0 additions & 6 deletions cpg_flow_test/jobs/__init__.py
@@ -1,6 +0,0 @@
from jobs.build_pyramid import build_pyramid
from jobs.cumulative_calc import cumulative_calc
from jobs.filter_evens import filter_evens
from jobs.first_n_primes import first_n_primes
from jobs.iterative_digit_sum import iterative_digit_sum
from jobs.say_hi import say_hi
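
Note: with these package-level re-exports deleted, callers presumably import each helper from its own module. A minimal sketch of the assumed replacement import style, using the renamed *_job functions from the hunks below:

# Assumed direct-module imports replacing the deleted jobs/__init__.py re-exports.
from jobs.build_pyramid import build_pyramid_job
from jobs.cumulative_calc import cumulative_calc_job
from jobs.say_hi import say_hi_job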
21 changes: 12 additions & 9 deletions cpg_flow_test/jobs/build_pyramid.py
@@ -1,24 +1,28 @@
from typing import Any

from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

def build_pyramid(
b: Batch,
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import get_batch


def build_pyramid_job(
sequencing_groups: list[SequencingGroup],
input_files: dict[str, Any],
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
b = get_batch()
title = 'Build A Pyramid'
# Compute the no evens list for each sequencing group
sg_jobs = []
sg_output_files = []
for sg in sequencing_groups: # type: ignore
for sg in sequencing_groups:
job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id})
job.image(config_retrieve(['workflow', 'driver_image']))
no_evens_input_file_path = input_files[sg.id]['no_evens']
no_evens_input_file = b.read_input(no_evens_input_file_path)

@@ -52,6 +56,7 @@ def build_pyramid(

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_job(name=title, attributes=job_attrs | {'tool': 'cat'})
job.image(config_retrieve(['workflow', 'driver_image']))
job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.pyramid}')
@@ -60,6 +65,4 @@ def build_pyramid(
logger.info('-----PRINT PYRAMID-----')
logger.info(output_file_path)

all_jobs = [job, *sg_jobs]

return all_jobs
return [job, *sg_jobs]
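
Note: a minimal sketch of how the refactored build_pyramid_job might now be queued. Only the signature, get_batch(), and the ['workflow', 'driver_image'] key come from this diff; the variable names and attribute values below are hypothetical. The batch handle is no longer passed in as b, since the helper obtains it internally via get_batch().

# Hypothetical caller for the refactored helper; no Batch argument is passed.
from jobs.build_pyramid import build_pyramid_job

pyramid_jobs = build_pyramid_job(
    sequencing_groups=sequencing_groups,  # list[SequencingGroup] from the stage (assumed)
    input_files=no_evens_inputs,          # {sg.id: {'no_evens': path}} as read in the hunk above
    job_attrs={'stage': 'BuildPyramid'},  # illustrative attributes
    output_file_path=str(output_path),    # destination for the merged pyramid (assumed)
)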
14 changes: 9 additions & 5 deletions cpg_flow_test/jobs/cumulative_calc.py
@@ -1,18 +1,22 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import get_batch


def cumulative_calc(
b: Batch,
def cumulative_calc_job(
sequencing_group: SequencingGroup,
input_file_path: str,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
b = get_batch()
title = f'Cumulative Calc: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
job.image(config_retrieve(['workflow', 'driver_image']))
primes_path = b.read_input(input_file_path)

cmd = f"""
22 changes: 12 additions & 10 deletions cpg_flow_test/jobs/filter_evens.py
@@ -1,27 +1,30 @@
from typing import Any

from cpg_flow.stage import Stage, StageInput
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

def filter_evens(
b: Batch,
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import get_batch


def filter_evens_job(
sequencing_groups: list[SequencingGroup],
input_files: dict[str, dict[str, Any]],
job_attrs: dict[str, str],
sg_outputs: dict[str, dict[str, Any]],
output_file_path: str,
) -> list[Job]:
b = get_batch()
title = 'Filter Evens'

# Compute the no evens list for each sequencing group
sg_jobs = []
sg_output_files = []
for sg in sequencing_groups: # type: ignore
for sg in sequencing_groups:
job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs)
job.image(config_retrieve(['workflow', 'driver_image']))
input_file_path = input_files[sg.id]['cumulative']
no_evens_input_file = b.read_input(input_file_path)
no_evens_output_file_path = str(sg_outputs[sg.id])
@@ -44,6 +47,7 @@ def filter_evens(

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_job(name=title, attributes=job_attrs)
job.image(config_retrieve(['workflow', 'driver_image']))
job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.no_evens_file}')
@@ -52,6 +56,4 @@ def filter_evens(
logger.info('-----PRINT NO EVENS-----')
logger.info(output_file_path)

all_jobs = [job, *sg_jobs]

return all_jobs
return [job, *sg_jobs]
14 changes: 9 additions & 5 deletions cpg_flow_test/jobs/first_n_primes.py
@@ -1,19 +1,23 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import get_batch


def first_n_primes(
b: Batch,
def first_n_primes_job(
sequencing_group: SequencingGroup,
input_file_path: str,
job_attrs: dict[str, str],
output_file_path: str,
depends_on: Job,
) -> list[Job]:
b = get_batch()
title = f'First N Primes: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
job.image(config_retrieve(['workflow', 'driver_image']))
id_sum_path = b.read_input(input_file_path)

if depends_on:
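
Note: first_n_primes_job keeps an explicit depends_on parameter, so it can presumably be chained after the digit-sum job for the same sequencing group (its input is read as id_sum_path). A rough sketch of that chaining; the variable names are assumed, and both helpers return list[Job], so the first element is passed as the dependency:

# Assumed chaining of the two per-sequencing-group helpers shown in this diff.
from jobs.first_n_primes import first_n_primes_job
from jobs.iterative_digit_sum import iterative_digit_sum_job

digit_sum_jobs = iterative_digit_sum_job(
    sequencing_group=sg,
    job_attrs={'stage': 'IterativeDigitSum'},
    output_file_path=str(digit_sum_output),
)
prime_jobs = first_n_primes_job(
    sequencing_group=sg,
    input_file_path=str(digit_sum_output),  # digit-sum output feeds the primes job
    job_attrs={'stage': 'FirstNPrimes'},
    output_file_path=str(primes_output),
    depends_on=digit_sum_jobs[0],           # assuming the single digit-sum job is first in the list
)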
14 changes: 9 additions & 5 deletions cpg_flow_test/jobs/iterative_digit_sum.py
@@ -1,17 +1,21 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import get_batch


def iterative_digit_sum(
b: Batch,
def iterative_digit_sum_job(
sequencing_group: SequencingGroup,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
b = get_batch()
title = f'Iterative Digit Sum: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
job.image(config_retrieve(['workflow', 'driver_image']))

cmd = f"""\
#!/bin/bash
14 changes: 9 additions & 5 deletions cpg_flow_test/jobs/say_hi.py
@@ -1,17 +1,21 @@
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
from loguru import logger

from hailtop.batch.job import Job

from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import get_batch


def say_hi(
b: Batch,
def say_hi_job(
sequencing_group: SequencingGroup,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
b = get_batch()
title = f'Say Hi: {sequencing_group.id}'
job = b.new_job(name=title, attributes=job_attrs)
job.image(config_retrieve(['workflow', 'driver_image']))

cmd = f"""
echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi}