From c3f036d60482fec7a6ce5ab0e30d8e93a98403f1 Mon Sep 17 00:00:00 2001
From: Andrew Robbins
Date: Fri, 20 Dec 2024 00:57:15 -0500
Subject: [PATCH 1/7] use direct zarr sink

---
 ALLCools/count_matrix/dataset.py | 155 +++++++++++++++++++------------
 1 file changed, 95 insertions(+), 60 deletions(-)

diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index 8106811..6df8a55 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -4,13 +4,17 @@
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from functools import lru_cache
 from shutil import rmtree
+import tempfile
+
 import numpy as np
 import pandas as pd
 import pybedtools
 import pysam
 import xarray as xr
+from numcodecs import blosc
 from scipy import stats
+import zarr, zarr.creation, zarr.convenience, zarr.hierarchy, zarr.storage
 
 from ALLCools.utilities import parse_chrom_size, parse_mc_pattern
 
@@ -68,11 +72,9 @@ def summary(self):
     return mc_type_data
 
 
-def _determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir):
+def _determine_datasets(regions, quantifiers, chrom_size_path):
     """Determine datasets for each region."""
-    tmp_dir = pathlib.Path(tmp_dir).absolute()
-    tmp_dir.mkdir(exist_ok=True, parents=True)
-
+    tmpdir = tempfile.mkdtemp()
     chrom_sizes = parse_chrom_size(chrom_size_path)
     datasets = {}
     for pair in regions:
@@ -122,7 +124,7 @@ def _id(i, c=chrom):
         except ValueError:
             raise ValueError(f"Can not understand region specification {region_path}")
 
-        region_path = f"{tmp_dir}/{name}.regions.csv"
+        region_path = f"{tmpdir}/{name}.regions.csv"
         region_bed_df.to_csv(region_path)
 
         datasets[name] = {"regions": region_path, "quant": []}
@@ -152,7 +154,7 @@ def _id(i, c=chrom):
         if quant_type not in ALLOW_QUANT_TYPES:
             raise ValueError(f"QUANT_TYPE need to be in {ALLOW_QUANT_TYPES}, got {quant_type} in {quantifier}.")
         datasets[name]["quant"].append(_Quant(mc_types=mc_types, quant_type=quant_type, kwargs=kwargs))
-    return datasets
+    return datasets, tmpdir
 
 
 def _count_single_region_set(allc_table, region_config, obs_dim, region_dim):
@@ -183,7 +185,7 @@ def _count_single_region_set(allc_table, region_config, obs_dim, region_dim):
         data = xr.DataArray(
             np.array([sample_data]),
             coords=[[sample], region_ids, total_mc_types, ["mc", "cov"]],
-            dims=[obs_dim, region_dim, "mc_type", "count_type"],
+            dims=[obs_dim, region_dim, "mc_type", "count_type"]
         )
         total_data.append(data)
     total_data = xr.Dataset({f"{region_dim}_da": xr.concat(total_data, dim=obs_dim)})
@@ -208,7 +210,7 @@ def _calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):
     return pv
 
 
 def _count_single_zarr(
-    allc_table, region_config, obs_dim, region_dim, output_path, obs_dim_dtype, count_dtype="uint32"
+    allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"
 ):
     """Process single region set and its quantifiers."""
     # count all ALLC and mC types that's needed for quantifiers if this region_dim
@@ -216,7 +218,6 @@ def _count_single_zarr(
         allc_table=allc_table, region_config=region_config, obs_dim=obs_dim, region_dim=region_dim
     )
 
-    total_ds = {}
     # deal with count quantifiers
     count_mc_types = []
     for quant in region_config["quant"]:
@@ -227,8 +228,8 @@ def _count_single_zarr(
     count_da = count_ds.sel(mc_type=count_mc_types)[f"{region_dim}_da"]
     max_int = np.iinfo(count_dtype).max
     count_da = xr.where(count_da > max_int, max_int, count_da)
-    total_ds[f"{region_dim}_da"] = count_da.astype(count_dtype)
-
+    regiongroup[f"{region_dim}_da"][
+        chunk_start : chunk_start + allc_table.index.size, :, :, :] = count_da.astype(count_dtype).data
     # deal with hypo-score, hyper-score quantifiers
     for quant in region_config["quant"]:
         if quant.quant_type == "hypo-score":
@@ -240,7 +241,9 @@ def _count_single_zarr(
                     var_dim=region_dim,
                     **quant.kwargs,
                 )
-                total_ds[f"{region_dim}_da_{mc_type}-hypo-score"] = data
+                regiongroup[f"{region_dim}_da_{mc_type}-hypo-score"][
+                    chunk_start : chunk_start + allc_table.index.size, :
+                ] = data.data
         elif quant.quant_type == "hyper-score":
             for mc_type in quant.mc_types:
                 data = _calculate_pv(
@@ -250,11 +253,9 @@ def _count_single_zarr(
                     var_dim=region_dim,
                     **quant.kwargs,
                 )
-                total_ds[f"{region_dim}_da_{mc_type}-hyper-score"] = data
-    total_ds = xr.Dataset(total_ds)
-    total_ds.coords[obs_dim] = total_ds.coords[obs_dim].astype(obs_dim_dtype)
-    total_ds.to_zarr(output_path, mode="w")
-    return output_path
+                regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][chunk_start : chunk_start + allc_table.index.size, :] = data.data
+
+    return True
 
 
 @doc_params(
@@ -302,7 +303,6 @@ def generate_dataset(
 
     # determine index length and str dtype
     max_length = allc_table.index.map(lambda idx: len(idx)).max()
-    obs_dim_dtype = f"<U{max_length}"

+        if len(count_mc_types) > 0:
+            DA = regiongroup.empty(
+                name=f"{region_dim}_da",
+                shape=(n_sample, region_size, len(count_mc_types), 2),
+                chunks=(chunk_size, region_size, len(count_mc_types), 2),
+                dtype="uint32"
+            )
+            DA.attrs['_ARRAY_DIMENSIONS']=[obs_dim, region_dim, "mc_type", "count_type"]
+            count = regiongroup.array(
+                name="count_type",
+                data=(["mc", "cov"]),
+                dtype="
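
The substance of this patch is the sink change: instead of each worker building an in-memory xarray Dataset that is concatenated and written at the end, generate_dataset pre-allocates one zarr array per region set and each worker assigns its finished chunk into a slice of it. A minimal sketch of that pattern against the zarr-python 2.x API the code uses (store path, group name, and shapes here are toy values, not the ones generate_dataset derives):

    import numpy as np
    import zarr

    store = zarr.storage.DirectoryStore("example.mcds")
    root = zarr.hierarchy.group(store=store, overwrite=True)
    group = root.create_group("chrom100k")

    n_sample, n_region, chunk_size = 8, 100, 4
    da = group.empty(
        name="chrom100k_da",
        shape=(n_sample, n_region, 2, 2),
        chunks=(chunk_size, n_region, 2, 2),
        dtype="uint32",
    )
    # xarray's zarr convention: name the dimensions so the store opens as a Dataset
    da.attrs["_ARRAY_DIMENSIONS"] = ["cell", "chrom100k", "mc_type", "count_type"]

    # each worker writes its rows straight into the store instead of
    # returning a Dataset to be merged and written in one shot
    for chunk_start in range(0, n_sample, chunk_size):
        counts = np.random.randint(0, 50, (chunk_size, n_region, 2, 2), dtype="uint32")
        da[chunk_start : chunk_start + chunk_size] = counts

Chunking the array along the sample axis at the same chunk_size the workers use means concurrent writers never touch the same chunk, which is what makes the direct sink safe without locking.
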
Date: Sat, 21 Dec 2024 00:06:38 +0000
Subject: [PATCH 2/7] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 ALLCools/count_matrix/dataset.py      | 74 +++++++++----------
 docs/CONDUCT.md                       | 20 ++---
 docs/CONTRIBUTING.md                  | 14 ++--
 docs/README.md                        | 12 +--
 .../basic/intro_basic_clustering.md   | 12 +--
 docs/allcools/intro.md                | 16 ++--
 docs/allcools/start/analysis_steps.md | 22 +++---
 docs/allcools/start/installation.md   |  4 +-
 8 files changed, 83 insertions(+), 91 deletions(-)

diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index 6df8a55..5f89056 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -1,20 +1,22 @@
 import pathlib
 import subprocess
+import tempfile
 from collections import defaultdict
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from functools import lru_cache
-from shutil import rmtree
-import tempfile
-
 import numpy as np
 import pandas as pd
 import pybedtools
 import pysam
 import xarray as xr
+import zarr
+import zarr.convenience
+import zarr.creation
+import zarr.hierarchy
+import zarr.storage
 from numcodecs import blosc
 from scipy import stats
-import zarr, zarr.creation, zarr.convenience, zarr.hierarchy, zarr.storage
 
 from ALLCools.utilities import parse_chrom_size, parse_mc_pattern
 
@@ -185,7 +187,7 @@ def _count_single_region_set(allc_table, region_config, obs_dim, region_dim):
         data = xr.DataArray(
             np.array([sample_data]),
             coords=[[sample], region_ids, total_mc_types, ["mc", "cov"]],
-            dims=[obs_dim, region_dim, "mc_type", "count_type"]
+            dims=[obs_dim, region_dim, "mc_type", "count_type"],
         )
         total_data.append(data)
     total_data = xr.Dataset({f"{region_dim}_da": xr.concat(total_data, dim=obs_dim)})
@@ -209,9 +211,7 @@ def _calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):
     return pv
 
 
-def _count_single_zarr(
-    allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"
-):
+def _count_single_zarr(allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
     """Process single region set and its quantifiers."""
     # count all ALLC and mC types that's needed for quantifiers if this region_dim
     count_ds = _count_single_region_set(
@@ -228,8 +228,9 @@ def _count_single_zarr(
     count_da = count_ds.sel(mc_type=count_mc_types)[f"{region_dim}_da"]
     max_int = np.iinfo(count_dtype).max
     count_da = xr.where(count_da > max_int, max_int, count_da)
-    regiongroup[f"{region_dim}_da"][
-        chunk_start : chunk_start + allc_table.index.size, :, :, :] = count_da.astype(count_dtype).data
+    regiongroup[f"{region_dim}_da"][chunk_start : chunk_start + allc_table.index.size, :, :, :] = count_da.astype(
+        count_dtype
+    ).data
     # deal with hypo-score, hyper-score quantifiers
     for quant in region_config["quant"]:
         if quant.quant_type == "hypo-score":
@@ -253,7 +254,9 @@ def _count_single_zarr(
                     var_dim=region_dim,
                     **quant.kwargs,
                 )
-                regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][chunk_start : chunk_start + allc_table.index.size, :] = data.data
+                regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][
+                    chunk_start : chunk_start + allc_table.index.size, :
+                ] = data.data
 
     return True
 
@@ -312,7 +315,7 @@ def generate_dataset(
     # prepare regions and determine quantifiers
     pathlib.Path(output_path).mkdir(exist_ok=True)
     z = zarr.storage.DirectoryStore(path=output_path)
-    root = zarr.hierarchy.group(store = z, overwrite = True)
+    root = zarr.hierarchy.group(store=z, overwrite=True)
     datasets, tmpdir = _determine_datasets(regions, quantifiers, chrom_size_path)
     # copy chrom_size_path to output_path
     subprocess.run(["cp", "-f", chrom_size_path, f"{output_path}/chrom_sizes.txt"], check=True)
@@ -324,12 +327,9 @@ def generate_dataset(
         bed.index.name = region_dim
         region_size = bed.index.size
         dsobs = regiongroup.array(
-            name=obs_dim,
-            data=allc_table.index.values,
-            chunks=(chunk_size),
-            dtype=f"
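
Aside from the import reordering and line wrapping, nothing functional changes here, but the hunks make the write path easier to audit: every slice assignment targets an array that already carries an _ARRAY_DIMENSIONS attribute, the convention xarray uses to map plain zarr arrays onto named dimensions. Assuming a store written like the sketch after patch 1, reading it back needs nothing ALLCools-specific:

    import xarray as xr

    # consolidated=False: metadata is only consolidated at the very end of generate_dataset
    ds = xr.open_zarr("example.mcds/chrom100k", consolidated=False)
    print(ds["chrom100k_da"].dims)  # ('cell', 'chrom100k', 'mc_type', 'count_type')
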
Date: Wed, 8 Jan 2025 14:29:57 -0500
Subject: [PATCH 3/7] add dependency on zarr under version 3

---
 environment.yml | 2 +-
 pyproject.toml  | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/environment.yml b/environment.yml
index a625dd1..0be6010 100644
--- a/environment.yml
+++ b/environment.yml
@@ -30,7 +30,7 @@ dependencies:
   - statsmodels
   - xarray
   - yaml
-  - zarr
+  - zarr < 3
   - pip:
       - papermill
       - imblearn
diff --git a/pyproject.toml b/pyproject.toml
index ee85625..f7e4da1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
     'seaborn',
     "xarray",
     "pyyaml",
+    "zarr < 3"
 ]
 
 [project.optional-dependencies]
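
The pin is load-bearing: the dataset code imports the zarr-python 2.x module layout (zarr.hierarchy, zarr.convenience, zarr.storage.DirectoryStore), and that layout was reorganized in zarr-python 3, so the imports fail outright under v3. A defensive check along these lines (hypothetical, not part of the patch; assumes the packaging library is available) makes the failure mode the constraint prevents explicit:

    from packaging.version import Version

    import zarr

    if Version(zarr.__version__) >= Version("3"):
        # zarr.hierarchy and zarr.convenience no longer exist under v3
        raise ImportError(f"ALLCools requires zarr < 3, found {zarr.__version__}")
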
From e9fcd17789923ce6247ce2a0e37227cadd4e2ffc Mon Sep 17 00:00:00 2001
From: Andrew Robbins
Date: Sun, 16 Feb 2025 00:10:55 -0500
Subject: [PATCH 4/7] maybe fix lack of obs_dim

---
 ALLCools/count_matrix/dataset.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index 5f89056..f920191 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -211,7 +211,7 @@ def _calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):
     return pv
 
 
-def _count_single_zarr(allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
+def _count_single_zarr(allc_table, region_config, obs_dim, obs_dim_dtype, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
     """Process single region set and its quantifiers."""
     # count all ALLC and mC types that's needed for quantifiers if this region_dim
     count_ds = _count_single_region_set(
@@ -257,7 +257,7 @@ def _count_single_zarr(allc_table, region_config, obs_dim, region_dim, chunk_sta
             regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][
                 chunk_start : chunk_start + allc_table.index.size, :
             ] = data.data
-
+    regiongroup[obs_dim] = count_ds.coords[obs_dim].astype(obs_dim_dtype)
     return True
 
 
@@ -306,6 +306,7 @@ def generate_dataset(
 
     # determine index length and str dtype
     max_length = allc_table.index.map(lambda idx: len(idx)).max()
+    obs_dim_dtype = f"<U{max_length}"

Date: Sun, 16 Feb 2025 13:54:22 +0000
Subject: [PATCH 5/7] fix lack of obs_dim

---
 ALLCools/count_matrix/dataset.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index f920191..697df3c 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -211,7 +211,9 @@ def _calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):
     return pv
 
 
-def _count_single_zarr(allc_table, region_config, obs_dim, obs_dim_dtype, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
+def _count_single_zarr(
+    allc_table, region_config, obs_dim, obs_dim_dtype, region_dim, chunk_start, regiongroup, count_dtype="uint32"
+):
     """Process single region set and its quantifiers."""
     # count all ALLC and mC types that's needed for quantifiers if this region_dim
     count_ds = _count_single_region_set(
@@ -257,7 +259,9 @@ def _count_single_zarr(allc_table, region_config, obs_dim, obs_dim_dtype, region
             regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][
                 chunk_start : chunk_start + allc_table.index.size, :
             ] = data.data
-    regiongroup[obs_dim] = count_ds.coords[obs_dim].astype(obs_dim_dtype)
+    regiongroup[obs_dim][chunk_start : chunk_start + allc_table.index.size] = (
+        count_ds.coords[obs_dim].astype(obs_dim_dtype).data
+    )
     return True
 
 
@@ -327,10 +331,6 @@ def generate_dataset(
         bed.columns = [f"{region_dim}_chrom", f"{region_dim}_start", f"{region_dim}_end"]
         bed.index.name = region_dim
         region_size = bed.index.size
-        dsobs = regiongroup.array(
-            name=obs_dim, data=allc_table.index.values, chunks=(chunk_size), dtype=f"

Date: Sun, 16 Feb 2025 21:23:27 -0500
Subject: [PATCH 6/7] properly iterate over datasets

---
 ALLCools/count_matrix/dataset.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index 697df3c..b3c6c9c 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -324,8 +324,10 @@ def generate_dataset(
     datasets, tmpdir = _determine_datasets(regions, quantifiers, chrom_size_path)
     # copy chrom_size_path to output_path
     subprocess.run(["cp", "-f", chrom_size_path, f"{output_path}/chrom_sizes.txt"], check=True)
+    rgs = {}
     for region_dim, region_config in datasets.items():
         regiongroup = root.create_group(region_dim)
+        rgs[region_dim] = regiongroup
         # save region coords to the ds
         bed = pd.read_csv(f"{tmpdir}/{region_dim}.regions.csv", index_col=0)
         bed.columns = [f"{region_dim}_chrom", f"{region_dim}_start", f"{region_dim}_end"]
         bed.index.name = region_dim
         region_size = bed.index.size
@@ -397,10 +399,9 @@ def generate_dataset(
                     obs_dim_dtype=obs_dim_dtype,
                     region_dim=region_dim,
                     chunk_start=chunk_start,
-                    regiongroup=regiongroup,
+                    regiongroup=rgs[region_dim],
                 )
                 futures[f] = (region_dim, i)
-
     for f in as_completed(futures):
         region_dim, i = futures[f]
         print(f"Chunk {i} of {region_dim} returned")
@@ -415,5 +416,6 @@ def generate_dataset(
             "ds_sample_dim": {region_dim: obs_dim for region_dim in datasets.keys()},
         },
     )
-    zarr.convenience.consolidate_metadata(z)
+    for region_dim in datasets.keys():
+        zarr.convenience.consolidate_metadata(f"{output_path}/{region_dim}")
     return output_path
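
Consolidating each region group instead of the root store matches how the output is consumed: one region set opened at a time. Under zarr 2.x, consolidate_metadata folds every .zarray/.zattrs file in the group into a single .zmetadata key, so opening costs one read rather than one per array. A sketch of both sides of that contract, with an illustrative path and region name:

    import xarray as xr
    import zarr

    # what the new loop does once per region group at the end of generate_dataset
    zarr.convenience.consolidate_metadata("output.mcds/chrom100k")

    # what a consumer does afterwards
    ds = xr.open_zarr("output.mcds/chrom100k", consolidated=True)
    print(ds)
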
From a983c2cb2a87f39a671e40da75df8ca4ea1f2656 Mon Sep 17 00:00:00 2001
From: Andrew Robbins
Date: Fri, 7 Mar 2025 08:13:50 -0500
Subject: [PATCH 7/7] iterate over ALL counts for every region

---
 ALLCools/count_matrix/dataset.py | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/ALLCools/count_matrix/dataset.py b/ALLCools/count_matrix/dataset.py
index e79a2d3..6f2f701 100644
--- a/ALLCools/count_matrix/dataset.py
+++ b/ALLCools/count_matrix/dataset.py
@@ -98,7 +98,7 @@ def _determine_datasets(regions, quantifiers, chrom_size_path):
                 "do not have index in its fourth column, adding it automatically. "
                 "If this is not desired, add a fourth column containing UNIQUE IDs to the BED file.",
             )
-            region_bed_df[name] = [f"{name}_{i}" for i in range(region_bed_df.shape[0])]
+            region_bed_df[name] = (f"{name}_{i}" for i in range(region_bed_df.shape[0]))
         # check if name is unique()
         if region_bed_df.iloc[:, 3].duplicated().sum() > 0:
             raise ValueError(f"Region IDs in {region_path} (fourth column) are not unique.")
@@ -219,7 +219,6 @@ def _count_single_zarr(
     count_ds = _count_single_region_set(
         allc_table=allc_table, region_config=region_config, obs_dim=obs_dim, region_dim=region_dim
     )
-
     # deal with count quantifiers
     count_mc_types = []
     for quant in region_config["quant"]:
@@ -326,8 +325,7 @@ def generate_dataset(
     subprocess.run(["cp", "-f", chrom_size_path, f"{output_path}/chrom_sizes.txt"], check=True)
     rgs = {}
     for region_dim, region_config in datasets.items():
-        regiongroup = root.create_group(region_dim)
-        rgs[region_dim] = regiongroup
+        rgs[region_dim] = root.create_group(region_dim)
         # save region coords to the ds
         bed = pd.read_csv(f"{tmpdir}/{region_dim}.regions.csv", index_col=0)
         bed.columns = [f"{region_dim}_chrom", f"{region_dim}_start", f"{region_dim}_end"]
         bed.index.name = region_dim
@@ -341,7 +341,7 @@ def generate_dataset(
             if ds.coords[k].dtype == "O":
                 ds.coords[k] = ds.coords[k].astype(str)
         ds.to_zarr(f"{output_path}/{region_dim}", mode="w", consolidated=False)
-        dsobs = regiongroup.empty(
+        dsobs = rgs[region_dim].empty(
             name=obs_dim, shape=allc_table.index.size, chunks=(chunk_size), dtype=f"<U{max_length}"

         if len(count_mc_types) > 0:
-            DA = regiongroup.empty(
+            DA = rgs[region_dim].empty(
                 name=f"{region_dim}_da",
                 shape=(n_sample, region_size, len(count_mc_types), 2),
                 chunks=(chunk_size, region_size, len(count_mc_types), 2),
                 dtype="uint32",
             )
             DA.attrs["_ARRAY_DIMENSIONS"] = [obs_dim, region_dim, "mc_type", "count_type"]
-            count = regiongroup.array(name="count_type", data=(["mc", "cov"]), dtype="
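
Across patches 4, 5, and 7, handling of the sample-name coordinate settles into one scheme: size a fixed-width unicode dtype from the longest index entry, pre-allocate the coordinate array once per region group, and let each worker fill its slice alongside the counts. A self-contained sketch of that scheme (toy names and sizes, not the real generate_dataset values):

    import numpy as np
    import zarr

    cell_names = [f"cell_{i}" for i in range(10)]
    max_length = max(len(name) for name in cell_names)
    obs_dim_dtype = f"<U{max_length}"  # fixed-width little-endian unicode

    root = zarr.group()  # in-memory store, standing in for the DirectoryStore
    chunk_size = 4
    obs = root.empty(name="cell", shape=len(cell_names), chunks=(chunk_size,), dtype=obs_dim_dtype)
    obs.attrs["_ARRAY_DIMENSIONS"] = ["cell"]

    # the same chunk_start slicing _count_single_zarr uses for the data arrays
    for chunk_start in range(0, len(cell_names), chunk_size):
        chunk = cell_names[chunk_start : chunk_start + chunk_size]
        obs[chunk_start : chunk_start + len(chunk)] = np.array(chunk, dtype=obs_dim_dtype)

    print(obs[:])  # all ten names round-trip as numpy unicode

Sizing the dtype up front is what lets the array be allocated before any worker runs; the trade-off is that the longest name must be known before the first chunk is written, which is why generate_dataset computes max_length from the ALLC table index at the start.
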