Handle large meeting notes files without exceeding token limits #61

@eshulman2

Description

Implement strategies to process large meeting notes and transcripts that exceed LLM token limits. Currently, very long meetings or detailed notes may fail to process or produce incomplete results.

Current Limitation

  • Maximum context window: ~8k-200k tokens (depending on model)
  • Long meetings (>2 hours) can exceed the limits of smaller-context models
  • Detailed notes with attachments/diagrams are problematic
  • Transcripts are typically much longer than notes

Example Issues:

  • 2-hour meeting transcript: ~15k-20k tokens
  • All-hands meeting notes: ~10k tokens
  • Multi-day conference notes: >50k tokens
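
These sizes can be estimated up front with a token counter; a minimal sketch using tiktoken (the cl100k_base encoding approximates, but may not exactly match, the target model's tokenizer):

import tiktoken

def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    """Approximate the number of tokens the model will see for this text."""
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))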

🚨 Claude Proposed Solution (Read/Implement with caution) 🚨

1. Chunking Strategy

# src/core/workflows/sub_workflows/chunking_workflow.py
from typing import List

import tiktoken


class MeetingNotesChunker:
    """Split large meeting notes into processable chunks."""

    def __init__(self, max_chunk_tokens: int = 6000):
        self.max_chunk_tokens = max_chunk_tokens
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def chunk_by_sections(self, notes: str) -> List[MeetingChunk]:
        """Split notes by sections (headings, topics)."""

        # Parse markdown/structured notes
        sections = self._parse_sections(notes)

        chunks = []
        current_chunk = []
        current_tokens = 0

        for section in sections:
            section_tokens = len(self.tokenizer.encode(section.content))

            if current_chunk and current_tokens + section_tokens > self.max_chunk_tokens:
                # Flush the chunk before it exceeds the limit
                chunks.append(MeetingChunk(
                    content="\n\n".join(s.content for s in current_chunk),
                    sections=[s.title for s in current_chunk]
                ))
                current_chunk = []
                current_tokens = 0

            current_chunk.append(section)
            current_tokens += section_tokens

        # Flush the final partial chunk
        if current_chunk:
            chunks.append(MeetingChunk(
                content="\n\n".join(s.content for s in current_chunk),
                sections=[s.title for s in current_chunk]
            ))

        return chunks

    def chunk_by_time(self, transcript: str) -> List[MeetingChunk]:
        """Split transcript by time segments."""

        # Parse timestamps from transcript
        segments = self._parse_timestamps(transcript)

        # Group segments into batches of 10 (~5-10 minutes, assuming ~30-60s segments)
        chunks = []
        for i in range(0, len(segments), 10):
            chunk_segments = segments[i:i+10]
            chunks.append(MeetingChunk(
                content=self._combine_segments(chunk_segments),
                time_range=(chunk_segments[0].timestamp, chunk_segments[-1].timestamp)
            ))

        return chunks
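
The chunker assumes Section/MeetingChunk containers and a section parser that the proposal leaves undefined; a minimal sketch of hypothetical versions:

from dataclasses import dataclass, field
from typing import List, Optional, Tuple

@dataclass
class Section:
    title: str
    content: str

@dataclass
class MeetingChunk:
    content: str
    sections: List[str] = field(default_factory=list)
    time_range: Optional[Tuple[str, str]] = None
    index: int = 0

def parse_sections(notes: str) -> List[Section]:
    """Naive markdown parser: start a new section at each heading line."""
    sections, title, lines = [], "Preamble", []
    for line in notes.splitlines():
        if line.startswith("#"):
            if lines:
                sections.append(Section(title, "\n".join(lines)))
            title, lines = line.lstrip("# ").strip(), []
        else:
            lines.append(line)
    if lines:
        sections.append(Section(title, "\n".join(lines)))
    return sections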

2. Hierarchical Processing

class HierarchicalProcessor:
    """Process large notes in multiple passes."""

    def __init__(self, llm: LLM):
        self.llm = llm

    async def process(self, notes: str) -> ActionItemsList:
        """
        Three-pass processing:
        1. Summarize each chunk
        2. Extract action items from the combined summaries
        3. Enrich the items with context from the original chunks
        """

        # Pass 1: Chunk and summarize
        chunker = MeetingNotesChunker()
        chunks = chunker.chunk_by_sections(notes)

        summaries = []
        for chunk in chunks:
            summary = await self._summarize_chunk(chunk)
            summaries.append(summary)

        # Pass 2: Extract action items from summaries
        combined_summary = "\n\n".join([s.summary for s in summaries])
        action_items = await self._extract_action_items(combined_summary)

        # Pass 3: Enrich with details from original chunks
        enriched_items = await self._enrich_with_context(
            action_items,
            chunks
        )

        return enriched_items

    async def _summarize_chunk(self, chunk: MeetingChunk) -> ChunkSummary:
        """Summarize a single chunk focusing on action items."""

        prompt = f"""
        Summarize this meeting segment, focusing on:
        - Decisions made
        - Action items mentioned
        - Key discussion points
        - Commitments made by participants

        Meeting content:
        {chunk.content}

        Keep summary concise (<500 tokens).
        """

        summary = await self.llm.acomplete(prompt)

        return ChunkSummary(
            chunk_index=chunk.index,
            summary=summary.text,
            sections=chunk.sections
        )
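
The extraction pass is not defined in the proposal; a hedged sketch of _extract_action_items, assuming the same llm.acomplete interface and a pydantic-style ActionItemsList:

    async def _extract_action_items(self, combined_summary: str) -> ActionItemsList:
        """Extract action items from the combined chunk summaries."""

        prompt = f"""
        List every action item in these meeting summaries as a JSON array of
        objects with "description", "owner", and "due_date" fields.

        Summaries:
        {combined_summary}
        """

        response = await self.llm.acomplete(prompt)
        # Hypothetical parse; depends on the project's ActionItemsList schema
        return ActionItemsList.parse_raw(response.text)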

3. Streaming Processing

class StreamingProcessor:
    """Process notes as stream, extracting action items incrementally."""

    async def process_stream(
        self,
        notes: str,
        callback: Callable[[ActionItem], None]
    ) -> ActionItemsList:
        """Process notes in stream, emitting action items as found."""

        chunker = MeetingNotesChunker()
        chunks = chunker.chunk_by_sections(notes)

        all_items = []

        for chunk in chunks:
            # Extract action items from this chunk
            items = await self._extract_from_chunk(chunk)

            # Emit items immediately
            for item in items:
                callback(item)
                all_items.append(item)

        # Deduplicate and merge similar items
        deduplicated = self._deduplicate_items(all_items)

        return ActionItemsList(action_items=deduplicated)

    def _deduplicate_items(
        self,
        items: List[ActionItem]
    ) -> List[ActionItem]:
        """Merge duplicate or very similar action items."""

        # Placeholder: exact-text dedup. The intent is semantic similarity
        # (merge items that are >80% similar); see the sketch below.
        seen = set()
        unique = []
        for item in items:
            key = item.description.strip().lower()  # assumes a description field
            if key not in seen:
                seen.add(key)
                unique.append(item)
        return unique
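
A semantic version of that step could compare embeddings; a sketch assuming sentence-transformers (not a dependency the issue names) and an ActionItem.description field:

from sentence_transformers import SentenceTransformer, util

_model = SentenceTransformer("all-MiniLM-L6-v2")

def deduplicate_semantic(items: List[ActionItem], threshold: float = 0.8) -> List[ActionItem]:
    """Keep an item only if no kept item's description is >threshold similar."""
    kept, kept_embeddings = [], []
    for item in items:
        emb = _model.encode(item.description, convert_to_tensor=True)
        if all(util.cos_sim(emb, k).item() < threshold for k in kept_embeddings):
            kept.append(item)
            kept_embeddings.append(emb)
    return kept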

4. Selective Extraction

class SelectiveExtractor:
    """Extract only relevant sections for action item generation."""

    async def extract_relevant_sections(self, notes: str) -> str:
        """Filter notes to only action-item-relevant content."""

        prompt = f"""
        Extract ONLY the sections that contain:
        - Action items
        - Decisions
        - Commitments
        - Next steps
        - Assignments

        Ignore:
        - General discussion
        - Background information
        - Informational content

        Meeting notes:
        {notes[:2000]}... [truncated]

        Return the relevant excerpts.
        """

        relevant_content = await self.llm.acomplete(prompt)
        return relevant_content.text
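
Applied chunk-by-chunk (an assumption; the proposal only shows a single truncated call), the filter could be stitched back together like this:

async def filter_relevant(notes: str, extractor: SelectiveExtractor) -> str:
    """Run selective extraction over each chunk and join the excerpts."""
    chunks = MeetingNotesChunker().chunk_by_sections(notes)
    excerpts = [
        await extractor.extract_relevant_sections(chunk.content)
        for chunk in chunks
    ]
    return "\n\n".join(excerpts)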

5. Adaptive Strategy Selection

# src/core/workflows/sub_workflows/meeting_notes_workflow.py

class AdaptiveMeetingNotesProcessor:
    """Select processing strategy based on content size."""

    def __init__(self, llm: LLM):
        self.llm = llm
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    async def process(self, notes: str) -> ActionItemsList:
        """Adaptively select processing strategy."""

        token_count = len(self.tokenizer.encode(notes))

        # Strategy selection
        if token_count < 4000:
            # Direct processing
            return await self._process_direct(notes)

        elif token_count < 10000:
            # Selective extraction
            extractor = SelectiveExtractor(self.llm)
            relevant = await extractor.extract_relevant_sections(notes)
            return await self._process_direct(relevant)

        elif token_count < 30000:
            # Hierarchical processing
            processor = HierarchicalProcessor(self.llm)
            return await processor.process(notes)

        else:
            # Streaming processing for very large notes
            processor = StreamingProcessor(self.llm)
            items = []

            def collect_item(item: ActionItem):
                items.append(item)

            await processor.process_stream(notes, collect_item)
            return ActionItemsList(action_items=items)
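
_process_direct is referenced but never shown; a minimal sketch, assuming single-shot extraction with the same hypothetical schema parse as above:

    async def _process_direct(self, notes: str) -> ActionItemsList:
        """Single-shot path for notes that already fit the context window."""

        prompt = f"""
        Extract every action item from these meeting notes as a JSON array of
        objects with "description", "owner", and "due_date" fields.

        Meeting notes:
        {notes}
        """

        response = await self.llm.acomplete(prompt)
        return ActionItemsList.parse_raw(response.text)  # hypothetical parse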

6. Configuration

// config.json
{
  "meeting_notes_processing": {
    "max_tokens": 8000,
    "chunking": {
      "enabled": true,
      "strategy": "sections",  // sections, time, hybrid
      "chunk_size_tokens": 6000,
      "overlap_tokens": 200
    },
    "hierarchical": {
      "enabled": true,
      "max_summary_tokens": 500
    },
    "selective_extraction": {
      "enabled": true,
      "threshold_tokens": 4000
    }
  }
}
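
A loader for this block might look like the following sketch (dataclass field names mirror the JSON above; per the proposal, strategy may be "sections", "time", or "hybrid"):

import json
from dataclasses import dataclass

VALID_STRATEGIES = {"sections", "time", "hybrid"}

@dataclass
class ChunkingConfig:
    enabled: bool
    strategy: str
    chunk_size_tokens: int
    overlap_tokens: int

def load_chunking_config(path: str = "config.json") -> ChunkingConfig:
    with open(path) as f:
        raw = json.load(f)["meeting_notes_processing"]["chunking"]
    config = ChunkingConfig(**raw)
    if config.strategy not in VALID_STRATEGIES:
        raise ValueError(f"strategy must be one of {sorted(VALID_STRATEGIES)}")
    return config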

Performance Optimizations

1. Caching

# Cache chunk summaries. hashlib gives a stable key; the built-in hash() is
# salted per process, so it cannot be reused across runs.
import hashlib

cache_key = f"summary:{hashlib.sha256(chunk.content.encode()).hexdigest()}"
summary = cache.get(cache_key)

if summary is None:
    summary = await self._summarize_chunk(chunk)
    cache.set(cache_key, summary, ttl=3600)

2. Parallel Processing

# Process chunks in parallel
summaries = await asyncio.gather(*[
    self._summarize_chunk(chunk)
    for chunk in chunks
])
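
Unbounded gather can trip provider rate limits on large meetings; a bounded variant using a semaphore (the limit of 5 is an arbitrary assumption):

import asyncio

semaphore = asyncio.Semaphore(5)  # cap concurrent LLM calls; tune per provider

async def summarize_bounded(chunk):
    async with semaphore:
        return await self._summarize_chunk(chunk)

summaries = await asyncio.gather(*[summarize_bounded(c) for c in chunks])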

Acceptance Criteria

  • Token counting implemented
  • Chunking strategies implemented (sections, time)
  • Chunk summary caching
  • Parallel chunk processing
  • Deduplication of action items
  • Handle edge cases (empty chunks, single giant chunk)
  • Tests with various note sizes
  • Performance benchmarks
  • Documentation with examples

Priority: 🟡 Medium-High
Effort: 10-12 hours
Difficulty: High
Dependencies: May benefit from Issue #1 (transcript support)
