from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple import math import os import sys import threading import time import cv2 import numpy as np import pyrealsense2 as rs import pyaubo_sdk from pyaubo_agvc_sdk import RpcClient from ultralytics import YOLO """ 番茄采摘控制核心实现。 这个模块只负责内部逻辑拆分,`control.py` 作为兼容层保留给 UI 调用。 """ M_PI = math.pi # 可修改的设备、运行、位姿、夹爪参数统一放在 control.py 顶部。 # control_core.py 只保留核心内部常量,避免出现第二套可调默认值。 # -------------------- 外部配置缓存 -------------------- _runtime_config: Dict[str, Any] = {} PICK_ZONE_LEFT_RATIO = 1.0 / 3.0 PICK_ZONE_RIGHT_RATIO = 2.0 / 3.0 PICK_KEYPOINT_CONF_THRESHOLD = 0.8 # 关键点置信度低于该值时不参与采摘。 PICK_CUTPOINT_KEYPOINT_INDEX = 0 # YOLO pose 第 0 个点为采摘点 cutpoint。 PICK_ENDPOINT_KEYPOINT_INDEX = 1 # YOLO pose 第 1 个点为方向端点 endpoint。 PICK_KEYPOINT_BOX_MARGIN_RATIO = 0.15 # 关键点允许略超出 bbox 的比例。 PICK_CUTPOINT_MAX_REL_Y = 0.70 # 采摘点在 bbox 内的最大相对高度,防止误取下方点。 PICK_KEYPOINT_MIN_DISTANCE_PX = 8.0 # cutpoint 与 endpoint 的最小像素距离。 PICK_ENDPOINT_Y_TOLERANCE_PX = 4 # endpoint 允许比 cutpoint 略高的像素容差。 PICK_DEPTH_WINDOW_RADIUS = 5 # 首次稳健取深度的窗口半径,5 表示 11x11。 PICK_DEPTH_FALLBACK_WINDOW_RADIUS = 10 # 首次深度不足时的扩大窗口半径,10 表示 21x21。 PICK_DEPTH_MIN_VALID_PIXELS = 5 # 深度窗口内至少需要的有效深度点数量。 PICK_DEPTH_MIN_M = 0.25 # 有效深度下限,单位 m。 PICK_DEPTH_MAX_M = 0.6 # 有效深度上限,单位 m。 PICK_DEPTH_MEDIAN_TOLERANCE_M = 0.035 # 深度中位数滤波容差,单位 m。 # R_tc = np.array( # [ # [-0.9996, -0.0270, -0.0103], # [0.0267, -0.7993, 0.0245], # [-0.0109, 0.0242, 0.7996], # ], # dtype=float, # ) # T_tc = np.array([0.0, 0.0, 0.0], dtype=float) R_tc = np.array( [ [-0.9998, -0.0104, 0.0152], [0.0099, -0.9995, -0.0316], [0.0155, -0.0315, 0.9994], ], dtype=float, ) T_tc = np.array([-0.0026, 0.0388, -0.0632], dtype=float) POSITION_TOLERANCE = 0.01 ORIENTATION_TOLERANCE = 0.05 JOINT_TOLERANCE = 0.02 # -------------------- 运行时常量 -------------------- FRAME_SHAPE = (480, 640, 3) AGV_SUCCESS_CODE = 10100000 EXEC_ID_MAX_RETRIES = 20 MOTION_COMMAND_RETRIES = 2 MOTION_START_DELAY = 0.2 HOME_RETRY_COUNT = 1 HOME_RETRY_DELAY = 1.0 STARTUP_HOME_DELAY = 2.0 DEFAULT_PLACE_POSITION = [0.1, 0.7, 0.5, 1.5708, 1.5708, 0] # -------------------- 全局运行状态 -------------------- # `running`: 总运行开关,两个工作线程都以它作为退出条件。 # `has_tomato`: 视觉线程在 AGV 行进时发现候选成熟番茄后置位,通知 AGV 先停下。 # `picking_done`: 一次抓取放置完成后置位,视觉线程会在下一轮把本轮状态清掉,让 AGV 继续走。 running = threading.Event() has_tomato = threading.Event() picking_done = threading.Event() # 这三个事件变量共同组成跨线程状态机: # running 控制系统总开关; # has_tomato 表示视觉线程发现了候选番茄,需要 AGV 暂停; # picking_done 表示本轮抓取放置完成,系统可以回到继续巡检。 start_time = 0.0 agv_stop_time = 0.0 ui_callback: Callable[[np.ndarray], None] = lambda frame: None robot_rpc_client = pyaubo_sdk.RpcClient() def configure(config: Dict[str, Any]) -> None: """接收 control.py 同步过来的可调参数。""" _runtime_config.update(config) def _cfg(name: str) -> Any: control_module = sys.modules.get("control") if control_module is not None and hasattr(control_module, name): value = getattr(control_module, name) elif name in _runtime_config: value = _runtime_config[name] else: raise RuntimeError(f"缺少配置参数 {name},请先通过 control.py 启动或同步配置") if isinstance(value, list): return [list(item) if isinstance(item, list) else item for item in value] return value def __getattr__(name: str) -> Any: control_module = sys.modules.get("control") if name in _runtime_config or (control_module is not None and hasattr(control_module, name)): return _cfg(name) raise AttributeError(f"module {__name__!r} has no attribute {name!r}") @dataclass class PlacementManager: """按顺序循环取放置位,始终读取 control.py 中的 `place_positions`。""" lock: threading.Lock = field(default_factory=threading.Lock) next_index: int = 0 def get_next_position(self) -> Tuple[List[float], int]: with self.lock: positions = _cfg("place_positions") or [DEFAULT_PLACE_POSITION] used_index = self.next_index % len(positions) position = list(positions[used_index]) self.next_index = (used_index + 1) % len(positions) return position, used_index def get_current_index(self) -> int: with self.lock: return self.next_index def reset(self) -> None: with self.lock: self.next_index = 0 @dataclass class DetectedTomato: x1: int y1: int x2: int y2: int pixel_x: int pixel_y: int depth: float point_cam: List[float] point_base: List[float] robot_pose: List[float] angle_rad: float cutpoint: List[int] = field(default_factory=list) end_point: List[int] = field(default_factory=list) @dataclass class TomatoCandidate: box: Any box_index: int cutpoint: Optional[Tuple[int, int]] end_point: Optional[Tuple[int, int]] placement_manager = PlacementManager() _current_arm_controller: Optional["RobotArmController"] = None _current_agv_controller: Optional["AgvController"] = None def blank_frame() -> np.ndarray: return np.zeros(FRAME_SHAPE, dtype=np.uint8) def set_ui_callback(callback: Callable[[np.ndarray], None]) -> None: global ui_callback if callable(callback): ui_callback = callback print("UI 回调函数已设置") else: print("警告:传入的 UI 回调不可调用,继续使用默认空回调") def safe_ui_update(frame: Any, overlay: Optional[Dict[str, Any]] = None) -> None: try: payload = frame if overlay is not None: payload = {"frame": frame, **overlay} ui_callback(payload) except Exception as exc: print(f"UI 回调失败: {exc}") def reset_runtime_state() -> None: global start_time, agv_stop_time running.clear() has_tomato.clear() picking_done.clear() start_time = 0.0 agv_stop_time = 0.0 def start_runtime_state() -> None: global start_time, agv_stop_time has_tomato.clear() picking_done.clear() agv_stop_time = 0.0 start_time = time.time() running.set() def runtime_expired() -> bool: if start_time <= 0: return False return (time.time() - start_time) > _cfg("TOTAL_DURATION") def remaining_seconds() -> int: total_duration = _cfg("TOTAL_DURATION") if start_time <= 0: return total_duration return max(0, int(total_duration - (time.time() - start_time))) def start_agv_stop_window() -> bool: global agv_stop_time if agv_stop_time == 0: agv_stop_time = time.time() return True return False def clear_agv_stop_window() -> None: global agv_stop_time agv_stop_time = 0.0 def agv_stop_elapsed() -> float: if agv_stop_time == 0: return 0.0 return time.time() - agv_stop_time def clear_pick_cycle() -> None: # 清空这一轮“发现目标 -> 停车 -> 抓取”的中间状态。 has_tomato.clear() picking_done.clear() clear_agv_stop_window() def mark_pick_complete() -> None: # 抓取完成后先打完成标记,再由搜索阶段统一恢复为继续前进。 picking_done.set() has_tomato.clear() clear_agv_stop_window() def get_next_placement() -> Tuple[List[float], int]: return placement_manager.get_next_position() def get_placement_index() -> int: return placement_manager.get_current_index() def create_placement_manager(): """兼容旧接口,返回当前放置位访问器。""" return get_next_placement, get_placement_index def exampleState(robot_name: str) -> Optional[List[float]]: """读取并打印当前机械臂关节角。""" try: joints = robot_rpc_client.getRobotInterface(robot_name).getRobotState().getJointPositions() joints = list(joints) print("机械臂关节角:", joints) return joints except Exception as exc: print(f"读取关节角失败: {exc}") return None def get_robot_end_effector_pose(robot_name: str) -> Optional[List[float]]: """读取并打印当前 TCP 位姿。""" try: pose = robot_rpc_client.getRobotInterface(robot_name).getRobotState().getTcpPose() pose = list(pose) print("机械臂末端位姿:", pose) return pose except Exception as exc: print(f"读取末端位姿失败: {exc}") return None def exampleInverseK( robot_name: str, pose: Sequence[float], reference_q: Sequence[float] ) -> Tuple[Optional[List[float]], Optional[int]]: """执行逆解计算。""" if not isinstance(pose, (list, tuple)) or len(pose) != 6: print(f"逆解目标位姿格式错误,需要 6 维列表: {pose}") return None, None try: result_joints, status = robot_rpc_client.getRobotInterface(robot_name).getRobotAlgorithm().inverseKinematics( list(reference_q), list(pose) ) result_joints = list(result_joints) print("逆解输入位姿:", list(pose)) print("逆解输出关节角:", result_joints) if status != 0: print(f"逆解失败,错误码: {status}") return None, status return result_joints, status except Exception as exc: print(f"逆解计算失败: {exc}") return None, None def exampleStartup() -> Optional[str]: """机械臂上电、启动,并等待进入 Running。""" try: robot_names = robot_rpc_client.getRobotNames() if not robot_names: print("机械臂启动失败:未找到机器人名称") return None robot_name = robot_names[0] robot_manage = robot_rpc_client.getRobotInterface(robot_name).getRobotManage() if robot_manage.poweron() != 0: print("机械臂上电失败") return None print("机械臂上电请求已发送") if robot_manage.startup() != 0: print("机械臂启动失败") return None print("机械臂启动请求已发送") while True: robot_mode = robot_rpc_client.getRobotInterface(robot_name).getRobotState().getRobotModeType() print(f"机械臂当前模式: {robot_mode.name}") if robot_mode == pyaubo_sdk.RobotModeType.Running: return robot_name time.sleep(2) except Exception as exc: print(f"机械臂启动异常: {exc}") return None def quaternion_to_rotation_matrix(q: Sequence[float]) -> np.ndarray: """四元数转旋转矩阵。""" w, x, y, z = q return np.array( [ [1 - 2 * y**2 - 2 * z**2, 2 * x * y - 2 * w * z, 2 * x * z + 2 * w * y], [2 * x * y + 2 * w * z, 1 - 2 * x**2 - 2 * z**2, 2 * y * z - 2 * w * x], [2 * x * z - 2 * w * y, 2 * y * z + 2 * w * x, 1 - 2 * x**2 - 2 * y**2], ], dtype=float, ) def euler_to_rotation_matrix(roll: float, pitch: float, yaw: float) -> np.ndarray: """欧拉角转旋转矩阵,旋转顺序为 ZYX。""" r_x = np.array( [ [1, 0, 0], [0, math.cos(roll), -math.sin(roll)], [0, math.sin(roll), math.cos(roll)], ], dtype=float, ) r_y = np.array( [ [math.cos(pitch), 0, math.sin(pitch)], [0, 1, 0], [-math.sin(pitch), 0, math.cos(pitch)], ], dtype=float, ) r_z = np.array( [ [math.cos(yaw), -math.sin(yaw), 0], [math.sin(yaw), math.cos(yaw), 0], [0, 0, 1], ], dtype=float, ) return r_z @ r_y @ r_x def get_robot_end_matrix(robot_name: str) -> np.ndarray: """获取末端相对基座的 4x4 齐次变换矩阵。""" pose = get_robot_end_effector_pose(robot_name) if pose is None: return np.eye(4, dtype=float) try: position = pose[:3] if len(pose) >= 7: rotation = quaternion_to_rotation_matrix(pose[3:7]) elif len(pose) == 6: rotation = euler_to_rotation_matrix(*pose[3:]) else: print(f"末端位姿格式错误: {pose}") return np.eye(4, dtype=float) matrix = np.eye(4, dtype=float) matrix[:3, :3] = rotation matrix[:3, 3] = position return matrix except Exception as exc: print(f"计算末端矩阵失败: {exc}") return np.eye(4, dtype=float) def camera_to_base(point_cam: Sequence[float], robot_name: str) -> Optional[np.ndarray]: """将相机坐标系中的三维点换算到机械臂基坐标系。""" try: base_t_tool = get_robot_end_matrix(robot_name) tool_t_camera = np.eye(4, dtype=float) tool_t_camera[:3, :3] = R_tc tool_t_camera[:3, 3] = T_tc point_h = np.append(np.array(point_cam, dtype=float), 1.0) point_base = base_t_tool @ tool_t_camera @ point_h return point_base[:3] except Exception as exc: print(f"坐标转换失败: {exc}") return None def check_joint_position(robot_name: str, target_joints: Sequence[float]) -> bool: """校验当前关节位置是否到达目标位置。""" current_joints = exampleState(robot_name) if current_joints is None or len(current_joints) != len(target_joints): print("无法读取关节位置,或关节数量不匹配") return False errors = [abs(current - target) for current, target in zip(current_joints, target_joints)] max_error = max(errors) if errors else float("inf") print(f"关节最大误差: {max_error:.4f} rad,阈值: {JOINT_TOLERANCE:.4f} rad") return max_error < JOINT_TOLERANCE def check_tcp_pose(robot_name: str, target_pose: Sequence[float]) -> bool: """校验当前 TCP 位姿是否到达目标位姿。""" current_pose = get_robot_end_effector_pose(robot_name) if current_pose is None or len(current_pose) < 6 or len(target_pose) < 6: print("无法读取 TCP 位姿,或目标位姿格式错误") return False pos_error = np.linalg.norm(np.array(current_pose[:3], dtype=float) - np.array(target_pose[:3], dtype=float)) ori_error = np.linalg.norm(np.array(current_pose[3:6], dtype=float) - np.array(target_pose[3:6], dtype=float)) print( f"TCP 位姿误差: 位置={pos_error:.4f} m,姿态={ori_error:.4f} rad | " f"阈值: 位置={POSITION_TOLERANCE:.4f} m,姿态={ORIENTATION_TOLERANCE:.4f} rad" ) return pos_error < POSITION_TOLERANCE and ori_error < ORIENTATION_TOLERANCE def waitArrival(impl: Any, target_joints: Optional[Sequence[float]] = None, target_pose: Optional[Sequence[float]] = None) -> int: """ 等待运动完成并在结束后进行位置校验。 返回: 0 = 成功 -1 = 获取执行 ID 失败 -2 = 运动等待超时 """ retry_count = 0 motion_control = impl.getMotionControl() while motion_control.getExecId() == -1: retry_count += 1 if retry_count > EXEC_ID_MAX_RETRIES: robot_names = robot_rpc_client.getRobotNames() robot_name = robot_names[0] if robot_names else None if robot_name: joints_ok = target_joints is not None and check_joint_position(robot_name, target_joints) pose_ok = target_pose is not None and check_tcp_pose(robot_name, target_pose) if joints_ok or pose_ok: print("未获取到执行 ID,但检测到机械臂已到达目标位,按成功处理") return 0 print("运动失败:获取执行 ID 超时") return -1 time.sleep(0.5) exec_id = motion_control.getExecId() timeout = 0.0 max_timeout = 30.0 while True: current_exec_id = motion_control.getExecId() if exec_id != current_exec_id: break timeout += 0.5 if timeout > max_timeout: print(f"运动等待超时:{max_timeout} 秒") return -2 time.sleep(0.5) robot_names = robot_rpc_client.getRobotNames() robot_name = robot_names[0] if robot_names else None if robot_name: if target_joints is not None and not check_joint_position(robot_name, target_joints): print("警告:关节位置未达到目标值") if target_pose is not None and not check_tcp_pose(robot_name, target_pose): print("警告:TCP 位姿未达到目标值") print("机械臂运动完成") return 0 def control_tool_io(robot_name: str, io_index: int, state: bool) -> None: """设置工具端数字 IO。""" io_control = robot_rpc_client.getRobotInterface(robot_name).getIoControl() io_control.setToolDigitalOutput(io_index, state) output = io_control.getToolDigitalOutput(io_index) print(f"TOOL_IO[{io_index}] -> {'ON' if output else 'OFF'}") @dataclass class RobotArmController: """机械臂子系统。""" robot_name: str rpc_client: Any placement_manager: PlacementManager @property def interface(self) -> Any: return self.rpc_client.getRobotInterface(self.robot_name) @property def runtime_machine(self) -> Any: return self.rpc_client.getRuntimeMachine() def get_joint_positions(self) -> Optional[List[float]]: return exampleState(self.robot_name) def get_tcp_pose(self) -> Optional[List[float]]: return get_robot_end_effector_pose(self.robot_name) def is_at_home(self, joints: Optional[Sequence[float]] = None) -> bool: home_joints = _cfg("HOME_JOINTS") current_joints = list(joints) if joints is not None else self.get_joint_positions() if current_joints is None or len(current_joints) != len(home_joints): return False max_error = max(abs(current - target) for current, target in zip(current_joints, home_joints)) return max_error < JOINT_TOLERANCE def keep_scissors_open(self) -> None: try: control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False) control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), True) print("Scissors disabled: keeping end effector open") except Exception as exc: print(f"Failed to keep scissors open: {exc}") def reset_scissors_outputs(self) -> None: try: control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False) control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), False) print("Scissors enabled: tool IO reset to neutral") except Exception as exc: print(f"Failed to reset scissors IO: {exc}") def close_scissors_for_pick(self) -> None: if not _cfg("SCISSORS_ENABLED"): self.keep_scissors_open() return control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), False) control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), True) time.sleep(_cfg("GRIPPER_ACTION_DELAY")) control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False) def open_scissors_after_place(self) -> None: if not _cfg("SCISSORS_ENABLED"): self.keep_scissors_open() return control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False) control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), True) time.sleep(_cfg("GRIPPER_ACTION_DELAY")) control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), False) def _restart_runtime_machine(self) -> None: """重置运行态,尽量把控制器拉回可接收运动指令的状态。""" try: self.runtime_machine.stop() except Exception: pass time.sleep(MOTION_START_DELAY) self.runtime_machine.start() time.sleep(MOTION_START_DELAY) def move_joint( self, target_joints: Sequence[float], *, target_pose: Optional[Sequence[float]] = None, description: str = "机械臂运动", ) -> bool: """按关节目标执行运动。""" joints = list(target_joints) last_error: Optional[Exception] = None for attempt in range(1, MOTION_COMMAND_RETRIES + 1): try: if attempt > 1: print(f"{description}:重新下发运动指令(第 {attempt}/{MOTION_COMMAND_RETRIES} 次)") self._restart_runtime_machine() self.interface.getMotionControl().moveJoint(joints, _cfg("ARM_SPEED"), _cfg("ARM_ACCEL"), 0, 0) result = waitArrival(self.interface, target_joints=joints, target_pose=target_pose) if result == 0: print(f"{description}完成") return True print(f"{description}失败,错误码: {result}") except Exception as exc: last_error = exc print(f"{description}失败: {exc}") finally: try: self.runtime_machine.stop() except Exception: pass time.sleep(MOTION_START_DELAY) if last_error is not None: print(f"{description}最终失败: {last_error}") return False def move_pose( self, target_pose: Sequence[float], reference_joints: Sequence[float], *, description: str, ) -> Optional[List[float]]: """按位姿目标执行逆解和关节运动。""" target_joints, _ = exampleInverseK(self.robot_name, target_pose, reference_joints) if target_joints is None: print(f"{description}失败:逆解无结果") self.return_home(retries=1) return None if not self.move_joint(target_joints, target_pose=target_pose, description=description): return None return target_joints def return_home(self, retries: int = HOME_RETRY_COUNT) -> bool: """返回 Home 关节位。""" current_joints = self.get_joint_positions() if self.is_at_home(current_joints): print("Robot arm is already at Home, skipping Home motion") return True attempts = max(1, retries) for attempt in range(1, attempts + 1): if attempts == 1: print("机械臂:返回 Home") else: print(f"机械臂:返回 Home(第 {attempt}/{attempts} 次)") if self.move_joint(_cfg("HOME_JOINTS"), description="返回 Home"): return True if attempt < attempts: time.sleep(HOME_RETRY_DELAY) print("机械臂:多次尝试后仍未能返回 Home") return False def execute_pick_and_place(self, pose: Sequence[float], angle_rad: float) -> bool: """ 执行抓取和放置动作。 保持原有外部接口和基本动作顺序: 1. 去抓取预备点 2. 去抓取点 3. 夹爪动作 4. 回抓取预备点 5. 回 Home 6. 去放置点 7. 释放 8. 再回 Home """ current_joints = self.get_joint_positions() if current_joints is None: print("获取关节位置失败,终止抓取并尝试返回 Home") self.return_home() return False target_pose = list(pose) print(f"机械臂:开始处理目标位姿 {target_pose}") print(f"机械臂:关键点连线角度 {math.degrees(angle_rad):.2f}°") approach_pose = target_pose.copy() # 先走到抓取预备点,避免直接扎向果实。 approach_pose[1] += _cfg("APPROACH_Y_OFFSET") pickup_pose = target_pose.copy() # 再把末端抬到可抓取高度,和原代码保持一致使用 Z 偏移。 pickup_pose[0] += _cfg("PICKUP_X_OFFSET") pickup_pose[1] += _cfg("PICKUP_Y_OFFSET") pickup_pose[2] += _cfg("LIFT_Z_OFFSET") angle_index = int(_cfg("PICK_ANGLE_ORIENTATION_INDEX")) if 3 <= angle_index < len(pickup_pose): signed_angle = float(_cfg("PICK_ANGLE_SIGN")) * angle_rad pickup_pose[angle_index] += signed_angle print( "机械臂:末端姿态旋转 " f"{math.degrees(signed_angle):.2f}° | 姿态索引: {angle_index}" ) else: print(f"警告:PICK_ANGLE_ORIENTATION_INDEX={_cfg('PICK_ANGLE_ORIENTATION_INDEX')} 无效,跳过姿态旋转") approach_joints = self.move_pose(approach_pose, current_joints, description="移动到抓取预备点") if approach_joints is None: self.return_home() return False pickup_joints = self.move_pose(pickup_pose, approach_joints, description="旋转末端并移动到抓取点") if pickup_joints is None: self.return_home() return False self.close_scissors_for_pick() retreat_joints = self.move_pose(approach_pose, pickup_joints, description="采摘后返回预抓取点") if retreat_joints is None: self.return_home() return False if not self.return_home(): return False place_pose, place_index = self.placement_manager.get_next_position() print(f"机械臂:移动到放置位 {place_index} -> {place_pose}") reference_joints = self.get_joint_positions() or _cfg("HOME_JOINTS") place_joints = self.move_pose(place_pose, reference_joints, description=f"移动到放置位 {place_index}") if place_joints is None: self.return_home() return False self.open_scissors_after_place() if not self.return_home(): return False print("机械臂:本次抓取放置流程完成") return True def init_camera() -> Tuple[Any, Any, Any]: """初始化 RealSense 相机。""" try: pipeline = rs.pipeline() config = rs.config() config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) profile = pipeline.start(config) depth_sensor = profile.get_device().first_depth_sensor() depth_scale = depth_sensor.get_depth_scale() depth_profile = rs.video_stream_profile(profile.get_stream(rs.stream.depth)) depth_intrinsics = depth_profile.get_intrinsics() align = rs.align(rs.stream.color) print(f"相机初始化完成 | 深度比例尺: {depth_scale}") return pipeline, align, depth_intrinsics except Exception as exc: print(f"相机初始化失败: {exc}") raise def init_tomato_detector(model_path: Optional[str] = None) -> Any: """初始化 YOLO 模型。""" use_path = model_path or _cfg("YOLO_MODEL_PATH") or "best.pt" print(f"加载 YOLO 模型 | 路径: {use_path}") if not os.path.exists(use_path): raise FileNotFoundError(f"YOLO 模型文件不存在: {use_path}") if not use_path.endswith(".pt"): raise TypeError(f"模型文件格式错误: {use_path},仅支持 .pt") try: model = YOLO(use_path) print("YOLO 模型加载成功") return model except Exception as exc: print(f"YOLO 模型加载失败: {exc}") raise def filter_ripe_tomatoes(results: Any, confidence_threshold: Optional[float] = None) -> List[Any]: """筛选成熟番茄目标。""" threshold = _cfg("PICK_CONFIDENCE_THRESHOLD") if confidence_threshold is None else confidence_threshold ripe_tomatoes: List[Any] = [] for box in results.boxes: confidence = float(box.conf) if int(box.cls) == 0 and confidence >= threshold: ripe_tomatoes.append(box) return ripe_tomatoes def draw_tomato_tilt_line(color_image: np.ndarray, x1: int, y1: int, x2: int, y2: int, pixel_x: int) -> float: """在目标区域内估计番茄串倾角。""" _ = pixel_x if x2 <= x1 or y2 <= y1: return 0.0 roi = color_image[y1:y2, x1:x2] if roi.size == 0: return 0.0 gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150) lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=50, minLineLength=50, maxLineGap=10) angle = 0.0 if lines is None: return angle longest_line = max( lines, key=lambda line: np.linalg.norm(np.array(line[0][:2], dtype=float) - np.array(line[0][2:], dtype=float)), ) x1_line, y1_line, x2_line, y2_line = [int(v) for v in longest_line[0]] x1_line += x1 x2_line += x1 y1_line += y1 y2_line += y1 vector_tilt = np.array([x2_line - x1_line, y2_line - y1_line], dtype=float) vector_vertical = np.array([0.0, y2 - y1], dtype=float) norm_tilt = np.linalg.norm(vector_tilt) norm_vertical = np.linalg.norm(vector_vertical) if norm_tilt > 0 and norm_vertical > 0: cos_angle = np.dot(vector_tilt, vector_vertical) / (norm_tilt * norm_vertical) cos_angle = max(-1.0, min(1.0, float(cos_angle))) angle = math.degrees(math.acos(cos_angle)) angle = angle if x2_line > x1_line else -angle angle = max(-30.0, min(30.0, angle)) cv2.putText( color_image, f"Angle: {angle:.2f}deg", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2, ) print(f"检测到番茄串倾角: {angle:.2f}°") return angle def fit_red_line(crop_image: np.ndarray, x1: int, y1: int) -> Optional[Tuple[Tuple[int, int], Tuple[int, int]]]: """在目标区域中拟合红色主轴。""" if crop_image.size == 0: return None lower_red1 = np.array([0, 120, 70]) upper_red1 = np.array([10, 255, 255]) lower_red2 = np.array([160, 120, 70]) upper_red2 = np.array([180, 255, 255]) hsv = cv2.cvtColor(crop_image, cv2.COLOR_BGR2HSV) mask1 = cv2.inRange(hsv, lower_red1, upper_red1) mask2 = cv2.inRange(hsv, lower_red2, upper_red2) mask = cv2.bitwise_or(mask1, mask2) contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None all_points = np.vstack(contours).squeeze() if len(np.atleast_1d(all_points)) < 2: return None vx, vy, x0, y0 = [float(value) for value in cv2.fitLine(all_points, cv2.DIST_L2, 0, 0.01, 0.01)] if abs(vx) < 1e-6: p1 = (x1 + int(x0), y1) p2 = (x1 + int(x0), y1 + crop_image.shape[0]) return p1, p2 left_y = int((-x0 * vy / vx) + y0) right_y = int((((crop_image.shape[1] - x0) * vy) / vx) + y0) p1 = (x1, y1 + left_y) p2 = (x1 + crop_image.shape[1], y1 + right_y) return p1, p2 def calculate_keypoint_line_angle_rad( cutpoint: Optional[Tuple[int, int]], end_point: Optional[Tuple[int, int]], ) -> float: """计算上/下关键点连线相对图像竖直方向的角度。""" if cutpoint is None or end_point is None: return 0.0 dx = float(end_point[0] - cutpoint[0]) dy = float(end_point[1] - cutpoint[1]) if abs(dx) < 1e-6 and abs(dy) < 1e-6: return 0.0 angle_rad = math.atan2(dx, dy) max_angle_rad = math.radians(float(_cfg("MAX_PICK_ANGLE_DEG"))) return max(-max_angle_rad, min(max_angle_rad, angle_rad)) @dataclass class VisionController: """视觉子系统。""" pipeline: Any align: Any depth_intrinsics: Any model: Any robot_name: str arm_controller: RobotArmController def capture_frame(self) -> Tuple[Optional[np.ndarray], Optional[Any]]: """采集并对齐相机画面。""" try: frames = self.pipeline.wait_for_frames(timeout_ms=5000) if not frames: print("未获取到相机帧,重试...") return None, None aligned_frames = self.align.process(frames) depth_frame = aligned_frames.get_depth_frame() color_frame = aligned_frames.get_color_frame() if not depth_frame or not color_frame: print("深度帧或彩色帧为空,重试...") return None, None color_image = np.asanyarray(color_frame.get_data()) return color_image, depth_frame except Exception as exc: print(f"获取相机帧失败: {exc}") return None, None @staticmethod def get_pick_zone_bounds(frame_width: int) -> Tuple[int, int]: left = int(frame_width * PICK_ZONE_LEFT_RATIO) right = int(frame_width * PICK_ZONE_RIGHT_RATIO) return max(0, left), max(left + 1, min(frame_width - 1, right)) def annotate_pick_zone(self, frame: np.ndarray) -> None: frame_h, frame_w = frame.shape[:2] left, right = self.get_pick_zone_bounds(frame_w) overlay = frame.copy() cv2.rectangle(overlay, (left, 0), (right, frame_h - 1), (0, 180, 0), -1) cv2.addWeighted(overlay, 0.12, frame, 0.88, 0, frame) cv2.rectangle(frame, (left, 0), (right, frame_h - 1), (0, 220, 0), 2) cv2.putText( frame, "Pick Zone", (left + 8, max(24, int(frame_h * 0.08))), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (0, 220, 0), 2, ) @staticmethod def box_xyxy(box: Any) -> Tuple[int, int, int, int]: return tuple(int(value) for value in box.xyxy[0].cpu().numpy()) @staticmethod def box_confidence(box: Any) -> float: return float(box.conf[0]) if hasattr(box.conf, "__len__") else float(box.conf) def box_center_in_pick_zone(self, box: Any, frame_width: int) -> bool: x1, _y1, x2, _y2 = self.box_xyxy(box) center_x = int((x1 + x2) / 2) left, right = self.get_pick_zone_bounds(frame_width) return left <= center_x <= right @staticmethod def as_numpy_array(value: Any) -> Optional[np.ndarray]: if value is None: return None if hasattr(value, "cpu"): value = value.cpu() if hasattr(value, "numpy"): value = value.numpy() return np.asarray(value) @staticmethod def clamp_pixel(px: float, py: float, frame_shape: Tuple[int, int, int]) -> Tuple[int, int]: frame_h, frame_w = frame_shape[:2] pixel_x = max(0, min(frame_w - 1, int(round(px)))) pixel_y = max(0, min(frame_h - 1, int(round(py)))) return pixel_x, pixel_y def extract_keypoint_by_index( self, current_points: np.ndarray, current_conf: Optional[np.ndarray], point_index: int, frame_shape: Tuple[int, int, int], ) -> Optional[Tuple[int, int]]: if point_index >= len(current_points): return None point = current_points[point_index] px, py = float(point[0]), float(point[1]) if px <= 0 and py <= 0: return None if current_conf is not None and point_index < len(current_conf): if float(current_conf[point_index]) < PICK_KEYPOINT_CONF_THRESHOLD: return None return self.clamp_pixel(px, py, frame_shape) def validate_candidate_keypoints( self, box: Any, cutpoint: Tuple[int, int], end_point: Optional[Tuple[int, int]], ) -> bool: x1, y1, x2, y2 = self.box_xyxy(box) box_w = max(1, x2 - x1) box_h = max(1, y2 - y1) margin = int(max(box_w, box_h) * PICK_KEYPOINT_BOX_MARGIN_RATIO) if not (x1 - margin <= cutpoint[0] <= x2 + margin and y1 - margin <= cutpoint[1] <= y2 + margin): print(f"Pick candidate skipped: semantic cutpoint outside bbox @ {cutpoint}") return False cut_rel_y = (cutpoint[1] - y1) / float(box_h) if cut_rel_y > PICK_CUTPOINT_MAX_REL_Y: print( "Pick candidate skipped: semantic cutpoint too low " f"(rel_y={cut_rel_y:.2f}) @ {cutpoint}" ) return False if end_point is None: return True if not (x1 - margin <= end_point[0] <= x2 + margin and y1 - margin <= end_point[1] <= y2 + margin): print(f"Pick candidate skipped: endpoint outside bbox @ {end_point}") return False distance = float(np.linalg.norm(np.array(end_point, dtype=float) - np.array(cutpoint, dtype=float))) if distance < PICK_KEYPOINT_MIN_DISTANCE_PX: print(f"Pick candidate skipped: keypoints too close ({distance:.1f}px)") return False if end_point[1] + PICK_ENDPOINT_Y_TOLERANCE_PX < cutpoint[1]: print(f"Pick candidate skipped: endpoint appears above cutpoint @ cut={cutpoint}, end={end_point}") return False return True def extract_candidate_keypoints( self, result: Any, box_index: int, frame_shape: Tuple[int, int, int], ) -> Tuple[Optional[Tuple[int, int]], Optional[Tuple[int, int]]]: keypoints = getattr(result, "keypoints", None) keypoint_xy = self.as_numpy_array(getattr(keypoints, "xy", None)) if keypoint_xy is None or box_index >= len(keypoint_xy): return None, None current_points = keypoint_xy[box_index] keypoint_conf = self.as_numpy_array(getattr(keypoints, "conf", None)) current_conf = keypoint_conf[box_index] if keypoint_conf is not None and box_index < len(keypoint_conf) else None cutpoint = self.extract_keypoint_by_index( current_points, current_conf, PICK_CUTPOINT_KEYPOINT_INDEX, frame_shape, ) end_point = self.extract_keypoint_by_index( current_points, current_conf, PICK_ENDPOINT_KEYPOINT_INDEX, frame_shape, ) return cutpoint, end_point def build_pick_candidates(self, result: Any, frame_shape: Tuple[int, int, int]) -> List[TomatoCandidate]: frame_w = frame_shape[1] candidates: List[TomatoCandidate] = [] boxes = getattr(result, "boxes", None) if boxes is None: return candidates for box_index, box in enumerate(boxes): confidence = self.box_confidence(box) if int(box.cls) != 0 or confidence < _cfg("PICK_CONFIDENCE_THRESHOLD"): continue if not self.box_center_in_pick_zone(box, frame_w): continue cutpoint, end_point = self.extract_candidate_keypoints(result, box_index, frame_shape) if cutpoint is None: print("Pick candidate skipped: missing semantic cutpoint keypoint") continue if not self.validate_candidate_keypoints(box, cutpoint, end_point): continue candidates.append(TomatoCandidate(box=box, box_index=box_index, cutpoint=cutpoint, end_point=end_point)) return candidates def run_detection(self, image: np.ndarray) -> Tuple[np.ndarray, List[TomatoCandidate]]: """执行 YOLO 检测并返回带标注画面。""" results = self.model(image, classes=0, conf=_cfg("YOLO_DETECT_CONF"), verbose=False) result = results[0] annotated_image = result.plot() self.annotate_pick_zone(annotated_image) ripe_tomatoes = self.build_pick_candidates(result, image.shape) return annotated_image, ripe_tomatoes def serialize_detection_boxes(self, boxes: Sequence[Any]) -> List[Dict[str, Any]]: """把 YOLO 检测框整理成可供 UI 叠加显示的结构化信息。""" serialized: List[Dict[str, Any]] = [] for index, item in enumerate(boxes, start=1): try: box = item.box if isinstance(item, TomatoCandidate) else item x1, y1, x2, y2 = self.box_xyxy(box) center_x = int((x1 + x2) / 2) center_y = int((y1 + y2) / 2) cutpoint = list(item.cutpoint) if isinstance(item, TomatoCandidate) and item.cutpoint else [] end_point = list(item.end_point) if isinstance(item, TomatoCandidate) and item.end_point else [] serialized.append( { "label": f"Tomato-{index}", "confidence": self.box_confidence(box), "bbox": [x1, y1, x2, y2], "center": [center_x, center_y], "cutpoint": cutpoint, "end_point": end_point, } ) except Exception as exc: print(f"检测框序列化失败: {exc}") return serialized def annotate_target(self, frame: np.ndarray, target: DetectedTomato) -> None: """在画面上绘制目标核心信息。""" cutpoint = (target.pixel_x, target.pixel_y) cv2.drawMarker(frame, cutpoint, (0, 255, 255), cv2.MARKER_CROSS, 18, 2) cv2.circle(frame, cutpoint, 6, (0, 255, 255), -1) cv2.circle(frame, cutpoint, 11, (0, 80, 255), 2) if len(target.end_point) == 2: end_point = tuple(int(v) for v in target.end_point) cv2.circle(frame, end_point, 5, (255, 215, 0), -1) cv2.line(frame, cutpoint, end_point, (255, 255, 0), 2) angle_text = f"Angle: {math.degrees(target.angle_rad):.2f}deg" (angle_w, _angle_h), angle_baseline = cv2.getTextSize( angle_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2, ) angle_x = max(6, min(end_point[0] + 8, frame.shape[1] - angle_w - 6)) angle_y = max(18, min(end_point[1] + 18, frame.shape[0] - angle_baseline - 6)) cv2.putText( frame, angle_text, (angle_x, angle_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2, ) @staticmethod def collect_depth_samples( depth_frame: Any, pixel_x: int, pixel_y: int, frame_shape: Tuple[int, int, int], radius: int, ) -> List[float]: frame_h, frame_w = frame_shape[:2] x_start = max(0, pixel_x - radius) x_end = min(frame_w - 1, pixel_x + radius) y_start = max(0, pixel_y - radius) y_end = min(frame_h - 1, pixel_y + radius) samples: List[float] = [] for sample_y in range(y_start, y_end + 1): for sample_x in range(x_start, x_end + 1): depth = float(depth_frame.get_distance(sample_x, sample_y)) if PICK_DEPTH_MIN_M <= depth <= PICK_DEPTH_MAX_M: samples.append(depth) return samples def get_robust_depth( self, depth_frame: Any, pixel_x: int, pixel_y: int, frame_shape: Tuple[int, int, int], ) -> Optional[float]: center_depth = float(depth_frame.get_distance(pixel_x, pixel_y)) for radius in (PICK_DEPTH_WINDOW_RADIUS, PICK_DEPTH_FALLBACK_WINDOW_RADIUS): samples = self.collect_depth_samples(depth_frame, pixel_x, pixel_y, frame_shape, radius) if len(samples) < PICK_DEPTH_MIN_VALID_PIXELS: continue sample_array = np.array(samples, dtype=float) median_depth = float(np.median(sample_array)) stable_samples = sample_array[np.abs(sample_array - median_depth) <= PICK_DEPTH_MEDIAN_TOLERANCE_M] if len(stable_samples) >= PICK_DEPTH_MIN_VALID_PIXELS: depth = float(np.median(stable_samples)) else: depth = median_depth if not (PICK_DEPTH_MIN_M <= center_depth <= PICK_DEPTH_MAX_M): print( "采摘点中心深度无效,使用邻域稳健深度 " f"{depth:.3f}m @ ({pixel_x}, {pixel_y}), radius={radius}" ) elif abs(center_depth - depth) > PICK_DEPTH_MEDIAN_TOLERANCE_M: print( "采摘点中心深度跳变,使用邻域稳健深度 " f"{depth:.3f}m 替代 {center_depth:.3f}m @ ({pixel_x}, {pixel_y})" ) return depth print( "采摘点邻域深度不足 " f"center={center_depth:.3f}m @ ({pixel_x}, {pixel_y})" ) return None def build_detected_tomato( self, candidate: TomatoCandidate, depth_frame: Any, frame: np.ndarray, tcp_pose: Sequence[float], ) -> Optional[DetectedTomato]: """把检测框转换成可供机械臂执行的抓取目标。""" box = candidate.box x1, y1, x2, y2 = self.box_xyxy(box) if candidate.cutpoint is None: print("Pick target skipped: missing upper YOLO keypoint") return None pixel_x, pixel_y = candidate.cutpoint depth = self.get_robust_depth(depth_frame, pixel_x, pixel_y, frame.shape) if depth is None: print(f"无效深度值 @ ({pixel_x}, {pixel_y})") return None point_cam = rs.rs2_deproject_pixel_to_point(self.depth_intrinsics, [pixel_x, pixel_y], depth) point_base = camera_to_base(point_cam, self.robot_name) if point_base is None: print(f"坐标转换失败 @ ({pixel_x}, {pixel_y}),本次目标跳过") self.arm_controller.return_home() return None angle_rad = calculate_keypoint_line_angle_rad(candidate.cutpoint, candidate.end_point) print(f"关键点连线角度: {math.degrees(angle_rad):.2f}°") robot_pose = list(point_base) + list(tcp_pose[3:6]) target = DetectedTomato( x1=x1, y1=y1, x2=x2, y2=y2, pixel_x=pixel_x, pixel_y=pixel_y, depth=depth, point_cam=list(point_cam), point_base=list(point_base), robot_pose=robot_pose, angle_rad=angle_rad, cutpoint=[int(pixel_x), int(pixel_y)], end_point=list(candidate.end_point) if candidate.end_point else [], ) self.annotate_target(frame, target) return target def log_target(self, index: int, target: DetectedTomato) -> None: """打印目标信息。""" print(f"检测到目标 #{index}:") print(f" 像素坐标: ({target.pixel_x}, {target.pixel_y})") print( " 相机坐标: " f"({target.point_cam[0]:.3f}, {target.point_cam[1]:.3f}, {target.point_cam[2]:.3f}) m" ) print( " 机械臂基坐标: " f"({target.point_base[0]:.3f}, {target.point_base[1]:.3f}, {target.point_base[2]:.3f}) m" ) def handle_search_phase(self, raw_image: np.ndarray) -> np.ndarray: """AGV 行进时的快速检测阶段。""" annotated_image, ripe_tomatoes = self.run_detection(raw_image) if ripe_tomatoes and not picking_done.is_set(): # 这里只负责“发现目标并请求停车”,真正抓取发生在 AGV 停稳后的下一阶段。 has_tomato.set() print(f"视觉检测:识别到 {len(ripe_tomatoes)} 个成熟番茄 | 触发 AGV 暂停") elif picking_done.is_set(): # 一次采摘结束后,把状态机恢复到“边走边看”的初始态。 clear_pick_cycle() print("视觉检测:采摘完成 | 允许 AGV 继续前进") return annotated_image def handle_pick_phase(self, raw_image: np.ndarray, depth_frame: Any) -> np.ndarray: """AGV 停止后的精确检测与抓取阶段。""" if start_agv_stop_window(): # AGV 刚停下时画面可能还在抖,这里故意留一个稳定窗口再做精检和抓取。 pick_settle_delay = _cfg("AGV_PICK_SETTLE_DELAY") print(f"AGV 已停止,等待 {pick_settle_delay:.1f} 秒后开始精确检测...") frozen_image, frozen_targets = self.run_detection(raw_image) safe_ui_update( frozen_image, { "freeze": True, "stage": "pick_wait", "detections": self.serialize_detection_boxes(frozen_targets), }, ) time.sleep(pick_settle_delay) return frozen_image agv_stop_timeout = _cfg("AGV_STOP_TIMEOUT") if agv_stop_elapsed() > agv_stop_timeout: print(f"AGV 停止超时 {agv_stop_timeout} 秒,允许继续前进") clear_pick_cycle() timeout_image = raw_image.copy() self.annotate_pick_zone(timeout_image) return timeout_image annotated_image, ripe_tomatoes = self.run_detection(raw_image) if not ripe_tomatoes: print("视觉检测:停止窗口内未找到成熟番茄,允许 AGV 继续前进") clear_pick_cycle() return annotated_image tcp_pose = self.arm_controller.get_tcp_pose() if tcp_pose is None: print("获取末端位姿失败,跳过本次抓取并返回 Home") self.arm_controller.return_home() clear_pick_cycle() return annotated_image for index, candidate in enumerate(ripe_tomatoes, start=1): try: # 逐个候选目标尝试构造成机械臂可执行的抓取目标,成功一个就结束本轮。 target = self.build_detected_tomato(candidate, depth_frame, annotated_image, tcp_pose) if target is None: continue detections = self.serialize_detection_boxes([candidate]) if detections: detections[0]["pick_xyz"] = [float(value) for value in target.point_cam[:3]] detections[0]["pick_xyz_frame"] = "d405_camera" detections[0]["angle_deg"] = float(math.degrees(target.angle_rad)) safe_ui_update( annotated_image, { "freeze": True, "stage": "pick_target", "detections": detections, }, ) self.log_target(index, target) if self.arm_controller.execute_pick_and_place(target.robot_pose, target.angle_rad): mark_pick_complete() else: clear_pick_cycle() break except Exception as exc: print(f"处理目标时出错: {exc}") self.arm_controller.return_home() clear_pick_cycle() continue else: print("视觉检测:候选目标均不可用,允许 AGV 继续前进") clear_pick_cycle() return annotated_image def annotate_runtime_status(self, frame: np.ndarray) -> None: """绘制运行时状态文本。""" cv2.putText( frame, f"Time Left: {remaining_seconds()}s", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, ) cv2.putText( frame, f"Tomato: {'Yes' if has_tomato.is_set() else 'No'}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, ) cv2.putText( frame, f"Picking: {'Yes' if picking_done.is_set() else 'No'}", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2, ) def shutdown(self) -> None: """释放视觉资源。""" try: self.pipeline.stop() except Exception as exc: print(f"释放相机失败: {exc}") safe_ui_update(blank_frame()) print("视觉线程停止 | 相机已释放") def run(self) -> None: """视觉主线程。""" print("视觉线程启动 | 开始检测番茄...") while running.is_set(): if runtime_expired(): print("视觉检测:总时长已到,准备停止...") running.clear() break color_image, depth_frame = self.capture_frame() if color_image is None or depth_frame is None: safe_ui_update(blank_frame()) time.sleep(0.1) continue try: # 视觉线程本质上是个两阶段状态机: # 1. 搜索态:AGV 行进,找候选番茄; # 2. 抓取态:AGV 停住,在停止窗口内做精检并触发抓取。 if has_tomato.is_set() and not picking_done.is_set(): annotated_image = self.handle_pick_phase(color_image, depth_frame) else: annotated_image = self.handle_search_phase(color_image) except Exception as exc: print(f"视觉检测逻辑出错: {exc}") self.arm_controller.return_home() annotated_image = color_image.copy() self.annotate_runtime_status(annotated_image) safe_ui_update(annotated_image) time.sleep(0.05) self.shutdown() @dataclass class AgvController: """AGV 子系统。""" agv_client: Any def reacquire_priority(self) -> None: """重新获取 AGV 控制权。""" self.agv_client.release_priority() time.sleep(0.5) self.agv_client.set_priority("admin", _cfg("AGV_IP")) def send_speed(self, linear_speed: float, description: str) -> None: """发送 AGV 速度指令。""" ret = self.agv_client.set_control_speed(linear_speed, _cfg("AGV_SPEED_STOP")) if ret != AGV_SUCCESS_CODE: print(f"AGV 控制:{description}指令失败(码 {ret}),重新获取控制权...") self.reacquire_priority() def stop_motion(self) -> None: """停止 AGV。""" self.agv_client.set_control_speed(_cfg("AGV_SPEED_STOP"), _cfg("AGV_SPEED_STOP")) def shutdown(self) -> None: """线程结束时确保 AGV 停止。""" try: self.stop_motion() except Exception as exc: print(f"AGV 停止失败: {exc}") print("AGV 控制线程停止 | AGV 已停止") def run(self) -> None: """AGV 控制主线程。""" print("AGV 控制线程启动 | 等待视觉检测信号...") while running.is_set(): try: if runtime_expired(): print("AGV 控制:总时长已到,停止 AGV...") self.stop_motion() running.clear() break if has_tomato.is_set() and not picking_done.is_set(): # AGV 线程本身不做视觉/抓取,只消费状态位决定“停”还是“走”。 self.send_speed(_cfg("AGV_SPEED_STOP"), "停止") else: self.send_speed(_cfg("AGV_SPEED_FORWARD"), "前进") except Exception as exc: print(f"AGV 控制线程出错: {exc}") time.sleep(0.5) continue time.sleep(0.1) self.shutdown() def get_arm_controller(robot_name: str) -> RobotArmController: global _current_arm_controller if _current_arm_controller is None or _current_arm_controller.robot_name != robot_name: _current_arm_controller = RobotArmController(robot_name, robot_rpc_client, placement_manager) return _current_arm_controller def return_to_home(robot_name: str) -> None: """兼容旧接口:返回 Home。""" controller = get_arm_controller(robot_name) controller.return_home() def control_robot(robot_name: str, pose: Sequence[float], angle_rad: float) -> bool: """兼容旧接口:执行抓取与放置。""" controller = get_arm_controller(robot_name) return controller.execute_pick_and_place(pose, angle_rad) def vision_detection_thread(pipeline: Any, align: Any, depth_intrinsics: Any, model: Any, robot_name: str) -> None: """兼容旧接口:视觉线程入口。""" controller = VisionController( pipeline=pipeline, align=align, depth_intrinsics=depth_intrinsics, model=model, robot_name=robot_name, arm_controller=get_arm_controller(robot_name), ) controller.run() def agv_control_thread(agv_client: Any) -> None: """兼容旧接口:AGV 线程入口。""" global _current_agv_controller _current_agv_controller = AgvController(agv_client) _current_agv_controller.run() def connect_robot_arm() -> RobotArmController: """连接并启动机械臂。""" robot_rpc_client.connect(_cfg("ROBOT_IP"), _cfg("ROBOT_PORT")) robot_rpc_client.setRequestTimeout(1000) if not robot_rpc_client.hasConnected(): raise RuntimeError("机械臂连接失败") print("机械臂连接成功") robot_rpc_client.login("rob1", "123456") if not robot_rpc_client.hasLogined(): raise RuntimeError("机械臂登录失败") print("机械臂登录成功") robot_name = exampleStartup() if not robot_name: raise RuntimeError("机械臂启动失败") print(f"机械臂 {robot_name} 启动成功") return get_arm_controller(robot_name) def connect_agv() -> Any: """连接 AGV 并获取控制权。""" agv_client = RpcClient() agv_client.setRequestTimeout(1000) agv_ip = _cfg("AGV_IP") agv_port = _cfg("AGV_PORT") print(f"正在连接 AGV | IP: {agv_ip}:{agv_port}") connect_ret = agv_client.connect(agv_ip, agv_port) if connect_ret != 0: raise RuntimeError(f"AGV 连接失败(返回码 {connect_ret})") login_ret = agv_client.login("admin", "admin") if login_ret != 0: raise RuntimeError(f"AGV 登录失败(返回码 {login_ret})") print("AGV 连接与登录成功") agv_client.release_priority() time.sleep(0.5) control_ret = agv_client.set_priority("admin", agv_ip) if control_ret != AGV_SUCCESS_CODE: raise RuntimeError(f"AGV 获取控制权失败(返回码 {control_ret})") print("AGV 控制权获取成功") return agv_client def release_agv(agv_client: Optional[Any]) -> None: """释放 AGV 资源。""" if agv_client is None: return try: agv_client.release_priority() except Exception as exc: print(f"释放 AGV 控制权失败: {exc}") try: agv_client.disconnect() except Exception as exc: print(f"断开 AGV 失败: {exc}") def release_robot_arm() -> None: """释放机械臂 RPC 连接。""" try: if robot_rpc_client.hasConnected(): robot_rpc_client.disconnect() except Exception as exc: print(f"断开机械臂连接失败: {exc}") def main() -> None: """主程序入口。""" global _current_arm_controller, _current_agv_controller # 每次主流程启动前都把跨线程状态和放置位游标清零,避免继承上一次运行残留。 reset_runtime_state() placement_manager.reset() print("=" * 50) print("开始初始化番茄采摘系统") print("当前参数:") print(f" 机械臂 IP: {_cfg('ROBOT_IP')}:{_cfg('ROBOT_PORT')}") print(f" AGV IP: {_cfg('AGV_IP')}:{_cfg('AGV_PORT')}") print(f" YOLO 模型: {_cfg('YOLO_MODEL_PATH')}") print(f" 总时长: {_cfg('TOTAL_DURATION')}s") print("=" * 50) arm_controller: Optional[RobotArmController] = None agv_client: Optional[Any] = None pipeline: Optional[Any] = None vision_thread: Optional[threading.Thread] = None agv_thread: Optional[threading.Thread] = None try: print("\n1. 初始化机械臂...") arm_controller = connect_robot_arm() if not _cfg("SCISSORS_ENABLED"): arm_controller.keep_scissors_open() else: arm_controller.reset_scissors_outputs() print("\n1.1 机械臂回到初始位置...") # 机械臂刚进入 Running 后,控制器有时还没完全准备好接收第一条运动指令。 # 这里把“回 Home”作为尽力而为的预处理,失败时只告警,不阻塞整机启动。 if arm_controller.is_at_home(): print("Robot arm already at Home during startup, continuing initialization") else: time.sleep(STARTUP_HOME_DELAY) if not arm_controller.return_home(): print("警告:机械臂启动后返回 Home 失败,继续启动系统,后续流程中再尝试回 Home") print("\n2. 初始化 AGV...") agv_client = connect_agv() print("\n3. 初始化视觉...") pipeline, align, depth_intrinsics = init_camera() model = init_tomato_detector() print("\n4. 启动核心线程...") start_runtime_state() _current_arm_controller = arm_controller _current_agv_controller = AgvController(agv_client) # 主线程只负责保活和兜底清理; # 真正并发执行的是视觉线程和 AGV 控制线程。 vision_thread = threading.Thread( target=vision_detection_thread, args=(pipeline, align, depth_intrinsics, model, arm_controller.robot_name), daemon=True, name="VisionThread", ) agv_thread = threading.Thread( target=agv_control_thread, args=(agv_client,), daemon=True, name="AGVThread", ) vision_thread.start() agv_thread.start() print(f"所有线程启动完成 | 总运行时长:{_cfg('TOTAL_DURATION')} 秒") # 主线程不参与业务决策,只等待 `running` 被任一线程或异常路径清掉。 while running.is_set(): time.sleep(1) except KeyboardInterrupt: print("\n用户手动中断程序...") running.clear() if arm_controller is not None: arm_controller.return_home() except Exception as exc: print(f"系统初始化或运行失败: {exc}") running.clear() if arm_controller is not None: arm_controller.return_home() finally: print("\n释放系统资源...") running.clear() if vision_thread is not None and vision_thread.is_alive(): vision_thread.join(timeout=2.0) elif pipeline is not None: try: pipeline.stop() except Exception: pass if agv_thread is not None and agv_thread.is_alive(): agv_thread.join(timeout=2.0) release_agv(agv_client) release_robot_arm() reset_runtime_state() _current_arm_controller = None _current_agv_controller = None print("程序结束 | 所有设备已断开连接") def signal_handler(sig: int, frame: Any) -> None: """信号处理:停止线程并尽量回 Home。""" _ = frame print(f"\n接收到信号 {sig},正在终止程序并返回 Home...") running.clear() try: if _current_arm_controller is not None: _current_arm_controller.return_home() else: robot_names = robot_rpc_client.getRobotNames() if robot_names: RobotArmController(robot_names[0], robot_rpc_client, placement_manager).return_home() except Exception: pass time.sleep(1) sys.exit(0)