diff --git a/.gitignore b/.gitignore index fee69b0..7385087 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,14 @@ __pycache__/ +*.py[cod] +*.pem *.db +*.db-shm +*.db-wal +.env +dist/ +build/ +*.egg-info/ +.mypy_cache/ +.ruff_cache/ .coverage -example_artifact/private_key.pem -example_artifact/private_key.pub.pem -example_artifact/public_key.pem -example_artifact/sealed_artifact.json +htmlcov/ diff --git a/artifact_demo.py b/artifact_demo.py deleted file mode 100644 index fc360f6..0000000 --- a/artifact_demo.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python3 -"""CEYO Protocol — end-to-end demo: seal then verify.""" - -import subprocess -import sys - -print("Running CEYO demonstration") - -print("\nStep 1: Sealing artifact") -subprocess.run([sys.executable, "seal_artifact.py"], check=True) - -print("\nStep 2: Verifying artifact") -subprocess.run([ - sys.executable, - "tools/ceyo_verify.py", - "example_artifact/sealed_artifact.json", - "example_artifact/public_key.pem", -], check=True) - -print("\nDemo complete") diff --git a/ceyo/cli.py b/ceyo/cli.py index dad57e0..3062f4f 100644 --- a/ceyo/cli.py +++ b/ceyo/cli.py @@ -16,9 +16,21 @@ def cmd_seal(args: argparse.Namespace) -> None: """Seal a JSON record file.""" - body: dict[str, Any] = json.loads(Path(args.record).read_text(encoding="utf-8")) + try: + body: dict[str, Any] = json.loads(Path(args.record).read_text(encoding="utf-8")) + except FileNotFoundError: + print(f"Error: record file not found: {args.record}", file=sys.stderr) + sys.exit(1) + except json.JSONDecodeError as exc: + print(f"Error: invalid JSON in record file: {exc}", file=sys.stderr) + sys.exit(1) + key_provider = LocalKeyProvider(args.key) - envelope = seal_body(body, key_provider, validate=not args.no_validate) + try: + envelope = seal_body(body, key_provider, validate=not args.no_validate) + except Exception as exc: + print(f"Error sealing artifact: {exc}", file=sys.stderr) + sys.exit(1) output = args.output or str(Path(args.record).with_suffix(".sealed.json")) Path(output).write_text( @@ -26,13 +38,25 @@ def cmd_seal(args: argparse.Namespace) -> None: encoding="utf-8", ) print(f"Sealed: {output}") - print(f"Public key: {key_provider._pub_path}") + print(f"Public key: {key_provider.public_key_path}") def cmd_verify(args: argparse.Namespace) -> None: """Verify a sealed artifact against a public key.""" - artifact: dict[str, Any] = json.loads(Path(args.artifact).read_text(encoding="utf-8")) - pub_pem = Path(args.pubkey).read_bytes() + try: + artifact: dict[str, Any] = json.loads(Path(args.artifact).read_text(encoding="utf-8")) + except FileNotFoundError: + print(f"Error: artifact file not found: {args.artifact}", file=sys.stderr) + sys.exit(1) + except json.JSONDecodeError as exc: + print(f"Error: invalid JSON in artifact file: {exc}", file=sys.stderr) + sys.exit(1) + + try: + pub_pem = Path(args.pubkey).read_bytes() + except FileNotFoundError: + print(f"Error: public key file not found: {args.pubkey}", file=sys.stderr) + sys.exit(1) result = verify_artifact(artifact, pub_pem) for msg in result.passed: diff --git a/ceyo/keys.py b/ceyo/keys.py index cbe6471..5997087 100644 --- a/ceyo/keys.py +++ b/ceyo/keys.py @@ -8,6 +8,7 @@ from __future__ import annotations import abc +import os from pathlib import Path from typing import Any, Optional @@ -82,15 +83,21 @@ def _load_or_create(self) -> ec.EllipticCurvePrivateKey: else: self._priv_path.parent.mkdir(parents=True, exist_ok=True) self._priv = ec.generate_private_key(ec.SECP256R1()) - self._priv_path.write_bytes( - self._priv.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) + priv_pem = self._priv.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), ) - # Write public key - self._pub_path.write_bytes(self.get_public_key_pem()) + self._priv_path.write_bytes(priv_pem) + # Restrict private key file to owner read/write only. + os.chmod(self._priv_path, 0o600) + + # Write public key (inline to avoid circular _load_or_create call). + pub_pem = self._priv.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + self._pub_path.write_bytes(pub_pem) return self._priv def get_private_key(self) -> ec.EllipticCurvePrivateKey: @@ -116,6 +123,11 @@ def key_id(self) -> str: def registry(self) -> str: return "local" + @property + def public_key_path(self) -> Path: + """Path to the public key PEM file.""" + return self._pub_path + class InMemoryKeyProvider(KeyProvider): """Key provider that holds keys in memory. Useful for testing.""" diff --git a/ceyo/schema.py b/ceyo/schema.py index ea547e6..16b2073 100644 --- a/ceyo/schema.py +++ b/ceyo/schema.py @@ -2,6 +2,7 @@ from __future__ import annotations +import re from typing import Any # Canonical JSON Schema for a CEYO sealed artifact envelope. @@ -144,7 +145,6 @@ def _validate(obj: dict[str, Any], schema: dict[str, Any], path: str = "") -> li errors.append(f"{path or 'root'}: expected {schema['const']!r}, got {obj!r}") if "pattern" in schema and isinstance(obj, str): - import re if not re.match(schema["pattern"], obj): errors.append(f"{path or 'root'}: does not match pattern {schema['pattern']}") diff --git a/ceyo/store.py b/ceyo/store.py index d0cd931..cb7895a 100644 --- a/ceyo/store.py +++ b/ceyo/store.py @@ -62,7 +62,14 @@ def append(self, artifact: dict[str, Any]) -> int: Returns: The sequence number of the stored artifact. + + Raises: + ValueError: If required envelope fields are missing. """ + missing = [f for f in ("artifact_id", "created_at") if f not in artifact] + if missing: + raise ValueError(f"Artifact missing required fields: {missing}") + envelope_json = json.dumps(artifact, sort_keys=True, separators=(",", ":")) entry_hash = b64u(sha256(envelope_json.encode("utf-8"))) prev_chain = self._last_chain_hash() diff --git a/ceyo/verify.py b/ceyo/verify.py index 1151b5f..364e842 100644 --- a/ceyo/verify.py +++ b/ceyo/verify.py @@ -3,6 +3,7 @@ from __future__ import annotations import hashlib +import hmac from typing import Any from cryptography.exceptions import InvalidSignature @@ -38,6 +39,27 @@ def __repr__(self) -> str: return f"VerificationResult({status}, passed={len(self.passed)}, failed={len(self.failed)})" +def _canonicalize_for_verify(body: Any, scheme: str) -> bytes: + """Canonicalize body using the scheme declared in the envelope. + + Raises RuntimeError if the declared scheme is unavailable. + """ + if scheme == "RFC8785": + try: + import rfc8785 + except ImportError: + raise RuntimeError( + "Artifact declares canonicalization scheme 'RFC8785' but the " + "'rfc8785' package is not installed. Install it to verify this artifact." + ) + return rfc8785.dumps(body) + + import json + return json.dumps( + body, sort_keys=True, separators=(",", ":"), ensure_ascii=False + ).encode("utf-8") + + def verify_artifact( artifact: dict[str, Any], public_key_pem: bytes, @@ -82,27 +104,16 @@ def verify_artifact( body = artifact["body"] scheme = artifact["canonicalization"]["scheme"] - # Use the declared scheme for canonicalization - if scheme == "RFC8785": - try: - import rfc8785 - canonical_bytes = rfc8785.dumps(body) - except ImportError: - # Fall back but warn - import json - canonical_bytes = json.dumps( - body, sort_keys=True, separators=(",", ":"), ensure_ascii=False - ).encode("utf-8") - else: - import json - canonical_bytes = json.dumps( - body, sort_keys=True, separators=(",", ":"), ensure_ascii=False - ).encode("utf-8") + try: + canonical_bytes = _canonicalize_for_verify(body, scheme) + except RuntimeError as exc: + result._fail(f"Canonicalization: {exc}") + return result actual_hash = hashlib.sha256(canonical_bytes).digest() expected_hash = b64u_decode(artifact["integrity"]["hash"]["value_b64u"]) - if actual_hash != expected_hash: + if not hmac.compare_digest(actual_hash, expected_hash): result._fail("Hash mismatch") return result result._pass("Hash matches") @@ -126,7 +137,7 @@ def verify_artifact( ) expected_fp = b64u_decode(key_ref["public_key_fingerprint"]["value_b64u"]) actual_fp = hashlib.sha256(pub_der).digest() - if actual_fp != expected_fp: + if not hmac.compare_digest(actual_fp, expected_fp): result._fail("Key fingerprint mismatch") return result result._pass("Key fingerprint matches") diff --git a/pyproject.toml b/pyproject.toml index 77b8f02..06cb97e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,15 +5,15 @@ build-backend = "hatchling.build" [project] name = "ceyo" version = "0.1.0" -description = "Evidentiary infrastructure prototype for AI systems" +description = "Evidentiary infrastructure for AI systems" readme = "README.md" -license = "MIT" +license = { text = "All Rights Reserved" } authors = [ { name = "Brian Covarrubias", email = "bsecure.cov@gmail.com" }, ] requires-python = ">=3.10" dependencies = [ - "cryptography", + "cryptography>=41", "rfc8785", ] @@ -27,6 +27,10 @@ dev = [ [project.scripts] ceyo = "ceyo.cli:main" +[project.urls] +Homepage = "https://ndr-us.github.io/ceyo-site/" +Repository = "https://github.com/NDR-US/ceyo-protocol" + [tool.ruff] target-version = "py310" line-length = 120 @@ -39,3 +43,6 @@ python_version = "3.10" warn_return_any = false warn_unused_configs = true ignore_missing_imports = true + +[tool.coverage.run] +source = ["ceyo"] diff --git a/seal_artifact.py b/seal_artifact.py deleted file mode 100644 index 24f0166..0000000 --- a/seal_artifact.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -""" -CEYO Protocol — Sealing tool - -Reads: example_artifact/sample_record.json -Writes: example_artifact/sealed_artifact.json - example_artifact/public_key.pem - example_artifact/private_key.pem (should be ignored by .gitignore) - -Output envelope follows docs/artifact-schema.json: - body → canonicalization → integrity → key_reference - -Signature: ECDSA P-256 over SHA-256(canonical(body)) -Canonicalization: RFC 8785 (JCS) if available; otherwise a deterministic fallback. -""" - -from __future__ import annotations - -import json -from pathlib import Path - -from ceyo.crypto import b64u, b64u_decode, canonicalize, sha256 -from ceyo.keys import LocalKeyProvider -from ceyo.seal import seal_body - -ROOT = Path(__file__).resolve().parent -EXAMPLE_DIR = ROOT / "example_artifact" - -RECORD_PATH = EXAMPLE_DIR / "sample_record.json" -SEALED_PATH = EXAMPLE_DIR / "sealed_artifact.json" -PRIVKEY_PATH = EXAMPLE_DIR / "private_key.pem" -PUBKEY_PATH = EXAMPLE_DIR / "public_key.pem" - - -def main() -> None: - EXAMPLE_DIR.mkdir(parents=True, exist_ok=True) - - if not RECORD_PATH.exists(): - raise FileNotFoundError(f"Missing {RECORD_PATH}. Create it first.") - - body = json.loads(RECORD_PATH.read_text(encoding="utf-8")) - - key_provider = LocalKeyProvider(PRIVKEY_PATH, PUBKEY_PATH) - sealed = seal_body(body, key_provider, validate=False) - - SEALED_PATH.write_text( - json.dumps(sealed, indent=2, ensure_ascii=False) + "\n", - encoding="utf-8", - ) - - print(f"Wrote: {SEALED_PATH}") - print(f"Wrote: {PUBKEY_PATH}") - print(f"Private key (should be ignored): {PRIVKEY_PATH}") - - -if __name__ == "__main__": - main()