From 1c496b90c849efac1b8e5c1be40d39958309a57b Mon Sep 17 00:00:00 2001 From: Mike Fuller Date: Thu, 19 Feb 2026 19:19:32 +1100 Subject: [PATCH 1/2] Updates to Validator to process FOCUS 1.3 model. New check functions and entity type Object. Signed-off-by: Mike Fuller --- .../focus_to_duckdb_converter.py | 2523 ++++++++++++++++- focus_validator/outputter/outputter_web.py | 10 +- 2 files changed, 2406 insertions(+), 127 deletions(-) diff --git a/focus_validator/config_objects/focus_to_duckdb_converter.py b/focus_validator/config_objects/focus_to_duckdb_converter.py index ad34c88..a3c6f5c 100644 --- a/focus_validator/config_objects/focus_to_duckdb_converter.py +++ b/focus_validator/config_objects/focus_to_duckdb_converter.py @@ -470,6 +470,17 @@ def __init__(self, rule, rule_id: str, **kwargs: Any) -> None: ) +class SkippedMissingGeneratorCheck(SkippedCheck): + + def __init__(self, rule, rule_id: str, check_function: str = "", **kwargs: Any) -> None: + super().__init__(rule, rule_id, **kwargs) + self.check_function = check_function + self.errorMessage = ( + f"Rule skipped - missing generator for CheckFunction '{check_function}'. " + "This check type is not yet implemented." + ) + + class ColumnPresentCheckGenerator(DuckDBCheckGenerator): REQUIRED_KEYS = {"ColumnName"} @@ -1539,139 +1550,2260 @@ def generatePredicate(self) -> str | None: return sql_query.get_predicate_sql() -class CheckDistinctCountGenerator(DuckDBCheckGenerator): - REQUIRED_KEYS = {"ColumnAName", "ColumnBName", "ExpectedCount"} +class CheckDistinctCountGenerator(DuckDBCheckGenerator): + REQUIRED_KEYS = {"ColumnAName", "ColumnBName", "ExpectedCount"} + + def generateSql(self) -> SQLQuery: + a = self.params.ColumnAName + b = self.params.ColumnBName + n = self.params.ExpectedCount + keyword = self._get_validation_keyword() + + message = ( + self.errorMessage + or f"For each {a}, there {keyword} be exactly {n} distinct {b} values." + ) + msg_sql = message.replace("'", "''") + + # Build WHERE clause for row-level filtering before aggregation + # This applies parent conditions (e.g., "SkuPriceId IS NOT NULL") before GROUP BY + where_clause = "" + if self.row_condition_sql and self.row_condition_sql.strip(): + where_clause = f"WHERE {self.row_condition_sql}" + + # Requirement SQL (finds violations) + # IMPORTANT: Apply row_condition_sql BEFORE GROUP BY to filter groups themselves + requirement_sql = f""" + WITH counts AS ( + SELECT {a} AS grp, COUNT(DISTINCT {b}) AS distinct_count + FROM {{table_name}} + {where_clause} + GROUP BY {a} + ), + invalid AS ( + SELECT grp, distinct_count + FROM counts + WHERE distinct_count <> {n} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + # Note: This is a complex aggregation check that doesn't naturally translate + # to a simple predicate for row-level filtering. Setting predicate_sql to None. + predicate_sql = None + + return SQLQuery( + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql + ) + + def getCheckType(self) -> str: + return "distinct_count" + + +class CheckModelRuleGenerator(DuckDBCheckGenerator): + REQUIRED_KEYS = {"ModelRuleId"} + + def getCheckType(self) -> str: + return "model_rule_reference" + + def generateSql(self) -> SQLQuery: + # Won’t be executed; we’ll attach a special executor instead. 
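+        # Illustrative: executing the placeholder yields a single row with
+        # violations = 0; the real pass/fail outcome comes from the special
+        # executor attached in generateCheck() below.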
+ self.errorMessage = f"Conformance reference to {self.params.ModelRuleId}" + requirement_sql = "SELECT 0 AS violations" + return SQLQuery(requirement_sql=requirement_sql.strip(), predicate_sql=None) + + def generateCheck(self) -> DuckDBColumnCheck: + # Let the base create the DuckDBColumnCheck (with errorMessage, type, sql) + chk = super().generateCheck() + + target_id = self.params.ModelRuleId + plan = self.plan + # Make a dict {rule_id -> result} out of parent_results_by_idx + plan + id2res: dict[str, dict] = {} + if plan: + for pidx, res in (self.parent_results_by_idx or {}).items(): + rid = getattr(plan.nodes[pidx], "rule_id", None) + if rid: + # normalize shape: we expect {"ok": bool, "details": {...}} + id2res[rid] = res + + def _exec_reference(_conn): + # Try to find the referenced rule’s result among parents + res = id2res.get(target_id) + if res is None: + # Not a direct parent? Try to find in global results registry + converter = None + if ( + hasattr(self, "child_builder") + and callable(self.child_builder) + and hasattr(self.child_builder, "__closure__") + and self.child_builder.__closure__ + ): + # Access the converter instance from the child_builder lambda's closure + for cell in self.child_builder.__closure__: + if hasattr(cell.cell_contents, "_global_results_by_idx"): + converter = cell.cell_contents + break + + if converter and hasattr(converter, "_global_results_by_idx") and plan: + # Look for the target_id in global results by scanning plan nodes + for node_idx, result in converter._global_results_by_idx.items(): + if ( + node_idx < len(plan.nodes) + and plan.nodes[node_idx].rule_id == target_id + ): + res = result + break + + if res is None: + # Still not found? Fall back to a clear failure. + details = { + "violations": 1, + "message": f"Referenced rule '{target_id}' not found upstream", + "referenced_rule_id": target_id, + } + return False, details + + ok = bool(res.get("ok", False)) + det = dict(res.get("details") or {}) + violations = det.get("violations", 0 if ok else 1) + + details = { + "violations": int(violations), + "message": f"Conformance reference to {target_id} ({'OK' if ok else 'FAIL'})", + "referenced_rule_id": target_id, + } + return ok, details + + # Attach the callable to the check so run_check can use it + chk.special_executor = _exec_reference + chk.exec_mode = "reference" + chk.referenced_rule_id = target_id + return chk + + +class JSONCheckPathTypeGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path is of a particular type. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.key" or "$.items[*].field") + ExpectedType: Expected JSON type (e.g., "string", "number", "boolean", "object", "array", "null") + + Uses DuckDB's json_extract and custom type detection to validate JSON path types. + Supports both single values and array paths with [*] syntax. + + For array paths (e.g., $.items[*].type), validates that ALL elements match the expected type. + """ + REQUIRED_KEYS = {"ColumnName", "Path", "ExpectedType"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + expected_type = self.params.ExpectedType + keyword = self._get_validation_keyword() + + # Normalize expected type to lowercase for comparison + expected_type_lower = expected_type.lower() + + # Map specific numeric type names to generic 'number' for JSON validation + # In JSON, there's no distinction between decimal, integer, float, etc. 
- they're all numbers + if expected_type_lower in ('decimal', 'integer', 'float', 'double', 'bigint', 'int', 'numeric'): + expected_type_normalized = 'number' + else: + expected_type_normalized = expected_type_lower + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} be of type '{expected_type}'" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Helper function to detect JSON type from a value + # Maps DuckDB's json_type results to JSON type names + def type_check_expr(value_expr: str) -> str: + return f""" + CASE + WHEN {value_expr} IS NULL THEN NULL + WHEN json_type({value_expr}) = 'BOOLEAN' THEN 'boolean' + WHEN json_type({value_expr}) IN ('TINYINT', 'SMALLINT', 'INTEGER', 'BIGINT', 'UTINYINT', 'USMALLINT', 'UINTEGER', 'UBIGINT', 'FLOAT', 'DOUBLE', 'DECIMAL', 'HUGEINT', 'UHUGEINT') THEN 'number' + WHEN json_type({value_expr}) = 'VARCHAR' THEN 'string' + WHEN json_type({value_expr}) = 'NULL' THEN 'null' + WHEN json_type({value_expr}) = 'ARRAY' THEN 'array' + WHEN json_type({value_expr}) IN ('OBJECT', 'JSON') THEN 'object' + ELSE 'unknown' + END + """ + + # Detect if path contains array wildcard [*] + # If it does, we know json_extract will return an ARRAY, so we can use unnest directly + # If not, it returns a single value, so we check the type directly + is_array_path = '[*]' in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + if is_array_path: + # Array path: json_extract returns an ARRAY of values + # We need to check each element in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ( + {extracted_value} IS NULL + OR json_array_length({extracted_value}) = 0 + OR EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE {type_check_expr('elem')} != '{expected_type_normalized}' + ) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE {type_check_expr('elem')} != '{expected_type_normalized}' + ) + """ + else: + # Single value path: json_extract returns a single JSON value + # Check the type directly + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ( + {extracted_value} IS NULL + OR {type_check_expr(extracted_value)} != '{expected_type_normalized}' + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {type_check_expr(extracted_value)} = '{expected_type_normalized}' + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_type" + + +class JSONCheckPathKeyValueFormatGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to KeyValueFormat requirements. 
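+    Example (illustrative): for Path "$.Tags", a value such as
+    '{"Tags": {"env": "prod", "cost": 1.5}}' conforms, while
+    '{"Tags": {"env": {"name": "prod"}}}' does not (nested object value).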
+ + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.tags" or "$.items[*].metadata") + + KeyValueFormat requirements: + 1. Must be a valid JSON object (not array, not primitive) + 2. Keys must be unique within the object + 3. Values must be primitive types only (string, number, boolean, null) + 4. Values must NOT be objects or arrays + + For array paths (e.g., $.items[*].tags), validates ALL elements. + """ + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} conform to KeyValueFormat (JSON object with primitive values only)" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Detect if path contains array wildcard [*] + is_array_path = '[*]' in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + # Function to check if a value is a valid KeyValueFormat object + # Requirements: + # 1. Must be an OBJECT type + # 2. All values must be primitive (not OBJECT or ARRAY) + # Strategy: Use json_keys() to get all keys, then check each value's type + def keyvalue_check(value_expr: str) -> str: + return f"""( + json_type({value_expr}) IN ('OBJECT', 'JSON') + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_keys({value_expr})) AS t(key_name) + WHERE json_type(json_extract({value_expr}, '$.' || key_name)) IN ('OBJECT', 'ARRAY', 'JSON') + ) + )""" + + if is_array_path: + # Array path: json_extract returns an ARRAY of values + # Check each element conforms to KeyValueFormat + # Skip if path doesn't exist (NULL) or returns empty array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keyvalue_check('elem')} + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keyvalue_check('elem')} + ) + """ + else: + # Single value path: check the value directly + # Skip if path doesn't exist (NULL) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND NOT {keyvalue_check(extracted_value)} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {keyvalue_check(extracted_value)} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_keyvalue_format" + + +class JSONCheckPathKeyStartsWithGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path only have keys that start with a specific prefix, + ignoring certain specified keys. 
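+    Example (illustrative, with hypothetical key names): given Prefix "x_"
+    and IgnoreKeys ["ChargeId"], an object {"ChargeId": "c1", "x_team": "core"}
+    conforms, while {"ChargeId": "c1", "Team": "core"} does not, because
+    "Team" is neither ignored nor prefixed.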
+ + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.Elements[*]") + Prefix: Required prefix for keys (e.g., "x_") + IgnoreKeys: List of keys to exclude from validation + + Use case: Enforce naming conventions like custom properties must start with "x_" + while allowing standard FOCUS-defined properties. + + For array paths (e.g., $.Elements[*]), validates ALL elements. + """ + REQUIRED_KEYS = {"ColumnName", "Path", "Prefix", "IgnoreKeys"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + prefix = self.params.Prefix + ignore_keys = self.params.IgnoreKeys # Should be a list + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' keys {keyword} start with '{prefix}' (except {ignore_keys})" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path and prefix for SQL + path_escaped = path.replace("'", "''") + prefix_escaped = prefix.replace("'", "''") + + # Build SQL array of ignored keys + if ignore_keys and len(ignore_keys) > 0: + ignore_keys_sql = "[" + ", ".join(f"'{k.replace('\"', '\"\"')}'" for k in ignore_keys) + "]" + else: + ignore_keys_sql = "[]" + + # Detect if path contains array wildcard [*] + is_array_path = '[*]' in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + # Function to check if all keys (except ignored ones) start with prefix + # Returns a check expression that's true if all non-ignored keys start with prefix + def keys_check(value_expr: str) -> str: + return f"""( + json_type({value_expr}) IN ('OBJECT', 'JSON') + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_keys({value_expr})) AS t(key_name) + WHERE key_name NOT IN (SELECT unnest({ignore_keys_sql})) + AND NOT starts_with(key_name, '{prefix_escaped}') + ) + )""" + + if is_array_path: + # Array path: json_extract returns an ARRAY of values + # Check each element has all keys starting with prefix (except ignored) + # Skip if path doesn't exist (NULL) or returns empty array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keys_check('elem')} + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE NOT {keys_check('elem')} + ) + """ + else: + # Single value path: check the value directly + # Skip if path doesn't exist (NULL) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND NOT {keys_check(extracted_value)} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {keys_check(extracted_value)} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + 
return "json_path_key_starts_with" + + +class JSONCheckPathKeyExistsGenerator(DuckDBCheckGenerator): + """ + Check if a specific key exists at a JSON path. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression ending with the key to check (e.g., "$.Elements[*].AllocatedRatio") + + The path should end with the key name to check. For example: + - "$.Elements[*].AllocatedRatio" checks if each element in Elements has AllocatedRatio key + - "$.metadata.version" checks if metadata object has version key + + For array paths (e.g., $.Elements[*].KeyName), validates ALL elements have the key. + """ + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Extract the key name from the path (last component after final '.') + # e.g., "$.Elements[*].AllocatedRatio" -> "AllocatedRatio" + if '.' in path: + key_name = path.rsplit('.', 1)[1] + parent_path = path.rsplit('.', 1)[0] + else: + raise ValueError(f"Path must contain at least one '.' to specify a key: {path}") + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' key '{key_name}' {keyword} exist" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes for SQL + parent_path_escaped = parent_path.replace("'", "''") + key_name_escaped = key_name.replace("'", "''") + + # Detect if parent path contains array wildcard [*] + is_array_path = '[*]' in parent_path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{parent_path_escaped}')" + + if is_array_path: + # Array path: check each element has the key + # Use json_keys() to get keys of each object in the array + # Skip rows where: path doesn't exist, array is empty, or not all elements are objects + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND ( + EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) NOT IN ('OBJECT', 'JSON') + ) + OR EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys(elem))) + ) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) NOT IN ('OBJECT', 'JSON') + ) + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE json_type(elem) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys(elem))) + ) + """ + else: + # Single object path: check if the key exists + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_type({extracted_value}) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys({extracted_value}))) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND 
{extracted_value} IS NOT NULL + AND json_type({extracted_value}) IN ('OBJECT', 'JSON') + AND '{key_name_escaped}' IN (SELECT unnest(json_keys({extracted_value}))) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_key_exists" + + +class JSONCheckPathValueGenerator(DuckDBCheckGenerator): + """ + Check if element(s) at JSON path have a specific value. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.Elements[*].ContractId") + Value: The expected value to check against + + For array paths (e.g., $.Elements[*].field), checks if ALL elements have the value. + For single paths (e.g., $.field), checks if the value matches. + + Commonly used to check for null values or specific constants. + """ + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + value = self.params.Value + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} equal {value}" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes for SQL + path_escaped = path.replace("'", "''") + + # Convert Python value to SQL literal for comparison + # For JSON comparisons, we need to cast properly + if value is None: + value_sql = "NULL" + value_json = "NULL" + elif isinstance(value, bool): + value_sql = "TRUE" if value else "FALSE" + value_json = f"'{str(value).lower()}'" # JSON booleans are lowercase + elif isinstance(value, (int, float)): + value_sql = str(value) + value_json = str(value) + else: + # String value - need to quote for JSON comparison + value_escaped = str(value).replace("'", "''") + value_sql = f"'{value_escaped}'" + value_json = f"'\"{value_escaped}\"'" # JSON strings are quoted + + # Detect if path contains array wildcard [*] + is_array_path = '[*]' in path + + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + if is_array_path: + # Array path: check if ALL elements equal the value + # Extract as text for comparison since json_extract returns JSON + if value is None: + # For null checks, check if any element is NOT null (SQL NULL or JSON null string 'null') + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NOT NULL AND elem::TEXT != 'null' + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NOT NULL AND elem::TEXT != 'null' + ) + """ + else: + # For non-null checks, compare as text since JSON extract returns JSON + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT != {value_json} OR elem IS NULL + ) + ) + SELECT + COUNT(*) AS 
violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT != {value_json} OR elem IS NULL + ) + """ + else: + # Single value path: check if the value matches + if value is None: + # Check if the value is NOT null (either SQL NULL or JSON null string 'null') + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT = 'null') + """ + else: + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != {value_json}) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT = {value_json} + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_value" + + +class JSONCheckPathNotValueGenerator(DuckDBCheckGenerator): + """ + JSONCheckPathNotValue check generator. + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} + Validates that element(s) at a JSON path do NOT have a specific value. 
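+
+    Example (illustrative): with Path "$.schema" and Value "legacy",
+    a row containing {"schema": "legacy"} is flagged as a violation,
+    while {"schema": "v1"} or a row missing the key entirely passes.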
+ """ + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} + + def generateSql(self) -> SQLQuery: + """Generate SQL for JSON path NOT value check""" + col = self.params.ColumnName + path = self.params.Path + value = self.params.Value + + # Escape path for SQL + path_escaped = path.replace("'", "''") + + # Build error message + msg = f"{col} at path '{path}' MUST NOT equal {value}" + msg_sql = msg.replace("'", "''") + + # Check if this is an array path (contains [*]) + is_array_path = '[*]' in path + + # Format value for SQL comparison + if value is None: + value_sql = "NULL" + value_json = "NULL" + elif isinstance(value, bool): + value_sql = "TRUE" if value else "FALSE" + value_json = f"'{str(value).lower()}'" # JSON uses lowercase true/false + elif isinstance(value, (int, float)): + value_sql = str(value) + value_json = str(value) + else: + # String value - escape for SQL + value_escaped = str(value).replace("'", "''") + value_sql = f"'{value_escaped}'" + # For JSON comparison, strings are quoted + value_json = f"'\"{value_escaped}\"'" + + # Extract the value at the path + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" + + if is_array_path: + # Array path: check if ANY element equals the value (violation) + if value is None: + # Check if any element IS null (SQL NULL or JSON null string 'null') + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NULL OR elem::TEXT = 'null' + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem IS NULL OR elem::TEXT = 'null' + ) + """ + else: + # For non-null checks, check if any element equals the value + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT = {value_json} + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND json_array_length({extracted_value}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_value}) AS t(elem) + WHERE elem::TEXT = {value_json} + ) + """ + else: + # Single value path: check if the value equals the specified value (violation) + if value is None: + # Check if the value IS null (JSON null string 'null', not missing path) + # Only check rows where the path exists (not SQL NULL) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT = 'null' + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND 
({extracted_value} IS NULL OR {extracted_value}::TEXT != 'null') + """ + else: + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT = {value_json} + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != {value_json}) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_not_value" + + +class JSONCheckPathSameValueGenerator(DuckDBCheckGenerator): + """ + JSONCheckPathSameValue check generator. + REQUIRED_KEYS = {"ColumnAName", "PathA", "ColumnBName", "PathB"} + Validates that element(s) at PathA in ColumnA have the same value as: + - ColumnB at PathB (if PathB is not null) + - ColumnB directly (if PathB is null) + """ + REQUIRED_KEYS = {"ColumnAName", "PathA", "ColumnBName", "PathB"} + + def generateSql(self) -> SQLQuery: + """Generate SQL for JSON path same value check""" + col_a = self.params.ColumnAName + path_a = self.params.PathA + col_b = self.params.ColumnBName + path_b = self.params.PathB + + # Escape paths for SQL + path_a_escaped = path_a.replace("'", "''") + path_b_escaped = path_b.replace("'", "''") if path_b else None + + # Build error message + if path_b: + msg = f"{col_a} at path '{path_a}' MUST equal {col_b} at path '{path_b}'" + else: + msg = f"{col_a} at path '{path_a}' MUST equal {col_b}" + msg_sql = msg.replace("'", "''") + + # Check if paths are array paths + is_array_path_a = '[*]' in path_a + is_array_path_b = '[*]' in path_b if path_b else False + + # Extract values + extracted_a = f"json_extract(TRY_CAST({col_a} AS JSON), '{path_a_escaped}')" + + if path_b: + extracted_b = f"json_extract(TRY_CAST({col_b} AS JSON), '{path_b_escaped}')" + else: + # Compare to column value directly + extracted_b = col_b + + # Generate SQL based on path types + if is_array_path_a and is_array_path_b: + # Both arrays: check if arrays are equal element by element + # This is complex - we need to compare array lengths and each element + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND ( + json_array_length({extracted_a}) != json_array_length({extracted_b}) + OR EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) WITH ORDINALITY AS a(elem_a, idx) + JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) + ON a.idx = b.idx + WHERE a.elem_a::TEXT != b.elem_b::TEXT + OR (a.elem_a IS NULL AND b.elem_b IS NOT NULL) + OR (a.elem_a IS NOT NULL AND b.elem_b IS NULL) + ) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_a}) = json_array_length({extracted_b}) + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) WITH ORDINALITY AS a(elem_a, idx) + JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) + ON a.idx = 
b.idx + WHERE a.elem_a::TEXT != b.elem_b::TEXT + OR (a.elem_a IS NULL AND b.elem_b IS NOT NULL) + OR (a.elem_a IS NOT NULL AND b.elem_b IS NULL) + ) + """ + + elif is_array_path_a and not is_array_path_b: + # Array A, single B: check if all elements of A equal B + if path_b: + # B is a JSON path (single value) + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != {extracted_b}::TEXT + OR (elem IS NULL AND {extracted_b} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_b} IS NULL) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != {extracted_b}::TEXT + OR (elem IS NULL AND {extracted_b} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_b} IS NULL) + ) + """ + else: + # B is a column value directly + # Note: JSON strings are quoted, so we need to quote col_b for comparison + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != ('"' || {col_b}::TEXT || '"') + OR (elem IS NULL AND {col_b} IS NOT NULL) + OR (elem IS NOT NULL AND {col_b} IS NULL) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND json_array_length({extracted_a}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_a}) AS t(elem) + WHERE elem::TEXT != ('"' || {col_b}::TEXT || '"') + OR (elem IS NULL AND {col_b} IS NOT NULL) + OR (elem IS NOT NULL AND {col_b} IS NULL) + ) + """ + + elif not is_array_path_a and is_array_path_b: + # Single A, array B: check if A equals all elements of B + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_b}) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest({extracted_b}) AS t(elem) + WHERE elem::TEXT != {extracted_a}::TEXT + OR (elem IS NULL AND {extracted_a} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_a} IS NULL) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_b} IS NOT NULL + AND json_array_length({extracted_b}) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest({extracted_b}) 
AS t(elem) + WHERE elem::TEXT != {extracted_a}::TEXT + OR (elem IS NULL AND {extracted_a} IS NOT NULL) + OR (elem IS NOT NULL AND {extracted_a} IS NULL) + ) + """ + + else: + # Both single values: direct comparison + if path_b: + # B is a JSON path + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND ({extracted_a} IS NOT NULL OR {extracted_b} IS NOT NULL) + AND ( + ({extracted_a} IS NULL AND {extracted_b} IS NOT NULL) + OR ({extracted_a} IS NOT NULL AND {extracted_b} IS NULL) + OR {extracted_a}::TEXT != {extracted_b}::TEXT + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND json_valid({col_b}::TEXT) + AND ( + ({extracted_a} IS NULL AND {extracted_b} IS NULL) + OR ({extracted_a} IS NOT NULL AND {extracted_b} IS NOT NULL + AND {extracted_a}::TEXT = {extracted_b}::TEXT) + ) + """ + else: + # B is a column value directly + # Note: JSON strings are quoted, so we need to quote col_b for comparison + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_a}::TEXT != ('"' || {col_b}::TEXT || '"') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col_a} IS NOT NULL + AND {col_b} IS NOT NULL + AND json_valid({col_a}::TEXT) + AND {extracted_a} IS NOT NULL + AND {extracted_a}::TEXT = ('"' || {col_b}::TEXT || '"') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_same_value" + + +class JSONCheckPathNumericFormatGenerator(DuckDBCheckGenerator): + """ + Validates that element(s) at JSON path meet numeric format requirements. + + Arguments: + - ColumnName: The column containing JSON data + - Path: JSONPath expression to extract element(s) + + Validation: + - Checks that extracted values match numeric pattern: ^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$ + - Supports both array paths (with [*]) and single value paths + - Only validates non-null JSON values + """ + REQUIRED_KEYS = {"ColumnName", "Path"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + message = ( + self.errorMessage + or f"{col} at path {path} {keyword} contain numeric values (optional +/- sign, optional decimal, optional scientific notation)." + ) + msg_sql = message.replace("'", "''") + + # Determine if this is an array path or single value path + is_array_path = '[*]' in path + + # Numeric pattern: supports 123, -123, 1.23, -1.23, 1.23e10, 1.23e-10, 1.23E+10, etc. 
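+        # Illustrative matches: "123", "-1.5", ".5", "1.23e-10"; strings such
+        # as "1,234", "1.2.3", or "" fail the pattern and count as violations.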
+        numeric_pattern = r'^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$'
+
+        if is_array_path:
+            # Array path: Check if ANY element violates the numeric format
+            # Only check elements that exist and are not JSON null
+            requirement_sql = f"""
+            WITH violations AS (
+                SELECT
+                    {col}
+                FROM {{table_name}}
+                WHERE {col} IS NOT NULL
+                  AND json_valid({col}::TEXT)
+                  AND EXISTS (
+                      SELECT 1
+                      FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem)
+                      WHERE elem IS NOT NULL
+                        AND elem::TEXT != 'null'
+                        AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}')
+                  )
+            )
+            SELECT
+                COUNT(*) AS violations,
+                CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message
+            FROM violations
+            """
+
+            # Predicate: All elements must be valid numeric format
+            predicate_sql = f"""
+            {col} IS NULL
+            OR NOT json_valid({col}::TEXT)
+            OR NOT EXISTS (
+                SELECT 1
+                FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem)
+                WHERE elem IS NOT NULL
+                  AND elem::TEXT != 'null'
+                  AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}')
+            )
+            """
+        else:
+            # Single value path: Check if the value violates the numeric format
+            # Only check if the value exists and is not JSON null
+            requirement_sql = f"""
+            WITH violations AS (
+                SELECT
+                    {col}
+                FROM {{table_name}}
+                WHERE {col} IS NOT NULL
+                  AND json_valid({col}::TEXT)
+                  AND json_extract({col}::TEXT, '{path}') IS NOT NULL
+                  AND json_extract({col}::TEXT, '{path}')::TEXT != 'null'
+                  AND NOT (TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"') ~ '{numeric_pattern}')
+            )
+            SELECT
+                COUNT(*) AS violations,
+                CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message
+            FROM violations
+            """
+
+            # Predicate: Value must be valid numeric format
+            predicate_sql = f"""
+            {col} IS NULL
+            OR NOT json_valid({col}::TEXT)
+            OR json_extract({col}::TEXT, '{path}') IS NULL
+            OR json_extract({col}::TEXT, '{path}')::TEXT = 'null'
+            OR (TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"') ~ '{numeric_pattern}')
+            """
+
+        # Apply any conditional filters: locate the rendered WHERE line and
+        # fold the parent condition into it (f-string comparison so the match
+        # uses the rendered column name)
+        requirement_lines = requirement_sql.strip().split('\n')
+        for i, line in enumerate(requirement_lines):
+            if f'WHERE {col} IS NOT NULL' in line:
+                base_condition = line.strip()
+                full_condition = self._apply_condition(base_condition.replace('WHERE ', ''))
+                requirement_lines[i] = f"            WHERE {full_condition}"
+                break
+        requirement_sql = '\n'.join(requirement_lines)
+
+        if getattr(self, 'condition_params', None):
+            predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})"
+
+        return SQLQuery(
+            requirement_sql=requirement_sql.strip(),
+            predicate_sql=predicate_sql.strip()
+        )
+
+    def getCheckType(self) -> str:
+        return "json_path_numeric_format"
+
+
+class JSONCheckPathUnitFormatGenerator(DuckDBCheckGenerator):
+    """
+    Validates that element(s) at JSON path meet unit format requirements.
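+    Example (illustrative): "GB", "GB-Hours", "Requests/Second", and
+    "1000 Requests" all satisfy the format; "GB/hr" does not, because
+    "hr" is not a recognized time unit.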
+ + Arguments: + - ColumnName: The column containing JSON data + - Path: JSONPath expression to extract element(s) + + Validation: + - Checks that extracted values match FOCUS unit format patterns + - Supports both array paths (with [*]) and single value paths + - Only validates non-null JSON values + """ + REQUIRED_KEYS = {"ColumnName", "Path"} + + def _generate_unit_format_regex(self) -> str: + """ + Generate the complete regex pattern for FOCUS Unit Format validation. + This is identical to FormatUnitGenerator's method. + """ + # Data Size Unit Names (both decimal and binary) + data_size_units = [ + # Bits (decimal) + "b", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb", + # Bytes (decimal) + "B", "KB", "MB", "GB", "TB", "PB", "EB", + # Bits (binary) + "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", + # Bytes (binary) + "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", + ] + + # Time-based Unit Names + time_units_singular = ["Year", "Month", "Day", "Hour", "Minute", "Second"] + time_units_plural = ["Years", "Months", "Days", "Hours", "Minutes", "Seconds"] + + # Build regex patterns + patterns = [] + data_size_pattern = "|".join(data_size_units) + time_singular_pattern = "|".join(time_units_singular) + time_plural_pattern = "|".join(time_units_plural) + count_unit_pattern = r"[A-Za-z][A-Za-z0-9]*(?:\s+[A-Za-z][A-Za-z0-9]*)*" + + # Pattern 1: Standalone units + patterns.append( + f"^({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 2: - + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern})-({time_plural_pattern})$" + ) + # Pattern 3: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/({time_singular_pattern})$" + ) + # Pattern 4: + patterns.append( + f"^[0-9]+ ({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 5: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/[0-9]+ ({time_plural_pattern})$" + ) + + # Combine all patterns with OR + return "|".join(f"({pattern})" for pattern in patterns) + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + message = ( + self.errorMessage + or f"{col} at path {path} {keyword} follow the FOCUS Unit Format specification." 
+        )
+        msg_sql = message.replace("'", "''")
+
+        # Get the combined regex pattern
+        combined_pattern = self._generate_unit_format_regex()
+
+        # Determine if this is an array path or single value path
+        is_array_path = '[*]' in path
+
+        if is_array_path:
+            # Array path: Check if ANY element violates the unit format
+            # Only check elements that exist and are not JSON null
+            requirement_sql = f"""
+            WITH violations AS (
+                SELECT
+                    {col}
+                FROM {{table_name}}
+                WHERE {col} IS NOT NULL
+                  AND json_valid({col}::TEXT)
+                  AND EXISTS (
+                      SELECT 1
+                      FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem)
+                      WHERE elem IS NOT NULL
+                        AND elem::TEXT != 'null'
+                        AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}')
+                  )
+            )
+            SELECT
+                COUNT(*) AS violations,
+                CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message
+            FROM violations
+            """
+
+            # Predicate: All elements must match unit format
+            predicate_sql = f"""
+            {col} IS NULL
+            OR NOT json_valid({col}::TEXT)
+            OR NOT EXISTS (
+                SELECT 1
+                FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem)
+                WHERE elem IS NOT NULL
+                  AND elem::TEXT != 'null'
+                  AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}')
+            )
+            """
+        else:
+            # Single value path: Check if the value violates the unit format
+            # Only check if the value exists and is not JSON null
+            requirement_sql = f"""
+            WITH violations AS (
+                SELECT
+                    {col}
+                FROM {{table_name}}
+                WHERE {col} IS NOT NULL
+                  AND json_valid({col}::TEXT)
+                  AND json_extract({col}::TEXT, '{path}') IS NOT NULL
+                  AND json_extract({col}::TEXT, '{path}')::TEXT != 'null'
+                  AND NOT regexp_matches(TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"'), '{combined_pattern}')
+            )
+            SELECT
+                COUNT(*) AS violations,
+                CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message
+            FROM violations
+            """
+
+            # Predicate: Value must match unit format
+            predicate_sql = f"""
+            {col} IS NULL
+            OR NOT json_valid({col}::TEXT)
+            OR json_extract({col}::TEXT, '{path}') IS NULL
+            OR json_extract({col}::TEXT, '{path}')::TEXT = 'null'
+            OR regexp_matches(TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"'), '{combined_pattern}')
+            """
+
+        # Apply any conditional filters: locate the rendered WHERE line and
+        # fold the parent condition into it (f-string comparison so the match
+        # uses the rendered column name)
+        requirement_lines = requirement_sql.strip().split('\n')
+        for i, line in enumerate(requirement_lines):
+            if f'WHERE {col} IS NOT NULL' in line:
+                base_condition = line.strip()
+                full_condition = self._apply_condition(base_condition.replace('WHERE ', ''))
+                requirement_lines[i] = f"            WHERE {full_condition}"
+                break
+        requirement_sql = '\n'.join(requirement_lines)
+
+        if getattr(self, 'condition_params', None):
+            predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})"
+
+        return SQLQuery(
+            requirement_sql=requirement_sql.strip(),
+            predicate_sql=predicate_sql.strip()
+        )
+
+    def getCheckType(self) -> str:
+        return "json_path_unit_format"
+
+
+class JSONCheckPathDistinctParentGenerator(DuckDBCheckGenerator):
+    """
+    Validates that distinct count of child elements equals expected count.
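+    Example (illustrative): with ParentPath "$.Elements[*]", ChildPath
+    "ContractId", and ExpectedCount 1, a row passes only when all
+    non-null ContractId values across the elements are identical.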
+
+    Arguments:
+    - ColumnName: The column containing JSON data
+    - ParentPath: JSONPath to parent elements (typically with [*])
+    - ChildPath: JSONPath relative to parent to extract child values
+    - ExpectedCount: Expected number of distinct child values
+
+    Validation:
+    - Extracts parent elements using ParentPath
+    - For each parent, extracts child value using ChildPath
+    - Counts distinct non-null child values
+    - Checks if distinct count equals ExpectedCount
+    """
+    REQUIRED_KEYS = {"ColumnName", "ParentPath", "ChildPath", "ExpectedCount"}
+
+    def generateSql(self) -> SQLQuery:
+        col = self.params.ColumnName
+        parent_path = self.params.ParentPath
+        child_path = self.params.ChildPath
+        expected_count = self.params.ExpectedCount
+        keyword = self._get_validation_keyword()
+        message = (
+            self.errorMessage
+            or f"{col} at parent path {parent_path} {keyword} have exactly {expected_count} distinct values for child path {child_path}."
+        )
+        msg_sql = message.replace("'", "''")
+
+        # Build the validation query
+        # For each row, extract parent elements, then extract child values, count distinct
+        # Child path format: if it starts with $, use it directly; otherwise
+        # assume it is a simple key name and prepend '$.' to make it a path
+        if child_path.startswith('$'):
+            full_child_path = child_path
+        else:
+            full_child_path = f'$.{child_path}'
+
+        requirement_sql = f"""
+        WITH row_distinct_counts AS (
+            SELECT
+                {col},
+                (
+                    SELECT COUNT(DISTINCT child_value)
+                    FROM (
+                        SELECT json_extract(parent_elem::TEXT, '{full_child_path}') AS child_value
+                        FROM unnest(json_extract({col}::TEXT, '{parent_path}')) AS t(parent_elem)
+                        WHERE parent_elem IS NOT NULL
+                          AND parent_elem::TEXT != 'null'
+                          AND json_extract(parent_elem::TEXT, '{full_child_path}') IS NOT NULL
+                          AND json_extract(parent_elem::TEXT, '{full_child_path}')::TEXT != 'null'
+                    ) child_values
+                ) AS distinct_count
+            FROM {{table_name}}
+            WHERE {col} IS NOT NULL
+              AND json_valid({col}::TEXT)
+              AND json_array_length(json_extract({col}::TEXT, '{parent_path}')) > 0
+        ),
+        violations AS (
+            SELECT {col}
+            FROM row_distinct_counts
+            WHERE distinct_count != {expected_count}
+        )
+        SELECT
+            COUNT(*) AS violations,
+            CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message
+        FROM violations
+        """
+
+        # Predicate: distinct count must equal expected count
+        predicate_sql = f"""
+        {col} IS NULL
+        OR NOT json_valid({col}::TEXT)
+        OR (
+            SELECT COUNT(DISTINCT child_value)
+            FROM (
+                SELECT json_extract(parent_elem::TEXT, '{full_child_path}') AS child_value
+                FROM unnest(json_extract({col}::TEXT, '{parent_path}')) AS t(parent_elem)
+                WHERE parent_elem IS NOT NULL
+                  AND parent_elem::TEXT != 'null'
+                  AND json_extract(parent_elem::TEXT, '{full_child_path}') IS NOT NULL
+                  AND json_extract(parent_elem::TEXT, '{full_child_path}')::TEXT != 'null'
+            ) child_values
+        ) = {expected_count}
+        """
+
+        # Apply any conditional filters: locate the rendered WHERE line and
+        # fold the parent condition into it (f-string comparison so the match
+        # uses the rendered column name)
+        requirement_lines = requirement_sql.strip().split('\n')
+        for i, line in enumerate(requirement_lines):
+            if f'WHERE {col} IS NOT NULL' in line:
+                base_condition = line.strip()
+                full_condition = self._apply_condition(base_condition.replace('WHERE ', ''))
+                requirement_lines[i] = f"            WHERE {full_condition}"
+                break
+        requirement_sql = '\n'.join(requirement_lines)
+
+        if getattr(self, 'condition_params', None):
+            predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})"
+
+        return SQLQuery(
+            requirement_sql=requirement_sql.strip(),
predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_path_distinct_parent" + + +class FormatJSONFormatGenerator(DuckDBCheckGenerator): + """ + Validates that element(s) at JSON path meet JSON format requirements. + + Arguments: + - ColumnName: The column containing JSON data + - Path: JSONPath expression to extract element(s) (optional - if not provided, validates entire column) + + Validation: + - Checks that extracted values are valid JSON + - Supports both array paths (with [*]) and single value paths + - If Path is not provided, validates the entire column value as valid JSON + - Only validates non-null JSON values + - Validates that the extracted element is itself valid JSON + """ + REQUIRED_KEYS = {"ColumnName"} + + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = getattr(self.params, 'Path', None) + keyword = self._get_validation_keyword() + + # If no path provided, validate the entire column + if not path: + message = ( + self.errorMessage + or f"{col} {keyword} contain valid JSON format." + ) + msg_sql = message.replace("'", "''") + + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND NOT json_valid({col}::TEXT) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + predicate_sql = f""" + {col} IS NULL + OR json_valid({col}::TEXT) + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + # Path provided - validate elements at that path + message = ( + self.errorMessage + or f"{col} at path {path} {keyword} contain valid JSON format." + ) + msg_sql = message.replace("'", "''") + + # Determine if this is an array path or single value path + is_array_path = '[*]' in path + + if is_array_path: + # Array path: Check if ANY element is not valid JSON + # Only check elements that exist and are not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT json_valid(elem::TEXT) + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: All elements must be valid JSON + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT json_valid(elem::TEXT) + ) + """ + else: + # Single value path: Check if the value is not valid JSON + # Only check if the value exists and is not JSON null + requirement_sql = f""" + WITH violations AS ( + SELECT + {col} + FROM {{{{table_name}}}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_extract({col}::TEXT, '{path}') IS NOT NULL + AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' + AND NOT json_valid(json_extract({col}::TEXT, '{path}')::TEXT) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM violations + """ + + # Predicate: Value must be valid JSON + predicate_sql = f""" + {col} IS NULL + OR NOT json_valid({col}::TEXT) + OR json_extract({col}::TEXT, '{path}') IS NULL + OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' + 
OR json_valid(json_extract({col}::TEXT, '{path}')::TEXT) + """ + + # Apply any conditional filters + if is_array_path: + requirement_lines = requirement_sql.strip().split('\n') + for i, line in enumerate(requirement_lines): + if 'WHERE {col} IS NOT NULL' in line: + base_condition = line.strip() + full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) + requirement_lines[i] = f" WHERE {full_condition}" + break + requirement_sql = '\n'.join(requirement_lines) + else: + requirement_lines = requirement_sql.strip().split('\n') + for i, line in enumerate(requirement_lines): + if 'WHERE {col} IS NOT NULL' in line: + base_condition = line.strip() + full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) + requirement_lines[i] = f" WHERE {full_condition}" + break + requirement_sql = '\n'.join(requirement_lines) + + if getattr(self, 'condition_params', None): + predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})" + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "format_json_format" + + +class JSONFormatStringGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to StringHandling requirements (ASCII characters only). + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.name" or "$.items[*].label") + + StringHandling requirements: + - Strings must contain only ASCII characters ([\x00-\x7F]) + + For array paths (e.g., $.items[*].label), validates ALL string elements. + """ + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: - a = self.params.ColumnAName - b = self.params.ColumnBName - n = self.params.ExpectedCount + col = self.params.ColumnName + path = self.params.Path keyword = self._get_validation_keyword() - + + # Build error message message = ( - self.errorMessage - or f"For each {a}, there {keyword} be exactly {n} distinct {b} values." + self.errorMessage + or f"{col} at path '{path}' {keyword} contain only ASCII characters" ) msg_sql = message.replace("'", "''") - - # Build WHERE clause for row-level filtering before aggregation - # This applies parent conditions (e.g., "SkuPriceId IS NOT NULL") before GROUP BY - where_clause = "" - if self.row_condition_sql and self.row_condition_sql.strip(): - where_clause = f"WHERE {self.row_condition_sql}" - - # Requirement SQL (finds violations) - # IMPORTANT: Apply row_condition_sql BEFORE GROUP BY to filter groups themselves - requirement_sql = f""" - WITH counts AS ( - SELECT {a} AS grp, COUNT(DISTINCT {b}) AS distinct_count - FROM {{table_name}} - {where_clause} - GROUP BY {a} - ), - invalid AS ( - SELECT grp, distinct_count - FROM counts - WHERE distinct_count <> {n} - ) - SELECT - COUNT(*) AS violations, - CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message - FROM invalid - """ - - # Note: This is a complex aggregation check that doesn't naturally translate - # to a simple predicate for row-level filtering. Setting predicate_sql to None. 
- predicate_sql = None - + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Determine if this is an array path (contains [*]) + is_array_path = '[*]' in path + + if is_array_path: + # Array path: Check all elements in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + ) + """ + else: + # Single value path + extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND NOT (TRIM({extracted_value}::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND (TRIM({extracted_value}::TEXT, '"') ~ '^[\\x00-\\x7F]*$') + """ + return SQLQuery( - requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: - return "distinct_count" + return "json_format_string" -class CheckModelRuleGenerator(DuckDBCheckGenerator): - REQUIRED_KEYS = {"ModelRuleId"} +class JSONFormatUnitGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to FOCUS Unit Format requirements. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.unit" or "$.items[*].unit") + + Unit Format requirements: + - Must match one of 5 FOCUS unit format patterns: + 1. Standalone units (e.g., "GB", "Hours", "Requests") + 2. unit-time (e.g., "GB-Hours") + 3. unit/time (e.g., "GB/Hour", "Requests/Second") + 4. quantity units (e.g., "1000 Requests") + 5. units/interval (e.g., "Requests/3 Months") + + For array paths (e.g., $.items[*].unit), validates ALL elements. + """ + REQUIRED_KEYS = {"ColumnName", "Path"} - def getCheckType(self) -> str: - return "model_rule_reference" + def _generate_unit_format_regex(self) -> str: + """ + Generate the complete regex pattern for FOCUS Unit Format validation. + Identical to FormatUnitGenerator and JSONCheckPathUnitFormatGenerator. 
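+
+        For illustration, the combined pattern accepts values such as "GB",
+        "Hours", "GB-Hours", "GB/Hour", "1000 Requests", and
+        "Requests/3 Months", and rejects values such as "GB/Hours" (the
+        divisor time unit must be singular) or "gb-hours" (time unit names
+        are case-sensitive).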
+ """ + # Data Size Unit Names (both decimal and binary) + data_size_units = [ + # Bits (decimal) + "b", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb", + # Bytes (decimal) + "B", "KB", "MB", "GB", "TB", "PB", "EB", + # Bits (binary) + "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", + # Bytes (binary) + "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", + ] - def generateSql(self) -> SQLQuery: - # Won’t be executed; we’ll attach a special executor instead. - self.errorMessage = f"Conformance reference to {self.params.ModelRuleId}" - requirement_sql = "SELECT 0 AS violations" - return SQLQuery(requirement_sql=requirement_sql.strip(), predicate_sql=None) + # Time-based Unit Names + time_units_singular = ["Year", "Month", "Day", "Hour", "Minute", "Second"] + time_units_plural = ["Years", "Months", "Days", "Hours", "Minutes", "Seconds"] - def generateCheck(self) -> DuckDBColumnCheck: - # Let the base create the DuckDBColumnCheck (with errorMessage, type, sql) - chk = super().generateCheck() + # Build regex patterns + patterns = [] + data_size_pattern = "|".join(data_size_units) + time_singular_pattern = "|".join(time_units_singular) + time_plural_pattern = "|".join(time_units_plural) + count_unit_pattern = r"[A-Za-z][A-Za-z0-9]*(?:\s+[A-Za-z][A-Za-z0-9]*)*" - target_id = self.params.ModelRuleId - plan = self.plan - # Make a dict {rule_id -> result} out of parent_results_by_idx + plan - id2res: dict[str, dict] = {} - if plan: - for pidx, res in (self.parent_results_by_idx or {}).items(): - rid = getattr(plan.nodes[pidx], "rule_id", None) - if rid: - # normalize shape: we expect {"ok": bool, "details": {...}} - id2res[rid] = res + # Pattern 1: Standalone units + patterns.append( + f"^({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 2: - + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern})-({time_plural_pattern})$" + ) + # Pattern 3: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/({time_singular_pattern})$" + ) + # Pattern 4: + patterns.append( + f"^[0-9]+ ({data_size_pattern}|{time_singular_pattern}|{time_plural_pattern}|{count_unit_pattern})$" + ) + # Pattern 5: / + patterns.append( + f"^({data_size_pattern}|{count_unit_pattern}|{time_plural_pattern})/[0-9]+ ({time_plural_pattern})$" + ) - def _exec_reference(_conn): - # Try to find the referenced rule’s result among parents - res = id2res.get(target_id) - if res is None: - # Not a direct parent? 
Try to find in global results registry - converter = None - if ( - hasattr(self, "child_builder") - and callable(self.child_builder) - and hasattr(self.child_builder, "__closure__") - and self.child_builder.__closure__ - ): - # Access the converter instance from the child_builder lambda's closure - for cell in self.child_builder.__closure__: - if hasattr(cell.cell_contents, "_global_results_by_idx"): - converter = cell.cell_contents - break + # Combine all patterns with OR + return "|".join(f"({pattern})" for pattern in patterns) - if converter and hasattr(converter, "_global_results_by_idx") and plan: - # Look for the target_id in global results by scanning plan nodes - for node_idx, result in converter._global_results_by_idx.items(): - if ( - node_idx < len(plan.nodes) - and plan.nodes[node_idx].rule_id == target_id - ): - res = result - break + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} follow the FOCUS Unit Format specification" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Get the combined unit format regex pattern + combined_pattern = self._generate_unit_format_regex() + + # Determine if this is an array path (contains [*]) + is_array_path = '[*]' in path + + if is_array_path: + # Array path: Check all elements in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') + ) + """ + else: + # Single value path + extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND NOT regexp_matches(TRIM({extracted_value}::TEXT, '"'), '{combined_pattern}') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND regexp_matches(TRIM({extracted_value}::TEXT, '"'), '{combined_pattern}') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) - if res is None: - # Still not found? Fall back to a clear failure. 
- details = { - "violations": 1, - "message": f"Referenced rule '{target_id}' not found upstream", - "referenced_rule_id": target_id, - } - return False, details + def getCheckType(self) -> str: + return "json_format_unit" - ok = bool(res.get("ok", False)) - det = dict(res.get("details") or {}) - violations = det.get("violations", 0 if ok else 1) - details = { - "violations": int(violations), - "message": f"Conformance reference to {target_id} ({'OK' if ok else 'FAIL'})", - "referenced_rule_id": target_id, - } - return ok, details +class JSONFormatNumericGenerator(DuckDBCheckGenerator): + """ + Check element(s) at JSON path conform to Numeric Format requirements. + + Arguments: + ColumnName: The column containing JSON data + Path: JSONPath expression (e.g., "$.value" or "$.items[*].price") + + Numeric Format requirements: + - Optional leading sign (+/-) + - Optional decimal point with digits + - Required digits + - Optional scientific notation (e.g., 1.23e10, 1.23E-5) + - Pattern: ^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$ + + For array paths (e.g., $.items[*].price), validates ALL elements. + """ + REQUIRED_KEYS = {"ColumnName", "Path"} - # Attach the callable to the check so run_check can use it - chk.special_executor = _exec_reference - chk.exec_mode = "reference" - chk.referenced_rule_id = target_id - return chk + def generateSql(self) -> SQLQuery: + col = self.params.ColumnName + path = self.params.Path + keyword = self._get_validation_keyword() + + # Build error message + message = ( + self.errorMessage + or f"{col} at path '{path}' {keyword} contain numeric values" + ) + msg_sql = message.replace("'", "''") + + # Escape single quotes in path for SQL + path_escaped = path.replace("'", "''") + + # Numeric pattern: supports 123, -123, 1.23, -1.23, 1.23e10, 1.23e-10, 1.23E+10, etc. 
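+        # For illustration: "42", "-0.5", ".5", and "1e9" all match, while
+        # "1.", "NaN", "0x1F", and "1,000" do not (digits are required after
+        # the decimal point; special values and separators are rejected).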
+ numeric_pattern = r'^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$' + + # Determine if this is an array path (contains [*]) + is_array_path = '[*]' in path + + if is_array_path: + # Array path: Check all elements in the array + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') + ) + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 + AND NOT EXISTS ( + SELECT 1 + FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) + WHERE elem IS NOT NULL + AND elem::TEXT != 'null' + AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') + ) + """ + else: + # Single value path + extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" + + requirement_sql = f""" + WITH invalid AS ( + SELECT 1 + FROM {{table_name}} + WHERE {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND NOT (TRIM({extracted_value}::TEXT, '"') ~ '{numeric_pattern}') + ) + SELECT + COUNT(*) AS violations, + CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message + FROM invalid + """ + + predicate_sql = f""" + {col} IS NOT NULL + AND json_valid({col}::TEXT) + AND {extracted_value} IS NOT NULL + AND {extracted_value}::TEXT != 'null' + AND (TRIM({extracted_value}::TEXT, '"') ~ '{numeric_pattern}') + """ + + return SQLQuery( + requirement_sql=requirement_sql.strip(), + predicate_sql=predicate_sql.strip() + ) + + def getCheckType(self) -> str: + return "json_format_numeric" class CompositeBaseRuleGenerator(DuckDBCheckGenerator): @@ -2327,6 +4459,62 @@ class FocusToDuckDBSchemaConverter: "generator": ColumnByColumnEqualsColumnValueGenerator, "factory": lambda args: "ColumnAName", }, + "JSONCheckPathType": { + "generator": JSONCheckPathTypeGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathKeyValueFormat": { + "generator": JSONCheckPathKeyValueFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathKeyStartsWith": { + "generator": JSONCheckPathKeyStartsWithGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathKeyExists": { + "generator": JSONCheckPathKeyExistsGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathValue": { + "generator": JSONCheckPathValueGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathNotValue": { + "generator": JSONCheckPathNotValueGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathSameValue": { + "generator": JSONCheckPathSameValueGenerator, + "factory": lambda args: "ColumnAName", + }, + "JSONCheckPathNumericFormat": { + "generator": JSONCheckPathNumericFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathUnitFormat": { + "generator": JSONCheckPathUnitFormatGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONCheckPathDistinctParent": { + "generator": JSONCheckPathDistinctParentGenerator, + "factory": lambda args: "ColumnName", + }, + "FormatJSONFormat": { + "generator": FormatJSONFormatGenerator, + 
"factory": lambda args: "ColumnName", + }, + "JSONFormatString": { + "generator": JSONFormatStringGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONFormatUnit": { + "generator": JSONFormatUnitGenerator, + "factory": lambda args: "ColumnName", + }, + "JSONFormatNumeric": { + "generator": JSONFormatNumericGenerator, + "factory": lambda args: "ColumnName", + }, } # Version-specific overrides: each version only defines what changes from previous versions @@ -2459,6 +4647,10 @@ def __init__( # Build the effective CHECK_GENERATORS mapping for this version self.CHECK_GENERATORS = self._build_check_generators_for_version(rules_version) + # Track missing generators for reporting + self.missing_generators: Set[str] = set() + self.missing_generator_rules: List[Tuple[str, str]] = [] # (rule_id, check_function) + # Example caches (optional) self._prepared: Dict[str, Any] = {} self._views: Dict[str, str] = {} # rule_id -> temp view name @@ -2607,7 +4799,33 @@ def prepare(self, *, conn: duckdb.DuckDBPyConnection, plan: ValidationPlan) -> N def finalize( self, *, success: bool, results_by_idx: Dict[int, Dict[str, Any]] ) -> None: - """Optional cleanup: drop temps, emit summaries, etc.""" + """Optional cleanup: drop temps, emit summaries, report missing generators, etc.""" + # Report missing generators if any were encountered + if self.missing_generators: + # Build the affected rules list + rules_list = "\n".join( + f" - {rule_id}: {check_fn}" + for rule_id, check_fn in self.missing_generator_rules[:10] + ) + if len(self.missing_generator_rules) > 10: + rules_list += f"\n ... and {len(self.missing_generator_rules) - 10} more" + + missing_gens = ", ".join(sorted(self.missing_generators)) + + log.warning( + "\n" + "="*80 + "\n" + + "VALIDATION INCOMPLETE: Missing Check Generators\n" + + "="*80 + "\n" + + "The following check functions are not implemented:\n" + + " %s\n\n" + + "Affected rules (%d total):\n%s\n" + + "\nThese rules have been marked as SKIPPED in the report.\n" + + "="*80, + missing_gens, + len(self.missing_generator_rules), + rules_list + ) + # e.g., self.conn.execute("DROP VIEW IF EXISTS ...") # Close DuckDB connection to prevent hanging in CI environments if hasattr(self, "conn") and self.conn is not None: @@ -2719,8 +4937,20 @@ def _extract_missing_columns(err_msg: str) -> list[str]: if ( isinstance(check, SkippedCheck) or getattr(check, "checkType", "") == "skipped_check" + or getattr(check, "check_type", "") == "skipped_check" ): - ok, details = check.run(self.conn) + # For SkippedCheck generators, call their run() method + if isinstance(check, SkippedCheck): + ok, details = check.run(self.conn) + else: + # For DuckDBColumnCheck objects marked as skipped_check + ok = True + details = { + "skipped": True, + "reason": getattr(check, "errorMessage", None) or "Rule skipped", + "violations": 0 + } + details.setdefault("violations", 0) details.setdefault( "message", @@ -2799,8 +5029,8 @@ def _extract_missing_columns(err_msg: str) -> list[str]: # Fallback to generic child identifier unique_child_id = f"child#{i + 1}" - # Put rule_id AFTER the spread to ensure it overrides any existing rule_id - child_detail_entry = {**det_i, "rule_id": unique_child_id} + # Put rule_id AND ok AFTER the spread to ensure they override any existing values + child_detail_entry = {**det_i, "rule_id": unique_child_id, "ok": ok_i} child_details.append(child_detail_entry) # Aggregate the children results normally @@ -2873,9 +5103,23 @@ def _extract_missing_columns(err_msg: str) -> list[str]: return ok, 
details # ---- leaf SQL execution ------------------------------------------------ sql = getattr(check, "checkSql", None) - if not sql: + if not sql or sql == "None": + # Check if this should have been caught as a skipped check + check_type = getattr(check, "checkType", None) or getattr(check, "check_type", None) + rule_id = getattr(check, "rule_id", None) + error_msg = getattr(check, "errorMessage", None) + + # If it looks like a skipped check but wasn't caught, handle it gracefully + if check_type == "skipped_check" or "skipped" in str(error_msg).lower(): + return True, { + "skipped": True, + "reason": error_msg or "Rule skipped", + "violations": 0, + "message": error_msg or f"{rule_id}: skipped" + } + raise InvalidRuleException( - f"Leaf check has no SQL to execute (rule_id={getattr(check, 'rule_id', None)})" + f"Leaf check has no SQL to execute (rule_id={rule_id}, check_type={check_type})" ) # Handle SQLQuery objects with transpilation support @@ -3057,15 +5301,30 @@ def __make_generator__( reg = self.CHECK_GENERATORS.get(check_fn) if not reg or "generator" not in reg: - raise InvalidRuleException( - textwrap.dedent( - f""" - Rule {rule_id} @ {breadcrumb}: No generator registered for CheckFunction='{check_fn}'. - Available generators: {sorted(self.CHECK_GENERATORS.keys())} - Requirement: - {_compact_json(requirement)} - """ - ).strip() + # Log warning and track missing generator instead of raising exception + self.missing_generators.add(check_fn) + self.missing_generator_rules.append((rule_id, check_fn)) + + log.warning( + "Missing generator for CheckFunction '%s' in rule '%s'. " + "Rule will be skipped. Available generators: %s", + check_fn, + rule_id, + sorted(self.CHECK_GENERATORS.keys()) + ) + + # Return a skipped check generator that will be marked appropriately + return SkippedMissingGeneratorCheck( + rule=rule, + rule_id=rule_id, + check_function=check_fn, + compile_condition=None, + child_builder=None, + breadcrumb=breadcrumb, + parent_results_by_idx=parent_results_by_idx or {}, + parent_edges=parent_edges or (), + plan=getattr(self, "plan", None), + row_condition_sql=row_condition_sql, ) gen_cls = reg["generator"] @@ -3152,6 +5411,7 @@ def __make_generator__( breadcrumb=child_bc, parent_results_by_idx=parent_results_by_idx or {}, parent_edges=child_parent_edges, + inherited_condition=row_condition_sql if is_composite else None, ), breadcrumb=breadcrumb, **params, @@ -3165,11 +5425,15 @@ def __generate_duckdb_check__( breadcrumb: str, parent_results_by_idx, parent_edges, + inherited_condition: Optional[str] = None, ) -> Union["DuckDBColumnCheck", SkippedCheck]: """ Build a DuckDBColumnCheck for this requirement. For composites (AND/OR), the Composite* generators will recursively call back here to build child checks and set `nestedChecks` + `nestedCheckHandler`. 
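+
+        Conditions compose top-down: an inline child of a composite inherits
+        the composite's effective condition, which is ANDed with the child's
+        own condition (see `inherited_condition` below).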
+ + Args: + inherited_condition: Condition inherited from parent composite (for inline children) """ if not isinstance(requirement, dict): raise InvalidRuleException( @@ -3178,6 +5442,13 @@ def __generate_duckdb_check__( # Build effective condition from parent_edges AND from downstream composite consumers eff_cond = self._build_effective_condition(rule, parent_edges) + + # If this is an inline child of a composite, inherit the composite's effective condition + if inherited_condition: + if eff_cond: + eff_cond = f"({eff_cond}) AND ({inherited_condition})" + else: + eff_cond = inherited_condition # ENHANCEMENT: Also check if this rule is referenced by composite rules with conditions # This handles the case where a rule like PricingQuantity-C-008-M is referenced by diff --git a/focus_validator/outputter/outputter_web.py b/focus_validator/outputter/outputter_web.py index ed50ad1..d775e71 100644 --- a/focus_validator/outputter/outputter_web.py +++ b/focus_validator/outputter/outputter_web.py @@ -291,6 +291,7 @@ def _create_entity_view(self, base_data: Dict[str, Any]) -> Dict[str, Any]: dataset_entities = [] column_entities = [] attribute_entities = [] + object_entities = [] for entity_data in entities_data.values(): entity_type = entity_data["entityType"] @@ -298,6 +299,8 @@ def _create_entity_view(self, base_data: Dict[str, Any]) -> Dict[str, Any]: dataset_entities.append(entity_data) elif entity_type == "Column": column_entities.append(entity_data) + elif entity_type == "Object": + object_entities.append(entity_data) else: attribute_entities.append(entity_data) @@ -313,6 +316,11 @@ def _create_entity_view(self, base_data: Dict[str, Any]) -> Dict[str, Any]: "entities": column_entities, "expanded": True, }, + "objects": { + "name": "Objects", + "entities": object_entities, + "expanded": True, + }, "attributes": { "name": "Attributes", "entities": attribute_entities, @@ -667,7 +675,7 @@ def _get_rule_function(self, rule_obj) -> str: return "Unknown" def _get_rule_entity_type(self, rule_obj) -> str: - """Extract entity type from rule object (Column, Dataset, Attribute)""" + """Extract entity type from rule object (Column, Dataset, Object, Attribute)""" if rule_obj and hasattr(rule_obj, "entity_type"): return rule_obj.entity_type return "Unknown" From ce54958ef6a8e8eabb8b3f899d7ed5c79abf8ad6 Mon Sep 17 00:00:00 2001 From: Mike Fuller Date: Thu, 19 Feb 2026 19:34:39 +1100 Subject: [PATCH 2/2] tidy up for lint tests to pass Signed-off-by: Mike Fuller --- .../focus_to_duckdb_converter.py | 894 +++++++++--------- 1 file changed, 434 insertions(+), 460 deletions(-) diff --git a/focus_validator/config_objects/focus_to_duckdb_converter.py b/focus_validator/config_objects/focus_to_duckdb_converter.py index a3c6f5c..231f0c6 100644 --- a/focus_validator/config_objects/focus_to_duckdb_converter.py +++ b/focus_validator/config_objects/focus_to_duckdb_converter.py @@ -472,7 +472,9 @@ def __init__(self, rule, rule_id: str, **kwargs: Any) -> None: class SkippedMissingGeneratorCheck(SkippedCheck): - def __init__(self, rule, rule_id: str, check_function: str = "", **kwargs: Any) -> None: + def __init__( + self, rule, rule_id: str, check_function: str = "", **kwargs: Any + ) -> None: super().__init__(rule, rule_id, **kwargs) self.check_function = check_function self.errorMessage = ( @@ -1688,17 +1690,18 @@ def _exec_reference(_conn): class JSONCheckPathTypeGenerator(DuckDBCheckGenerator): """ Check element(s) at JSON path is of a particular type. 
- + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.key" or "$.items[*].field") ExpectedType: Expected JSON type (e.g., "string", "number", "boolean", "object", "array", "null") - + Uses DuckDB's json_extract and custom type detection to validate JSON path types. Supports both single values and array paths with [*] syntax. - + For array paths (e.g., $.items[*].type), validates that ALL elements match the expected type. """ + REQUIRED_KEYS = {"ColumnName", "Path", "ExpectedType"} def generateSql(self) -> SQLQuery: @@ -1706,27 +1709,35 @@ def generateSql(self) -> SQLQuery: path = self.params.Path expected_type = self.params.ExpectedType keyword = self._get_validation_keyword() - + # Normalize expected type to lowercase for comparison expected_type_lower = expected_type.lower() - + # Map specific numeric type names to generic 'number' for JSON validation # In JSON, there's no distinction between decimal, integer, float, etc. - they're all numbers - if expected_type_lower in ('decimal', 'integer', 'float', 'double', 'bigint', 'int', 'numeric'): - expected_type_normalized = 'number' + if expected_type_lower in ( + "decimal", + "integer", + "float", + "double", + "bigint", + "int", + "numeric", + ): + expected_type_normalized = "number" else: expected_type_normalized = expected_type_lower - + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' {keyword} be of type '{expected_type}'" ) msg_sql = message.replace("'", "''") - + # Escape single quotes in path for SQL path_escaped = path.replace("'", "''") - + # Helper function to detect JSON type from a value # Maps DuckDB's json_type results to JSON type names def type_check_expr(value_expr: str) -> str: @@ -1742,14 +1753,14 @@ def type_check_expr(value_expr: str) -> str: ELSE 'unknown' END """ - + # Detect if path contains array wildcard [*] # If it does, we know json_extract will return an ARRAY, so we can use unnest directly # If not, it returns a single value, so we check the type directly - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" - + if is_array_path: # Array path: json_extract returns an ARRAY of values # We need to check each element in the array @@ -1757,13 +1768,13 @@ def type_check_expr(value_expr: str) -> str: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND ( {extracted_value} IS NULL OR json_array_length({extracted_value}) = 0 OR EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE {type_check_expr('elem')} != '{expected_type_normalized}' ) @@ -1774,14 +1785,14 @@ def type_check_expr(value_expr: str) -> str: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE {type_check_expr('elem')} != '{expected_type_normalized}' ) @@ -1793,7 +1804,7 @@ def type_check_expr(value_expr: str) -> str: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND ( {extracted_value} IS NULL @@ -1805,17 +1816,16 @@ def type_check_expr(value_expr: str) -> str: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS 
error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {type_check_expr(extracted_value)} = '{expected_type_normalized}' """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -1825,41 +1835,42 @@ def getCheckType(self) -> str: class JSONCheckPathKeyValueFormatGenerator(DuckDBCheckGenerator): """ Check element(s) at JSON path conform to KeyValueFormat requirements. - + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.tags" or "$.items[*].metadata") - + KeyValueFormat requirements: 1. Must be a valid JSON object (not array, not primitive) 2. Keys must be unique within the object 3. Values must be primitive types only (string, number, boolean, null) 4. Values must NOT be objects or arrays - + For array paths (e.g., $.items[*].tags), validates ALL elements. """ + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: col = self.params.ColumnName path = self.params.Path keyword = self._get_validation_keyword() - + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' {keyword} conform to KeyValueFormat (JSON object with primitive values only)" ) msg_sql = message.replace("'", "''") - + # Escape single quotes in path for SQL path_escaped = path.replace("'", "''") - + # Detect if path contains array wildcard [*] - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" - + # Function to check if a value is a valid KeyValueFormat object # Requirements: # 1. Must be an OBJECT type @@ -1874,7 +1885,7 @@ def keyvalue_check(value_expr: str) -> str: WHERE json_type(json_extract({value_expr}, '$.' 
|| key_name)) IN ('OBJECT', 'ARRAY', 'JSON') ) )""" - + if is_array_path: # Array path: json_extract returns an ARRAY of values # Check each element conforms to KeyValueFormat @@ -1883,12 +1894,12 @@ def keyvalue_check(value_expr: str) -> str: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE NOT {keyvalue_check('elem')} ) @@ -1898,14 +1909,14 @@ def keyvalue_check(value_expr: str) -> str: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE NOT {keyvalue_check('elem')} ) @@ -1917,7 +1928,7 @@ def keyvalue_check(value_expr: str) -> str: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND NOT {keyvalue_check(extracted_value)} @@ -1927,17 +1938,16 @@ def keyvalue_check(value_expr: str) -> str: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {keyvalue_check(extracted_value)} """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -1948,18 +1958,19 @@ class JSONCheckPathKeyStartsWithGenerator(DuckDBCheckGenerator): """ Check element(s) at JSON path only have keys that start with a specific prefix, ignoring certain specified keys. - + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.Elements[*]") Prefix: Required prefix for keys (e.g., "x_") IgnoreKeys: List of keys to exclude from validation - + Use case: Enforce naming conventions like custom properties must start with "x_" while allowing standard FOCUS-defined properties. - + For array paths (e.g., $.Elements[*]), validates ALL elements. 
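+
+    Illustrative example (hypothetical data): with Prefix "x_" and IgnoreKeys
+    ["Name", "Type"], the element {"Name": "a", "x_owner": "b"} passes, while
+    {"Name": "a", "owner": "b"} fails because "owner" is neither ignored nor
+    prefixed with "x_".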
""" + REQUIRED_KEYS = {"ColumnName", "Path", "Prefix", "IgnoreKeys"} def generateSql(self) -> SQLQuery: @@ -1968,29 +1979,33 @@ def generateSql(self) -> SQLQuery: prefix = self.params.Prefix ignore_keys = self.params.IgnoreKeys # Should be a list keyword = self._get_validation_keyword() - + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' keys {keyword} start with '{prefix}' (except {ignore_keys})" ) msg_sql = message.replace("'", "''") - + # Escape single quotes in path and prefix for SQL path_escaped = path.replace("'", "''") prefix_escaped = prefix.replace("'", "''") - + # Build SQL array of ignored keys if ignore_keys and len(ignore_keys) > 0: - ignore_keys_sql = "[" + ", ".join(f"'{k.replace('\"', '\"\"')}'" for k in ignore_keys) + "]" + ignore_keys_sql = ( + "[" + + ", ".join(f"'{k.replace('\"', '\"\"')}'" for k in ignore_keys) + + "]" + ) else: ignore_keys_sql = "[]" - + # Detect if path contains array wildcard [*] - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" - + # Function to check if all keys (except ignored ones) start with prefix # Returns a check expression that's true if all non-ignored keys start with prefix def keys_check(value_expr: str) -> str: @@ -2003,7 +2018,7 @@ def keys_check(value_expr: str) -> str: AND NOT starts_with(key_name, '{prefix_escaped}') ) )""" - + if is_array_path: # Array path: json_extract returns an ARRAY of values # Check each element has all keys starting with prefix (except ignored) @@ -2012,12 +2027,12 @@ def keys_check(value_expr: str) -> str: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE NOT {keys_check('elem')} ) @@ -2027,14 +2042,14 @@ def keys_check(value_expr: str) -> str: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE NOT {keys_check('elem')} ) @@ -2046,7 +2061,7 @@ def keys_check(value_expr: str) -> str: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND NOT {keys_check(extracted_value)} @@ -2056,17 +2071,16 @@ def keys_check(value_expr: str) -> str: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {keys_check(extracted_value)} """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -2076,48 +2090,53 @@ def getCheckType(self) -> str: class JSONCheckPathKeyExistsGenerator(DuckDBCheckGenerator): """ Check if a specific key exists at a JSON path. 
- + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression ending with the key to check (e.g., "$.Elements[*].AllocatedRatio") - + The path should end with the key name to check. For example: - "$.Elements[*].AllocatedRatio" checks if each element in Elements has AllocatedRatio key - "$.metadata.version" checks if metadata object has version key - + For array paths (e.g., $.Elements[*].KeyName), validates ALL elements have the key. """ + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: col = self.params.ColumnName path = self.params.Path keyword = self._get_validation_keyword() - + # Extract the key name from the path (last component after final '.') # e.g., "$.Elements[*].AllocatedRatio" -> "AllocatedRatio" - if '.' in path: - key_name = path.rsplit('.', 1)[1] - parent_path = path.rsplit('.', 1)[0] + if "." in path: + key_name = path.rsplit(".", 1)[1] + parent_path = path.rsplit(".", 1)[0] else: - raise ValueError(f"Path must contain at least one '.' to specify a key: {path}") - + raise ValueError( + f"Path must contain at least one '.' to specify a key: {path}" + ) + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' key '{key_name}' {keyword} exist" ) msg_sql = message.replace("'", "''") - + # Escape single quotes for SQL parent_path_escaped = parent_path.replace("'", "''") key_name_escaped = key_name.replace("'", "''") - + # Detect if parent path contains array wildcard [*] - is_array_path = '[*]' in parent_path - - extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{parent_path_escaped}')" - + is_array_path = "[*]" in parent_path + + extracted_value = ( + f"json_extract(TRY_CAST({col} AS JSON), '{parent_path_escaped}')" + ) + if is_array_path: # Array path: check each element has the key # Use json_keys() to get keys of each object in the array @@ -2126,18 +2145,18 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND ( EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE json_type(elem) NOT IN ('OBJECT', 'JSON') ) OR EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE json_type(elem) IN ('OBJECT', 'JSON') AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys(elem))) @@ -2149,19 +2168,19 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE json_type(elem) NOT IN ('OBJECT', 'JSON') ) AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE json_type(elem) IN ('OBJECT', 'JSON') AND '{key_name_escaped}' NOT IN (SELECT unnest(json_keys(elem))) @@ -2173,7 +2192,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_type({extracted_value}) IN ('OBJECT', 'JSON') @@ -2184,18 +2203,17 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL 
+ {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_type({extracted_value}) IN ('OBJECT', 'JSON') AND '{key_name_escaped}' IN (SELECT unnest(json_keys({extracted_value}))) """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -2205,17 +2223,18 @@ def getCheckType(self) -> str: class JSONCheckPathValueGenerator(DuckDBCheckGenerator): """ Check if element(s) at JSON path have a specific value. - + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.Elements[*].ContractId") Value: The expected value to check against - + For array paths (e.g., $.Elements[*].field), checks if ALL elements have the value. For single paths (e.g., $.field), checks if the value matches. - + Commonly used to check for null values or specific constants. """ + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} def generateSql(self) -> SQLQuery: @@ -2223,39 +2242,32 @@ def generateSql(self) -> SQLQuery: path = self.params.Path value = self.params.Value keyword = self._get_validation_keyword() - + # Build error message - message = ( - self.errorMessage - or f"{col} at path '{path}' {keyword} equal {value}" - ) + message = self.errorMessage or f"{col} at path '{path}' {keyword} equal {value}" msg_sql = message.replace("'", "''") - + # Escape single quotes for SQL path_escaped = path.replace("'", "''") - + # Convert Python value to SQL literal for comparison # For JSON comparisons, we need to cast properly if value is None: - value_sql = "NULL" value_json = "NULL" elif isinstance(value, bool): - value_sql = "TRUE" if value else "FALSE" value_json = f"'{str(value).lower()}'" # JSON booleans are lowercase elif isinstance(value, (int, float)): - value_sql = str(value) value_json = str(value) else: # String value - need to quote for JSON comparison value_escaped = str(value).replace("'", "''") - value_sql = f"'{value_escaped}'" value_json = f"'\"{value_escaped}\"'" # JSON strings are quoted - + # Detect if path contains array wildcard [*] - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" - + if is_array_path: # Array path: check if ALL elements equal the value # Extract as text for comparison since json_extract returns JSON @@ -2265,12 +2277,12 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem IS NOT NULL AND elem::TEXT != 'null' ) @@ -2280,14 +2292,14 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem IS NOT NULL AND elem::TEXT != 'null' ) @@ -2298,12 +2310,12 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) 
> 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem::TEXT != {value_json} OR elem IS NULL ) @@ -2313,14 +2325,14 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem::TEXT != {value_json} OR elem IS NULL ) @@ -2333,7 +2345,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' @@ -2343,9 +2355,9 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND ({extracted_value} IS NULL OR {extracted_value}::TEXT = 'null') """ @@ -2354,7 +2366,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != {value_json}) ) @@ -2363,17 +2375,16 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT = {value_json} """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -2386,6 +2397,7 @@ class JSONCheckPathNotValueGenerator(DuckDBCheckGenerator): REQUIRED_KEYS = {"ColumnName", "Path", "Value"} Validates that element(s) at a JSON path do NOT have a specific value. 
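+
+    Illustrative example (hypothetical data): with Path "$.Elements[*].ChargeId"
+    and Value null, a row fails whenever any element's ChargeId is JSON null;
+    with a string Value such as "Usage", a row fails whenever any element's
+    ChargeId equals "Usage".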
""" + REQUIRED_KEYS = {"ColumnName", "Path", "Value"} def generateSql(self) -> SQLQuery: @@ -2393,37 +2405,33 @@ def generateSql(self) -> SQLQuery: col = self.params.ColumnName path = self.params.Path value = self.params.Value - + # Escape path for SQL path_escaped = path.replace("'", "''") - + # Build error message msg = f"{col} at path '{path}' MUST NOT equal {value}" msg_sql = msg.replace("'", "''") - + # Check if this is an array path (contains [*]) - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + # Format value for SQL comparison if value is None: - value_sql = "NULL" value_json = "NULL" elif isinstance(value, bool): - value_sql = "TRUE" if value else "FALSE" value_json = f"'{str(value).lower()}'" # JSON uses lowercase true/false elif isinstance(value, (int, float)): - value_sql = str(value) value_json = str(value) else: # String value - escape for SQL value_escaped = str(value).replace("'", "''") - value_sql = f"'{value_escaped}'" # For JSON comparison, strings are quoted value_json = f"'\"{value_escaped}\"'" - + # Extract the value at the path extracted_value = f"json_extract(TRY_CAST({col} AS JSON), '{path_escaped}')" - + if is_array_path: # Array path: check if ANY element equals the value (violation) if value is None: @@ -2432,12 +2440,12 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem IS NULL OR elem::TEXT = 'null' ) @@ -2447,14 +2455,14 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem IS NULL OR elem::TEXT = 'null' ) @@ -2465,12 +2473,12 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem::TEXT = {value_json} ) @@ -2480,14 +2488,14 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND json_array_length({extracted_value}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_value}) AS t(elem) WHERE elem::TEXT = {value_json} ) @@ -2501,7 +2509,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT = 'null' @@ -2511,9 +2519,9 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != 'null') """ @@ -2522,7 +2530,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE 
{col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT = {value_json} @@ -2532,16 +2540,15 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND ({extracted_value} IS NULL OR {extracted_value}::TEXT != {value_json}) """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -2556,6 +2563,7 @@ class JSONCheckPathSameValueGenerator(DuckDBCheckGenerator): - ColumnB at PathB (if PathB is not null) - ColumnB directly (if PathB is null) """ + REQUIRED_KEYS = {"ColumnAName", "PathA", "ColumnBName", "PathB"} def generateSql(self) -> SQLQuery: @@ -2564,31 +2572,31 @@ def generateSql(self) -> SQLQuery: path_a = self.params.PathA col_b = self.params.ColumnBName path_b = self.params.PathB - + # Escape paths for SQL path_a_escaped = path_a.replace("'", "''") path_b_escaped = path_b.replace("'", "''") if path_b else None - + # Build error message if path_b: msg = f"{col_a} at path '{path_a}' MUST equal {col_b} at path '{path_b}'" else: msg = f"{col_a} at path '{path_a}' MUST equal {col_b}" msg_sql = msg.replace("'", "''") - + # Check if paths are array paths - is_array_path_a = '[*]' in path_a - is_array_path_b = '[*]' in path_b if path_b else False - + is_array_path_a = "[*]" in path_a + is_array_path_b = "[*]" in path_b if path_b else False + # Extract values extracted_a = f"json_extract(TRY_CAST({col_a} AS JSON), '{path_a_escaped}')" - + if path_b: extracted_b = f"json_extract(TRY_CAST({col_b} AS JSON), '{path_b_escaped}')" else: # Compare to column value directly extracted_b = col_b - + # Generate SQL based on path types if is_array_path_a and is_array_path_b: # Both arrays: check if arrays are equal element by element @@ -2597,7 +2605,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col_a} IS NOT NULL + WHERE {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2608,7 +2616,7 @@ def generateSql(self) -> SQLQuery: OR EXISTS ( SELECT 1 FROM unnest({extracted_a}) WITH ORDINALITY AS a(elem_a, idx) - JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) + JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) ON a.idx = b.idx WHERE a.elem_a::TEXT != b.elem_b::TEXT OR (a.elem_a IS NULL AND b.elem_b IS NOT NULL) @@ -2621,9 +2629,9 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col_a} IS NOT NULL + {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2633,14 +2641,14 @@ def generateSql(self) -> SQLQuery: AND NOT EXISTS ( SELECT 1 FROM unnest({extracted_a}) WITH ORDINALITY AS a(elem_a, idx) - JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) + JOIN unnest({extracted_b}) WITH ORDINALITY AS b(elem_b, idx) ON a.idx = b.idx WHERE a.elem_a::TEXT != b.elem_b::TEXT OR (a.elem_a IS NULL AND b.elem_b IS NOT NULL) OR (a.elem_a IS NOT NULL AND b.elem_b IS NULL) ) """ - + elif is_array_path_a and not is_array_path_b: # Array A, single B: check if all elements of A equal B if path_b: @@ -2649,7 +2657,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS 
( SELECT 1 FROM {{table_name}} - WHERE {col_a} IS NOT NULL + WHERE {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2657,7 +2665,7 @@ def generateSql(self) -> SQLQuery: AND {extracted_b} IS NOT NULL AND json_array_length({extracted_a}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_a}) AS t(elem) WHERE elem::TEXT != {extracted_b}::TEXT OR (elem IS NULL AND {extracted_b} IS NOT NULL) @@ -2669,9 +2677,9 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col_a} IS NOT NULL + {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2679,7 +2687,7 @@ def generateSql(self) -> SQLQuery: AND {extracted_b} IS NOT NULL AND json_array_length({extracted_a}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_a}) AS t(elem) WHERE elem::TEXT != {extracted_b}::TEXT OR (elem IS NULL AND {extracted_b} IS NOT NULL) @@ -2693,13 +2701,13 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col_a} IS NOT NULL + WHERE {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND {extracted_a} IS NOT NULL AND json_array_length({extracted_a}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_a}) AS t(elem) WHERE elem::TEXT != ('"' || {col_b}::TEXT || '"') OR (elem IS NULL AND {col_b} IS NOT NULL) @@ -2711,29 +2719,29 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col_a} IS NOT NULL + {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND {extracted_a} IS NOT NULL AND json_array_length({extracted_a}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_a}) AS t(elem) WHERE elem::TEXT != ('"' || {col_b}::TEXT || '"') OR (elem IS NULL AND {col_b} IS NOT NULL) OR (elem IS NOT NULL AND {col_b} IS NULL) ) """ - + elif not is_array_path_a and is_array_path_b: # Single A, array B: check if A equals all elements of B requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col_a} IS NOT NULL + WHERE {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2741,7 +2749,7 @@ def generateSql(self) -> SQLQuery: AND {extracted_b} IS NOT NULL AND json_array_length({extracted_b}) > 0 AND EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_b}) AS t(elem) WHERE elem::TEXT != {extracted_a}::TEXT OR (elem IS NULL AND {extracted_a} IS NOT NULL) @@ -2753,9 +2761,9 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col_a} IS NOT NULL + {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2763,14 +2771,14 @@ def generateSql(self) -> SQLQuery: AND {extracted_b} IS NOT NULL AND json_array_length({extracted_b}) > 0 AND NOT EXISTS ( - SELECT 1 + SELECT 1 FROM unnest({extracted_b}) AS t(elem) WHERE elem::TEXT != {extracted_a}::TEXT OR (elem IS NULL AND {extracted_a} IS NOT NULL) OR (elem IS NOT NULL AND {extracted_a} IS NULL) ) """ - + else: # Both single values: direct comparison if path_b: @@ -2779,7 +2787,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col_a} IS NOT NULL + WHERE {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND 
json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) @@ -2795,15 +2803,15 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col_a} IS NOT NULL + {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND json_valid({col_b}::TEXT) AND ( ({extracted_a} IS NULL AND {extracted_b} IS NULL) - OR ({extracted_a} IS NOT NULL AND {extracted_b} IS NOT NULL + OR ({extracted_a} IS NOT NULL AND {extracted_b} IS NOT NULL AND {extracted_a}::TEXT = {extracted_b}::TEXT) ) """ @@ -2814,7 +2822,7 @@ def generateSql(self) -> SQLQuery: WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col_a} IS NOT NULL + WHERE {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND {extracted_a} IS NOT NULL @@ -2825,18 +2833,17 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col_a} IS NOT NULL + {col_a} IS NOT NULL AND {col_b} IS NOT NULL AND json_valid({col_a}::TEXT) AND {extracted_a} IS NOT NULL AND {extracted_a}::TEXT = ('"' || {col_b}::TEXT || '"') """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -2846,16 +2853,17 @@ def getCheckType(self) -> str: class JSONCheckPathNumericFormatGenerator(DuckDBCheckGenerator): """ Validates that element(s) at JSON path meet numeric format requirements. - + Arguments: - ColumnName: The column containing JSON data - Path: JSONPath expression to extract element(s) - + Validation: - Checks that extracted values match numeric pattern: ^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$ - Supports both array paths (with [*]) and single value paths - Only validates non-null JSON values """ + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: @@ -2869,25 +2877,25 @@ def generateSql(self) -> SQLQuery: msg_sql = message.replace("'", "''") # Determine if this is an array path or single value path - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + # Numeric pattern: supports 123, -123, 1.23, -1.23, 1.23e10, 1.23e-10, 1.23E+10, etc. 
- numeric_pattern = r'^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$' + numeric_pattern = r"^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$" if is_array_path: # Array path: Check if ANY element violates the numeric format # Only check elements that exist and are not JSON null requirement_sql = f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') ) @@ -2900,12 +2908,12 @@ def generateSql(self) -> SQLQuery: # Predicate: All elements must be valid numeric format predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR NOT EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') ) @@ -2915,10 +2923,10 @@ def generateSql(self) -> SQLQuery: # Only check if the value exists and is not JSON null requirement_sql = f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_extract({col}::TEXT, '{path}') IS NOT NULL AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' @@ -2932,40 +2940,15 @@ def generateSql(self) -> SQLQuery: # Predicate: Value must be valid numeric format predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR json_extract({col}::TEXT, '{path}') IS NULL OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' OR (TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"') ~ '{numeric_pattern}') """ - # Apply any conditional filters - if is_array_path: - requirement_lines = requirement_sql.strip().split('\n') - # Find WHERE clause and apply condition - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - base_condition = line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - else: - requirement_lines = requirement_sql.strip().split('\n') - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - base_condition = line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - - if getattr(self, 'condition_params', None): - predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})" - return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -2975,16 +2958,17 @@ def getCheckType(self) -> str: class JSONCheckPathUnitFormatGenerator(DuckDBCheckGenerator): """ Validates that element(s) at JSON path meet unit format requirements. 
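    For intuition, the combined pattern assembled below is meant to accept
    values such as "GB", "Hours", "GB/Hour", "1000 Requests", and
    "Requests/3 Months", and to reject strings like "gb" or "Foo/Bar"
    (examples are illustrative; unit names are case-sensitive).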
- + Arguments: - ColumnName: The column containing JSON data - Path: JSONPath expression to extract element(s) - + Validation: - Checks that extracted values match FOCUS unit format patterns - Supports both array paths (with [*]) and single value paths - Only validates non-null JSON values """ + REQUIRED_KEYS = {"ColumnName", "Path"} def _generate_unit_format_regex(self) -> str: @@ -2995,13 +2979,35 @@ def _generate_unit_format_regex(self) -> str: # Data Size Unit Names (both decimal and binary) data_size_units = [ # Bits (decimal) - "b", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb", + "b", + "Kb", + "Mb", + "Gb", + "Tb", + "Pb", + "Eb", # Bytes (decimal) - "B", "KB", "MB", "GB", "TB", "PB", "EB", + "B", + "KB", + "MB", + "GB", + "TB", + "PB", + "EB", # Bits (binary) - "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", + "Kib", + "Mib", + "Gib", + "Tib", + "Pib", + "Eib", # Bytes (binary) - "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", + "KiB", + "MiB", + "GiB", + "TiB", + "PiB", + "EiB", ] # Time-based Unit Names @@ -3053,22 +3059,22 @@ def generateSql(self) -> SQLQuery: combined_pattern = self._generate_unit_format_regex() # Determine if this is an array path or single value path - is_array_path = '[*]' in path + is_array_path = "[*]" in path if is_array_path: # Array path: Check if ANY element violates the unit format # Only check elements that exist and are not JSON null requirement_sql = f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') ) @@ -3081,12 +3087,12 @@ def generateSql(self) -> SQLQuery: # Predicate: All elements must match unit format predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR NOT EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') ) @@ -3096,10 +3102,10 @@ def generateSql(self) -> SQLQuery: # Only check if the value exists and is not JSON null requirement_sql = f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_extract({col}::TEXT, '{path}') IS NOT NULL AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' @@ -3113,39 +3119,15 @@ def generateSql(self) -> SQLQuery: # Predicate: Value must match unit format predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR json_extract({col}::TEXT, '{path}') IS NULL OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' OR regexp_matches(TRIM(json_extract({col}::TEXT, '{path}')::TEXT, '"'), '{combined_pattern}') """ - # Apply any conditional filters - if is_array_path: - requirement_lines = requirement_sql.strip().split('\n') - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - base_condition = line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - else: - requirement_lines = requirement_sql.strip().split('\n') - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - 
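A minimal sketch of the alternation technique `_generate_unit_format_regex` relies on, assuming a reduced unit list (the list and names below are illustrative, not the full FOCUS set):

    import re

    units = ["GB", "GiB", "Hours", "Requests"]  # reduced, illustrative list
    standalone = rf"^({'|'.join(units)})$"      # pattern 1: standalone units

    assert re.match(standalone, "GiB")
    assert not re.match(standalone, "gib")      # unit names are case-sensitive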
base_condition = line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - - if getattr(self, 'condition_params', None): - predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})" - return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -3155,19 +3137,20 @@ def getCheckType(self) -> str: class JSONCheckPathDistinctParentGenerator(DuckDBCheckGenerator): """ Validates that distinct count of child elements equals expected count. - + Arguments: - ColumnName: The column containing JSON data - ParentPath: JSONPath to parent elements (typically with [*]) - ChildPath: JSONPath relative to parent to extract child values - ExpectedCount: Expected number of distinct child values - + Validation: - Extracts parent elements using ParentPath - For each parent, extracts child value using ChildPath - Counts distinct non-null child values - Checks if distinct count equals ExpectedCount """ + REQUIRED_KEYS = {"ColumnName", "ParentPath", "ChildPath", "ExpectedCount"} def generateSql(self) -> SQLQuery: @@ -3185,31 +3168,31 @@ def generateSql(self) -> SQLQuery: # Build the validation query # For each row, extract parent elements, then extract child values, count distinct # Child path format: if it starts with $, use it directly; otherwise prepend $. to make it a path - if child_path.startswith('$.'): + if child_path.startswith("$."): full_child_path = child_path - elif child_path.startswith('$'): + elif child_path.startswith("$"): full_child_path = child_path else: # Assume it's a simple key name, make it a path - full_child_path = f'$.{child_path}' + full_child_path = f"$.{child_path}" requirement_sql = f""" WITH row_distinct_counts AS ( - SELECT + SELECT {col}, ( SELECT COUNT(DISTINCT child_value) FROM ( SELECT json_extract(parent_elem::TEXT, '{full_child_path}') AS child_value FROM unnest(json_extract({col}::TEXT, '{parent_path}')) AS t(parent_elem) - WHERE parent_elem IS NOT NULL + WHERE parent_elem IS NOT NULL AND parent_elem::TEXT != 'null' AND json_extract(parent_elem::TEXT, '{full_child_path}') IS NOT NULL AND json_extract(parent_elem::TEXT, '{full_child_path}')::TEXT != 'null' ) child_values ) AS distinct_count FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{parent_path}')) > 0 ), @@ -3226,14 +3209,14 @@ def generateSql(self) -> SQLQuery: # Predicate: distinct count must equal expected count predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR ( SELECT COUNT(DISTINCT child_value) FROM ( SELECT json_extract(parent_elem::TEXT, '{full_child_path}') AS child_value FROM unnest(json_extract({col}::TEXT, '{parent_path}')) AS t(parent_elem) - WHERE parent_elem IS NOT NULL + WHERE parent_elem IS NOT NULL AND parent_elem::TEXT != 'null' AND json_extract(parent_elem::TEXT, '{full_child_path}') IS NOT NULL AND json_extract(parent_elem::TEXT, '{full_child_path}')::TEXT != 'null' @@ -3241,22 +3224,8 @@ def generateSql(self) -> SQLQuery: ) = {expected_count} """ - # Apply any conditional filters - requirement_lines = requirement_sql.strip().split('\n') - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - base_condition = 
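A standalone probe of the distinct-count logic above, runnable with the `duckdb` Python package (the JSON literal and child path are illustrative only):

    import duckdb

    n = duckdb.sql("""
        SELECT COUNT(DISTINCT json_extract(parent_elem::TEXT, '$.type'))
        FROM unnest(json_extract('{"items":[{"type":"a"},{"type":"b"},{"type":"a"}]}',
                                 '$.items[*]')) AS t(parent_elem)
        WHERE parent_elem IS NOT NULL AND parent_elem::TEXT != 'null'
    """).fetchone()[0]
    assert n == 2  # "a" and "b" are the two distinct child values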
line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - - if getattr(self, 'condition_params', None): - predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})" - return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -3266,11 +3235,11 @@ def getCheckType(self) -> str: class FormatJSONFormatGenerator(DuckDBCheckGenerator): """ Validates that element(s) at JSON path meet JSON format requirements. - + Arguments: - ColumnName: The column containing JSON data - Path: JSONPath expression to extract element(s) (optional - if not provided, validates entire column) - + Validation: - Checks that extracted values are valid JSON - Supports both array paths (with [*]) and single value paths @@ -3278,27 +3247,25 @@ class FormatJSONFormatGenerator(DuckDBCheckGenerator): - Only validates non-null JSON values - Validates that the extracted element is itself valid JSON """ + REQUIRED_KEYS = {"ColumnName"} def generateSql(self) -> SQLQuery: col = self.params.ColumnName - path = getattr(self.params, 'Path', None) + path = getattr(self.params, "Path", None) keyword = self._get_validation_keyword() - + # If no path provided, validate the entire column if not path: - message = ( - self.errorMessage - or f"{col} {keyword} contain valid JSON format." - ) + message = self.errorMessage or f"{col} {keyword} contain valid JSON format." msg_sql = message.replace("'", "''") - + requirement_sql = f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND NOT json_valid({col}::TEXT) ) SELECT @@ -3306,17 +3273,17 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM violations """ - + predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR json_valid({col}::TEXT) """ - + return SQLQuery( requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + predicate_sql=predicate_sql.strip(), ) - + # Path provided - validate elements at that path message = ( self.errorMessage @@ -3325,22 +3292,22 @@ def generateSql(self) -> SQLQuery: msg_sql = message.replace("'", "''") # Determine if this is an array path or single value path - is_array_path = '[*]' in path + is_array_path = "[*]" in path if is_array_path: # Array path: Check if ANY element is not valid JSON # Only check elements that exist and are not JSON null requirement_sql = f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT json_valid(elem::TEXT) ) @@ -3353,12 +3320,12 @@ def generateSql(self) -> SQLQuery: # Predicate: All elements must be valid JSON predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR NOT EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT json_valid(elem::TEXT) ) @@ -3368,10 +3335,10 @@ def generateSql(self) -> SQLQuery: # Only check if the value exists and is not JSON null requirement_sql = 
f""" WITH violations AS ( - SELECT + SELECT {col} FROM {{{{table_name}}}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_extract({col}::TEXT, '{path}') IS NOT NULL AND json_extract({col}::TEXT, '{path}')::TEXT != 'null' @@ -3385,39 +3352,15 @@ def generateSql(self) -> SQLQuery: # Predicate: Value must be valid JSON predicate_sql = f""" - {col} IS NULL + {col} IS NULL OR NOT json_valid({col}::TEXT) OR json_extract({col}::TEXT, '{path}') IS NULL OR json_extract({col}::TEXT, '{path}')::TEXT = 'null' OR json_valid(json_extract({col}::TEXT, '{path}')::TEXT) """ - # Apply any conditional filters - if is_array_path: - requirement_lines = requirement_sql.strip().split('\n') - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - base_condition = line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - else: - requirement_lines = requirement_sql.strip().split('\n') - for i, line in enumerate(requirement_lines): - if 'WHERE {col} IS NOT NULL' in line: - base_condition = line.strip() - full_condition = self._apply_condition(base_condition.replace('WHERE ', '')) - requirement_lines[i] = f" WHERE {full_condition}" - break - requirement_sql = '\n'.join(requirement_lines) - - if getattr(self, 'condition_params', None): - predicate_sql = f"({predicate_sql.strip()}) AND ({self._build_condition_sql()})" - return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -3427,49 +3370,50 @@ def getCheckType(self) -> str: class JSONFormatStringGenerator(DuckDBCheckGenerator): """ Check element(s) at JSON path conform to StringHandling requirements (ASCII characters only). - + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.name" or "$.items[*].label") - + StringHandling requirements: - - Strings must contain only ASCII characters ([\x00-\x7F]) - + - Strings must contain only ASCII characters ([\x00-\x7f]) + For array paths (e.g., $.items[*].label), validates ALL string elements. 
""" + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: col = self.params.ColumnName path = self.params.Path keyword = self._get_validation_keyword() - + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' {keyword} contain only ASCII characters" ) msg_sql = message.replace("'", "''") - + # Escape single quotes in path for SQL path_escaped = path.replace("'", "''") - + # Determine if this is an array path (contains [*]) - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + if is_array_path: # Array path: Check all elements in the array requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 AND EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT (TRIM(elem::TEXT, '"') ~ '^[\\x00-\\x7F]*$') ) @@ -3479,15 +3423,15 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 AND NOT EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT (TRIM(elem::TEXT, '"') ~ '^[\\x00-\\x7F]*$') ) @@ -3495,12 +3439,12 @@ def generateSql(self) -> SQLQuery: else: # Single value path extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" - + requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' @@ -3511,18 +3455,17 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' AND (TRIM({extracted_value}::TEXT, '"') ~ '^[\\x00-\\x7F]*$') """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -3532,11 +3475,11 @@ def getCheckType(self) -> str: class JSONFormatUnitGenerator(DuckDBCheckGenerator): """ Check element(s) at JSON path conform to FOCUS Unit Format requirements. - + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.unit" or "$.items[*].unit") - + Unit Format requirements: - Must match one of 5 FOCUS unit format patterns: 1. Standalone units (e.g., "GB", "Hours", "Requests") @@ -3544,9 +3487,10 @@ class JSONFormatUnitGenerator(DuckDBCheckGenerator): 3. unit/time (e.g., "GB/Hour", "Requests/Second") 4. quantity units (e.g., "1000 Requests") 5. units/interval (e.g., "Requests/3 Months") - + For array paths (e.g., $.items[*].unit), validates ALL elements. 
""" + REQUIRED_KEYS = {"ColumnName", "Path"} def _generate_unit_format_regex(self) -> str: @@ -3557,13 +3501,35 @@ def _generate_unit_format_regex(self) -> str: # Data Size Unit Names (both decimal and binary) data_size_units = [ # Bits (decimal) - "b", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb", + "b", + "Kb", + "Mb", + "Gb", + "Tb", + "Pb", + "Eb", # Bytes (decimal) - "B", "KB", "MB", "GB", "TB", "PB", "EB", + "B", + "KB", + "MB", + "GB", + "TB", + "PB", + "EB", # Bits (binary) - "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", + "Kib", + "Mib", + "Gib", + "Tib", + "Pib", + "Eib", # Bytes (binary) - "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", + "KiB", + "MiB", + "GiB", + "TiB", + "PiB", + "EiB", ] # Time-based Unit Names @@ -3605,36 +3571,36 @@ def generateSql(self) -> SQLQuery: col = self.params.ColumnName path = self.params.Path keyword = self._get_validation_keyword() - + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' {keyword} follow the FOCUS Unit Format specification" ) msg_sql = message.replace("'", "''") - + # Escape single quotes in path for SQL path_escaped = path.replace("'", "''") - + # Get the combined unit format regex pattern combined_pattern = self._generate_unit_format_regex() - + # Determine if this is an array path (contains [*]) - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + if is_array_path: # Array path: Check all elements in the array requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 AND EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') ) @@ -3644,15 +3610,15 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 AND NOT EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT regexp_matches(TRIM(elem::TEXT, '"'), '{combined_pattern}') ) @@ -3660,12 +3626,12 @@ def generateSql(self) -> SQLQuery: else: # Single value path extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" - + requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' @@ -3676,18 +3642,17 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' AND regexp_matches(TRIM({extracted_value}::TEXT, '"'), '{combined_pattern}') """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -3697,56 +3662,57 @@ def getCheckType(self) -> str: class JSONFormatNumericGenerator(DuckDBCheckGenerator): """ Check element(s) at JSON path 
conform to Numeric Format requirements. - + Arguments: ColumnName: The column containing JSON data Path: JSONPath expression (e.g., "$.value" or "$.items[*].price") - + Numeric Format requirements: - Optional leading sign (+/-) - Optional decimal point with digits - Required digits - Optional scientific notation (e.g., 1.23e10, 1.23E-5) - Pattern: ^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$ - + For array paths (e.g., $.items[*].price), validates ALL elements. """ + REQUIRED_KEYS = {"ColumnName", "Path"} def generateSql(self) -> SQLQuery: col = self.params.ColumnName path = self.params.Path keyword = self._get_validation_keyword() - + # Build error message message = ( - self.errorMessage + self.errorMessage or f"{col} at path '{path}' {keyword} contain numeric values" ) msg_sql = message.replace("'", "''") - + # Escape single quotes in path for SQL path_escaped = path.replace("'", "''") - + # Numeric pattern: supports 123, -123, 1.23, -1.23, 1.23e10, 1.23e-10, 1.23E+10, etc. - numeric_pattern = r'^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$' - + numeric_pattern = r"^[+-]?([0-9]*[.])?[0-9]+([eE][+-]?[0-9]+)?$" + # Determine if this is an array path (contains [*]) - is_array_path = '[*]' in path - + is_array_path = "[*]" in path + if is_array_path: # Array path: Check all elements in the array requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 AND EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') ) @@ -3756,15 +3722,15 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND json_array_length(json_extract({col}::TEXT, '{path_escaped}')) > 0 AND NOT EXISTS ( SELECT 1 FROM unnest(json_extract({col}::TEXT, '{path_escaped}')) AS t(elem) - WHERE elem IS NOT NULL + WHERE elem IS NOT NULL AND elem::TEXT != 'null' AND NOT (TRIM(elem::TEXT, '"') ~ '{numeric_pattern}') ) @@ -3772,12 +3738,12 @@ def generateSql(self) -> SQLQuery: else: # Single value path extracted_value = f"json_extract({col}::TEXT, '{path_escaped}')" - + requirement_sql = f""" WITH invalid AS ( SELECT 1 FROM {{table_name}} - WHERE {col} IS NOT NULL + WHERE {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' @@ -3788,18 +3754,17 @@ def generateSql(self) -> SQLQuery: CASE WHEN COUNT(*) > 0 THEN '{msg_sql}' END AS error_message FROM invalid """ - + predicate_sql = f""" - {col} IS NOT NULL + {col} IS NOT NULL AND json_valid({col}::TEXT) AND {extracted_value} IS NOT NULL AND {extracted_value}::TEXT != 'null' AND (TRIM({extracted_value}::TEXT, '"') ~ '{numeric_pattern}') """ - + return SQLQuery( - requirement_sql=requirement_sql.strip(), - predicate_sql=predicate_sql.strip() + requirement_sql=requirement_sql.strip(), predicate_sql=predicate_sql.strip() ) def getCheckType(self) -> str: @@ -4649,7 +4614,9 @@ def __init__( # Track missing generators for reporting self.missing_generators: Set[str] = set() - self.missing_generator_rules: List[Tuple[str, str]] = [] # (rule_id, check_function) + self.missing_generator_rules: List[Tuple[str, str]] = ( + [] + ) # (rule_id, check_function) # Example 
caches (optional) self._prepared: Dict[str, Any] = {} @@ -4808,24 +4775,29 @@ def finalize( for rule_id, check_fn in self.missing_generator_rules[:10] ) if len(self.missing_generator_rules) > 10: - rules_list += f"\n ... and {len(self.missing_generator_rules) - 10} more" - + rules_list += ( + f"\n ... and {len(self.missing_generator_rules) - 10} more" + ) + missing_gens = ", ".join(sorted(self.missing_generators)) - + log.warning( - "\n" + "="*80 + "\n" + - "VALIDATION INCOMPLETE: Missing Check Generators\n" + - "="*80 + "\n" + - "The following check functions are not implemented:\n" + - " %s\n\n" + - "Affected rules (%d total):\n%s\n" + - "\nThese rules have been marked as SKIPPED in the report.\n" + - "="*80, + "\n" + + "=" * 80 + + "\n" + + "VALIDATION INCOMPLETE: Missing Check Generators\n" + + "=" * 80 + + "\n" + + "The following check functions are not implemented:\n" + + " %s\n\n" + + "Affected rules (%d total):\n%s\n" + + "\nThese rules have been marked as SKIPPED in the report.\n" + + "=" * 80, missing_gens, len(self.missing_generator_rules), - rules_list + rules_list, ) - + # e.g., self.conn.execute("DROP VIEW IF EXISTS ...") # Close DuckDB connection to prevent hanging in CI environments if hasattr(self, "conn") and self.conn is not None: @@ -4948,9 +4920,9 @@ def _extract_missing_columns(err_msg: str) -> list[str]: details = { "skipped": True, "reason": getattr(check, "errorMessage", None) or "Rule skipped", - "violations": 0 + "violations": 0, } - + details.setdefault("violations", 0) details.setdefault( "message", @@ -5105,19 +5077,21 @@ def _extract_missing_columns(err_msg: str) -> list[str]: sql = getattr(check, "checkSql", None) if not sql or sql == "None": # Check if this should have been caught as a skipped check - check_type = getattr(check, "checkType", None) or getattr(check, "check_type", None) + check_type = getattr(check, "checkType", None) or getattr( + check, "check_type", None + ) rule_id = getattr(check, "rule_id", None) error_msg = getattr(check, "errorMessage", None) - + # If it looks like a skipped check but wasn't caught, handle it gracefully if check_type == "skipped_check" or "skipped" in str(error_msg).lower(): return True, { "skipped": True, "reason": error_msg or "Rule skipped", "violations": 0, - "message": error_msg or f"{rule_id}: skipped" + "message": error_msg or f"{rule_id}: skipped", } - + raise InvalidRuleException( f"Leaf check has no SQL to execute (rule_id={rule_id}, check_type={check_type})" ) @@ -5304,15 +5278,15 @@ def __make_generator__( # Log warning and track missing generator instead of raising exception self.missing_generators.add(check_fn) self.missing_generator_rules.append((rule_id, check_fn)) - + log.warning( "Missing generator for CheckFunction '%s' in rule '%s'. " "Rule will be skipped. Available generators: %s", check_fn, rule_id, - sorted(self.CHECK_GENERATORS.keys()) + sorted(self.CHECK_GENERATORS.keys()), ) - + # Return a skipped check generator that will be marked appropriately return SkippedMissingGeneratorCheck( rule=rule, @@ -5431,7 +5405,7 @@ def __generate_duckdb_check__( Build a DuckDBColumnCheck for this requirement. For composites (AND/OR), the Composite* generators will recursively call back here to build child checks and set `nestedChecks` + `nestedCheckHandler`. 
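        Illustratively (names hypothetical, not part of this module), an AND
        composite might reduce its children as:

            ok = all(child_ok for child_ok, _ in child_results)

        while an OR composite would use any() over the same results.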
- + Args: inherited_condition: Condition inherited from parent composite (for inline children) """ @@ -5442,7 +5416,7 @@ def __generate_duckdb_check__( # Build effective condition from parent_edges AND from downstream composite consumers eff_cond = self._build_effective_condition(rule, parent_edges) - + # If this is an inline child of a composite, inherit the composite's effective condition if inherited_condition: if eff_cond:
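            # One natural composition at this point (shape illustrative, not
            # prescriptive) is a conjunction, so inline children only evaluate
            # rows already scoped by the parent composite, e.g.:
            #   eff_cond = f"({eff_cond}) AND ({inherited_condition})"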