Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions primitives/envelope-gate/gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@
- FIRST_FAIL: halt on first violation (default, frozen at msg-0003).
- ACCUMULATE_ALL: collect all violations (available for diagnostics).

Decision mapping:
Decision mapping:
- No violations -> ALLOW
- Structural violation -> DENY
- Policy violation -> HOLD
- Policy violation -> HOLD
- Not addressed to gate -> SILENCE

Deterministic. No side effects. No network calls.
"""

from __future__ import annotations

import hashlib
import importlib.util
import json
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List


# ---------------------------------------------------------------------------
# Robust local imports via importlib (avoids sibling gate.py collision)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -50,6 +54,7 @@ def _load_local(module_name: str):
@dataclass(frozen=True)
class GateResult:
"""Outcome of running an envelope through the conformance gate."""

msg_id: str
exit: str # ALLOW | HOLD | DENY | SILENCE
violations: List[Violation] = field(default_factory=list)
Expand All @@ -65,14 +70,44 @@ def _classify_exit(violations: List[Violation]) -> str:
"""Map a list of violations to a gate exit decision."""
if not violations:
return Exit.ALLOW.value

# Any R0 structural failure -> DENY (envelope is malformed)
for v in violations:
if v.code.startswith("R0_"):
return Exit.DENY.value

# Everything else -> HOLD (policy or enum issue, fixable)
return Exit.HOLD.value


# ---------------------------------------------------------------------------
# Geometry Layer v0 -- optional JSONL emission (best-effort, off by default)
# ---------------------------------------------------------------------------


def _sha256_bytes(b: bytes) -> str:
"""Return hex SHA-256 digest of *b*."""
return hashlib.sha256(b).hexdigest()


def _canonical_json(obj: dict) -> str:
"""Canonical JSON: sorted keys, compact separators, no ASCII escaping."""
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def _emit_geometry_event(event: dict) -> None:
"""Append one JSONL line to GEOMETRY_LOG_PATH if set; silently no-op otherwise."""
path = os.environ.get("GEOMETRY_LOG_PATH")
if not path:
return
try:
line = _canonical_json(event) + "\n"
with open(path, "a", encoding="utf-8") as fh:
fh.write(line)
except Exception:
return


def evaluate(
envelope: Envelope,
policy: str = "FIRST_FAIL",
Expand All @@ -81,7 +116,7 @@ def evaluate(

Args:
envelope: A parsed Envelope object.
policy: "FIRST_FAIL" (default) or "ACCUMULATE_ALL".
policy: "FIRST_FAIL" (default) or "ACCUMULATE_ALL".

Returns:
GateResult with exit decision and any violations found.
Expand All @@ -97,9 +132,26 @@ def evaluate(
if policy == "FIRST_FAIL":
break

exit_decision = _classify_exit(violations)
violation_codes = sorted([v.code for v in violations])

# -- Geometry Layer v0: optional emission (never throws, never blocks) --
_emit_geometry_event({
"schema_version": "0.1",
"primitive": "envelope-gate",
"event": "gate_evaluated",
"envelope_id": envelope.msg_id or None,
"exit": exit_decision,
"violations": violation_codes,
"input_hash": _sha256_bytes(envelope.raw.encode("utf-8")),
"result_hash": _sha256_bytes(
_canonical_json({"exit": exit_decision, "violations": violation_codes}).encode("utf-8")
),
})

return GateResult(
msg_id=envelope.msg_id or "(unknown)",
exit=_classify_exit(violations),
exit=exit_decision,
violations=violations,
rules_checked=rules_checked,
rules_total=len(ALL_RULES),
Expand Down