75 changes: 69 additions & 6 deletions src/satctl/sources/base.py
@@ -150,7 +150,6 @@ def download_item(
"""
...

@abstractmethod
def save_item(
self,
item: Granule,
@@ -169,9 +168,49 @@ def save_item(
force (bool): If True, overwrite existing files. Defaults to False.

Returns:
dict[str, list]: Dictionary mapping granule_id to list of output paths
dict[str, list]: Dictionary mapping granule_id to list of output paths.
Empty list means all files were skipped (already exist).

Raises:
FileNotFoundError: If granule data not downloaded
ValueError: If invalid configuration
Exception: If processing fails (scene loading, resampling, writing)
"""
...
# Validate inputs using base class helper
self._validate_save_inputs(item, params)

# Parse datasets using base class helper
datasets_dict = self._prepare_datasets(writer, params)

# Filter existing files using base class helper
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

# Early return if no datasets to process (all files already exist)
if not datasets_dict:
log.info("Skipping %s - all datasets already exist", item.granule_id)
return {item.granule_id: []}

# Load and resample scene
log.debug("Loading and resampling scene for %s", item.granule_id)
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define area using base class helper
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)
scene = self.resample(scene, area_def=area_def)

# Write datasets using base class helper
result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)

# Log success
num_files = len(result.get(item.granule_id, []))
log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files)
return result
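
# A minimal usage sketch of the shared save_item implementation added above. The
# destination path, the save_one helper, and the writer/params wiring are illustrative
# assumptions; only the save_item signature and return contract come from this diff.
#
#     from pathlib import Path
#
#     from satctl.model import ConversionParams, Granule
#     from satctl.writers import Writer
#
#     def save_one(source, item: Granule, writer: Writer, params: ConversionParams) -> list:
#         """Run the shared save_item flow for one granule and return its output paths."""
#         result = source.save_item(
#             item,
#             destination=Path("output"),
#             writer=writer,
#             params=params,
#             force=False,  # set True to overwrite files that already exist on disk
#         )
#         # An empty list means every requested dataset file already existed and was skipped.
#         return result.get(item.granule_id, [])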

@property
def authenticator(self) -> Authenticator:
@@ -474,6 +513,7 @@ def save(

success = []
failure = []
skipped = [] # Track skipped items separately for logging
num_workers = num_workers or 1
batch_id = str(uuid.uuid4())
# this prevents pickle errors for unpicklable entities
Expand Down Expand Up @@ -504,10 +544,32 @@ def save(
}
for future in as_completed(future_to_item_map):
item = future_to_item_map[future]
if future.result():
success.append(item)
else:
try:
result = future.result()
# Check if files were actually written
files_written = result.get(item.granule_id, [])
if files_written:
# Files were written - successful processing
success.append(item)
else:
# Empty list = skipped (all files already existed)
success.append(item)
skipped.append(item)
except Exception as e:
# Worker raised an exception = processing failed
failure.append(item)
log.warning("Failed to process %s: %s", item.granule_id, str(e))

# Log summary
if skipped:
log.info(
"Batch complete: %d processed, %d skipped, %d failed",
len(success) - len(skipped),
len(skipped),
len(failure),
)
else:
log.info("Batch complete: %d processed, %d failed", len(success), len(failure))

emit_event(
ProgressEventType.BATCH_COMPLETED,
@@ -519,6 +581,7 @@ def save(
log.info("Interrupted, cleaning up...")
if executor:
executor.shutdown(wait=False, cancel_futures=True)
raise # Re-raise to allow outer handler to clean up
finally:
emit_event(
ProgressEventType.BATCH_COMPLETED,
55 changes: 0 additions & 55 deletions src/satctl/sources/earthdata.py
@@ -421,61 +421,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
item.to_file(granule_dir)
return True

def save_item(
self,
item: Granule,
destination: Path,
writer: Writer,
params: ConversionParams,
force: bool = False,
) -> dict[str, list]:
"""Save granule item to output files after processing.

Args:
item (Granule): Granule to process
destination (Path): Base destination directory
writer (Writer): Writer instance for output
params (ConversionParams): Conversion parameters
force (bool): If True, overwrite existing files. Defaults to False.

Returns:
dict[str, list]: Dictionary mapping granule_id to list of output paths
"""
# Validate inputs using base class helper
self._validate_save_inputs(item, params)

# Parse datasets using base class helper
datasets_dict = self._prepare_datasets(writer, params)

# Filter existing files using base class helper
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

# Early return if no datasets to process
if not datasets_dict:
log.debug("All datasets already exist for %s, skipping", item.granule_id)
return {item.granule_id: []}

# Get files for processing
files = self.get_files(item)
log.debug("Found %d files to process", len(files))

# Load and resample scene
log.debug("Loading and resampling scene")
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define area using base class helper
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)
scene = self.resample(scene, area_def=area_def)

# Write datasets using base class helper
return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)

def validate(self, item: Granule) -> None:
"""Validate a granule.

52 changes: 3 additions & 49 deletions src/satctl/sources/mtg.py
@@ -1,12 +1,10 @@
import logging
import re
import threading
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, cast

import dask.config
import numpy as np
from eumdac.datastore import DataStore
from pydantic import BaseModel
@@ -15,7 +13,7 @@
from satctl.auth import AuthBuilder
from satctl.auth.eumetsat import EUMETSATAuthenticator
from satctl.downloaders import DownloadBuilder, Downloader
from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams
from satctl.model import Granule, ProductInfo, SearchParams
from satctl.sources import DataSource
from satctl.utils import extract_zip
from satctl.writers import Writer
@@ -30,8 +28,6 @@ class MTGAsset(BaseModel):
class MTGSource(DataSource):
"""Source for EUMETSAT MTG product"""

_netcdf_lock = threading.Lock()

def __init__(
self,
collection_name: str,
@@ -221,6 +217,8 @@ def load_scene(
# The upper_right_corner='NE' argument flips it automatically in upright position
if not lazy:
scene.load(datasets, upper_right_corner="NE")
# Compute scene to avoid issues with resampling (MTG-specific requirement)
scene = scene.compute()
return scene
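
# Sketch of the resulting MTG call order once the shared save_item runs: compute the
# scene inside load_scene, before resampling. The reader name and helper below are
# assumptions for illustration; only scene.load(..., upper_right_corner="NE") and
# Scene.compute() appear in this diff.
#
#     from satpy import Scene
#
#     def load_mtg_scene(filenames: list[str], datasets: list[str]) -> Scene:
#         scene = Scene(filenames=filenames, reader="fci_l1c_nc")  # assumed MTG FCI reader
#         scene.load(datasets, upper_right_corner="NE")  # flip into upright orientation
#         # Materialize dask arrays now so the later base-class resample step works for MTG.
#         return scene.compute()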

def validate(self, item: Granule) -> None:
Expand Down Expand Up @@ -266,50 +264,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
log.warning("Failed to download: %s", item.granule_id)
return result

def save_item(
self,
item: Granule,
destination: Path,
writer: Writer,
params: ConversionParams,
force: bool = False,
) -> dict[str, list]:
"""Save granule item to output files after processing.

Args:
item (Granule): Granule to process
destination (Path): Base destination directory
writer (Writer): Writer instance for output
params (ConversionParams): Conversion parameters
force (bool): If True, overwrite existing files. Defaults to False.

Returns:
dict[str, list]: Dictionary mapping granule_id to list of output paths
"""
self._validate_save_inputs(item, params)
datasets_dict = self._prepare_datasets(writer, params)
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

# Load and resample scene
log.debug("Loading and resampling scene")
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define area using base class helper
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)
scene = scene.compute()
scene = self.resample(scene, area_def=area_def)

# Write datasets using base class helper
res = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)

return res

def _write_scene_datasets(
self,
scene: Scene,
81 changes: 0 additions & 81 deletions src/satctl/sources/sentinel1.py
@@ -342,87 +342,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader

return all_success

def save_item(
self,
item: Granule,
destination: Path,
writer: Writer,
params: ConversionParams,
force: bool = False,
) -> dict[str, list]:
"""Process and save Sentinel-1 granule to output files.

Workflow:
1. Validate inputs (local files exist, datasets specified)
2. Load scene with SAR data using sar-c_safe reader
3. Define target area (from params or full granule extent)
4. Resample to target projection and resolution
5. Write datasets to output files

Args:
item: Granule to process (must have local_path set)
destination: Base destination directory for outputs
writer: Writer instance for file output (GeoTIFF, NetCDF, etc.)
params: Conversion parameters including:
- datasets: List of datasets to process
- area_geometry: Optional AOI for spatial subsetting
- target_crs: Target coordinate reference system
- resolution: Target spatial resolution
force: If True, overwrite existing output files. Defaults to False.

Returns:
Dictionary mapping granule_id to list of output file paths

Raises:
FileNotFoundError: If local_path doesn't exist
ValueError: If datasets is None and no default composite is configured
"""
# Validate that granule was downloaded
if item.local_path is None or not item.local_path.exists():
raise FileNotFoundError(f"Invalid source file or directory: {item.local_path}")

# Ensure datasets are specified
if params.datasets is None and self.default_composite is None:
raise ValueError("Missing datasets or default composite for storage")

# Parse dataset names and prepare output filenames
datasets_dict = self._prepare_datasets(writer, params)

# Filter existing files using base class helper
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

# Load scene with requested SAR datasets
log.debug("Loading and resampling scene")
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define target area for resampling
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)

# Resample to target area
scene = self.resample(scene, area_def=area_def)

# Write each dataset to output file
paths: dict[str, list] = defaultdict(list)
output_dir = destination / item.granule_id
output_dir.mkdir(exist_ok=True, parents=True)

for dataset_name, file_name in datasets_dict.items():
output_path = output_dir / f"{file_name}.{writer.extension}"
paths[item.granule_id].append(
writer.write(
dataset=cast(DataArray, scene[dataset_name]),
output_path=output_path,
)
)

return paths


class Sentinel1GRDSource(Sentinel1Source):
"""Source for Sentinel-1 Ground Range Detected (GRD) products.