diff --git a/.gitignore b/.gitignore
index 271978f..6406d7a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,7 @@
 prompting_benchmark/data/*
 !prompting_benchmark/data/screenshots/
 datasets
+sample_images
 
 # C extensions
 *.so
diff --git a/pyproject.toml b/pyproject.toml
index f214bc8..27d92d4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,7 +26,7 @@ dependencies = [
     "screeninfo>=0.8.1",
     "pynput>=1.8.1",
     "scikit-image>=0.25.2",
-    "google-genai>=1.45.0",
+    "google-genai>=1.52.0",
 ]
 
 [tool.uv]
diff --git a/src/label/__main__.py b/src/label/__main__.py
index 0af8a4b..b335c29 100644
--- a/src/label/__main__.py
+++ b/src/label/__main__.py
@@ -2,7 +2,7 @@ import argparse
 
 from dotenv import load_dotenv
 
-from label.discovery import discover_sessions, discover_video_sessions, create_single_config
+from label.discovery import discover_sessions, discover_screenshots_sessions, create_single_config
 from label.clients import create_client
 from label.processor import Processor
 from label.visualizer import Visualizer
@@ -20,9 +20,10 @@ def parse_args():
     p.add_argument("--chunk-duration", type=int, default=60, help="Chunk duration in seconds")
     p.add_argument("--fps", type=int, default=1, help="Frames per second for video processing")
-    p.add_argument("--video-only", action="store_true", help="Process video only without screenshots or annotations")
-    p.add_argument("--video-extensions", nargs="+", default=[".mp4", ".avi", ".mov", ".mkv"], help="Video file extensions to consider")
-    p.add_argument("--prompt-file", default=None, help="Path to prompt file (default: prompts/default.txt or prompts/video_only.txt if video only)")
+    p.add_argument("--screenshots-only", action="store_true", help="Process screenshots folder only without aggregations or annotations")
+    p.add_argument("--image-extensions", nargs="+", default=[".jpg", ".jpeg", ".png"], help="Image file extensions to consider")
+    p.add_argument("--max-time-gap", type=float, default=300.0, help="Maximum time gap (seconds) between images before forcing a video split (default: 300 = 5 minutes)")
+    p.add_argument("--prompt-file", default=None, help="Path to prompt file (default: prompts/default.txt or prompts/screenshots_only.txt if screenshots only)")
     p.add_argument("--annotate", action="store_true", help="Annotate videos with cursor positions and clicks (only for standard processing)")
     p.add_argument("--skip-existing", action="store_true", help="Skip sessions that have already been processed")
     p.add_argument("--visualize", action="store_true", help="Create annotated video visualizations after processing")
@@ -39,7 +40,7 @@ def parse_args():
     if not args.model:
         args.model = 'gemini-2.5-flash' if args.client == 'gemini' else 'Qwen/Qwen3-VL-8B-Thinking-FP8'
     if not args.prompt_file:
-        args.prompt_file = "prompts/video_only.txt" if args.video_only else "prompts/default.txt"
+        args.prompt_file = "prompts/screenshots_only.txt" if args.screenshots_only else "prompts/default.txt"
 
     return args
@@ -49,15 +50,15 @@ def setup_configs(args):
         configs = [create_single_config(
             args.session,
             args.chunk_duration,
-            args.video_only,
-            tuple(args.video_extensions),
+            args.screenshots_only,
+            tuple(args.image_extensions),
         )]
     else:
-        if args.video_only:
-            configs = discover_video_sessions(
+        if args.screenshots_only:
+            configs = discover_screenshots_sessions(
                 args.sessions_root,
                 args.chunk_duration,
-                tuple(args.video_extensions),
+                tuple(args.image_extensions),
             )
         else:
             configs = discover_sessions(
@@ -82,14 +83,15 @@ def process_with_gemini(args, configs):
     processor = Processor(
         client=client,
         num_workers=args.num_workers,
-        video_only=args.video_only,
+        screenshots_only=args.screenshots_only,
         prompt_file=args.prompt_file,
+        max_time_gap=args.max_time_gap,
     )
 
     return processor.process_sessions(
         configs,
         fps=args.fps,
-        annotate=args.annotate and not args.video_only,
+        annotate=args.annotate and not args.screenshots_only,
     )
@@ -103,14 +105,15 @@ def process_with_vllm(args, configs):
     processor = Processor(
         client=client,
         num_workers=args.num_workers,
-        video_only=args.video_only,
+        screenshots_only=args.screenshots_only,
         prompt_file=args.prompt_file,
+        max_time_gap=args.max_time_gap,
     )
 
     return processor.process_sessions(
         configs,
         fps=args.fps,
-        annotate=args.annotate and not args.video_only,
+        annotate=args.annotate and not args.screenshots_only,
     )
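The `gemini.py` changes just below wire per-call token usage out of `usage_metadata` and keep running totals on the client. A minimal usage sketch of that flow; the class name `GeminiClient` and the chunk path are assumptions, since neither is visible in this diff:

```python
from label.clients.gemini import GeminiClient  # assumed class name

client = GeminiClient(model_name="gemini-2.5-flash")
fd = client.upload_file("chunks/000.mp4")                  # hypothetical chunk path
res = client.generate("Describe the user's actions.", fd)  # JSON mode, CAPTION_SCHEMA
print(res.input_tokens, res.output_tokens)                 # per-call usage_metadata counts
client.print_token_stats(prefix="[demo] ")                 # running totals across calls
```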
diff --git a/src/label/clients/gemini.py b/src/label/clients/gemini.py
index bfae64a..971df7d 100644
--- a/src/label/clients/gemini.py
+++ b/src/label/clients/gemini.py
@@ -3,7 +3,6 @@ import os
 import time
 
 from label.clients.client import VLMClient, CAPTION_SCHEMA
-
 from google import genai
 from google.genai import types
@@ -13,6 +12,15 @@ def __init__(self, response):
         self.response = response
         self._json = None
 
+        # Extract token usage from response
+        self.input_tokens = 0
+        self.output_tokens = 0
+
+        if hasattr(response, 'usage_metadata'):
+            usage = response.usage_metadata
+            self.input_tokens = getattr(usage, 'prompt_token_count', 0)
+            self.output_tokens = getattr(usage, 'candidates_token_count', 0)
+
     @property
     def text(self) -> str:
         return self.response.text
@@ -36,6 +44,10 @@ def __init__(self, api_key: Optional[str] = None, model_name: str = "gemini-2.5-
         self.client = genai.Client(api_key=api_key)
         self.model_name = model_name
 
+        # Token tracking
+        self.total_input_tokens = 0
+        self.total_output_tokens = 0
+
     def upload_file(self, path: str) -> Any:
         video_file = self.client.files.upload(file=path)
@@ -56,16 +68,24 @@ def generate(self, prompt: str, file_descriptor: Optional[Any] = None,
                  schema: Optional[Dict] = None) -> GeminiResponse:
-
         inputs = [prompt]
         if file_descriptor:
             inputs.append(file_descriptor)
 
-        config = types.GenerateContentConfig(
-            response_mime_type="application/json",
-            temperature=0.0,
-            response_schema=schema or CAPTION_SCHEMA
-        )
+        if "gemini-3" in self.model_name:
+            config = types.GenerateContentConfig(
+                response_mime_type="application/json",
+                temperature=0.0,
+                response_schema=schema or CAPTION_SCHEMA,
+                thinking_config=types.ThinkingConfig(thinking_level="high"),
+                media_resolution=types.MediaResolution.MEDIA_RESOLUTION_HIGH
+            )
+        else:
+            config = types.GenerateContentConfig(
+                response_mime_type="application/json",
+                temperature=0.0,
+                response_schema=schema or CAPTION_SCHEMA,
+            )
 
         res = self.client.models.generate_content(
             model=self.model_name,
@@ -73,4 +93,31 @@ def generate(self, prompt: str, file_descriptor: Optional[Any] = None,
             config=config
         )
 
-        return GeminiResponse(res)
+        response = GeminiResponse(res)
+
+        # Track tokens
+        self.total_input_tokens += response.input_tokens
+        self.total_output_tokens += response.output_tokens
+
+        return response
+
+    def get_token_stats(self) -> Dict[str, int]:
+        """Get current token usage statistics."""
+        return {
+            "input_tokens": self.total_input_tokens,
+            "output_tokens": self.total_output_tokens,
+            "total_tokens": self.total_input_tokens + self.total_output_tokens
+        }
+
+    def reset_token_stats(self):
+        """Reset token counters."""
+        self.total_input_tokens = 0
+        self.total_output_tokens = 0
+
+    def print_token_stats(self, prefix: str = ""):
+        """Print token usage statistics."""
+        stats = self.get_token_stats()
+        print(f"\n{prefix}Token Usage:")
+        print(f"  Input tokens:  {stats['input_tokens']:,}")
+        print(f"  Output tokens: {stats['output_tokens']:,}")
+        print(f"  Total tokens:  {stats['total_tokens']:,}")
diff --git a/src/label/discovery.py b/src/label/discovery.py
index d029e2c..e585016 100644
--- a/src/label/discovery.py
+++ b/src/label/discovery.py
@@ -43,10 +43,10 @@ def discover_sessions(
     return configs
 
-def discover_video_sessions(
+def discover_screenshots_sessions(
     sessions_root: Path,
     chunk_duration: int = 60,
-    video_exts: Tuple[str, ...] = (".mp4", ".avi", ".mov", ".mkv")
+    image_exts: Tuple[str, ...] = (".jpg", ".jpeg", ".png")
 ) -> List[SessionConfig]:
 
     if not sessions_root.exists():
@@ -58,23 +58,22 @@ def discover_video_sessions(
         if not session_dir.is_dir():
             continue
 
-        video_files = [
-            f for f in session_dir.iterdir()
-            if f.is_file() and f.suffix.lower() in video_exts
-        ]
+        # Look for screenshots directory
+        screenshots_dir = session_dir / "screenshots"
+        if not screenshots_dir.exists():
+            continue
 
-        video_subdir = session_dir / "video"
-        if video_subdir.exists():
-            video_files.extend([
-                f for f in video_subdir.iterdir()
-                if f.is_file() and f.suffix.lower() in video_exts
-            ])
+        # Check if there are any image files
+        image_files = [
+            f for f in screenshots_dir.iterdir()
+            if f.is_file() and f.suffix.lower() in image_exts
+        ]
 
-        if video_files:
+        if image_files:
             configs.append(SessionConfig(
                 session_folder=session_dir,
                 chunk_duration=chunk_duration,
-                video_path=VideoPath(video_files[0])
+                _screenshots_dir=screenshots_dir
             ))
 
     return configs
@@ -83,31 +82,31 @@ def create_single_config(
     session_dir: Path,
     chunk_duration: int,
-    video_only: bool,
-    video_exts: Tuple[str, ...],
+    screenshots_only: bool,
+    image_exts: Tuple[str, ...],
     prompt: str = ""
 ) -> SessionConfig:
 
-    if video_only:
-        video_files = [
-            f for f in session_dir.iterdir()
-            if f.is_file() and f.suffix.lower() in video_exts
+    if screenshots_only:
+        # Check if there's a screenshots subdirectory first
+        screenshots_dir = session_dir / "screenshots"
+        if screenshots_dir.exists() and screenshots_dir.is_dir():
+            search_dir = screenshots_dir
+        else:
+            search_dir = session_dir
+
+        image_files = [
+            f for f in search_dir.iterdir()
+            if f.is_file() and f.suffix.lower() in image_exts
         ]
 
-        video_subdir = session_dir / "video"
-        if video_subdir.exists():
-            video_files.extend([
-                f for f in video_subdir.iterdir()
-                if f.is_file() and f.suffix.lower() in video_exts
-            ])
-
-        if not video_files:
-            raise RuntimeError(f"No video files found in {session_dir}")
+        if not image_files:
+            raise RuntimeError(f"No image files found in {search_dir}")
 
         return SessionConfig(
             session_folder=session_dir,
             chunk_duration=chunk_duration,
-            video_path=VideoPath(video_files[0])
+            _screenshots_dir=search_dir
         )
     else:
         return SessionConfig(
diff --git a/src/label/models.py b/src/label/models.py
index a7bac06..62b94ca 100644
--- a/src/label/models.py
+++ b/src/label/models.py
@@ -449,6 +449,7 @@ class SessionConfig:
     chunk_duration: int = 60
     video_path: Optional[VideoPath] = None
     agg_path: Optional[Path] = None
+    _screenshots_dir: Optional[Path] = None
 
     @property
     def session_id(self) -> str:
@@ -468,6 +469,8 @@ def aggregations_dir(self) -> Path:
 
     @property
     def screenshots_dir(self) -> Path:
+        if self._screenshots_dir is not None:
+            return self._screenshots_dir
         return self.session_folder / "screenshots"
 
     @property
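For reference, the `SessionConfig` change above is a small fallback chain: a screenshots directory found by discovery wins over the conventional `<session>/screenshots` location. A self-contained sketch of the same pattern, with illustrative paths:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

@dataclass
class Config:  # trimmed stand-in for SessionConfig
    session_folder: Path
    _screenshots_dir: Optional[Path] = None  # set by discovery when images are found

    @property
    def screenshots_dir(self) -> Path:
        # Prefer the discovered directory, else fall back to the convention
        if self._screenshots_dir is not None:
            return self._screenshots_dir
        return self.session_folder / "screenshots"

print(Config(Path("sessions/a")).screenshots_dir)                      # sessions/a/screenshots
print(Config(Path("sessions/b"), Path("sessions/b")).screenshots_dir)  # sessions/b (flat layout)
```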
a/src/label/processor.py b/src/label/processor.py index 583dfb8..39e1635 100644 --- a/src/label/processor.py +++ b/src/label/processor.py @@ -1,7 +1,8 @@ import re import json from pathlib import Path -from typing import List, Tuple +from typing import List, Tuple, Optional +from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm @@ -15,13 +16,15 @@ def __init__( self, client: VLMClient, num_workers: int = 4, - video_only: bool = False, + screenshots_only: bool = False, prompt_file: str = "prompts/default.txt", + max_time_gap: float = 300.0, ): self.client = client self.num_workers = num_workers - self.video_only = video_only + self.screenshots_only = screenshots_only self.prompt = self._load_prompt(prompt_file) + self.max_time_gap = max_time_gap def _load_prompt(self, path: str) -> str: p = Path(path) @@ -41,8 +44,8 @@ def process_sessions( for config in tqdm(configs, desc="Preparing"): config.ensure_dirs() - if self.video_only: - tasks.extend(self._prepare_video_only(config)) + if self.screenshots_only: + tasks.extend(self._prepare_screenshots_only(config, fps)) else: tasks.extend(self._prepare_standard(config, fps, annotate)) @@ -100,26 +103,171 @@ def _prepare_standard( return tasks - def _prepare_video_only(self, config: SessionConfig) -> List[ChunkTask]: - if not config.video_path or not config.video_path.exists(): + def _prepare_screenshots_only(self, config: SessionConfig, fps: int) -> List[ChunkTask]: + if not config.screenshots_dir or not config.screenshots_dir.exists(): return [] - chunks = split_video(config.video_path.resolve(), config.chunk_duration, config.chunks_dir) + # Get all image files from the screenshots directory + image_files = sorted([ + f for f in config.screenshots_dir.iterdir() + if f.is_file() and f.suffix.lower() in {".jpg", ".jpeg", ".png"} + ]) + + if not image_files: + return [] + + # Group images by time gaps (split if > max_time_gap seconds apart) + image_segments = self._split_images_by_time_gap(image_files, max_gap_seconds=self.max_time_gap) + + print(f"\n[Segments] Created {len(image_segments)} segment(s) from {len(image_files)} images (max gap: {self.max_time_gap}s)") + for idx, seg in enumerate(image_segments): + print(f" Segment {idx}: {len(seg)} images") tasks = [] - for i, video_path in enumerate(chunks): - tasks.append(ChunkTask( - session_id=config.session_id, - chunk_index=i, - video_path=VideoPath(video_path), - prompt=self.prompt, - aggregations=[], - chunk_start_time=i * config.chunk_duration, - chunk_duration=config.chunk_duration - )) + chunk_index = 0 + cumulative_time = 0 # Track actual time across all segments + + for segment_idx, segment_images in enumerate(image_segments): + if not segment_images: + continue + + # Create a video for this segment + segment_video_path = config.chunks_dir / f"segment_{segment_idx:03d}.mp4" + + if not segment_video_path.exists(): + create_video( + segment_images, + segment_video_path, + fps=fps, + pad_to=None, + annotate=False, + aggregations=None, + session_dir=None + ) + + # Get the actual duration of this segment video + from label.video import get_video_duration + segment_duration = get_video_duration(segment_video_path) + if segment_duration is None: + print(f"Warning: Could not get duration for {segment_video_path}, skipping segment") + continue + + # Split this segment video into chunks based on chunk_duration + segment_chunks = split_video(segment_video_path, config.chunk_duration, config.chunks_dir, start_index=chunk_index) + + # Create 
+            # Create tasks for each chunk with correct timing
+            for i, video_path in enumerate(segment_chunks):
+                chunk_start_in_segment = i * config.chunk_duration
+                actual_chunk_duration = min(config.chunk_duration, segment_duration - chunk_start_in_segment)
+
+                tasks.append(ChunkTask(
+                    session_id=config.session_id,
+                    chunk_index=chunk_index,
+                    video_path=VideoPath(video_path),
+                    prompt=self.prompt,
+                    aggregations=[],
+                    chunk_start_time=cumulative_time + chunk_start_in_segment,
+                    chunk_duration=int(actual_chunk_duration)
+                ))
+                chunk_index += 1
+
+            # Update cumulative time for next segment
+            cumulative_time += segment_duration
 
         return tasks
 
+    def _split_images_by_time_gap(self, image_files: List[Path], max_gap_seconds: float = 300.0) -> List[List[Path]]:
+        """
+        Split images into segments based on time gaps between consecutive images.
+        If two images are more than max_gap_seconds apart, start a new segment.
+
+        Args:
+            image_files: List of image file paths (should be sorted)
+            max_gap_seconds: Maximum time gap in seconds before forcing a split
+
+        Returns:
+            List of image segments, where each segment is a list of consecutive images
+        """
+        if not image_files:
+            return []
+
+        segments = []
+        current_segment = [image_files[0]]
+        prev_path = image_files[0]
+        prev_timestamp = self._extract_timestamp_from_filename(image_files[0])
+
+        for img_path in image_files[1:]:
+            curr_timestamp = self._extract_timestamp_from_filename(img_path)
+
+            # If we can't parse timestamps, just keep adding to current segment
+            if prev_timestamp is None or curr_timestamp is None:
+                current_segment.append(img_path)
+                prev_path = img_path
+                continue
+
+            # Check time gap
+            time_gap = abs(curr_timestamp - prev_timestamp)
+
+            if time_gap > max_gap_seconds:
+                # Start a new segment
+                print(f"[Split] Time gap detected: {time_gap:.1f}s between screenshots (threshold: {max_gap_seconds}s)")
+                print(f"  Previous: {prev_path.name}")
+                print(f"  Current:  {img_path.name}")
+                segments.append(current_segment)
+                current_segment = [img_path]
+            else:
+                current_segment.append(img_path)
+
+            prev_path = img_path
+            prev_timestamp = curr_timestamp
+
+        # Add the last segment
+        if current_segment:
+            segments.append(current_segment)
+
+        return segments
+
+    def _extract_timestamp_from_filename(self, path: Path) -> Optional[float]:
+        """
+        Extract timestamp from filename. Supports multiple formats:
+        1. Float timestamp: 1760702571.228687_reason_move_start.jpg
+        2. DateTime format: w5_6713_sstetler1@msn.com20200810004157314.jpg (YYYYMMDDHHMMSSmmm)
+
+        Returns timestamp as float (seconds since epoch) or None if unable to parse
+        """
+        filename = path.name
+
+        # Try format 1: float timestamp at the beginning
+        m = re.search(r'^(\d+\.\d+)', filename)
+        if m:
+            try:
+                return float(m.group(1))
+            except Exception:
+                pass
+
+        # Try format 2: YYYYMMDDHHMMSSmmm datetime format
+        # Look for 17 consecutive digits (YYYYMMDDHHMMSSMMM)
+        m = re.search(r'(\d{17})', filename)
+        if m:
+            try:
+                timestamp_str = m.group(1)
+                # Parse: YYYYMMDDHHMMSSMMM
+                year = int(timestamp_str[0:4])
+                month = int(timestamp_str[4:6])
+                day = int(timestamp_str[6:8])
+                hour = int(timestamp_str[8:10])
+                minute = int(timestamp_str[10:12])
+                second = int(timestamp_str[12:14])
+                millisecond = int(timestamp_str[14:17])
+
+                dt = datetime(year, month, day, hour, minute, second, millisecond * 1000)
+                return dt.timestamp()
+            except Exception:
+                pass
+
+        # Fallback: try file modification time
+        try:
+            return path.stat().st_mtime
+        except Exception:
+            return None
+
     def _process_tasks(self, tasks: List[ChunkTask]) -> List[Tuple[ChunkTask, any]]:
         """Process tasks with configurable concurrency using num_workers."""
         results = []
@@ -144,10 +292,14 @@ def _process_tasks(self, tasks: List[ChunkTask]) -> List[Tuple[ChunkTask, any]]:
 
         return results
 
     def _process_single_task(self, task: ChunkTask) -> any:
-        """Process single task with schema."""
+        """Process single task with schema and log token usage."""
         file_desc = self.client.upload_file(str(task.video_path.resolve()))
         response = self.client.generate(task.prompt, file_desc, schema=CAPTION_SCHEMA)
 
+        # Log token usage for this chunk
+        print(f"[Tokens] {task.session_id}/chunk_{task.chunk_index:03d}: "
+              f"input={response.input_tokens:,}, output={response.output_tokens:,}")
+
         result = response.json if not callable(response.json) else response.json()
         return result
@@ -159,13 +311,17 @@ def _save_results(
 
         session_captions = {}
 
         for task, result in results:
             if task.session_id not in session_captions:
                 session_captions[task.session_id] = []
 
             config = next(c for c in configs if c.session_id == task.session_id)
 
-            if not self.video_only and task.aggregations:
+            if not self.screenshots_only and task.aggregations:
                 agg_file = config.aggregations_dir / f"{task.chunk_index:03d}.json"
                 with open(agg_file, 'w') as f:
                     json.dump([a.to_dict() for a in task.aggregations], f, indent=2)
@@ -186,9 +342,31 @@ def _save_results(
             captions.sort(key=lambda c: c.start_seconds)
             config.save_captions(captions)
 
-            if not self.video_only:
+            if not self.screenshots_only:
                 self._create_matched_captions(config, captions, fps)
 
+        # Print token summary; the client only tracks running totals, so the
+        # per-session breakdown reports caption counts rather than tokens
+        print("\n" + "=" * 60)
+        print("TOKEN USAGE SUMMARY")
+        print("=" * 60)
+
+        for session_id in session_captions.keys():
+            print(f"\nSession: {session_id}")
+            print(f"  Captions generated: {len(session_captions[session_id])}")
+
+        # Print overall stats
+        stats = self.client.get_token_stats()
+        print(f"\n{'OVERALL TOTALS':^60}")
+        print(f"  Total input tokens:  {stats['input_tokens']:,}")
+        print(f"  Total output tokens: {stats['output_tokens']:,}")
+        print(f"  Total tokens:        {stats['total_tokens']:,}")
+        print("=" * 60 + "\n")
+
         return {sid: len(caps) for sid, caps in session_captions.items()}
 
     def _extract_captions(self, result: any, task: ChunkTask) -> List[Caption]:
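To make the segmentation rule concrete, here is a small worked example of the gap logic implemented in `_split_images_by_time_gap` above, with illustrative epoch timestamps and the default 300 s threshold:

```python
# Gaps between consecutive screenshots: 10s, 400s, 5s. Only the 400s gap
# exceeds max_time_gap=300, so four screenshots become two segments.
timestamps = [1760702000.0, 1760702010.0, 1760702410.0, 1760702415.0]
segments, current = [], [timestamps[0]]
for prev, curr in zip(timestamps, timestamps[1:]):
    if curr - prev > 300.0:
        segments.append(current)   # close the segment at the gap
        current = [curr]
    else:
        current.append(curr)
segments.append(current)
print([len(s) for s in segments])  # [2, 2]
```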
diff --git a/src/label/prompts/video_only.txt b/src/label/prompts/screenshots_only.txt
similarity index 94%
rename from src/label/prompts/video_only.txt
rename to src/label/prompts/screenshots_only.txt
index 7487249..92c48d8 100644
--- a/src/label/prompts/video_only.txt
+++ b/src/label/prompts/screenshots_only.txt
@@ -1,6 +1,6 @@
 You are an expert video analyst reconstructing **precise low-level user actions** from annotated screen recordings.
 
-Your job is to generate **fully detailed captions** describing **exactly what the user did** in each action segment. Use visual information (screenshots with click to reconstruct complete actions.
+Your job is to generate **fully detailed captions** describing **exactly what the user did** in each action segment. Use visual information (screenshots) to reconstruct complete actions.
 
 ## Input
@@ -51,6 +51,7 @@ Generated captions must be in past tense, and at the level of detail as the exam
 - Opened the System Settings application.
 - Clicked the "Network" tab in the Settings sidebar.
 - Typed "openai office munich" into the Google search bar and pressed Enter.
+- Replied with "yes, i'll totally make it" to Alex in the Facebook Messenger app.
 - Clicked on the Google search result titled "Vegan chocolate cake recipies"
 - Ran "cd /home/user/projects/gs-utils" in the terminal.
 - Deleted the text "hyundai i30" from cell I2.
@@ -70,4 +71,4 @@ A JSON array of objects:
     "caption": "..."
   }
 ]
-```
+```
\ No newline at end of file
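The `split_video` change below adds a `start_index` so chunk filenames stay globally sequential when several per-segment videos are split back to back. A sketch of the calling pattern; `chunks_dir` and the segment naming are illustrative:

```python
from pathlib import Path
from label.video import split_video  # start_index parameter added below

chunks_dir = Path("session/chunks")
segment_videos = sorted(chunks_dir.glob("segment_*.mp4"))

chunk_index = 0
for segment_video in segment_videos:
    # Segment A yields 000.mp4, 001.mp4, ...; segment B continues at the next index
    chunks = split_video(segment_video, 60, chunks_dir, start_index=chunk_index)
    chunk_index += len(chunks)
```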
diff --git a/src/label/video.py b/src/label/video.py
index f058eb6..1e79d11 100644
--- a/src/label/video.py
+++ b/src/label/video.py
@@ -28,7 +28,7 @@ def get_video_duration(video_path: Path) -> Optional[float]:
     return None
 
-def split_video(video_path: Path, chunk_duration: int, out_dir: Path) -> List[Path]:
+def split_video(video_path: Path, chunk_duration: int, out_dir: Path, start_index: int = 0) -> List[Path]:
     out_dir.mkdir(parents=True, exist_ok=True)
     duration = get_video_duration(video_path)
     if duration is None:
@@ -36,10 +36,12 @@ def split_video(video_path: Path, chunk_duration: int, out_dir: Path) -> List[Pa
 
     num_chunks = math.ceil(duration / float(chunk_duration))
     chunk_paths = []
+
+    print(f"[Split] Splitting {video_path.name} into {num_chunks} chunks of {chunk_duration}s each (total duration: {duration:.1f}s)")
 
     for i in range(num_chunks):
         start = i * chunk_duration
-        out_path = out_dir / f"{i:03d}.mp4"
+        out_path = out_dir / f"{start_index + i:03d}.mp4"
 
         cmd = [
             'ffmpeg', '-y',
             '-ss', str(start),
             '-i', str(video_path),
@@ -263,29 +265,38 @@ def create_video(
 
     pending_movement = []
 
-    for idx, agg in enumerate(aggregations):
-        src = Path(agg.screenshot_path)
-        dst = tmpdir_path / f"{idx:06d}.jpg"
+    # Handle both aggregations mode and direct image paths mode
+    if aggregations is not None:
+        # Use aggregations to get image paths
+        for idx, agg in enumerate(aggregations):
+            src = Path(agg.screenshot_path)
+            dst = tmpdir_path / f"{idx:06d}.jpg"
 
-        if annotate:
-            agg = apply_pending_movement(agg, pending_movement)
+            if annotate:
+                agg = apply_pending_movement(agg, pending_movement)
 
-            img_path = ImagePath(src, session_dir)
-            img = img_path.load()
+                img_path = ImagePath(src, session_dir)
+                img = img_path.load()
 
-            if pad_to:
-                img, scale, x_off, y_off = scale_and_pad(img, pad_to[0], pad_to[1])
-            else:
-                scale, x_off, y_off = 1.0, 0, 0
+                if pad_to:
+                    img, scale, x_off, y_off = scale_and_pad(img, pad_to[0], pad_to[1])
+                else:
+                    scale, x_off, y_off = 1.0, 0, 0
 
-            img = annotate_image(img, agg, scale, x_off, y_off)
+                img = annotate_image(img, agg, scale, x_off, y_off)
 
-            pending_movement = extract_pending_movement(agg)
+                pending_movement = extract_pending_movement(agg)
 
-            img.save(dst)
-        else:
+                img.save(dst)
+            else:
+                shutil.copy2(src, dst)
+                pending_movement = []
+    else:
+        # Use image_paths directly (screenshots-only mode)
+        for idx, img_path in enumerate(image_paths):
+            src = Path(img_path)
+            dst = tmpdir_path / f"{idx:06d}.jpg"
             shutil.copy2(src, dst)
-            pending_movement = []
 
     vf_parts = []
     if pad_to:
+ """ + seconds = mmss_to_seconds(mmss) + + if 0 <= seconds < len(screenshots): + return screenshots[seconds] + + return None + + +def process_format1(jsonl_path: Path) -> List[Dict]: + """Process format 1: data.jsonl with direct img paths.""" + records = [] + + with open(jsonl_path, 'r') as f: + for line in f: + data = json.loads(line.strip()) + + record = { + 'text': data['caption'], + 'start_time': unix_to_formatted_timestamp(data['start_time']), + 'end_time': unix_to_formatted_timestamp(data['end_time']), + 'img': data['img'] + } + records.append(record) + + return records + + +def process_format2(jsonl_path: Path, img_dir: Path) -> List[Dict]: + """Process format 2: captions.jsonl with separate screenshot directory.""" + records = [] + + # Load and sort screenshots + screenshots = load_and_sort_screenshots(img_dir) + + if not screenshots: + raise ValueError(f"No valid screenshots found in {img_dir}") + + print(f"Loaded {len(screenshots)} screenshots from {img_dir}") + + with open(jsonl_path, 'r') as f: + for line in f: + data = json.loads(line.strip()) + + # Get start screenshot + start_screenshot = get_screenshot_by_mmss_index(screenshots, data['start']) + + # Get end screenshot (next second) + end_mmss_seconds = mmss_to_seconds(data['end']) + 1 + end_mmss = f"{end_mmss_seconds // 60:02d}:{end_mmss_seconds % 60:02d}" + end_screenshot = get_screenshot_by_mmss_index(screenshots, end_mmss) + + # If end screenshot doesn't exist, use the last one + if end_screenshot is None: + end_screenshot = screenshots[-1] + + if start_screenshot is None: + print(f"Warning: Could not find screenshot for {data['start']}, skipping...") + continue + + record = { + 'text': data['caption'], + 'start_time': unix_to_formatted_timestamp(start_screenshot[1]), + 'end_time': unix_to_formatted_timestamp(end_screenshot[1]), + 'img': str(start_screenshot[0]) + } + records.append(record) + + return records + + +def create_hf_dataset(records: List[Dict]) -> Dataset: + """Create HuggingFace Dataset from records.""" + # Define features + features = Features({ + 'text': Value('string'), + 'start_time': Value('string'), + 'end_time': Value('string'), + 'img': HFImage() + }) + + # Create dataset + dataset = Dataset.from_dict( + { + 'text': [r['text'] for r in records], + 'start_time': [r['start_time'] for r in records], + 'end_time': [r['end_time'] for r in records], + 'img': [r['img'] for r in records] + }, + features=features + ) + + return dataset + + +def main(): + parser = argparse.ArgumentParser( + description='Convert JSONL data to HuggingFace Dataset' + ) + parser.add_argument( + 'jsonl_path', + type=str, + help='Path to JSONL file' + ) + parser.add_argument( + '--img-dir', + type=str, + help='Directory containing screenshots (required for format 2)', + default=None + ) + parser.add_argument( + '--output-dir', + type=str, + required=True, + help='Output directory to save the dataset' + ) + parser.add_argument( + '--format', + type=int, + choices=[1, 2], + help='Input format (1 or 2). If not specified, auto-detect from first line.' 
+    )
+
+    args = parser.parse_args()
+
+    jsonl_path = Path(args.jsonl_path)
+    output_dir = Path(args.output_dir)
+
+    if not jsonl_path.exists():
+        raise FileNotFoundError(f"JSONL file not found: {jsonl_path}")
+
+    # Auto-detect format if not specified
+    input_format = args.format
+    if input_format is None:
+        with open(jsonl_path, 'r') as f:
+            first_line = json.loads(f.readline().strip())
+            # Format 1 records carry 'img' and 'raw_events'; format 2 records carry 'start' and 'chunk_index'
+            if 'img' in first_line and 'raw_events' in first_line:
+                input_format = 1
+                print("Auto-detected format 1")
+            elif 'start' in first_line and 'chunk_index' in first_line:
+                input_format = 2
+                print("Auto-detected format 2")
+            else:
+                raise ValueError("Could not auto-detect format. Please specify --format")
+
+    # Process based on format
+    if input_format == 1:
+        print("Processing format 1...")
+        records = process_format1(jsonl_path)
+    else:  # format 2
+        if args.img_dir is None:
+            raise ValueError("--img-dir is required for format 2")
+        img_dir = Path(args.img_dir)
+        if not img_dir.exists():
+            raise FileNotFoundError(f"Image directory not found: {img_dir}")
+
+        print("Processing format 2...")
+        records = process_format2(jsonl_path, img_dir)
+
+    print(f"Processed {len(records)} records")
+
+    # Create HuggingFace dataset
+    print("Creating HuggingFace dataset...")
+    dataset = create_hf_dataset(records)
+
+    # Save dataset
+    output_dir.mkdir(parents=True, exist_ok=True)
+    print(f"Saving dataset to {output_dir}...")
+    dataset.save_to_disk(str(output_dir))
+
+    print(f"Dataset saved successfully with {len(dataset)} examples!")
+    print("\nDataset info:")
+    print(dataset)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/uv.lock b/uv.lock
index 6673aec..f50a2ff 100644
--- a/uv.lock
+++ b/uv.lock
@@ -606,7 +606,7 @@ wheels = [
 
 [[package]]
 name = "google-genai"
-version = "1.47.0"
+version = "1.52.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "anyio" },
@@ -618,9 +618,9 @@ dependencies = [
     { name = "typing-extensions" },
     { name = "websockets" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/9f/97/784fba9bc6c41263ff90cb9063eadfdd755dde79cfa5a8d0e397b067dcf9/google_genai-1.47.0.tar.gz", hash = "sha256:ecece00d0a04e6739ea76cc8dad82ec9593d9380aaabef078990e60574e5bf59", size = 241471, upload-time = "2025-10-29T22:01:02.88Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/09/4e/0ad8585d05312074bb69711b2d81cfed69ce0ae441913d57bf169bed20a7/google_genai-1.52.0.tar.gz", hash = "sha256:a74e8a4b3025f23aa98d6a0f84783119012ca6c336fd68f73c5d2b11465d7fc5", size = 258743, upload-time = "2025-11-21T02:18:55.742Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/89/ef/e080e8d67c270ea320956bb911a9359664fc46d3b87d1f029decd33e5c4c/google_genai-1.47.0-py3-none-any.whl", hash = "sha256:e3851237556cbdec96007d8028b4b1f2425cdc5c099a8dc36b72a57e42821b60", size = 241506, upload-time = "2025-10-29T22:01:00.982Z" },
+    { url = "https://files.pythonhosted.org/packages/ec/66/03f663e7bca7abe9ccfebe6cb3fe7da9a118fd723a5abb278d6117e7990e/google_genai-1.52.0-py3-none-any.whl", hash = "sha256:c8352b9f065ae14b9322b949c7debab8562982f03bf71d44130cd2b798c20743", size = 261219, upload-time = "2025-11-21T02:18:54.515Z" },
 ]
 
 [[package]]
@@ -1302,7 +1302,7 @@ dependencies = [
 requires-dist = [
     { name = "datasets", specifier = ">=3.0.0,<4.0.0" },
     { name = "dotenv", specifier = ">=0.9.9" },
-    { name = "google-genai", specifier = ">=1.45.0" },
+    { name = "google-genai", specifier = ">=1.52.0" },
     { name = "google-generativeai", specifier = ">=0.8.5" },
     { name = "imageio", specifier = ">=2.37.0" },
     { name = "ipdb", specifier = ">=0.13.13,<0.14.0" },
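Taken together, a plausible end-to-end run of the new screenshots-only path looks like the following; `python -m label` is inferred from `src/label/__main__.py`, and `--sessions-root` is assumed to match the `args.sessions_root` attribute used above (that flag is not defined in this diff):

```
python -m label --screenshots-only --sessions-root sessions/ \
    --chunk-duration 60 --fps 1 --max-time-gap 300 --skip-existing

python src/misc/to_dataset.py captions.jsonl \
    --img-dir sessions/a/screenshots --output-dir datasets/session_a --format 2
```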