forked from YikaiFu-cart/acAubo
1673 lines
74 KiB
Python
1673 lines
74 KiB
Python
import tkinter as tk
|
||
from tkinter import ttk, messagebox, scrolledtext, filedialog
|
||
import tkinter.font as tkfont
|
||
import queue
|
||
import threading
|
||
import time
|
||
import os
|
||
import sys
|
||
import json
|
||
import io
|
||
import ctypes
|
||
from ctypes import wintypes
|
||
from pathlib import Path
|
||
from PIL import Image, ImageTk, ImageDraw
|
||
import cv2
|
||
import numpy as np
|
||
import signal
|
||
import contextlib
|
||
|
||
"""
|
||
图形界面入口文件。
|
||
|
||
本文件主要负责:
|
||
1. 构建参数配置界面。
|
||
2. 启动/停止后台采摘主程序。
|
||
3. 捕获控制台输出并实时展示到日志面板。
|
||
4. 接收 control.py 回传的相机画面并显示在界面上。
|
||
|
||
界面层本身不直接实现采摘算法,而是负责把用户输入转成运行参数,
|
||
并把后台线程的运行状态可视化出来。
|
||
"""
|
||
|
||
# Windows 控制台缓冲区结构体。
|
||
# 当程序在 Windows 上运行时,会用它读取后台线程输出的控制台内容,
|
||
# 再把这些内容转发到 Tkinter 的日志面板中。
|
||
class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
|
||
_fields_ = [
|
||
("dwSize", wintypes._COORD),
|
||
("dwCursorPosition", wintypes._COORD),
|
||
("wAttributes", wintypes.WORD),
|
||
("srWindow", wintypes.SMALL_RECT),
|
||
("dwMaximumWindowSize", wintypes._COORD)
|
||
]
|
||
|
||
STD_OUTPUT_HANDLE = -11
|
||
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
|
||
MAX_LOG_LINES = 1000
|
||
MAX_TERMINAL_LINES = 1500
|
||
|
||
class TomatoHarvestingUI:
|
||
"""番茄采摘系统的主界面类。"""
|
||
|
||
def __init__(self, root):
|
||
self.root = root
|
||
self.root.title("番茄采摘系统参数配置")
|
||
screen_w = self.root.winfo_screenwidth()
|
||
screen_h = self.root.winfo_screenheight()
|
||
window_w = int(screen_w * 0.8)
|
||
window_h = int(screen_h * 0.7)
|
||
pos_x = max(0, (screen_w - window_w) // 2)
|
||
pos_y = max(0, (screen_h - window_h) // 2)
|
||
self.root.geometry(f"{window_w}x{window_h}+{pos_x}+{pos_y}")
|
||
self.root.minsize(max(980, int(screen_w * 0.5)), max(640, int(screen_h * 0.55)))
|
||
self.base_window_width = window_w
|
||
self.base_window_height = window_h
|
||
self.base_dir = Path(__file__).resolve().parent
|
||
self.settings_file = self.base_dir / "ui_settings.json"
|
||
self._font_resize_job = None
|
||
self._is_applying_font_scale = False
|
||
self.colors = {
|
||
"bg": "#F3F7FB",
|
||
"panel": "#FFFFFF",
|
||
"panel_soft": "#F8FBFD",
|
||
"border": "#D8E3EC",
|
||
"text": "#16324A",
|
||
"muted": "#6B7C93",
|
||
"accent": "#0F766E",
|
||
"accent_soft": "#D7F3EE",
|
||
"danger": "#B45309",
|
||
"danger_soft": "#FDE7D7",
|
||
"info": "#0F4C81",
|
||
"info_soft": "#DCEBFA",
|
||
"terminal_bg": "#0F172A",
|
||
"terminal_fg": "#DCFCE7",
|
||
"terminal_muted": "#93C5FD",
|
||
"terminal_err": "#FCA5A5",
|
||
"metric_1": "#E6FFFB",
|
||
"metric_2": "#EEF2FF",
|
||
"metric_3": "#FFF7ED",
|
||
"metric_4": "#F0FDF4",
|
||
}
|
||
|
||
# 设置中文字体
|
||
self.init_fonts()
|
||
|
||
# 程序运行状态
|
||
self.running = False
|
||
self.thread = None
|
||
self.runtime_mode = None
|
||
self.vision_test_stop_event = threading.Event()
|
||
self.vision_test_pipeline = None
|
||
self.show_background_only = True
|
||
self.scissors_enabled = tk.BooleanVar(value=True)
|
||
|
||
# 日志队列
|
||
self.log_queue = queue.Queue(maxsize=500)
|
||
self.log_line_count = 0
|
||
self.terminal_queue = queue.Queue(maxsize=1000)
|
||
self.terminal_line_count = 0
|
||
self.log_records = []
|
||
self.terminal_message_count = 0
|
||
self.collapsible_sections = {}
|
||
|
||
# 相机画面变量
|
||
self.camera_frame = None
|
||
self.camera_image = None
|
||
self.camera_bg_image = None # 保存背景图引用(关键:防止回收)
|
||
self.raw_bg_image = None # 新增:保存原始1.png,用于后续尺寸调整
|
||
|
||
# 创建界面
|
||
self.create_widgets()
|
||
|
||
# 初始化参数
|
||
self.init_parameters()
|
||
|
||
# 加载相机背景图(延迟100ms,确保窗口渲染完成后再加载)
|
||
self.root.after(100, self.load_camera_background)
|
||
|
||
# 启动日志处理线程
|
||
self.root.after(200, self.process_log_queue)
|
||
|
||
# 信号处理
|
||
signal.signal(signal.SIGINT, self.signal_handler)
|
||
signal.signal(signal.SIGTERM, self.signal_handler)
|
||
|
||
def create_widgets(self):
|
||
"""构建整个界面布局。"""
|
||
# 设置主窗口背景
|
||
self.set_background_image()
|
||
|
||
main_frame = ttk.Frame(self.root, padding=(16, 12, 16, 16), style="App.TFrame")
|
||
main_frame.pack(fill=tk.BOTH, expand=True)
|
||
|
||
header_card = tk.Frame(
|
||
main_frame,
|
||
bg=self.colors["panel"],
|
||
bd=0,
|
||
highlightthickness=1,
|
||
highlightbackground=self.colors["border"]
|
||
)
|
||
header_card.pack(fill=tk.X, pady=(0, 10))
|
||
header_top = tk.Frame(header_card, bg=self.colors["panel"])
|
||
header_top.pack(fill=tk.X, padx=18, pady=10)
|
||
self.status_badge = tk.Label(
|
||
header_top,
|
||
text="系统就绪",
|
||
font=self.fonts["button"],
|
||
bg=self.colors["accent_soft"],
|
||
fg=self.colors["accent"],
|
||
padx=14,
|
||
pady=6
|
||
)
|
||
self.status_badge.pack(side=tk.RIGHT)
|
||
title_wrap = tk.Frame(header_top, bg=self.colors["panel"])
|
||
title_wrap.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
||
self.title_label = tk.Label(
|
||
title_wrap,
|
||
text="番茄采摘系统",
|
||
font=self.fonts["title"],
|
||
bg=self.colors["panel"],
|
||
fg=self.colors["text"]
|
||
)
|
||
self.title_label.pack(side=tk.LEFT)
|
||
self.subtitle_label = tk.Label(
|
||
title_wrap,
|
||
text="参数配置 · 日志 · 相机 · 终端",
|
||
font=self.fonts["subtitle"],
|
||
bg=self.colors["panel"],
|
||
fg=self.colors["muted"]
|
||
)
|
||
self.subtitle_label.pack(side=tk.LEFT, padx=(14, 0), pady=(4, 0))
|
||
|
||
body_frame = ttk.Frame(main_frame, style="App.TFrame")
|
||
body_frame.pack(fill=tk.BOTH, expand=True)
|
||
|
||
# 三栏布局(侧边配置 + 中间工作区 + 终端)
|
||
paned_window = ttk.PanedWindow(body_frame, orient=tk.HORIZONTAL)
|
||
paned_window.pack(fill=tk.BOTH, expand=True)
|
||
self.paned_window = paned_window
|
||
|
||
# 左侧参数区
|
||
params_frame = ttk.Frame(paned_window, width=470, style="App.TFrame")
|
||
paned_window.add(params_frame, weight=2)
|
||
|
||
# 中间日志和相机区
|
||
center_frame = ttk.Frame(paned_window, style="App.TFrame")
|
||
paned_window.add(center_frame, weight=3)
|
||
|
||
# 最右侧终端区
|
||
terminal_frame = ttk.Frame(paned_window, width=280, style="App.TFrame")
|
||
paned_window.add(terminal_frame, weight=1)
|
||
|
||
# 参数区布局:使用可滚动容器,避免窗口高度不足时底部控件被裁切
|
||
params_canvas = tk.Canvas(
|
||
params_frame,
|
||
highlightthickness=0,
|
||
bd=0,
|
||
relief="flat",
|
||
bg=self.colors["bg"]
|
||
)
|
||
params_scrollbar = ttk.Scrollbar(params_frame, orient=tk.VERTICAL, command=params_canvas.yview)
|
||
params_canvas.configure(yscrollcommand=params_scrollbar.set)
|
||
params_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
||
params_canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||
self.params_canvas = params_canvas
|
||
self.params_scrollbar = params_scrollbar
|
||
|
||
params_inner = ttk.Frame(params_canvas, style="App.TFrame")
|
||
self.params_canvas_window = params_canvas.create_window((0, 0), window=params_inner, anchor="nw")
|
||
params_inner.bind(
|
||
"<Configure>",
|
||
lambda event: params_canvas.configure(scrollregion=params_canvas.bbox("all"))
|
||
)
|
||
params_canvas.bind(
|
||
"<Configure>",
|
||
lambda event: params_canvas.itemconfigure(self.params_canvas_window, width=event.width)
|
||
)
|
||
|
||
param_card = ttk.Frame(params_inner, style="Transparent.TFrame")
|
||
param_card.pack(fill=tk.BOTH, expand=True, pady=0, padx=(4, 8))
|
||
param_grid = ttk.Frame(param_card, style="Transparent.TFrame")
|
||
param_grid.pack(fill=tk.BOTH, expand=True)
|
||
param_grid.columnconfigure(0, weight=1)
|
||
param_grid.columnconfigure(1, weight=1)
|
||
|
||
# 1. 机械臂配置
|
||
robot_group = ttk.LabelFrame(param_grid, text="机械臂配置", padding="6", style="SidebarCard.TLabelframe")
|
||
robot_group.grid(row=0, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
|
||
robot_group.columnconfigure(1, weight=1)
|
||
robot_group.columnconfigure(3, weight=1)
|
||
ttk.Label(robot_group, text="机械臂IP:").grid(row=0, column=0, sticky=tk.W, pady=2)
|
||
self.robot_ip = ttk.Entry(robot_group, width=16)
|
||
self.robot_ip.grid(row=0, column=1, sticky=tk.EW, pady=2, padx=4)
|
||
ttk.Label(robot_group, text="端口:").grid(row=0, column=2, sticky=tk.W, pady=2)
|
||
self.robot_port = ttk.Entry(robot_group, width=8)
|
||
self.robot_port.grid(row=0, column=3, sticky=tk.EW, pady=2, padx=4)
|
||
|
||
# 2. AGV配置
|
||
agv_group = ttk.LabelFrame(param_grid, text="AGV配置", padding="6", style="SidebarCard.TLabelframe")
|
||
agv_group.grid(row=1, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
|
||
agv_group.columnconfigure(1, weight=1)
|
||
agv_group.columnconfigure(3, weight=1)
|
||
ttk.Label(agv_group, text="AGV IP:").grid(row=0, column=0, sticky=tk.W, pady=2)
|
||
self.agv_ip = ttk.Entry(agv_group, width=16)
|
||
self.agv_ip.grid(row=0, column=1, sticky=tk.EW, pady=2, padx=4)
|
||
ttk.Label(agv_group, text="端口:").grid(row=0, column=2, sticky=tk.W, pady=2)
|
||
self.agv_port = ttk.Entry(agv_group, width=8)
|
||
self.agv_port.grid(row=0, column=3, sticky=tk.EW, pady=2, padx=4)
|
||
ttk.Label(agv_group, text="前进(m/s):").grid(row=1, column=0, sticky=tk.W, pady=2)
|
||
self.agv_speed_forward = ttk.Entry(agv_group, width=8)
|
||
self.agv_speed_forward.grid(row=1, column=1, sticky=tk.EW, pady=2, padx=4)
|
||
ttk.Label(agv_group, text="停止(m/s):").grid(row=1, column=2, sticky=tk.W, pady=2)
|
||
self.agv_speed_stop = ttk.Entry(agv_group, width=8)
|
||
self.agv_speed_stop.grid(row=1, column=3, sticky=tk.EW, pady=2, padx=4)
|
||
|
||
# 3. 运行参数
|
||
run_group = ttk.LabelFrame(param_grid, text="运行参数", padding="6", style="SidebarCard.TLabelframe")
|
||
run_group.grid(row=2, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
|
||
run_group.columnconfigure(1, weight=1)
|
||
run_group.columnconfigure(3, weight=1)
|
||
ttk.Label(run_group, text="总时长(s):").grid(row=0, column=0, sticky=tk.W, pady=2)
|
||
self.total_duration = ttk.Entry(run_group, width=8)
|
||
self.total_duration.grid(row=0, column=1, sticky=tk.EW, pady=2, padx=4)
|
||
ttk.Label(run_group, text="停止超时(s):").grid(row=0, column=2, sticky=tk.W, pady=2)
|
||
self.agv_stop_timeout = ttk.Entry(run_group, width=8)
|
||
self.agv_stop_timeout.grid(row=0, column=3, sticky=tk.EW, pady=2, padx=4)
|
||
# YOLO模型路径
|
||
model_frame = ttk.Frame(run_group)
|
||
model_frame.grid(row=1, column=0, columnspan=4, sticky=tk.W+tk.E, pady=2)
|
||
ttk.Label(model_frame, text="YOLO模型:").pack(side=tk.LEFT, pady=2)
|
||
self.yolo_model_path = ttk.Entry(model_frame)
|
||
self.yolo_model_path.pack(side=tk.LEFT, pady=2, padx=4, fill=tk.X, expand=True)
|
||
self.browse_btn = ttk.Button(model_frame, text="浏览", command=self.browse_model, width=5)
|
||
self.browse_btn.pack(side=tk.LEFT, padx=(3, 0), pady=2)
|
||
bg_frame = ttk.Frame(run_group)
|
||
bg_frame.grid(row=2, column=0, columnspan=4, sticky=tk.W+tk.E, pady=2)
|
||
ttk.Label(bg_frame, text="背景图:").pack(side=tk.LEFT, pady=2)
|
||
self.camera_bg_path = ttk.Entry(bg_frame)
|
||
self.camera_bg_path.pack(side=tk.LEFT, pady=2, padx=4, fill=tk.X, expand=True)
|
||
self.browse_bg_btn = ttk.Button(bg_frame, text="浏览", command=self.browse_camera_background, width=5)
|
||
self.browse_bg_btn.pack(side=tk.LEFT, padx=(3, 0), pady=2)
|
||
|
||
# 4. 放置位置
|
||
place_group = ttk.LabelFrame(param_grid, text="放置位置", padding="6", style="SidebarCard.TLabelframe")
|
||
place_group.grid(row=3, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
|
||
# 位置参数输入
|
||
place_params_frame = ttk.Frame(place_group)
|
||
place_params_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 2))
|
||
place_params_frame.columnconfigure(1, weight=1)
|
||
place_params_frame.columnconfigure(3, weight=1)
|
||
self.place_entries = []
|
||
left_labels = ["X(m):", "Y(m):", "Z(m):"]
|
||
right_labels = ["Roll:", "Pitch:", "Yaw:"]
|
||
for i, label in enumerate(left_labels):
|
||
ttk.Label(place_params_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=2, padx=(2, 4))
|
||
entry = ttk.Entry(place_params_frame, width=9)
|
||
entry.grid(row=i, column=1, sticky=tk.EW, pady=2, padx=2)
|
||
self.place_entries.append(entry)
|
||
for i, label in enumerate(right_labels):
|
||
ttk.Label(place_params_frame, text=label).grid(row=i, column=2, sticky=tk.W, pady=2, padx=(8, 4))
|
||
entry = ttk.Entry(place_params_frame, width=9)
|
||
entry.grid(row=i, column=3, sticky=tk.EW, pady=2, padx=2)
|
||
self.place_entries.append(entry)
|
||
self.save_place_btn = ttk.Button(place_group, text="保存位置", command=self.save_place_position)
|
||
self.save_place_btn.pack(pady=4)
|
||
|
||
# 5. 控制按钮
|
||
control_frame = ttk.LabelFrame(param_grid, text="系统控制", padding="6", style="SidebarCard.TLabelframe")
|
||
control_frame.grid(row=4, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
|
||
btn_frame = ttk.Frame(control_frame)
|
||
btn_frame.pack(fill=tk.X, pady=(3, 1))
|
||
btn_frame.columnconfigure(0, weight=1)
|
||
btn_frame.columnconfigure(1, weight=1)
|
||
btn_frame.columnconfigure(2, weight=1)
|
||
btn_frame.columnconfigure(3, weight=1)
|
||
self.start_btn = ttk.Button(btn_frame, text="启动程序", command=self.start_program, style="Primary.TButton")
|
||
self.start_btn.grid(row=0, column=0, padx=3, pady=3, sticky=tk.EW)
|
||
self.vision_test_btn = ttk.Button(btn_frame, text="视觉测试", command=self.start_vision_test, style="Secondary.TButton")
|
||
self.vision_test_btn.grid(row=0, column=1, padx=3, pady=3, sticky=tk.EW)
|
||
self.stop_btn = ttk.Button(btn_frame, text="停止程序", command=self.stop_program, state=tk.DISABLED, style="Danger.TButton")
|
||
self.stop_btn.grid(row=0, column=2, padx=3, pady=3, sticky=tk.EW)
|
||
self.save_btn = ttk.Button(btn_frame, text="保存参数", command=self.save_parameters, style="Secondary.TButton")
|
||
self.save_btn.grid(row=0, column=3, padx=3, pady=3, sticky=tk.EW)
|
||
self.scissors_enabled_check = ttk.Checkbutton(
|
||
control_frame,
|
||
text="启用末端剪刀",
|
||
variable=self.scissors_enabled,
|
||
)
|
||
self.scissors_enabled_check.pack(anchor=tk.W, padx=5, pady=(3, 1))
|
||
|
||
# 中间区域布局
|
||
# 日志区域(上方)
|
||
log_frame = ttk.LabelFrame(center_frame, text="运行日志", padding="8", style="Card.TLabelframe")
|
||
log_frame.pack(fill=tk.X, expand=False, pady=(4, 8), padx=5)
|
||
# 日志控制栏
|
||
log_control = ttk.Frame(log_frame)
|
||
log_control.pack(fill=tk.X, pady=(2, 4))
|
||
ttk.Label(log_control, text="日志显示:").pack(side=tk.LEFT, padx=5)
|
||
self.log_level = tk.StringVar(value="全部")
|
||
log_level_combo = ttk.Combobox(log_control, textvariable=self.log_level, width=10, state="readonly")
|
||
log_level_combo['values'] = ("全部", "信息", "警告", "错误")
|
||
log_level_combo.pack(side=tk.LEFT, padx=5)
|
||
log_level_combo.bind("<<ComboboxSelected>>", self.on_log_level_changed)
|
||
ttk.Button(log_control, text="清空日志", command=self.clear_log).pack(side=tk.RIGHT, padx=5)
|
||
# 日志文本框
|
||
self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=4, bd=0, relief="flat", highlightthickness=0)
|
||
self.log_text.pack(fill=tk.BOTH, expand=True, pady=(0, 2))
|
||
self.log_text.config(state=tk.DISABLED, bg="#F0F0F0", fg="#333333", insertbackground="black")
|
||
|
||
# 相机画面区域(下方)
|
||
camera_card = ttk.LabelFrame(center_frame, text="相机画面", padding="8", style="Card.TLabelframe")
|
||
camera_card.pack(fill=tk.BOTH, expand=True, pady=(0, 5), padx=5)
|
||
# 关键修改1:相机显示框架取消默认背景色
|
||
self.camera_display_frame = ttk.Frame(camera_card)
|
||
self.camera_display_frame.pack(fill=tk.BOTH, expand=True)
|
||
# 绑定窗口大小变化事件(窗口拉伸时,图片自动适配)
|
||
self.camera_display_frame.bind("<Configure>", self.on_camera_frame_resize)
|
||
|
||
# 关键修改2:相机标签不继承全局背景色,且填充整个框架
|
||
self.camera_label = ttk.Label(self.camera_display_frame)
|
||
self.camera_label.pack(fill=tk.BOTH, expand=True)
|
||
|
||
# 终端输出区域(最右侧)
|
||
terminal_card = ttk.LabelFrame(terminal_frame, text="终端输出", padding="8", style="TerminalCard.TLabelframe")
|
||
terminal_card.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)
|
||
terminal_control = ttk.Frame(terminal_card)
|
||
terminal_control.pack(fill=tk.X, pady=5)
|
||
ttk.Label(terminal_control, text="输出类型: 标准输出 / 标准错误").pack(side=tk.LEFT, padx=5)
|
||
ttk.Button(terminal_control, text="清空终端", command=self.clear_terminal).pack(side=tk.RIGHT, padx=5)
|
||
self.terminal_text = scrolledtext.ScrolledText(
|
||
terminal_card,
|
||
wrap=tk.WORD,
|
||
font=self.fonts["mono"],
|
||
bd=0,
|
||
relief="flat",
|
||
highlightthickness=0,
|
||
bg="#111111",
|
||
fg="#D8F3DC",
|
||
insertbackground="#D8F3DC"
|
||
)
|
||
self.terminal_text.pack(fill=tk.BOTH, expand=True, pady=5)
|
||
self.terminal_text.config(state=tk.DISABLED)
|
||
|
||
# 设置样式(核心修改:取消全局标签背景色)
|
||
self.setup_styles()
|
||
self.bind_mousewheel_to_params()
|
||
self.root.after(100, self.set_initial_layout)
|
||
self.root.after(120, self.apply_font_scale)
|
||
|
||
|
||
def init_fonts(self):
|
||
"""创建一组可随窗口缩放的字体对象。"""
|
||
self.font_specs = {
|
||
"default": {"family": "微软雅黑", "size": 9},
|
||
"title": {"family": "微软雅黑", "size": 18, "weight": "bold"},
|
||
"subtitle": {"family": "微软雅黑", "size": 10},
|
||
"section": {"family": "微软雅黑", "size": 9, "weight": "bold"},
|
||
"button": {"family": "微软雅黑", "size": 9, "weight": "bold"},
|
||
"button_normal": {"family": "微软雅黑", "size": 9},
|
||
"mono": {"family": "微软雅黑", "size": 9},
|
||
}
|
||
self.font_min_sizes = {
|
||
"default": 7,
|
||
"title": 14,
|
||
"subtitle": 8,
|
||
"section": 8,
|
||
"button": 8,
|
||
"button_normal": 8,
|
||
"mono": 8,
|
||
}
|
||
self.fonts = {}
|
||
for name, spec in self.font_specs.items():
|
||
self.fonts[name] = tkfont.Font(
|
||
self.root,
|
||
family=spec["family"],
|
||
size=spec["size"],
|
||
weight=spec.get("weight", "normal"),
|
||
)
|
||
self.root.option_add("*Font", self.fonts["default"])
|
||
self.root.bind("<Configure>", self.on_root_resize)
|
||
|
||
def setup_styles(self):
|
||
"""集中配置 ttk 控件样式,避免样式定义散落在各个方法中。"""
|
||
style = ttk.Style()
|
||
available_themes = style.theme_names()
|
||
if "clam" in available_themes:
|
||
style.theme_use("clam")
|
||
elif "vista" in available_themes:
|
||
style.theme_use("vista")
|
||
|
||
style.configure("TFrame", background=self.colors["panel"])
|
||
style.configure("App.TFrame", background=self.colors["bg"])
|
||
style.configure("Transparent.TFrame", background=self.colors["bg"])
|
||
style.configure("TPanedwindow", background=self.colors["bg"])
|
||
style.configure("TLabel", background=self.colors["panel"], foreground=self.colors["text"], font=self.fonts["default"])
|
||
style.configure(
|
||
"TEntry",
|
||
fieldbackground=self.colors["panel"],
|
||
background=self.colors["panel"],
|
||
foreground=self.colors["text"],
|
||
padding=5,
|
||
font=self.fonts["default"],
|
||
relief="flat",
|
||
borderwidth=1
|
||
)
|
||
style.configure(
|
||
"TCombobox",
|
||
fieldbackground=self.colors["panel"],
|
||
background=self.colors["panel"],
|
||
foreground=self.colors["text"],
|
||
arrowsize=14,
|
||
padding=3,
|
||
font=self.fonts["default"]
|
||
)
|
||
style.configure(
|
||
"TButton",
|
||
font=self.fonts["button_normal"],
|
||
padding=(10, 6),
|
||
relief="flat",
|
||
borderwidth=0
|
||
)
|
||
style.configure(
|
||
"SidebarShell.TLabelframe",
|
||
background=self.colors["panel_soft"],
|
||
borderwidth=1,
|
||
relief="solid",
|
||
padding=6
|
||
)
|
||
style.configure(
|
||
"SidebarShell.TLabelframe.Label",
|
||
background=self.colors["panel_soft"],
|
||
foreground=self.colors["text"],
|
||
font=self.fonts["section"]
|
||
)
|
||
style.configure(
|
||
"SidebarCard.TLabelframe",
|
||
background=self.colors["panel"],
|
||
borderwidth=1,
|
||
relief="solid",
|
||
padding=6
|
||
)
|
||
style.configure(
|
||
"SidebarCard.TLabelframe.Label",
|
||
background=self.colors["panel"],
|
||
foreground=self.colors["text"],
|
||
font=self.fonts["section"]
|
||
)
|
||
style.configure(
|
||
"Card.TLabelframe",
|
||
background=self.colors["panel"],
|
||
borderwidth=1,
|
||
relief="solid",
|
||
padding=6
|
||
)
|
||
style.configure(
|
||
"Card.TLabelframe.Label",
|
||
background=self.colors["panel"],
|
||
foreground=self.colors["text"],
|
||
font=self.fonts["section"]
|
||
)
|
||
style.configure(
|
||
"TerminalCard.TLabelframe",
|
||
background=self.colors["panel"],
|
||
borderwidth=1,
|
||
relief="solid",
|
||
padding=6
|
||
)
|
||
style.configure(
|
||
"TerminalCard.TLabelframe.Label",
|
||
background=self.colors["panel"],
|
||
foreground=self.colors["text"],
|
||
font=self.fonts["section"]
|
||
)
|
||
style.configure("Primary.TButton", background=self.colors["accent"], foreground="#FFFFFF", font=self.fonts["button"])
|
||
style.map("Primary.TButton", background=[("active", "#0B5E58"), ("disabled", "#A8D7D2")], foreground=[("disabled", "#F7FAFC")])
|
||
style.configure("Danger.TButton", background="#C2410C", foreground="#FFFFFF", font=self.fonts["button"])
|
||
style.map("Danger.TButton", background=[("active", "#9A3412"), ("disabled", "#F3B99A")], foreground=[("disabled", "#FEF2F2")])
|
||
style.configure("Secondary.TButton", background=self.colors["info_soft"], foreground=self.colors["info"], font=self.fonts["button_normal"])
|
||
style.map("Secondary.TButton", background=[("active", "#C8DFF7"), ("disabled", "#E7EEF5")])
|
||
|
||
def on_root_resize(self, event):
|
||
"""窗口尺寸变化时,延迟刷新字体缩放。"""
|
||
if event.widget is not self.root or self._is_applying_font_scale:
|
||
return
|
||
if self._font_resize_job is not None:
|
||
self.root.after_cancel(self._font_resize_job)
|
||
self._font_resize_job = self.root.after(80, self.apply_font_scale)
|
||
|
||
def apply_font_scale(self):
|
||
"""根据窗口当前尺寸动态缩放字体。"""
|
||
self._font_resize_job = None
|
||
current_w = max(self.root.winfo_width(), 1)
|
||
current_h = max(self.root.winfo_height(), 1)
|
||
scale = min(current_w / self.base_window_width, current_h / self.base_window_height)
|
||
scale = max(0.72, min(1.12, scale))
|
||
self._is_applying_font_scale = True
|
||
try:
|
||
for name, font in self.fonts.items():
|
||
base_size = self.font_specs[name]["size"]
|
||
min_size = self.font_min_sizes[name]
|
||
font.configure(size=max(min_size, int(round(base_size * scale))))
|
||
if hasattr(self, "log_text"):
|
||
self.log_text.configure(font=self.fonts["default"])
|
||
if hasattr(self, "terminal_text"):
|
||
self.terminal_text.configure(font=self.fonts["mono"])
|
||
finally:
|
||
self._is_applying_font_scale = False
|
||
|
||
def set_status_badge(self, text, tone="idle"):
|
||
"""更新顶部状态徽标。"""
|
||
tone_map = {
|
||
"idle": (self.colors["info_soft"], self.colors["info"]),
|
||
"running": (self.colors["accent_soft"], self.colors["accent"]),
|
||
"warning": (self.colors["danger_soft"], self.colors["danger"]),
|
||
"error": ("#FEE2E2", "#B91C1C"),
|
||
}
|
||
bg_color, fg_color = tone_map.get(tone, tone_map["idle"])
|
||
if hasattr(self, "status_badge"):
|
||
self.status_badge.configure(text=text, bg=bg_color, fg=fg_color)
|
||
|
||
def bind_mousewheel_to_params(self):
|
||
"""让左侧参数区支持鼠标滚轮滚动。"""
|
||
def _on_mousewheel(event):
|
||
if hasattr(self, "params_canvas"):
|
||
self.params_canvas.yview_scroll(int(-event.delta / 120), "units")
|
||
|
||
def _bind(_event):
|
||
self.params_canvas.bind_all("<MouseWheel>", _on_mousewheel)
|
||
|
||
def _unbind(_event):
|
||
self.params_canvas.unbind_all("<MouseWheel>")
|
||
|
||
self.params_canvas.bind("<Enter>", _bind)
|
||
self.params_canvas.bind("<Leave>", _unbind)
|
||
|
||
def set_initial_layout(self):
|
||
"""根据当前窗口尺寸设置更稳定的初始三栏宽度。"""
|
||
try:
|
||
total_width = self.paned_window.winfo_width()
|
||
if total_width > 600:
|
||
left_width = max(480, int(total_width * 0.38))
|
||
terminal_width = max(250, int(total_width * 0.20))
|
||
self.paned_window.sashpos(0, left_width)
|
||
self.paned_window.sashpos(1, total_width - terminal_width)
|
||
except Exception:
|
||
pass
|
||
|
||
def get_resource_candidates(self, filename):
|
||
"""返回项目内可能存放资源文件的候选路径。"""
|
||
return [
|
||
self.base_dir / filename,
|
||
self.base_dir / "tools" / filename,
|
||
Path.cwd() / filename,
|
||
Path.cwd() / "tools" / filename,
|
||
]
|
||
|
||
def resolve_path(self, path_value, fallback_name=None):
|
||
"""解析绝对路径或相对项目根目录的路径。"""
|
||
if path_value:
|
||
candidate = Path(path_value).expanduser()
|
||
if not candidate.is_absolute():
|
||
candidate = self.base_dir / candidate
|
||
candidate = candidate.resolve()
|
||
if candidate.exists():
|
||
return candidate
|
||
if fallback_name:
|
||
for candidate in self.get_resource_candidates(fallback_name):
|
||
if candidate.exists():
|
||
return candidate.resolve()
|
||
return None
|
||
|
||
def load_ui_settings(self):
|
||
"""读取本地保存的 UI 配置。"""
|
||
if not self.settings_file.exists():
|
||
return {}
|
||
try:
|
||
with self.settings_file.open("r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
return data if isinstance(data, dict) else {}
|
||
except Exception as e:
|
||
self.log(f"读取 UI 配置失败:{str(e)}", level="警告")
|
||
return {}
|
||
|
||
def save_ui_settings(self):
|
||
"""保存 UI 配置,方便下次启动直接复用。"""
|
||
data = {
|
||
"ROBOT_IP": self.robot_ip.get(),
|
||
"ROBOT_PORT": self.robot_port.get(),
|
||
"AGV_IP": self.agv_ip.get(),
|
||
"AGV_PORT": self.agv_port.get(),
|
||
"AGV_SPEED_FORWARD": self.agv_speed_forward.get(),
|
||
"AGV_SPEED_STOP": self.agv_speed_stop.get(),
|
||
"TOTAL_DURATION": self.total_duration.get(),
|
||
"AGV_STOP_TIMEOUT": self.agv_stop_timeout.get(),
|
||
"YOLO_MODEL_PATH": self.yolo_model_path.get(),
|
||
"CAMERA_BG_PATH": self.camera_bg_path.get(),
|
||
"SCISSORS_ENABLED": bool(self.scissors_enabled.get()),
|
||
}
|
||
with self.settings_file.open("w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
def get_camera_background_path(self):
|
||
"""返回当前相机背景图的有效路径。"""
|
||
return self.resolve_path(self.camera_bg_path.get().strip(), fallback_name="1.png")
|
||
|
||
|
||
def load_camera_background(self):
|
||
"""加载1.png并确保全屏显示,保存原始图片用于后续缩放"""
|
||
try:
|
||
bg_path = self.get_camera_background_path()
|
||
if bg_path is None:
|
||
raise FileNotFoundError("1.png")
|
||
# 这里只保存原始图片,不直接反复对已缩放图片继续缩放,
|
||
# 这样可以减少多次窗口拉伸后的画质损失。
|
||
self.raw_bg_image = Image.open(bg_path)
|
||
self.camera_bg_path.delete(0, tk.END)
|
||
self.camera_bg_path.insert(0, str(bg_path))
|
||
self.adjust_bg_image_size()
|
||
except FileNotFoundError:
|
||
searched_paths = "\n".join(str(path) for path in self.get_resource_candidates("1.png"))
|
||
error_msg = f"未找到背景图 1.png。\n可在界面中手动选择图片,或将文件放到以下位置之一:\n{searched_paths}"
|
||
self.log(error_msg, level="警告")
|
||
messagebox.showwarning("图片缺失", error_msg)
|
||
self.raw_bg_image = None
|
||
self.camera_bg_image = None
|
||
self.camera_label.config(background="#E8E8E8") # 仅异常时显示灰色
|
||
except Exception as e:
|
||
self.log(f"加载1.png失败:{str(e)}", level="错误")
|
||
self.raw_bg_image = None
|
||
self.camera_bg_image = None
|
||
self.camera_label.config(background="#E8E8E8")
|
||
|
||
def adjust_bg_image_size(self):
|
||
"""根据相机区域尺寸,调整1.png大小并显示"""
|
||
if self.raw_bg_image is None:
|
||
return
|
||
# 获取当前相机显示区域的实际尺寸(窗口渲染完成后的真实尺寸)
|
||
display_w = self.camera_display_frame.winfo_width()
|
||
display_h = self.camera_display_frame.winfo_height()
|
||
# 确保尺寸有效(避免窗口未渲染完成的情况)
|
||
if display_w <= 10 or display_h <= 10:
|
||
return
|
||
# 缩放图片:填充整个相机区域(保持图片比例可改用 Image.ANTIALIAS + 计算比例)
|
||
resized_bg = self.raw_bg_image.resize((display_w, display_h), Image.LANCZOS)
|
||
# 转换为Tkinter格式并更新标签
|
||
self.camera_bg_image = ImageTk.PhotoImage(resized_bg)
|
||
self.camera_label.config(image=self.camera_bg_image)
|
||
|
||
def restore_initial_camera_view(self):
|
||
"""将相机区域恢复为初始背景图,并忽略后续残留画面刷新。"""
|
||
self.show_background_only = True
|
||
if self.raw_bg_image is None:
|
||
self.load_camera_background()
|
||
return
|
||
self.adjust_bg_image_size()
|
||
self.camera_image = None
|
||
if self.camera_bg_image is not None:
|
||
self.camera_label.config(image=self.camera_bg_image)
|
||
|
||
def on_camera_frame_resize(self, event):
|
||
"""相机区域窗口拉伸时,自动调整1.png尺寸"""
|
||
# 使用 after 做轻微延迟,避免用户拖拽窗口时触发过于频繁的 resize 计算。
|
||
self.root.after(50, self.adjust_bg_image_size)
|
||
|
||
def set_background_image(self):
|
||
"""设置主窗口背景色。"""
|
||
self.root.configure(bg=self.colors["bg"])
|
||
|
||
def init_parameters(self):
|
||
"""填充界面默认参数。
|
||
|
||
这些值会作为用户第一次打开界面时的初始配置,
|
||
同时也让界面与 control.py 的默认参数保持基本一致。
|
||
"""
|
||
self.robot_ip.insert(0, "192.168.192.100")
|
||
self.robot_port.insert(0, "30004")
|
||
self.agv_ip.insert(0, "192.168.192.100")
|
||
self.agv_port.insert(0, "30104")
|
||
self.agv_speed_forward.insert(0, "-0.2")
|
||
self.agv_speed_stop.insert(0, "0.0")
|
||
self.total_duration.insert(0, "300")
|
||
self.agv_stop_timeout.insert(0, "10")
|
||
self.yolo_model_path.insert(0, "best.pt")
|
||
default_bg_path = self.resolve_path("", fallback_name="1.png")
|
||
if default_bg_path:
|
||
self.camera_bg_path.insert(0, str(default_bg_path))
|
||
import control
|
||
|
||
self.scissors_enabled.set(self.parse_bool_setting(getattr(control, "SCISSORS_ENABLED", True), default=True))
|
||
self.place_position = self.normalize_place_position(control.place_positions)
|
||
settings = self.load_ui_settings()
|
||
if settings:
|
||
self.robot_ip.delete(0, tk.END)
|
||
self.robot_ip.insert(0, str(settings.get("ROBOT_IP", "192.168.192.100")))
|
||
self.robot_port.delete(0, tk.END)
|
||
self.robot_port.insert(0, str(settings.get("ROBOT_PORT", "30004")))
|
||
self.agv_ip.delete(0, tk.END)
|
||
self.agv_ip.insert(0, str(settings.get("AGV_IP", "192.168.192.100")))
|
||
self.agv_port.delete(0, tk.END)
|
||
self.agv_port.insert(0, str(settings.get("AGV_PORT", "30104")))
|
||
self.agv_speed_forward.delete(0, tk.END)
|
||
self.agv_speed_forward.insert(0, str(settings.get("AGV_SPEED_FORWARD", "-0.2")))
|
||
self.agv_speed_stop.delete(0, tk.END)
|
||
self.agv_speed_stop.insert(0, str(settings.get("AGV_SPEED_STOP", "0.0")))
|
||
self.total_duration.delete(0, tk.END)
|
||
self.total_duration.insert(0, str(settings.get("TOTAL_DURATION", "300")))
|
||
self.agv_stop_timeout.delete(0, tk.END)
|
||
self.agv_stop_timeout.insert(0, str(settings.get("AGV_STOP_TIMEOUT", "10")))
|
||
self.yolo_model_path.delete(0, tk.END)
|
||
self.yolo_model_path.insert(0, str(settings.get("YOLO_MODEL_PATH", "best.pt")))
|
||
self.camera_bg_path.delete(0, tk.END)
|
||
saved_bg_path = settings.get("CAMERA_BG_PATH", "")
|
||
resolved_bg_path = self.resolve_path(saved_bg_path, fallback_name="1.png")
|
||
if resolved_bg_path:
|
||
self.camera_bg_path.insert(0, str(resolved_bg_path))
|
||
elif saved_bg_path:
|
||
self.camera_bg_path.insert(0, str(saved_bg_path))
|
||
self.scissors_enabled.set(
|
||
self.parse_bool_setting(settings.get("SCISSORS_ENABLED", self.scissors_enabled.get()), default=True)
|
||
)
|
||
self.update_place_fields()
|
||
|
||
def parse_bool_setting(self, value, default=True):
|
||
if isinstance(value, bool):
|
||
return value
|
||
if isinstance(value, str):
|
||
normalized = value.strip().lower()
|
||
if normalized in ("1", "true", "yes", "on"):
|
||
return True
|
||
if normalized in ("0", "false", "no", "off"):
|
||
return False
|
||
return default
|
||
|
||
def normalize_place_position(self, position):
|
||
"""把外部读取到的放置位统一规整为 [x,y,z,roll,pitch,yaw]。"""
|
||
if isinstance(position, (list, tuple)):
|
||
if len(position) >= 6 and not isinstance(position[0], (list, tuple)):
|
||
try:
|
||
return [float(value) for value in position[:6]]
|
||
except (TypeError, ValueError):
|
||
pass
|
||
if position and isinstance(position[0], (list, tuple)):
|
||
return self.normalize_place_position(position[0])
|
||
return [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||
|
||
def update_place_fields(self, event=None):
|
||
"""刷新单一放置位的 6 个位姿输入框。"""
|
||
def update():
|
||
for i, entry in enumerate(self.place_entries):
|
||
entry.delete(0, tk.END)
|
||
entry.insert(0, str(self.place_position[i]))
|
||
self.root.after(0, update)
|
||
|
||
def save_place_position(self):
|
||
"""把当前输入框中的单一放置位姿保存回内存。"""
|
||
try:
|
||
position = [float(entry.get()) for entry in self.place_entries]
|
||
if len(position) != 6:
|
||
messagebox.showerror("错误", "请填写所有位置参数")
|
||
return
|
||
self.place_position = position
|
||
self.save_ui_settings()
|
||
messagebox.showinfo("成功", "放置位已保存")
|
||
self.log("放置位已保存")
|
||
except ValueError:
|
||
messagebox.showerror("错误", "请输入有效的数值")
|
||
self.log("保存放置位失败:无效的数值", level="错误")
|
||
|
||
def browse_model(self):
|
||
"""弹出文件选择框,选择 YOLO 模型文件。"""
|
||
filename = filedialog.askopenfilename(
|
||
title="选择YOLO模型文件",
|
||
filetypes=[("PyTorch模型", "*.pt"), ("所有文件", "*.*")]
|
||
)
|
||
if filename:
|
||
self.yolo_model_path.delete(0, tk.END)
|
||
self.yolo_model_path.insert(0, filename)
|
||
|
||
def browse_camera_background(self):
|
||
"""弹出文件选择框,选择相机背景图。"""
|
||
filename = filedialog.askopenfilename(
|
||
title="选择相机背景图",
|
||
filetypes=[("图片文件", "*.png;*.jpg;*.jpeg;*.bmp"), ("所有文件", "*.*")]
|
||
)
|
||
if filename:
|
||
self.camera_bg_path.delete(0, tk.END)
|
||
self.camera_bg_path.insert(0, filename)
|
||
self.load_camera_background()
|
||
|
||
def save_parameters(self):
|
||
"""把界面中的参数写回 control 模块。
|
||
|
||
这里采用“直接修改模块级变量”的方式,而不是写配置文件。
|
||
好处是后台启动时可以立刻读取最新参数,代价是参数只在本次进程生命周期内生效。
|
||
"""
|
||
try:
|
||
import control
|
||
params = {
|
||
"ROBOT_IP": self.robot_ip.get(),
|
||
"ROBOT_PORT": int(self.robot_port.get()),
|
||
"AGV_IP": self.agv_ip.get(),
|
||
"AGV_PORT": int(self.agv_port.get()),
|
||
"AGV_SPEED_FORWARD": float(self.agv_speed_forward.get()),
|
||
"AGV_SPEED_STOP": float(self.agv_speed_stop.get()),
|
||
"TOTAL_DURATION": int(self.total_duration.get()),
|
||
"AGV_STOP_TIMEOUT": int(self.agv_stop_timeout.get()),
|
||
"YOLO_MODEL_PATH": self.yolo_model_path.get(),
|
||
"SCISSORS_ENABLED": bool(self.scissors_enabled.get()),
|
||
"place_positions": [self.place_position]
|
||
}
|
||
for param_name, value in params.items():
|
||
if value in (None, ""):
|
||
raise ValueError(f"参数 {param_name} 不能为空")
|
||
for param_name, value in params.items():
|
||
setattr(control, param_name, value)
|
||
self.save_ui_settings()
|
||
self.log("参数保存成功")
|
||
messagebox.showinfo("成功", "参数已保存")
|
||
return True
|
||
except Exception as e:
|
||
self.log(f"参数保存失败: {str(e)}", level="错误")
|
||
messagebox.showerror("错误", f"参数保存失败: {str(e)}")
|
||
return False
|
||
|
||
@contextlib.contextmanager
|
||
def capture_console_output(self):
|
||
"""捕获后台程序的标准输出/错误输出,并转发到日志队列。"""
|
||
def push_console_output(message, channel="stdout"):
|
||
if not message:
|
||
return
|
||
normalized = message.strip()
|
||
if not normalized:
|
||
return
|
||
level = "错误" if "error" in normalized.lower() or "失败" in normalized else \
|
||
"警告" if "warning" in normalized.lower() or "警告" in normalized else "信息"
|
||
self.push_terminal_message(normalized, channel)
|
||
if channel == "stderr" or level in ("警告", "错误"):
|
||
self.log_queue.put((normalized, "错误" if channel == "stderr" else level))
|
||
|
||
old_stdout = sys.stdout
|
||
old_stderr = sys.stderr
|
||
|
||
class TeeConsole(io.TextIOBase):
|
||
def __init__(self, original_stream, push_fn, channel):
|
||
self.original_stream = original_stream
|
||
self.push_fn = push_fn
|
||
self.channel = channel
|
||
self._buffer = ""
|
||
|
||
def write(self, message):
|
||
if not message:
|
||
return 0
|
||
if self.original_stream:
|
||
self.original_stream.write(message)
|
||
self.original_stream.flush()
|
||
self._buffer += message
|
||
while "\n" in self._buffer:
|
||
line, self._buffer = self._buffer.split("\n", 1)
|
||
line = line.rstrip("\r")
|
||
if line.strip():
|
||
self.push_fn(line, self.channel)
|
||
return len(message)
|
||
|
||
def flush(self):
|
||
if self.original_stream:
|
||
self.original_stream.flush()
|
||
if self._buffer.strip():
|
||
self.push_fn(self._buffer.strip(), self.channel)
|
||
self._buffer = ""
|
||
|
||
sys.stdout = TeeConsole(old_stdout, push_console_output, "stdout")
|
||
sys.stderr = TeeConsole(old_stderr, push_console_output, "stderr")
|
||
try:
|
||
yield
|
||
finally:
|
||
try:
|
||
sys.stdout.flush()
|
||
sys.stderr.flush()
|
||
finally:
|
||
sys.stdout = old_stdout
|
||
sys.stderr = old_stderr
|
||
|
||
def start_program(self):
|
||
"""启动后台采摘程序线程。"""
|
||
try:
|
||
if self.running:
|
||
messagebox.showwarning("提示", "已有任务正在运行,请先停止当前任务")
|
||
return
|
||
if not self.save_parameters():
|
||
messagebox.showwarning("警告", "参数保存失败,无法启动程序")
|
||
return
|
||
self.running = True
|
||
self.runtime_mode = "full"
|
||
self.show_background_only = False
|
||
self.set_status_badge("运行中", "running")
|
||
self.start_btn.config(state=tk.DISABLED)
|
||
self.vision_test_btn.config(state=tk.DISABLED)
|
||
self.stop_btn.config(state=tk.NORMAL)
|
||
self.push_terminal_message("终端监听已连接,等待后台标准输出...", "system")
|
||
def main_thread():
|
||
exited_with_error = False
|
||
try:
|
||
import control
|
||
# 将 UI 的画面更新函数注入到控制模块,
|
||
# 这样 control.py 中的视觉线程就能直接把最新画面推送回来。
|
||
control.set_ui_callback(self.update_camera_frame)
|
||
with self.capture_console_output():
|
||
control.main()
|
||
except Exception as e:
|
||
exited_with_error = True
|
||
self.log(f"后台线程异常退出: {str(e)}", level="错误")
|
||
self.push_terminal_message(f"后台线程异常退出: {str(e)}", "stderr")
|
||
self.root.after(0, lambda: self.set_status_badge("异常退出", "error"))
|
||
finally:
|
||
self.running = False
|
||
self.runtime_mode = None
|
||
self.root.after(0, lambda: self.stop_btn.config(state=tk.DISABLED))
|
||
self.root.after(0, lambda: self.start_btn.config(state=tk.NORMAL))
|
||
self.root.after(0, lambda: self.vision_test_btn.config(state=tk.NORMAL))
|
||
if not exited_with_error:
|
||
self.root.after(0, lambda: self.set_status_badge("已停止", "idle"))
|
||
self.thread = threading.Thread(target=main_thread)
|
||
self.thread.daemon = True
|
||
self.thread.start()
|
||
self.log("程序启动成功")
|
||
except Exception as e:
|
||
self.set_status_badge("启动失败", "error")
|
||
self.log(f"程序启动失败: {str(e)}", level="错误")
|
||
messagebox.showerror("错误", f"程序启动失败: {str(e)}")
|
||
self.start_btn.config(state=tk.NORMAL)
|
||
self.vision_test_btn.config(state=tk.NORMAL)
|
||
self.stop_btn.config(state=tk.DISABLED)
|
||
|
||
def start_vision_test(self):
|
||
"""启动仅做相机采集和 YOLO 推理的视觉测试。"""
|
||
try:
|
||
if self.running:
|
||
messagebox.showwarning("提示", "已有任务正在运行,请先停止当前任务")
|
||
return
|
||
if not self.save_parameters():
|
||
messagebox.showwarning("警告", "参数保存失败,无法启动视觉测试")
|
||
return
|
||
|
||
self.running = True
|
||
self.runtime_mode = "vision_test"
|
||
self.vision_test_stop_event.clear()
|
||
self.show_background_only = False
|
||
self.set_status_badge("视觉测试中", "running")
|
||
self.start_btn.config(state=tk.DISABLED)
|
||
self.vision_test_btn.config(state=tk.DISABLED)
|
||
self.stop_btn.config(state=tk.NORMAL)
|
||
self.push_terminal_message("视觉测试启动:仅加载相机和 YOLO,不启动机械臂。", "system")
|
||
|
||
def vision_test_thread():
|
||
pipeline = None
|
||
exited_with_error = False
|
||
try:
|
||
import control_core
|
||
with self.capture_console_output():
|
||
pipeline, align, _ = control_core.init_camera()
|
||
self.vision_test_pipeline = pipeline
|
||
model = control_core.init_tomato_detector(self.yolo_model_path.get().strip())
|
||
self.log("视觉测试已启动,可直接查看相机检测结果")
|
||
|
||
while not self.vision_test_stop_event.is_set():
|
||
try:
|
||
frames = pipeline.wait_for_frames(timeout_ms=3000)
|
||
if not frames:
|
||
continue
|
||
aligned_frames = align.process(frames)
|
||
color_frame = aligned_frames.get_color_frame()
|
||
if not color_frame:
|
||
continue
|
||
|
||
color_image = np.asanyarray(color_frame.get_data())
|
||
results = model(color_image, classes=0, conf=control_core.YOLO_DETECT_CONF, verbose=False)
|
||
annotated_image = self.build_vision_test_frame(
|
||
color_image,
|
||
results[0],
|
||
control_core.PICK_CONFIDENCE_THRESHOLD,
|
||
)
|
||
self.update_camera_frame(annotated_image)
|
||
except Exception as frame_exc:
|
||
if self.vision_test_stop_event.is_set():
|
||
break
|
||
self.log(f"视觉测试帧处理失败: {frame_exc}", level="警告")
|
||
time.sleep(0.2)
|
||
self.push_terminal_message("视觉测试已停止。", "system")
|
||
except Exception as exc:
|
||
exited_with_error = True
|
||
self.log(f"视觉测试启动失败: {exc}", level="错误")
|
||
self.push_terminal_message(f"视觉测试异常退出: {exc}", "stderr")
|
||
self.root.after(0, lambda: self.set_status_badge("视觉测试失败", "error"))
|
||
finally:
|
||
if pipeline is not None:
|
||
try:
|
||
pipeline.stop()
|
||
except Exception:
|
||
pass
|
||
self.vision_test_pipeline = None
|
||
self.running = False
|
||
self.runtime_mode = None
|
||
self.vision_test_stop_event.set()
|
||
self.root.after(0, lambda: self.stop_btn.config(state=tk.DISABLED))
|
||
self.root.after(0, lambda: self.start_btn.config(state=tk.NORMAL))
|
||
self.root.after(0, lambda: self.vision_test_btn.config(state=tk.NORMAL))
|
||
if not exited_with_error:
|
||
self.root.after(0, lambda: self.set_status_badge("已停止", "idle"))
|
||
|
||
self.thread = threading.Thread(target=vision_test_thread, daemon=True)
|
||
self.thread.start()
|
||
except Exception as e:
|
||
self.running = False
|
||
self.runtime_mode = None
|
||
self.vision_test_stop_event.set()
|
||
self.set_status_badge("启动失败", "error")
|
||
self.log(f"视觉测试启动失败: {str(e)}", level="错误")
|
||
messagebox.showerror("错误", f"视觉测试启动失败: {str(e)}")
|
||
self.start_btn.config(state=tk.NORMAL)
|
||
self.vision_test_btn.config(state=tk.NORMAL)
|
||
self.stop_btn.config(state=tk.DISABLED)
|
||
|
||
def stop_program(self):
|
||
"""停止后台程序并恢复按钮状态。"""
|
||
if not self.running and not (self.thread and self.thread.is_alive()):
|
||
self.restore_initial_camera_view()
|
||
self.start_btn.config(state=tk.NORMAL)
|
||
self.vision_test_btn.config(state=tk.NORMAL)
|
||
self.stop_btn.config(state=tk.DISABLED)
|
||
self.runtime_mode = None
|
||
self.set_status_badge("已停止", "idle")
|
||
return
|
||
|
||
self.running = False
|
||
self.set_status_badge("停止中", "warning")
|
||
self.log("正在停止程序...")
|
||
self.restore_initial_camera_view()
|
||
self.start_btn.config(state=tk.NORMAL)
|
||
self.vision_test_btn.config(state=tk.NORMAL)
|
||
self.stop_btn.config(state=tk.DISABLED)
|
||
|
||
thread_to_wait = self.thread
|
||
self.thread = None
|
||
|
||
try:
|
||
import control
|
||
control.set_ui_callback(lambda *_args, **_kwargs: None)
|
||
except Exception:
|
||
pass
|
||
|
||
try:
|
||
import control_core
|
||
control_core.set_ui_callback(lambda *_args, **_kwargs: None)
|
||
control_core.reset_runtime_state()
|
||
except Exception as e:
|
||
self.log(f"发送停止信号失败: {e}", level="警告")
|
||
|
||
self.vision_test_stop_event.set()
|
||
if self.vision_test_pipeline is not None:
|
||
try:
|
||
self.vision_test_pipeline.stop()
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
self.vision_test_pipeline = None
|
||
|
||
self.runtime_mode = None
|
||
|
||
if thread_to_wait and thread_to_wait.is_alive():
|
||
def wait_for_shutdown():
|
||
thread_to_wait.join(timeout=3)
|
||
if thread_to_wait.is_alive():
|
||
self.root.after(
|
||
0,
|
||
lambda: self.log("后台线程仍未完全退出,请检查相机或控制器是否阻塞", level="警告"),
|
||
)
|
||
else:
|
||
self.root.after(0, lambda: self.log("程序已停止"))
|
||
self.root.after(0, lambda: self.set_status_badge("已停止", "idle"))
|
||
|
||
threading.Thread(target=wait_for_shutdown, daemon=True).start()
|
||
else:
|
||
self.set_status_badge("已停止", "idle")
|
||
self.log("程序已停止")
|
||
|
||
def push_terminal_message(self, message, kind="stdout"):
|
||
"""向终端输出区域追加一条消息。"""
|
||
self.terminal_queue.put((kind, message))
|
||
|
||
def log(self, message, level="信息", sync_terminal=False):
|
||
"""向日志队列追加一条消息,由 UI 主线程异步消费。"""
|
||
self.log_queue.put((message, level))
|
||
if sync_terminal:
|
||
self.push_terminal_message(f"[UI-{level}] {message}", "system")
|
||
|
||
def append_text_line(self, text_widget, text, line_count_attr, max_lines, tag=None):
|
||
"""向文本框追加一行并限制最大行数。"""
|
||
current_count = getattr(self, line_count_attr) + 1
|
||
setattr(self, line_count_attr, current_count)
|
||
if current_count > max_lines:
|
||
text_widget.config(state=tk.NORMAL)
|
||
text_widget.delete(1.0, 2.0)
|
||
text_widget.config(state=tk.DISABLED)
|
||
setattr(self, line_count_attr, current_count - 1)
|
||
text_widget.config(state=tk.NORMAL)
|
||
if tag:
|
||
text_widget.insert(tk.END, text, tag)
|
||
else:
|
||
text_widget.insert(tk.END, text)
|
||
text_widget.see(tk.END)
|
||
text_widget.config(state=tk.DISABLED)
|
||
|
||
def format_log_entry(self, timestamp, message, level):
|
||
"""格式化运行日志文本和标签。"""
|
||
tag = "info"
|
||
prefix = "信息"
|
||
if level == "警告":
|
||
tag = "warning"
|
||
prefix = "警告"
|
||
elif level == "错误":
|
||
tag = "error"
|
||
prefix = "错误"
|
||
return tag, f"[{timestamp}] {prefix}: {message}\n"
|
||
|
||
def should_display_log(self, level):
|
||
"""根据下拉框判断日志是否显示。"""
|
||
selected = self.log_level.get()
|
||
return selected == "全部" or selected == level
|
||
|
||
def on_log_level_changed(self, event=None):
|
||
"""切换日志级别时刷新日志视图。"""
|
||
self.refresh_log_view()
|
||
|
||
def refresh_log_view(self):
|
||
"""根据当前筛选条件重绘运行日志。"""
|
||
self.log_text.config(state=tk.NORMAL)
|
||
self.log_text.delete(1.0, tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
self.log_line_count = 0
|
||
self.log_text.tag_config("info", foreground=self.colors["text"])
|
||
self.log_text.tag_config("warning", foreground="#D97706")
|
||
self.log_text.tag_config("error", foreground="#DC2626")
|
||
for timestamp, message, level in self.log_records:
|
||
if self.should_display_log(level):
|
||
tag, display_text = self.format_log_entry(timestamp, message, level)
|
||
self.append_text_line(self.log_text, display_text, "log_line_count", MAX_LOG_LINES, tag=tag)
|
||
|
||
def process_log_queue(self):
|
||
"""定时消费日志队列和终端队列并刷新文本框。"""
|
||
while not self.log_queue.empty():
|
||
message, level = self.log_queue.get()
|
||
timestamp = time.strftime('%H:%M:%S')
|
||
self.log_records.append((timestamp, message, level))
|
||
if len(self.log_records) > MAX_LOG_LINES * 2:
|
||
self.log_records.pop(0)
|
||
self.log_text.tag_config("info", foreground=self.colors["text"])
|
||
self.log_text.tag_config("warning", foreground="#D97706")
|
||
self.log_text.tag_config("error", foreground="#DC2626")
|
||
if self.should_display_log(level):
|
||
tag, display_text = self.format_log_entry(timestamp, message, level)
|
||
self.append_text_line(self.log_text, display_text, "log_line_count", MAX_LOG_LINES, tag=tag)
|
||
self.log_queue.task_done()
|
||
while not self.terminal_queue.empty():
|
||
kind, message = self.terminal_queue.get()
|
||
prefix_map = {
|
||
"stdout": "STDOUT",
|
||
"stderr": "STDERR",
|
||
"system": "SYSTEM",
|
||
}
|
||
tag_map = {
|
||
"stdout": "terminal_stdout",
|
||
"stderr": "terminal_stderr",
|
||
"system": "terminal_system",
|
||
}
|
||
self.terminal_text.tag_config("terminal_stdout", foreground=self.colors["terminal_fg"])
|
||
self.terminal_text.tag_config("terminal_stderr", foreground=self.colors["terminal_err"])
|
||
self.terminal_text.tag_config("terminal_system", foreground=self.colors["terminal_muted"])
|
||
self.append_text_line(
|
||
self.terminal_text,
|
||
f"[{time.strftime('%H:%M:%S')}] {prefix_map.get(kind, 'OUT')}: {message}\n",
|
||
"terminal_line_count",
|
||
MAX_TERMINAL_LINES,
|
||
tag=tag_map.get(kind, "terminal_stdout")
|
||
)
|
||
self.terminal_queue.task_done()
|
||
self.root.after(200, self.process_log_queue)
|
||
|
||
def clear_log(self):
|
||
"""清空界面日志显示区域。"""
|
||
self.log_text.config(state=tk.NORMAL)
|
||
self.log_text.delete(1.0, tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
self.log_line_count = 0
|
||
self.log_records.clear()
|
||
self.log("日志已清空")
|
||
|
||
def clear_terminal(self):
|
||
"""清空实时终端输出区域。"""
|
||
self.terminal_text.config(state=tk.NORMAL)
|
||
self.terminal_text.delete(1.0, tk.END)
|
||
self.terminal_text.config(state=tk.DISABLED)
|
||
self.terminal_line_count = 0
|
||
|
||
def build_vision_test_frame(self, image, result, confidence_threshold):
|
||
"""生成视觉测试画面,仅显示检测框、置信度、中心点和 YOLO-pose 关键点。"""
|
||
annotated = image.copy()
|
||
frame_h, frame_w = annotated.shape[:2]
|
||
header_y = max(30, int(frame_h * 0.05))
|
||
cv2.putText(
|
||
annotated,
|
||
"Vision Test",
|
||
(12, header_y),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.8,
|
||
(38, 166, 154),
|
||
2,
|
||
)
|
||
|
||
boxes = getattr(result, "boxes", None)
|
||
keypoints = getattr(result, "keypoints", None)
|
||
keypoint_xy = getattr(keypoints, "xy", None)
|
||
keypoint_conf = getattr(keypoints, "conf", None)
|
||
tomato_index = 0
|
||
iterable_boxes = boxes if boxes is not None else []
|
||
|
||
for box_index, box in enumerate(iterable_boxes):
|
||
try:
|
||
confidence = float(box.conf[0]) if hasattr(box.conf, "__len__") else float(box.conf)
|
||
if int(box.cls) != 0 or confidence < confidence_threshold:
|
||
continue
|
||
|
||
tomato_index += 1
|
||
x1, y1, x2, y2 = [int(value) for value in box.xyxy[0].cpu().numpy()]
|
||
x1 = max(0, min(frame_w - 1, x1))
|
||
x2 = max(0, min(frame_w - 1, x2))
|
||
y1 = max(0, min(frame_h - 1, y1))
|
||
y2 = max(0, min(frame_h - 1, y2))
|
||
if x2 <= x1 or y2 <= y1:
|
||
continue
|
||
|
||
pixel_x = int((x1 + x2) / 2)
|
||
pixel_y = int((y1 + y2) / 2)
|
||
label = f"Tomato-{tomato_index} {confidence:.2f}"
|
||
|
||
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 165, 255), 2)
|
||
cv2.circle(annotated, (pixel_x, pixel_y), 5, (0, 0, 255), -1)
|
||
|
||
text_scale = 0.55
|
||
text_thickness = 2
|
||
(text_w, text_h), baseline = cv2.getTextSize(
|
||
label,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
text_scale,
|
||
text_thickness,
|
||
)
|
||
text_x = max(6, min(x1, frame_w - text_w - 10))
|
||
text_y = y1 - 10
|
||
if text_y - text_h < header_y + 8:
|
||
text_y = min(frame_h - 8, y2 + text_h + 10)
|
||
box_top = max(0, text_y - text_h - baseline - 4)
|
||
box_bottom = min(frame_h, text_y + baseline + 2)
|
||
box_right = min(frame_w, text_x + text_w + 8)
|
||
cv2.rectangle(annotated, (text_x - 4, box_top), (box_right, box_bottom), (0, 165, 255), -1)
|
||
cv2.putText(
|
||
annotated,
|
||
label,
|
||
(text_x, text_y),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
text_scale,
|
||
(255, 255, 255),
|
||
text_thickness,
|
||
)
|
||
|
||
if keypoint_xy is not None:
|
||
try:
|
||
current_points = keypoint_xy[box_index]
|
||
if hasattr(current_points, "cpu"):
|
||
current_points = current_points.cpu().numpy()
|
||
current_conf = None
|
||
if keypoint_conf is not None:
|
||
current_conf = keypoint_conf[box_index]
|
||
if hasattr(current_conf, "cpu"):
|
||
current_conf = current_conf.cpu().numpy()
|
||
|
||
for point_index, point in enumerate(current_points):
|
||
px, py = [int(v) for v in point[:2]]
|
||
if px <= 0 and py <= 0:
|
||
continue
|
||
if current_conf is not None and point_index < len(current_conf):
|
||
if float(current_conf[point_index]) < 0.2:
|
||
continue
|
||
px = max(0, min(frame_w - 1, px))
|
||
py = max(0, min(frame_h - 1, py))
|
||
cv2.circle(annotated, (px, py), 4, (0, 255, 255), -1)
|
||
cv2.circle(annotated, (px, py), 7, (0, 120, 255), 1)
|
||
except Exception as kp_exc:
|
||
self.log(f"视觉测试关键点绘制失败: {kp_exc}", level="警告")
|
||
except Exception as exc:
|
||
self.log(f"视觉测试标注失败: {exc}", level="警告")
|
||
|
||
return annotated
|
||
|
||
def serialize_detection_boxes_for_ui(self, boxes, image=None, fit_line_fn=None):
|
||
"""把检测框整理成 UI 叠加显示需要的结构。"""
|
||
detections = []
|
||
for index, box in enumerate(boxes, start=1):
|
||
try:
|
||
x1, y1, x2, y2 = [int(value) for value in box.xyxy[0].cpu().numpy()]
|
||
cutpoint = []
|
||
end_point = []
|
||
if image is not None and callable(fit_line_fn):
|
||
crop_image = image[y1:y2, x1:x2]
|
||
line_points = fit_line_fn(crop_image, x1, y1)
|
||
if line_points:
|
||
p1, p2 = [list(map(int, point)) for point in line_points]
|
||
cutpoint, end_point = sorted([p1, p2], key=lambda point: point[1])
|
||
detections.append(
|
||
{
|
||
"label": f"Tomato-{index}",
|
||
"confidence": float(box.conf),
|
||
"bbox": [x1, y1, x2, y2],
|
||
"cutpoint": cutpoint,
|
||
"end_point": end_point,
|
||
}
|
||
)
|
||
except Exception as exc:
|
||
self.log(f"检测框解析失败: {exc}", level="警告")
|
||
return detections
|
||
|
||
@staticmethod
|
||
def _overlay_text_rect(origin, text_size, baseline=0, padding=4):
|
||
text_x, text_y = origin
|
||
text_w, text_h = text_size
|
||
return (
|
||
text_x - padding,
|
||
text_y - text_h - baseline - padding,
|
||
text_x + text_w + padding,
|
||
text_y + baseline + padding,
|
||
)
|
||
|
||
@staticmethod
|
||
def _rects_overlap(rect_a, rect_b, padding=4):
|
||
ax1, ay1, ax2, ay2 = rect_a
|
||
bx1, by1, bx2, by2 = rect_b
|
||
return not (
|
||
ax2 + padding < bx1
|
||
or bx2 + padding < ax1
|
||
or ay2 + padding < by1
|
||
or by2 + padding < ay1
|
||
)
|
||
|
||
@staticmethod
|
||
def _clamp_text_origin(x, y, text_size, baseline, frame_w, frame_h, margin=6):
|
||
text_w, text_h = text_size
|
||
x = max(margin, min(int(x), frame_w - text_w - margin))
|
||
y = max(text_h + baseline + margin, min(int(y), frame_h - baseline - margin))
|
||
return x, y
|
||
|
||
def _place_overlay_text(self, text, anchors, occupied_rects, frame_shape, scale=0.48, thickness=1):
|
||
frame_h, frame_w = frame_shape[:2]
|
||
(text_w, text_h), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, scale, thickness)
|
||
first_origin = None
|
||
first_rect = None
|
||
for anchor_x, anchor_y in anchors:
|
||
origin = self._clamp_text_origin(anchor_x, anchor_y, (text_w, text_h), baseline, frame_w, frame_h)
|
||
rect = self._overlay_text_rect(origin, (text_w, text_h), baseline)
|
||
if first_origin is None:
|
||
first_origin, first_rect = origin, rect
|
||
if not any(self._rects_overlap(rect, used_rect, padding=6) for used_rect in occupied_rects):
|
||
return origin, rect
|
||
return first_origin, first_rect
|
||
|
||
@staticmethod
|
||
def _draw_outlined_text(image, text, origin, scale, color, thickness=1):
|
||
cv2.putText(
|
||
image,
|
||
text,
|
||
origin,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
scale,
|
||
(0, 0, 0),
|
||
thickness + 2,
|
||
cv2.LINE_AA,
|
||
)
|
||
cv2.putText(
|
||
image,
|
||
text,
|
||
origin,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
scale,
|
||
color,
|
||
thickness,
|
||
cv2.LINE_AA,
|
||
)
|
||
|
||
def overlay_detection_metadata(self, image, payload):
|
||
"""在冻结画面上叠加番茄串框、置信度和关键点信息。"""
|
||
if not isinstance(payload, dict):
|
||
return image
|
||
|
||
detections = payload.get("detections") or []
|
||
if not detections:
|
||
return image
|
||
|
||
annotated = image.copy()
|
||
is_freeze_frame = bool(payload.get("freeze"))
|
||
stage = payload.get("stage", "")
|
||
header_text = "Pick Freeze" if is_freeze_frame else ("Vision Test" if stage == "vision_test" else "Detection")
|
||
header_y = max(30, int(annotated.shape[0] * 0.05))
|
||
occupied_rects = []
|
||
(header_w, header_h), header_baseline = cv2.getTextSize(
|
||
header_text,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.8,
|
||
2,
|
||
)
|
||
occupied_rects.append(
|
||
self._overlay_text_rect((12, header_y), (header_w, header_h), header_baseline, padding=6)
|
||
)
|
||
cv2.putText(
|
||
annotated,
|
||
header_text,
|
||
(12, header_y),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.8,
|
||
(38, 166, 154),
|
||
2,
|
||
)
|
||
|
||
for index, item in enumerate(detections, start=1):
|
||
bbox = item.get("bbox") or []
|
||
cutpoint = item.get("cutpoint") or []
|
||
end_point = item.get("end_point") or []
|
||
if len(bbox) != 4:
|
||
continue
|
||
|
||
x1, y1, x2, y2 = [int(v) for v in bbox]
|
||
confidence = float(item.get("confidence", 0.0))
|
||
label = item.get("label", f"Tomato-{index}")
|
||
box_color = (41, 121, 255) if is_freeze_frame else (64, 181, 246)
|
||
|
||
cv2.rectangle(annotated, (x1, y1), (x2, y2), box_color, 2)
|
||
|
||
text = f"{label} {confidence:.2f}"
|
||
(text_w, text_h), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 2)
|
||
text_x = max(6, min(x1, annotated.shape[1] - text_w - 10))
|
||
text_y = y1 - 10
|
||
if text_y - text_h < header_y + 8:
|
||
text_y = min(annotated.shape[0] - 8, y2 + text_h + 10)
|
||
box_top = max(0, text_y - text_h - baseline - 4)
|
||
box_bottom = min(annotated.shape[0], text_y + baseline + 2)
|
||
box_right = min(annotated.shape[1], text_x + text_w + 8)
|
||
cv2.rectangle(annotated, (text_x - 4, box_top), (box_right, box_bottom), box_color, -1)
|
||
occupied_rects.append((text_x - 4, box_top, box_right, box_bottom))
|
||
cv2.putText(
|
||
annotated,
|
||
text,
|
||
(text_x, text_y),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.55,
|
||
(255, 255, 255),
|
||
2,
|
||
)
|
||
|
||
if len(cutpoint) == 2 and len(end_point) == 2:
|
||
p1 = tuple(int(v) for v in cutpoint)
|
||
p2 = tuple(int(v) for v in end_point)
|
||
cv2.circle(annotated, p1, 5, (0, 255, 255), -1)
|
||
cv2.circle(annotated, p2, 5, (255, 215, 0), -1)
|
||
cv2.line(annotated, p1, p2, (0, 220, 0), 2)
|
||
cv2.putText(
|
||
annotated,
|
||
"cutpoint",
|
||
(p1[0] + 8, max(18, p1[1] - 8)),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.45,
|
||
(0, 255, 255),
|
||
1,
|
||
)
|
||
(cut_w, cut_h), cut_baseline = cv2.getTextSize(
|
||
"cutpoint",
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.45,
|
||
1,
|
||
)
|
||
occupied_rects.append(
|
||
self._overlay_text_rect(
|
||
(p1[0] + 8, max(18, p1[1] - 8)),
|
||
(cut_w, cut_h),
|
||
cut_baseline,
|
||
)
|
||
)
|
||
cv2.putText(
|
||
annotated,
|
||
"end_point",
|
||
(p2[0] + 8, min(annotated.shape[0] - 10, p2[1] + 16)),
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.45,
|
||
(255, 215, 0),
|
||
1,
|
||
)
|
||
(end_w, end_h), end_baseline = cv2.getTextSize(
|
||
"end_point",
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.45,
|
||
1,
|
||
)
|
||
occupied_rects.append(
|
||
self._overlay_text_rect(
|
||
(p2[0] + 8, min(annotated.shape[0] - 10, p2[1] + 16)),
|
||
(end_w, end_h),
|
||
end_baseline,
|
||
)
|
||
)
|
||
angle_deg = item.get("angle_deg")
|
||
if angle_deg is not None:
|
||
angle_text = f"Angle: {float(angle_deg):.2f}deg"
|
||
(angle_w, angle_h), angle_baseline = cv2.getTextSize(
|
||
angle_text,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
0.5,
|
||
2,
|
||
)
|
||
angle_x = max(6, min(p2[0] + 8, annotated.shape[1] - angle_w - 6))
|
||
angle_y = max(18, min(p2[1] + 18, annotated.shape[0] - angle_baseline - 6))
|
||
occupied_rects.append(
|
||
self._overlay_text_rect(
|
||
(angle_x, angle_y),
|
||
(angle_w, angle_h),
|
||
angle_baseline,
|
||
)
|
||
)
|
||
|
||
pick_xyz = item.get("pick_xyz") or []
|
||
if is_freeze_frame and len(pick_xyz) >= 3 and len(cutpoint) == 2:
|
||
try:
|
||
xyz = [float(value) for value in pick_xyz[:3]]
|
||
p1 = tuple(int(v) for v in cutpoint)
|
||
xyz_text = f"D405 XYZ: {xyz[0]:.3f}, {xyz[1]:.3f}, {xyz[2]:.3f} m"
|
||
text_scale = 0.48
|
||
text_thickness = 1
|
||
|
||
(xyz_w, xyz_h), _xyz_baseline = cv2.getTextSize(
|
||
xyz_text,
|
||
cv2.FONT_HERSHEY_SIMPLEX,
|
||
text_scale,
|
||
text_thickness,
|
||
)
|
||
anchors = [
|
||
(x2 + 12, y1 + xyz_h + 8),
|
||
(x1 - xyz_w - 12, y1 + xyz_h + 8),
|
||
(x2 + 12, y2 - 8),
|
||
(x1 - xyz_w - 12, y2 - 8),
|
||
(p1[0] + 14, p1[1] - 18),
|
||
(p1[0] + 14, p1[1] + xyz_h + 22),
|
||
(p1[0] - xyz_w - 14, p1[1] - 18),
|
||
(p1[0] - xyz_w - 14, p1[1] + xyz_h + 22),
|
||
]
|
||
origin, xyz_rect = self._place_overlay_text(
|
||
xyz_text,
|
||
anchors,
|
||
occupied_rects,
|
||
annotated.shape,
|
||
scale=text_scale,
|
||
thickness=text_thickness,
|
||
)
|
||
if origin and xyz_rect:
|
||
leader_end = (
|
||
max(xyz_rect[0], min(p1[0], xyz_rect[2])),
|
||
max(xyz_rect[1], min(p1[1], xyz_rect[3])),
|
||
)
|
||
cv2.line(annotated, p1, leader_end, (0, 255, 255), 1, cv2.LINE_AA)
|
||
self._draw_outlined_text(
|
||
annotated,
|
||
xyz_text,
|
||
origin,
|
||
text_scale,
|
||
(0, 255, 255),
|
||
text_thickness,
|
||
)
|
||
occupied_rects.append(xyz_rect)
|
||
except Exception as xyz_exc:
|
||
self.log(f"pick xyz overlay failed: {xyz_exc}", level="警告")
|
||
|
||
return annotated
|
||
|
||
def update_camera_frame(self, image):
|
||
"""接收后台线程传来的 OpenCV 图像,并显示到 Tkinter 标签上。"""
|
||
def update():
|
||
try:
|
||
if self.show_background_only:
|
||
return
|
||
payload = image if isinstance(image, dict) else None
|
||
frame = payload.get("frame") if payload else image
|
||
if frame is None:
|
||
return
|
||
if payload:
|
||
frame = self.overlay_detection_metadata(frame, payload)
|
||
image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||
pil_img = Image.fromarray(image_rgb)
|
||
display_w = self.camera_display_frame.winfo_width()
|
||
display_h = self.camera_display_frame.winfo_height()
|
||
if display_w > 10 and display_h > 10:
|
||
# 这里与背景图一样,统一按当前显示区域缩放,
|
||
# 让视觉画面始终与右侧面板尺寸匹配。
|
||
pil_img = pil_img.resize((display_w, display_h), Image.LANCZOS)
|
||
self.camera_image = ImageTk.PhotoImage(image=pil_img)
|
||
self.camera_label.config(image=self.camera_image) # 覆盖1.png
|
||
except Exception as e:
|
||
self.log(f"相机画面更新失败: {str(e)}", level="错误")
|
||
self.root.after(0, update)
|
||
|
||
def signal_handler(self, sig, frame):
|
||
"""响应系统信号,优先走与手动停止一致的退出流程。"""
|
||
self.log(f"接收到信号 {sig},正在终止程序...", level="警告")
|
||
self.stop_program()
|
||
self.root.quit()
|
||
|
||
if __name__ == "__main__":
|
||
root = tk.Tk()
|
||
app = TomatoHarvestingUI(root)
|
||
root.mainloop()
|