From 319b46166394d127ede12236b30e7ee0fb67b7ed Mon Sep 17 00:00:00 2001 From: Licu Mihai Date: Tue, 9 Sep 2025 14:53:08 +0200 Subject: [PATCH] Refactored Synpp class - Fix run_pipeline to properly use flowchart_path instead of ignoring it - Added input validation and None handling --- src/synpp/pipeline.py | 224 ++++++++++++++++++++++++++++++------------ 1 file changed, 160 insertions(+), 64 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 20dbc1b..336af27 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -931,81 +931,177 @@ def run_from_cmd(argv): class Synpp: def __init__(self, config: dict, working_directory: str = None, logger: logging.Logger = logging.getLogger("synpp"), definitions: List[Dict[str, Union[str, Callable, ModuleType]]] = None, flowchart_path: str = None, - dryrun: bool = False, externals: Dict[str, str] = {}, aliases = {}): - self.config = config + dryrun: bool = False, externals: Dict[str, str] = None, aliases: dict = None): + if not isinstance(config, dict): + raise PipelineError("config must be a dictionary") + + # Store core configuration + self.config = copy.deepcopy(config) self.working_directory = working_directory self.logger = logger self.definitions = definitions self.flowchart_path = flowchart_path self.dryrun = dryrun - self.externals = externals - self.aliases = aliases - - def run_pipeline(self, definitions=None, rerun_required=True, dryrun=None, verbose=False, flowchart_path=None): - if definitions is None and self.definitions is None: + + # Handle mutable defaults safely + self.externals = externals.copy() if externals is not None else {} + self.aliases = aliases.copy() if aliases is not None else {} + + def run_pipeline(self, definitions=None, rerun_required=True, verbose=False): + # Determine which definitions to use + if definitions is not None: + effective_definitions = definitions + elif self.definitions is not None: + effective_definitions = self.definitions + else: raise PipelineError("A list of stage definitions must be available in object or provided explicitly.") - elif definitions is None: - definitions = self.definitions - if dryrun is None: - dryrun = self.dryrun - return run(definitions, self.config, self.working_directory, flowchart_path=flowchart_path, - dryrun=dryrun, verbose=verbose, logger=self.logger, rerun_required=rerun_required, - ensure_working_directory=True, externals=self.externals, aliases=self.aliases) - - def run_single(self, descriptor, config={}, rerun_if_cached=False, dryrun=False, verbose=False): - return run([{'descriptor': descriptor, 'config': config}], self.config, self.working_directory, - dryrun=dryrun, verbose=verbose, logger=self.logger, rerun_required=rerun_if_cached, - flowchart_path=self.flowchart_path, ensure_working_directory=True, externals=self.externals, - aliases=self.aliases)[0] + + + return run( + definitions=effective_definitions, + config=self.config, + working_directory=self.working_directory, + flowchart_path=self.flowchart_path, + dryrun=self.dryrun, + verbose=verbose, + logger=self.logger, + rerun_required=rerun_required, + ensure_working_directory=True, + externals=self.externals, + aliases=self.aliases + ) + + def run_single(self, descriptor, config=None, rerun_if_cached=False, dryrun=False, verbose=False): + if config is None: + config = {} + - @staticmethod - def build_from_yml(config_path, working_directory = None, run = [], overrides = {}): - with open(config_path) as f: - settings = yaml.load(f, Loader=yaml.SafeLoader) + stage_definition = {'descriptor': descriptor, 'config': config} + + results = run( + definitions=[stage_definition], + config=self.config, + working_directory=self.working_directory, + flowchart_path=self.flowchart_path, + dryrun=dryrun, # Use parameter override for single stage + verbose=verbose, + logger=self.logger, + rerun_required=rerun_if_cached, + ensure_working_directory=True, + externals=self.externals, + aliases=self.aliases + ) + return results[0] + @staticmethod + def build_from_yml(config_path, working_directory=None, run=None, overrides=None): + if run is None: + run = [] + if overrides is None: + overrides = {} + + # Load and validate YAML configuration + if not os.path.isfile(config_path): + raise PipelineError(f"Configuration file not found: {config_path}") + + try: + with open(config_path, 'r', encoding='utf-8') as f: + settings = yaml.load(f, Loader=yaml.SafeLoader) + except yaml.YAMLError as e: + raise PipelineError(f"Invalid YAML syntax in {config_path}: {e}") + except Exception as e: + raise PipelineError(f"Error reading configuration file {config_path}: {e}") + + if not isinstance(settings, dict): + raise PipelineError(f"YAML root must be a dictionary in {config_path}") + + # Process stage definitions + if run: + run_stages = run + elif "run" in settings: + run_stages = settings["run"] + else: + raise PipelineError("No 'run' section found in configuration and no run parameter provided") + + if not isinstance(run_stages, list): + raise PipelineError("'run' section must be a list") + definitions = [] - - run = run if len(run) > 0 else settings["run"] - for item in run: - parameters = {} - - if type(item) == dict: - key = list(item.keys())[0] - parameters = item[key] - item = key - - definitions.append({ - "descriptor": item, "config": parameters - }) - - config = settings["config"] if "config" in settings else {} - - for option, value in overrides.items(): - current = config - for segment in option.split(".")[:-1]: - if not segment in current: - current[segment] = dict() - - current = current[segment] - - assert type(current) == dict - - option = option.split(".")[-1] - if option in current: - current[option] = type(current[option])(value) + for item in run_stages: + if isinstance(item, str): + # Simple string descriptor + definitions.append({"descriptor": item, "config": {}}) + elif isinstance(item, dict): + # Dictionary with single key-value pair (descriptor -> config) + if len(item) != 1: + raise PipelineError(f"Run item dictionary must have exactly one key-value pair: {item}") + descriptor = next(iter(item.keys())) + parameters = item[descriptor] + definitions.append({"descriptor": descriptor, "config": parameters}) else: - current[option] = value - - if working_directory is None: - working_directory = settings["working_directory"] if "working_directory" in settings else None + raise PipelineError(f"Run item must be string or dictionary, got {type(item)}: {item}") - flowchart_path = settings["flowchart_path"] if "flowchart_path" in settings else None - dryrun = settings["dryrun"] if "dryrun" in settings else False - externals = settings["externals"] if "externals" in settings else {} - aliases = settings["aliases"] if "aliases" in settings else {} - - return Synpp(config=config, working_directory=working_directory, definitions=definitions, - flowchart_path=flowchart_path, dryrun=dryrun, externals=externals, aliases=aliases) + # Extract base configuration and apply overrides + base_config = settings.get("config", {}) + + if overrides: + # Work with a deep copy to avoid modifying the original + final_config = copy.deepcopy(base_config) + + for option_path, value in overrides.items(): + if not isinstance(option_path, str) or not option_path: + raise PipelineError(f"Override key must be non-empty string: {option_path}") + + # Split the dot-notation path + path_segments = option_path.split(".") + current_dict = final_config + + # Navigate to the parent of the target key + for segment in path_segments[:-1]: + if segment not in current_dict: + current_dict[segment] = {} + elif not isinstance(current_dict[segment], dict): + raise PipelineError(f"Cannot override {option_path}: '{segment}' is not a dictionary") + current_dict = current_dict[segment] + + # Ensure current location is a dictionary + if not isinstance(current_dict, dict): + raise PipelineError(f"Cannot apply override {option_path}: path does not lead to a dictionary") + + # Apply the override value + final_key = path_segments[-1] + if final_key in current_dict: + # Try to preserve the original type if possible + original_value = current_dict[final_key] + try: + # Attempt type-preserving conversion + current_dict[final_key] = type(original_value)(value) + except (ValueError, TypeError): + # If conversion fails, use the new value as-is + current_dict[final_key] = value + else: + # New key, use value as-is + current_dict[final_key] = value + else: + final_config = copy.deepcopy(base_config) + + # Extract other settings with proper defaults + resolved_working_directory = working_directory if working_directory is not None else settings.get("working_directory") + flowchart_path = settings.get("flowchart_path") + dryrun = settings.get("dryrun", False) + externals = settings.get("externals", {}) + aliases = settings.get("aliases", {}) + + # Create and return Synpp instance + return Synpp( + config=final_config, + working_directory=resolved_working_directory, + definitions=definitions, + flowchart_path=flowchart_path, + dryrun=dryrun, + externals=externals, + aliases=aliases + ) def stage(function=None, *args, **kwargs):