diff --git a/primitives/envelope-gate/gate.py b/primitives/envelope-gate/gate.py index 75198bc..1c2c800 100644 --- a/primitives/envelope-gate/gate.py +++ b/primitives/envelope-gate/gate.py @@ -5,10 +5,10 @@ - FIRST_FAIL: halt on first violation (default, frozen at msg-0003). - ACCUMULATE_ALL: collect all violations (available for diagnostics). -Decision mapping: + Decision mapping: - No violations -> ALLOW - Structural violation -> DENY - - Policy violation -> HOLD + - Policy violation -> HOLD - Not addressed to gate -> SILENCE Deterministic. No side effects. No network calls. @@ -16,12 +16,16 @@ from __future__ import annotations +import hashlib import importlib.util +import json +import os import sys from dataclasses import dataclass, field from pathlib import Path from typing import List + # --------------------------------------------------------------------------- # Robust local imports via importlib (avoids sibling gate.py collision) # --------------------------------------------------------------------------- @@ -50,6 +54,7 @@ def _load_local(module_name: str): @dataclass(frozen=True) class GateResult: """Outcome of running an envelope through the conformance gate.""" + msg_id: str exit: str # ALLOW | HOLD | DENY | SILENCE violations: List[Violation] = field(default_factory=list) @@ -65,14 +70,44 @@ def _classify_exit(violations: List[Violation]) -> str: """Map a list of violations to a gate exit decision.""" if not violations: return Exit.ALLOW.value + # Any R0 structural failure -> DENY (envelope is malformed) for v in violations: if v.code.startswith("R0_"): return Exit.DENY.value + # Everything else -> HOLD (policy or enum issue, fixable) return Exit.HOLD.value +# --------------------------------------------------------------------------- +# Geometry Layer v0 -- optional JSONL emission (best-effort, off by default) +# --------------------------------------------------------------------------- + + +def _sha256_bytes(b: bytes) -> str: + """Return hex SHA-256 digest of *b*.""" + return hashlib.sha256(b).hexdigest() + + +def _canonical_json(obj: dict) -> str: + """Canonical JSON: sorted keys, compact separators, no ASCII escaping.""" + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + + +def _emit_geometry_event(event: dict) -> None: + """Append one JSONL line to GEOMETRY_LOG_PATH if set; silently no-op otherwise.""" + path = os.environ.get("GEOMETRY_LOG_PATH") + if not path: + return + try: + line = _canonical_json(event) + "\n" + with open(path, "a", encoding="utf-8") as fh: + fh.write(line) + except Exception: + return + + def evaluate( envelope: Envelope, policy: str = "FIRST_FAIL", @@ -81,7 +116,7 @@ def evaluate( Args: envelope: A parsed Envelope object. - policy: "FIRST_FAIL" (default) or "ACCUMULATE_ALL". + policy: "FIRST_FAIL" (default) or "ACCUMULATE_ALL". Returns: GateResult with exit decision and any violations found. @@ -97,9 +132,26 @@ def evaluate( if policy == "FIRST_FAIL": break + exit_decision = _classify_exit(violations) + violation_codes = sorted([v.code for v in violations]) + + # -- Geometry Layer v0: optional emission (never throws, never blocks) -- + _emit_geometry_event({ + "schema_version": "0.1", + "primitive": "envelope-gate", + "event": "gate_evaluated", + "envelope_id": envelope.msg_id or None, + "exit": exit_decision, + "violations": violation_codes, + "input_hash": _sha256_bytes(envelope.raw.encode("utf-8")), + "result_hash": _sha256_bytes( + _canonical_json({"exit": exit_decision, "violations": violation_codes}).encode("utf-8") + ), + }) + return GateResult( msg_id=envelope.msg_id or "(unknown)", - exit=_classify_exit(violations), + exit=exit_decision, violations=violations, rules_checked=rules_checked, rules_total=len(ALL_RULES),