Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
__pycache__/
*.py[cod]
*.pem
*.db
*.db-shm
*.db-wal
.env
dist/
build/
*.egg-info/
.mypy_cache/
.ruff_cache/
.coverage
example_artifact/private_key.pem
example_artifact/private_key.pub.pem
example_artifact/public_key.pem
example_artifact/sealed_artifact.json
htmlcov/
20 changes: 0 additions & 20 deletions artifact_demo.py

This file was deleted.

34 changes: 29 additions & 5 deletions ceyo/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,47 @@

def cmd_seal(args: argparse.Namespace) -> None:
"""Seal a JSON record file."""
body: dict[str, Any] = json.loads(Path(args.record).read_text(encoding="utf-8"))
try:
body: dict[str, Any] = json.loads(Path(args.record).read_text(encoding="utf-8"))
except FileNotFoundError:
print(f"Error: record file not found: {args.record}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as exc:
print(f"Error: invalid JSON in record file: {exc}", file=sys.stderr)
sys.exit(1)

key_provider = LocalKeyProvider(args.key)
envelope = seal_body(body, key_provider, validate=not args.no_validate)
try:
envelope = seal_body(body, key_provider, validate=not args.no_validate)
except Exception as exc:
print(f"Error sealing artifact: {exc}", file=sys.stderr)
sys.exit(1)

output = args.output or str(Path(args.record).with_suffix(".sealed.json"))
Path(output).write_text(
json.dumps(envelope, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
print(f"Sealed: {output}")
print(f"Public key: {key_provider._pub_path}")
print(f"Public key: {key_provider.public_key_path}")


def cmd_verify(args: argparse.Namespace) -> None:
"""Verify a sealed artifact against a public key."""
artifact: dict[str, Any] = json.loads(Path(args.artifact).read_text(encoding="utf-8"))
pub_pem = Path(args.pubkey).read_bytes()
try:
artifact: dict[str, Any] = json.loads(Path(args.artifact).read_text(encoding="utf-8"))
except FileNotFoundError:
print(f"Error: artifact file not found: {args.artifact}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as exc:
print(f"Error: invalid JSON in artifact file: {exc}", file=sys.stderr)
sys.exit(1)

try:
pub_pem = Path(args.pubkey).read_bytes()
except FileNotFoundError:
print(f"Error: public key file not found: {args.pubkey}", file=sys.stderr)
sys.exit(1)

result = verify_artifact(artifact, pub_pem)
for msg in result.passed:
Expand Down
28 changes: 20 additions & 8 deletions ceyo/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import annotations

import abc
import os
from pathlib import Path
from typing import Any, Optional

Expand Down Expand Up @@ -82,15 +83,21 @@ def _load_or_create(self) -> ec.EllipticCurvePrivateKey:
else:
self._priv_path.parent.mkdir(parents=True, exist_ok=True)
self._priv = ec.generate_private_key(ec.SECP256R1())
self._priv_path.write_bytes(
self._priv.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
priv_pem = self._priv.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
# Write public key
self._pub_path.write_bytes(self.get_public_key_pem())
self._priv_path.write_bytes(priv_pem)
# Restrict private key file to owner read/write only.
os.chmod(self._priv_path, 0o600)

# Write public key (inline to avoid circular _load_or_create call).
pub_pem = self._priv.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
self._pub_path.write_bytes(pub_pem)
return self._priv

def get_private_key(self) -> ec.EllipticCurvePrivateKey:
Expand All @@ -116,6 +123,11 @@ def key_id(self) -> str:
def registry(self) -> str:
return "local"

@property
def public_key_path(self) -> Path:
"""Path to the public key PEM file."""
return self._pub_path


class InMemoryKeyProvider(KeyProvider):
"""Key provider that holds keys in memory. Useful for testing."""
Expand Down
2 changes: 1 addition & 1 deletion ceyo/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import re
from typing import Any

# Canonical JSON Schema for a CEYO sealed artifact envelope.
Expand Down Expand Up @@ -144,7 +145,6 @@ def _validate(obj: dict[str, Any], schema: dict[str, Any], path: str = "") -> li
errors.append(f"{path or 'root'}: expected {schema['const']!r}, got {obj!r}")

if "pattern" in schema and isinstance(obj, str):
import re
if not re.match(schema["pattern"], obj):
errors.append(f"{path or 'root'}: does not match pattern {schema['pattern']}")

Expand Down
7 changes: 7 additions & 0 deletions ceyo/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ def append(self, artifact: dict[str, Any]) -> int:

Returns:
The sequence number of the stored artifact.

Raises:
ValueError: If required envelope fields are missing.
"""
missing = [f for f in ("artifact_id", "created_at") if f not in artifact]
if missing:
raise ValueError(f"Artifact missing required fields: {missing}")

envelope_json = json.dumps(artifact, sort_keys=True, separators=(",", ":"))
entry_hash = b64u(sha256(envelope_json.encode("utf-8")))
prev_chain = self._last_chain_hash()
Expand Down
47 changes: 29 additions & 18 deletions ceyo/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import hashlib
import hmac
from typing import Any

from cryptography.exceptions import InvalidSignature
Expand Down Expand Up @@ -38,6 +39,27 @@ def __repr__(self) -> str:
return f"VerificationResult({status}, passed={len(self.passed)}, failed={len(self.failed)})"


def _canonicalize_for_verify(body: Any, scheme: str) -> bytes:
"""Canonicalize body using the scheme declared in the envelope.

Raises RuntimeError if the declared scheme is unavailable.
"""
if scheme == "RFC8785":
try:
import rfc8785
except ImportError:
raise RuntimeError(
"Artifact declares canonicalization scheme 'RFC8785' but the "
"'rfc8785' package is not installed. Install it to verify this artifact."
)
return rfc8785.dumps(body)

import json
return json.dumps(
body, sort_keys=True, separators=(",", ":"), ensure_ascii=False
).encode("utf-8")


def verify_artifact(
artifact: dict[str, Any],
public_key_pem: bytes,
Expand Down Expand Up @@ -82,27 +104,16 @@ def verify_artifact(
body = artifact["body"]
scheme = artifact["canonicalization"]["scheme"]

# Use the declared scheme for canonicalization
if scheme == "RFC8785":
try:
import rfc8785
canonical_bytes = rfc8785.dumps(body)
except ImportError:
# Fall back but warn
import json
canonical_bytes = json.dumps(
body, sort_keys=True, separators=(",", ":"), ensure_ascii=False
).encode("utf-8")
else:
import json
canonical_bytes = json.dumps(
body, sort_keys=True, separators=(",", ":"), ensure_ascii=False
).encode("utf-8")
try:
canonical_bytes = _canonicalize_for_verify(body, scheme)
except RuntimeError as exc:
result._fail(f"Canonicalization: {exc}")
return result

actual_hash = hashlib.sha256(canonical_bytes).digest()
expected_hash = b64u_decode(artifact["integrity"]["hash"]["value_b64u"])

if actual_hash != expected_hash:
if not hmac.compare_digest(actual_hash, expected_hash):
result._fail("Hash mismatch")
return result
result._pass("Hash matches")
Expand All @@ -126,7 +137,7 @@ def verify_artifact(
)
expected_fp = b64u_decode(key_ref["public_key_fingerprint"]["value_b64u"])
actual_fp = hashlib.sha256(pub_der).digest()
if actual_fp != expected_fp:
if not hmac.compare_digest(actual_fp, expected_fp):
result._fail("Key fingerprint mismatch")
return result
result._pass("Key fingerprint matches")
Expand Down
13 changes: 10 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ build-backend = "hatchling.build"
[project]
name = "ceyo"
version = "0.1.0"
description = "Evidentiary infrastructure prototype for AI systems"
description = "Evidentiary infrastructure for AI systems"
readme = "README.md"
license = "MIT"
license = { text = "All Rights Reserved" }
authors = [
{ name = "Brian Covarrubias", email = "bsecure.cov@gmail.com" },
]
requires-python = ">=3.10"
dependencies = [
"cryptography",
"cryptography>=41",
"rfc8785",
]

Expand All @@ -27,6 +27,10 @@ dev = [
[project.scripts]
ceyo = "ceyo.cli:main"

[project.urls]
Homepage = "https://ndr-us.github.io/ceyo-site/"
Repository = "https://github.com/NDR-US/ceyo-protocol"

[tool.ruff]
target-version = "py310"
line-length = 120
Expand All @@ -39,3 +43,6 @@ python_version = "3.10"
warn_return_any = false
warn_unused_configs = true
ignore_missing_imports = true

[tool.coverage.run]
source = ["ceyo"]
57 changes: 0 additions & 57 deletions seal_artifact.py

This file was deleted.

Loading