From 537c76a6385d2a91162f7b54f34f96d385a4213c Mon Sep 17 00:00:00 2001 From: Rook Date: Sat, 31 Jan 2026 14:11:58 +0000 Subject: [PATCH] feat: add FileStore, Store interface, config file support, and content hashing - Extract Store interface in types.ts for both in-memory and persistent stores - Add FileStore (file-backed store with atomic writes via temp+rename) - Add .granules.json config file support with CLI flag overrides - Add contentHash (SHA-256) field to Granule type, computed on creation - Add updateGranuleContent method to Store interface - Update Orchestrator and server to accept Store interface - Add comprehensive tests for FileStore and contentHash --- package-lock.json | 11 ++-- src/file-store.test.ts | 125 ++++++++++++++++++++++++++++++++++++ src/file-store.ts | 142 +++++++++++++++++++++++++++++++++++++++++ src/index.ts | 69 +++++++++++++++++--- src/orchestrator.ts | 6 +- src/server.ts | 9 ++- src/store.ts | 20 +++++- src/types.ts | 14 ++++ 8 files changed, 372 insertions(+), 24 deletions(-) create mode 100644 src/file-store.test.ts create mode 100644 src/file-store.ts diff --git a/package-lock.json b/package-lock.json index 50d74b8..9f8319f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,17 +1,20 @@ { - "name": "granules", - "version": "1.0.0", + "name": "@lbsa71/granules", + "version": "1.0.12", "lockfileVersion": 3, "requires": true, "packages": { "": { - "name": "granules", - "version": "1.0.0", + "name": "@lbsa71/granules", + "version": "1.0.12", "license": "MIT", "dependencies": { "@modelcontextprotocol/sdk": "^1.0.0", "express": "^4.18.2" }, + "bin": { + "granules": "dist/index.js" + }, "devDependencies": { "@types/express": "^4.17.21", "@types/node": "^20.0.0", diff --git a/src/file-store.test.ts b/src/file-store.test.ts new file mode 100644 index 0000000..087970d --- /dev/null +++ b/src/file-store.test.ts @@ -0,0 +1,125 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { FileStore } from "./file-store.js"; +import { computeContentHash } from "./store.js"; +import { unlinkSync, existsSync, readFileSync } from "fs"; +import { join } from "path"; +import { tmpdir } from "os"; + +function tmpFile(): string { + return join(tmpdir(), `.granules-test-${process.pid}-${Date.now()}.json`); +} + +describe("FileStore", () => { + let path: string; + + beforeEach(() => { + path = tmpFile(); + }); + + afterEach(() => { + try { unlinkSync(path); } catch { /* ignore */ } + }); + + it("creates granules with contentHash", () => { + const store = new FileStore(path); + const g = store.createGranule("implement", "do stuff"); + expect(g.contentHash).toBe(computeContentHash("do stuff")); + expect(g.id).toBe("G-1"); + expect(g.state).toBe("unclaimed"); + }); + + it("persists and reloads state", () => { + const store1 = new FileStore(path); + store1.createGranule("plan", "task A"); + store1.createGranule("test", "task B"); + + // Reload from same file + const store2 = new FileStore(path); + const granules = store2.listGranules(); + expect(granules).toHaveLength(2); + expect(granules[0].content).toBe("task A"); + expect(granules[1].content).toBe("task B"); + + // nextId should continue + const g3 = store2.createGranule("review", "task C"); + expect(g3.id).toBe("G-3"); + }); + + it("persists claim/release/complete mutations", () => { + const store1 = new FileStore(path); + const g = store1.createGranule("implement", "work"); + store1.claimGranule(g.id, "W-1"); + + const store2 = new FileStore(path); + const reloaded = store2.getGranule(g.id)!; + expect(reloaded.state).toBe("claimed"); + expect(reloaded.claimedBy).toBe("W-1"); + + store2.completeGranule(g.id, "W-1", "done"); + + const store3 = new FileStore(path); + expect(store3.getGranule(g.id)!.state).toBe("completed"); + }); + + it("writes atomically (temp file + rename)", () => { + const store = new FileStore(path); + store.createGranule("plan", "atomic test"); + // File should exist and be valid JSON + expect(existsSync(path)).toBe(true); + const data = JSON.parse(readFileSync(path, "utf-8")); + expect(data.granules).toHaveLength(1); + }); + + it("updateGranuleContent updates content and hash", () => { + const store = new FileStore(path); + const g = store.createGranule("implement", "original"); + const oldHash = g.contentHash; + + const result = store.updateGranuleContent(g.id, "updated"); + expect(result.success).toBe(true); + expect(result.granule!.content).toBe("updated"); + expect(result.granule!.contentHash).toBe(computeContentHash("updated")); + expect(result.granule!.contentHash).not.toBe(oldHash); + + // Verify persistence + const store2 = new FileStore(path); + expect(store2.getGranule(g.id)!.content).toBe("updated"); + }); + + it("returns failure for non-existent granule", () => { + const store = new FileStore(path); + expect(store.updateGranuleContent("G-999", "x").success).toBe(false); + expect(store.claimGranule("G-999", "W-1").success).toBe(false); + }); + + it("handles stale claims", () => { + const store = new FileStore(path); + const g = store.createGranule("implement", "stale"); + store.claimGranule(g.id, "W-1"); + + // Manually backdate claimedAt + const granule = store.getGranule(g.id)!; + (granule as any).claimedAt = Date.now() - 100_000; + + const stale = store.getStaleClaims(50_000); + expect(stale).toHaveLength(1); + + const released = store.releaseStaleClaims(50_000); + expect(released).toBe(1); + expect(store.getGranule(g.id)!.state).toBe("unclaimed"); + }); + + it("starts fresh when file does not exist", () => { + const store = new FileStore(join(tmpdir(), "nonexistent-" + Date.now() + ".json")); + expect(store.listGranules()).toHaveLength(0); + }); +}); + +describe("computeContentHash", () => { + it("produces consistent SHA-256 hex", () => { + const hash = computeContentHash("hello"); + expect(hash).toMatch(/^[a-f0-9]{64}$/); + expect(computeContentHash("hello")).toBe(hash); + expect(computeContentHash("world")).not.toBe(hash); + }); +}); diff --git a/src/file-store.ts b/src/file-store.ts new file mode 100644 index 0000000..240df1e --- /dev/null +++ b/src/file-store.ts @@ -0,0 +1,142 @@ +import { readFileSync, writeFileSync, renameSync, mkdirSync } from "fs"; +import { dirname, join } from "path"; +import { tmpdir } from "os"; +import { computeContentHash } from "./store.js"; +import type { Granule, GranuleClass, Store } from "./types.js"; + +interface FileStoreState { + nextId: number; + granules: Granule[]; +} + +export class FileStore implements Store { + private granules: Map = new Map(); + private nextId: number = 1; + private filePath: string; + + constructor(filePath: string = ".granules-state.json") { + this.filePath = filePath; + this.load(); + } + + private load(): void { + try { + const data = readFileSync(this.filePath, "utf-8"); + const state: FileStoreState = JSON.parse(data); + this.nextId = state.nextId; + this.granules = new Map(state.granules.map((g) => [g.id, g])); + } catch { + // File doesn't exist or is invalid — start fresh + } + } + + private persist(): void { + const state: FileStoreState = { + nextId: this.nextId, + granules: Array.from(this.granules.values()), + }; + const json = JSON.stringify(state, null, 2); + // Atomic write: write to temp file then rename + const dir = dirname(this.filePath) || "."; + mkdirSync(dir, { recursive: true }); + const tmpPath = join(dir, `.granules-tmp-${process.pid}-${Date.now()}.json`); + writeFileSync(tmpPath, json, "utf-8"); + renameSync(tmpPath, this.filePath); + } + + createGranule(class_: GranuleClass, content: string): Granule { + const id = `G-${this.nextId++}`; + const granule: Granule = { + id, + class: class_, + content, + contentHash: computeContentHash(content), + state: "unclaimed", + createdAt: Date.now(), + }; + this.granules.set(id, granule); + this.persist(); + return granule; + } + + listGranules(): Granule[] { + return Array.from(this.granules.values()); + } + + getGranule(id: string): Granule | undefined { + return this.granules.get(id); + } + + claimGranule(granuleId: string, workerId: string): { success: boolean; granule?: Granule } { + const granule = this.granules.get(granuleId); + if (!granule || granule.state !== "unclaimed") { + return { success: false }; + } + granule.state = "claimed"; + granule.claimedBy = workerId; + granule.claimedAt = Date.now(); + this.persist(); + return { success: true, granule: { ...granule } }; + } + + releaseGranule(granuleId: string, workerId: string, error?: string): { success: boolean } { + const granule = this.granules.get(granuleId); + if (!granule || granule.state !== "claimed" || granule.claimedBy !== workerId) { + return { success: false }; + } + granule.state = "unclaimed"; + granule.claimedBy = undefined; + granule.claimedAt = undefined; + if (error) { + granule.retryCount = (granule.retryCount ?? 0) + 1; + granule.lastError = error; + } + this.persist(); + return { success: true }; + } + + completeGranule(granuleId: string, workerId: string, summary?: string): { success: boolean } { + const granule = this.granules.get(granuleId); + if (!granule || granule.state !== "claimed" || granule.claimedBy !== workerId) { + return { success: false }; + } + granule.state = "completed"; + granule.completedAt = Date.now(); + if (summary !== undefined) { + granule.summary = summary; + } + this.persist(); + return { success: true }; + } + + getStaleClaims(maxAgeMs: number): Granule[] { + const now = Date.now(); + return Array.from(this.granules.values()).filter( + (g) => g.state === "claimed" && g.claimedAt !== undefined && now - g.claimedAt > maxAgeMs + ); + } + + releaseStaleClaims(maxAgeMs: number): number { + const stale = this.getStaleClaims(maxAgeMs); + for (const granule of stale) { + granule.state = "unclaimed"; + granule.claimedBy = undefined; + granule.claimedAt = undefined; + } + if (stale.length > 0) { + this.persist(); + } + return stale.length; + } + + updateGranuleContent(granuleId: string, content: string): { success: boolean; granule?: Granule } { + const granule = this.granules.get(granuleId); + if (!granule) { + return { success: false }; + } + granule.content = content; + granule.contentHash = computeContentHash(content); + this.persist(); + return { success: true, granule: { ...granule } }; + } +} diff --git a/src/index.ts b/src/index.ts index 921cfc5..904369a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,18 +1,48 @@ #!/usr/bin/env node +import { readFileSync } from "fs"; import { GranuleStore } from "./store.js"; +import { FileStore } from "./file-store.js"; import { Orchestrator } from "./orchestrator.js"; +import type { Store } from "./types.js"; -function parseArgs(): { prompt?: string } { +interface GranulesConfig { + store?: "memory" | "file"; + stateFile?: string; + port?: number; + maxWorkers?: number; +} + +function loadConfig(): GranulesConfig { + try { + const data = readFileSync(".granules.json", "utf-8"); + return JSON.parse(data) as GranulesConfig; + } catch { + return {}; + } +} + +function parseArgs(): { prompt?: string; store?: string; stateFile?: string; port?: number; maxWorkers?: number } { const args = process.argv.slice(2); - const result: { prompt?: string } = {}; + const result: { prompt?: string; store?: string; stateFile?: string; port?: number; maxWorkers?: number } = {}; for (let i = 0; i < args.length; i++) { - if (args[i] === "-p" || args[i] === "--prompt") { - const nextArg = args[i + 1]; - if (nextArg && !nextArg.startsWith("-")) { - result.prompt = nextArg; - i++; - } + const arg = args[i]; + const next = args[i + 1]; + if ((arg === "-p" || arg === "--prompt") && next && !next.startsWith("-")) { + result.prompt = next; + i++; + } else if (arg === "--store" && next) { + result.store = next; + i++; + } else if (arg === "--state-file" && next) { + result.stateFile = next; + i++; + } else if (arg === "--port" && next) { + result.port = Number(next); + i++; + } else if (arg === "--max-workers" && next) { + result.maxWorkers = Number(next); + i++; } } @@ -20,8 +50,27 @@ function parseArgs(): { prompt?: string } { } async function main() { - const { prompt } = parseArgs(); - const store = new GranuleStore(); + const config = loadConfig(); + const flags = parseArgs(); + + // CLI flags override config file + const storeType = flags.store ?? config.store ?? "memory"; + const stateFile = flags.stateFile ?? config.stateFile ?? ".granules-state.json"; + const port = flags.port ?? config.port; + const prompt = flags.prompt; + + // Set port env var if specified (used by server.ts) + if (port !== undefined) { + process.env.PORT = String(port); + } + + let store: Store; + if (storeType === "file") { + store = new FileStore(stateFile); + } else { + store = new GranuleStore(); + } + const orchestrator = new Orchestrator(store); // Setup graceful shutdown handlers diff --git a/src/orchestrator.ts b/src/orchestrator.ts index 11bb1a4..a8898a4 100644 --- a/src/orchestrator.ts +++ b/src/orchestrator.ts @@ -1,4 +1,4 @@ -import { GranuleStore } from "./store.js"; +import type { Store } from "./types.js"; import { startMcpHttpServer } from "./server.js"; import { spawnWorker } from "./worker.js"; import { UIManager } from "./ui.js"; @@ -21,7 +21,7 @@ interface ActiveWorker { } export class Orchestrator { - private store: GranuleStore; + private store: Store; private activeWorkers: Map = new Map(); private nextWorkerId: number = 1; private loopInterval?: NodeJS.Timeout; @@ -29,7 +29,7 @@ export class Orchestrator { private ui: UIManager; private sessionLog: SessionLog; - constructor(store: GranuleStore) { + constructor(store: Store) { this.store = store; this.ui = new UIManager(); this.sessionLog = new SessionLog(join(process.cwd(), "logs", "sessions.json")); diff --git a/src/server.ts b/src/server.ts index fdf55d8..43a34d2 100644 --- a/src/server.ts +++ b/src/server.ts @@ -3,10 +3,9 @@ import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/ import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js"; import { z } from "zod"; import http from "http"; -import type { GranuleStore } from "./store.js"; -import type { GranuleClass } from "./types.js"; +import type { GranuleClass, Store } from "./types.js"; -export function createMcpServer(store: GranuleStore): McpServer { +export function createMcpServer(store: Store): McpServer { const server = new McpServer({ name: "granules", version: "1.0.0", @@ -96,7 +95,7 @@ export function createMcpServer(store: GranuleStore): McpServer { /** * Create an MCP server connected via in-memory transport (for testing) */ -export function createInMemoryMcpServer(store: GranuleStore): { +export function createInMemoryMcpServer(store: Store): { server: McpServer; clientTransport: InMemoryTransport; serverTransport: InMemoryTransport; @@ -113,7 +112,7 @@ const DEFAULT_PORT = 3000; * Start an HTTP MCP server using StreamableHTTP transport */ export async function startMcpHttpServer( - store: GranuleStore, + store: Store, port: number = Number(process.env.PORT) || DEFAULT_PORT ): Promise<{ server: McpServer; httpServer: http.Server }> { const mcpServer = createMcpServer(store); diff --git a/src/store.ts b/src/store.ts index 22f3bdd..ae11201 100644 --- a/src/store.ts +++ b/src/store.ts @@ -1,6 +1,11 @@ -import type { Granule, GranuleClass } from "./types.js"; +import { createHash } from "crypto"; +import type { Granule, GranuleClass, Store } from "./types.js"; -export class GranuleStore { +export function computeContentHash(content: string): string { + return createHash("sha256").update(content).digest("hex"); +} + +export class GranuleStore implements Store { private granules: Map = new Map(); private nextId: number = 1; @@ -11,6 +16,7 @@ export class GranuleStore { id, class: class_, content, + contentHash: computeContentHash(content), state: "unclaimed", createdAt: now, }; @@ -101,4 +107,14 @@ export class GranuleStore { } return stale.length; } + + updateGranuleContent(granuleId: string, content: string): { success: boolean; granule?: Granule } { + const granule = this.granules.get(granuleId); + if (!granule) { + return { success: false }; + } + granule.content = content; + granule.contentHash = computeContentHash(content); + return { success: true, granule: { ...granule } }; + } } diff --git a/src/types.ts b/src/types.ts index 3bf9ce9..9880e82 100644 --- a/src/types.ts +++ b/src/types.ts @@ -16,6 +16,7 @@ export interface Granule { id: string; // "G-1", "G-2", auto-incremented class: GranuleClass; content: string; // Task description + contentHash: string; // SHA-256 hash of content field state: GranuleState; claimedBy?: string; // Worker ID, e.g., "W-1" claimedAt?: number; // Unix timestamp ms @@ -25,3 +26,16 @@ export interface Granule { retryCount?: number; // Number of times this granule has been retried after failure lastError?: string; // Last error message if worker failed } + +/** Store interface implemented by both in-memory and persistent stores. */ +export interface Store { + createGranule(class_: GranuleClass, content: string): Granule; + listGranules(): Granule[]; + getGranule(id: string): Granule | undefined; + claimGranule(granuleId: string, workerId: string): { success: boolean; granule?: Granule }; + releaseGranule(granuleId: string, workerId: string, error?: string): { success: boolean }; + completeGranule(granuleId: string, workerId: string, summary?: string): { success: boolean }; + getStaleClaims(maxAgeMs: number): Granule[]; + releaseStaleClaims(maxAgeMs: number): number; + updateGranuleContent(granuleId: string, content: string): { success: boolean; granule?: Granule }; +}