diff --git a/splice.ts b/splice.ts
index 8b80972..6574b69 100755
--- a/splice.ts
+++ b/splice.ts
@@ -1,1002 +1,25 @@
 #!/usr/bin/env -S tsx
 /**
- * splice — simple, human-friendly starter CLI to:
- * - ingest a Twitter/X archive
- * - normalize items (tweets/likes + media)
- * - group into threads/conversations
- * - export Markdown and/or OAI JSONL
+ * Thin forwarder to the modular CLI.
+ * This ensures `npx tsx splice.ts` runs the latest CLI (including checkpoints).
  *
- * Usage:
- *   splice --source ./archive --out ./out --format markdown oai
- *   splice --source ./archive --out ./out --format markdown --dry-run
- *   splice --help
- *
- * Exit codes: 0 success, 1 runtime error, 2 invalid args
+ * In dev: loads TypeScript entry at src/cli/splice.ts
+ * In build: falls back to compiled dist/cli/splice.js
  */
-import * as fs from "node:fs/promises";
-import * as fssync from "node:fs";
-import * as path from "node:path";
-import { fileURLToPath } from "node:url";
-
-type Level = "debug" | "info" | "warn" | "error";
-
-type SourceId = "twitter:tweet" | "twitter:like" | string;
-
-interface MediaAttachment {
-  id: string;
-  contentType: "photo" | "video" | "unknown";
-  absPath: string;
-  metadata?: Record<string, unknown>;
-}
-
-interface ContentItem {
-  id: string;
-  text: string;
-  createdAt: string; // ISO-8601
-  parentId?: string | null;
-  source: SourceId; // 'twitter:tweet' | 'twitter:like'
-  raw?: Record<string, unknown>;
-  media?: MediaAttachment[];
-}
-
-interface Thread {
-  id: string;
-  items: ContentItem[]; // ordered oldest → newest
-}
-
-type Role = "assistant" | "user";
-
-interface ChatMessage {
-  role: Role;
-  content: string;
-}
-
-/* ----------------------------- tiny arg parser ---------------------------- */
-
-type CLIOptions = {
-  source?: string;
-  out?: string;
-  format: string[]; // e.g. ['markdown','oai']
-  systemMessage: string;
-  dryRun: boolean;
-  logLevel: Level;
-  help: boolean;
-  version: boolean;
-  // filters
-  since?: string;
-  until?: string;
-  minLength: number;
-  excludeRt: boolean;
-  onlyThreads: boolean;
-  withMedia: boolean;
-  // outputs
-  statsJson: boolean;
-};
-
-function parseArgs(argv: string[]): CLIOptions {
-  const opts: CLIOptions = {
-    format: ["markdown", "oai"],
-    systemMessage: "You have been uploaded to the internet",
-    dryRun: false,
-    logLevel: "info",
-    help: false,
-    version: false,
-    since: undefined,
-    until: undefined,
-    minLength: 0,
-    excludeRt: false,
-    onlyThreads: false,
-    withMedia: false,
-    statsJson: false,
-  };
-
-  const args = argv.slice(2);
-  let systemExplicit = false;
-  for (let i = 0; i < args.length; i++) {
-    const a = args[i];
-    if (a === "--help" || a === "-h") {
-      opts.help = true;
-    } else if (a === "--version" || a === "-V") {
-      opts.version = true;
-    } else if (a === "--source" || a === "--archive-path") {
-      opts.source = args[++i];
-    } else if (a === "--out" || a === "--output-dir") {
-      opts.out = args[++i];
-    } else if (
-      a === "--format" ||
-      a === "--formats" ||
-      a === "--output-formats"
-    ) {
-      const next = args[++i];
-      if (!next) continue;
-      // allow space or comma separated
-      const parts = next.split(",").filter(Boolean);
-      if (parts.length > 1) opts.format = parts;
-      else {
-        // collect following non-flag tokens too (space-separated list)
-        const list = [next];
-        while (args[i + 1] && !args[i + 1].startsWith("-")) {
-          list.push(args[++i]);
-        }
-        opts.format = list;
-      }
-    } else if (a === "--system-message" || a === "--system") {
-      const val = args[++i];
-      if (val) {
-        opts.systemMessage = val;
-        systemExplicit = true;
-      }
-    } else if (a === "--dry-run" || a === "-n") {
-      opts.dryRun = true;
-    } else if (a === "--log-level") {
-      const lvl = (args[++i] ?? "").toLowerCase();
-      if (
-        lvl === "debug" ||
-        lvl === "info" ||
-        lvl === "warn" ||
-        lvl === "error"
-      ) {
-        opts.logLevel = lvl;
-      }
-    } else if (a === "--since") {
-      opts.since = args[++i];
-    } else if (a === "--until") {
-      opts.until = args[++i];
-    } else if (a === "--min-length") {
-      const v = parseInt(args[++i] ?? "", 10);
-      if (!Number.isNaN(v)) opts.minLength = v;
-    } else if (a === "--exclude-rt") {
-      opts.excludeRt = true;
-    } else if (a === "--only-threads") {
-      opts.onlyThreads = true;
-    } else if (a === "--with-media") {
-      opts.withMedia = true;
-    } else if (a === "--stats-json") {
-      opts.statsJson = true;
-    } else if (a === "--") {
-      break;
-    } else if (a.startsWith("-")) {
-      // unknown flag; ignore to keep simple
-      // could collect for suggestions
-    } else {
-      // positional? ignore for now
-    }
-  }
-  if (!systemExplicit && process.env.SPLICE_SYSTEM_MESSAGE) {
-    opts.systemMessage = process.env.SPLICE_SYSTEM_MESSAGE as string;
-  }
-  return opts;
-}
-
-/* --------------------------------- logger -------------------------------- */
-
-function makeLogger(level: Level): (lvl: Level, msg: string) => void {
-  const order: Level[] = ["debug", "info", "warn", "error"];
-  const minIdx = order.indexOf(level);
-  return (lvl: Level, msg: string) => {
-    if (order.indexOf(lvl) >= minIdx) {
-      process.stderr.write(`[${lvl}] ${msg}\n`);
-    }
-  };
-}
-
-/* --------------------------------- utils --------------------------------- */
-
-function usage(): string {
-  return [
-    "splice — convert a Twitter archive to Markdown, OAI JSONL, and/or JSON",
-    "",
-    "Usage:",
-    "  splice --source <dir> --out <dir> [--format markdown oai json sharegpt] [--system-message <text>]",
-    "         [--since <date>] [--until <date>] [--min-length <n>] [--exclude-rt] [--only-threads] [--with-media]",
-    "         [--dry-run] [--stats-json] [--log-level <level>] [--json-stdout] [--quiet|-q] [--verbose] [--version|-V]",
-    "",
-    "Options:",
-    "  --source <dir>              Path to the Twitter archive directory",
-    "  --out <dir>                 Output directory",
-    "  --format <fmt...>           One or more formats: markdown, oai, json, sharegpt (default: markdown oai)",
-    '  --system, --system-message  System message for OAI JSONL (default: "You have been uploaded to the internet")',
-    "  --since <date>              Include items on/after this ISO date",
-    "  --until <date>              Include items on/before this ISO date",
-    "  --min-length <n>            Minimum text length",
-    "  --exclude-rt                Exclude retweets (RT ...)",
-    "  --only-threads              Output threads only (ignore conversations/non-thread tweets)",
-    "  --with-media                Only include items that have media",
-    "  --dry-run, -n               Plan only; don’t write files",
-    "  --stats-json                Write a stats.json summary",
-    "  --log-level <level>         debug|info|warn|error (default: info)",
-    "  --json-stdout               Emit normalized items JSONL to stdout (no files); logs to stderr",
-    "  --quiet, -q                 Errors only",
-    "  --verbose                   Debug logging",
-    "  --version, -V               Show version",
-    "  --help, -h                  Show help",
-    "",
-    "Examples:",
-    "  splice --source ./archive --out ./out --format markdown oai json",
-    '  splice --source ./archive --out ./out --format oai --system-message "You are helpful."',
-    "  splice --source ./archive --out ./out --since 2024-01-01 --only-threads",
-    "  splice --source ./archive --out ./out --json-stdout",
-    "  splice --version",
-    "",
-    "Docs: https://github.com/deepfates/splice • Context: https://deepfates.com/convert-your-twitter-archive-into-training-data",
-  ].join("\n");
-}
-
-function cleanJsonString(js: string): string {
-  // remove window.* = prefix and trailing semicolon
-  return js
-    .trim()
-    .replace(/^window\.[^=]+=\s*/i, "")
-    .replace(/;?\s*$/, "");
-}
-
-async function readJsonFromJs(filePath: string): Promise<any> {
-  const raw = await fs.readFile(filePath, "utf8");
-  const cleaned = cleanJsonString(raw);
-  try {
-    return JSON.parse(cleaned);
-  } catch {
-    // try __THAR_CONFIG fallback
-    const match = raw.match(/window\.__THAR_CONFIG\s*=\s*({[\s\S]*?})\s*;?/);
-    if (match) return JSON.parse(match[1]);
-    throw new Error(`Could not parse JSON from ${filePath}`);
-  }
-}
-
-function parseLooseArray(input: string): any[] {
-  // Try strict JSON first
-  try {
-    const parsed = JSON.parse(input);
-    return Array.isArray(parsed) ? parsed : [];
-  } catch (_) {
-    // Fall through to loose JS evaluation
-  }
-
-  // Attempt to evaluate as a JS array/object literal in a confined context.
-  // cleanJsonString should have removed any "window.* = " prefix so input should be an array expression.
-  try {
-    // eslint-disable-next-line no-new-func
-    const fn = new Function('"use strict"; return (' + input + ");");
-    const result = fn();
-    return Array.isArray(result) ? result : [];
-  } catch (_) {
-    return [];
-  }
-}
-
-async function loadConfig(): Promise<any> {
-  try {
-    const mod: any = await import("cosmiconfig");
-    const explorer = mod.cosmiconfig("splice");
-    const result = await explorer.search();
-    return result?.config;
-  } catch {
-    return undefined;
-  }
-}
-
-function mediaTypeFromExt(filename: string): "photo" | "video" | "unknown" {
-  const ext = path.extname(filename).toLowerCase();
-  if (ext === ".mp4" || ext === ".mov") return "video";
-  if (ext === ".jpg" || ext === ".jpeg" || ext === ".png" || ext === ".gif")
-    return "photo";
-  return "unknown";
-}
-
-function sanitizeFilename(name: string, maxLen = 50): string {
-  return (
-    name
-      .replace(/[^\w\-_ ]/g, "")
-      .trim()
-      .replace(/\s+/g, "_")
-      .slice(0, maxLen) || "untitled"
-  );
-}
-
-function toIso(d: string | Date): string {
-  const dt = typeof d === "string" ? new Date(d) : d;
-  return Number.isNaN(dt.getTime())
-    ? new Date().toISOString()
-    : dt.toISOString();
-}
-
-function isRetweet(text: string): boolean {
-  return /^RT\b/.test(text || "");
-}
-
-function formatIsoDateOnly(iso: string): string {
-  const d = new Date(iso);
-  return isNaN(d.getTime())
-    ? new Date().toISOString().slice(0, 10)
-    : d.toISOString().slice(0, 10);
-}
-
-/* ------------------------------ twitter ingest --------------------------- */
-
-type Manifest = {
-  dataTypes?: Record<string, { files?: Array<{ fileName: string }> }>;
-};
-
-async function detectTwitterArchive(rootPath: string): Promise<boolean> {
-  try {
-    const p = path.join(rootPath, "data", "manifest.js");
-    await fs.stat(p);
-    return true;
-  } catch {
-    return false;
-  }
-}
-
-async function getMediaFiles(root: string, id: string): Promise<string[]> {
-  const mediaDir = path.join(root, "data", "tweets_media");
-  try {
-    const files = await fs.readdir(mediaDir);
-    const filtered: string[] = [];
-    for (const f of files) {
-      if (!f.startsWith(`${id}-`)) continue;
-      const stat = await fs.stat(path.join(mediaDir, f));
-      if (stat.size > 0) filtered.push(f);
-    }
-    return filtered;
-  } catch {
-    return [];
-  }
-}
-
-function normalizeTweetLike(
-  item: any,
-  source: "twitter:tweet" | "twitter:like",
-): {
-  id: string;
-  text: string;
-  created_at: string;
-  parent_id?: string | null;
-  raw: any;
-} | null {
-  const t = item?.tweet ?? item?.like ?? item;
-  if (!t) return null;
-  const id = t.id || t.tweetId;
-  if (!id) return null;
-  const text = t.text || t.fullText || t.full_text || "";
-  const created_at = t.created_at || t.createdAt || "";
-  const parent_id = t.in_reply_to_status_id || t.inReplyTo || null;
-  return { id, text, created_at, parent_id, raw: t };
-}
-
-async function ingestTwitter(
-  rootPath: string,
-  logger: (l: Level, m: string) => void,
-): Promise<ContentItem[]> {
-  const manifestPath = path.join(rootPath, "data", "manifest.js");
-  const manifest: Manifest = await readJsonFromJs(manifestPath);
-  const types = manifest.dataTypes ?? {};
-  const out: ContentItem[] = [];
-
-  const selected: Array<"tweets" | "like"> = Object.keys(types).filter(
-    (t) => t === "tweets" || t === "like",
-  ) as any;
-  for (const dataType of selected) {
-    const info = types[dataType];
-    const files = info?.files ?? [];
-    if (!files.length) continue;
-
-    logger("info", `Processing ${files.length} files for ${dataType}`);
-
-    for (const f of files) {
-      const filePath = path.join(rootPath, f.fileName);
-      const raw = await fs.readFile(filePath, "utf8");
-      const cleaned = cleanJsonString(raw);
-      const data = parseLooseArray(cleaned);
-      if (!Array.isArray(data) || data.length === 0) continue;
-
-      for (const item of data) {
-        const norm = normalizeTweetLike(
-          item,
-          dataType === "tweets" ? "twitter:tweet" : "twitter:like",
-        );
-        if (!norm) continue;
-
-        const mediaFiles = await getMediaFiles(rootPath, norm.id);
-        const media: MediaAttachment[] = mediaFiles.map((fn) => ({
-          id: `${norm.id}_${fn.replace(/\.\w+$/, "")}`,
-          contentType: mediaTypeFromExt(fn),
-          absPath: path.join(rootPath, "data", "tweets_media", fn),
-          metadata: {
-            parent: norm.id,
-            media_info: norm.raw?.extended_entities?.media ?? [],
-          },
-        }));
-
-        out.push({
-          id: norm.id,
-          text: norm.text,
-          createdAt: norm.created_at
-            ? toIso(norm.created_at)
-            : new Date().toISOString(),
-          parentId: norm.parent_id ?? null,
-          source: dataType === "tweets" ? "twitter:tweet" : "twitter:like",
-          raw: norm.raw,
-          media,
-        });
-      }
-    }
-  }
-  logger("info", `Total normalized items: ${out.length}`);
-  return out;
-}
-
-/* ----------------------------- transforms/group -------------------------- */
-
-function cleanText(
-  text: string,
-  entities?: { urls?: Array<{ url: string; expanded_url?: string }> },
-): string {
-  let t = text ?? "";
-  if (entities?.urls) {
-    for (const u of entities.urls) {
-      if (u.url && u.expanded_url) t = t.split(u.url).join(u.expanded_url);
-    }
-  }
-  t = t.replace(/https:\/\/t\.co\/\w+/g, "");
-  t = t.replace(/@\w+/g, "");
-  t = t.replace(/#\w+/g, "");
-  t = t.replace(/\s+/g, " ");
-  return t.trim();
-}
-
-function applyFilters(
-  items: ContentItem[],
-  opts: {
-    since?: string;
-    until?: string;
-    minLength: number;
-    excludeRt: boolean;
-    onlyThreads: boolean;
-    withMedia: boolean;
-  },
-): ContentItem[] {
-  const sinceTime = opts.since ? new Date(opts.since).getTime() : -Infinity;
-  const untilTime = opts.until ? new Date(opts.until).getTime() : Infinity;
-  return items.filter((it) => {
-    const t = new Date(it.createdAt).getTime();
-    if (!(t >= sinceTime && t <= untilTime)) return false;
-    if (opts.excludeRt && isRetweet(it.text)) return false;
-    if (opts.minLength > 0 && (it.text?.trim().length ?? 0) < opts.minLength)
-      return false;
-    if (opts.withMedia && !(it.media && it.media.length > 0)) return false;
-    return true;
-  });
-}
-
-function indexById(items: ContentItem[]): Record<string, ContentItem> {
-  const m: Record<string, ContentItem> = {};
-  for (const it of items) if (it.id) m[it.id] = it;
-  return m;
-}
-
-function groupThreadsAndConversations(all: Record<string, ContentItem>): {
-  threads: Thread[];
-  conversations: ContentItem[][];
-} {
-  const processed = new Set<string>();
-  const threads: Thread[] = [];
-  const conversations: ContentItem[][] = [];
-
-  const items = Object.values(all);
-  for (const item of items) {
-    if (processed.has(item.id)) continue;
-
-    const chain: ContentItem[] = [item];
-    let current = item;
-    while (current.parentId && all[current.parentId]) {
-      const parent = all[current.parentId];
-      chain.push(parent);
-      current = parent;
-      if (processed.has(current.id)) break;
-    }
-    for (const c of chain) processed.add(c.id);
-
-    const allTweets = chain.every((c) => c.source === "twitter:tweet");
-    if (allTweets) {
-      const ordered = chain.slice().reverse();
-      threads.push({ id: ordered[0].id, items: ordered });
-    } else {
-      conversations.push(chain.slice().reverse());
-    }
-  }
-  return { threads, conversations };
-}
-
-function messagesFromConversation(items: ContentItem[]): ChatMessage[] {
-  const msgs: ChatMessage[] = [];
-  let currentRole: Role | undefined;
-  let currentContent: string[] = [];
-
-  function flush() {
-    if (!currentRole) return;
-    const content = currentContent.join("\n\n").trim();
-    if (content) msgs.push({ role: currentRole, content });
-    currentContent = [];
-  }
-
-  for (const it of items) {
-    const role: Role =
-      it.raw && "full_text" in (it.raw as any) ? "assistant" : "user";
-    const cleaned = cleanText(it.text, (it.raw as any)?.entities);
-    if (!cleaned) continue;
-    if (role !== currentRole && currentRole) flush();
-    currentRole = role;
-    currentContent.push(cleaned);
-  }
-  flush();
-
-  // trim to last assistant
-  for (let i = msgs.length - 1; i >= 0; i--) {
-    if (msgs[i].role === "assistant") return msgs.slice(0, i + 1);
-  }
-  return [];
-}
-
-/* --------------------------------- outputs -------------------------------- */
-
-async function ensureDir(p: string) {
-  await fs.mkdir(p, { recursive: true });
-}
-
-async function copyMedia(
-  items: ContentItem[],
-  imagesDir: string,
-  logger: (l: Level, m: string) => void,
-) {
-  await ensureDir(imagesDir);
-  for (const it of items) {
-    for (const m of it.media ?? []) {
-      try {
-        const base =
-          "_" + (m.absPath ? path.basename(m.absPath) : `${m.id}.bin`);
-        await fs.copyFile(m.absPath, path.join(imagesDir, base));
-      } catch (e) {
-        logger(
-          "warn",
-          `Failed to copy media ${m.absPath}: ${(e as Error).message}`,
-        );
-      }
-    }
-  }
-}
-
-async function writeMarkdown(
-  threads: Thread[],
-  items: ContentItem[],
-  outDir: string,
-  logger: (l: Level, m: string) => void,
-  dryRun: boolean,
-) {
-  const threadsDir = path.join(outDir, "threads");
-  const byDateDir = path.join(outDir, "tweets_by_date");
-  const imagesDir = path.join(outDir, "images");
-
-  if (!dryRun) {
-    await ensureDir(threadsDir);
-    await ensureDir(byDateDir);
-    await ensureDir(imagesDir);
-  }
-
-  // copy media for all referenced items
-  const allItems = threads.flatMap((t) => t.items);
-  const threadIds = new Set(allItems.map((i) => i.id));
-  const nonThreadTweets = items.filter(
-    (i) =>
-      i.source === "twitter:tweet" &&
-      !i.parentId &&
-      !threadIds.has(i.id) &&
-      !isRetweet(i.text),
-  );
-  const copyPool = allItems.concat(nonThreadTweets);
-
-  logger("info", `Preparing media for ${copyPool.length} items`);
-  if (!dryRun) await copyMedia(copyPool, imagesDir, logger);
-
-  // Save threads
-  logger("info", `Saving ${threads.length} threads`);
-  for (const thread of threads) {
-    const first = thread.items[0];
-    const date = formatIsoDateOnly(first.createdAt);
-    const fm = `---\nDate: ${date}\n---\n`;
-
-    const parts: string[] = [];
-    for (const t of thread.items) {
-      const mediaLinks = (t.media ?? []).map(
-        (m) =>
-          `![${path.basename(m.absPath)}](../images/_${path.basename(m.absPath)})`,
-      );
-      const cleaned = cleanText(t.text, (t.raw as any)?.entities);
-      parts.push(`${cleaned}\n\n${mediaLinks.join("\n")}`.trim());
-    }
-
-    const firstWords = thread.items[0].text.split(/\s+/).slice(0, 5).join(" ");
-    const name = sanitizeFilename(firstWords) || thread.id;
-    const filePath = path.join(threadsDir, `${name}.md`);
-    const topLink = `https://twitter.com/i/web/status/${first.id}`;
-    const body = `${fm}\n${parts.join("\n\n")}\n\n[View on Twitter](${topLink})`;
-
-    if (dryRun) {
-      logger("info", `(dry-run) would write thread file: ${filePath}`);
-    } else {
-      await fs.writeFile(filePath, body, "utf8");
-    }
-  }
-
-  // Save non-thread tweets by date
-  const byDate: Record<string, ContentItem[]> = {};
-  for (const t of nonThreadTweets) {
-    const d = formatIsoDateOnly(t.createdAt);
-    (byDate[d] ||= []).push(t);
-  }
-
-  for (const [date, dayItems] of Object.entries(byDate)) {
-    dayItems.sort((a, b) => a.createdAt.localeCompare(b.createdAt));
-    const content = dayItems
-      .map((t) => {
-        const dt = new Date(t.createdAt);
-        const time = isNaN(dt.getTime())
-          ? ""
-          : dt.toLocaleTimeString("en-US", {
-              hour: "numeric",
-              minute: "2-digit",
-            });
-        const images = (t.media ?? [])
-          .map(
-            (m) =>
-              `![${path.basename(m.absPath)}](../images/_${path.basename(m.absPath)})`,
-          )
-          .join("");
-        const cleaned = cleanText(t.text, (t.raw as any)?.entities);
-        return `*${time}*  \n${cleaned}${images}`;
-      })
-      .join("\n\n---\n\n");
-
-    const filePath = path.join(byDateDir, `${date}.md`);
-    if (dryRun) {
-      logger("info", `(dry-run) would write daily file: ${filePath}`);
-    } else {
-      await fs.writeFile(filePath, content, "utf8");
-    }
-  }
-}
-async function writeOAI(
-  threads: Thread[],
-  conversations: ContentItem[][],
-  outDir: string,
-  systemMessage: string,
-  logger: (l: Level, m: string) => void,
-  dryRun: boolean,
-) {
-  const outPath = path.join(outDir, "conversations_oai.jsonl");
-  if (dryRun) {
-    logger("info", `(dry-run) would write OAI JSONL: ${outPath}`);
-    return;
-  }
-  await ensureDir(path.dirname(outPath));
-  const fh = await fs.open(outPath, "w");
-
-  const writeConv = async (items: ContentItem[]) => {
-    const msgs = messagesFromConversation(items);
-    if (!msgs.length) return;
-    const record = {
-      messages: [{ role: "system", content: systemMessage }, ...msgs],
-    };
-    await fh.write(JSON.stringify(record) + "\n");
-  };
-
-  for (const t of threads) await writeConv(t.items);
-  for (const c of conversations) await writeConv(c);
-  await fh.close();
-  logger("info", `Wrote OAI JSONL to ${outPath}`);
-}
-
-async function writeNormalizedJSONL(
-  items: ContentItem[],
-  outDir: string,
-  logger: (l: Level, m: string) => void,
-  dryRun: boolean,
-) {
-  const outPath = path.join(outDir, "normalized_items.jsonl");
-  if (dryRun) {
-    logger("info", `(dry-run) would write normalized items JSONL: ${outPath}`);
-    return;
-  }
-  await ensureDir(path.dirname(outPath));
-  const fh = await fs.open(outPath, "w");
-  for (const it of items) {
-    await fh.write(JSON.stringify(it) + "\n");
-  }
-  await fh.close();
-  logger("info", `Wrote normalized items JSONL to ${outPath}`);
-}
-
-async function writeShareGPT(
-  threads: Thread[],
-  conversations: ContentItem[][],
-  outDir: string,
-  logger: (l: Level, m: string) => void,
-  dryRun: boolean,
-) {
-  const outPath = path.join(outDir, "sharegpt.json");
-  if (dryRun) {
-    logger("info", `(dry-run) would write ShareGPT JSON: ${outPath}`);
-    return;
-  }
-  await ensureDir(path.dirname(outPath));
-  const list: Array<{ conversations: Array<{ from: string; value: string }> }> =
-    [];
-  const addConv = async (items: ContentItem[]) => {
-    const msgs = messagesFromConversation(items);
-    if (!msgs.length) return;
-    list.push({
-      conversations: msgs.map((m) => ({
-        from: m.role === "user" ? "human" : "gpt",
-        value: m.content,
-      })),
-    });
-  };
-  for (const t of threads) await addConv(t.items);
-  for (const c of conversations) await addConv(c);
-  await fs.writeFile(outPath, JSON.stringify(list, null, 2), "utf8");
-  logger("info", `Wrote ShareGPT JSON to ${outPath}`);
-}
-
-async function writeStatsJSON(
-  items: ContentItem[],
-  threads: Thread[],
-  conversations: ContentItem[][],
-  outDir: string,
-  logger: (l: Level, m: string) => void,
-  dryRun: boolean,
-) {
-  const outPath = path.join(outDir, "stats.json");
-  const dates = items
-    .map((i) => new Date(i.createdAt).toISOString())
-    .filter(Boolean);
-  const start = dates.length ? dates.reduce((a, b) => (a < b ? a : b)) : null;
-  const end = dates.length ? dates.reduce((a, b) => (a > b ? a : b)) : null;
-  const stats = {
-    totalItems: items.length,
-    tweets: items.filter((i) => i.source === "twitter:tweet").length,
-    likes: items.filter((i) => i.source === "twitter:like").length,
-    threads: threads.length,
-    conversations: conversations.length,
-    dateRange: { start, end },
-  };
-  if (dryRun) {
-    logger("info", `(dry-run) would write stats JSON: ${outPath}`);
-    return;
-  }
-  await ensureDir(path.dirname(outPath));
-  await fs.writeFile(outPath, JSON.stringify(stats, null, 2), "utf8");
-  logger("info", `Wrote stats JSON to ${outPath}`);
-}
-
-/* ---------------------------------- main ---------------------------------- */
-
-async function getVersion(): Promise<string> {
-  try {
-    const thisFile = fileURLToPath(import.meta.url);
-    const dir = path.dirname(thisFile);
-    const root = path.basename(dir) === "dist" ? path.dirname(dir) : dir;
-    const pkgPath = path.join(root, "package.json");
-    const raw = await fs.readFile(pkgPath, "utf8");
-    const pkg = JSON.parse(raw);
-    return typeof pkg.version === "string" ? pkg.version : "0.0.0";
-  } catch {
-    return "0.0.0";
-  }
-}
-
-async function main() {
-  const opts = parseArgs(process.argv);
-  if (opts.help) {
-    process.stderr.write(usage() + "\n");
-    process.exit(0);
-  }
-  if (opts.version) {
-    const v = await getVersion();
-    process.stdout.write(`splice ${v}\n`);
-    process.exit(0);
-  }
-  // Allow quick verbosity shorthands unless an explicit --log-level was provided
-  {
-    const argv = process.argv.slice(2);
-    const hasExplicitLogLevel = argv.includes("--log-level");
-    const wantsQuiet = argv.includes("--quiet") || argv.includes("-q");
-    const wantsVerbose = argv.includes("--verbose");
-    if (!hasExplicitLogLevel) {
-      if (wantsQuiet) opts.logLevel = "error";
-      else if (wantsVerbose) opts.logLevel = "debug";
-    }
-  }
-
-  const logger = makeLogger(opts.logLevel);
-
-  // Warn on unknown flags with a simple suggestion
-  {
-    const argv = process.argv.slice(2);
-    const known = new Set([
-      "--help",
-      "-h",
-      "--version",
-      "-V",
-      "--source",
-      "--archive-path",
-      "--out",
-      "--output-dir",
-      "--format",
-      "--formats",
-      "--output-formats",
-      "--system-message",
-      "--system",
-      "--dry-run",
-      "-n",
-      "--log-level",
-      "--quiet",
-      "-q",
-      "--verbose",
-      "--json-stdout",
-      "--",
-    ]);
-    const unknown = argv.filter(
-      (a) => a.startsWith("-") && !known.has(a) && a !== "-" && a !== "--",
-    );
-    const candidates = Array.from(known).filter((f) => f.startsWith("--"));
-    const suggest = (flag: string): string | null => {
-      let best: string | null = null;
-      let score = -1;
-      for (const c of candidates) {
-        // simple common prefix score
-        let s = 0;
-        const L = Math.min(flag.length, c.length);
-        for (let i = 0; i < L; i++) {
-          if (flag[i] === c[i]) s++;
-          else break;
-        }
-        if (s > score) {
-          score = s;
-          best = c;
-        }
-      }
-      return score >= 2 ? best : null;
-    };
-    for (const uf of unknown) {
-      const hint = suggest(uf);
-      if (hint) logger("warn", `Unknown flag ${uf}. Did you mean ${hint}?`);
-      else
-        logger(
-          "warn",
-          `Unknown flag ${uf}. Run with --help to see supported flags.`,
-        );
-    }
-  }
-
-  if (!opts.source || !opts.out) {
-    process.stderr.write(usage() + "\n");
-    process.exit(2);
-  }
-
-  const source = path.resolve(opts.source);
-  const outDir = path.resolve(opts.out);
-
-  const detected = await detectTwitterArchive(source);
-  if (!detected) {
-    logger(
-      "error",
-      `Could not detect a Twitter archive at ${source} (missing data/manifest.js)`,
-    );
-    process.exit(2);
-  }
-
-  try {
-    logger("info", `Ingesting from ${source}`);
-    const items = await ingestTwitter(source, logger);
-    const filtered = applyFilters(items, {
-      since: opts.since,
-      until: opts.until,
-      minLength: opts.minLength,
-      excludeRt: opts.excludeRt,
-      onlyThreads: opts.onlyThreads,
-      withMedia: opts.withMedia,
-    });
-    const all = indexById(filtered);
-    let { threads, conversations } = groupThreadsAndConversations(all);
-    if (opts.onlyThreads) {
-      conversations = [];
-    }
-    logger(
-      "info",
-      `Threads: ${threads.length}, Conversations: ${conversations.length}`,
-    );
-
-    // Validate formats and support --json-stdout for piping normalized items
-    const argv = process.argv.slice(2);
-    const formatSpecified =
-      argv.includes("--format") ||
-      argv.includes("--formats") ||
-      argv.includes("--output-formats");
-    const allowedFormats = new Set(["markdown", "oai", "json", "sharegpt"]);
-    const requested = opts.format || [];
-    const validFormats = requested.filter((f) => allowedFormats.has(f));
-    const invalidFormats = requested.filter((f) => !allowedFormats.has(f));
-    for (const bad of invalidFormats) {
-      logger("warn", `Unknown format "${bad}". Supported: markdown, oai, json`);
-    }
-    const jsonStdout = argv.includes("--json-stdout");
-
-    if (jsonStdout) {
-      // Print normalized items as JSONL to stdout; logs remain on stderr
-      for (const it of items) {
-        process.stdout.write(JSON.stringify(it) + "\n");
-      }
-      logger("info", "Wrote normalized items to stdout");
-      process.exit(0);
-    }
-
-    if (formatSpecified && validFormats.length === 0) {
-      logger(
-        "error",
-        "No valid formats requested. Supported: markdown, oai, json",
-      );
-      process.stderr.write(usage() + "\n");
-      process.exit(2);
-    }
-
-    if (validFormats.includes("markdown")) {
-      await writeMarkdown(
-        threads,
-        opts.onlyThreads ? [] : filtered,
-        outDir,
-        logger,
-        opts.dryRun,
-      );
-    }
-    if (validFormats.includes("json")) {
-      await writeNormalizedJSONL(items, outDir, logger, opts.dryRun);
-    }
-    const systemMessage =
-      process.env.SPLICE_SYSTEM_MESSAGE ?? opts.systemMessage;
-    logger("debug", `System message: ${systemMessage}`);
-    if (validFormats.includes("oai")) {
-      await writeOAI(
-        threads,
-        conversations,
-        outDir,
-        systemMessage,
-        logger,
-        opts.dryRun,
-      );
-    }
-    if (validFormats.includes("sharegpt")) {
-      await writeShareGPT(threads, conversations, outDir, logger, opts.dryRun);
-    }
-    if (opts.statsJson) {
-      await writeStatsJSON(
-        filtered,
-        threads,
-        conversations,
-        outDir,
-        logger,
-        opts.dryRun,
-      );
-    }
-
-    logger("info", opts.dryRun ? "Dry run complete." : "Done.");
: "Done."); - process.exit(0); - } catch (e) { - logger("error", (e as Error).message); +try { + // Prefer TypeScript entry during development + await import("./src/cli/splice.ts"); +} catch (errTs) { + try { + // Fallback to compiled JavaScript entry after build + await import("./dist/cli/splice.js"); + } catch (errJs) { + const msgTs = (errTs && (errTs as Error).message) || String(errTs); + const msgJs = (errJs && (errJs as Error).message) || String(errJs); + console.error("[error] Failed to load CLI entry."); + console.error(" TS entry error:", msgTs); + console.error(" JS entry error:", msgJs); process.exit(1); } } - -main().catch((err) => { - process.stderr.write(`[error] ${(err as Error).message}\n`); - process.exit(1); -}); diff --git a/src/cli/splice.ts b/src/cli/splice.ts index 55b972c..0c5bb4c 100644 --- a/src/cli/splice.ts +++ b/src/cli/splice.ts @@ -25,6 +25,14 @@ import { writeShareGPT, writeStatsJSON, } from "../outputs/writers"; +import { + FsStore, + createCheckpointManifest, + storeItemsJSONL, + storeThreadsJSON, + storeConversationsJSON, +} from "../core/store"; +import { decisionsFromIds } from "../core/decisions"; /* -------------------------------- version -------------------------------- */ @@ -83,6 +91,8 @@ async function main() { "--archive-path", "--out", "--output-dir", + "--workspace", + "--checkpoint", "--format", "--formats", "--output-formats", @@ -102,6 +112,12 @@ async function main() { "--only-threads", "--with-media", "--stats-json", + "--decisions-import", + "--decisions-file", + "--set-status", + "--status", + "--ids", + "--ids-file", "--", ]); const unknown = argv.filter( @@ -144,6 +160,9 @@ async function main() { const source = path.resolve(opts.source); const outDir = path.resolve(opts.out); + const workspaceDir = path.resolve( + opts.workspace || path.join(outDir, ".splice"), + ); const detected = await detectTwitterArchive(source); if (!detected) { @@ -177,6 +196,159 @@ async function main() { `Threads: ${threads.length}, Conversations: ${conversations.length}`, ); + // Create a checkpoint manifest and store artifacts + if (opts.dryRun) { + logger("info", `(dry-run) would create checkpoint in ${workspaceDir}`); + } else { + try { + const store = new FsStore(workspaceDir, logger); + + // Optional: build a decisions stream from --decisions-import and/or --set-status with ids/ids-file + let decisionsRef: string | undefined; + + // Helper to parse a JSONL file into an iterable of objects + const parseJsonlFile = async function* ( + filePath: string, + ): AsyncIterable { + try { + const raw = await fs.readFile(filePath, "utf8"); + const lines = raw.split(/\r?\n/); + for (const line of lines) { + const t = line.trim(); + if (!t) continue; + try { + yield JSON.parse(t); + } catch { + // ignore invalid line + } + } + } catch { + // ignore missing file + } + }; + + // Helper to load IDs from a file (either JSON array or newline-separated) + const loadIdsFile = async (filePath: string): Promise => { + try { + const raw = await fs.readFile(filePath, "utf8"); + try { + const j = JSON.parse(raw); + if (Array.isArray(j)) + return j.filter((x) => typeof x === "string"); + } catch { + // not JSON; fall through + } + return raw + .split(/\r?\n/) + .map((s) => s.trim()) + .filter(Boolean); + } catch { + return []; + } + }; + + // Build a combined decisions iterable + const decisionIterables: Array> = []; + + // Import existing decisions JSONL if provided + const decisionsPath = opts.decisionsImport; + if (decisionsPath) { + 
+
+        // Build a combined decisions iterable
+        const decisionIterables: Array<AsyncIterable<any>> = [];
+
+        // Import existing decisions JSONL if provided
+        const decisionsPath = opts.decisionsImport;
+        if (decisionsPath) {
+          decisionIterables.push(parseJsonlFile(decisionsPath));
+          logger("info", `Importing decisions from ${decisionsPath}`);
+        }
+
+        // Generate new decisions from --set-status and ids/ids-file
+        const setStatus = opts.setStatus;
+        if (setStatus) {
+          let ids: string[] = opts.ids || [];
+          const idsFile = opts.idsFile;
+          if (idsFile) {
+            const fromFile = await loadIdsFile(idsFile);
+            ids = ids.concat(fromFile);
+          }
+          ids = Array.from(new Set(ids.filter(Boolean)));
+          if (ids.length > 0) {
+            const records = decisionsFromIds(ids, setStatus, { by: "cli" });
+            // Wrap array as async iterable
+            async function* gen() {
+              for (const r of records) yield r;
+            }
+            decisionIterables.push(gen());
+            logger(
+              "info",
+              `Prepared ${records.length} decision(s) with status="${setStatus}"`,
+            );
+          }
+        }
+
+        // If we have any decision streams, materialize them into the store
+        if (decisionIterables.length > 0) {
+          // Concatenate iterables
+          async function* concatAll() {
+            for (const it of decisionIterables) {
+              for await (const rec of it) {
+                yield rec;
+              }
+            }
+          }
+          decisionsRef = await store.putJSONL(concatAll());
+          logger("info", `Stored decisions JSONL as ${decisionsRef}`);
+        }
+
+        const itemsRefAll = await storeItemsJSONL(store, items);
+        const filteredRef = await storeItemsJSONL(store, filtered);
+        const threadsRef = await storeThreadsJSON(store, threads);
+        const conversationsRef = await storeConversationsJSON(
+          store,
+          conversations,
+        );
+
+        const transforms = [
+          {
+            name: "filter",
+            config: {
+              since: opts.since,
+              until: opts.until,
+              minLength: opts.minLength,
+              excludeRt: opts.excludeRt,
+              withMedia: opts.withMedia,
+            },
+            inputRef: itemsRefAll,
+            outputRef: filteredRef,
+            stats: { total: items.length, filtered: filtered.length },
+          },
+          {
+            name: "group:threads",
+            config: {},
+            inputRef: filteredRef,
+            outputRef: threadsRef,
+            stats: { threads: threads.length },
+          },
+          {
+            name: "group:conversations",
+            config: {},
+            inputRef: filteredRef,
+            outputRef: conversationsRef,
+            stats: { conversations: conversations.length },
+          },
+        ];
+
+        const latest = await store.resolveLatestCheckpoint().catch(() => null);
+        const manifest = createCheckpointManifest({
+          parentId: (latest && latest.id) || null,
+          itemsRef: itemsRefAll,
+          sourceRefs: [{ kind: "twitter", uri: source }],
+          transforms,
+          decisionsRef,
+          materialized: { threadsRef, conversationsRef },
+        });
+        const cpId = await store.saveCheckpoint(manifest);
+        logger("info", `Saved checkpoint ${cpId} in ${workspaceDir}`);
+      } catch (e) {
+        logger("warn", `Failed to write checkpoint: ${(e as Error).message}`);
+      }
+    }
+
     // Validate formats and support --json-stdout for piping normalized items
     const argv = process.argv.slice(2);
     const formatSpecified =
diff --git a/src/core/decisions.ts b/src/core/decisions.ts
new file mode 100644
index 0000000..9b0e74c
--- /dev/null
+++ b/src/core/decisions.ts
@@ -0,0 +1,320 @@
+import type { ContentItem } from "./types";
+
+/**
+ * Decisions — status/tags attached to IDs, typically ContentItem IDs.
+ *
+ * Pure helpers to fold a stream of decision records (e.g., parsed from JSONL)
+ * into the latest per-id state; group and filter items by status; and build
+ * decision records programmatically.
+ */
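+
+// A decisions stream is plain JSONL: one DecisionRecord per line, appended
+// over time. A sketch with made-up ids and timestamps; folding these two
+// lines yields status "export" for id "123" (newer ts wins), tags unioned:
+//
+//   {"id":"123","status":"skip","ts":"2024-01-01T00:00:00Z","by":"ui"}
+//   {"id":"123","status":"export","tags":["keep"],"ts":"2024-02-01T00:00:00Z","by":"alice"}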
+
+/* --------------------------------- Types --------------------------------- */
+
+export const DEFAULT_DECISION_STATUSES = ["unread", "export", "skip"] as const;
+export type DefaultDecisionStatus = (typeof DEFAULT_DECISION_STATUSES)[number];
+
+// Allow custom statuses in addition to defaults
+export type DecisionStatus = DefaultDecisionStatus | string;
+
+export interface DecisionRecord {
+  // Target identifier (e.g., ContentItem.id)
+  id: string;
+
+  // Primary decision status (unread | export | skip | ...custom)
+  status?: DecisionStatus;
+
+  // Optional tags aggregated across decisions (union)
+  tags?: string[];
+
+  // Optional freeform note/comment
+  notes?: string;
+
+  // ISO-8601 timestamp when this decision was made
+  // If omitted, treated as the lowest (oldest) timestamp.
+  ts?: string;
+
+  // Optional user/agent (e.g., "alice", "ui", "auto-filter")
+  by?: string;
+
+  // Arbitrary metadata for future extensions
+  meta?: Record<string, unknown>;
+}
+
+/**
+ * The latest, consolidated view per id.
+ * - status is the last-known value (newer decisions win)
+ * - tags are the union across all decisions for that id
+ * - notes are the last-known (newer decisions win)
+ * - ts is the latest timestamp observed for that id
+ */
+export interface LatestDecision extends Required<Pick<DecisionRecord, "id">> {
+  status?: DecisionStatus;
+  tags: string[];
+  notes?: string;
+  ts?: string;
+  by?: string;
+  meta?: Record<string, unknown>;
+}
+
+/* -------------------------------- Helpers -------------------------------- */
+
+function isValidIso(ts: string | undefined): boolean {
+  if (!ts) return false;
+  const n = Date.parse(ts);
+  return !Number.isNaN(n);
+}
+
+/**
+ * Compare two "decision-like" objects (need only ts) in ascending order by time.
+ * Returns:
+ *   > 0 if a is newer than b
+ *   < 0 if a is older than b
+ *   = 0 if equal recency or both invalid
+ */
+export function compareDecisionRecency(
+  a: { ts?: string },
+  b: { ts?: string },
+): number {
+  const aValid = isValidIso(a.ts);
+  const bValid = isValidIso(b.ts);
+  if (aValid && bValid) {
+    return Date.parse(a.ts!) - Date.parse(b.ts!);
+  }
+  if (aValid && !bValid) return 1;
+  if (!aValid && bValid) return -1;
+  return 0;
+}
+
+/**
+ * Merge decision B into A, assuming B is newer or otherwise wins.
+ * - status: take B's if present
+ * - tags: union
+ * - notes/by/meta/ts: take B's if present
+ */
+export function mergeDecision(
+  a: LatestDecision,
+  b: DecisionRecord,
+): LatestDecision {
+  const merged: LatestDecision = {
+    id: a.id,
+    status: b.status ?? a.status,
+    tags: Array.from(new Set([...(a.tags || []), ...(b.tags || [])])),
+    notes: b.notes ?? a.notes,
+    ts: b.ts ?? a.ts,
+    by: b.by ?? a.by,
+    meta: { ...(a.meta || {}), ...(b.meta || {}) },
+  };
+  return merged;
+}
+
+/**
+ * Build a LatestDecision from a single record (base state).
+ */
+export function baseFromDecision(rec: DecisionRecord): LatestDecision {
+  return {
+    id: rec.id,
+    status: rec.status,
+    tags: Array.from(new Set(rec.tags || [])),
+    notes: rec.notes,
+    ts: rec.ts,
+    by: rec.by,
+    meta: rec.meta ? { ...rec.meta } : undefined,
+  };
+}
+
+export interface FoldOptions {
+  /**
+   * Restrict to known statuses. If false, accept any string.
+   * Defaults to true (validate against DEFAULT_DECISION_STATUSES).
+   */
+  restrictStatuses?: boolean;
+
+  /**
+   * Custom allowed statuses (takes precedence over defaults if provided).
+   */
+  allowedStatuses?: ReadonlyArray<string>;
+}
+
+/**
+ * Normalize a status. Returns undefined if restricted and invalid.
+ */
+export function normalizeStatus(
+  status: DecisionStatus | undefined,
+  opts?: FoldOptions,
+): DecisionStatus | undefined {
+  if (!status) return undefined;
+  const restrict = opts?.restrictStatuses ?? true;
+  const allowed = opts?.allowedStatuses ?? DEFAULT_DECISION_STATUSES;
+  if (!restrict) return status;
+  return allowed.includes(status) ? status : undefined;
+}
+
+/**
+ * Fold a (possibly long) stream of decisions into the latest per id.
+ * Newer decisions override older ones. In case of equal recency, last write wins
+ * based on iteration order.
+ */
+export async function foldDecisions(
+  decisions: Iterable<DecisionRecord> | AsyncIterable<DecisionRecord>,
+  opts?: FoldOptions,
+): Promise<Map<string, LatestDecision>> {
+  const out = new Map<string, LatestDecision>();
+  for await (const rec of decisions as AsyncIterable<DecisionRecord>) {
+    if (!rec || typeof rec.id !== "string" || rec.id.length === 0) {
+      continue;
+    }
+    // Optionally validate status
+    const normStatus = normalizeStatus(rec.status, opts);
+    const normalized: DecisionRecord = { ...rec, status: normStatus };
+
+    const existing = out.get(rec.id);
+    if (!existing) {
+      out.set(rec.id, baseFromDecision(normalized));
+      continue;
+    }
+
+    // Determine if incoming is newer; if equal, prefer incoming (last write wins)
+    const cmp = compareDecisionRecency(normalized, existing);
+    if (cmp >= 0) {
+      out.set(rec.id, mergeDecision(existing, normalized));
+    } else {
+      // Older record: still merge tags (union) but keep newer status/notes/ts
+      const mergedTags = Array.from(
+        new Set([...(existing.tags || []), ...(normalized.tags || [])]),
+      );
+      out.set(rec.id, { ...existing, tags: mergedTags });
+    }
+  }
+  return out;
+}
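+
+// Usage sketch (the `records` array is hypothetical; statuses outside the
+// defaults are dropped unless restrictStatuses is false):
+//
+//   const latest = await foldDecisions(records);
+//   latest.get("123")?.status; // latest status for that id, tags unioned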
+
+/* ----------------------------- Selection helpers ----------------------------- */
+
+/**
+ * Returns a numeric ranking for statuses useful for sorting/filtering UIs.
+ * Higher rank means "more included":
+ *   export (2) > unread (1) > skip (0)
+ * Unknown/custom statuses get rank 1 by default (treat as unread).
+ */
+export function statusRank(status?: DecisionStatus): number {
+  if (status === "export") return 2;
+  if (status === "skip") return 0;
+  if (status === "unread") return 1;
+  return 1; // unknown/custom -> neutral default
+}
+
+export interface ApplyStatusOptions {
+  /**
+   * If an item has no decision, treat it as this status for categorization.
+   * Defaults to "unread".
+   */
+  defaultStatus?: DecisionStatus;
+
+  /**
+   * Predicate to define whether a decision means "selected for export".
+   * Defaults to (status === "export").
+   */
+  isSelected?: (status?: DecisionStatus) => boolean;
+}
+
+export interface AppliedStatus<T> {
+  // Lists grouped by status
+  byStatus: Record<string, T[]>;
+  // Convenience aliases for defaults
+  unread: T[];
+  export: T[];
+  skip: T[];
+  // Flattened selection list (based on isSelected)
+  selected: T[];
+}
+
+/**
+ * Apply latest decisions to a list of items, grouping them by status and
+ * computing the selection subset (e.g., export).
+ */
+export function applyDecisionStatus<T extends { id: string }>(
+  items: T[],
+  latest: Map<string, LatestDecision>,
+  opts?: ApplyStatusOptions,
+): AppliedStatus<T> {
+  const defaultStatus = opts?.defaultStatus ?? "unread";
+  const isSelected =
+    opts?.isSelected ?? ((s?: DecisionStatus) => s === "export");
+
+  const byStatus: Record<string, T[]> = {};
+  const selected: T[] = [];
+
+  for (const item of items) {
+    const dec = latest.get(item.id);
+    const status = dec?.status ?? defaultStatus;
+    (byStatus[status] ||= []).push(item);
+    if (isSelected(status)) selected.push(item);
+  }
+
+  // Ensure default buckets exist for convenience
+  const unread = byStatus["unread"] || [];
+  const willExport = byStatus["export"] || [];
+  const skip = byStatus["skip"] || [];
+
+  return { byStatus, unread, export: willExport, skip, selected };
+}
+
+/* ------------------------------- Summarization ------------------------------- */
+
+export interface DecisionSummary {
+  totalIds: number;
+  countsByStatus: Record<string, number>;
+}
+
+/**
+ * Summarize the latest decisions (counts by status).
+ */
+export function summarizeLatestDecisions(
+  latest: Map<string, LatestDecision>,
+): DecisionSummary {
+  const counts: Record<string, number> = {};
+  for (const rec of latest.values()) {
+    const status = rec.status ?? "unread";
+    counts[status] = (counts[status] || 0) + 1;
+  }
+  const total = Array.from(latest.keys()).length;
+  return { totalIds: total, countsByStatus: counts };
+}
+
+/* --------------------------------- Builders --------------------------------- */
+
+/**
+ * Utility to create decision records for a set of ids.
+ */
+export function decisionsFromIds(
+  ids: string[],
+  status: DecisionStatus,
+  params?: Omit<DecisionRecord, "id" | "status">,
+): DecisionRecord[] {
+  const base: Omit<DecisionRecord, "id" | "status"> = {
+    ts: params?.ts ?? new Date().toISOString(),
+    by: params?.by,
+    tags: params?.tags,
+    notes: params?.notes,
+    meta: params?.meta,
+  };
+  return ids.map((id) => ({ id, status, ...base }));
+}
+
+/* ------------------------------- Item filters -------------------------------- */
+
+/**
+ * Filter a list of ContentItem to those whose latest decision is selected for export.
+ */
+export function filterSelectedItems(
+  items: ContentItem[],
+  latest: Map<string, LatestDecision>,
+  isSelected: (status?: DecisionStatus) => boolean = (s) => s === "export",
+): ContentItem[] {
+  const out: ContentItem[] = [];
+  for (const it of items) {
+    const s = latest.get(it.id)?.status;
+    if (isSelected(s)) out.push(it);
+  }
+  return out;
+}
diff --git a/src/core/store.ts b/src/core/store.ts
new file mode 100644
index 0000000..93b2a85
--- /dev/null
+++ b/src/core/store.ts
@@ -0,0 +1,377 @@
+import * as fsp from "node:fs/promises";
+import * as fs from "node:fs";
+import * as path from "node:path";
+import { createHash } from "node:crypto";
+import type { ContentItem, Thread, Level } from "./types";
+
+/**
+ * Minimal JSONL-based filesystem store for checkpoints and artifacts.
+ * - Content-addressed objects under <workspace>/objects/<hash>.{json|jsonl}
+ * - Checkpoint manifests under <workspace>/checkpoints/<id>.json
+ */
+
+export const SCHEMA_VERSION = "0.1.0";
+
+/* ------------------------------ Type contracts ------------------------------ */
+
+export type Logger = (level: Level, message: string) => void;
+
+export interface CheckpointManifest {
+  id: string;
+  createdAt: string; // ISO
+  schemaVersion: string; // bump on breaking changes
+  parentId?: string | null;
+
+  // Provenance for sources (freeform; useful for incremental ingest later)
+  sourceRefs: Array<{ kind: string; uri?: string; cursor?: string }>;
+
+  // Primary normalized input
+  inputs: {
+    itemsRef: string; // ref to JSONL of ContentItem
+  };
+
+  // Pure transforms applied to inputs or intermediate refs
+  transforms: Array<{
+    name: string; // e.g., "filter:minLength=30"
+    config: Record<string, unknown>;
+    inputRef: string;
+    outputRef: string;
+    stats?: Record<string, number>;
+  }>;
+
+  // Optional manual decisions (append-only JSONL)
+  decisionsRef?: string;
+
+  // Materialized selections/groups for fast output
+  materialized?: {
+    threadsRef?: string; // JSON of Thread[] or JSONL of ids (future)
+    conversationsRef?: string; // JSON of ContentItem[][] (or id lists)
+  };
+
+  notes?: string;
+}
+
+export interface Store {
+  putObject(obj: unknown, opts?: { kind?: string }): Promise<string>;
+  putJSONL<T>(
+    iterable: Iterable<T> | AsyncIterable<T>,
+    opts?: { kind?: string },
+  ): Promise<string>;
+
+  getObject<T = unknown>(ref: string): Promise<T>;
+  getJSONL<T = unknown>(ref: string): AsyncIterable<T>;
+
+  saveCheckpoint(manifest: CheckpointManifest): Promise<string>;
+  readCheckpoint(id: string): Promise<CheckpointManifest>;
+  listCheckpoints(): Promise<CheckpointManifest[]>;
+  resolveLatestCheckpoint(): Promise<CheckpointManifest | null>;
+}
+
+/* -------------------------------- Utilities -------------------------------- */
+
+async function ensureDir(p: string) {
+  await fsp.mkdir(p, { recursive: true });
+}
+
+function stableStringify(value: unknown): string {
+  // Deterministic JSON stringify (sort object keys)
+  const seen = new WeakSet();
+  const stringify = (v: unknown): string => {
+    if (v === null || typeof v !== "object") return JSON.stringify(v);
+    if (seen.has(v as object))
+      throw new TypeError("Converting circular structure to JSON");
+    seen.add(v as object);
+    if (Array.isArray(v)) {
+      const out = "[" + v.map((x) => stringify(x)).join(",") + "]";
+      seen.delete(v as object);
+      return out;
+    }
+    const obj = v as Record<string, unknown>;
+    const keys = Object.keys(obj).sort();
+    const body = keys
+      .map((k) => JSON.stringify(k) + ":" + stringify(obj[k]))
+      .join(",");
+    const out = "{" + body + "}";
+    seen.delete(v as object);
+    return out;
+  };
+  return stringify(value);
+}
+
+function hashString(s: string): string {
+  return createHash("sha256").update(s).digest("hex");
+}
+
+function hashInit() {
+  return createHash("sha256");
+}
+function hashUpdate(h: ReturnType<typeof hashInit>, s: string) {
+  h.update(s);
+}
+function hashDigest(h: ReturnType<typeof hashInit>): string {
+  return h.digest("hex");
+}
+
+type RefKind = "json" | "jsonl";
+type Ref = { kind: RefKind; hash: string };
+
+function makeRef(kind: RefKind, hash: string): string {
+  return `${kind}:${hash}`;
+}
+function parseRef(ref: string): Ref {
+  const [kind, hash] = ref.split(":");
+  if ((kind !== "json" && kind !== "jsonl") || !hash) {
+    throw new Error(`Invalid ref: ${ref}`);
+  }
+  return { kind: kind as RefKind, hash };
+}
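+
+// Refs are "<kind>:<sha256-hex>" strings; hashes shortened here for
+// illustration:
+//   makeRef("jsonl", "3b7a12...")  // -> "jsonl:3b7a12..."
+//   parseRef("jsonl:3b7a12...")    // -> { kind: "jsonl", hash: "3b7a12..." }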
+
+/**
+ * Read a file line-by-line and yield parsed JSON per line.
+ */
+async function* readJSONL<T>(filePath: string): AsyncIterable<T> {
+  const stream = fs.createReadStream(filePath, { encoding: "utf8" });
+  let buffer = "";
+  for await (const chunk of stream) {
+    buffer += chunk;
+    let idx: number;
+    while ((idx = buffer.indexOf("\n")) >= 0) {
+      const line = buffer.slice(0, idx).trim();
+      buffer = buffer.slice(idx + 1);
+      if (!line) continue;
+      yield JSON.parse(line) as T;
+    }
+  }
+  const last = buffer.trim();
+  if (last) yield JSON.parse(last) as T;
+}
+
+/* --------------------------------- FsStore --------------------------------- */
+
+export class FsStore implements Store {
+  private root: string;
+  private objectsDir: string;
+  private checkpointsDir: string;
+  private log: Logger;
+
+  constructor(workspaceDir: string, logger?: Logger) {
+    this.root = path.resolve(workspaceDir);
+    this.objectsDir = path.join(this.root, "objects");
+    this.checkpointsDir = path.join(this.root, "checkpoints");
+    this.log = logger ?? (() => {});
+  }
+
+  private async init() {
+    await ensureDir(this.objectsDir);
+    await ensureDir(this.checkpointsDir);
+  }
+
+  private objectPath(kind: RefKind, hash: string): string {
+    return path.join(this.objectsDir, `${hash}.${kind}`);
+  }
+
+  async putObject(obj: unknown, _opts?: { kind?: string }): Promise<string> {
+    await this.init();
+    const json = stableStringify(obj);
+    const hash = hashString(json);
+    const ref = makeRef("json", hash);
+    const p = this.objectPath("json", hash);
+    try {
+      await fsp.stat(p); // exists
+      this.log("debug", `putObject: reuse ${ref}`);
+      return ref;
+    } catch {
+      // fallthrough
+    }
+    await fsp.writeFile(p, json, "utf8");
+    this.log("info", `putObject: wrote ${ref}`);
+    return ref;
+  }
+
+  async putJSONL<T>(
+    iterable: Iterable<T> | AsyncIterable<T>,
+    _opts?: { kind?: string },
+  ): Promise<string> {
+    await this.init();
+    // Write to temp, compute hash while writing lines
+    const tmpName = `.tmp-${Date.now()}-${Math.random().toString(16).slice(2)}`;
+    const tmpPath = path.join(this.objectsDir, tmpName);
+    const fh = await fsp.open(tmpPath, "w");
+    const h = hashInit();
+    try {
+      const maybeAsync = (iterable as any)?.[Symbol.asyncIterator];
+      const maybeSync = (iterable as any)?.[Symbol.iterator];
+
+      if (typeof maybeAsync === "function") {
+        for await (const item of iterable as AsyncIterable<T>) {
+          const line = JSON.stringify(item) + "\n";
+          await fh.write(line);
+          hashUpdate(h, line);
+        }
+      } else if (typeof maybeSync === "function") {
+        for (const item of iterable as Iterable<T>) {
+          const line = JSON.stringify(item) + "\n";
+          await fh.write(line);
+          hashUpdate(h, line);
+        }
+      } else {
+        throw new TypeError(
+          "putJSONL: Provided value is not Iterable or AsyncIterable",
+        );
+      }
+    } finally {
+      await fh.close();
+    }
+    const hash = hashDigest(h);
+    const ref = makeRef("jsonl", hash);
+    const finalPath = this.objectPath("jsonl", hash);
+    // If already exists, remove temp and reuse existing
+    try {
+      await fsp.stat(finalPath);
+      await fsp.rm(tmpPath).catch(() => {});
+      this.log("debug", `putJSONL: reuse ${ref}`);
+      return ref;
+    } catch {
+      // rename temp into place
+      await fsp.rename(tmpPath, finalPath);
+      this.log("info", `putJSONL: wrote ${ref}`);
+      return ref;
+    }
+  }
+
+  async getObject<T = unknown>(ref: string): Promise<T> {
+    await this.init();
+    const { kind, hash } = parseRef(ref);
+    if (kind !== "json")
+      throw new Error(`getObject expects json ref, got ${ref}`);
+    const p = this.objectPath(kind, hash);
+    const data = await fsp.readFile(p, "utf8");
+    return JSON.parse(data) as T;
+  }
+
+  getJSONL<T = unknown>(ref: string): AsyncIterable<T> {
+    const { kind, hash } = parseRef(ref);
+    if (kind !== "jsonl") {
+      throw new Error(`getJSONL expects jsonl ref, got ${ref}`);
+    }
+    const p = this.objectPath(kind, hash);
+    return readJSONL<T>(p);
+  }
+
+  async saveCheckpoint(manifest: CheckpointManifest): Promise<string> {
+    await this.init();
+    // Note: use || (not ??) so the empty id from createCheckpointManifest
+    // gets a generated id rather than being kept as "".
+    const id = manifest.id || this.generateCheckpointId(manifest);
+    const full: CheckpointManifest = {
+      ...manifest,
+      id,
+      schemaVersion: manifest.schemaVersion || SCHEMA_VERSION,
+      createdAt: manifest.createdAt || new Date().toISOString(),
+    };
+    const p = path.join(this.checkpointsDir, `${id}.json`);
+    await fsp.writeFile(p, stableStringify(full), "utf8");
+    this.log("info", `saveCheckpoint: ${id}`);
+    return id;
+  }
+
+  async readCheckpoint(id: string): Promise<CheckpointManifest> {
+    await this.init();
+    const p = path.join(this.checkpointsDir, `${id}.json`);
+    const data = await fsp.readFile(p, "utf8");
+    return JSON.parse(data) as CheckpointManifest;
+  }
+
+  async listCheckpoints(): Promise<CheckpointManifest[]> {
+    await this.init();
+    let files: string[] = [];
+    try {
+      files = await fsp.readdir(this.checkpointsDir);
+    } catch {
+      return [];
+    }
+    const out: CheckpointManifest[] = [];
+    for (const f of files) {
+      if (!f.endsWith(".json")) continue;
+      try {
+        const data = await fsp.readFile(
+          path.join(this.checkpointsDir, f),
+          "utf8",
+        );
+        const m = JSON.parse(data) as CheckpointManifest;
+        out.push(m);
+      } catch {
+        // skip invalid files
+      }
+    }
+    out.sort((a, b) => (a.createdAt || "").localeCompare(b.createdAt || ""));
+    return out;
+  }
+
+  async resolveLatestCheckpoint(): Promise<CheckpointManifest | null> {
+    const list = await this.listCheckpoints();
+    if (!list.length) return null;
+    return list[list.length - 1];
+  }
+
+  private generateCheckpointId(manifest: Partial<CheckpointManifest>): string {
+    const stamp = new Date().toISOString().replace(/[:.]/g, "-");
+    const basis = stableStringify({
+      parentId: manifest.parentId ?? null,
+      inputs: manifest.inputs ?? {},
+      transforms: manifest.transforms ?? [],
+      notes: manifest.notes ?? "",
+    });
+    const short = hashString(basis).slice(0, 8);
+    return `${stamp}-${short}`;
+  }
+}
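+
+/*
+ * Usage sketch (the workspace path is illustrative). Objects are
+ * content-addressed, so putting identical data twice reuses the stored
+ * object and returns the same ref:
+ *
+ *   const store = new FsStore("./out/.splice");
+ *   const ref = await store.putJSONL([{ id: "1" }, { id: "2" }]);
+ *   for await (const item of store.getJSONL(ref)) console.log(item);
+ */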
+
+/* ------------------------ Convenience manifest builders ----------------------- */
+
+export function createCheckpointManifest(args: {
+  parentId?: string | null;
+  itemsRef: string;
+  sourceRefs?: Array<{ kind: string; uri?: string; cursor?: string }>;
+  transforms?: CheckpointManifest["transforms"];
+  decisionsRef?: string;
+  materialized?: CheckpointManifest["materialized"];
+  notes?: string;
+}): CheckpointManifest {
+  const now = new Date().toISOString();
+  return {
+    id: "",
+    createdAt: now,
+    schemaVersion: SCHEMA_VERSION,
+    parentId: args.parentId ?? null,
+    sourceRefs: args.sourceRefs ?? [],
+    inputs: { itemsRef: args.itemsRef },
+    transforms: args.transforms ?? [],
+    decisionsRef: args.decisionsRef,
+    materialized: args.materialized,
+    notes: args.notes,
+  };
+}
+
+/* ------------------------- Typed helpers for common refs ---------------------- */
+
+// Helpers to store common structures and return refs
+
+export async function storeItemsJSONL(
+  store: Store,
+  items: Iterable<ContentItem> | AsyncIterable<ContentItem>,
+): Promise<string> {
+  return store.putJSONL(items, { kind: "items" });
+}
+
+export async function storeThreadsJSON(
+  store: Store,
+  threads: Thread[],
+): Promise<string> {
+  return store.putObject(threads, { kind: "threads" });
+}
+
+export async function storeConversationsJSON(
+  store: Store,
+  conversations: ContentItem[][],
+): Promise<string> {
+  return store.putObject(conversations, { kind: "conversations" });
+}
diff --git a/src/core/types.ts b/src/core/types.ts
index 647bf73..8a8a3b6 100644
--- a/src/core/types.ts
+++ b/src/core/types.ts
@@ -60,6 +60,8 @@ export function makeLogger(level: Level): (lvl: Level, msg: string) => void {
 export type CLIOptions = {
   source?: string;
   out?: string;
+  workspace?: string;
+  checkpoint?: string;
   format: string[]; // e.g. ['markdown','oai']
   systemMessage: string;
   dryRun: boolean;
@@ -73,12 +75,16 @@ export type CLIOptions = {
   excludeRt: boolean;
   onlyThreads: boolean;
   withMedia: boolean;
+  // decisions
+  decisionsImport?: string;
+  setStatus?: string;
+  ids?: string[];
+  idsFile?: string;
   // outputs
   statsJson: boolean;
 };
 
-export const DEFAULT_SYSTEM_MESSAGE =
-  "You have been uploaded to the internet";
+export const DEFAULT_SYSTEM_MESSAGE = "You have been uploaded to the internet";
 
 export function parseArgs(argv: string[]): CLIOptions {
   const opts: CLIOptions = {
@@ -94,7 +100,15 @@ export function parseArgs(argv: string[]): CLIOptions {
     excludeRt: false,
     onlyThreads: false,
     withMedia: false,
+    // decisions
+    decisionsImport: undefined,
+    setStatus: undefined,
+    ids: [],
+    idsFile: undefined,
+    // outputs
     statsJson: false,
+    workspace: undefined,
+    checkpoint: undefined,
   };
 
   const args = argv.slice(2);
@@ -160,10 +174,29 @@ export function parseArgs(argv: string[]): CLIOptions {
       opts.withMedia = true;
     } else if (a === "--stats-json") {
       opts.statsJson = true;
+    } else if (a === "--decisions-import" || a === "--decisions-file") {
+      opts.decisionsImport = args[++i];
+    } else if (a === "--set-status" || a === "--status") {
+      opts.setStatus = args[++i];
+    } else if (a === "--ids") {
+      const next = args[++i];
+      if (next) {
+        const parts = next.split(",").filter(Boolean);
+        if (parts.length > 1) opts.ids = parts;
+        else {
+          const list = [next];
+          while (args[i + 1] && !args[i + 1].startsWith("-")) {
+            list.push(args[++i]);
+          }
+          opts.ids = list;
+        }
+      }
+    } else if (a === "--ids-file") {
+      opts.idsFile = args[++i];
     } else if (a === "--") {
       break;
     } else if (a.startsWith("-")) {
-      // unknown flag; ignore to keep simple (CLI warns elsewhere)
+      // unknown flag; ignore to keep simple (CLI will warn elsewhere)
     } else {
       // positional? ignore for now
     }
@@ -181,7 +214,7 @@ export function usage(): string {
     "Usage:",
     "  splice --source <dir> --out <dir> [--format markdown oai json sharegpt] [--system-message <text>]",
     "         [--since <date>] [--until <date>] [--min-length <n>] [--exclude-rt] [--only-threads] [--with-media]",
-    "         [--dry-run] [--stats-json] [--log-level <level>] [--json-stdout] [--quiet|-q] [--verbose] [--version|-V]",
+    "         [--dry-run] [--stats-json] [--log-level <level>] [--json-stdout] [--quiet|-q] [--verbose] [--version|-V] [--decisions-import <file>] [--set-status <status> --ids <...>|--ids-file <file>]",
     "",
     "Options:",
     "  --source <dir>              Path to the Twitter archive directory",