diff --git a/src/satctl/sources/base.py b/src/satctl/sources/base.py
index bbafd1c..6886d7b 100644
--- a/src/satctl/sources/base.py
+++ b/src/satctl/sources/base.py
@@ -150,7 +150,6 @@ def download_item(
         """
         ...
 
-    @abstractmethod
     def save_item(
         self,
         item: Granule,
@@ -169,9 +168,49 @@ def save_item(
             force (bool): If True, overwrite existing files. Defaults to False.
 
         Returns:
-            dict[str, list]: Dictionary mapping granule_id to list of output paths
+            dict[str, list]: Dictionary mapping granule_id to list of output paths.
+                Empty list means all files were skipped (already exist).
+
+        Raises:
+            FileNotFoundError: If granule data not downloaded
+            ValueError: If invalid configuration
+            Exception: If processing fails (scene loading, resampling, writing)
         """
-        ...
+        # Validate inputs using base class helper
+        self._validate_save_inputs(item, params)
+
+        # Parse datasets using base class helper
+        datasets_dict = self._prepare_datasets(writer, params)
+
+        # Filter existing files using base class helper
+        datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)
+
+        # Early return if no datasets to process (all files already exist)
+        if not datasets_dict:
+            log.info("Skipping %s - all datasets already exist", item.granule_id)
+            return {item.granule_id: []}
+
+        # Load and resample scene
+        log.debug("Loading and resampling scene for %s", item.granule_id)
+        scene = self.load_scene(item, datasets=list(datasets_dict.values()))
+
+        # Define area using base class helper
+        area_def = self.define_area(
+            target_crs=params.target_crs_obj,
+            area=params.area_geometry,
+            scene=scene,
+            source_crs=params.source_crs_obj,
+            resolution=params.resolution,
+        )
+        scene = self.resample(scene, area_def=area_def)
+
+        # Write datasets using base class helper
+        result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
+
+        # Log success
+        num_files = len(result.get(item.granule_id, []))
+        log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files)
+        return result
 
     @property
     def authenticator(self) -> Authenticator:
@@ -474,6 +513,7 @@ def save(
 
         success = []
         failure = []
+        skipped = []  # Track skipped items separately for logging
        num_workers = num_workers or 1
         batch_id = str(uuid.uuid4())
         # this prevents pickle errors for unpicklable entities
@@ -504,10 +544,32 @@ def save(
             }
             for future in as_completed(future_to_item_map):
                 item = future_to_item_map[future]
-                if future.result():
-                    success.append(item)
-                else:
+                try:
+                    result = future.result()
+                    # Check if files were actually written
+                    files_written = result.get(item.granule_id, [])
+                    if files_written:
+                        # Files were written - successful processing
+                        success.append(item)
+                    else:
+                        # Empty list = skipped (all files already existed)
+                        success.append(item)
+                        skipped.append(item)
+                except Exception as e:
+                    # Worker raised an exception = processing failed
                     failure.append(item)
+                    log.warning("Failed to process %s: %s", item.granule_id, str(e))
+
+            # Log summary
+            if skipped:
+                log.info(
+                    "Batch complete: %d processed, %d skipped, %d failed",
+                    len(success) - len(skipped),
+                    len(skipped),
+                    len(failure),
+                )
+            else:
+                log.info("Batch complete: %d processed, %d failed", len(success), len(failure))
 
             emit_event(
                 ProgressEventType.BATCH_COMPLETED,
@@ -519,6 +581,7 @@ def save(
             log.info("Interrupted, cleaning up...")
             if executor:
                 executor.shutdown(wait=False, cancel_futures=True)
+            raise  # Re-raise to allow outer handler to clean up
         finally:
             emit_event(
                 ProgressEventType.BATCH_COMPLETED,
diff --git a/src/satctl/sources/earthdata.py b/src/satctl/sources/earthdata.py
index c32aefa..5e4ea7e 100644
--- a/src/satctl/sources/earthdata.py
+++ b/src/satctl/sources/earthdata.py
@@ -421,61 +421,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
         item.to_file(granule_dir)
         return True
 
-    def save_item(
-        self,
-        item: Granule,
-        destination: Path,
-        writer: Writer,
-        params: ConversionParams,
-        force: bool = False,
-    ) -> dict[str, list]:
-        """Save granule item to output files after processing.
-
-        Args:
-            item (Granule): Granule to process
-            destination (Path): Base destination directory
-            writer (Writer): Writer instance for output
-            params (ConversionParams): Conversion parameters
-            force (bool): If True, overwrite existing files. Defaults to False.
-
-        Returns:
-            dict[str, list]: Dictionary mapping granule_id to list of output paths
-        """
-        # Validate inputs using base class helper
-        self._validate_save_inputs(item, params)
-
-        # Parse datasets using base class helper
-        datasets_dict = self._prepare_datasets(writer, params)
-
-        # Filter existing files using base class helper
-        datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)
-
-        # Early return if no datasets to process
-        if not datasets_dict:
-            log.debug("All datasets already exist for %s, skipping", item.granule_id)
-            return {item.granule_id: []}
-
-        # Get files for processing
-        files = self.get_files(item)
-        log.debug("Found %d files to process", len(files))
-
-        # Load and resample scene
-        log.debug("Loading and resampling scene")
-        scene = self.load_scene(item, datasets=list(datasets_dict.values()))
-
-        # Define area using base class helper
-        area_def = self.define_area(
-            target_crs=params.target_crs_obj,
-            area=params.area_geometry,
-            scene=scene,
-            source_crs=params.source_crs_obj,
-            resolution=params.resolution,
-        )
-        scene = self.resample(scene, area_def=area_def)
-
-        # Write datasets using base class helper
-        return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
-
     def validate(self, item: Granule) -> None:
         """Validate a granule.
diff --git a/src/satctl/sources/mtg.py b/src/satctl/sources/mtg.py
index 5e1ce2c..4f300eb 100644
--- a/src/satctl/sources/mtg.py
+++ b/src/satctl/sources/mtg.py
@@ -1,12 +1,10 @@
 import logging
 import re
-import threading
 import warnings
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import Any, cast
 
-import dask.config
 import numpy as np
 from eumdac.datastore import DataStore
 from pydantic import BaseModel
@@ -15,7 +13,7 @@
 from satctl.auth import AuthBuilder
 from satctl.auth.eumetsat import EUMETSATAuthenticator
 from satctl.downloaders import DownloadBuilder, Downloader
-from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams
+from satctl.model import Granule, ProductInfo, SearchParams
 from satctl.sources import DataSource
 from satctl.utils import extract_zip
 from satctl.writers import Writer
@@ -30,8 +28,6 @@ class MTGAsset(BaseModel):
 class MTGSource(DataSource):
     """Source for EUMETSAT MTG product"""
 
-    _netcdf_lock = threading.Lock()
-
     def __init__(
         self,
         collection_name: str,
@@ -221,6 +217,8 @@ def load_scene(
         # The upper_right_corner='NE' argument flips it automatically in upright position
         if not lazy:
             scene.load(datasets, upper_right_corner="NE")
+            # Compute scene to avoid issues with resampling (MTG-specific requirement)
+            scene = scene.compute()
         return scene
 
     def validate(self, item: Granule) -> None:
@@ -266,50 +264,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
         log.warning("Failed to download: %s", item.granule_id)
         return result
 
-    def save_item(
-        self,
-        item: Granule,
-        destination: Path,
-        writer: Writer,
-        params: ConversionParams,
-        force: bool = False,
-    ) -> dict[str, list]:
-        """Save granule item to output files after processing.
-
-        Args:
-            item (Granule): Granule to process
-            destination (Path): Base destination directory
-            writer (Writer): Writer instance for output
-            params (ConversionParams): Conversion parameters
-            force (bool): If True, overwrite existing files. Defaults to False.
-
-        Returns:
-            dict[str, list]: Dictionary mapping granule_id to list of output paths
-        """
-        self._validate_save_inputs(item, params)
-        datasets_dict = self._prepare_datasets(writer, params)
-        datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)
-
-        # Load and resample scene
-        log.debug("Loading and resampling scene")
-        scene = self.load_scene(item, datasets=list(datasets_dict.values()))
-
-        # Define area using base class helper
-        area_def = self.define_area(
-            target_crs=params.target_crs_obj,
-            area=params.area_geometry,
-            scene=scene,
-            source_crs=params.source_crs_obj,
-            resolution=params.resolution,
-        )
-        scene = scene.compute()
-        scene = self.resample(scene, area_def=area_def)
-
-        # Write datasets using base class helper
-        res = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
-
-        return res
-
     def _write_scene_datasets(
         self,
         scene: Scene,
diff --git a/src/satctl/sources/sentinel1.py b/src/satctl/sources/sentinel1.py
index ed70c5d..650f503 100644
--- a/src/satctl/sources/sentinel1.py
+++ b/src/satctl/sources/sentinel1.py
@@ -342,87 +342,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
 
         return all_success
 
-    def save_item(
-        self,
-        item: Granule,
-        destination: Path,
-        writer: Writer,
-        params: ConversionParams,
-        force: bool = False,
-    ) -> dict[str, list]:
-        """Process and save Sentinel-1 granule to output files.
-
-        Workflow:
-        1. Validate inputs (local files exist, datasets specified)
-        2. Load scene with SAR data using sar-c_safe reader
-        3. Define target area (from params or full granule extent)
-        4. Resample to target projection and resolution
-        5. Write datasets to output files
-
-        Args:
-            item: Granule to process (must have local_path set)
-            destination: Base destination directory for outputs
-            writer: Writer instance for file output (GeoTIFF, NetCDF, etc.)
-            params: Conversion parameters including:
-                - datasets: List of datasets to process
-                - area_geometry: Optional AOI for spatial subsetting
-                - target_crs: Target coordinate reference system
-                - resolution: Target spatial resolution
-            force: If True, overwrite existing output files. Defaults to False.
-
-        Returns:
-            Dictionary mapping granule_id to list of output file paths
-
-        Raises:
-            FileNotFoundError: If local_path doesn't exist
-            ValueError: If datasets is None and no default composite is configured
-        """
-        # Validate that granule was downloaded
-        if item.local_path is None or not item.local_path.exists():
-            raise FileNotFoundError(f"Invalid source file or directory: {item.local_path}")
-
-        # Ensure datasets are specified
-        if params.datasets is None and self.default_composite is None:
-            raise ValueError("Missing datasets or default composite for storage")
-
-        # Parse dataset names and prepare output filenames
-        datasets_dict = self._prepare_datasets(writer, params)
-
-        # Filter existing files using base class helper
-        datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)
-
-        # Load scene with requested SAR datasets
-        log.debug("Loading and resampling scene")
-        scene = self.load_scene(item, datasets=list(datasets_dict.values()))
-
-        # Define target area for resampling
-        area_def = self.define_area(
-            target_crs=params.target_crs_obj,
-            area=params.area_geometry,
-            scene=scene,
-            source_crs=params.source_crs_obj,
-            resolution=params.resolution,
-        )
-
-        # Resample to target area
-        scene = self.resample(scene, area_def=area_def)
-
-        # Write each dataset to output file
-        paths: dict[str, list] = defaultdict(list)
-        output_dir = destination / item.granule_id
-        output_dir.mkdir(exist_ok=True, parents=True)
-
-        for dataset_name, file_name in datasets_dict.items():
-            output_path = output_dir / f"{file_name}.{writer.extension}"
-            paths[item.granule_id].append(
-                writer.write(
-                    dataset=cast(DataArray, scene[dataset_name]),
-                    output_path=output_path,
-                )
-            )
-
-        return paths
-
 
 class Sentinel1GRDSource(Sentinel1Source):
     """Source for Sentinel-1 Ground Range Detected (GRD) products.
diff --git a/src/satctl/sources/sentinel2.py b/src/satctl/sources/sentinel2.py
index 9126509..05f9e35 100644
--- a/src/satctl/sources/sentinel2.py
+++ b/src/satctl/sources/sentinel2.py
@@ -345,47 +345,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
         log.warning("Failed to download all required assets for: %s", item.granule_id)
         return all_success
 
-    def save_item(
-        self,
-        item: Granule,
-        destination: Path,
-        writer: Writer,
-        params: ConversionParams,
-        force: bool = False,
-    ) -> dict[str, list]:
-        """Save granule item to output files after processing.
-
-        Args:
-            item (Granule): Granule to process
-            destination (Path): Base destination directory
-            writer (Writer): Writer instance for output
-            params (ConversionParams): Conversion parameters
-            force (bool): If True, overwrite existing files. Defaults to False.
-
-        Returns:
-            dict[str, list]: Dictionary mapping granule_id to list of output paths
-        """
-        # Validate inputs using base class helper
-        self._validate_save_inputs(item, params)
-        # Parse datasets using base class helper
-        datasets_dict = self._prepare_datasets(writer, params)
-        # Filter existing files using base class helper
-        datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)
-        # Load and resample scene
-        log.debug("Loading and resampling scene")
-        scene = self.load_scene(item, datasets=list(datasets_dict.values()))
-        # Define area using base class helper
-        area_def = self.define_area(
-            target_crs=params.target_crs_obj,
-            area=params.area_geometry,
-            scene=scene,
-            source_crs=params.source_crs_obj,
-            resolution=params.resolution,
-        )
-        scene = self.resample(scene, area_def=area_def)
-        # Write datasets using base class helper
-        return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
-
 
 class Sentinel2L2ASource(Sentinel2Source):
     """Source for Sentinel-2 MSI L2A product."""
diff --git a/src/satctl/sources/sentinel3.py b/src/satctl/sources/sentinel3.py
index c5a4bb8..bf22183 100644
--- a/src/satctl/sources/sentinel3.py
+++ b/src/satctl/sources/sentinel3.py
@@ -230,14 +230,30 @@ def save_item(
             force (bool): If True, overwrite existing files. Defaults to False.
 
         Returns:
-            dict[str, list]: Dictionary mapping granule_id to list of output paths
+            dict[str, list]: Dictionary mapping granule_id to list of output paths.
+                Empty list means all files were skipped (already exist).
+
+        Raises:
+            FileNotFoundError: If granule data not downloaded
+            ValueError: If invalid configuration
+            Exception: If processing fails (scene loading, resampling, writing)
         """
+        # Validate inputs using base class helper
         self._validate_save_inputs(item, params)
+
+        # Parse datasets using base class helper
         datasets_dict = self._prepare_datasets(writer, params)
+
+        # Filter existing files using base class helper
         datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)
 
+        # Early return if no datasets to process (all files already exist)
+        if not datasets_dict:
+            log.info("Skipping %s - all datasets already exist", item.granule_id)
+            return {item.granule_id: []}
+
         # Load and resample scene
-        log.debug("Loading and resampling scene")
+        log.debug("Loading and resampling scene for %s", item.granule_id)
 
         # workaround patch to fix broker SLSTR reader
         # see https://github.com/pytroll/satpy/issues/3251
@@ -259,7 +275,12 @@ def save_item(
         scene = self.resample(scene, area_def=area_def)
 
         # Write datasets using base class helper
-        return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
+        result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
+
+        # Log success
+        num_files = len(result.get(item.granule_id, []))
+        log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files)
+        return result
 
 
 class SLSTRSource(Sentinel3Source):