diff --git a/README.md b/README.md index 42fd142..1b7895b 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ nextflow run Biostatistics-Unit-HT/Flanders -r 1.0 -profile [docker|singulari ### Example: Run Only Colocalization (with existing `.h5ad`) ```bash -nextflow run Biostatistics-Unit-HT/Flanders -r 1.0 -profile [docker|singularity|conda] --coloc_input /path/to/finemapping_output.h5ad --run_colocalization true --coloc_id my_coloc_run -w ./work -resume +nextflow run Biostatistics-Unit-HT/Flanders -r 1.0 -profile [docker|singularity|conda] --coloc_h5ad_input /path/to/finemapping_output.h5ad --run_colocalization true --coloc_id my_coloc_run -w ./work -resume ``` ### Quick run with example dataset @@ -84,6 +84,14 @@ Flanders separates the fine-mapping and colocalization process into two distinct

+#### 4.1 Getting external credible sets from Open Targets
+ - It is possible to create an AnnData object from credible sets already generated by Open Targets. To do so, follow these steps:
+ - Download the Open Targets [credible-set](https://platform.opentargets.org/downloads/credible_set/access) and [study](https://platform.opentargets.org/downloads/study/access) datasets.
+ - Create a conda environment with the following packages: anndata, pandas, polars, pyarrow, scipy (`math` and `argparse` are part of the Python standard library and need no installation).
+ - Run the script `bin/parse_credible_sets_refined.py`, following the instructions given by its command-line arguments (see the example command below).
+
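+A minimal example, assuming the study and credible-set parquet files and a reference-panel CSV have already been downloaded (the file names below are illustrative). Note that `--out` is used as a prefix: the script appends `.h5ad` itself.
+
+```bash
+# Illustrative invocation - adjust project, fine-mapping method and file names to your download
+python bin/parse_credible_sets_refined.py \
+    --project FINNGEN_R12 \
+    --type_sumstat gwas \
+    --type_finemap SuSie \
+    --study study.parquet \
+    --credset credible_set.parquet \
+    --ref_panel reference_panel.csv \
+    --out finngen_r12_credible_sets
+```
+
+The resulting `finngen_r12_credible_sets.h5ad` can then be used for a colocalization-only run via `--coloc_h5ad_input`, as shown above.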
+
+ ### Step 2: Colocalization analysis #### Inputs @@ -125,7 +133,6 @@ iCOLOC approach allows to:

- ## 👩‍🔬 Credits Developed by the Biostatistics and Genome Analysis Units at [Human Technopole](https://humantechnopole.it/en/)
- [Arianna Landini](mailto:arianna.landini@fht.org)
diff --git a/assets/OT_mapping_credset_table.txt b/assets/OT_mapping_credset_table.txt new file mode 100644 index 0000000..71e055f --- /dev/null +++ b/assets/OT_mapping_credset_table.txt @@ -0,0 +1,6 @@ +project,finemapping,data,description +GCST,SuSiE-inf,gwas,GWAS catalog data +GTEx,SuSie-inf,eqtl,eQTLs from GTEx +UKB_PPP_EUR,SuSiE-inf,pqtl,Data from UKB-PPP considering only Europeans +FINNGEN_R12,SuSie,gwas,Data from Finngen consortium + diff --git a/bin/funs_locus_breaker_cojo_finemap_all_at_once.R b/bin/funs_locus_breaker_cojo_finemap_all_at_once.R index 53968b8..7d809b5 100755 --- a/bin/funs_locus_breaker_cojo_finemap_all_at_once.R +++ b/bin/funs_locus_breaker_cojo_finemap_all_at_once.R @@ -583,7 +583,7 @@ expand_cs <- function(fitted) { #' cs_idx <- which(lbf_vec > 2) #' find_threshold(lbf_vec, original_idx = cs_idx) find_threshold <- function(vector, original_idx = NULL) { - if (var(vector) != 0) { + if ( var(vector) != 0 && !is.na(var(vector)) ) { thres <- optim( fn = function(th) thresholder(th, vector), p = 0, @@ -716,7 +716,7 @@ from_susie_to_anndata <- function(finemap_list=NULL, cs_indices=NULL, analysis_i lABFs_list <- c(lABFs_list, lABFs) min_res_labf_vec <- c(min_res_labf_vec, min_res_labf) top_pvalue_vec <- c(top_pvalue_vec, top_pvalue) - purity_df <- rbind(purity_df, finemap$sets$purity |> dplyr::mutate(logsum.logABF=logsum.logABF)) + purity_df <- rbind(purity_df, finemap$sets$purity |> dplyr::mutate(logsum.logABF=logsum.logABF, coverage = finemap$sets$requested_coverage)) comment_section <- c(comment_section, rep(finemap$comment_section, length(finemap$sets$cs_index))) comment_section[is.na(comment_section)] <- "NaN" metadata_df <- rbind(metadata_df, finemap$metadata) @@ -731,7 +731,7 @@ from_susie_to_anndata <- function(finemap_list=NULL, cs_indices=NULL, analysis_i metadata_df$L_index, sep = "::" ) - + # Prepare `obs_df` metadata obs_df <- metadata_df |> dplyr::select(-L_index) |> dplyr::rename(type=TYPE) obs_df$chr <- paste0("chr", obs_df$chr) @@ -767,6 +767,11 @@ from_susie_to_anndata <- function(finemap_list=NULL, cs_indices=NULL, analysis_i ) } + # Filter out values for SNPs out of the credible set right before matrices creation + for (cs in seq_along(lABFs_list)) { + lABFs_list[[cs]] <- lABFs_list[[cs]] |> dplyr::filter(is_cs) + } + lABF_matrix_sparse <- create_sparse_matrix("lABF") beta_matrix_sparse <- create_sparse_matrix("bC") se_matrix_sparse <- create_sparse_matrix("bC_se") diff --git a/bin/parse_credible_sets_refined.py b/bin/parse_credible_sets_refined.py new file mode 100755 index 0000000..83219f5 --- /dev/null +++ b/bin/parse_credible_sets_refined.py @@ -0,0 +1,424 @@ +# This script create an anndata starting from study and credisble_set parquet files from Open Target. +# This anndata can later be used by the Flanders pipeline for colocalization + +import polars as pl +import pandas as pd +from scipy.sparse import coo_matrix +from scipy.sparse import csr_matrix +import pyarrow as pa +import math +import anndata as ad +import argparse + +#Although I give all the option here, some project have only certain type of data. 
For example FINNGEN_R12 has only gwas and SuSie +parser = argparse.ArgumentParser(description="Program to generate credible sets from Open Target in Anndata format for Flanders colocalization") +parser.add_argument("--project", default=None, type=str, help="Name of the project to get data from in OTG (example: FINNGEN_R12, GTEx, GCST)") +parser.add_argument("--type_sumstat", default='gwas', type=str, help="Either a gwas, eqtl or pqtl") +parser.add_argument("--type_finemap", default='SuSie', type=str, help="Either SuSie or SuSiE-inf") +parser.add_argument("--study", default=None, type=str, help="Parquet file of the study from OT") +parser.add_argument("--credset", default=None, type=str, help="Parquet file of the credible sets from OT") +parser.add_argument("--ref_panel", default=None, type=str, help="A csv panel to use to impute missing SNPs from the credible set. Mandatory columns are (chr,pos,a0,a1) wuth a0 and a1 in any particula order") +parser.add_argument("--out", default=None, type=str, help="Name of the output anndata file") +args = parser.parse_args() + + +class OT_create_anndata: + def __init__( + self, study:str, + project:str, + type_study:str, + credset:str, + ref_panel:str, + type_finemap:str): + + self.type_study = type_study + self.study = study + self.credset = credset + self.project = project + self.imputation_panel = ref_panel + self.type_finemap = type_finemap + self.log10 = math.log(10) + self.log_prior_odds = math.log(0.01 / 0.99) + + def _read_study(self): + self.study_pl = pl.read_parquet(self.study) + self.study_pl = self.study_pl.filter((pl.col("hasSumstats") == True) & (pl.col("projectId") == self.project) & (pl.col('studyType') == self.type_study)) + self.study_pl = self.study_pl.with_columns( + pl.col("ldPopulationStructure") + .map_elements( + lambda lst: ",".join(item["ldPopulation"] for item in lst) if lst is not None else None, + return_dtype=pl.String # avoid the warning + ) + .alias("ldPopulationStructure") + ) + if self.type_study != "eqtl" or self.type_study != "pqtl": + self.study_pl = self.study_pl.with_columns( + pl.col("traitFromSource").alias("biosampleId") + ) + if self.project=='GTEx': + self.study_pl = self.study_pl.filter( + pl.col("studyId").str.contains("^gtex_ge_", literal=False) + ) + if self.project=='UKB_PPP_EUR': + self.study_pl = self.study_pl.filter(pl.col("ldPopulationStructure")=="nfe") + return self.study_pl + + def _read_credset(self): + print(self.type_study) + credset_pl = pl.read_parquet(self.credset) + if self.type_study == "eqtl" or self.type_study == "pqtl": + self.credset_query_pl = credset_pl.filter((pl.col("isTransQtl") == False) & (pl.col('finemappingMethod') == self.type_finemap) & (pl.col("studyId").is_in(self.study_pl["studyId"].to_list()))) + else: + self.credset_query_pl = credset_pl.filter( (pl.col('finemappingMethod') == self.type_finemap) & (pl.col("studyId").is_in(self.study_pl["studyId"].to_list()))) + print('ok') + self.credset_query_pl = self.credset_query_pl.filter(pl.col("locus").list.eval(pl.element().struct.field("is99CredibleSet")).list.any()) + if len(self.credset_query_pl)==0: + print('Empty credible sets, please check if the parameter are right (type_sumstat, project, type_finemap)') + exit() + + + def _reshape_credset(self): + study_credset = self.credset_query_pl.join(self.study_pl, on = "studyId", how = "inner") + if "biosampleId" not in study_credset.columns: + study_credset = study_credset.with_columns(pl.lit(None).alias("biosampleId")) + var = 
study_credset.explode("locus").with_columns(pl.col("locus").struct.unnest()) + var = (var + .group_by("studyLocusId") + .agg([ + pl.col("logBF").sum().alias("sumlogABF") + ]) + ) + study_credset = var.join(study_credset, on = "studyLocusId") + study_credset = study_credset.with_columns( + pl.when((pl.lit(self.type_study != "eqtl")) | (pl.lit(self.type_study != "pqtl"))).then( + pl.col("studyId").alias("phenotype_id") + ).otherwise( + pl.concat_str( + [ + pl.col("biosampleId"), + pl.lit(":"), + pl.col("geneId") + ] + ).alias("phenotype_id") + )) + study_credset = study_credset.with_columns( + pl.concat_str([ + pl.col("projectId"), + pl.lit(":"), + pl.col("studyType") + ]).alias("projectId") + ) + study_credset = study_credset.with_columns([pl.col('variantId').alias('snp'), pl.col('chromosome').alias('chr')]) + study_credset = study_credset.with_columns( + pl.col("snp").str.split_exact("_", 4) + .struct.rename_fields(["chr_snp", "pos_snp", "a0", "a1"]) + .alias("fields") + ).unnest("fields").drop("snp").with_columns([ + pl.min_horizontal("a0", "a1").alias("a_allele1"), + pl.max_horizontal("a0", "a1").alias("a_allele2") + ]).with_columns( + pl.concat_str([ + pl.lit('chr'), + pl.col("chr_snp").cast(pl.Utf8).str.replace("X", "23"), + pl.lit(":"), + pl.col("pos_snp"), + pl.lit(":"), + pl.col("a_allele1"), + pl.lit(":"), + pl.col("a_allele2"), + ]).alias("snp") + ).drop(["chr_snp","pos_snp","a_allele2","a_allele1","a0","a1"]) + study_credset = study_credset.with_columns(pl.col("chr").cast(str).replace("X", "23").alias("chr")) + self.study_credset = study_credset.with_columns( + pl.concat_str([ + pl.lit('chr'), + pl.col("chr"), + pl.lit("::"), + pl.col("projectId"), + pl.lit("::"), + pl.col("phenotype_id"), + pl.lit("::"), + pl.col("snp") + ]).alias("csname"), + pl.when((pl.lit(self.type_study == "eqtl")) | (pl.lit(self.type_study == "pqtl"))).then( + (pl.col("region").str.extract(r":(-?\d+)", 1).cast(pl.Int64) - 500000).clip(lower_bound = 0) + ).otherwise( + pl.col("region").str.extract(r":(-?\d+)", 1).cast(pl.Int64) + ).alias("start"), + pl.when(pl.lit(self.type_study == "eqtl") | (pl.lit(self.type_study == "pqtl"))).then( + pl.col("region").str.extract(r"-(-?\d+)$", 1).cast(pl.Int64) + 500000 + ).otherwise( + pl.col("region").str.extract(r"-(-?\d+)$", 1).cast(pl.Int64) + ).alias("end"), + (pl.col("sumlogABF") * (self.log10 + self.log_prior_odds - (pl.col("locus").list.len().cast(pl.Float64).log()))).alias("min_res_labf"), + pl.col("sumlogABF").alias("cs_log10bf"), + pl.col('snp').alias("topsnp") + ).drop("chr") + + def _explode_locus_snps(self): + if "traitFromSource" not in self.study_credset.columns: + self.study_credset = self.study_credset.with_columns(pl.lit(None).alias("traitFromSource")) + var = self.study_credset.explode("locus").with_columns(pl.col("locus").struct.unnest()) + var= var.with_columns( + pl.col("variantId").str.split_exact("_", 4) + .struct.rename_fields(["chr", "pos", "a0", "a1"]) + .alias("fields") + ).unnest("fields").drop("variantId").with_columns([ + pl.min_horizontal("a0", "a1").alias("a_allele1"), + pl.max_horizontal("a0", "a1").alias("a_allele2") + ]).with_columns( + pl.concat_str([ + pl.lit("chr"), + pl.col("chr").cast(pl.Utf8).str.replace("X", "23"), # Note: you have "chr" twice - is this intentional? 
+ pl.lit(":"), + pl.col("pos"), + pl.lit(":"), + pl.col("a_allele1"), + pl.lit(":"), + pl.col("a_allele2") + ]).alias("snp"), + pl.concat_str([ + pl.lit("chr"), + pl.col("chr") + ]).alias("chrname"), + pl.col('standardError').alias('se') + ).drop("chr","standardError") + var = var.with_columns(pl.col("chrname").cast(str).replace("chrX", "chr23").alias("chr")) + var = var.with_columns(pl.col("pos").cast(pl.Int64)).drop("chrname") + self.var = var.with_columns([ + pl.when( + (pl.col("a0") == pl.col("a_allele2")) & (pl.col("a1") == pl.col("a_allele1")) + ).then( + -pl.col("beta") + ).otherwise( + pl.col("beta") + ).alias("beta")]).select( + "csname", + 'projectId', + 'start', + 'end', + 'phenotype_id', + 'topsnp', + 'min_res_labf', + 'cs_log10bf', + 'logBF', + 'beta', + 'se', + 'snp', + 'chr', + 'pos', + 'a_allele1', + 'a_allele2', + 'nSamples' + ) + + def _merge_with_imputation_refined(self): + imputation_panel = pl.read_csv(self.imputation_panel) + ref = imputation_panel.with_columns([ + pl.min_horizontal("a0", "a1").alias("a_allele1"), + pl.max_horizontal("a0", "a1").alias("a_allele2") + ]).with_columns( + pl.col("chr").cast(pl.Utf8).str.replace("chrX", "chr23"), + pl.concat_str([ + pl.col("chr").cast(pl.Utf8).str.replace("chrX", "chr23"), + pl.lit(":"), + pl.col("pos"), + pl.lit(":"), + pl.col("a_allele1"), + pl.lit(":"), + pl.col("a_allele2") + ]).alias("snp") + ) + + # Perform full outer join + ot_merge = ref.join( + self.var, + on=['chr', 'pos', 'a_allele1', 'a_allele2'], + how='full' + ) + + # Coalesce the columns + ot_merge = ot_merge.with_columns([ + pl.coalesce(["chr", "chr_right"]).alias("chr"), + pl.coalesce(["pos", "pos_right"]).alias("pos"), + pl.coalesce(["a_allele1", "a_allele1_right"]).alias("a_allele1"), + pl.coalesce(["a_allele2", "a_allele2_right"]).alias("a_allele2"), + pl.coalesce(["snp", "snp_right"]).alias("snp") + ]).drop(["chr_right", "pos_right", "a_allele1_right", "a_allele2_right", "snp_right"]) + + # Sort by chromosome and position + sorted_df = ot_merge.sort(["chr", "pos"]) + + # Create a helper column to identify credible set boundaries + # We'll mark the start of each credible set region + result_with_filled_nulls = sorted_df.with_columns([ + # Create boundary markers for credible sets + pl.when( + pl.col("csname").is_not_null() & + pl.col("start").is_not_null() & + pl.col("end").is_not_null() + ).then(1).otherwise(0).alias("is_cs_boundary"), + + # Forward fill the credible set information within chromosomal regions + pl.col("csname").forward_fill().over("chr"), + pl.col("start").forward_fill().over("chr"), + pl.col("end").forward_fill().over("chr"), + pl.col("projectId").forward_fill().over("chr"), + pl.col("phenotype_id").forward_fill().over("chr"), + pl.col("topsnp").forward_fill().over("chr"), + pl.col("min_res_labf").forward_fill().over("chr"), + pl.col("cs_log10bf").forward_fill().over("chr"), + ]).with_columns([ + # Only keep the filled values if the SNP falls within the credible set region + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("csname")).otherwise(None).alias("csname"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("projectId")).otherwise(None).alias("projectId"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("phenotype_id")).otherwise(None).alias("phenotype_id"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + 
).then(pl.col("topsnp")).otherwise(None).alias("topsnp"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("min_res_labf")).otherwise(None).alias("min_res_labf"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("cs_log10bf")).otherwise(None).alias("cs_log10bf"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("start")).otherwise(None).alias("start"), + + pl.when( + (pl.col("pos") >= pl.col("start")) & + (pl.col("pos") <= pl.col("end")) + ).then(pl.col("end")).otherwise(None).alias("end"), + ]).with_columns([ + # Fill summary statistics with 0 + pl.col("logBF").fill_null(0), + pl.col("beta").fill_null(0), + pl.col("se").fill_null(0), + + # Ensure SNP format is consistent + pl.col("snp").fill_null( + pl.concat_str([ + pl.col("chr"), + pl.lit(":"), + pl.col("pos"), + pl.lit(":"), + pl.col("a_allele1"), + pl.lit(":"), + pl.col("a_allele2"), + ]) + ) + ]).select([ + "chr", + pl.col("pos").cast(pl.Utf8), + "csname", + "snp", + 'start', + 'end', + 'projectId', + 'phenotype_id', + 'topsnp', + 'min_res_labf', + 'cs_log10bf', + 'logBF', + 'beta', + 'se' + ]) + + self.result_with_filled_nulls = result_with_filled_nulls + + + def _create_anndata_refined(self): + susie_credset_pd = self.result_with_filled_nulls.to_pandas() + + # Only use rows with valid credible sets + valid_data = susie_credset_pd[susie_credset_pd["csname"].notna()].copy() + valid_data["csname"] = valid_data["csname"].astype(str) + + # Factorize using only valid data to ensure consistent lengths + row_idx, row_labels = valid_data["csname"].factorize() + col_idx, col_labels = valid_data["snp"].factorize() + + # Get values from the same valid_data to ensure consistent lengths + logbf = valid_data["logBF"].fillna(0).to_numpy() + beta = valid_data["beta"].fillna(0).to_numpy() + se = valid_data["se"].fillna(0).to_numpy() + + # Create sparse matrices + sparse_matrix_logbf = coo_matrix((logbf, (row_idx, col_idx))) + sparse_matrix_beta = coo_matrix((beta, (row_idx, col_idx))) + sparse_matrix_se = coo_matrix((se, (row_idx, col_idx))) + + adata = ad.AnnData(sparse_matrix_logbf) + adata.obs["cs_name"] = row_labels + adata.var["snp"] = col_labels + adata.X = csr_matrix(adata.X) + adata.layers["beta"] = csr_matrix(sparse_matrix_beta) + adata.layers["se"] = csr_matrix(sparse_matrix_se) + + # Create mappings for obs metadata + start_mapping = dict(zip(valid_data['csname'], valid_data['start'])) + end_mapping = dict(zip(valid_data['csname'], valid_data['end'])) + topsnp_mapping = dict(zip(valid_data['csname'], valid_data['topsnp'])) + credset_abf = dict(zip(valid_data['csname'], valid_data['cs_log10bf'])) + project_mapping = dict(zip(valid_data['csname'], valid_data['projectId'])) + min_credset_abf = dict(zip(valid_data['csname'], valid_data['min_res_labf'])) + study_mapping = dict(zip(valid_data['csname'], valid_data['phenotype_id'])) + + # Assign obs metadata + start_labels = [start_mapping.get(label) for label in row_labels] + end_labels = [end_mapping.get(label) for label in row_labels] + topsnp_labels = [topsnp_mapping.get(label) for label in row_labels] + credset_abf_labels = [credset_abf.get(label) for label in row_labels] + phenotype_labels = [study_mapping.get(label) for label in row_labels] + min_credset_abf_labels = [min_credset_abf.get(label) for label in row_labels] + project_labels = [project_mapping.get(label) for label in row_labels] + + adata.obs["start"] = start_labels + 
adata.obs["end"] = end_labels
+        adata.obs["topsnp"] = topsnp_labels
+        adata.obs["study_id"] = project_labels
+        adata.obs["min_res_labf"] = min_credset_abf_labels
+        adata.obs["logsum(logABF)"] = credset_abf_labels
+        adata.obs['start'] = adata.obs['start'].apply(lambda x: 0 if x < 0 else x)
+        adata.obs["phenotype_id"] = phenotype_labels
+        adata.obs[['chr']] = adata.obs['topsnp'].str.split(':', expand=True)[[0]]
+        adata.obs.loc[:, "coverage"] = 0.99
+
+        # Assign var metadata
+        adata.var[['chr', 'pos']] = adata.var['snp'].str.split(':', expand=True)[[0, 1]]
+
+        # Set indices
+        adata.var.index = adata.var['snp']
+        adata.obs.index = adata.obs['cs_name']
+        return adata
+
+if __name__=='__main__':
+    ot_obj = OT_create_anndata(study = args.study, credset = args.credset, ref_panel = args.ref_panel, type_finemap=args.type_finemap, project=args.project, type_study = args.type_sumstat)
+    ot_obj._read_study()
+    ot_obj._read_credset()
+    ot_obj._reshape_credset()
+    ot_obj._explode_locus_snps()
+    ot_obj._merge_with_imputation_refined()
+    adata = ot_obj._create_anndata_refined()
+    adata.write(f'{args.out}.h5ad', compression="gzip")
+
diff --git a/bin/s02_sumstat_munging_and_aligning.R b/bin/s02_sumstat_munging_and_aligning.R
index 5ee1b56..56dac6f 100755
--- a/bin/s02_sumstat_munging_and_aligning.R
+++ b/bin/s02_sumstat_munging_and_aligning.R
@@ -667,16 +667,21 @@ if(nrow(loci_list) > 0){
   ### Add study ID to the loci table. Save
   loci_list[, study_id := opt$study_id]
-  ### Check if locus spans the HLA locus chr6:28,510,120-33,480,577
+  ### Check if locus spans the HLA locus chr6:28,510,120-33,480,577 (GRCh38) / chr6:28,477,797-33,448,354 (GRCh37)
   ### https://www.ncbi.nlm.nih.gov/grc/human/regions/MHC?asm=GRCh38
+  ### https://www.ncbi.nlm.nih.gov/grc/human/regions/MHC?asm=GRCh37
   # This code checks for four conditions to determine if a locus partially or completely spans the HLA region:
   # 1) Locus starts before (or at the exact beginning of) HLA and ends after (or at the exact end of) HLA.
   # 2) Locus starts and ends within the HLA region.
   # 3) Locus starts before the end of HLA and ends after the end of HLA.
   # 4) Locus starts before the beginning of HLA and ends after the beginning of HLA.
- hla_start=28510120 - hla_end=33480577 - hla_coord <- seq(hla_start,hla_end) + if(as.numeric(opt$grch)==37 && isFALSE(opt$run_liftover)){ + hla_start=28477797 + hla_end=33448354 + } else { + hla_start=28510120 + hla_end=33480577 + } loci_list[, is_in_hla := chr == 6 & ((start <= hla_end & end >= hla_start) | @@ -728,4 +733,4 @@ idx <- Rsamtools::indexTabix(zipped, end=which(colnames(dataset_munged)=="BP") ) -message("-- COMPLETED SUCCESSFULLY --") \ No newline at end of file +message("-- COMPLETED SUCCESSFULLY --") diff --git a/bin/s04_susie_finemapping.R b/bin/s04_susie_finemapping.R index 0be0e8e..fa9697d 100755 --- a/bin/s04_susie_finemapping.R +++ b/bin/s04_susie_finemapping.R @@ -72,13 +72,13 @@ if (is.null(opt$batch)) { "SNP"="SNPID", "BP" = "SNP_POS" ) - loci_df <- loci_df %>% rename( "chr" = "CHR", "start" = "START", "end" = "END" ) + if(loci_df[1,'TYPE'] == "gwas"){ dataset_aligned <- dataset_aligned %>% rename( @@ -146,7 +146,7 @@ for (i in 1:nrow(loci_df)) { dentist.bin="DENTIST" ) } - + print(dataset_aligned_sub) # Compute LD matrix susie_ld <- prep_susie_ld( D=dataset_aligned_sub, diff --git a/bin/s08bis_concat_anndata_bioalpha.py b/bin/s08bis_concat_anndata_bioalpha.py new file mode 100755 index 0000000..574b65a --- /dev/null +++ b/bin/s08bis_concat_anndata_bioalpha.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +import bioalpha as bsc +import argparse +import pandas as pd +import re + +def fix_ad_var(adata): + """ + Standardize variant metadata in an AnnData object. + + Ensures: + - 'snp' column exists in adata.var + - Extracts chromosome ('chr') and position ('pos') from SNP identifiers + in format 'chr{num}:{pos}:EA:RA' + """ + var = adata.var.copy() + + # Add 'snp' column if missing + if "snp" not in var.columns: + var["snp"] = var.index.astype(str) + + # Extract chr and pos using regex + var["chr"] = var["snp"].apply(lambda x: re.sub(r":.*", "", x)) + var["pos"] = var["snp"].apply(lambda x: int(re.sub(r".*:(\d+):.*", r"\1", x))) + + adata.var = var + return adata + + +def main(): + parser = argparse.ArgumentParser( + description="Concatenate multiple .h5ad files listed in a text file into one AnnData object." 
+ ) + parser.add_argument( + "-i", + "--input", + required=True, + help="Path to a text file listing .h5ad files to concatenate (one per line).", + ) + parser.add_argument( + "-o", + "--output_file", + required=True, + help="Path and file name for the output merged .h5ad file.", + ) + args = parser.parse_args() + + # Read list of files + with open(args.input, "r") as f: + h5ad_files = [line.strip() for line in f if line.strip()] + + if not h5ad_files: + raise ValueError("No valid file paths found in the input text file.") + + print(f"Loading {len(h5ad_files)} .h5ad files...") + adata_list = [bsc.sc.read_h5ad(file) for file in h5ad_files] + + print("Concatenating...") + adata = bsc.sc.concat(adata_list, join="outer", fill_value=0) + + print("Fixing variant metadata...") + adata = fix_ad_var(adata) + + print(f"Writing output to: {args.output_file}") + adata.write_h5ad(args.output_file) + + print("✅ Merge complete!") + + +if __name__ == "__main__": + main() diff --git a/conf/base.config b/conf/base.config index 13171f4..3e5237a 100644 --- a/conf/base.config +++ b/conf/base.config @@ -1,5 +1,6 @@ process { - conda = "/ssu/gassu/conda_envs/flanders_3.2" + //conda = "/ssu/gassu/conda_envs/flanders_3.2" + conda = "/software/cardinal_analysis/ht/conda_envs/flanders_3.2" // conda = "${projectDir}/pipeline_environment.yml" container = "htgenomeanalysisunit/flanders:3.0.1" @@ -38,4 +39,9 @@ process { withLabel:process_high_memory { memory = { 64.GB * task.attempt } } + withLabel:process_gpu { + accelerator = { 4 * task.attempt } + memory = { 64.GB * task.attempt } + time = { 6.h * task.attempt } + } } diff --git a/conf/test.config b/conf/test.config index daed9bd..e76e191 100644 --- a/conf/test.config +++ b/conf/test.config @@ -26,13 +26,30 @@ profiles { tiledb_lb_maf = 0.01 tiledb_lb_typesumstat = "gwas" tiledb_lb_hole = 250000 - tiledb_metadata_file = "${projectDir}/example_data/example_data_tiledb_metadata.csv" - tiledb_bfile = "${projectDir}/example_data/ld/1KG_phase3_hg37_1:10546866-11576788_alpha_sorted_alleles" + tiledb_lb_file = "${projectDir}/example_data/example_data_tiledb_metadata.csv" + tiledb_pfile = "${projectDir}/example_data/ld/1KG_phase3_hg37_1:10546866-11576788_alpha_sorted_alleles" tiledb_cs_thresh = 0.99 - is_test_profile = true - tiledb_large_locus_size = 1000000 - workers = 4 - run_colocalization = true + is_test_profile = true + tiledb_large_locus_size = 1000000 + workers = 4 + run_colocalization = true + } + } + test_tiledb_sc { + params { + tiledb = true + tiledb_uri = "/lustre/scratch124/humgen/projects_v2/cardinal_analysis/analysed_datasets/eqtls/F3_QTLs_saige/UKBB/celltype_2/tiledb/results/TileDB/TileDB_F3_saige_UKBB_celltype_2" + tiledb_batch_size = 2 + path_tiledb_lb_out = "out" + tiledb_lb_maf = 0.01 + tiledb_lb_typesumstat = "qtl" + tiledb_lb_hole = 250000 + tiledb_lb_file = "${projectDir}/test_finemap_saige.csv" + tiledb_pfile = "/lustre/scratch124/humgen/projects_v2/cardinal_analysis/analysis/core_dataset/freeze3/eQTL_input/genotypes_autosomes_symlink__maf0.001/GRCh38_ukb_processed_autosomes" + tiledb_cs_thresh = 0.99 + is_test_profile = true + tiledb_large_locus_size = 1000000 + run_colocalization = true } } -} \ No newline at end of file +} diff --git a/example_data/test_finemap_saige.csv b/example_data/test_finemap_saige.csv new file mode 100644 index 0000000..fb3bd4d --- /dev/null +++ b/example_data/test_finemap_saige.csv @@ -0,0 +1,4 @@ +CHR,TRAIT,SIG,LIM +4,B_memory_IGHMhigh:ENSG00000174125,0.001334232,0.001334232 
+4,B_memory_IGHMhigh:ENSG00000164111,2.604472e-12,1e-5 +4,B_memory_IGHMhigh:ENSG00000070190,5.721449e-10,1e-5 diff --git a/main.nf b/main.nf index 69ce7b2..0171722 100644 --- a/main.nf +++ b/main.nf @@ -44,7 +44,7 @@ workflow { if (params.tiledb){ // Split input file in a set of loci to run in parallel - Channel.fromPath(params.tiledb_metadata_file, checkIfExists:true) + Channel.fromPath(params.tiledb_lb_file, checkIfExists:true) .splitText(by: params.tiledb_batch_size, keepHeader: true, file: true) .map { batch_file -> def batch_index = (batch_file.name =~ /\.(\d+)\.csv$/)[0][1] @@ -54,7 +54,7 @@ workflow { // inspect values at runtime LOCUS_BREAKER_TILEDB(tiledb_metadata_batches) - bfiles = Channel.fromPath("${params.tiledb_bfile}.{pgen,pvar,psam}").collect() + bfiles = Channel.fromPath("${params.tiledb_pfile}.{pgen,pvar,psam}").collect() // Use a single meta-study map so combine(by:0) can match restructured_segments = LOCUS_BREAKER_TILEDB.out.locus_breaker_tdb_segments @@ -95,7 +95,7 @@ workflow { [batch_num, meta_loci, interval_file] } - // Build finemapping_config by mapping over the tiledb_bfile channel so + // Build finemapping_config by mapping over the tiledb_pfile channel so // the bfile becomes a concrete path value inside the tuple. finemap_params = [ "skip_dentist": params.skip_dentist, @@ -284,4 +284,4 @@ workflow { // At the end store log status completionSummary() } -} \ No newline at end of file +} diff --git a/modules/local/concat_anndata_ba/main.nf b/modules/local/concat_anndata_ba/main.nf new file mode 100644 index 0000000..7ed7d5a --- /dev/null +++ b/modules/local/concat_anndata_ba/main.nf @@ -0,0 +1,33 @@ +process CONCAT_ANNDATA_BA { + tag "concat_anndata" + label "process_gpu" +// conda '/software/cardinal_analysis/ht/conda_envs/ht_scrna_scalpha' + + publishDir "${params.outdir}/results/anndata/", mode: params.publish_dir_mode, pattern:"*.h5ad" + + input: + path(all_h5ad, stageAs: 'input_file???/*') + val(output_name) + + output: + path "${output_name}", emit: anndata + + script: + def args = task.ext.args ?: '' + + """ + export RETICULATE_PYTHON=\$(which python) + + ls input_file*/*.h5ad > all_h5ad_input_list.txt + + s08bis_concat_anndata_bioalpha.py \ + ${args} \ + --input all_h5ad_input_list.txt \ + --output_file ${output_name} + """ + + stub: + """ + touch ${output_name} + """ +} \ No newline at end of file diff --git a/modules/local/locus_breaker_tiledb/main.nf b/modules/local/locus_breaker_tiledb/main.nf index 9226b0b..0436889 100644 --- a/modules/local/locus_breaker_tiledb/main.nf +++ b/modules/local/locus_breaker_tiledb/main.nf @@ -2,8 +2,8 @@ process LOCUS_BREAKER_TILEDB { label "process_multi" - conda '/ssu/gassu/conda_envs/scqtl' - //conda '/software/cardinal_analysis/ht/conda_envs/scqtl' + //conda '/ssu/gassu/conda_envs/tdbsumstat' + conda '/software/cardinal_analysis/ht/conda_envs/tdbsumstat' publishDir "${params.outdir}/results/gwas_and_loci_tables/", mode: params.publish_dir_mode @@ -20,15 +20,15 @@ process LOCUS_BREAKER_TILEDB { // Define the shell script to execute script: """ - scqtl --workers ${params.workers} \ + tdbsumstat \ export \ - --table ${traits_list_table} \ + --table-lb ${traits_list_table} \ --uri-path ${params.tiledb_uri} \ - --out_lb "out" \ - --maf ${params.tiledb_lb_maf} \ + --out ${params.path_tiledb_lb_out} \ + --maf-lb ${params.tiledb_lb_maf} \ --type-sumstat ${params.tiledb_lb_typesumstat} \ - --hole ${params.tiledb_lb_hole} \ - --locus-max-size ${params.tiledb_large_locus_size} \ + --hole-lb ${params.tiledb_lb_hole} \ + 
--locus-max-size-lb ${params.tiledb_large_locus_size} \ --locusbreaker \ --batch-name ${batch_index} diff --git a/nextflow.config b/nextflow.config index c43f86f..e7e13a1 100644 --- a/nextflow.config +++ b/nextflow.config @@ -29,8 +29,9 @@ params { tiledb_lb_typesumstat = null tiledb_lb_hole = null tiledb_cs_thresh = null - tiledb_metadata_file = null - tiledb_bfile = null + tiledb_lb_file = null + tiledb_metadata = null + tiledb_pfile = null tiledb_large_locus_size = null tiledb_batch_size = null workers = null @@ -160,6 +161,7 @@ profiles { test { includeConfig 'conf/test.config' } test_long { includeConfig 'conf/test.config' } test_tiledb { includeConfig 'conf/test.config' } + test_tiledb_sc { includeConfig 'conf/test.config' } } // Set filename and location for timeline, report, trace and dag @@ -196,4 +198,4 @@ manifest { nextflowVersion = '!>=24.04' version = '1.0.0' doi = '' -} \ No newline at end of file +} diff --git a/workflows/finemap/main.nf b/workflows/finemap/main.nf index 86f6840..fb62850 100644 --- a/workflows/finemap/main.nf +++ b/workflows/finemap/main.nf @@ -2,6 +2,7 @@ include { SUSIE_FINEMAPPING } from "../../modules/local/susie_finemapping include { APPEND_TO_MASTER_COLOC } from "../../modules/local/append_to_master_coloc" //include { RDS_TO_ANNDATA } from "../../modules/local/rds_to_anndata" include { CONCAT_ANNDATA } from "../../modules/local/concat_anndata" +include { CONCAT_ANNDATA_BA } from "../../modules/local/concat_anndata_ba" workflow RUN_FINEMAPPING { take: @@ -60,9 +61,19 @@ workflow RUN_FINEMAPPING { all_h5ad = SUSIE_FINEMAPPING.out.susie_results_h5ad .collect() - CONCAT_ANNDATA(all_h5ad, "${params.finemap_id}_anndata.h5ad") + // Branch all_h5ad based on length (how many annDatas to concatenate) - only one branch will have data + all_h5ad_branched = all_h5ad.branch { items -> + small: items.size() < 100 + return items + large: items.size() >= 100 + return items + } + + // Run annData concatenation - use R for fewer files, bioalpha for more + all_h5ad_small = CONCAT_ANNDATA(all_h5ad_branched.small, "${params.finemap_id}_anndata.h5ad") + all_h5ad_large = CONCAT_ANNDATA_BA(all_h5ad_branched.large, "${params.finemap_id}_anndata.h5ad") emit: - finemap_anndata = CONCAT_ANNDATA.out.anndata + finemap_anndata = all_h5ad_small.mix(all_h5ad_large) // Mix the concatenated anndata - only one channel will actually contain data coloc_master = APPEND_TO_MASTER_COLOC.out.coloc_master -} \ No newline at end of file +}