120 changes: 73 additions & 47 deletions typhon/collocations/collocator.py
@@ -192,11 +192,14 @@ def collocate_filesets(

self._info(f"Collocate from {start} to {end}")

# Find the files from both filesets which overlap tempoerally.
# Find the files from both filesets which overlap temporally.
matches = list(filesets[0].match(
filesets[1], start=start, end=end, max_interval=max_interval,
skip_file_errors=skip_file_errors
))

if len(matches) == 0:
return

if processes is None:
processes = 1

@@ -540,59 +543,73 @@ def _collocate_matches(
skip_errors=skip_file_errors,
)

for iprimary in range(len(matches)):
nfound_secondaries=len(matches[iprimary][1])
got_matches=[next(loaded_matches) for isecondary in range(nfound_secondaries)]

for loaded_match in loaded_matches:
# The FileInfo objects of the matched files:
files = loaded_match[0][0], loaded_match[1][0]
# Get FileInfos
primary_infos=got_matches[0][0][0]
secondaries_infos = [nmatch[1][0] for nmatch in got_matches]
files = primary_infos, secondaries_infos
# Get datasets
primary = got_matches[0][0][1].copy()
secondaries = [nmatch[1][1].copy() for nmatch in got_matches]

# We copy the data from the matches since they might be used for
# other matches as well:
primary, secondary = \
loaded_match[0][1].copy(), loaded_match[1][1].copy()
# Combine secondary files
secondary = xr.concat(secondaries, dim=secondaries[0].time.dims[0])

self._debug(f"Collocate {files[0].path}\nwith {files[1].path}")
# Update dimension numbering
secondary[secondary.time.dims[0]] = np.arange(len(secondary.time))

self._debug(f"Collocate {files[0].path}\nwith {[file.path for file in files[1]]}")

collocations = self.collocate(
(filesets[0].name, primary),
(filesets[1].name, secondary), **kwargs,
)
(filesets[0].name, primary),
(filesets[1].name, secondary), **kwargs,
)

if collocations is None:
self._debug("Found no collocations!")
# At least, give the process caller a progress update:
yield None, None
continue

# Check whether the collocation data is compatible and was build
# correctly
check_collocation_data(collocations)
yield None, None

found = [
collocations[f"{filesets[0].name}/time"].size,
collocations[f"{filesets[1].name}/time"].size
]
else: # after yield the code continues at the next line

self._debug(
f"Found {found[0]} ({filesets[0].name}) and "
f"{found[1]} ({filesets[1].name}) collocations"
)
# Check whether the collocation data is compatible and was build
# correctly
check_collocation_data(collocations)

# Add the names of the processed files:
for f in range(2):
if f"{filesets[f].name}/__file" in collocations.variables:
continue
found = [
collocations[f"{filesets[0].name}/time"].size,
collocations[f"{filesets[1].name}/time"].size
]

collocations[f"{filesets[f].name}/__file"] = files[f].path
self._debug(
f"Found {found[0]} ({filesets[0].name}) and "
f"{found[1]} ({filesets[1].name}) collocations"
)

# Collect the attributes of the input files. The attributes get a
# prefix, primary or secondary, to allow not-unique names.
attributes = {
f"primary.{p}" if f == 0 else f"secondary.{p}": v
for f, file in enumerate(files)
for p, v in file.attr.items()
}
# Add the names of the processed files:
for f in range(2):
if f"{filesets[f].name}/__file" in collocations.variables:
continue
if f == 0:
collocations[f"{filesets[f].name}/__file"] = files[f].path
else:
collocations[f"{filesets[f].name}/__file"] = \
'::'.join([file.path for file in files[1]])

# Collect the attributes of the input files. The attributes get a
# prefix, primary or secondary, to allow not-unique names.
# As representative for the combined secondary files, the file attributes
# from the first secondary file are taken.
attributes = {
f"primary.{p}" if f == 0 else f"secondary.{p}": v
for f, file in enumerate([files[0], files[1][0]])
for p, v in file.attr.items()
}

yield collocations, attributes
yield collocations, attributes


def collocate(
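Aside (illustration only, not part of the diff): a minimal sketch of what the new concat-and-renumber step in _collocate_matches does. The datasets, the build() helper, and the "obs" dimension name are hypothetical; the point is that concatenating secondary granules along their shared time dimension duplicates the integer coordinate, which is why it is renumbered afterwards.

import numpy as np
import xarray as xr

def build(times):
    # hypothetical secondary granule: a 1-D time variable over an "obs" dimension
    return xr.Dataset(
        {"time": ("obs", np.array(times, dtype="datetime64[ns]"))},
        coords={"obs": np.arange(len(times))},
    )

sec_a = build(["2023-01-01T00:00", "2023-01-01T00:01"])
sec_b = build(["2023-01-01T00:02", "2023-01-01T00:03"])

dim = sec_a.time.dims[0]                        # -> "obs"
combined = xr.concat([sec_a, sec_b], dim=dim)   # "obs" coordinate is now 0, 1, 0, 1
combined[dim] = np.arange(len(combined.time))   # renumber to 0, 1, 2, 3
print(combined[dim].values)                     # [0 1 2 3]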
@@ -735,6 +752,12 @@ def collocate(


"""
# the datasets must be sorted first by time in case an input file is unsorted
if isinstance(primary, tuple):
primary = (primary[0], primary[1].sortby("time"))
if isinstance(secondary, tuple):
secondary = (secondary[0], secondary[1].sortby("time"))

if max_distance is None and max_interval is None:
raise ValueError(
"Either max_distance or max_interval must be given!"
@@ -910,7 +933,7 @@ def _prepare_data(self, primary, secondary, max_interval, start, end):
if not primary_period.size or not secondary_period.size:
return None, None

# We need everything sorted by the time, otherwise xarray's stack
# We need everything sorted by the time, otherwise xarray's stack (typo?: sel)
# method makes problems:
primary_period = primary_period.sortby(primary_period)
primary_dim = primary_period.dims[0]
@@ -951,15 +974,18 @@ def _get_common_time_period(
# very annoying bug in time retrieving
# (https://github.com/pydata/xarray/issues/1240), this is a
# little bit cumbersome:

# Approach which can handle NaT values:
# added after concatenate was introduced
common_start = max(
start,
pd.Timestamp(primary.time.values.min().item(0)).tz_localize(None) - max_interval,
pd.Timestamp(secondary.time.values.min().item(0)).tz_localize(None) - max_interval
pd.to_datetime(primary.time.values).min().tz_localize(None) - max_interval,
pd.to_datetime(secondary.time.values).min().tz_localize(None) - max_interval
)
common_end = min(
end,
pd.Timestamp(primary.time.values.max().item(0)).tz_localize(None) + max_interval,
pd.Timestamp(secondary.time.values.max().item(0)).tz_localize(None) + max_interval
pd.to_datetime(primary.time.values).max().tz_localize(None) + max_interval,
pd.to_datetime(secondary.time.values).max().tz_localize(None) + max_interval
)

primary_period = primary.time.where(
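Aside (illustration only, not part of the diff): why switching from the raw numpy minimum/maximum to pd.to_datetime(...).min()/.max() handles NaT values, as the comment above notes. The array below is hypothetical.

import numpy as np
import pandas as pd

times = np.array(["2023-01-01T00:00", "NaT", "2023-01-01T06:00"], dtype="datetime64[ns]")

print(times.min())                  # NaT -- numpy propagates NaT
print(pd.to_datetime(times).min())  # 2023-01-01 00:00:00 -- the DatetimeIndex skips NaT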
Expand Down Expand Up @@ -1038,7 +1064,7 @@ def _flat_to_main_coord(data):
# to use them as indices later.
for dim in dims:
if dim not in data.coords:
data[dim] = dim, np.arange(data.dims[dim])
data[dim] = dim, np.arange(data.sizes[dim])

# We assume that coordinates must be unique! Otherwise, we would have
# to use this ugly work-around:
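Aside (illustration only, not part of the diff): the data.dims[dim] -> data.sizes[dim] change follows newer xarray, where .sizes is the recommended name-to-length mapping for Datasets. A hypothetical example of the pattern in the changed line:

import numpy as np
import xarray as xr

data = xr.Dataset({"lat": ("obs", np.zeros(5))})
print(data.sizes["obs"])      # 5 -- length of the "obs" dimension
print("obs" in data.coords)   # False -- so an index coordinate gets created below
data["obs"] = "obs", np.arange(data.sizes["obs"])
print(data["obs"].values)     # [0 1 2 3 4]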
Expand Down Expand Up @@ -1151,7 +1177,7 @@ def _create_return(
}
)
metadata["interval"] = xr.DataArray(
intervals, dims=("collocation", ),
intervals.astype('timedelta64[ns]'), dims=("collocation", ),
attrs={
"max_interval": f"Max. interval in secs: {max_interval}",
"max_distance": f"Max. distance in kilometers: {max_distance}",
11 changes: 7 additions & 4 deletions typhon/collocations/common.py
@@ -118,6 +118,8 @@ def read(self, *args, **kwargs):
return data
elif self.read_mode == "collapse" or self.read_mode is None:
# Collapse the data (default)
if self.read_mode is None:
print('default read_mode is collapse')
return collapse(data, self.reference, self.collapser)
elif self.read_mode == "expand":
# Expand the data
@@ -220,8 +222,8 @@ def search(self, filesets, collocator=None, **kwargs):

def _rows_for_secondaries(primary):
"""Helper function for collapse"""
current_row = np.zeros(primary.size, dtype=int)
rows = np.zeros(primary.size, dtype=int)
current_row = np.zeros(primary.size, dtype=np.int64)
rows = np.zeros(primary.size, dtype=np.int64)
i = 0
for p in primary:
rows[i] = current_row[p]
@@ -336,8 +338,9 @@ def collapse(data, reference=None, collapser=None):
if collapser is None:
collapser = {}
collapser = {
"mean": lambda m, a: np.nanmean(m, axis=a),
"std": lambda m, a: np.nanstd(m, axis=a),
"mean": lambda m, a: np.nanmean(m, axis=a), # if slice consists solely of
"std": lambda m, a: np.nanstd(m, axis=a), # nans, prints RuntimeWarning,
# but returns nan as desired
"number": lambda m, a: np.count_nonzero(~np.isnan(m), axis=a),
**collapser,
}
42 changes: 30 additions & 12 deletions typhon/files/fileset.py
@@ -20,10 +20,12 @@
from sys import platform
import traceback
import warnings
import tempfile

import numpy as np
import pandas as pd
from fsspec.implementations.local import LocalFileSystem
from pathlib import Path

import typhon.files
from typhon.trees import IntervalTree
@@ -1050,8 +1052,8 @@ def find(
):
""" Find all files of this fileset in a given time period.

The *start* and *end* parameters build a semi-open interval: only the
files that are equal or newer than *start* and older than *end* are
The *start* and *end* parameters build a closed interval: only the
files that are equal or newer than *start* and equal or older than *end* are
going to be found.

While searching this method checks whether the file lies in the time
@@ -1114,9 +1116,13 @@ def find(
# The user can give strings instead of datetime objects:
start = datetime.min if start is None else to_datetime(start)
end = datetime.max if end is None else to_datetime(end)

'''
# We want to have a semi-open interval as explained in the doc string.
# Update 09.03.2023: We want a closed interval just like in collocate() -> _get_common_time_period
# to be consistent
end -= timedelta(microseconds=1)
'''


if end < start:
raise ValueError(
@@ -1635,8 +1641,11 @@ def get_info(self, file_info, retrieve_via=None):

# Using the handler for getting more information
if retrieve_via in ("handler", "both"):
with typhon.files.decompress(info.path, tmpdir=self.temp_dir) as \
decompressed_path:
with typhon.files.decompress(
info.path,
tmpdir=self.temp_dir,
target=Path(info.path).name + next(tempfile._get_candidate_names()),
) as decompressed_path:
decompressed_file = info.copy()
decompressed_file.path = decompressed_path
handler_info = self.handler.get_info(decompressed_file)
@@ -2019,6 +2028,7 @@ def _return(file_info, return_value):
try:
# file_info could be a bundle of files
if isinstance(file_info, FileInfo):
print(file_info)
file_content = fileset.read(file_info, **read_args)
else:
file_content = \
@@ -2071,7 +2081,7 @@ def _return(file_info, return_value):

def match(
self, other, start=None, end=None, max_interval=None,
filters=None, other_filters=None):
filters=None, other_filters=None, skip_file_errors=False):
"""Find matching files between two filesets

Matching files are files that overlap each in their time coverage.
@@ -2097,16 +2107,22 @@ def match(
Examples:
TODO: Add example
"""

# files1 are only searched for within start and end
# without considering max_interval
start=to_datetime(start)
end=to_datetime(end)
if max_interval is not None:
max_interval = to_timedelta(max_interval, numbers_as="seconds")
start = to_datetime(start) - max_interval
end = to_datetime(end) + max_interval
start_extended=start-max_interval
end_extended=end+max_interval

files1 = list(
self.find(start, end, filters=filters)
self.find(start, end, filters=filters, no_files_error=not(skip_file_errors)) #skip_file_errors added 22.02.2023
)

files2 = list(
other.find(start, end, filters=other_filters)
other.find(start_extended, end_extended, filters=other_filters, no_files_error=not(skip_file_errors)) #skip_file_errors added 22.02.2023
)

# Convert the times (datetime objects) to seconds (integer)
@@ -2119,6 +2135,8 @@
for file in files2
]).astype("M8[s]").astype(int)

if len(times1)==0 or len(times2)==0:
return
if max_interval is not None:
# Expand the intervals of the secondary fileset to close-in-time
# intervals.
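Aside (illustration only, not part of the diff): two details of the reworked match(), shown with hypothetical values. Only the secondary fileset's search window is padded by max_interval, and the datetimes are converted to integer seconds before interval matching:

from datetime import datetime, timedelta
import numpy as np

start, end = datetime(2023, 1, 1, 0), datetime(2023, 1, 1, 6)
max_interval = timedelta(minutes=30)
start_extended, end_extended = start - max_interval, end + max_interval
print(start_extended, end_extended)   # padded window used only for the secondary files

times = np.array(["2023-01-01T00:00:00"], dtype="M8[ns]")
print(times.astype("M8[s]").astype(int))   # seconds since the Unix epoch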
@@ -2700,8 +2718,8 @@ def read(self, file_info, **read_args):
read_args = {**self.read_args, **read_args}

if self.decompress:
with typhon.files.decompress(file_info.path, tmpdir=self.temp_dir)\
as decompressed_path:
with typhon.files.decompress(file_info.path, tmpdir=self.temp_dir, target=Path(file_info.path).name+next(tempfile._get_candidate_names()))\
as decompressed_path: # modified 14.02.2023
decompressed_file = file_info.copy()
decompressed_file.path = decompressed_path
data = self.handler.read(decompressed_file, **read_args)
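Aside (illustration only, not part of the diff): the new target= argument builds a unique file name for each decompressed copy via the private tempfile._get_candidate_names() helper so parallel readers do not clash on the same path. A hypothetical sketch of the same idea using only public APIs (not what the diff uses) could look like this:

import uuid
from pathlib import Path

def unique_target(path):
    # unique per-call name for the decompressed copy of `path`
    return Path(path).name + "." + uuid.uuid4().hex

print(unique_target("/data/some_granule.nc.gz"))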