Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/pre-commit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: pre-commit

on:
pull_request:
push:
branches: [main]

jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Python
id: setup-python
uses: actions/setup-python@v5
with:
python-version: '3.12'
cache: 'pip'

- uses: actions/cache@v4
with:
path: ~/.cache/pre-commit
key: >
${{ format('pre-commit-{0}-{1}',
steps.setup-python.outputs.python-version,
hashFiles('.pre-commit-config.yaml')
) }}

- name: Install pre-commit
run: |
pip install --upgrade pip
pip install pre-commit
pre-commit install

- name: Run pre-commit hooks
working-directory: ${{ inputs.working-directory }}
run: |
git ls-files | xargs pre-commit run \
--show-diff-on-failure \
--color=always \
--files
13 changes: 13 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/google/pyink
rev: 24.10.1
hooks:
- id: pyink
args: ["--pyink-indentation=2", "--line-length=80"]
6 changes: 0 additions & 6 deletions EASTR/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +0,0 @@
# __init__.py

#https://timothybramlett.com/How_to_create_a_Python_Package_with___init__py.html
# from .stringLength import stringLength
# from .stringToLower import stringToLower
# from .stringToUpper import stringToUpper
244 changes: 129 additions & 115 deletions EASTR/alignment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,149 +6,163 @@

import mappy as mp

def get_seq(chrom,start,end,pysam_fa):
seq = pysam_fa.fetch(region=chrom,start=start,end=end)
return seq

def get_seq(chrom, start, end, pysam_fa):
seq = pysam_fa.fetch(region=chrom, start=start, end=end)
return seq

def align_seq_pair(rseq:str, qseq:str, scoring:list, k:int, w:int, m:int, best_n=1):
#TODO ambiguous bases not working
a = mp.Aligner(seq=rseq,k=k,w=w,best_n=best_n,scoring=scoring,min_chain_score=m)
itr = list(a.map(qseq,MD=True,cs=True))

if best_n > 1:
return itr
def align_seq_pair(
rseq: str, qseq: str, scoring: list, k: int, w: int, m: int, best_n=1
):
# TODO ambiguous bases not working
a = mp.Aligner(
seq=rseq, k=k, w=w, best_n=best_n, scoring=scoring, min_chain_score=m
)
itr = list(a.map(qseq, MD=True, cs=True))

if not itr:
return
if best_n > 1:
return itr

for hit in itr:
#TODO if hit.r_st==intron_len <- look for additional alignments?
#or it is not needed because the longest alignment is the "best" one?
if not itr:
return

if hit.strand != 1:
continue
for hit in itr:
# TODO if hit.r_st==intron_len <- look for additional alignments?
# or it is not needed because the longest alignment is the "best" one?

if not hit.is_primary:
continue
if hit.strand != 1:
continue

else:
return hit #returns the first primary hit
if not hit.is_primary:
continue

def calc_alignment_score(hit,scoring):
if hit is None:
return None
else:
return hit # returns the first primary hit

# TODO verify alignment_score calc
matches = hit.mlen
gap_penalty = 0
cs = hit.cs

#gaps
p = re.compile('[\\-\\+]([atgc]+)')
m = p.findall(cs)
gaps = len(m)
for gap in m:
gap_len = len(gap)
gap_penalty += min(scoring[2] + (gap_len - 1) * scoring[3],
scoring[4] + (gap_len - 1) * scoring[5])
def calc_alignment_score(hit, scoring):
if hit is None:
return None

#mismatches
p = re.compile('\\*([atgc]+)')
m = p.findall(cs)
mismatches = len(m)
# TODO verify alignment_score calc
matches = hit.mlen
gap_penalty = 0
cs = hit.cs

alignment_score = matches*scoring[0] - (mismatches)*scoring[1] - gap_penalty
# gaps
p = re.compile("[\\-\\+]([atgc]+)")
m = p.findall(cs)
gaps = len(m)
for gap in m:
gap_len = len(gap)
gap_penalty += min(
scoring[2] + (gap_len - 1) * scoring[3],
scoring[4] + (gap_len - 1) * scoring[5],
)

return alignment_score
# mismatches
p = re.compile("\\*([atgc]+)")
m = p.findall(cs)
mismatches = len(m)

def get_alignment(chrom, jstart, jend, overhang, pysam_fa, max_length, scoring, k, w, m):
alignment_score = (
matches * scoring[0] - (mismatches) * scoring[1] - gap_penalty
)

return alignment_score

intron_len = jend - jstart

rstart = max(jstart - overhang, 0)
rend = min(jstart + overhang, max_length)
qstart = max(jend - overhang, 0)
qend = min(jend + overhang, max_length)
def get_alignment(
chrom, jstart, jend, overhang, pysam_fa, max_length, scoring, k, w, m
):

intron_len = jend - jstart

rstart = max(jstart - overhang, 0)
rend = min(jstart + overhang, max_length)
qstart = max(jend - overhang, 0)
qend = min(jend + overhang, max_length)

rseq = get_seq(chrom, rstart, rend, pysam_fa)
qseq = get_seq(chrom, qstart, qend, pysam_fa )
hit = align_seq_pair(rseq, qseq, scoring,k,w,m)
rseq = get_seq(chrom, rstart, rend, pysam_fa)
qseq = get_seq(chrom, qstart, qend, pysam_fa)
hit = align_seq_pair(rseq, qseq, scoring, k, w, m)

if hit:
#check if the alignment is in the overlap region of short introns
if overhang * 2 >= intron_len:
if hit:
# check if the alignment is in the overlap region of short introns
if overhang * 2 >= intron_len:

if hit.r_st == intron_len:
return None
if hit.r_st == intron_len:
return None

if overhang > intron_len:
if hit.r_st - hit.q_st == intron_len:
return None
if overhang > intron_len:
if hit.r_st - hit.q_st == intron_len:
return None

if hit.r_st >= overhang and hit.q_en <= overhang:
return None
if hit.r_st >= overhang and hit.q_en <= overhang:
return None

return hit
return hit


# def get_self_alignment(chrom, start, end, max_length, scoring, k, w, m, pysam_fa, overhang):
# rseq,qseq,hit = get_alignment(chrom, start, end, overhang, pysam_fa, max_length, scoring, k, w, m)
# return rseq,qseq,hit


def get_flanking_subsequences(introns,chrom_sizes,overhang,ref_fa):
tmp_regions = tempfile.NamedTemporaryFile(mode='a',dir=os.getcwd(),delete=False)
tmp_fa = tempfile.NamedTemporaryFile(dir=os.getcwd(),delete=False)

seen = set()
for key in list(introns.keys()):
chrom = key[0]
jstart = key[1]
jend = key[2]
max_length = chrom_sizes[chrom]
rstart = max(jstart - overhang + 1, 1) #1 based
rend = min(jstart + overhang, max_length)
qstart = max(jend - overhang + 1, 1) #1 based
qend = min(jend + overhang, max_length)
r1 = f'{chrom}:{rstart}-{rend}'
r2 = f'{chrom}:{qstart}-{qend}'
introns[key]['jstart'] = r1
introns[key]['jend'] = r2

if rstart> max_length or qstart > max_length:
#remove key from introns dict
del introns[key]
print(f'Warning: intron {key} from the GTF file is out of range in FASTA file. Skipping...')
continue

for r in [r1,r2]:
if r not in seen:
t=tmp_regions.write(f'{r}\n')
seen.add(r)


tmp_regions.close()
tmp_fa.close()

cmd1 = f"samtools faidx {ref_fa} -r {tmp_regions.name} -o {tmp_fa.name}"
p1 = subprocess.run(shlex.split(cmd1), check=True)


seqs = {}
with open(tmp_fa.name,'r') as f:
for line in f:
line = line.strip()
if line.startswith(">"):
seq_name = line[1:]
if seq_name not in seqs:
seqs[seq_name] = ''
continue
sequence = line
seqs[seq_name]=seqs[seq_name] + sequence

os.unlink(tmp_regions.name)
os.unlink(tmp_fa.name)

return seqs
def get_flanking_subsequences(introns, chrom_sizes, overhang, ref_fa):
tmp_regions = tempfile.NamedTemporaryFile(
mode="a", dir=os.getcwd(), delete=False
)
tmp_fa = tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False)

seen = set()
for key in list(introns.keys()):
chrom = key[0]
jstart = key[1]
jend = key[2]
max_length = chrom_sizes[chrom]
rstart = max(jstart - overhang + 1, 1) # 1 based
rend = min(jstart + overhang, max_length)
qstart = max(jend - overhang + 1, 1) # 1 based
qend = min(jend + overhang, max_length)
r1 = f"{chrom}:{rstart}-{rend}"
r2 = f"{chrom}:{qstart}-{qend}"
introns[key]["jstart"] = r1
introns[key]["jend"] = r2

if rstart > max_length or qstart > max_length:
# remove key from introns dict
del introns[key]
print(
f"Warning: intron {key} from the GTF file is out of range in FASTA file. Skipping..."
)
continue

for r in [r1, r2]:
if r not in seen:
t = tmp_regions.write(f"{r}\n")
seen.add(r)

tmp_regions.close()
tmp_fa.close()

cmd1 = f"samtools faidx {ref_fa} -r {tmp_regions.name} -o {tmp_fa.name}"
p1 = subprocess.run(shlex.split(cmd1), check=True)

seqs = {}
with open(tmp_fa.name, "r") as f:
for line in f:
line = line.strip()
if line.startswith(">"):
seq_name = line[1:]
if seq_name not in seqs:
seqs[seq_name] = ""
continue
sequence = line
seqs[seq_name] = seqs[seq_name] + sequence

os.unlink(tmp_regions.name)
os.unlink(tmp_fa.name)

return seqs
Loading