Implement DAG-based message ordering (Phases A+B) #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open: cboos wants to merge 20 commits into main from dev/dag
Commits (20):
- 28b7b97 Add DAG-based message architecture spec
- a7bd9d1 Add DAG infrastructure module (Phase A)
- f5b1dcd Integrate DAG ordering into directory-mode loading (Phase B)
- 40aeddf Add hierarchical session navigation with parent/child relationships (…
- 9cc1f85 Make session backlinks navigable with #msg-d-{N} anchors
- 00d0ba8 Remove unused extract_working_directories (review clean-up)
- b0f52fa Remove unused has_cache_changes (review clean-up)
- fdd4c47 Fix progress entries breaking DAG chain by repairing parent pointers
- 0b0e46b Add within-session fork (rewind) visualization
- 44e2ffc Add message previews to fork/branch nav items and headers
- a60cef1 Improve fork/branch presentation with context and visual hierarchy
- db88023 Fix false forks from context compaction replays and tool-result side-…
- f118cbd Add debug UUID toggle to show uuid/parentUuid on each message
- abd9a2d Handle tool-result variant 2: User continues, Assistant subtree dead-…
- c2dc669 Fix false orphans caused by over-aggressive user text deduplication
- 8a2afd6 Fix false forks from context compaction replays and tool-result side-…
- 6148ccd Thread session_tree through individual session file generation
- e852096 Remove unused import (ruff fix)
- b357585 Fix pyright errors: type narrowing and protected access
- 07cf557 Fix ty check warnings: narrow TranscriptEntry union before accessing …
cboos File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
```diff
@@ -29,13 +29,16 @@
 from .parser import parse_timestamp
 from .factories import create_transcript_entry
 from .models import (
+    BaseTranscriptEntry,
     TranscriptEntry,
     AssistantTranscriptEntry,
+    QueueOperationTranscriptEntry,
     SummaryTranscriptEntry,
     SystemTranscriptEntry,
     UserTranscriptEntry,
     ToolResultContent,
 )
+from .dag import SessionTree, build_dag_from_entries, traverse_session_tree
 from .renderer import get_renderer, is_html_outdated
```
```diff
@@ -47,6 +50,100 @@ def get_file_extension(format: str) -> str:
     return "md" if format in ("md", "markdown") else format
 
 
+# =============================================================================
+# Progress Chain Repair
+# =============================================================================
+
+
+def _scan_file_progress(path: Path, chain: dict[str, Optional[str]]) -> None:
+    """Extract progress entry uuid->parentUuid from a single JSONL file."""
+    try:
+        with open(path, "r", encoding="utf-8", errors="replace") as f:
+            for line in f:
+                if "progress" not in line:  # Fast pre-filter
+                    continue
+                line = line.strip()
+                if not line:
+                    continue
+                try:
+                    raw = json.loads(line)
+                    if not isinstance(raw, dict):
+                        continue
+                    d = cast(dict[str, Any], raw)
+                    if d.get("type") == "progress":
+                        uuid = d.get("uuid")
+                        if isinstance(uuid, str):
+                            chain[uuid] = d.get("parentUuid")
+                except json.JSONDecodeError:
+                    continue
+    except FileNotFoundError:
+        pass  # Race condition: file may have been deleted
+
+
+def _scan_progress_chains(*paths: Path) -> dict[str, Optional[str]]:
+    """Fast scan of JSONL files for progress entry uuid->parentUuid mappings."""
+    chain: dict[str, Optional[str]] = {}
+    for path in paths:
+        if path.is_file():
+            _scan_file_progress(path, chain)
+        elif path.is_dir():
+            for f in path.glob("*.jsonl"):
+                _scan_file_progress(f, chain)
+            # Also scan subagent directories
+            for f in path.glob("*/subagents/*.jsonl"):
+                _scan_file_progress(f, chain)
+    return chain
+
+
+def _scan_sidechain_uuids(directory: Path) -> set[str]:
+    """Collect UUIDs from sidechain/subagent files not loaded into the DAG.
+
+    Some subagent files (e.g. aprompt_suggestion) are never referenced
+    via agentId in the main session, so they aren't loaded by
+    load_transcript(). Their UUIDs are needed to suppress false orphan
+    warnings when main-chain entries reference sidechain parents.
+    """
+    uuids: set[str] = set()
+    for f in directory.glob("*/subagents/*.jsonl"):
+        try:
+            with open(f, "r", encoding="utf-8", errors="replace") as fh:
+                for line in fh:
+                    line = line.strip()
+                    if not line:
+                        continue
+                    try:
+                        raw = json.loads(line)
+                        if isinstance(raw, dict):
+                            uuid = cast(dict[str, Any], raw).get("uuid")
+                            if isinstance(uuid, str):
+                                uuids.add(uuid)
+                    except json.JSONDecodeError:
+                        continue
+        except FileNotFoundError:
+            pass
+    return uuids
+
+
+def _repair_parent_chains(
+    messages: list[TranscriptEntry],
+    progress_chain: dict[str, Optional[str]],
+) -> None:
+    """Repair parentUuid fields that point to progress entries.
+
+    Walks the progress chain to find the nearest non-progress ancestor.
+    Mutates entries in place (Pydantic v2 models are mutable by default).
+    """
+    if not progress_chain:
+        return
+    for msg in messages:
+        parent = getattr(msg, "parentUuid", None)
+        if parent and parent in progress_chain:
+            current: Optional[str] = parent
+            while current is not None and current in progress_chain:
+                current = progress_chain[current]
+            msg.parentUuid = current  # type: ignore[union-attr]
+
+
 # =============================================================================
 # Transcript Loading Functions
 # =============================================================================
```
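The repair walks uuid to parentUuid links through the progress map until it reaches a non-progress ancestor. A minimal, dict-based sketch of that logic (names hypothetical; the PR's `_repair_parent_chains` mutates Pydantic entries in place rather than returning a new mapping):

```python
from typing import Optional


def repair_parent_chains(
    parent_of: dict[str, Optional[str]],       # message uuid -> parentUuid
    progress_chain: dict[str, Optional[str]],  # progress uuid -> parentUuid
) -> dict[str, Optional[str]]:
    """Rewrite parents that point at progress entries so they point at
    the nearest non-progress ancestor instead."""
    repaired = dict(parent_of)
    for uuid, parent in parent_of.items():
        if parent and parent in progress_chain:
            current: Optional[str] = parent
            # Follow the progress chain until we leave it
            while current is not None and current in progress_chain:
                current = progress_chain[current]
            repaired[uuid] = current
    return repaired


# Message "m2" points at progress entry "p2"; the chain runs p2 -> p1 -> m1
progress = {"p1": "m1", "p2": "p1"}
messages = {"m1": None, "m2": "p2"}
print(repair_parent_chains(messages, progress))  # {'m1': None, 'm2': 'm1'}
```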
```diff
@@ -315,8 +412,12 @@ def load_directory_transcripts(
     from_date: Optional[str] = None,
     to_date: Optional[str] = None,
     silent: bool = False,
-) -> list[TranscriptEntry]:
-    """Load all JSONL transcript files from a directory and combine them."""
+) -> tuple[list[TranscriptEntry], SessionTree]:
+    """Load all JSONL transcript files from a directory and combine them.
+
+    Returns (messages, session_tree) — the tree is reused by the renderer
+    to avoid rebuilding the DAG.
+    """
     all_messages: list[TranscriptEntry] = []
 
     # Find all .jsonl files, excluding agent files (they are loaded via load_transcript
```
```diff
@@ -331,14 +432,35 @@ def load_directory_transcripts(
         )
         all_messages.extend(messages)
 
-    # Sort all messages chronologically
-    def get_timestamp(entry: TranscriptEntry) -> str:
-        if hasattr(entry, "timestamp"):
-            return entry.timestamp  # type: ignore
-        return ""
-
-    all_messages.sort(key=get_timestamp)
-    return all_messages
+    # Repair parent chains: progress entries create UUID gaps
+    progress_chain = _scan_progress_chains(directory_path)
+    _repair_parent_chains(all_messages, progress_chain)
+
+    # Partition: sidechain entries excluded from DAG (Phase C scope)
+    sidechain_entries = [e for e in all_messages if getattr(e, "isSidechain", False)]
+    main_entries = [e for e in all_messages if not getattr(e, "isSidechain", False)]
+
+    # Collect sidechain UUIDs so DAG build can suppress orphan warnings
+    # for parents that exist in sidechain data (will be integrated in Phase C)
+    sidechain_uuids: set[str] = {
+        e.uuid for e in sidechain_entries if isinstance(e, BaseTranscriptEntry)
+    }
+    # Also scan unloaded subagent files (e.g. aprompt_suggestion agents
+    # that are never referenced via agentId in the main session)
+    sidechain_uuids |= _scan_sidechain_uuids(directory_path)
+
+    # Build DAG and traverse (entries grouped by session, depth-first)
+    tree = build_dag_from_entries(main_entries, sidechain_uuids=sidechain_uuids)
+    dag_ordered = traverse_session_tree(tree)
+
+    # Re-add summaries/queue-ops (excluded from DAG since they lack uuid)
+    non_dag_entries: list[TranscriptEntry] = [
+        e
+        for e in main_entries
+        if isinstance(e, (SummaryTranscriptEntry, QueueOperationTranscriptEntry))
+    ]
+
+    return dag_ordered + sidechain_entries + non_dag_entries, tree
```
> Review comment on lines +456 to +463: Queue-operation chronology is lost when re-appended at the tail.
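The ordering built here can be sketched as a parent-to-children map plus a depth-first walk from the roots. The following is a simplified stand-in for the idea behind `build_dag_from_entries` and `traverse_session_tree` (the function name and plain-dict representation are illustrative, not the PR's actual API):

```python
from collections import defaultdict
from typing import Optional


def dag_order(parents: dict[str, Optional[str]]) -> list[str]:
    """Depth-first ordering of entries keyed by uuid -> parentUuid.

    Roots (parent None) come first; each node's subtree is emitted
    before its later siblings, so forks stay grouped together.
    """
    children: dict[Optional[str], list[str]] = defaultdict(list)
    for uuid, parent in parents.items():
        children[parent].append(uuid)

    order: list[str] = []
    # Reverse so pop() visits entries in insertion order
    stack = list(reversed(children[None]))
    while stack:
        node = stack.pop()
        order.append(node)
        stack.extend(reversed(children[node]))
    return order


# Chain a -> b -> c, with d forked off b (a rewind/fork point)
print(dag_order({"a": None, "b": "a", "c": "b", "d": "b"}))  # ['a', 'b', 'c', 'd']
```

Note that this depth-first grouping is exactly what makes tail-appending summaries and queue operations lossy for chronology, as the review comment above points out.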
```diff
@@ -406,9 +528,11 @@ def deduplicate_messages(messages: list[TranscriptEntry]) -> list[TranscriptEntr
                     content_key = item.tool_use_id
                     break
             else:
-                # No tool result found - this is a user text message
+                # No tool result found - this is a user text message.
+                # Use uuid to keep distinct messages (even at same timestamp)
+                # so DAG parent references remain valid.
                 is_user_text = True
-                # content_key stays empty (dedupe by timestamp alone)
+                content_key = message.uuid
         elif isinstance(message, SummaryTranscriptEntry):
             # Summaries have no timestamp or uuid - use leafUuid to keep them distinct
             content_key = message.leafUuid
```
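Keying user text by uuid rather than timestamp alone is the crux of this dedup change: two distinct messages can share a timestamp, and dropping one would orphan any DAG entry whose parentUuid points at it. A toy sketch of the keying (hypothetical helper, operating on plain dicts rather than TranscriptEntry models):

```python
def dedupe_user_texts(entries: list[dict]) -> list[dict]:
    """Keep one entry per (timestamp, uuid) key.

    Keying by uuid preserves distinct messages that share a timestamp,
    so parent references into the DAG stay resolvable; exact duplicates
    (same uuid and timestamp, e.g. from overlapping files) still collapse.
    """
    seen: set[tuple[str, str]] = set()
    out: list[dict] = []
    for e in entries:
        key = (e["timestamp"], e["uuid"])
        if key not in seen:
            seen.add(key)
            out.append(e)
    return out


entries = [
    {"timestamp": "t0", "uuid": "u1"},
    {"timestamp": "t0", "uuid": "u2"},  # same timestamp, distinct uuid: kept
    {"timestamp": "t0", "uuid": "u1"},  # exact duplicate: dropped
]
print(len(dedupe_user_texts(entries)))  # 2
```

Deduping by timestamp alone would have collapsed u1 and u2 here, producing exactly the "false orphans" fixed in commit c2dc669.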
```diff
@@ -698,6 +822,7 @@ def _generate_paginated_html(
     session_data: Dict[str, SessionCacheData],
     working_directories: List[str],
     silent: bool = False,
+    session_tree: Optional[SessionTree] = None,
 ) -> Path:
     """Generate paginated HTML files for combined transcript.
@@ -856,6 +981,7 @@ def _generate_paginated_html(
             page_title,
             page_info=page_info,
             page_stats=page_stats,
+            session_tree=session_tree,
         )
         page_file.write_text(html_content, encoding="utf-8")
```
```diff
@@ -948,11 +1074,18 @@ def convert_jsonl_to(
     # Initialize working_directories for both branches (used by pagination in directory mode)
     working_directories: List[str] = []
 
+    # session_tree is populated in directory mode (DAG already built);
+    # None in single-file mode (renderer builds it on demand)
+    session_tree: Optional[SessionTree] = None
+
     if input_path.is_file():
         # Single file mode - cache only available for directory mode
         if output_path is None:
             output_path = input_path.with_suffix(f".{ext}")
         messages = load_transcript(input_path, silent=silent)
+        # Repair progress chain gaps for single-file mode
+        progress_chain = _scan_progress_chains(input_path)
+        _repair_parent_chains(messages, progress_chain)
         title = f"Claude Transcript - {input_path.stem}"
         cache_was_updated = False  # No cache in single file mode
     else:
```
```diff
@@ -988,7 +1121,7 @@ def convert_jsonl_to(
         return output_path
 
     # Phase 2: Load messages (will use fresh cache when available)
-    messages = load_directory_transcripts(
+    messages, session_tree = load_directory_transcripts(
         input_path, cache_manager, from_date, to_date, silent
     )
```
```diff
@@ -1065,6 +1198,7 @@ def convert_jsonl_to(
             session_data,
             working_directories,
             silent=silent,
+            session_tree=session_tree,
         )
     else:
         # Use single-file generation for small projects or filtered views
```
```diff
@@ -1091,7 +1225,9 @@ def convert_jsonl_to(
     if should_regenerate:
         # For referenced images, pass the output directory
         output_dir = output_path.parent
-        content = renderer.generate(messages, title, output_dir=output_dir)
+        content = renderer.generate(
+            messages, title, output_dir=output_dir, session_tree=session_tree
+        )
         assert content is not None
         output_path.write_text(content, encoding="utf-8")
```
```diff
@@ -1117,44 +1253,12 @@ def convert_jsonl_to(
         cache_was_updated,
         image_export_mode,
         silent=silent,
+        session_tree=session_tree,
     )
 
     return output_path
 
 
-def has_cache_changes(
-    project_dir: Path,
-    cache_manager: Optional[CacheManager],
-    from_date: Optional[str] = None,
-    to_date: Optional[str] = None,
-) -> bool:
-    """Check if cache needs updating (fast mtime comparison only).
-
-    Returns True if there are modified files or cache is stale.
-    Does NOT load any messages - that's deferred to ensure_fresh_cache.
-    """
-    if cache_manager is None:
-        return True  # No cache means we need to process
-
-    jsonl_files = list(project_dir.glob("*.jsonl"))
-    if not jsonl_files:
-        return False
-
-    # Get cached project data
-    cached_project_data = cache_manager.get_cached_project_data()
-
-    # Check various invalidation conditions
-    modified_files = cache_manager.get_modified_files(jsonl_files)
-
-    return (
-        cached_project_data is None
-        or from_date is not None
-        or to_date is not None
-        or bool(modified_files)
-        or (cached_project_data.total_message_count == 0 and bool(jsonl_files))
-    )
-
-
 def ensure_fresh_cache(
     project_dir: Path,
     cache_manager: Optional[CacheManager],
```
```diff
@@ -1165,7 +1269,6 @@ def ensure_fresh_cache(
     """Ensure cache is fresh and populated. Returns True if cache was updated.
 
     This does the heavy lifting of loading and parsing files.
-    Call has_cache_changes() first for a fast check.
     """
     if cache_manager is None:
         return False
```
```diff
@@ -1201,7 +1304,7 @@ def ensure_fresh_cache(
     # Load and process messages to populate cache
     if not silent:
         print(f"Updating cache for {project_dir.name}...")
-    messages = load_directory_transcripts(
+    messages, _tree = load_directory_transcripts(
         project_dir, cache_manager, from_date, to_date, silent
     )
```
```diff
@@ -1479,6 +1582,7 @@ def _generate_individual_session_files(
     cache_was_updated: bool = False,
     image_export_mode: Optional[str] = None,
     silent: bool = False,
+    session_tree: Optional[SessionTree] = None,
 ) -> int:
     """Generate individual files for each session in the specified format.
```
```diff
@@ -1577,7 +1681,12 @@ def _generate_individual_session_files(
         if should_regenerate_session:
             # Generate session content
             session_content = renderer.generate_session(
-                messages, session_id, session_title, cache_manager, output_dir
+                messages,
+                session_id,
+                session_title,
+                cache_manager,
+                output_dir,
+                session_tree=session_tree,
             )
             assert session_content is not None
             # Write session file
```
```diff
@@ -1897,7 +2006,7 @@ def process_projects_hierarchy(
             print(
                 f"Warning: No cached data available for {project_dir.name}, using fallback processing"
             )
-            messages = load_directory_transcripts(
+            messages, _tree = load_directory_transcripts(
                 project_dir, cache_manager, from_date, to_date, silent=silent
             )
             # Ensure cache is populated with session data (including working directories)
```
> Review comment: Protect parent-chain repair from cyclic progress links. If `progress_chain` contains a cycle, this loop never terminates and can hang conversion on malformed logs.
>
> Suggested fix:
>
> ```diff
>      for msg in messages:
>          parent = getattr(msg, "parentUuid", None)
>          if parent and parent in progress_chain:
>              current: Optional[str] = parent
> +            seen: set[str] = set()
>              while current is not None and current in progress_chain:
> +                if current in seen:
> +                    # Corrupt/cyclic progress chain; stop following to avoid infinite loop
> +                    current = None
> +                    break
> +                seen.add(current)
>                  current = progress_chain[current]
>              msg.parentUuid = current  # type: ignore[union-attr]
> ```
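The suggested guard is ordinary cycle detection with a visited set. A standalone sketch (hypothetical function name) showing both the normal walk and the cycle bail-out:

```python
from typing import Optional


def follow_chain(start: str, chain: dict[str, Optional[str]]) -> Optional[str]:
    """Walk uuid -> parentUuid links until leaving the chain.

    Returns the first node outside the chain, or None if the chain is
    cyclic (the malformed-log hazard the review comment flags).
    """
    seen: set[str] = set()
    current: Optional[str] = start
    while current is not None and current in chain:
        if current in seen:
            return None  # Cyclic chain: treat the ancestor as unrecoverable
        seen.add(current)
        current = chain[current]
    return current


print(follow_chain("p1", {"p1": "p2", "p2": "m1"}))  # m1
print(follow_chain("p1", {"p1": "p2", "p2": "p1"}))  # None (cycle detected)
```

Without the `seen` set, the second call would spin forever; with it, the walk terminates after visiting each progress entry at most once.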