diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index ca66ced..6f2f701 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -1,15 +1,21 @@
 import pathlib
 import subprocess
+import tempfile
 from collections import defaultdict
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from functools import lru_cache
-from shutil import rmtree

 import numpy as np
 import pandas as pd
 import pybedtools
 import pysam
 import xarray as xr
+import zarr
+import zarr.convenience
+import zarr.creation
+import zarr.hierarchy
+import zarr.storage
+from numcodecs import blosc
 from scipy import stats

 from ALLCools.utilities import parse_chrom_size, parse_mc_pattern
@@ -68,11 +74,9 @@ def summary(self):
         return mc_type_data


-def _determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir):
+def _determine_datasets(regions, quantifiers, chrom_size_path):
     """Determine datasets for each region."""
-    tmp_dir = pathlib.Path(tmp_dir).absolute()
-    tmp_dir.mkdir(exist_ok=True, parents=True)
-
+    tmpdir = tempfile.mkdtemp()
     chrom_sizes = parse_chrom_size(chrom_size_path)
     datasets = {}
     for pair in regions:
@@ -94,7 +98,7 @@ def _determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir):
                     "do not have index in its fourth column, adding it automatically. "
                     "If this is not desired, add a fourth column containing UNIQUE IDs to the BED file.",
                 )
                 region_bed_df[name] = [f"{name}_{i}" for i in range(region_bed_df.shape[0])]
                 # check if name is unique()
                 if region_bed_df.iloc[:, 3].duplicated().sum() > 0:
                     raise ValueError(f"Region IDs in {region_path} (fourth column) are not unique.")
@@ -122,7 +126,7 @@ def _id(i, c=chrom):
             except ValueError:
                 raise ValueError(f"Can not understand region specification {region_path}")

-        region_path = f"{tmp_dir}/{name}.regions.csv"
+        region_path = f"{tmpdir}/{name}.regions.csv"
         region_bed_df.to_csv(region_path)
         datasets[name] = {"regions": region_path, "quant": []}

@@ -152,7 +156,7 @@ def _id(i, c=chrom):
             if quant_type not in ALLOW_QUANT_TYPES:
                 raise ValueError(f"QUANT_TYPE need to be in {ALLOW_QUANT_TYPES}, got {quant_type} in {quantifier}.")
             datasets[name]["quant"].append(_Quant(mc_types=mc_types, quant_type=quant_type, kwargs=kwargs))
-    return datasets
+    return datasets, tmpdir


 def _count_single_region_set(allc_table, region_config, obs_dim, region_dim):
@@ -208,15 +212,13 @@ def _calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):


 def _count_single_zarr(
-    allc_table, region_config, obs_dim, region_dim, output_path, obs_dim_dtype, count_dtype="uint32"
+    allc_table, region_config, obs_dim, obs_dim_dtype, region_dim, chunk_start, regiongroup, count_dtype="uint32"
 ):
     """Process single region set and its quantifiers."""
     # count all ALLC and mC types that's needed for quantifiers if this region_dim
     count_ds = _count_single_region_set(
         allc_table=allc_table, region_config=region_config, obs_dim=obs_dim, region_dim=region_dim
     )
-
-    total_ds = {}
     # deal with count quantifiers
     count_mc_types = []
     for quant in region_config["quant"]:
@@ -227,8 +229,9 @@ def _count_single_zarr(
         count_da = count_ds.sel(mc_type=count_mc_types)[f"{region_dim}_da"]
         max_int = np.iinfo(count_dtype).max
         count_da = xr.where(count_da > max_int, max_int, count_da)
-        total_ds[f"{region_dim}_da"] = count_da.astype(count_dtype)
-
+        regiongroup[f"{region_dim}_da"][chunk_start : chunk_start + allc_table.index.size, :, :, :] = count_da.astype(
+            count_dtype
+        ).data
     # deal with hypo-score, hyper-score quantifiers
     for quant in region_config["quant"]:
         if quant.quant_type == "hypo-score":
@@ -240,7 +243,9 @@ def _count_single_zarr(
                     var_dim=region_dim,
                     **quant.kwargs,
                 )
-                total_ds[f"{region_dim}_da_{mc_type}-hypo-score"] = data
+                regiongroup[f"{region_dim}_da_{mc_type}-hypo-score"][
+                    chunk_start : chunk_start + allc_table.index.size, :
+                ] = data.data
         elif quant.quant_type == "hyper-score":
             for mc_type in quant.mc_types:
                 data = _calculate_pv(
@@ -250,11 +255,13 @@ def _count_single_zarr(
                     var_dim=region_dim,
                     **quant.kwargs,
                 )
-                total_ds[f"{region_dim}_da_{mc_type}-hyper-score"] = data
-    total_ds = xr.Dataset(total_ds)
-    total_ds.coords[obs_dim] = total_ds.coords[obs_dim].astype(obs_dim_dtype)
-    total_ds.to_zarr(output_path, mode="w")
-    return output_path
+                regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][
+                    chunk_start : chunk_start + allc_table.index.size, :
+                ] = data.data
+    regiongroup[obs_dim][chunk_start : chunk_start + allc_table.index.size] = (
+        count_ds.coords[obs_dim].astype(obs_dim_dtype).data
+    )
+    return True


 @doc_params(
@@ -311,68 +318,92 @@ def generate_dataset(
     # prepare regions and determine quantifiers
     pathlib.Path(output_path).mkdir(exist_ok=True)
-    tmp_dir = f"{output_path}_tmp"
-    datasets = _determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir)
-
+    z = zarr.storage.DirectoryStore(path=output_path)
+    root = zarr.hierarchy.group(store=z, overwrite=True)
+    datasets, tmpdir = _determine_datasets(regions, quantifiers, chrom_size_path)
     # copy chrom_size_path to output_path
     subprocess.run(["cp", "-f", chrom_size_path, f"{output_path}/chrom_sizes.txt"], check=True)
-
-    chunk_records = defaultdict(dict)
+    rgs = {}
+    for region_dim, region_config in datasets.items():
+        rgs[region_dim] = root.create_group(region_dim)
+        # save region coords to the ds
+        bed = pd.read_csv(f"{tmpdir}/{region_dim}.regions.csv", index_col=0)
+        bed.columns = [f"{region_dim}_chrom", f"{region_dim}_start", f"{region_dim}_end"]
+        bed.index.name = region_dim
+        region_size = bed.index.size
+        # append region bed to the saved ds
+        ds = xr.Dataset()
+        for col, data in bed.items():
+            ds.coords[col] = data
+        ds.coords[region_dim] = bed.index.values
+        # change object dtype to string
+        for k in ds.coords.keys():
+            if ds.coords[k].dtype == "O":
+                ds.coords[k] = ds.coords[k].astype(str)
+        ds.to_zarr(f"{output_path}/{region_dim}", mode="w", consolidated=False)
+        dsobs = rgs[region_dim].empty(
+            name=obs_dim, shape=allc_table.index.size, chunks=(chunk_size), dtype=obs_dim_dtype
+        )
+        dsobs.attrs["_ARRAY_DIMENSIONS"] = [obs_dim]
+        n_sample = allc_table.index.size
+        count_mc_types = []
+        for quant in region_config["quant"]:
+            if quant.quant_type == "count":
+                count_mc_types += quant.mc_types
+        count_mc_types = list(set(count_mc_types))
+        if len(count_mc_types) > 0:
+            DA = rgs[region_dim].empty(
+                name=f"{region_dim}_da",
+                shape=(n_sample, region_size, len(count_mc_types), 2),
+                chunks=(chunk_size, region_size, len(count_mc_types), 2),
+                dtype="uint32",
+            )
+            DA.attrs["_ARRAY_DIMENSIONS"] = [obs_dim, region_dim, "mc_type", "count_type"]
+            count = rgs[region_dim].array(name="count_type", data=(["mc", "cov"]), dtype="
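
The net effect of the refactor shown above: rather than having every worker build an in-memory `xr.Dataset` and write it to its own temporary store with `to_zarr()` (to be merged and cleaned up afterwards), `generate_dataset` now pre-allocates each zarr array once, chunked along the `obs_dim` sample axis, and `_count_single_zarr` fills its own slab in place by slice assignment at `chunk_start`. The sketch below illustrates that write pattern in isolation. It is a minimal, hypothetical example, not ALLCools API: the `region_set` group name, the dimension sizes, and the random counts are stand-ins, and it assumes the zarr v2 `DirectoryStore`/`hierarchy` interfaces imported in this diff.

```python
import numpy as np
import zarr

# Hypothetical sizes; in the diff these come from allc_table and the region BED.
n_sample, n_region, n_mc_type, n_count_type = 100, 500, 2, 2
chunk_size = 20  # samples handled per worker chunk

store = zarr.storage.DirectoryStore("example_dataset.zarr")
root = zarr.hierarchy.group(store=store, overwrite=True)
group = root.create_group("region_set")  # one group per region_dim

# Pre-allocate the full count array once, chunked along the sample (obs) axis,
# so each writer owns a disjoint run of whole chunks.
da = group.empty(
    name="region_set_da",
    shape=(n_sample, n_region, n_mc_type, n_count_type),
    chunks=(chunk_size, n_region, n_mc_type, n_count_type),
    dtype="uint32",
)
# xarray rebuilds named dimensions from this attribute on open_zarr.
da.attrs["_ARRAY_DIMENSIONS"] = ["sample", "region_set", "mc_type", "count_type"]

# Each worker fills its slab by plain slice assignment, mirroring the
# regiongroup[...][chunk_start : chunk_start + n, ...] writes in _count_single_zarr.
for chunk_start in range(0, n_sample, chunk_size):
    counts = np.random.randint(
        0, 1000, size=(chunk_size, n_region, n_mc_type, n_count_type)
    ).astype("uint32")
    da[chunk_start : chunk_start + chunk_size, :, :, :] = counts

# Read back lazily with xarray (consolidated=False, since no consolidation ran):
#   import xarray as xr
#   ds = xr.open_zarr("example_dataset.zarr", group="region_set", consolidated=False)
```

Because every slab starts on a chunk boundary and covers whole chunks, concurrent workers never touch the same chunk file, so the fill-in needs no locking and no post-hoc merge; dropping the per-chunk temporary stores is also what lets the diff remove the `shutil.rmtree` import and the `chunk_records` bookkeeping.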