"""Single-cell variant counting pipeline."""
from __future__ import annotations
import logging
import re
from pathlib import Path
from .count_alleles_sc import make_count_matrix
# local imports
from .filter_variant_data import intersect_vcf_region, parse_intersect_region_new, vcf_to_bed
from .run_counting import tempdir_decorator
[docs]
class WaspCountSC:
"""Container for single-cell WASP counting pipeline configuration.
Attributes
----------
bam_file : str
Path to the BAM alignment file.
variant_file : str
Path to the variant file (VCF, BCF, or PGEN).
barcode_file : str
Path to cell barcode file.
feature_file : str | None
Optional path to feature/region file.
samples : list[str] | None
List of sample IDs to process.
out_file : str
Output file path for AnnData.
"""
[docs]
def __init__(
self,
bam_file: str,
variant_file: str,
barcode_file: str,
feature_file: str | None,
samples: str | list[str] | None = None,
use_region_names: bool = False,
out_file: str | None = None,
temp_loc: str | None = None,
) -> None:
# TODO: ALSO ACCEPT .h5
# User input files
self.bam_file = bam_file
self.variant_file = variant_file
self.barcode_file = barcode_file # Maybe could be optional?
self.feature_file = feature_file
self.use_region_names = use_region_names
# Make sure samples turned into str list
# Ideally single sample for single cell
normalized_samples: list[str] | None
if isinstance(samples, str):
# Check if sample file or comma delim string
if Path(samples).is_file():
with open(samples) as sample_file:
normalized_samples = [l.strip() for l in sample_file]
else:
normalized_samples = [s.strip() for s in samples.split(",")]
else:
normalized_samples = samples
self.samples: list[str] | None = normalized_samples
# parse output?
self.out_file: str = (
out_file if out_file is not None else str(Path.cwd() / "allele_counts.h5ad")
)
# Failsafe if decorator doesnt create temp_loc
self.temp_loc: str = temp_loc if temp_loc is not None else str(Path.cwd())
# Parse variant file prefix (handle VCF, BCF, PGEN)
variant_name = Path(self.variant_file).name
if variant_name.endswith(".vcf.gz"):
variant_prefix = variant_name[:-7] # Remove .vcf.gz
elif variant_name.endswith(".pgen"):
variant_prefix = variant_name[:-5] # Remove .pgen
else:
variant_prefix = re.split(r"\.vcf|\.bcf", variant_name)[0]
self.variant_prefix = variant_prefix
# Filtered variant output
self.vcf_bed = str(Path(self.temp_loc) / f"{variant_prefix}.bed")
# Parse feature file
self.feature_type = None # maybe use a boolean flag instead
if self.feature_file is not None:
f_ext = "".join(Path(self.feature_file).suffixes)
if re.search(r"\.(.*Peak|bed)(?:\.gz)?$", f_ext, re.I):
self.feature_type = "regions"
self.intersect_file = str(
Path(self.temp_loc) / f"{variant_prefix}_intersect_regions.bed"
)
else:
raise ValueError(
f"Invalid feature file type. Expected BED or MACS2 peak file, got: {self.feature_file}"
)
else:
self.intersect_file = self.vcf_bed
[docs]
@tempdir_decorator
def run_count_variants_sc(
bam_file: str,
variant_file: str,
barcode_file: str,
feature_file: str | None = None,
samples: str | list[str] | None = None,
use_region_names: bool = False,
out_file: str | None = None,
temp_loc: str | None = None,
) -> None:
"""Run single-cell ATAC variant counting pipeline.
Parameters
----------
bam_file : str
Path to the BAM alignment file with cell barcodes.
variant_file : str
Path to the variant file (VCF, BCF, or PGEN).
barcode_file : str
Path to cell barcode file (one barcode per line).
feature_file : str | None, optional
Path to feature/region file (BED or MACS2 peak file).
samples : str | list[str] | None, optional
Sample ID(s) to process.
use_region_names : bool, optional
Whether to use region names from the feature file.
out_file : str | None, optional
Output file path. Defaults to 'allele_counts.h5ad'.
temp_loc : str | None, optional
Directory for temporary files.
Returns
-------
None
Results are written to out_file as AnnData.
"""
# Stores file names
count_files = WaspCountSC(
bam_file,
variant_file,
barcode_file=barcode_file,
feature_file=feature_file,
samples=samples,
use_region_names=use_region_names,
out_file=out_file,
temp_loc=temp_loc,
)
# Create intermediary files
# Maybe change include_gt based on preparse?
vcf_to_bed(
vcf_file=count_files.variant_file,
out_bed=count_files.vcf_bed,
samples=count_files.samples,
include_gt=True,
)
assert count_files.feature_file is not None
intersect_vcf_region(
vcf_file=count_files.vcf_bed,
region_file=count_files.feature_file,
out_file=count_files.intersect_file,
)
df = parse_intersect_region_new(
intersect_file=count_files.intersect_file,
samples=count_files.samples,
use_region_names=use_region_names,
region_col=None,
)
# Guard: if no variants survived intersection, warn and write empty output
if df.is_empty():
logging.getLogger(__name__).warning(
"No variants found after intersection — writing empty output file."
)
import anndata as ad
ad.AnnData().write_h5ad(count_files.out_file)
return
# TODO: handle case where barcode file contains multiple columns
with open(count_files.barcode_file) as file:
bc_dict = {line.rstrip(): i for i, line in enumerate(file)}
# Generate Output
adata = make_count_matrix(
bam_file=count_files.bam_file,
df=df,
bc_dict=bc_dict,
include_samples=count_files.samples,
)
# Write outputs
adata.write_h5ad(count_files.out_file)
# TODO: include output options, (ie MTX, dense?)