from __future__ import annotations import argparse from pathlib import Path import cv2 DEFAULT_VIDEO_DIR = Path("d405_recordings") DEFAULT_OUTPUT_DIR = Path("d405_rgb_frames") DEFAULT_TARGET_FPS = 8.0 def find_latest_video(video_dir: Path) -> Path: candidates = sorted(video_dir.glob("*.mp4"), key=lambda path: path.stat().st_mtime, reverse=True) if not candidates: raise FileNotFoundError(f"No .mp4 videos found in {video_dir.resolve()}") return candidates[0] def extract_frames(video_path: Path, output_dir: Path, target_fps: float) -> None: cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {video_path}") source_fps = cap.get(cv2.CAP_PROP_FPS) if source_fps <= 0: source_fps = 30.0 frame_interval = max(source_fps / target_fps, 1.0) next_frame_to_save = 0.0 frame_index = 0 saved_count = 0 output_dir.mkdir(parents=True, exist_ok=True) print("-" * 40) print(f"Input video : {video_path.resolve()}") print(f"Output dir : {output_dir.resolve()}") print(f"Source FPS : {source_fps:.2f}") print(f"Target FPS : {target_fps:.2f}") print("-" * 40) try: while True: success, frame = cap.read() if not success: break if frame_index + 1e-6 >= next_frame_to_save: saved_count += 1 image_path = output_dir / f"{saved_count:06d}.jpg" cv2.imwrite(str(image_path), frame) next_frame_to_save += frame_interval frame_index += 1 finally: cap.release() print(f"Saved {saved_count} RGB frames.") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Extract RGB frames from a recorded D405 video at 8 FPS.", ) parser.add_argument( "--video", type=Path, default=None, help="Path to the input video. If omitted, the latest video in d405_recordings is used.", ) parser.add_argument( "--output-dir", type=Path, default=None, help="Directory to save extracted RGB images.", ) parser.add_argument( "--target-fps", type=float, default=DEFAULT_TARGET_FPS, help="Frame extraction rate. Default is 8 FPS.", ) return parser.parse_args() def main() -> None: args = parse_args() video_path = args.video if args.video is not None else find_latest_video(DEFAULT_VIDEO_DIR) if not video_path.exists(): raise FileNotFoundError(f"Video file does not exist: {video_path.resolve()}") output_dir = args.output_dir if output_dir is None: output_dir = DEFAULT_OUTPUT_DIR / video_path.stem if args.target_fps <= 0: raise ValueError("--target-fps must be greater than 0.") extract_frames(video_path, output_dir, args.target_fps) if __name__ == "__main__": main()