This commit is contained in:
2026-05-06 10:47:14 +08:00
commit e3be8861b8
9 changed files with 4380 additions and 0 deletions

View File

@ -0,0 +1,258 @@
from __future__ import annotations
import argparse
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import cv2
import numpy as np
import pyrealsense2 as rs
SAVE_DIR = Path("d405_recordings")
WINDOW_NAME = "RealSense D405 Video Recorder"
PREFERRED_COLOR_FORMATS = (rs.format.bgr8, rs.format.rgb8)
@dataclass(frozen=True)
class ColorStreamConfig:
width: int
height: int
fps: int
stream_format: rs.format
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Record color video from an Intel RealSense D405 camera.")
parser.add_argument("--width", type=int, default=1280, help="Color stream width. Default: 1280")
parser.add_argument("--height", type=int, default=720, help="Color stream height. Default: 720")
parser.add_argument("--fps", type=int, default=30, help="Color stream FPS. Default: 30")
parser.add_argument(
"--list-profiles",
action="store_true",
help="List supported color video profiles for the connected camera and exit.",
)
return parser.parse_args()
def get_color_stream_candidates() -> list[ColorStreamConfig]:
context = rs.context()
devices = context.query_devices()
if len(devices) == 0:
raise RuntimeError("No RealSense device detected.")
device = devices[0]
stream_candidates: list[ColorStreamConfig] = []
for sensor in device.sensors:
for profile in sensor.get_stream_profiles():
if profile.stream_type() != rs.stream.color:
continue
if profile.format() not in PREFERRED_COLOR_FORMATS:
continue
try:
video_profile = profile.as_video_stream_profile()
except RuntimeError:
continue
stream_candidates.append(
ColorStreamConfig(
width=video_profile.width(),
height=video_profile.height(),
fps=profile.fps(),
stream_format=profile.format(),
)
)
if not stream_candidates:
raise RuntimeError("No usable color stream profile found for the RealSense device.")
return stream_candidates
def list_profiles() -> None:
try:
stream_candidates = get_color_stream_candidates()
except RuntimeError as exc:
print(exc)
return
print("Supported color stream profiles:")
for candidate in sorted(
set(stream_candidates),
key=lambda item: (item.width, item.height, item.fps, str(item.stream_format)),
):
print(f" {candidate.width}x{candidate.height}@{candidate.fps} {candidate.stream_format}")
def select_color_stream(width: int, height: int, fps: int) -> ColorStreamConfig:
stream_candidates = get_color_stream_candidates()
matching_candidates = [
candidate
for candidate in stream_candidates
if candidate.width == width and candidate.height == height and candidate.fps == fps
]
if not matching_candidates:
raise RuntimeError(
f"No supported color stream profile matches {width}x{height}@{fps}. "
"Run with --list-profiles to see available profiles."
)
def sort_key(candidate: ColorStreamConfig) -> int:
return 1 if candidate.stream_format == rs.format.bgr8 else 0
return max(matching_candidates, key=sort_key)
def init_camera(stream_config: ColorStreamConfig) -> rs.pipeline:
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(
rs.stream.color,
stream_config.width,
stream_config.height,
stream_config.stream_format,
stream_config.fps,
)
pipeline.start(config)
return pipeline
def build_video_writer(output_path: Path, stream_config: ColorStreamConfig) -> cv2.VideoWriter:
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(
str(output_path),
fourcc,
stream_config.fps,
(stream_config.width, stream_config.height),
)
if not writer.isOpened():
raise RuntimeError(f"Failed to create video writer: {output_path}")
return writer
def draw_status(frame: np.ndarray, is_recording: bool, output_path: Path | None) -> np.ndarray:
preview = frame.copy()
frame_height = preview.shape[0]
if is_recording:
cv2.circle(preview, (25, 30), 8, (0, 0, 255), -1)
cv2.putText(
preview,
"REC",
(40, 36),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(0, 0, 255),
2,
)
else:
cv2.putText(
preview,
"Press 's' to start recording",
(20, 36),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 255, 255),
2,
)
cv2.putText(
preview,
"Press 'q' to stop and quit",
(20, 70),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(255, 255, 255),
2,
)
if output_path is not None:
cv2.putText(
preview,
f"Saving: {output_path.name}",
(20, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
2,
)
return preview
def frame_to_bgr(color_frame: rs.video_frame, stream_config: ColorStreamConfig) -> np.ndarray:
color_image = np.asanyarray(color_frame.get_data())
if stream_config.stream_format == rs.format.rgb8:
return cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR)
return color_image
def main() -> None:
args = parse_args()
if args.list_profiles:
list_profiles()
return
SAVE_DIR.mkdir(parents=True, exist_ok=True)
stream_config = select_color_stream(args.width, args.height, args.fps)
pipeline = init_camera(stream_config)
writer: cv2.VideoWriter | None = None
output_path: Path | None = None
is_recording = False
print("-" * 40)
print("RealSense D405 video recorder")
print(
"Selected color stream: "
f"{stream_config.width}x{stream_config.height} @ {stream_config.fps} FPS "
f"({stream_config.stream_format})"
)
print("Press 's' to start recording")
print("Press 'q' to stop recording and quit")
print(f"Video files will be saved to: {SAVE_DIR.resolve()}")
print("-" * 40)
try:
while True:
frames = pipeline.wait_for_frames(timeout_ms=2000)
color_frame = frames.get_color_frame()
if not color_frame:
continue
color_image = frame_to_bgr(color_frame, stream_config)
if is_recording and writer is not None:
writer.write(color_image)
preview = draw_status(color_image, is_recording, output_path)
cv2.imshow(WINDOW_NAME, preview)
key = cv2.waitKey(1) & 0xFF
if key == ord("s") and not is_recording:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = SAVE_DIR / f"d405_record_{timestamp}.mp4"
writer = build_video_writer(output_path, stream_config)
is_recording = True
print(f"Recording started: {output_path}")
if key == ord("q"):
if is_recording:
print("Recording stopped.")
else:
print("Exit without recording.")
break
finally:
if writer is not None:
writer.release()
pipeline.stop()
cv2.destroyAllWindows()
if output_path is not None and output_path.exists():
print(f"Saved video: {output_path.resolve()}")
if __name__ == "__main__":
main()