forked from YikaiFu-cart/acAubo
1781 lines
64 KiB
Python
1781 lines
64 KiB
Python
from dataclasses import dataclass, field
|
||
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
|
||
|
||
import math
|
||
import os
|
||
import sys
|
||
import threading
|
||
import time
|
||
|
||
import cv2
|
||
import numpy as np
|
||
import pyrealsense2 as rs
|
||
import pyaubo_sdk
|
||
from pyaubo_agvc_sdk import RpcClient
|
||
from ultralytics import YOLO
|
||
|
||
"""
|
||
番茄采摘控制核心实现。
|
||
|
||
这个模块只负责内部逻辑拆分,`control.py` 作为兼容层保留给 UI 调用。
|
||
"""
|
||
|
||
|
||
M_PI = math.pi
|
||
|
||
# 可修改的设备、运行、位姿、夹爪参数统一放在 control.py 顶部。
|
||
# control_core.py 只保留核心内部常量,避免出现第二套可调默认值。
|
||
|
||
|
||
# -------------------- 外部配置缓存 --------------------
|
||
_runtime_config: Dict[str, Any] = {}
|
||
|
||
|
||
PICK_ZONE_LEFT_RATIO = 1.0 / 3.0
|
||
PICK_ZONE_RIGHT_RATIO = 7.0 / 10.0
|
||
PICK_KEYPOINT_CONF_THRESHOLD = 0.8 # 关键点置信度低于该值时不参与采摘。
|
||
PICK_CUTPOINT_KEYPOINT_INDEX = 0 # YOLO pose 第 0 个点为采摘点 cutpoint。
|
||
PICK_ENDPOINT_KEYPOINT_INDEX = 1 # YOLO pose 第 1 个点为方向端点 endpoint。
|
||
PICK_KEYPOINT_BOX_MARGIN_RATIO = 0.15 # 关键点允许略超出 bbox 的比例。
|
||
PICK_CUTPOINT_MAX_REL_Y = 0.70 # 采摘点在 bbox 内的最大相对高度,防止误取下方点。
|
||
PICK_KEYPOINT_MIN_DISTANCE_PX = 8.0 # cutpoint 与 endpoint 的最小像素距离。
|
||
PICK_ENDPOINT_Y_TOLERANCE_PX = 4 # endpoint 允许比 cutpoint 略高的像素容差。
|
||
PICK_DEPTH_WINDOW_RADIUS = 5 # 首次稳健取深度的窗口半径,5 表示 11x11。
|
||
PICK_DEPTH_FALLBACK_WINDOW_RADIUS = 10 # 首次深度不足时的扩大窗口半径,10 表示 21x21。
|
||
PICK_DEPTH_MIN_VALID_PIXELS = 5 # 深度窗口内至少需要的有效深度点数量。
|
||
PICK_DEPTH_MIN_M = 0.25 # 有效深度下限,单位 m。
|
||
PICK_DEPTH_MAX_M = 0.6 # 有效深度上限,单位 m。
|
||
PICK_DEPTH_MEDIAN_TOLERANCE_M = 0.035 # 深度中位数滤波容差,单位 m。
|
||
|
||
# R_tc = np.array(
|
||
# [
|
||
# [-0.9996, -0.0270, -0.0103],
|
||
# [0.0267, -0.7993, 0.0245],
|
||
# [-0.0109, 0.0242, 0.7996],
|
||
# ],
|
||
# dtype=float,
|
||
# )
|
||
# T_tc = np.array([0.0, 0.0, 0.0], dtype=float)
|
||
|
||
R_tc = np.array(
|
||
[
|
||
[-0.9998, -0.0104, 0.0152],
|
||
[0.0099, -0.9995, -0.0316],
|
||
[0.0155, -0.0315, 0.9994],
|
||
],
|
||
dtype=float,
|
||
)
|
||
|
||
T_tc = np.array([-0.0026, 0.0388, -0.0632], dtype=float)
|
||
|
||
POSITION_TOLERANCE = 0.01
|
||
ORIENTATION_TOLERANCE = 0.05
|
||
JOINT_TOLERANCE = 0.02
|
||
|
||
|
||
# -------------------- 运行时常量 --------------------
|
||
FRAME_SHAPE = (480, 640, 3)
|
||
AGV_SUCCESS_CODE = 10100000
|
||
EXEC_ID_MAX_RETRIES = 20
|
||
MOTION_COMMAND_RETRIES = 2
|
||
MOTION_START_DELAY = 0.2
|
||
HOME_RETRY_COUNT = 1
|
||
HOME_RETRY_DELAY = 1.0
|
||
STARTUP_HOME_DELAY = 2.0
|
||
DEFAULT_PLACE_POSITION = [0.1, 0.7, 0.5, 1.5708, 1.5708, 0]
|
||
|
||
|
||
# -------------------- 全局运行状态 --------------------
|
||
# `running`: 总运行开关,两个工作线程都以它作为退出条件。
|
||
# `has_tomato`: 视觉线程在 AGV 行进时发现候选成熟番茄后置位,通知 AGV 先停下。
|
||
# `picking_done`: 一次抓取放置完成后置位,视觉线程会在下一轮把本轮状态清掉,让 AGV 继续走。
|
||
running = threading.Event()
|
||
has_tomato = threading.Event()
|
||
picking_done = threading.Event()
|
||
|
||
# 这三个事件变量共同组成跨线程状态机:
|
||
# running 控制系统总开关;
|
||
# has_tomato 表示视觉线程发现了候选番茄,需要 AGV 暂停;
|
||
# picking_done 表示本轮抓取放置完成,系统可以回到继续巡检。
|
||
|
||
start_time = 0.0
|
||
agv_stop_time = 0.0
|
||
|
||
ui_callback: Callable[[np.ndarray], None] = lambda frame: None
|
||
robot_rpc_client = pyaubo_sdk.RpcClient()
|
||
|
||
|
||
def configure(config: Dict[str, Any]) -> None:
|
||
"""接收 control.py 同步过来的可调参数。"""
|
||
_runtime_config.update(config)
|
||
|
||
|
||
def _cfg(name: str) -> Any:
|
||
control_module = sys.modules.get("control")
|
||
if control_module is not None and hasattr(control_module, name):
|
||
value = getattr(control_module, name)
|
||
elif name in _runtime_config:
|
||
value = _runtime_config[name]
|
||
else:
|
||
raise RuntimeError(f"缺少配置参数 {name},请先通过 control.py 启动或同步配置")
|
||
if isinstance(value, list):
|
||
return [list(item) if isinstance(item, list) else item for item in value]
|
||
return value
|
||
|
||
|
||
def __getattr__(name: str) -> Any:
|
||
control_module = sys.modules.get("control")
|
||
if name in _runtime_config or (control_module is not None and hasattr(control_module, name)):
|
||
return _cfg(name)
|
||
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
|
||
|
||
|
||
@dataclass
|
||
class PlacementManager:
|
||
"""按顺序循环取放置位,始终读取 control.py 中的 `place_positions`。"""
|
||
|
||
lock: threading.Lock = field(default_factory=threading.Lock)
|
||
next_index: int = 0
|
||
|
||
def get_next_position(self) -> Tuple[List[float], int]:
|
||
with self.lock:
|
||
positions = _cfg("place_positions") or [DEFAULT_PLACE_POSITION]
|
||
used_index = self.next_index % len(positions)
|
||
position = list(positions[used_index])
|
||
self.next_index = (used_index + 1) % len(positions)
|
||
return position, used_index
|
||
|
||
def get_current_index(self) -> int:
|
||
with self.lock:
|
||
return self.next_index
|
||
|
||
def reset(self) -> None:
|
||
with self.lock:
|
||
self.next_index = 0
|
||
|
||
|
||
@dataclass
|
||
class DetectedTomato:
|
||
x1: int
|
||
y1: int
|
||
x2: int
|
||
y2: int
|
||
pixel_x: int
|
||
pixel_y: int
|
||
depth: float
|
||
point_cam: List[float]
|
||
point_base: List[float]
|
||
robot_pose: List[float]
|
||
angle_rad: float
|
||
cutpoint: List[int] = field(default_factory=list)
|
||
end_point: List[int] = field(default_factory=list)
|
||
|
||
|
||
@dataclass
|
||
class TomatoCandidate:
|
||
box: Any
|
||
box_index: int
|
||
cutpoint: Optional[Tuple[int, int]]
|
||
end_point: Optional[Tuple[int, int]]
|
||
|
||
|
||
placement_manager = PlacementManager()
|
||
_current_arm_controller: Optional["RobotArmController"] = None
|
||
_current_agv_controller: Optional["AgvController"] = None
|
||
|
||
|
||
def blank_frame() -> np.ndarray:
|
||
return np.zeros(FRAME_SHAPE, dtype=np.uint8)
|
||
|
||
|
||
def set_ui_callback(callback: Callable[[np.ndarray], None]) -> None:
|
||
global ui_callback
|
||
if callable(callback):
|
||
ui_callback = callback
|
||
print("UI 回调函数已设置")
|
||
else:
|
||
print("警告:传入的 UI 回调不可调用,继续使用默认空回调")
|
||
|
||
|
||
def safe_ui_update(frame: Any, overlay: Optional[Dict[str, Any]] = None) -> None:
|
||
try:
|
||
payload = frame
|
||
if overlay is not None:
|
||
payload = {"frame": frame, **overlay}
|
||
ui_callback(payload)
|
||
except Exception as exc:
|
||
print(f"UI 回调失败: {exc}")
|
||
|
||
|
||
def reset_runtime_state() -> None:
|
||
global start_time, agv_stop_time
|
||
running.clear()
|
||
has_tomato.clear()
|
||
picking_done.clear()
|
||
start_time = 0.0
|
||
agv_stop_time = 0.0
|
||
|
||
|
||
def start_runtime_state() -> None:
|
||
global start_time, agv_stop_time
|
||
has_tomato.clear()
|
||
picking_done.clear()
|
||
agv_stop_time = 0.0
|
||
start_time = time.time()
|
||
running.set()
|
||
|
||
|
||
def runtime_expired() -> bool:
|
||
if start_time <= 0:
|
||
return False
|
||
return (time.time() - start_time) > _cfg("TOTAL_DURATION")
|
||
|
||
|
||
def remaining_seconds() -> int:
|
||
total_duration = _cfg("TOTAL_DURATION")
|
||
if start_time <= 0:
|
||
return total_duration
|
||
return max(0, int(total_duration - (time.time() - start_time)))
|
||
|
||
|
||
def start_agv_stop_window() -> bool:
|
||
global agv_stop_time
|
||
if agv_stop_time == 0:
|
||
agv_stop_time = time.time()
|
||
return True
|
||
return False
|
||
|
||
|
||
def clear_agv_stop_window() -> None:
|
||
global agv_stop_time
|
||
agv_stop_time = 0.0
|
||
|
||
|
||
def agv_stop_elapsed() -> float:
|
||
if agv_stop_time == 0:
|
||
return 0.0
|
||
return time.time() - agv_stop_time
|
||
|
||
|
||
def clear_pick_cycle() -> None:
|
||
# 清空这一轮“发现目标 -> 停车 -> 抓取”的中间状态。
|
||
has_tomato.clear()
|
||
picking_done.clear()
|
||
clear_agv_stop_window()
|
||
|
||
|
||
def mark_pick_complete() -> None:
|
||
# 抓取完成后先打完成标记,再由搜索阶段统一恢复为继续前进。
|
||
picking_done.set()
|
||
has_tomato.clear()
|
||
clear_agv_stop_window()
|
||
|
||
|
||
def get_next_placement() -> Tuple[List[float], int]:
|
||
return placement_manager.get_next_position()
|
||
|
||
|
||
def get_placement_index() -> int:
|
||
return placement_manager.get_current_index()
|
||
|
||
|
||
def create_placement_manager():
|
||
"""兼容旧接口,返回当前放置位访问器。"""
|
||
return get_next_placement, get_placement_index
|
||
|
||
|
||
def exampleState(robot_name: str) -> Optional[List[float]]:
|
||
"""读取并打印当前机械臂关节角。"""
|
||
try:
|
||
joints = robot_rpc_client.getRobotInterface(robot_name).getRobotState().getJointPositions()
|
||
joints = list(joints)
|
||
print("机械臂关节角:", joints)
|
||
return joints
|
||
except Exception as exc:
|
||
print(f"读取关节角失败: {exc}")
|
||
return None
|
||
|
||
|
||
def get_robot_end_effector_pose(robot_name: str) -> Optional[List[float]]:
|
||
"""读取并打印当前 TCP 位姿。"""
|
||
try:
|
||
pose = robot_rpc_client.getRobotInterface(robot_name).getRobotState().getTcpPose()
|
||
pose = list(pose)
|
||
print("机械臂末端位姿:", pose)
|
||
return pose
|
||
except Exception as exc:
|
||
print(f"读取末端位姿失败: {exc}")
|
||
return None
|
||
|
||
|
||
def exampleInverseK(
|
||
robot_name: str, pose: Sequence[float], reference_q: Sequence[float]
|
||
) -> Tuple[Optional[List[float]], Optional[int]]:
|
||
"""执行逆解计算。"""
|
||
if not isinstance(pose, (list, tuple)) or len(pose) != 6:
|
||
print(f"逆解目标位姿格式错误,需要 6 维列表: {pose}")
|
||
return None, None
|
||
|
||
try:
|
||
result_joints, status = robot_rpc_client.getRobotInterface(robot_name).getRobotAlgorithm().inverseKinematics(
|
||
list(reference_q), list(pose)
|
||
)
|
||
result_joints = list(result_joints)
|
||
print("逆解输入位姿:", list(pose))
|
||
print("逆解输出关节角:", result_joints)
|
||
if status != 0:
|
||
print(f"逆解失败,错误码: {status}")
|
||
return None, status
|
||
return result_joints, status
|
||
except Exception as exc:
|
||
print(f"逆解计算失败: {exc}")
|
||
return None, None
|
||
|
||
|
||
def exampleStartup() -> Optional[str]:
|
||
"""机械臂上电、启动,并等待进入 Running。"""
|
||
try:
|
||
robot_names = robot_rpc_client.getRobotNames()
|
||
if not robot_names:
|
||
print("机械臂启动失败:未找到机器人名称")
|
||
return None
|
||
|
||
robot_name = robot_names[0]
|
||
robot_manage = robot_rpc_client.getRobotInterface(robot_name).getRobotManage()
|
||
if robot_manage.poweron() != 0:
|
||
print("机械臂上电失败")
|
||
return None
|
||
print("机械臂上电请求已发送")
|
||
|
||
if robot_manage.startup() != 0:
|
||
print("机械臂启动失败")
|
||
return None
|
||
print("机械臂启动请求已发送")
|
||
|
||
while True:
|
||
robot_mode = robot_rpc_client.getRobotInterface(robot_name).getRobotState().getRobotModeType()
|
||
print(f"机械臂当前模式: {robot_mode.name}")
|
||
if robot_mode == pyaubo_sdk.RobotModeType.Running:
|
||
return robot_name
|
||
time.sleep(2)
|
||
except Exception as exc:
|
||
print(f"机械臂启动异常: {exc}")
|
||
return None
|
||
|
||
|
||
def quaternion_to_rotation_matrix(q: Sequence[float]) -> np.ndarray:
|
||
"""四元数转旋转矩阵。"""
|
||
w, x, y, z = q
|
||
return np.array(
|
||
[
|
||
[1 - 2 * y**2 - 2 * z**2, 2 * x * y - 2 * w * z, 2 * x * z + 2 * w * y],
|
||
[2 * x * y + 2 * w * z, 1 - 2 * x**2 - 2 * z**2, 2 * y * z - 2 * w * x],
|
||
[2 * x * z - 2 * w * y, 2 * y * z + 2 * w * x, 1 - 2 * x**2 - 2 * y**2],
|
||
],
|
||
dtype=float,
|
||
)
|
||
|
||
|
||
def euler_to_rotation_matrix(roll: float, pitch: float, yaw: float) -> np.ndarray:
|
||
"""欧拉角转旋转矩阵,旋转顺序为 ZYX。"""
|
||
r_x = np.array(
|
||
[
|
||
[1, 0, 0],
|
||
[0, math.cos(roll), -math.sin(roll)],
|
||
[0, math.sin(roll), math.cos(roll)],
|
||
],
|
||
dtype=float,
|
||
)
|
||
r_y = np.array(
|
||
[
|
||
[math.cos(pitch), 0, math.sin(pitch)],
|
||
[0, 1, 0],
|
||
[-math.sin(pitch), 0, math.cos(pitch)],
|
||
],
|
||
dtype=float,
|
||
)
|
||
r_z = np.array(
|
||
[
|
||
[math.cos(yaw), -math.sin(yaw), 0],
|
||
[math.sin(yaw), math.cos(yaw), 0],
|
||
[0, 0, 1],
|
||
],
|
||
dtype=float,
|
||
)
|
||
return r_z @ r_y @ r_x
|
||
|
||
|
||
def get_robot_end_matrix(robot_name: str) -> np.ndarray:
|
||
"""获取末端相对基座的 4x4 齐次变换矩阵。"""
|
||
pose = get_robot_end_effector_pose(robot_name)
|
||
if pose is None:
|
||
return np.eye(4, dtype=float)
|
||
|
||
try:
|
||
position = pose[:3]
|
||
if len(pose) >= 7:
|
||
rotation = quaternion_to_rotation_matrix(pose[3:7])
|
||
elif len(pose) == 6:
|
||
rotation = euler_to_rotation_matrix(*pose[3:])
|
||
else:
|
||
print(f"末端位姿格式错误: {pose}")
|
||
return np.eye(4, dtype=float)
|
||
|
||
matrix = np.eye(4, dtype=float)
|
||
matrix[:3, :3] = rotation
|
||
matrix[:3, 3] = position
|
||
return matrix
|
||
except Exception as exc:
|
||
print(f"计算末端矩阵失败: {exc}")
|
||
return np.eye(4, dtype=float)
|
||
|
||
|
||
def camera_to_base(point_cam: Sequence[float], robot_name: str) -> Optional[np.ndarray]:
|
||
"""将相机坐标系中的三维点换算到机械臂基坐标系。"""
|
||
try:
|
||
base_t_tool = get_robot_end_matrix(robot_name)
|
||
tool_t_camera = np.eye(4, dtype=float)
|
||
tool_t_camera[:3, :3] = R_tc
|
||
tool_t_camera[:3, 3] = T_tc
|
||
point_h = np.append(np.array(point_cam, dtype=float), 1.0)
|
||
point_base = base_t_tool @ tool_t_camera @ point_h
|
||
return point_base[:3]
|
||
except Exception as exc:
|
||
print(f"坐标转换失败: {exc}")
|
||
return None
|
||
|
||
|
||
def check_joint_position(robot_name: str, target_joints: Sequence[float]) -> bool:
|
||
"""校验当前关节位置是否到达目标位置。"""
|
||
current_joints = exampleState(robot_name)
|
||
if current_joints is None or len(current_joints) != len(target_joints):
|
||
print("无法读取关节位置,或关节数量不匹配")
|
||
return False
|
||
|
||
errors = [abs(current - target) for current, target in zip(current_joints, target_joints)]
|
||
max_error = max(errors) if errors else float("inf")
|
||
print(f"关节最大误差: {max_error:.4f} rad,阈值: {JOINT_TOLERANCE:.4f} rad")
|
||
return max_error < JOINT_TOLERANCE
|
||
|
||
|
||
def check_tcp_pose(robot_name: str, target_pose: Sequence[float]) -> bool:
|
||
"""校验当前 TCP 位姿是否到达目标位姿。"""
|
||
current_pose = get_robot_end_effector_pose(robot_name)
|
||
if current_pose is None or len(current_pose) < 6 or len(target_pose) < 6:
|
||
print("无法读取 TCP 位姿,或目标位姿格式错误")
|
||
return False
|
||
|
||
pos_error = np.linalg.norm(np.array(current_pose[:3], dtype=float) - np.array(target_pose[:3], dtype=float))
|
||
ori_error = np.linalg.norm(np.array(current_pose[3:6], dtype=float) - np.array(target_pose[3:6], dtype=float))
|
||
print(
|
||
f"TCP 位姿误差: 位置={pos_error:.4f} m,姿态={ori_error:.4f} rad | "
|
||
f"阈值: 位置={POSITION_TOLERANCE:.4f} m,姿态={ORIENTATION_TOLERANCE:.4f} rad"
|
||
)
|
||
return pos_error < POSITION_TOLERANCE and ori_error < ORIENTATION_TOLERANCE
|
||
|
||
|
||
def waitArrival(impl: Any, target_joints: Optional[Sequence[float]] = None, target_pose: Optional[Sequence[float]] = None) -> int:
|
||
"""
|
||
等待运动完成并在结束后进行位置校验。
|
||
|
||
返回:
|
||
0 = 成功
|
||
-1 = 获取执行 ID 失败
|
||
-2 = 运动等待超时
|
||
"""
|
||
retry_count = 0
|
||
motion_control = impl.getMotionControl()
|
||
while motion_control.getExecId() == -1:
|
||
retry_count += 1
|
||
if retry_count > EXEC_ID_MAX_RETRIES:
|
||
robot_names = robot_rpc_client.getRobotNames()
|
||
robot_name = robot_names[0] if robot_names else None
|
||
if robot_name:
|
||
joints_ok = target_joints is not None and check_joint_position(robot_name, target_joints)
|
||
pose_ok = target_pose is not None and check_tcp_pose(robot_name, target_pose)
|
||
if joints_ok or pose_ok:
|
||
print("未获取到执行 ID,但检测到机械臂已到达目标位,按成功处理")
|
||
return 0
|
||
print("运动失败:获取执行 ID 超时")
|
||
return -1
|
||
time.sleep(0.5)
|
||
|
||
exec_id = motion_control.getExecId()
|
||
timeout = 0.0
|
||
max_timeout = 30.0
|
||
|
||
while True:
|
||
current_exec_id = motion_control.getExecId()
|
||
if exec_id != current_exec_id:
|
||
break
|
||
|
||
timeout += 0.5
|
||
if timeout > max_timeout:
|
||
print(f"运动等待超时:{max_timeout} 秒")
|
||
return -2
|
||
time.sleep(0.5)
|
||
|
||
robot_names = robot_rpc_client.getRobotNames()
|
||
robot_name = robot_names[0] if robot_names else None
|
||
if robot_name:
|
||
if target_joints is not None and not check_joint_position(robot_name, target_joints):
|
||
print("警告:关节位置未达到目标值")
|
||
if target_pose is not None and not check_tcp_pose(robot_name, target_pose):
|
||
print("警告:TCP 位姿未达到目标值")
|
||
|
||
print("机械臂运动完成")
|
||
return 0
|
||
|
||
|
||
def control_tool_io(robot_name: str, io_index: int, state: bool) -> None:
|
||
"""设置工具端数字 IO。"""
|
||
io_control = robot_rpc_client.getRobotInterface(robot_name).getIoControl()
|
||
io_control.setToolDigitalOutput(io_index, state)
|
||
output = io_control.getToolDigitalOutput(io_index)
|
||
print(f"TOOL_IO[{io_index}] -> {'ON' if output else 'OFF'}")
|
||
|
||
|
||
@dataclass
|
||
class RobotArmController:
|
||
"""机械臂子系统。"""
|
||
|
||
robot_name: str
|
||
rpc_client: Any
|
||
placement_manager: PlacementManager
|
||
|
||
@property
|
||
def interface(self) -> Any:
|
||
return self.rpc_client.getRobotInterface(self.robot_name)
|
||
|
||
@property
|
||
def runtime_machine(self) -> Any:
|
||
return self.rpc_client.getRuntimeMachine()
|
||
|
||
def get_joint_positions(self) -> Optional[List[float]]:
|
||
return exampleState(self.robot_name)
|
||
|
||
def get_tcp_pose(self) -> Optional[List[float]]:
|
||
return get_robot_end_effector_pose(self.robot_name)
|
||
|
||
def is_at_home(self, joints: Optional[Sequence[float]] = None) -> bool:
|
||
home_joints = _cfg("HOME_JOINTS")
|
||
current_joints = list(joints) if joints is not None else self.get_joint_positions()
|
||
if current_joints is None or len(current_joints) != len(home_joints):
|
||
return False
|
||
max_error = max(abs(current - target) for current, target in zip(current_joints, home_joints))
|
||
return max_error < JOINT_TOLERANCE
|
||
|
||
def keep_scissors_open(self) -> None:
|
||
try:
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False)
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), True)
|
||
print("Scissors disabled: keeping end effector open")
|
||
except Exception as exc:
|
||
print(f"Failed to keep scissors open: {exc}")
|
||
|
||
def reset_scissors_outputs(self) -> None:
|
||
try:
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False)
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), False)
|
||
print("Scissors enabled: tool IO reset to neutral")
|
||
except Exception as exc:
|
||
print(f"Failed to reset scissors IO: {exc}")
|
||
|
||
def close_scissors_for_pick(self) -> None:
|
||
if not _cfg("SCISSORS_ENABLED"):
|
||
self.keep_scissors_open()
|
||
return
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), False)
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), True)
|
||
time.sleep(_cfg("GRIPPER_ACTION_DELAY"))
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False)
|
||
|
||
def open_scissors_after_place(self) -> None:
|
||
if not _cfg("SCISSORS_ENABLED"):
|
||
self.keep_scissors_open()
|
||
return
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_CLOSE_IO"), False)
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), True)
|
||
time.sleep(_cfg("GRIPPER_ACTION_DELAY"))
|
||
control_tool_io(self.robot_name, _cfg("GRIPPER_OPEN_IO"), False)
|
||
|
||
def _restart_runtime_machine(self) -> None:
|
||
"""重置运行态,尽量把控制器拉回可接收运动指令的状态。"""
|
||
try:
|
||
self.runtime_machine.stop()
|
||
except Exception:
|
||
pass
|
||
time.sleep(MOTION_START_DELAY)
|
||
self.runtime_machine.start()
|
||
time.sleep(MOTION_START_DELAY)
|
||
|
||
def move_joint(
|
||
self,
|
||
target_joints: Sequence[float],
|
||
*,
|
||
target_pose: Optional[Sequence[float]] = None,
|
||
description: str = "机械臂运动",
|
||
) -> bool:
|
||
"""按关节目标执行运动。"""
|
||
joints = list(target_joints)
|
||
last_error: Optional[Exception] = None
|
||
|
||
for attempt in range(1, MOTION_COMMAND_RETRIES + 1):
|
||
try:
|
||
if attempt > 1:
|
||
print(f"{description}:重新下发运动指令(第 {attempt}/{MOTION_COMMAND_RETRIES} 次)")
|
||
|
||
self._restart_runtime_machine()
|
||
self.interface.getMotionControl().moveJoint(joints, _cfg("ARM_SPEED"), _cfg("ARM_ACCEL"), 0, 0)
|
||
result = waitArrival(self.interface, target_joints=joints, target_pose=target_pose)
|
||
if result == 0:
|
||
print(f"{description}完成")
|
||
return True
|
||
|
||
print(f"{description}失败,错误码: {result}")
|
||
except Exception as exc:
|
||
last_error = exc
|
||
print(f"{description}失败: {exc}")
|
||
finally:
|
||
try:
|
||
self.runtime_machine.stop()
|
||
except Exception:
|
||
pass
|
||
|
||
time.sleep(MOTION_START_DELAY)
|
||
|
||
if last_error is not None:
|
||
print(f"{description}最终失败: {last_error}")
|
||
return False
|
||
|
||
def move_pose(
|
||
self,
|
||
target_pose: Sequence[float],
|
||
reference_joints: Sequence[float],
|
||
*,
|
||
description: str,
|
||
) -> Optional[List[float]]:
|
||
"""按位姿目标执行逆解和关节运动。"""
|
||
target_joints, _ = exampleInverseK(self.robot_name, target_pose, reference_joints)
|
||
if target_joints is None:
|
||
print(f"{description}失败:逆解无结果")
|
||
self.return_home(retries=1)
|
||
return None
|
||
|
||
if not self.move_joint(target_joints, target_pose=target_pose, description=description):
|
||
return None
|
||
return target_joints
|
||
|
||
def return_home(self, retries: int = HOME_RETRY_COUNT) -> bool:
|
||
"""返回 Home 关节位。"""
|
||
current_joints = self.get_joint_positions()
|
||
if self.is_at_home(current_joints):
|
||
print("Robot arm is already at Home, skipping Home motion")
|
||
return True
|
||
|
||
attempts = max(1, retries)
|
||
for attempt in range(1, attempts + 1):
|
||
if attempts == 1:
|
||
print("机械臂:返回 Home")
|
||
else:
|
||
print(f"机械臂:返回 Home(第 {attempt}/{attempts} 次)")
|
||
if self.move_joint(_cfg("HOME_JOINTS"), description="返回 Home"):
|
||
return True
|
||
if attempt < attempts:
|
||
time.sleep(HOME_RETRY_DELAY)
|
||
print("机械臂:多次尝试后仍未能返回 Home")
|
||
return False
|
||
|
||
def execute_pick_and_place(self, pose: Sequence[float], angle_rad: float) -> bool:
|
||
"""
|
||
执行抓取和放置动作。
|
||
|
||
保持原有外部接口和基本动作顺序:
|
||
1. 去抓取预备点
|
||
2. 去抓取点
|
||
3. 夹爪动作
|
||
4. 回抓取预备点
|
||
5. 回 Home
|
||
6. 去放置点
|
||
7. 释放
|
||
8. 再回 Home
|
||
"""
|
||
current_joints = self.get_joint_positions()
|
||
if current_joints is None:
|
||
print("获取关节位置失败,终止抓取并尝试返回 Home")
|
||
self.return_home()
|
||
return False
|
||
|
||
target_pose = list(pose)
|
||
print(f"机械臂:开始处理目标位姿 {target_pose}")
|
||
print(f"机械臂:关键点连线角度 {math.degrees(angle_rad):.2f}°")
|
||
|
||
approach_pose = target_pose.copy()
|
||
# 先走到抓取预备点,避免直接扎向果实。
|
||
approach_pose[1] += _cfg("APPROACH_Y_OFFSET")
|
||
|
||
pickup_pose = target_pose.copy()
|
||
# 再把末端抬到可抓取高度,和原代码保持一致使用 Z 偏移。
|
||
pickup_pose[0] += _cfg("PICKUP_X_OFFSET")
|
||
pickup_pose[1] += _cfg("PICKUP_Y_OFFSET")
|
||
pickup_pose[2] += _cfg("LIFT_Z_OFFSET")
|
||
|
||
angle_index = int(_cfg("PICK_ANGLE_ORIENTATION_INDEX"))
|
||
if 3 <= angle_index < len(pickup_pose):
|
||
signed_angle = float(_cfg("PICK_ANGLE_SIGN")) * angle_rad
|
||
pickup_pose[angle_index] += signed_angle
|
||
print(
|
||
"机械臂:末端姿态旋转 "
|
||
f"{math.degrees(signed_angle):.2f}° | 姿态索引: {angle_index}"
|
||
)
|
||
else:
|
||
print(f"警告:PICK_ANGLE_ORIENTATION_INDEX={_cfg('PICK_ANGLE_ORIENTATION_INDEX')} 无效,跳过姿态旋转")
|
||
|
||
approach_joints = self.move_pose(approach_pose, current_joints, description="移动到抓取预备点")
|
||
if approach_joints is None:
|
||
self.return_home()
|
||
return False
|
||
|
||
pickup_joints = self.move_pose(pickup_pose, approach_joints, description="旋转末端并移动到抓取点")
|
||
if pickup_joints is None:
|
||
self.return_home()
|
||
return False
|
||
|
||
pick_point_delay = float(_cfg("PICK_POINT_ARRIVAL_DELAY"))
|
||
if pick_point_delay > 0:
|
||
print(f"机械臂:已到达抓取点,等待 {pick_point_delay:.1f} 秒后执行夹爪动作")
|
||
time.sleep(pick_point_delay)
|
||
|
||
self.close_scissors_for_pick()
|
||
|
||
retreat_joints = self.move_pose(approach_pose, pickup_joints, description="采摘后返回预抓取点")
|
||
if retreat_joints is None:
|
||
self.return_home()
|
||
return False
|
||
|
||
if not self.return_home():
|
||
return False
|
||
|
||
place_pose, place_index = self.placement_manager.get_next_position()
|
||
print(f"机械臂:移动到放置位 {place_index} -> {place_pose}")
|
||
|
||
reference_joints = self.get_joint_positions() or _cfg("HOME_JOINTS")
|
||
place_joints = self.move_pose(place_pose, reference_joints, description=f"移动到放置位 {place_index}")
|
||
if place_joints is None:
|
||
self.return_home()
|
||
return False
|
||
|
||
self.open_scissors_after_place()
|
||
|
||
if not self.return_home():
|
||
return False
|
||
|
||
print("机械臂:本次抓取放置流程完成")
|
||
return True
|
||
|
||
|
||
def init_camera() -> Tuple[Any, Any, Any]:
|
||
"""初始化 RealSense 相机。"""
|
||
try:
|
||
pipeline = rs.pipeline()
|
||
config = rs.config()
|
||
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
|
||
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
|
||
profile = pipeline.start(config)
|
||
|
||
depth_sensor = profile.get_device().first_depth_sensor()
|
||
depth_scale = depth_sensor.get_depth_scale()
|
||
depth_profile = rs.video_stream_profile(profile.get_stream(rs.stream.depth))
|
||
depth_intrinsics = depth_profile.get_intrinsics()
|
||
align = rs.align(rs.stream.color)
|
||
|
||
print(f"相机初始化完成 | 深度比例尺: {depth_scale}")
|
||
return pipeline, align, depth_intrinsics
|
||
except Exception as exc:
|
||
print(f"相机初始化失败: {exc}")
|
||
raise
|
||
|
||
|
||
def init_tomato_detector(model_path: Optional[str] = None) -> Any:
|
||
"""初始化 YOLO 模型。"""
|
||
use_path = model_path or _cfg("YOLO_MODEL_PATH") or "best.pt"
|
||
print(f"加载 YOLO 模型 | 路径: {use_path}")
|
||
|
||
if not os.path.exists(use_path):
|
||
raise FileNotFoundError(f"YOLO 模型文件不存在: {use_path}")
|
||
if not use_path.endswith(".pt"):
|
||
raise TypeError(f"模型文件格式错误: {use_path},仅支持 .pt")
|
||
|
||
try:
|
||
model = YOLO(use_path)
|
||
print("YOLO 模型加载成功")
|
||
return model
|
||
except Exception as exc:
|
||
print(f"YOLO 模型加载失败: {exc}")
|
||
raise
|
||
|
||
|
||
def filter_ripe_tomatoes(results: Any, confidence_threshold: Optional[float] = None) -> List[Any]:
|
||
"""筛选成熟番茄目标。"""
|
||
threshold = _cfg("PICK_CONFIDENCE_THRESHOLD") if confidence_threshold is None else confidence_threshold
|
||
ripe_tomatoes: List[Any] = []
|
||
for box in results.boxes:
|
||
confidence = float(box.conf)
|
||
if int(box.cls) == 0 and confidence >= threshold:
|
||
ripe_tomatoes.append(box)
|
||
return ripe_tomatoes
|
||
|
||
|
||
def draw_tomato_tilt_line(color_image: np.ndarray, x1: int, y1: int, x2: int, y2: int, pixel_x: int) -> float:
|
||
"""在目标区域内估计番茄串倾角。"""
|
||
_ = pixel_x
|
||
if x2 <= x1 or y2 <= y1:
|
||
return 0.0
|
||
|
||
roi = color_image[y1:y2, x1:x2]
|
||
if roi.size == 0:
|
||
return 0.0
|
||
|
||
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
|
||
edges = cv2.Canny(gray, 50, 150)
|
||
lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=50, minLineLength=50, maxLineGap=10)
|
||
|
||
angle = 0.0
|
||
if lines is None:
|
||
return angle
|
||
|
||
longest_line = max(
|
||
lines,
|
||
key=lambda line: np.linalg.norm(np.array(line[0][:2], dtype=float) - np.array(line[0][2:], dtype=float)),
|
||
)
|
||
x1_line, y1_line, x2_line, y2_line = [int(v) for v in longest_line[0]]
|
||
x1_line += x1
|
||
x2_line += x1
|
||
y1_line += y1
|
||
y2_line += y1
|
||
|
||
vector_tilt = np.array([x2_line - x1_line, y2_line - y1_line], dtype=float)
|
||
vector_vertical = np.array([0.0, y2 - y1], dtype=float)
|
||
norm_tilt = np.linalg.norm(vector_tilt)
|
||
norm_vertical = np.linalg.norm(vector_vertical)
|
||
|
||
if norm_tilt > 0 and norm_vertical > 0:
|
||
cos_angle = np.dot(vector_tilt, vector_vertical) / (norm_tilt * norm_vertical)
|
||
cos_angle = max(-1.0, min(1.0, float(cos_angle)))
|
||
angle = math.degrees(math.acos(cos_angle))
|
||
angle = angle if x2_line > x1_line else -angle
|
||
angle = max(-30.0, min(30.0, angle))
|
||
|
||
cv2.putText(
|
||
color_image,
|
||
f"Angle: {angle:.2f}deg",
|
||
(x1, y2 + 20),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.6,
|
||
(0, 255, 0),
|
||
2,
|
||
)
|
||
print(f"检测到番茄串倾角: {angle:.2f}°")
|
||
return angle
|
||
|
||
|
||
def fit_red_line(crop_image: np.ndarray, x1: int, y1: int) -> Optional[Tuple[Tuple[int, int], Tuple[int, int]]]:
|
||
"""在目标区域中拟合红色主轴。"""
|
||
if crop_image.size == 0:
|
||
return None
|
||
|
||
lower_red1 = np.array([0, 120, 70])
|
||
upper_red1 = np.array([10, 255, 255])
|
||
lower_red2 = np.array([160, 120, 70])
|
||
upper_red2 = np.array([180, 255, 255])
|
||
|
||
hsv = cv2.cvtColor(crop_image, cv2.COLOR_BGR2HSV)
|
||
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
|
||
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
|
||
mask = cv2.bitwise_or(mask1, mask2)
|
||
|
||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
if not contours:
|
||
return None
|
||
|
||
all_points = np.vstack(contours).squeeze()
|
||
if len(np.atleast_1d(all_points)) < 2:
|
||
return None
|
||
|
||
vx, vy, x0, y0 = [float(value) for value in cv2.fitLine(all_points, cv2.DIST_L2, 0, 0.01, 0.01)]
|
||
if abs(vx) < 1e-6:
|
||
p1 = (x1 + int(x0), y1)
|
||
p2 = (x1 + int(x0), y1 + crop_image.shape[0])
|
||
return p1, p2
|
||
|
||
left_y = int((-x0 * vy / vx) + y0)
|
||
right_y = int((((crop_image.shape[1] - x0) * vy) / vx) + y0)
|
||
p1 = (x1, y1 + left_y)
|
||
p2 = (x1 + crop_image.shape[1], y1 + right_y)
|
||
return p1, p2
|
||
|
||
|
||
def calculate_keypoint_line_angle_rad(
|
||
cutpoint: Optional[Tuple[int, int]],
|
||
end_point: Optional[Tuple[int, int]],
|
||
) -> float:
|
||
"""计算上/下关键点连线相对图像竖直方向的角度。"""
|
||
if cutpoint is None or end_point is None:
|
||
return 0.0
|
||
|
||
dx = float(end_point[0] - cutpoint[0])
|
||
dy = float(end_point[1] - cutpoint[1])
|
||
if abs(dx) < 1e-6 and abs(dy) < 1e-6:
|
||
return 0.0
|
||
|
||
angle_rad = math.atan2(dx, dy)
|
||
max_angle_rad = math.radians(float(_cfg("MAX_PICK_ANGLE_DEG")))
|
||
return max(-max_angle_rad, min(max_angle_rad, angle_rad))
|
||
|
||
|
||
@dataclass
|
||
class VisionController:
|
||
"""视觉子系统。"""
|
||
|
||
pipeline: Any
|
||
align: Any
|
||
depth_intrinsics: Any
|
||
model: Any
|
||
robot_name: str
|
||
arm_controller: RobotArmController
|
||
|
||
def capture_frame(self) -> Tuple[Optional[np.ndarray], Optional[Any]]:
|
||
"""采集并对齐相机画面。"""
|
||
try:
|
||
frames = self.pipeline.wait_for_frames(timeout_ms=5000)
|
||
if not frames:
|
||
print("未获取到相机帧,重试...")
|
||
return None, None
|
||
|
||
aligned_frames = self.align.process(frames)
|
||
depth_frame = aligned_frames.get_depth_frame()
|
||
color_frame = aligned_frames.get_color_frame()
|
||
if not depth_frame or not color_frame:
|
||
print("深度帧或彩色帧为空,重试...")
|
||
return None, None
|
||
|
||
color_image = np.asanyarray(color_frame.get_data())
|
||
return color_image, depth_frame
|
||
except Exception as exc:
|
||
print(f"获取相机帧失败: {exc}")
|
||
return None, None
|
||
|
||
@staticmethod
|
||
def get_pick_zone_bounds(frame_width: int) -> Tuple[int, int]:
|
||
left = int(frame_width * PICK_ZONE_LEFT_RATIO)
|
||
right = int(frame_width * PICK_ZONE_RIGHT_RATIO)
|
||
return max(0, left), max(left + 1, min(frame_width - 1, right))
|
||
|
||
def annotate_pick_zone(self, frame: np.ndarray) -> None:
|
||
frame_h, frame_w = frame.shape[:2]
|
||
left, right = self.get_pick_zone_bounds(frame_w)
|
||
overlay = frame.copy()
|
||
cv2.rectangle(overlay, (left, 0), (right, frame_h - 1), (0, 180, 0), -1)
|
||
cv2.addWeighted(overlay, 0.12, frame, 0.88, 0, frame)
|
||
cv2.rectangle(frame, (left, 0), (right, frame_h - 1), (0, 220, 0), 2)
|
||
cv2.putText(
|
||
frame,
|
||
"Pick Zone",
|
||
(left + 8, max(24, int(frame_h * 0.08))),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.65,
|
||
(0, 220, 0),
|
||
2,
|
||
)
|
||
|
||
@staticmethod
|
||
def box_xyxy(box: Any) -> Tuple[int, int, int, int]:
|
||
return tuple(int(value) for value in box.xyxy[0].cpu().numpy())
|
||
|
||
@staticmethod
|
||
def box_confidence(box: Any) -> float:
|
||
return float(box.conf[0]) if hasattr(box.conf, "__len__") else float(box.conf)
|
||
|
||
def box_center_in_pick_zone(self, box: Any, frame_width: int) -> bool:
|
||
x1, _y1, x2, _y2 = self.box_xyxy(box)
|
||
center_x = int((x1 + x2) / 2)
|
||
left, right = self.get_pick_zone_bounds(frame_width)
|
||
return left <= center_x <= right
|
||
|
||
@staticmethod
|
||
def as_numpy_array(value: Any) -> Optional[np.ndarray]:
|
||
if value is None:
|
||
return None
|
||
if hasattr(value, "cpu"):
|
||
value = value.cpu()
|
||
if hasattr(value, "numpy"):
|
||
value = value.numpy()
|
||
return np.asarray(value)
|
||
|
||
@staticmethod
|
||
def clamp_pixel(px: float, py: float, frame_shape: Tuple[int, int, int]) -> Tuple[int, int]:
|
||
frame_h, frame_w = frame_shape[:2]
|
||
pixel_x = max(0, min(frame_w - 1, int(round(px))))
|
||
pixel_y = max(0, min(frame_h - 1, int(round(py))))
|
||
return pixel_x, pixel_y
|
||
|
||
def extract_keypoint_by_index(
|
||
self,
|
||
current_points: np.ndarray,
|
||
current_conf: Optional[np.ndarray],
|
||
point_index: int,
|
||
frame_shape: Tuple[int, int, int],
|
||
) -> Optional[Tuple[int, int]]:
|
||
if point_index >= len(current_points):
|
||
return None
|
||
|
||
point = current_points[point_index]
|
||
px, py = float(point[0]), float(point[1])
|
||
if px <= 0 and py <= 0:
|
||
return None
|
||
if current_conf is not None and point_index < len(current_conf):
|
||
if float(current_conf[point_index]) < PICK_KEYPOINT_CONF_THRESHOLD:
|
||
return None
|
||
return self.clamp_pixel(px, py, frame_shape)
|
||
|
||
def validate_candidate_keypoints(
|
||
self,
|
||
box: Any,
|
||
cutpoint: Tuple[int, int],
|
||
end_point: Optional[Tuple[int, int]],
|
||
) -> bool:
|
||
x1, y1, x2, y2 = self.box_xyxy(box)
|
||
box_w = max(1, x2 - x1)
|
||
box_h = max(1, y2 - y1)
|
||
margin = int(max(box_w, box_h) * PICK_KEYPOINT_BOX_MARGIN_RATIO)
|
||
|
||
if not (x1 - margin <= cutpoint[0] <= x2 + margin and y1 - margin <= cutpoint[1] <= y2 + margin):
|
||
print(f"Pick candidate skipped: semantic cutpoint outside bbox @ {cutpoint}")
|
||
return False
|
||
|
||
cut_rel_y = (cutpoint[1] - y1) / float(box_h)
|
||
if cut_rel_y > PICK_CUTPOINT_MAX_REL_Y:
|
||
print(
|
||
"Pick candidate skipped: semantic cutpoint too low "
|
||
f"(rel_y={cut_rel_y:.2f}) @ {cutpoint}"
|
||
)
|
||
return False
|
||
|
||
if end_point is None:
|
||
return True
|
||
|
||
if not (x1 - margin <= end_point[0] <= x2 + margin and y1 - margin <= end_point[1] <= y2 + margin):
|
||
print(f"Pick candidate skipped: endpoint outside bbox @ {end_point}")
|
||
return False
|
||
|
||
distance = float(np.linalg.norm(np.array(end_point, dtype=float) - np.array(cutpoint, dtype=float)))
|
||
if distance < PICK_KEYPOINT_MIN_DISTANCE_PX:
|
||
print(f"Pick candidate skipped: keypoints too close ({distance:.1f}px)")
|
||
return False
|
||
|
||
if end_point[1] + PICK_ENDPOINT_Y_TOLERANCE_PX < cutpoint[1]:
|
||
print(f"Pick candidate skipped: endpoint appears above cutpoint @ cut={cutpoint}, end={end_point}")
|
||
return False
|
||
|
||
return True
|
||
|
||
def extract_candidate_keypoints(
|
||
self,
|
||
result: Any,
|
||
box_index: int,
|
||
frame_shape: Tuple[int, int, int],
|
||
) -> Tuple[Optional[Tuple[int, int]], Optional[Tuple[int, int]]]:
|
||
keypoints = getattr(result, "keypoints", None)
|
||
keypoint_xy = self.as_numpy_array(getattr(keypoints, "xy", None))
|
||
if keypoint_xy is None or box_index >= len(keypoint_xy):
|
||
return None, None
|
||
|
||
current_points = keypoint_xy[box_index]
|
||
keypoint_conf = self.as_numpy_array(getattr(keypoints, "conf", None))
|
||
current_conf = keypoint_conf[box_index] if keypoint_conf is not None and box_index < len(keypoint_conf) else None
|
||
|
||
cutpoint = self.extract_keypoint_by_index(
|
||
current_points,
|
||
current_conf,
|
||
PICK_CUTPOINT_KEYPOINT_INDEX,
|
||
frame_shape,
|
||
)
|
||
end_point = self.extract_keypoint_by_index(
|
||
current_points,
|
||
current_conf,
|
||
PICK_ENDPOINT_KEYPOINT_INDEX,
|
||
frame_shape,
|
||
)
|
||
return cutpoint, end_point
|
||
|
||
def build_pick_candidates(self, result: Any, frame_shape: Tuple[int, int, int]) -> List[TomatoCandidate]:
|
||
frame_w = frame_shape[1]
|
||
candidates: List[TomatoCandidate] = []
|
||
boxes = getattr(result, "boxes", None)
|
||
if boxes is None:
|
||
return candidates
|
||
|
||
for box_index, box in enumerate(boxes):
|
||
confidence = self.box_confidence(box)
|
||
if int(box.cls) != 0 or confidence < _cfg("PICK_CONFIDENCE_THRESHOLD"):
|
||
continue
|
||
if not self.box_center_in_pick_zone(box, frame_w):
|
||
continue
|
||
|
||
cutpoint, end_point = self.extract_candidate_keypoints(result, box_index, frame_shape)
|
||
if cutpoint is None:
|
||
print("Pick candidate skipped: missing semantic cutpoint keypoint")
|
||
continue
|
||
if not self.validate_candidate_keypoints(box, cutpoint, end_point):
|
||
continue
|
||
candidates.append(TomatoCandidate(box=box, box_index=box_index, cutpoint=cutpoint, end_point=end_point))
|
||
|
||
return candidates
|
||
|
||
def run_detection(self, image: np.ndarray) -> Tuple[np.ndarray, List[TomatoCandidate]]:
|
||
"""执行 YOLO 检测并返回带标注画面。"""
|
||
results = self.model(image, classes=0, conf=_cfg("YOLO_DETECT_CONF"), verbose=False)
|
||
result = results[0]
|
||
annotated_image = result.plot()
|
||
self.annotate_pick_zone(annotated_image)
|
||
ripe_tomatoes = self.build_pick_candidates(result, image.shape)
|
||
return annotated_image, ripe_tomatoes
|
||
|
||
def serialize_detection_boxes(self, boxes: Sequence[Any]) -> List[Dict[str, Any]]:
|
||
"""把 YOLO 检测框整理成可供 UI 叠加显示的结构化信息。"""
|
||
serialized: List[Dict[str, Any]] = []
|
||
for index, item in enumerate(boxes, start=1):
|
||
try:
|
||
box = item.box if isinstance(item, TomatoCandidate) else item
|
||
x1, y1, x2, y2 = self.box_xyxy(box)
|
||
center_x = int((x1 + x2) / 2)
|
||
center_y = int((y1 + y2) / 2)
|
||
cutpoint = list(item.cutpoint) if isinstance(item, TomatoCandidate) and item.cutpoint else []
|
||
end_point = list(item.end_point) if isinstance(item, TomatoCandidate) and item.end_point else []
|
||
serialized.append(
|
||
{
|
||
"label": f"Tomato-{index}",
|
||
"confidence": self.box_confidence(box),
|
||
"bbox": [x1, y1, x2, y2],
|
||
"center": [center_x, center_y],
|
||
"cutpoint": cutpoint,
|
||
"end_point": end_point,
|
||
}
|
||
)
|
||
except Exception as exc:
|
||
print(f"检测框序列化失败: {exc}")
|
||
return serialized
|
||
|
||
def annotate_target(self, frame: np.ndarray, target: DetectedTomato) -> None:
|
||
"""在画面上绘制目标核心信息。"""
|
||
cutpoint = (target.pixel_x, target.pixel_y)
|
||
cv2.drawMarker(frame, cutpoint, (0, 255, 255), cv2.MARKER_CROSS, 18, 2)
|
||
cv2.circle(frame, cutpoint, 6, (0, 255, 255), -1)
|
||
cv2.circle(frame, cutpoint, 11, (0, 80, 255), 2)
|
||
if len(target.end_point) == 2:
|
||
end_point = tuple(int(v) for v in target.end_point)
|
||
cv2.circle(frame, end_point, 5, (255, 215, 0), -1)
|
||
cv2.line(frame, cutpoint, end_point, (255, 255, 0), 2)
|
||
angle_text = f"Angle: {math.degrees(target.angle_rad):.2f}deg"
|
||
(angle_w, _angle_h), angle_baseline = cv2.getTextSize(
|
||
angle_text,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.5,
|
||
2,
|
||
)
|
||
angle_x = max(6, min(end_point[0] + 8, frame.shape[1] - angle_w - 6))
|
||
angle_y = max(18, min(end_point[1] + 18, frame.shape[0] - angle_baseline - 6))
|
||
cv2.putText(
|
||
frame,
|
||
angle_text,
|
||
(angle_x, angle_y),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.5,
|
||
(0, 255, 0),
|
||
2,
|
||
)
|
||
|
||
@staticmethod
|
||
def collect_depth_samples(
|
||
depth_frame: Any,
|
||
pixel_x: int,
|
||
pixel_y: int,
|
||
frame_shape: Tuple[int, int, int],
|
||
radius: int,
|
||
) -> List[float]:
|
||
frame_h, frame_w = frame_shape[:2]
|
||
x_start = max(0, pixel_x - radius)
|
||
x_end = min(frame_w - 1, pixel_x + radius)
|
||
y_start = max(0, pixel_y - radius)
|
||
y_end = min(frame_h - 1, pixel_y + radius)
|
||
|
||
samples: List[float] = []
|
||
for sample_y in range(y_start, y_end + 1):
|
||
for sample_x in range(x_start, x_end + 1):
|
||
depth = float(depth_frame.get_distance(sample_x, sample_y))
|
||
if PICK_DEPTH_MIN_M <= depth <= PICK_DEPTH_MAX_M:
|
||
samples.append(depth)
|
||
return samples
|
||
|
||
def get_robust_depth(
|
||
self,
|
||
depth_frame: Any,
|
||
pixel_x: int,
|
||
pixel_y: int,
|
||
frame_shape: Tuple[int, int, int],
|
||
) -> Optional[float]:
|
||
center_depth = float(depth_frame.get_distance(pixel_x, pixel_y))
|
||
for radius in (PICK_DEPTH_WINDOW_RADIUS, PICK_DEPTH_FALLBACK_WINDOW_RADIUS):
|
||
samples = self.collect_depth_samples(depth_frame, pixel_x, pixel_y, frame_shape, radius)
|
||
if len(samples) < PICK_DEPTH_MIN_VALID_PIXELS:
|
||
continue
|
||
|
||
sample_array = np.array(samples, dtype=float)
|
||
median_depth = float(np.median(sample_array))
|
||
stable_samples = sample_array[np.abs(sample_array - median_depth) <= PICK_DEPTH_MEDIAN_TOLERANCE_M]
|
||
if len(stable_samples) >= PICK_DEPTH_MIN_VALID_PIXELS:
|
||
depth = float(np.median(stable_samples))
|
||
else:
|
||
depth = median_depth
|
||
|
||
if not (PICK_DEPTH_MIN_M <= center_depth <= PICK_DEPTH_MAX_M):
|
||
print(
|
||
"采摘点中心深度无效,使用邻域稳健深度 "
|
||
f"{depth:.3f}m @ ({pixel_x}, {pixel_y}), radius={radius}"
|
||
)
|
||
elif abs(center_depth - depth) > PICK_DEPTH_MEDIAN_TOLERANCE_M:
|
||
print(
|
||
"采摘点中心深度跳变,使用邻域稳健深度 "
|
||
f"{depth:.3f}m 替代 {center_depth:.3f}m @ ({pixel_x}, {pixel_y})"
|
||
)
|
||
return depth
|
||
|
||
print(
|
||
"采摘点邻域深度不足 "
|
||
f"center={center_depth:.3f}m @ ({pixel_x}, {pixel_y})"
|
||
)
|
||
return None
|
||
|
||
def build_detected_tomato(
|
||
self,
|
||
candidate: TomatoCandidate,
|
||
depth_frame: Any,
|
||
frame: np.ndarray,
|
||
tcp_pose: Sequence[float],
|
||
) -> Optional[DetectedTomato]:
|
||
"""把检测框转换成可供机械臂执行的抓取目标。"""
|
||
box = candidate.box
|
||
x1, y1, x2, y2 = self.box_xyxy(box)
|
||
if candidate.cutpoint is None:
|
||
print("Pick target skipped: missing upper YOLO keypoint")
|
||
return None
|
||
pixel_x, pixel_y = candidate.cutpoint
|
||
|
||
depth = self.get_robust_depth(depth_frame, pixel_x, pixel_y, frame.shape)
|
||
if depth is None:
|
||
print(f"无效深度值 @ ({pixel_x}, {pixel_y})")
|
||
return None
|
||
|
||
point_cam = rs.rs2_deproject_pixel_to_point(self.depth_intrinsics, [pixel_x, pixel_y], depth)
|
||
point_base = camera_to_base(point_cam, self.robot_name)
|
||
if point_base is None:
|
||
print(f"坐标转换失败 @ ({pixel_x}, {pixel_y}),本次目标跳过")
|
||
self.arm_controller.return_home()
|
||
return None
|
||
|
||
angle_rad = calculate_keypoint_line_angle_rad(candidate.cutpoint, candidate.end_point)
|
||
print(f"关键点连线角度: {math.degrees(angle_rad):.2f}°")
|
||
robot_pose = list(point_base) + list(tcp_pose[3:6])
|
||
|
||
target = DetectedTomato(
|
||
x1=x1,
|
||
y1=y1,
|
||
x2=x2,
|
||
y2=y2,
|
||
pixel_x=pixel_x,
|
||
pixel_y=pixel_y,
|
||
depth=depth,
|
||
point_cam=list(point_cam),
|
||
point_base=list(point_base),
|
||
robot_pose=robot_pose,
|
||
angle_rad=angle_rad,
|
||
cutpoint=[int(pixel_x), int(pixel_y)],
|
||
end_point=list(candidate.end_point) if candidate.end_point else [],
|
||
)
|
||
self.annotate_target(frame, target)
|
||
return target
|
||
|
||
def log_target(self, index: int, target: DetectedTomato) -> None:
|
||
"""打印目标信息。"""
|
||
print(f"检测到目标 #{index}:")
|
||
print(f" 像素坐标: ({target.pixel_x}, {target.pixel_y})")
|
||
print(
|
||
" 相机坐标: "
|
||
f"({target.point_cam[0]:.3f}, {target.point_cam[1]:.3f}, {target.point_cam[2]:.3f}) m"
|
||
)
|
||
print(
|
||
" 机械臂基坐标: "
|
||
f"({target.point_base[0]:.3f}, {target.point_base[1]:.3f}, {target.point_base[2]:.3f}) m"
|
||
)
|
||
|
||
def handle_search_phase(self, raw_image: np.ndarray) -> np.ndarray:
|
||
"""AGV 行进时的快速检测阶段。"""
|
||
annotated_image, ripe_tomatoes = self.run_detection(raw_image)
|
||
|
||
if ripe_tomatoes and not picking_done.is_set():
|
||
# 这里只负责“发现目标并请求停车”,真正抓取发生在 AGV 停稳后的下一阶段。
|
||
has_tomato.set()
|
||
print(f"视觉检测:识别到 {len(ripe_tomatoes)} 个成熟番茄 | 触发 AGV 暂停")
|
||
elif picking_done.is_set():
|
||
# 一次采摘结束后,把状态机恢复到“边走边看”的初始态。
|
||
clear_pick_cycle()
|
||
print("视觉检测:采摘完成 | 允许 AGV 继续前进")
|
||
|
||
return annotated_image
|
||
|
||
def handle_pick_phase(self, raw_image: np.ndarray, depth_frame: Any) -> np.ndarray:
|
||
"""AGV 停止后的精确检测与抓取阶段。"""
|
||
if start_agv_stop_window():
|
||
# AGV 刚停下时画面可能还在抖,这里故意留一个稳定窗口再做精检和抓取。
|
||
pick_settle_delay = _cfg("AGV_PICK_SETTLE_DELAY")
|
||
print(f"AGV 已停止,等待 {pick_settle_delay:.1f} 秒后开始精确检测...")
|
||
frozen_image, frozen_targets = self.run_detection(raw_image)
|
||
safe_ui_update(
|
||
frozen_image,
|
||
{
|
||
"freeze": True,
|
||
"stage": "pick_wait",
|
||
"detections": self.serialize_detection_boxes(frozen_targets),
|
||
},
|
||
)
|
||
time.sleep(pick_settle_delay)
|
||
return frozen_image
|
||
|
||
agv_stop_timeout = _cfg("AGV_STOP_TIMEOUT")
|
||
if agv_stop_elapsed() > agv_stop_timeout:
|
||
print(f"AGV 停止超时 {agv_stop_timeout} 秒,允许继续前进")
|
||
clear_pick_cycle()
|
||
timeout_image = raw_image.copy()
|
||
self.annotate_pick_zone(timeout_image)
|
||
return timeout_image
|
||
|
||
annotated_image, ripe_tomatoes = self.run_detection(raw_image)
|
||
if not ripe_tomatoes:
|
||
print("视觉检测:停止窗口内未找到成熟番茄,允许 AGV 继续前进")
|
||
clear_pick_cycle()
|
||
return annotated_image
|
||
|
||
tcp_pose = self.arm_controller.get_tcp_pose()
|
||
if tcp_pose is None:
|
||
print("获取末端位姿失败,跳过本次抓取并返回 Home")
|
||
self.arm_controller.return_home()
|
||
clear_pick_cycle()
|
||
return annotated_image
|
||
|
||
for index, candidate in enumerate(ripe_tomatoes, start=1):
|
||
try:
|
||
# 逐个候选目标尝试构造成机械臂可执行的抓取目标,成功一个就结束本轮。
|
||
target = self.build_detected_tomato(candidate, depth_frame, annotated_image, tcp_pose)
|
||
if target is None:
|
||
continue
|
||
|
||
detections = self.serialize_detection_boxes([candidate])
|
||
if detections:
|
||
detections[0]["pick_xyz"] = [float(value) for value in target.point_cam[:3]]
|
||
detections[0]["pick_xyz_frame"] = "d405_camera"
|
||
detections[0]["angle_deg"] = float(math.degrees(target.angle_rad))
|
||
|
||
safe_ui_update(
|
||
annotated_image,
|
||
{
|
||
"freeze": True,
|
||
"stage": "pick_target",
|
||
"detections": detections,
|
||
},
|
||
)
|
||
self.log_target(index, target)
|
||
if self.arm_controller.execute_pick_and_place(target.robot_pose, target.angle_rad):
|
||
mark_pick_complete()
|
||
else:
|
||
clear_pick_cycle()
|
||
break
|
||
except Exception as exc:
|
||
print(f"处理目标时出错: {exc}")
|
||
self.arm_controller.return_home()
|
||
clear_pick_cycle()
|
||
continue
|
||
else:
|
||
print("视觉检测:候选目标均不可用,允许 AGV 继续前进")
|
||
clear_pick_cycle()
|
||
|
||
return annotated_image
|
||
|
||
def annotate_runtime_status(self, frame: np.ndarray) -> None:
|
||
"""绘制运行时状态文本。"""
|
||
cv2.putText(
|
||
frame,
|
||
f"Time Left: {remaining_seconds()}s",
|
||
(10, 30),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.8,
|
||
(0, 0, 255),
|
||
2,
|
||
)
|
||
cv2.putText(
|
||
frame,
|
||
f"Tomato: {'Yes' if has_tomato.is_set() else 'No'}",
|
||
(10, 70),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.8,
|
||
(0, 255, 0),
|
||
2,
|
||
)
|
||
cv2.putText(
|
||
frame,
|
||
f"Picking: {'Yes' if picking_done.is_set() else 'No'}",
|
||
(10, 110),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.8,
|
||
(255, 0, 0),
|
||
2,
|
||
)
|
||
|
||
def shutdown(self) -> None:
|
||
"""释放视觉资源。"""
|
||
try:
|
||
self.pipeline.stop()
|
||
except Exception as exc:
|
||
print(f"释放相机失败: {exc}")
|
||
safe_ui_update(blank_frame())
|
||
print("视觉线程停止 | 相机已释放")
|
||
|
||
def run(self) -> None:
|
||
"""视觉主线程。"""
|
||
print("视觉线程启动 | 开始检测番茄...")
|
||
while running.is_set():
|
||
if runtime_expired():
|
||
print("视觉检测:总时长已到,准备停止...")
|
||
running.clear()
|
||
break
|
||
|
||
color_image, depth_frame = self.capture_frame()
|
||
if color_image is None or depth_frame is None:
|
||
safe_ui_update(blank_frame())
|
||
time.sleep(0.1)
|
||
continue
|
||
|
||
try:
|
||
# 视觉线程本质上是个两阶段状态机:
|
||
# 1. 搜索态:AGV 行进,找候选番茄;
|
||
# 2. 抓取态:AGV 停住,在停止窗口内做精检并触发抓取。
|
||
if has_tomato.is_set() and not picking_done.is_set():
|
||
annotated_image = self.handle_pick_phase(color_image, depth_frame)
|
||
else:
|
||
annotated_image = self.handle_search_phase(color_image)
|
||
except Exception as exc:
|
||
print(f"视觉检测逻辑出错: {exc}")
|
||
self.arm_controller.return_home()
|
||
annotated_image = color_image.copy()
|
||
|
||
self.annotate_runtime_status(annotated_image)
|
||
safe_ui_update(annotated_image)
|
||
time.sleep(0.05)
|
||
|
||
self.shutdown()
|
||
|
||
|
||
@dataclass
|
||
class AgvController:
|
||
"""AGV 子系统。"""
|
||
|
||
agv_client: Any
|
||
|
||
def reacquire_priority(self) -> None:
|
||
"""重新获取 AGV 控制权。"""
|
||
self.agv_client.release_priority()
|
||
time.sleep(0.5)
|
||
self.agv_client.set_priority("admin", _cfg("AGV_IP"))
|
||
|
||
def send_speed(self, linear_speed: float, description: str) -> None:
|
||
"""发送 AGV 速度指令。"""
|
||
ret = self.agv_client.set_control_speed(linear_speed, _cfg("AGV_SPEED_STOP"))
|
||
if ret != AGV_SUCCESS_CODE:
|
||
print(f"AGV 控制:{description}指令失败(码 {ret}),重新获取控制权...")
|
||
self.reacquire_priority()
|
||
|
||
def stop_motion(self) -> None:
|
||
"""停止 AGV。"""
|
||
self.agv_client.set_control_speed(_cfg("AGV_SPEED_STOP"), _cfg("AGV_SPEED_STOP"))
|
||
|
||
def shutdown(self) -> None:
|
||
"""线程结束时确保 AGV 停止。"""
|
||
try:
|
||
self.stop_motion()
|
||
except Exception as exc:
|
||
print(f"AGV 停止失败: {exc}")
|
||
print("AGV 控制线程停止 | AGV 已停止")
|
||
|
||
def run(self) -> None:
|
||
"""AGV 控制主线程。"""
|
||
print("AGV 控制线程启动 | 等待视觉检测信号...")
|
||
while running.is_set():
|
||
try:
|
||
if runtime_expired():
|
||
print("AGV 控制:总时长已到,停止 AGV...")
|
||
self.stop_motion()
|
||
running.clear()
|
||
break
|
||
|
||
if has_tomato.is_set() and not picking_done.is_set():
|
||
# AGV 线程本身不做视觉/抓取,只消费状态位决定“停”还是“走”。
|
||
self.send_speed(_cfg("AGV_SPEED_STOP"), "停止")
|
||
else:
|
||
self.send_speed(_cfg("AGV_SPEED_FORWARD"), "前进")
|
||
except Exception as exc:
|
||
print(f"AGV 控制线程出错: {exc}")
|
||
time.sleep(0.5)
|
||
continue
|
||
|
||
time.sleep(0.1)
|
||
|
||
self.shutdown()
|
||
|
||
|
||
def get_arm_controller(robot_name: str) -> RobotArmController:
|
||
global _current_arm_controller
|
||
if _current_arm_controller is None or _current_arm_controller.robot_name != robot_name:
|
||
_current_arm_controller = RobotArmController(robot_name, robot_rpc_client, placement_manager)
|
||
return _current_arm_controller
|
||
|
||
|
||
def return_to_home(robot_name: str) -> None:
|
||
"""兼容旧接口:返回 Home。"""
|
||
controller = get_arm_controller(robot_name)
|
||
controller.return_home()
|
||
|
||
|
||
def control_robot(robot_name: str, pose: Sequence[float], angle_rad: float) -> bool:
|
||
"""兼容旧接口:执行抓取与放置。"""
|
||
controller = get_arm_controller(robot_name)
|
||
return controller.execute_pick_and_place(pose, angle_rad)
|
||
|
||
|
||
def vision_detection_thread(pipeline: Any, align: Any, depth_intrinsics: Any, model: Any, robot_name: str) -> None:
|
||
"""兼容旧接口:视觉线程入口。"""
|
||
controller = VisionController(
|
||
pipeline=pipeline,
|
||
align=align,
|
||
depth_intrinsics=depth_intrinsics,
|
||
model=model,
|
||
robot_name=robot_name,
|
||
arm_controller=get_arm_controller(robot_name),
|
||
)
|
||
controller.run()
|
||
|
||
|
||
def agv_control_thread(agv_client: Any) -> None:
|
||
"""兼容旧接口:AGV 线程入口。"""
|
||
global _current_agv_controller
|
||
_current_agv_controller = AgvController(agv_client)
|
||
_current_agv_controller.run()
|
||
|
||
|
||
def connect_robot_arm() -> RobotArmController:
|
||
"""连接并启动机械臂。"""
|
||
robot_rpc_client.connect(_cfg("ROBOT_IP"), _cfg("ROBOT_PORT"))
|
||
robot_rpc_client.setRequestTimeout(1000)
|
||
if not robot_rpc_client.hasConnected():
|
||
raise RuntimeError("机械臂连接失败")
|
||
print("机械臂连接成功")
|
||
|
||
robot_rpc_client.login("rob1", "123456")
|
||
if not robot_rpc_client.hasLogined():
|
||
raise RuntimeError("机械臂登录失败")
|
||
print("机械臂登录成功")
|
||
|
||
robot_name = exampleStartup()
|
||
if not robot_name:
|
||
raise RuntimeError("机械臂启动失败")
|
||
print(f"机械臂 {robot_name} 启动成功")
|
||
|
||
return get_arm_controller(robot_name)
|
||
|
||
|
||
def connect_agv() -> Any:
|
||
"""连接 AGV 并获取控制权。"""
|
||
agv_client = RpcClient()
|
||
agv_client.setRequestTimeout(1000)
|
||
agv_ip = _cfg("AGV_IP")
|
||
agv_port = _cfg("AGV_PORT")
|
||
print(f"正在连接 AGV | IP: {agv_ip}:{agv_port}")
|
||
|
||
connect_ret = agv_client.connect(agv_ip, agv_port)
|
||
if connect_ret != 0:
|
||
raise RuntimeError(f"AGV 连接失败(返回码 {connect_ret})")
|
||
|
||
login_ret = agv_client.login("admin", "admin")
|
||
if login_ret != 0:
|
||
raise RuntimeError(f"AGV 登录失败(返回码 {login_ret})")
|
||
print("AGV 连接与登录成功")
|
||
|
||
agv_client.release_priority()
|
||
time.sleep(0.5)
|
||
control_ret = agv_client.set_priority("admin", agv_ip)
|
||
if control_ret != AGV_SUCCESS_CODE:
|
||
raise RuntimeError(f"AGV 获取控制权失败(返回码 {control_ret})")
|
||
print("AGV 控制权获取成功")
|
||
|
||
return agv_client
|
||
|
||
|
||
def release_agv(agv_client: Optional[Any]) -> None:
|
||
"""释放 AGV 资源。"""
|
||
if agv_client is None:
|
||
return
|
||
try:
|
||
agv_client.release_priority()
|
||
except Exception as exc:
|
||
print(f"释放 AGV 控制权失败: {exc}")
|
||
try:
|
||
agv_client.disconnect()
|
||
except Exception as exc:
|
||
print(f"断开 AGV 失败: {exc}")
|
||
|
||
|
||
def release_robot_arm() -> None:
|
||
"""释放机械臂 RPC 连接。"""
|
||
try:
|
||
if robot_rpc_client.hasConnected():
|
||
robot_rpc_client.disconnect()
|
||
except Exception as exc:
|
||
print(f"断开机械臂连接失败: {exc}")
|
||
|
||
|
||
def main() -> None:
|
||
"""主程序入口。"""
|
||
global _current_arm_controller, _current_agv_controller
|
||
|
||
# 每次主流程启动前都把跨线程状态和放置位游标清零,避免继承上一次运行残留。
|
||
reset_runtime_state()
|
||
placement_manager.reset()
|
||
|
||
print("=" * 50)
|
||
print("开始初始化番茄采摘系统")
|
||
print("当前参数:")
|
||
print(f" 机械臂 IP: {_cfg('ROBOT_IP')}:{_cfg('ROBOT_PORT')}")
|
||
print(f" AGV IP: {_cfg('AGV_IP')}:{_cfg('AGV_PORT')}")
|
||
print(f" YOLO 模型: {_cfg('YOLO_MODEL_PATH')}")
|
||
print(f" 总时长: {_cfg('TOTAL_DURATION')}s")
|
||
print("=" * 50)
|
||
|
||
arm_controller: Optional[RobotArmController] = None
|
||
agv_client: Optional[Any] = None
|
||
pipeline: Optional[Any] = None
|
||
vision_thread: Optional[threading.Thread] = None
|
||
agv_thread: Optional[threading.Thread] = None
|
||
|
||
try:
|
||
print("\n1. 初始化机械臂...")
|
||
arm_controller = connect_robot_arm()
|
||
if not _cfg("SCISSORS_ENABLED"):
|
||
arm_controller.keep_scissors_open()
|
||
else:
|
||
arm_controller.reset_scissors_outputs()
|
||
|
||
print("\n1.1 机械臂回到初始位置...")
|
||
# 机械臂刚进入 Running 后,控制器有时还没完全准备好接收第一条运动指令。
|
||
# 这里把“回 Home”作为尽力而为的预处理,失败时只告警,不阻塞整机启动。
|
||
if arm_controller.is_at_home():
|
||
print("Robot arm already at Home during startup, continuing initialization")
|
||
else:
|
||
time.sleep(STARTUP_HOME_DELAY)
|
||
if not arm_controller.return_home():
|
||
print("警告:机械臂启动后返回 Home 失败,继续启动系统,后续流程中再尝试回 Home")
|
||
|
||
print("\n2. 初始化 AGV...")
|
||
agv_client = connect_agv()
|
||
|
||
print("\n3. 初始化视觉...")
|
||
pipeline, align, depth_intrinsics = init_camera()
|
||
model = init_tomato_detector()
|
||
|
||
print("\n4. 启动核心线程...")
|
||
start_runtime_state()
|
||
_current_arm_controller = arm_controller
|
||
_current_agv_controller = AgvController(agv_client)
|
||
|
||
# 主线程只负责保活和兜底清理;
|
||
# 真正并发执行的是视觉线程和 AGV 控制线程。
|
||
vision_thread = threading.Thread(
|
||
target=vision_detection_thread,
|
||
args=(pipeline, align, depth_intrinsics, model, arm_controller.robot_name),
|
||
daemon=True,
|
||
name="VisionThread",
|
||
)
|
||
agv_thread = threading.Thread(
|
||
target=agv_control_thread,
|
||
args=(agv_client,),
|
||
daemon=True,
|
||
name="AGVThread",
|
||
)
|
||
|
||
vision_thread.start()
|
||
agv_thread.start()
|
||
print(f"所有线程启动完成 | 总运行时长:{_cfg('TOTAL_DURATION')} 秒")
|
||
|
||
# 主线程不参与业务决策,只等待 `running` 被任一线程或异常路径清掉。
|
||
while running.is_set():
|
||
time.sleep(1)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n用户手动中断程序...")
|
||
running.clear()
|
||
if arm_controller is not None:
|
||
arm_controller.return_home()
|
||
except Exception as exc:
|
||
print(f"系统初始化或运行失败: {exc}")
|
||
running.clear()
|
||
if arm_controller is not None:
|
||
arm_controller.return_home()
|
||
finally:
|
||
print("\n释放系统资源...")
|
||
running.clear()
|
||
|
||
if vision_thread is not None and vision_thread.is_alive():
|
||
vision_thread.join(timeout=2.0)
|
||
elif pipeline is not None:
|
||
try:
|
||
pipeline.stop()
|
||
except Exception:
|
||
pass
|
||
|
||
if agv_thread is not None and agv_thread.is_alive():
|
||
agv_thread.join(timeout=2.0)
|
||
|
||
release_agv(agv_client)
|
||
release_robot_arm()
|
||
reset_runtime_state()
|
||
_current_arm_controller = None
|
||
_current_agv_controller = None
|
||
print("程序结束 | 所有设备已断开连接")
|
||
|
||
|
||
def signal_handler(sig: int, frame: Any) -> None:
|
||
"""信号处理:停止线程并尽量回 Home。"""
|
||
_ = frame
|
||
print(f"\n接收到信号 {sig},正在终止程序并返回 Home...")
|
||
running.clear()
|
||
|
||
try:
|
||
if _current_arm_controller is not None:
|
||
_current_arm_controller.return_home()
|
||
else:
|
||
robot_names = robot_rpc_client.getRobotNames()
|
||
if robot_names:
|
||
RobotArmController(robot_names[0], robot_rpc_client, placement_manager).return_home()
|
||
except Exception:
|
||
pass
|
||
|
||
time.sleep(1)
|
||
sys.exit(0)
|