"""Record color video from an Intel RealSense D405 camera.

A live preview window is shown while the color stream is running. Press
's' to start recording to a timestamped MP4 file under ``SAVE_DIR`` and
'q' to stop recording (if any) and quit. Run with ``--list-profiles`` to
print the color profiles supported by the connected camera.
"""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import cv2
import numpy as np
import pyrealsense2 as rs

SAVE_DIR = Path("d405_recordings")
WINDOW_NAME = "RealSense D405 Video Recorder"
# Only these formats are handled by ``frame_to_bgr``; everything else is ignored.
PREFERRED_COLOR_FORMATS = (rs.format.bgr8, rs.format.rgb8)


@dataclass(frozen=True)
class ColorStreamConfig:
    """Immutable description of one color stream profile.

    ``device_serial`` identifies the device the profile was read from so the
    pipeline can be bound to that same device; ``None`` means "any device".
    """

    width: int
    height: int
    fps: int
    stream_format: rs.format
    device_serial: str | None = None


def parse_args() -> argparse.Namespace:
    """Parse the command-line options (``--width``, ``--height``, ``--fps``, ``--list-profiles``)."""
    parser = argparse.ArgumentParser(
        description="Record color video from an Intel RealSense D405 camera."
    )
    parser.add_argument("--width", type=int, default=1280, help="Color stream width. Default: 1280")
    parser.add_argument("--height", type=int, default=720, help="Color stream height. Default: 720")
    parser.add_argument("--fps", type=int, default=30, help="Color stream FPS. Default: 30")
    parser.add_argument(
        "--list-profiles",
        action="store_true",
        help="List supported color video profiles for the connected camera and exit.",
    )
    return parser.parse_args()


def get_color_stream_candidates() -> list[ColorStreamConfig]:
    """Return every BGR8/RGB8 color profile offered by the first detected device.

    Raises:
        RuntimeError: If no device is connected, or the device exposes no
            color profile in one of ``PREFERRED_COLOR_FORMATS``.
    """
    context = rs.context()
    devices = context.query_devices()
    if len(devices) == 0:
        raise RuntimeError("No RealSense device detected.")

    device = devices[0]
    # Remember which device these profiles belong to so that init_camera()
    # opens the same one when several cameras are attached.
    device_serial = (
        device.get_info(rs.camera_info.serial_number)
        if device.supports(rs.camera_info.serial_number)
        else None
    )

    stream_candidates: list[ColorStreamConfig] = []
    for sensor in device.sensors:
        for profile in sensor.get_stream_profiles():
            if profile.stream_type() != rs.stream.color:
                continue
            if profile.format() not in PREFERRED_COLOR_FORMATS:
                continue

            try:
                video_profile = profile.as_video_stream_profile()
            except RuntimeError:
                # Profile cannot be viewed as a video profile; skip it.
                continue

            stream_candidates.append(
                ColorStreamConfig(
                    width=video_profile.width(),
                    height=video_profile.height(),
                    fps=profile.fps(),
                    stream_format=profile.format(),
                    device_serial=device_serial,
                )
            )

    if not stream_candidates:
        raise RuntimeError("No usable color stream profile found for the RealSense device.")

    return stream_candidates


def list_profiles() -> None:
    """Print the distinct supported color profiles, or the error if none can be queried."""
    try:
        stream_candidates = get_color_stream_candidates()
    except RuntimeError as exc:
        print(exc)
        return

    print("Supported color stream profiles:")
    # set() removes duplicate profiles before sorting for display.
    for candidate in sorted(
        set(stream_candidates),
        key=lambda item: (item.width, item.height, item.fps, str(item.stream_format)),
    ):
        print(f" {candidate.width}x{candidate.height}@{candidate.fps} {candidate.stream_format}")


def select_color_stream(width: int, height: int, fps: int) -> ColorStreamConfig:
    """Pick the profile matching the requested geometry and frame rate.

    When both BGR8 and RGB8 variants match, BGR8 is preferred.

    Raises:
        RuntimeError: If no supported profile matches, or no device/profile
            is available at all.
    """
    stream_candidates = get_color_stream_candidates()
    matching_candidates = [
        candidate
        for candidate in stream_candidates
        if candidate.width == width and candidate.height == height and candidate.fps == fps
    ]

    if not matching_candidates:
        raise RuntimeError(
            f"No supported color stream profile matches {width}x{height}@{fps}. "
            "Run with --list-profiles to see available profiles."
        )

    def sort_key(candidate: ColorStreamConfig) -> int:
        return 1 if candidate.stream_format == rs.format.bgr8 else 0

    return max(matching_candidates, key=sort_key)


def init_camera(stream_config: ColorStreamConfig) -> rs.pipeline:
    """Start and return a pipeline streaming color frames per *stream_config*."""
    pipeline = rs.pipeline()
    config = rs.config()
    if stream_config.device_serial:
        # Bind to the device the profile was validated against.
        config.enable_device(stream_config.device_serial)
    config.enable_stream(
        rs.stream.color,
        stream_config.width,
        stream_config.height,
        stream_config.stream_format,
        stream_config.fps,
    )
    pipeline.start(config)
    return pipeline


def build_video_writer(output_path: Path, stream_config: ColorStreamConfig) -> cv2.VideoWriter:
    """Open an ``mp4v`` video writer sized and timed to *stream_config*.

    Raises:
        RuntimeError: If the writer could not be opened for *output_path*.
    """
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(
        str(output_path),
        fourcc,
        stream_config.fps,
        (stream_config.width, stream_config.height),
    )
    if not writer.isOpened():
        raise RuntimeError(f"Failed to create video writer: {output_path}")
    return writer


def draw_status(frame: np.ndarray, is_recording: bool, output_path: Path | None) -> np.ndarray:
    """Return a copy of *frame* with the recording status overlay drawn on it.

    The input frame is left untouched so the recorded video carries no overlay.
    """
    preview = frame.copy()
    frame_height = preview.shape[0]

    if is_recording:
        cv2.circle(preview, (25, 30), 8, (0, 0, 255), -1)
        cv2.putText(
            preview,
            "REC",
            (40, 36),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            (0, 0, 255),
            2,
        )
    else:
        cv2.putText(
            preview,
            "Press 's' to start recording",
            (20, 36),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 255, 255),
            2,
        )

    cv2.putText(
        preview,
        "Press 'q' to stop and quit",
        (20, 70),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.7,
        (255, 255, 255),
        2,
    )

    if output_path is not None:
        cv2.putText(
            preview,
            f"Saving: {output_path.name}",
            (20, frame_height - 20),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (255, 255, 255),
            2,
        )

    return preview


def frame_to_bgr(color_frame: rs.video_frame, stream_config: ColorStreamConfig) -> np.ndarray:
    """Return the frame's pixels as an array in BGR channel order.

    RGB8 streams are converted; any other format is returned as received.
    """
    color_image = np.asanyarray(color_frame.get_data())
    if stream_config.stream_format == rs.format.rgb8:
        return cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR)
    return color_image


def main() -> None:
    """Run the preview/record loop until the user presses 'q'."""
    args = parse_args()
    if args.list_profiles:
        list_profiles()
        return

    SAVE_DIR.mkdir(parents=True, exist_ok=True)
    stream_config = select_color_stream(args.width, args.height, args.fps)
    pipeline = init_camera(stream_config)

    writer: cv2.VideoWriter | None = None
    output_path: Path | None = None
    is_recording = False

    print("-" * 40)
    print("RealSense D405 video recorder")
    print(
        "Selected color stream: "
        f"{stream_config.width}x{stream_config.height} @ {stream_config.fps} FPS "
        f"({stream_config.stream_format})"
    )
    print("Press 's' to start recording")
    print("Press 'q' to stop recording and quit")
    print(f"Video files will be saved to: {SAVE_DIR.resolve()}")
    print("-" * 40)

    try:
        while True:
            frames = pipeline.wait_for_frames(timeout_ms=2000)
            color_frame = frames.get_color_frame()
            if not color_frame:
                continue

            color_image = frame_to_bgr(color_frame, stream_config)

            # Write the clean frame; the overlay is drawn on a copy for preview only.
            if is_recording and writer is not None:
                writer.write(color_image)

            preview = draw_status(color_image, is_recording, output_path)
            cv2.imshow(WINDOW_NAME, preview)

            key = cv2.waitKey(1) & 0xFF
            if key == ord("s") and not is_recording:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_path = SAVE_DIR / f"d405_record_{timestamp}.mp4"
                writer = build_video_writer(output_path, stream_config)
                is_recording = True
                print(f"Recording started: {output_path}")

            if key == ord("q"):
                if is_recording:
                    print("Recording stopped.")
                else:
                    print("Exit without recording.")
                break
    finally:
        # Nested try/finally: the camera and window are released even if
        # finalizing the video file fails.
        try:
            if writer is not None:
                writer.release()
        finally:
            pipeline.stop()
            cv2.destroyAllWindows()

        if output_path is not None and output_path.exists():
            print(f"Saved video: {output_path.resolve()}")


if __name__ == "__main__":
    main()