diff --git a/focus_validator/config_objects/focus_to_duckdb_converter.py b/focus_validator/config_objects/focus_to_duckdb_converter.py index ad34c88..231f0c6 100644 --- a/focus_validator/config_objects/focus_to_duckdb_converter.py +++ b/focus_validator/config_objects/focus_to_duckdb_converter.py @@ -470,6 +470,19 @@ def __init__(self, rule, rule_id: str, **kwargs: Any) -> None: ) +class SkippedMissingGeneratorCheck(SkippedCheck): + + def __init__( + self, rule, rule_id: str, check_function: str = "", **kwargs: Any + ) -> None: + super().__init__(rule, rule_id, **kwargs) + self.check_function = check_function + self.errorMessage = ( + f"Rule skipped - missing generator for CheckFunction '{check_function}'. " + "This check type is not yet implemented." + ) + + class ColumnPresentCheckGenerator(DuckDBCheckGenerator): REQUIRED_KEYS = {"ColumnName"} @@ -1473,205 +1486,2289 @@ def generatePredicate(self) -> str | None: return sql_query.get_predicate_sql() -class CheckGreaterOrEqualGenerator(DuckDBCheckGenerator): - REQUIRED_KEYS = {"ColumnName", "Value"} +class CheckGreaterOrEqualGenerator(DuckDBCheckGenerator): + REQUIRED_KEYS = {"ColumnName", "Value"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + val = self.params.Value + keyword = self._get_validation_keyword() + message = ( + self.errorMessage or f"{col} {keyword} be greater than or equal to {val}." + ) + msg_sql = message.replace("'", "''") + + # Requirement SQL (finds violations) + condition = f"{col} IS NOT NULL AND {col} < {val}" + condition = self._apply_condition(condition) + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {condition} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + # Predicate SQL (for condition mode) + predicate_sql = f"{col} IS NOT NULL AND {col} >= {self._lit(val)}" + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql + ) + + def get_sample_sql(self) -> str: + """Return SQL to fetch sample violating rows for display""" + col = self.params.ColumnName + val = self.params.Value + + # Build condition to find violating rows (values less than the required minimum) + condition = f"{col} IS NOT NULL AND {col} < {val}" + condition = self._apply_condition(condition) + + return f""" + SELECT {col} + FROM {{table_name}} + WHERE {condition} + """ + + # Make sample_sql accessible as a property for the infrastructure + @property + def sample_sql(self) -> str: + return self.get_sample_sql() + + def getCheckType(self) -> str: + return "check_greater_equal" + + def generatePredicate(self) -> str | None: + """Backward compatibility wrapper""" + if getattr(self, "exec_mode", "requirement") != "condition": + return None + sql_query = self.generateSql() + return sql_query.get_predicate_sql() + + +class CheckDistinctCountGenerator(DuckDBCheckGenerator): + REQUIRED_KEYS = {"ColumnAName", "ColumnBName", "ExpectedCount"} + + def generateSql(self) -> SQLQuery: + a = self.params.ColumnAName + b = self.params.ColumnBName + n = self.params.ExpectedCount + keyword = self._get_validation_keyword() + + message = ( + self.errorMessage + or f"For each {a}, there {keyword} be exactly {n} distinct {b} values." 
+ ) + msg_sql = message.replace("'", "''") + + # Build WHERE clause for row-level filtering before aggregation + # This applies parent conditions (e.g., "SkuPriceId IS NOT NULL") before GROUP BY + where_clause = "" + if self.row_condition_sql and self.row_condition_sql.strip(): + where_clause = f"WHERE {self.row_condition_sql}" + + # Requirement SQL (finds violations) + # IMPORTANT: Apply row_condition_sql BEFORE GROUP BY to filter groups themselves + requirement_sql = f""" + WITH counts AS ( + SELECT {a} AS grp, COUNT(DISTINCT {b}) AS distinct_count + FROM {{table_name}} + {where_clause} + GROUP BY {a} + ), + invalid AS ( + SELECT grp, distinct_count + FROM counts + WHERE distinct_count <> {n} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + # Note: This is a complex aggregation check that doesn't naturally translate + # to a simple predicate for row-level filtering. Setting predicate_sql to None. + predicate_sql = None + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql + ) + + def getCheckType(self) -> str: + return "distinct_count" + + +class CheckModelRuleGenerator(DuckDBCheckGenerator): + REQUIRED_KEYS = {"ModelRuleId"} + + def getCheckType(self) -> str: + return "model_rule_reference" + + def generateSql(self) -> SQLQuery: + # Won’t be executed; we’ll attach a special executor instead. + self.errorMessage = f"Conformance reference to {self.params.ModelRuleId}" + requirement_sql = "SELECT 0 AS violations" + return SQLQuery(requirement_sql=requirement_sql.strip(), predicate_sql=None) + + def generateCheck(self) -> DuckDBColumnCheck: + # Let the base create the DuckDBColumnCheck (with errorMessage, type, sql) + chk = super().generateCheck() + + target_id = self.params.ModelRuleId + plan = self.plan + # Make a dict {rule_id -> result} out of parent_results_by_idx + plan + id2res: dict[str, dict] = {} + if plan: + for pidx, res in (self.parent_results_by_idx or {}).items(): + rid = getattr(plan.nodes[pidx], "rule_id", None) + if rid: + # normalize shape: we expect {"ok": bool, "details": {...}} + id2res[rid] = res + + def _exec_reference(_conn): + # Try to find the referenced rule’s result among parents + res = id2res.get(target_id) + if res is None: + # Not a direct parent? Try to find in global results registry + converter = None + if ( + hasattr(self, "child_builder") + and callable(self.child_builder) + and hasattr(self.child_builder, "__closure__") + and self.child_builder.__closure__ + ): + # Access the converter instance from the child_builder lambda's closure + for cell in self.child_builder.__closure__: + if hasattr(cell.cell_contents, "_global_results_by_idx"): + converter = cell.cell_contents + break + + if converter and hasattr(converter, "_global_results_by_idx") and plan: + # Look for the target_id in global results by scanning plan nodes + for node_idx, result in converter._global_results_by_idx.items(): + if ( + node_idx < len(plan.nodes) + and plan.nodes[node_idx].rule_id == target_id + ): + res = result + break + + if res is None: + # Still not found? Fall back to a clear failure. 
+ details = { + "violations": 1, + "message": f"Referenced rule '{target_id}' not found upstream", + "referenced_rule_id": target_id, + } + return False, details + + ok = bool(res.get("ok", False)) + det = dict(res.get("details") or {}) + violations = det.get("violations", 0 if ok else 1) + + details = { + "violations": int(violations), + "message": f"Conformance reference to {target_id} ({'OK' if ok else 'FAIL'})", + "referenced_rule_id": target_id, + } + return ok, details + + # Attach the callable to the check so run_check can use it + chk.special_executor = _exec_reference + chk.exec_mode = "reference" + chk.referenced_rule_id = target_id + return chk + + +class JSONCheckPathTypeGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path is of a particular type. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.key" or "$.items[*].field") + ExpectedType: Expected JSON type (e.g., "string", "number", "boolean", "object", "array", "null") + + Uses DuckDB's json_extract and custom type detection to validate JSON path types. + Supports both single values and array paths with [*] syntax. + + For array paths (e.g., $.items[*].type), validates that ALL elements match the expected type. + """ + + REQUIRED_KEYS = {"ColumnName", "Path", "ExpectedType"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + expected_type = self.params.ExpectedType + keyword = self._get_validation_keyword() + + # Normalize expected type to lowercase for comparison + expected_type_lower = expected_type.lower() + + # Map specific numeric type names to generic 'number' for JSON validation + # In JSON, there's no distinction between decimal, integer, float, etc. - they're all numbers + if expected_type_lower in ( + "decimal", + "integer", + "float", + "double", + "bigint", + "int", + "numeric", + ): + expected_type_normalized = "number" + else: + expected_type_normalized = expected_type_lower + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} be of type '{expected_type}'" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Helper function to detect JSON type from a value + # Maps DuckDB's json_type results to JSON type names + def type_check_expr(value_expr: str) -> str: + return f""" + CASE + WHEN {value_expr} IS NULL THEN NULL + WHEN json_type({value_expr}) = 'BOOLEAN' THEN 'boolean' + WHEN json_type({value_expr}) IN ('TINYINT', 'SMALLINT', 'INTEGER', 'BIGINT', 'UTINYINT', 'USMALLINT', 'UINTEGER', 'UBIGINT', 'FLOAT', 'DOUBLE', 'DECIMAL', 'HUGEINT', 'UHUGEINT') THEN 'number' + WHEN json_type({value_expr}) = 'VARCHAR' THEN 'string' + WHEN json_type({value_expr}) = 'NULL' THEN 'null' + WHEN json_type({value_expr}) = 'ARRAY' THEN 'array' + WHEN json_type({value_expr}) IN ('OBJECT', 'JSON') THEN 'object' + ELSE 'unknown' + END + """ + + # Detect if path contains array wildcard [*] + # If it does, we know json_extract will return an ARRAY, so we can use unnest directly + # If not, it returns a single value, so we check the type directly + is_array_path = "[*]" in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + if is_array_path: + # Array path: json_extract returns an ARRAY of values + # We need to check each element in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND 
json_valid({col}::TEXT) + AND ( + {extracted_value} IS NULL + OR json_array_length({extracted_value}) = 0 + OR EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE {type_check_expr('elem')} != '{expected_type_normalized}' + ) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE {type_check_expr('elem')} != '{expected_type_normalized}' + ) + """ + else: + # Single value path: json_extract returns a single JSON value + # Check the type directly + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ( + {extracted_value} IS NULL + OR {type_check_expr(extracted_value)} != '{expected_type_normalized}' + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {type_check_expr(extracted_value)} = '{expected_type_normalized}' + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_type" + + +class JSONCheckPathKeyValueFormatGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to KeyValueFormat requirements. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.tags" or "$.items[*].metadata") + + KeyValueFormat requirements: + 1. Must be a valid JSON object (not array, not primitive) + 2. Keys must be unique within the object + 3. Values must be primitive types only (string, number, boolean, null) + 4. Values must NOT be objects or arrays + + For array paths (e.g., $.items[*].tags), validates ALL elements. + """ + + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} conform to KeyValueFormat (JSON object with primitive values only)" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Detect if path contains array wildcard [*] + is_array_path = "[*]" in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + # Function to check if a value is a valid KeyValueFormat object + # Requirements: + # 1. Must be an OBJECT type + # 2. All values must be primitive (not OBJECT or ARRAY) + # Strategy: Use json_keys() to get all keys, then check each value's type + def keyvalue_check(value_expr: str) -> str: + return f"""( + json_type({value_expr}) IN ('OBJECT', 'JSON') + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_keys({value_expr})) AS t(key_name) + WHERE json_type(json_extract({value_expr}, '$.' 
|| key_name)) IN ('OBJECT', 'ARRAY', 'JSON') + ) + )""" + + if is_array_path: + # Array path: json_extract returns an ARRAY of values + # Check each element conforms to KeyValueFormat + # Skip if path doesn't exist (NULL) or returns empty array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keyvalue_check('elem')} + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keyvalue_check('elem')} + ) + """ + else: + # Single value path: check the value directly + # Skip if path doesn't exist (NULL) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND NOT {keyvalue_check(extracted_value)} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {keyvalue_check(extracted_value)} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_keyvalue_format" + + +class JSONCheckPathKeyStartsWithGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path only have keys that start with a specific prefix, + ignoring certain specified keys. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.Elements[*]") + Prefix: Required prefix for keys (e.g., "x_") + IgnoreKeys: List of keys to exclude from validation + + Use case: Enforce naming conventions like custom properties must start with "x_" + while allowing standard FOCUS-defined properties. + + For array paths (e.g., $.Elements[*]), validates ALL elements. 
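+
+    Illustrative example (hypothetical data): with Prefix "x_" and
+    IgnoreKeys ["Type", "Name"], the object {"Name": "a", "x_cost": 1}
+    passes, while {"Name": "a", "cost": 1} fails because "cost" neither
+    starts with "x_" nor appears in IgnoreKeys.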
+ """ + + REQUIRED_KEYS = {"ColumnName", "Path", "Prefix", "IgnoreKeys"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + prefix = self.params.Prefix + ignore_keys = self.params.IgnoreKeys # Should be a list + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' keys {keyword} start with '{prefix}' (except {ignore_keys})" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path and prefix for SQL + path_escaped = path.replace("'", "''") + prefix_escaped = prefix.replace("'", "''") + + # Build SQL array of ignored keys + if ignore_keys and len(ignore_keys) > 0: + ignore_keys_sql = ( + "[" + + ", ".join(f"'{k.replace('\"', '\"\"')}'" for k in ignore_keys) + + "]" + ) + else: + ignore_keys_sql = "[]" + + # Detect if path contains array wildcard [*] + is_array_path = "[*]" in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + # Function to check if all keys (except ignored ones) start with prefix + # Returns a check expression that's true if all non-ignored keys start with prefix + def keys_check(value_expr: str) -> str: + return f"""( + json_type({value_expr}) IN ('OBJECT', 'JSON') + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_keys({value_expr})) AS t(key_name) + WHERE key_name NOT IN (SELECT unnest({ignore_keys_sql})) + AND NOT starts_with(key_name, '{prefix_escaped}') + ) + )""" + + if is_array_path: + # Array path: json_extract returns an ARRAY of values + # Check each element has all keys starting with prefix (except ignored) + # Skip if path doesn't exist (NULL) or returns empty array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keys_check('elem')} + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keys_check('elem')} + ) + """ + else: + # Single value path: check the value directly + # Skip if path doesn't exist (NULL) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND NOT {keys_check(extracted_value)} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {keys_check(extracted_value)} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_key_starts_with" + + +class JSONCheckPathKeyExistsGenerator(DuckDBCheckGenerator): + """ + Check if a specific key exists at a JSON path. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression ending with the key to check (e.g., "$.Elements[*].AllocatedRatio") + + The path should end with the key name to check. 
For example: + - "$.Elements[*].AllocatedRatio" checks if each element in Elements has AllocatedRatio key + - "$.metadata.version" checks if metadata object has version key + + For array paths (e.g., $.Elements[*].KeyName), validates ALL elements have the key. + """ + + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Extract the key name from the path (last component after final '.') + # e.g., "$.Elements[*].AllocatedRatio" -> "AllocatedRatio" + if "." in path: + key_name = path.rsplit(".", 1)[1] + parent_path = path.rsplit(".", 1)[0] + else: + raise ValueError( + f"Path must contain at least one '.' to specify a key: {path}" + ) + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' key '{key_name}' {keyword} exist" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes for SQL + parent_path_escaped = parent_path.replace("'", "''") + key_name_escaped = key_name.replace("'", "''") + + # Detect if parent path contains array wildcard [*] + is_array_path = "[*]" in parent_path + + extracted_value = ( + f"json_extract(TRY_CAST({col} AS JSON), '{parent_path_escaped}')" + ) + + if is_array_path: + # Array path: check each element has the key + # Use json_keys() to get keys of each object in the array + # Skip rows where: path doesn't exist, array is empty, or not all elements are objects + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND ( + EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) NOT IN ('OBJECT', 'JSON') + ) + OR EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys(elem))) + ) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) NOT IN ('OBJECT', 'JSON') + ) + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys(elem))) + ) + """ + else: + # Single object path: check if the key exists + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_type({extracted_value}) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys({extracted_value}))) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_type({extracted_value}) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' IN (SELECT unnest(json_keys({extracted_value}))) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_key_exists" + + +class 
JSONCheckPathValueGenerator(DuckDBCheckGenerator): + """ + Check if element(s) at JSON path have a specific value. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.Elements[*].ContractId") + Value: The expected value to check against + + For array paths (e.g., $.Elements[*].field), checks if ALL elements have the value. + For single paths (e.g., $.field), checks if the value matches. + + Commonly used to check for null values or specific constants. + """ + + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + value = self.params.Value + keyword = self._get_validation_keyword() + + # Build error message + message = self.errorMessage or f"{col} at path '{path}' {keyword} equal {value}" + msg_sql = message.replace("'", "''") + + # Escape single quotes for SQL + path_escaped = path.replace("'", "''") + + # Convert Python value to SQL literal for comparison + # For JSON comparisons, we need to cast properly + if value is None: + value_json = "NULL" + elif isinstance(value, bool): + value_json = f"'{str(value).lower()}'" # JSON booleans are lowercase + elif isinstance(value, (int, float)): + value_json = str(value) + else: + # String value - need to quote for JSON comparison + value_escaped = str(value).replace("'", "''") + value_json = f"'\"{value_escaped}\"'" # JSON strings are quoted + + # Detect if path contains array wildcard [*] + is_array_path = "[*]" in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + if is_array_path: + # Array path: check if ALL elements equal the value + # Extract as text for comparison since json_extract returns JSON + if value is None: + # For null checks, check if any element is NOT null (SQL NULL or JSON null string 'null') + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NOT NULL AND elem::TEXT != 'null' + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NOT NULL AND elem::TEXT != 'null' + ) + """ + else: + # For non-null checks, compare as text since JSON extract returns JSON + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT != {value_json} OR elem IS NULL + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT != {value_json} OR elem IS NULL + ) + """ + else: + # Single value path: check if the value matches + if value is None: + # Check if the 
value is NOT null (either SQL NULL or JSON null string 'null') + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT = 'null') + """ + else: + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != {value_json}) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT = {value_json} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_value" + + +class JSONCheckPathNotValueGenerator(DuckDBCheckGenerator): + """ + JSONCheckPathNotValue check generator. + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} + Validates that element(s) at a JSON path do NOT have a specific value. + """ + + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} + + def generateSql(self) -> SQLQuery: + """Generate SQL for JSON path NOT value check""" + col = self.params.ColumnName + path = self.params.Path + value = self.params.Value + + # Escape path for SQL + path_escaped = path.replace("'", "''") + + # Build error message + msg = f"{col} at path '{path}' MUST NOT equal {value}" + msg_sql = msg.replace("'", "''") + + # Check if this is an array path (contains [*]) + is_array_path = "[*]" in path + + # Format value for SQL comparison + if value is None: + value_json = "NULL" + elif isinstance(value, bool): + value_json = f"'{str(value).lower()}'" # JSON uses lowercase true/false + elif isinstance(value, (int, float)): + value_json = str(value) + else: + # String value - escape for SQL + value_escaped = str(value).replace("'", "''") + # For JSON comparison, strings are quoted + value_json = f"'\"{value_escaped}\"'" + + # Extract the value at the path + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + if is_array_path: + # Array path: check if ANY element equals the value (violation) + if value is None: + # Check if any element IS null (SQL NULL or JSON null string 'null') + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NULL OR elem::TEXT = 'null' + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NULL OR elem::TEXT = 'null' + ) + """ + else: + # For non-null checks, check if any element equals the value + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM 
{{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT = {value_json} + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT = {value_json} + ) + """ + else: + # Single value path: check if the value equals the specified value (violation) + if value is None: + # Check if the value IS null (JSON null string 'null', not missing path) + # Only check rows where the path exists (not SQL NULL) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT = 'null' + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != 'null') + """ + else: + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT = {value_json} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != {value_json}) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_not_value" + + +class JSONCheckPathSameValueGenerator(DuckDBCheckGenerator): + """ + JSONCheckPathSameValue check generator. 
+ REQUIRED_KEYS = {"ColumnAName", "PathA", "ColumnBName", "PathB"} + Validates that element(s) at PathA in ColumnA have the same value as: + - ColumnB at PathB (if PathB is not null) + - ColumnB directly (if PathB is null) + """ + + REQUIRED_KEYS = {"ColumnAName", "PathA", "ColumnBName", "PathB"} + + def generateSql(self) -> SQLQuery: + """Generate SQL for JSON path same value check""" + col_a = self.params.ColumnAName + path_a = self.params.PathA + col_b = self.params.ColumnBName + path_b = self.params.PathB + + # Escape paths for SQL + path_a_escaped = path_a.replace("'", "''") + path_b_escaped = path_b.replace("'", "''") if path_b else None + + # Build error message + if path_b: + msg = f"{col_a} at path '{path_a}' MUST equal {col_b} at path '{path_b}'" + else: + msg = f"{col_a} at path '{path_a}' MUST equal {col_b}" + msg_sql = msg.replace("'", "''") + + # Check if paths are array paths + is_array_path_a = "[*]" in path_a + is_array_path_b = "[*]" in path_b if path_b else False + + # Extract values + extracted_a = f"json_extract(TRY_CAST({col_a} AS JSON), '{path_a_escaped}')" + + if path_b: + extracted_b = f"json_extract(TRY_CAST({col_b} AS JSON), '{path_b_escaped}')" + else: + # Compare to column value directly + extracted_b = col_b + + # Generate SQL based on path types + if is_array_path_a and is_array_path_b: + # Both arrays: check if arrays are equal element by element + # This is complex - we need to compare array lengths and each element + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND ( + json_array_length({extracted_a}) != json_array_length({extracted_b}) + OR EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) WITH ORDINALITY AS a(elem_a, idx) + JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) + ON a.idx = b.idx + WHERE a.elem_a::TEXT != b.elem_b::TEXT + OR (a.elem_a IS NULL AND b.elem_b IS NOT NULL) + OR (a.elem_a IS NOT NULL AND b.elem_b IS NULL) + ) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_a}) = json_array_length({extracted_b}) + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) WITH ORDINALITY AS a(elem_a, idx) + JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) + ON a.idx = b.idx + WHERE a.elem_a::TEXT != b.elem_b::TEXT + OR (a.elem_a IS NULL AND b.elem_b IS NOT NULL) + OR (a.elem_a IS NOT NULL AND b.elem_b IS NULL) + ) + """ + + elif is_array_path_a and not is_array_path_b: + # Array A, single B: check if all elements of A equal B + if path_b: + # B is a JSON path (single value) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != {extracted_b}::TEXT + OR (elem IS NULL AND {extracted_b} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_b} IS NULL) + ) + ) + SELECT + COUNT(*) AS 
violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != {extracted_b}::TEXT + OR (elem IS NULL AND {extracted_b} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_b} IS NULL) + ) + """ + else: + # B is a column value directly + # Note: JSON strings are quoted, so we need to quote col_b for comparison + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != ('"' || {col_b}::TEXT || '"') + OR (elem IS NULL AND {col_b} IS NOT NULL) + OR (elem IS NOT NULL AND {col_b} IS NULL) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != ('"' || {col_b}::TEXT || '"') + OR (elem IS NULL AND {col_b} IS NOT NULL) + OR (elem IS NOT NULL AND {col_b} IS NULL) + ) + """ + + elif not is_array_path_a and is_array_path_b: + # Single A, array B: check if A equals all elements of B + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_b}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_b}) AS t(elem) + WHERE elem::TEXT != {extracted_a}::TEXT + OR (elem IS NULL AND {extracted_a} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_a} IS NULL) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_b}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_b}) AS t(elem) + WHERE elem::TEXT != {extracted_a}::TEXT + OR (elem IS NULL AND {extracted_a} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_a} IS NULL) + ) + """ + + else: + # Both single values: direct comparison + if path_b: + # B is a JSON path + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND ({extracted_a} IS NOT NULL OR {extracted_b} IS NOT NULL) + AND ( + ({extracted_a} IS NULL AND {extracted_b} IS NOT NULL) + OR ({extracted_a} IS NOT NULL AND {extracted_b} IS NULL) + OR {extracted_a}::TEXT != {extracted_b}::TEXT + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL 
+ AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND ( + ({extracted_a} IS NULL AND {extracted_b} IS NULL) + OR ({extracted_a} IS NOT NULL AND {extracted_b} IS NOT NULL + AND {extracted_a}::TEXT = {extracted_b}::TEXT) + ) + """ + else: + # B is a column value directly + # Note: JSON strings are quoted, so we need to quote col_b for comparison + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_a}::TEXT != ('"' || {col_b}::TEXT || '"') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_a}::TEXT = ('"' || {col_b}::TEXT || '"') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_same_value" + + +class JSONCheckPathNumericFormatGenerator(DuckDBCheckGenerator): + """ + Validates that element(s) at JSON path meet numeric format requirements. + + Arguments: + - ColumnName: The column containing JSON data + - Path: JSONPath expression to extract element(s) + + Validation: + - Checks that extracted values match numeric pattern: ^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$ + - Supports both array paths (with [*]) and single value paths + - Only validates non-null JSON values + """ + + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + message = ( + self.errorMessage + or f"{col} at path {path} {keyword} contain numeric values (optional +/- sign, optional decimal, optional scientific notation)." + ) + msg_sql = message.replace("'", "''") + + # Determine if this is an array path or single value path + is_array_path = "[*]" in path + + # Numeric pattern: supports 123, -123, 1.23, -1.23, 1.23e10, 1.23e-10, 1.23E+10, etc. 
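+        # Illustrative matches / non-matches (not exhaustive): '42', '+0.5',
+        # '.5', and '1.23E-10' match; '1.', '.e5', 'abc', and '1,000' do not.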
+ numeric_pattern = r"^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$" + + if is_array_path: + # Array path: Check if ANY element violates the numeric format + # Only check elements that exist and are not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: All elements must be valid numeric format + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') + ) + """ + else: + # Single value path: Check if the value violates the numeric format + # Only check if the value exists and is not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_extract({col}::TEXT, '{path}') IS NOT NULL + AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' + AND NOT (TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"') ~ '{numeric_pattern}') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: Value must be valid numeric format + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR json_extract({col}::TEXT, '{path}') IS NULL + OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' + OR (TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"') ~ '{numeric_pattern}') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_numeric_format" + + +class JSONCheckPathUnitFormatGenerator(DuckDBCheckGenerator): + """ + Validates that element(s) at JSON path meet unit format requirements. + + Arguments: + - ColumnName: The column containing JSON data + - Path: JSONPath expression to extract element(s) + + Validation: + - Checks that extracted values match FOCUS unit format patterns + - Supports both array paths (with [*]) and single value paths + - Only validates non-null JSON values + """ + + REQUIRED_KEYS = {"ColumnName", "Path"} + + def _generate_unit_format_regex(self) -> str: + """ + Generate the complete regex pattern for FOCUS Unit Format validation. + This is identical to FormatUnitGenerator's method. 
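+
+        For orientation, the combined pattern accepts values such as
+        "GB", "Hours", "GB-Hours", "GB/Hour", "1000 Requests", and
+        "Requests/3 Months" (one example per pattern family below).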
+ """ + # Data Size Unit Names (both decimal and binary) + data_size_units = [ + # Bits (decimal) + "b", + "Kb", + "Mb", + "Gb", + "Tb", + "Pb", + "Eb", + # Bytes (decimal) + "B", + "KB", + "MB", + "GB", + "TB", + "PB", + "EB", + # Bits (binary) + "Kib", + "Mib", + "Gib", + "Tib", + "Pib", + "Eib", + # Bytes (binary) + "KiB", + "MiB", + "GiB", + "TiB", + "PiB", + "EiB", + ] + + # Time-based Unit Names + time_units_singular = ["Year", "Month", "Day", "Hour", "Minute", "Second"] + time_units_plural = ["Years", "Months", "Days", "Hours", "Minutes", "Seconds"] + + # Build regex patterns + patterns = [] + data_size_pattern = "|".join(data_size_units) + time_singular_pattern = "|".join(time_units_singular) + time_plural_pattern = "|".join(time_units_plural) + count_unit_pattern = r"[A-Za-z][A-Za-z0-9]*(?:\s+[A-Za-z][A-Za-z0-9]*)*" + + # Pattern 1: Standalone units + patterns.append( + f"^({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 2: - + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern})-({time_plural_pattern})$" + ) + # Pattern 3: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/({time_singular_pattern})$" + ) + # Pattern 4: + patterns.append( + f"^[0-9]+ ({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 5: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/[0-9]+ ({time_plural_pattern})$" + ) + + # Combine all patterns with OR + return "|".join(f"({pattern})" for pattern in patterns) + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + message = ( + self.errorMessage + or f"{col} at path {path} {keyword} follow the FOCUS Unit Format specification." 
+ ) + msg_sql = message.replace("'", "''") + + # Get the combined regex pattern + combined_pattern = self._generate_unit_format_regex() + + # Determine if this is an array path or single value path + is_array_path = "[*]" in path + + if is_array_path: + # Array path: Check if ANY element violates the unit format + # Only check elements that exist and are not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: All elements must match unit format + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') + ) + """ + else: + # Single value path: Check if the value violates the unit format + # Only check if the value exists and is not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_extract({col}::TEXT, '{path}') IS NOT NULL + AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' + AND NOT regexp_matches(TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"'), '{combined_pattern}') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: Value must match unit format + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR json_extract({col}::TEXT, '{path}') IS NULL + OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' + OR regexp_matches(TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"'), '{combined_pattern}') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_unit_format" + + +class JSONCheckPathDistinctParentGenerator(DuckDBCheckGenerator): + """ + Validates that distinct count of child elements equals expected count. + + Arguments: + - ColumnName: The column containing JSON data + - ParentPath: JSONPath to parent elements (typically with [*]) + - ChildPath: JSONPath relative to parent to extract child values + - ExpectedCount: Expected number of distinct child values + + Validation: + - Extracts parent elements using ParentPath + - For each parent, extracts child value using ChildPath + - Counts distinct non-null child values + - Checks if distinct count equals ExpectedCount + """ + + REQUIRED_KEYS = {"ColumnName", "ParentPath", "ChildPath", "ExpectedCount"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + parent_path = self.params.ParentPath + child_path = self.params.ChildPath + expected_count = self.params.ExpectedCount + keyword = self._get_validation_keyword() + message = ( + self.errorMessage + or f"{col} at parent path {parent_path} {keyword} have exactly {expected_count} distinct values for child path {child_path}." 
+ ) + msg_sql = message.replace("'", "''") + + # Build the validation query + # For each row, extract parent elements, then extract child values, count distinct + # Child path format: if it starts with $, use it directly; otherwise prepend $. to make it a path + if child_path.startswith("$."): + full_child_path = child_path + elif child_path.startswith("$"): + full_child_path = child_path + else: + # Assume it's a simple key name, make it a path + full_child_path = f"$.{child_path}" + + requirement_sql = f""" + WITH row_distinct_counts AS ( + SELECT + {col}, + ( + SELECT COUNT(DISTINCT child_value) + FROM ( + SELECT json_extract(parent_elem::TEXT, '{full_child_path}') AS child_value + FROM unnest(json_extract({col}::TEXT, '{parent_path}')) AS t(parent_elem) + WHERE parent_elem IS NOT NULL + AND parent_elem::TEXT != 'null' + AND json_extract(parent_elem::TEXT, '{full_child_path}') IS NOT NULL + AND json_extract(parent_elem::TEXT, '{full_child_path}')::TEXT != 'null' + ) child_values + ) AS distinct_count + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{parent_path}')) > 0 + ), + violations AS ( + SELECT {col} + FROM row_distinct_counts + WHERE distinct_count != {expected_count} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: distinct count must equal expected count + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR ( + SELECT COUNT(DISTINCT child_value) + FROM ( + SELECT json_extract(parent_elem::TEXT, '{full_child_path}') AS child_value + FROM unnest(json_extract({col}::TEXT, '{parent_path}')) AS t(parent_elem) + WHERE parent_elem IS NOT NULL + AND parent_elem::TEXT != 'null' + AND json_extract(parent_elem::TEXT, '{full_child_path}') IS NOT NULL + AND json_extract(parent_elem::TEXT, '{full_child_path}')::TEXT != 'null' + ) child_values + ) = {expected_count} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_distinct_parent" + + +class FormatJSONFormatGenerator(DuckDBCheckGenerator): + """ + Validates that element(s) at JSON path meet JSON format requirements. + + Arguments: + - ColumnName: The column containing JSON data + - Path: JSONPath expression to extract element(s) (optional - if not provided, validates entire column) + + Validation: + - Checks that extracted values are valid JSON + - Supports both array paths (with [*]) and single value paths + - If Path is not provided, validates the entire column value as valid JSON + - Only validates non-null JSON values + - Validates that the extracted element is itself valid JSON + """ + + REQUIRED_KEYS = {"ColumnName"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = getattr(self.params, "Path", None) + keyword = self._get_validation_keyword() + + # If no path provided, validate the entire column + if not path: + message = self.errorMessage or f"{col} {keyword} contain valid JSON format." 
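+            # Note: json_valid treats any well-formed JSON value (object,
+            # array, string, number, boolean) as valid, e.g. '{"a": 1}',
+            # '[1, 2]', '"x"', '3', and 'true' all pass, while '{a: 1}'
+            # (unquoted key) does not.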
+ msg_sql = message.replace("'", "''") + + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND NOT json_valid({col}::TEXT) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + predicate_sql = f""" + {col} IS NULL + OR json_valid({col}::TEXT) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip(), + ) + + # Path provided - validate elements at that path + message = ( + self.errorMessage + or f"{col} at path {path} {keyword} contain valid JSON format." + ) + msg_sql = message.replace("'", "''") + + # Determine if this is an array path or single value path + is_array_path = "[*]" in path + + if is_array_path: + # Array path: Check if ANY element is not valid JSON + # Only check elements that exist and are not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT json_valid(elem::TEXT) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: All elements must be valid JSON + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT json_valid(elem::TEXT) + ) + """ + else: + # Single value path: Check if the value is not valid JSON + # Only check if the value exists and is not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_extract({col}::TEXT, '{path}') IS NOT NULL + AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' + AND NOT json_valid(json_extract({col}::TEXT, '{path}')::TEXT) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: Value must be valid JSON + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR json_extract({col}::TEXT, '{path}') IS NULL + OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' + OR json_valid(json_extract({col}::TEXT, '{path}')::TEXT) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "format_json_format" + + +class JSONFormatStringGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to StringHandling requirements (ASCII characters only). + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.name" or "$.items[*].label") + + StringHandling requirements: + - Strings must contain only ASCII characters ([\x00-\x7f]) + + For array paths (e.g., $.items[*].label), validates ALL string elements. 
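+
+    Illustrative example (hypothetical data): {"label": "cafe"} passes,
+    while {"label": "café"} fails because "é" falls outside the ASCII
+    range [\x00-\x7f].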
+ """ + + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} contain only ASCII characters" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Determine if this is an array path (contains [*]) + is_array_path = "[*]" in path + + if is_array_path: + # Array path: Check all elements in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + ) + """ + else: + # Single value path + extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND NOT (TRIM({extracted_value}::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND (TRIM({extracted_value}::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_format_string" + + +class JSONFormatUnitGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to FOCUS Unit Format requirements. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.unit" or "$.items[*].unit") + + Unit Format requirements: + - Must match one of 5 FOCUS unit format patterns: + 1. Standalone units (e.g., "GB", "Hours", "Requests") + 2. unit-time (e.g., "GB-Hours") + 3. unit/time (e.g., "GB/Hour", "Requests/Second") + 4. quantity units (e.g., "1000 Requests") + 5. units/interval (e.g., "Requests/3 Months") + + For array paths (e.g., $.items[*].unit), validates ALL elements. + """ + + REQUIRED_KEYS = {"ColumnName", "Path"} + + def _generate_unit_format_regex(self) -> str: + """ + Generate the complete regex pattern for FOCUS Unit Format validation. + Identical to FormatUnitGenerator and JSONCheckPathUnitFormatGenerator. 
+ """ + # Data Size Unit Names (both decimal and binary) + data_size_units = [ + # Bits (decimal) + "b", + "Kb", + "Mb", + "Gb", + "Tb", + "Pb", + "Eb", + # Bytes (decimal) + "B", + "KB", + "MB", + "GB", + "TB", + "PB", + "EB", + # Bits (binary) + "Kib", + "Mib", + "Gib", + "Tib", + "Pib", + "Eib", + # Bytes (binary) + "KiB", + "MiB", + "GiB", + "TiB", + "PiB", + "EiB", + ] + + # Time-based Unit Names + time_units_singular = ["Year", "Month", "Day", "Hour", "Minute", "Second"] + time_units_plural = ["Years", "Months", "Days", "Hours", "Minutes", "Seconds"] + + # Build regex patterns + patterns = [] + data_size_pattern = "|".join(data_size_units) + time_singular_pattern = "|".join(time_units_singular) + time_plural_pattern = "|".join(time_units_plural) + count_unit_pattern = r"[A-Za-z][A-Za-z0-9]*(?:\s+[A-Za-z][A-Za-z0-9]*)*" + + # Pattern 1: Standalone units + patterns.append( + f"^({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 2: - + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern})-({time_plural_pattern})$" + ) + # Pattern 3: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/({time_singular_pattern})$" + ) + # Pattern 4: + patterns.append( + f"^[0-9]+ ({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 5: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/[0-9]+ ({time_plural_pattern})$" + ) + + # Combine all patterns with OR + return "|".join(f"({pattern})" for pattern in patterns) def generateSql(self) -> SQLQuery: col = self.params.ColumnName - val = self.params.Value + path = self.params.Path keyword = self._get_validation_keyword() + + # Build error message message = ( - self.errorMessage or f"{col} {keyword} be greater than or equal to {val}." 
+ self.errorMessage + or f"{col} at path '{path}' {keyword} follow the FOCUS Unit Format specification" ) msg_sql = message.replace("'", "''") - # Requirement SQL (finds violations) - condition = f"{col} IS NOT NULL AND {col} < {val}" - condition = self._apply_condition(condition) + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") - requirement_sql = f""" - WITH invalid AS ( - SELECT 1 - FROM {{table_name}} - WHERE {condition} - ) - SELECT - COUNT(*) AS violations, - CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message - FROM invalid - """ + # Get the combined unit format regex pattern + combined_pattern = self._generate_unit_format_regex() - # Predicate SQL (for condition mode) - predicate_sql = f"{col} IS NOT NULL AND {col} >= {self._lit(val)}" + # Determine if this is an array path (contains [*]) + is_array_path = "[*]" in path + + if is_array_path: + # Array path: Check all elements in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') + ) + """ + else: + # Single value path + extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND NOT regexp_matches(TRIM({extracted_value}::TEXT, '"'), '{combined_pattern}') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND regexp_matches(TRIM({extracted_value}::TEXT, '"'), '{combined_pattern}') + """ return SQLQuery( - requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) - def get_sample_sql(self) -> str: - """Return SQL to fetch sample violating rows for display""" - col = self.params.ColumnName - val = self.params.Value - - # Build condition to find violating rows (values less than the required minimum) - condition = f"{col} IS NOT NULL AND {col} < {val}" - condition = self._apply_condition(condition) + def getCheckType(self) -> str: + return "json_format_unit" - return f""" - SELECT {col} - FROM {{table_name}} - WHERE {condition} - """ - # Make sample_sql accessible as a property for the infrastructure - @property - def sample_sql(self) -> str: - return self.get_sample_sql() +class JSONFormatNumericGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to Numeric Format 
requirements. - def getCheckType(self) -> str: - return "check_greater_equal" + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.value" or "$.items[*].price") - def generatePredicate(self) -> str | None: - """Backward compatibility wrapper""" - if getattr(self, "exec_mode", "requirement") != "condition": - return None - sql_query = self.generateSql() - return sql_query.get_predicate_sql() + Numeric Format requirements: + - Optional leading sign (+/-) + - Optional decimal point with digits + - Required digits + - Optional scientific notation (e.g., 1.23e10, 1.23E-5) + - Pattern: ^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$ + For array paths (e.g., $.items[*].price), validates ALL elements. + """ -class CheckDistinctCountGenerator(DuckDBCheckGenerator): - REQUIRED_KEYS = {"ColumnAName", "ColumnBName", "ExpectedCount"} + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: - a = self.params.ColumnAName - b = self.params.ColumnBName - n = self.params.ExpectedCount + col = self.params.ColumnName + path = self.params.Path keyword = self._get_validation_keyword() + # Build error message message = ( self.errorMessage - or f"For each {a}, there {keyword} be exactly {n} distinct {b} values." + or f"{col} at path '{path}' {keyword} contain numeric values" ) msg_sql = message.replace("'", "''") - # Build WHERE clause for row-level filtering before aggregation - # This applies parent conditions (e.g., "SkuPriceId IS NOT NULL") before GROUP BY - where_clause = "" - if self.row_condition_sql and self.row_condition_sql.strip(): - where_clause = f"WHERE {self.row_condition_sql}" + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Numeric pattern: supports 123, -123, 1.23, -1.23, 1.23e10, 1.23e-10, 1.23E+10, etc. 
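+ # The pattern requires at least one digit after the optional decimal point,
+ # so ".5" and "1.5e3" match while "1." and "e10" do not.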
+ numeric_pattern = r"^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$" + + # Determine if this is an array path (contains [*]) + is_array_path = "[*]" in path + + if is_array_path: + # Array path: Check all elements in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ - # Requirement SQL (finds violations) - # IMPORTANT: Apply row_condition_sql BEFORE GROUP BY to filter groups themselves - requirement_sql = f""" - WITH counts AS ( - SELECT {a} AS grp, COUNT(DISTINCT {b}) AS distinct_count - FROM {{table_name}} - {where_clause} - GROUP BY {a} - ), - invalid AS ( - SELECT grp, distinct_count - FROM counts - WHERE distinct_count <> {n} - ) - SELECT - COUNT(*) AS violations, - CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message - FROM invalid - """ + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') + ) + """ + else: + # Single value path + extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND NOT (TRIM({extracted_value}::TEXT, '"') ~ '{numeric_pattern}') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ - # Note: This is a complex aggregation check that doesn't naturally translate - # to a simple predicate for row-level filtering. Setting predicate_sql to None. - predicate_sql = None + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND (TRIM({extracted_value}::TEXT, '"') ~ '{numeric_pattern}') + """ return SQLQuery( - requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: - return "distinct_count" - - -class CheckModelRuleGenerator(DuckDBCheckGenerator): - REQUIRED_KEYS = {"ModelRuleId"} - - def getCheckType(self) -> str: - return "model_rule_reference" - - def generateSql(self) -> SQLQuery: - # Won’t be executed; we’ll attach a special executor instead. 
- self.errorMessage = f"Conformance reference to {self.params.ModelRuleId}" - requirement_sql = "SELECT 0 AS violations" - return SQLQuery(requirement_sql=requirement_sql.strip(), predicate_sql=None) - - def generateCheck(self) -> DuckDBColumnCheck: - # Let the base create the DuckDBColumnCheck (with errorMessage, type, sql) - chk = super().generateCheck() - - target_id = self.params.ModelRuleId - plan = self.plan - # Make a dict {rule_id -> result} out of parent_results_by_idx + plan - id2res: dict[str, dict] = {} - if plan: - for pidx, res in (self.parent_results_by_idx or {}).items(): - rid = getattr(plan.nodes[pidx], "rule_id", None) - if rid: - # normalize shape: we expect {"ok": bool, "details": {...}} - id2res[rid] = res - - def _exec_reference(_conn): - # Try to find the referenced rule’s result among parents - res = id2res.get(target_id) - if res is None: - # Not a direct parent? Try to find in global results registry - converter = None - if ( - hasattr(self, "child_builder") - and callable(self.child_builder) - and hasattr(self.child_builder, "__closure__") - and self.child_builder.__closure__ - ): - # Access the converter instance from the child_builder lambda's closure - for cell in self.child_builder.__closure__: - if hasattr(cell.cell_contents, "_global_results_by_idx"): - converter = cell.cell_contents - break - - if converter and hasattr(converter, "_global_results_by_idx") and plan: - # Look for the target_id in global results by scanning plan nodes - for node_idx, result in converter._global_results_by_idx.items(): - if ( - node_idx < len(plan.nodes) - and plan.nodes[node_idx].rule_id == target_id - ): - res = result - break - - if res is None: - # Still not found? Fall back to a clear failure. - details = { - "violations": 1, - "message": f"Referenced rule '{target_id}' not found upstream", - "referenced_rule_id": target_id, - } - return False, details - - ok = bool(res.get("ok", False)) - det = dict(res.get("details") or {}) - violations = det.get("violations", 0 if ok else 1) - - details = { - "violations": int(violations), - "message": f"Conformance reference to {target_id} ({'OK' if ok else 'FAIL'})", - "referenced_rule_id": target_id, - } - return ok, details - - # Attach the callable to the check so run_check can use it - chk.special_executor = _exec_reference - chk.exec_mode = "reference" - chk.referenced_rule_id = target_id - return chk + return "json_format_numeric" class CompositeBaseRuleGenerator(DuckDBCheckGenerator): @@ -2327,6 +4424,62 @@ class FocusToDuckDBSchemaConverter: "generator": ColumnByColumnEqualsColumnValueGenerator, "factory": lambda args: "ColumnAName", }, + "JSONCheckPathType": { + "generator": JSONCheckPathTypeGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathKeyValueFormat": { + "generator": JSONCheckPathKeyValueFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathKeyStartsWith": { + "generator": JSONCheckPathKeyStartsWithGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathKeyExists": { + "generator": JSONCheckPathKeyExistsGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathValue": { + "generator": JSONCheckPathValueGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathNotValue": { + "generator": JSONCheckPathNotValueGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathSameValue": { + "generator": JSONCheckPathSameValueGenerator, + "factory": lambda args: "ColumnAName", + }, + "JSONCheckPathNumericFormat": { + 
"generator": JSONCheckPathNumericFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathUnitFormat": { + "generator": JSONCheckPathUnitFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathDistinctParent": { + "generator": JSONCheckPathDistinctParentGenerator, + "factory": lambda args: "ColumnName", + }, + "FormatJSONFormat": { + "generator": FormatJSONFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONFormatString": { + "generator": JSONFormatStringGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONFormatUnit": { + "generator": JSONFormatUnitGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONFormatNumeric": { + "generator": JSONFormatNumericGenerator, + "factory": lambda args: "ColumnName", + }, } # Version-specific overrides: each version only defines what changes from previous versions @@ -2459,6 +4612,12 @@ def __init__( # Build the effective CHECK_GENERATORS mapping for this version self.CHECK_GENERATORS = self._build_check_generators_for_version(rules_version) + # Track missing generators for reporting + self.missing_generators: Set[str] = set() + self.missing_generator_rules: List[Tuple[str, str]] = ( + [] + ) # (rule_id, check_function) + # Example caches (optional) self._prepared: Dict[str, Any] = {} self._views: Dict[str, str] = {} # rule_id -> temp view name @@ -2607,7 +4766,38 @@ def prepare(self, *, conn: duckdb.DuckDBPyConnection, plan: ValidationPlan) -> N def finalize( self, *, success: bool, results_by_idx: Dict[int, Dict[str, Any]] ) -> None: - """Optional cleanup: drop temps, emit summaries, etc.""" + """Optional cleanup: drop temps, emit summaries, report missing generators, etc.""" + # Report missing generators if any were encountered + if self.missing_generators: + # Build the affected rules list + rules_list = "\n".join( + f" - {rule_id}: {check_fn}" + for rule_id, check_fn in self.missing_generator_rules[:10] + ) + if len(self.missing_generator_rules) > 10: + rules_list += ( + f"\n ... 
and {len(self.missing_generator_rules) - 10} more" + ) + + missing_gens = ", ".join(sorted(self.missing_generators)) + + log.warning( + "\n" + + "=" * 80 + + "\n" + + "VALIDATION INCOMPLETE: Missing Check Generators\n" + + "=" * 80 + + "\n" + + "The following check functions are not implemented:\n" + + " %s\n\n" + + "Affected rules (%d total):\n%s\n" + + "\nThese rules have been marked as SKIPPED in the report.\n" + + "=" * 80, + missing_gens, + len(self.missing_generator_rules), + rules_list, + ) + # e.g., self.conn.execute("DROP VIEW IF EXISTS ...") # Close DuckDB connection to prevent hanging in CI environments if hasattr(self, "conn") and self.conn is not None: @@ -2719,8 +4909,20 @@ def _extract_missing_columns(err_msg: str) -> list[str]: if ( isinstance(check, SkippedCheck) or getattr(check, "checkType", "") == "skipped_check" + or getattr(check, "check_type", "") == "skipped_check" ): - ok, details = check.run(self.conn) + # For SkippedCheck generators, call their run() method + if isinstance(check, SkippedCheck): + ok, details = check.run(self.conn) + else: + # For DuckDBColumnCheck objects marked as skipped_check + ok = True + details = { + "skipped": True, + "reason": getattr(check, "errorMessage", None) or "Rule skipped", + "violations": 0, + } + details.setdefault("violations", 0) details.setdefault( "message", @@ -2799,8 +5001,8 @@ def _extract_missing_columns(err_msg: str) -> list[str]: # Fallback to generic child identifier unique_child_id = f"child#{i + 1}" - # Put rule_id AFTER the spread to ensure it overrides any existing rule_id - child_detail_entry = {**det_i, "rule_id": unique_child_id} + # Put rule_id AND ok AFTER the spread to ensure they override any existing values + child_detail_entry = {**det_i, "rule_id": unique_child_id, "ok": ok_i} child_details.append(child_detail_entry) # Aggregate the children results normally @@ -2873,9 +5075,25 @@ def _extract_missing_columns(err_msg: str) -> list[str]: return ok, details # ---- leaf SQL execution ------------------------------------------------ sql = getattr(check, "checkSql", None) - if not sql: + if not sql or sql == "None": + # Check if this should have been caught as a skipped check + check_type = getattr(check, "checkType", None) or getattr( + check, "check_type", None + ) + rule_id = getattr(check, "rule_id", None) + error_msg = getattr(check, "errorMessage", None) + + # If it looks like a skipped check but wasn't caught, handle it gracefully + if check_type == "skipped_check" or "skipped" in str(error_msg).lower(): + return True, { + "skipped": True, + "reason": error_msg or "Rule skipped", + "violations": 0, + "message": error_msg or f"{rule_id}: skipped", + } + raise InvalidRuleException( - f"Leaf check has no SQL to execute (rule_id={getattr(check, 'rule_id', None)})" + f"Leaf check has no SQL to execute (rule_id={rule_id}, check_type={check_type})" ) # Handle SQLQuery objects with transpilation support @@ -3057,15 +5275,30 @@ def __make_generator__( reg = self.CHECK_GENERATORS.get(check_fn) if not reg or "generator" not in reg: - raise InvalidRuleException( - textwrap.dedent( - f""" - Rule {rule_id} @ {breadcrumb}: No generator registered for CheckFunction='{check_fn}'. 
- Available generators: {sorted(self.CHECK_GENERATORS.keys())} - Requirement: - {_compact_json(requirement)} - """ - ).strip() + # Log warning and track missing generator instead of raising exception + self.missing_generators.add(check_fn) + self.missing_generator_rules.append((rule_id, check_fn)) + + log.warning( + "Missing generator for CheckFunction '%s' in rule '%s'. " + "Rule will be skipped. Available generators: %s", + check_fn, + rule_id, + sorted(self.CHECK_GENERATORS.keys()), + ) + + # Return a skipped check generator that will be marked appropriately + return SkippedMissingGeneratorCheck( + rule=rule, + rule_id=rule_id, + check_function=check_fn, + compile_condition=None, + child_builder=None, + breadcrumb=breadcrumb, + parent_results_by_idx=parent_results_by_idx or {}, + parent_edges=parent_edges or (), + plan=getattr(self, "plan", None), + row_condition_sql=row_condition_sql, ) gen_cls = reg["generator"] @@ -3152,6 +5385,7 @@ def __make_generator__( breadcrumb=child_bc, parent_results_by_idx=parent_results_by_idx or {}, parent_edges=child_parent_edges, + inherited_condition=row_condition_sql if is_composite else None, ), breadcrumb=breadcrumb, **params, @@ -3165,11 +5399,15 @@ def __generate_duckdb_check__( breadcrumb: str, parent_results_by_idx, parent_edges, + inherited_condition: Optional[str] = None, ) -> Union["DuckDBColumnCheck", SkippedCheck]: """ Build a DuckDBColumnCheck for this requirement. For composites (AND/OR), the Composite* generators will recursively call back here to build child checks and set `nestedChecks` + `nestedCheckHandler`. + + Args: + inherited_condition: Condition inherited from parent composite (for inline children) """ if not isinstance(requirement, dict): raise InvalidRuleException( @@ -3179,6 +5417,13 @@ def __generate_duckdb_check__( # Build effective condition from parent_edges AND from downstream composite consumers eff_cond = self._build_effective_condition(rule, parent_edges) + # If this is an inline child of a composite, inherit the composite's effective condition + if inherited_condition: + if eff_cond: + eff_cond = f"({eff_cond}) AND ({inherited_condition})" + else: + eff_cond = inherited_condition + # ENHANCEMENT: Also check if this rule is referenced by composite rules with conditions # This handles the case where a rule like PricingQuantity-C-008-M is referenced by # a composite like PricingQuantity-C-007-C that has a condition. 
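+ # Without this, rules referenced by a conditional composite would run
+ # unconditionally and could report violations for rows the composite's
+ # condition was meant to exclude.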
diff --git a/focus_validator/outputter/outputter_web.py b/focus_validator/outputter/outputter_web.py index ed50ad1..d775e71 100644 --- a/focus_validator/outputter/outputter_web.py +++ b/focus_validator/outputter/outputter_web.py @@ -291,6 +291,7 @@ def _create_entity_view(self, base_data: Dict[str, Any]) -> Dict[str, Any]: dataset_entities = [] column_entities = [] attribute_entities = [] + object_entities = [] for entity_data in entities_data.values(): entity_type = entity_data["entityType"] @@ -298,6 +299,8 @@ def _create_entity_view(self, base_data: Dict[str, Any]) -> Dict[str, Any]: dataset_entities.append(entity_data) elif entity_type == "Column": column_entities.append(entity_data) + elif entity_type == "Object": + object_entities.append(entity_data) else: attribute_entities.append(entity_data) @@ -313,6 +316,11 @@ def _create_entity_view(self, base_data: Dict[str, Any]) -> Dict[str, Any]: "entities": column_entities, "expanded": True, }, + "objects": { + "name": "Objects", + "entities": object_entities, + "expanded": True, + }, "attributes": { "name": "Attributes", "entities": attribute_entities, @@ -667,7 +675,7 @@ def _get_rule_function(self, rule_obj) -> str: return "Unknown" def _get_rule_entity_type(self, rule_obj) -> str: - """Extract entity type from rule object (Column, Dataset, Attribute)""" + """Extract entity type from rule object (Column, Dataset, Object, Attribute)""" if rule_obj and hasattr(rule_obj, "entity_type"): return rule_obj.entity_type return "Unknown"