Files
acAubo/main.py
2026-05-06 10:47:14 +08:00

1673 lines
74 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
import tkinter.font as tkfont
import queue
import threading
import time
import os
import sys
import json
import io
import ctypes
from ctypes import wintypes
from pathlib import Path
from PIL import Image, ImageTk, ImageDraw
import cv2
import numpy as np
import signal
import contextlib
"""
图形界面入口文件。
本文件主要负责:
1. 构建参数配置界面。
2. 启动/停止后台采摘主程序。
3. 捕获控制台输出并实时展示到日志面板。
4. 接收 control.py 回传的相机画面并显示在界面上。
界面层本身不直接实现采摘算法,而是负责把用户输入转成运行参数,
并把后台线程的运行状态可视化出来。
"""
# Windows 控制台缓冲区结构体。
# 当程序在 Windows 上运行时,会用它读取后台线程输出的控制台内容,
# 再把这些内容转发到 Tkinter 的日志面板中。
class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
_fields_ = [
("dwSize", wintypes._COORD),
("dwCursorPosition", wintypes._COORD),
("wAttributes", wintypes.WORD),
("srWindow", wintypes.SMALL_RECT),
("dwMaximumWindowSize", wintypes._COORD)
]
STD_OUTPUT_HANDLE = -11
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
MAX_LOG_LINES = 1000
MAX_TERMINAL_LINES = 1500
class TomatoHarvestingUI:
"""番茄采摘系统的主界面类。"""
def __init__(self, root):
self.root = root
self.root.title("番茄采摘系统参数配置")
screen_w = self.root.winfo_screenwidth()
screen_h = self.root.winfo_screenheight()
window_w = int(screen_w * 0.8)
window_h = int(screen_h * 0.7)
pos_x = max(0, (screen_w - window_w) // 2)
pos_y = max(0, (screen_h - window_h) // 2)
self.root.geometry(f"{window_w}x{window_h}+{pos_x}+{pos_y}")
self.root.minsize(max(980, int(screen_w * 0.5)), max(640, int(screen_h * 0.55)))
self.base_window_width = window_w
self.base_window_height = window_h
self.base_dir = Path(__file__).resolve().parent
self.settings_file = self.base_dir / "ui_settings.json"
self._font_resize_job = None
self._is_applying_font_scale = False
self.colors = {
"bg": "#F3F7FB",
"panel": "#FFFFFF",
"panel_soft": "#F8FBFD",
"border": "#D8E3EC",
"text": "#16324A",
"muted": "#6B7C93",
"accent": "#0F766E",
"accent_soft": "#D7F3EE",
"danger": "#B45309",
"danger_soft": "#FDE7D7",
"info": "#0F4C81",
"info_soft": "#DCEBFA",
"terminal_bg": "#0F172A",
"terminal_fg": "#DCFCE7",
"terminal_muted": "#93C5FD",
"terminal_err": "#FCA5A5",
"metric_1": "#E6FFFB",
"metric_2": "#EEF2FF",
"metric_3": "#FFF7ED",
"metric_4": "#F0FDF4",
}
# 设置中文字体
self.init_fonts()
# 程序运行状态
self.running = False
self.thread = None
self.runtime_mode = None
self.vision_test_stop_event = threading.Event()
self.vision_test_pipeline = None
self.show_background_only = True
self.scissors_enabled = tk.BooleanVar(value=True)
# 日志队列
self.log_queue = queue.Queue(maxsize=500)
self.log_line_count = 0
self.terminal_queue = queue.Queue(maxsize=1000)
self.terminal_line_count = 0
self.log_records = []
self.terminal_message_count = 0
self.collapsible_sections = {}
# 相机画面变量
self.camera_frame = None
self.camera_image = None
self.camera_bg_image = None # 保存背景图引用(关键:防止回收)
self.raw_bg_image = None # 新增保存原始1.png用于后续尺寸调整
# 创建界面
self.create_widgets()
# 初始化参数
self.init_parameters()
# 加载相机背景图延迟100ms确保窗口渲染完成后再加载
self.root.after(100, self.load_camera_background)
# 启动日志处理线程
self.root.after(200, self.process_log_queue)
# 信号处理
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def create_widgets(self):
"""构建整个界面布局。"""
# 设置主窗口背景
self.set_background_image()
main_frame = ttk.Frame(self.root, padding=(16, 12, 16, 16), style="App.TFrame")
main_frame.pack(fill=tk.BOTH, expand=True)
header_card = tk.Frame(
main_frame,
bg=self.colors["panel"],
bd=0,
highlightthickness=1,
highlightbackground=self.colors["border"]
)
header_card.pack(fill=tk.X, pady=(0, 10))
header_top = tk.Frame(header_card, bg=self.colors["panel"])
header_top.pack(fill=tk.X, padx=18, pady=10)
self.status_badge = tk.Label(
header_top,
text="系统就绪",
font=self.fonts["button"],
bg=self.colors["accent_soft"],
fg=self.colors["accent"],
padx=14,
pady=6
)
self.status_badge.pack(side=tk.RIGHT)
title_wrap = tk.Frame(header_top, bg=self.colors["panel"])
title_wrap.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.title_label = tk.Label(
title_wrap,
text="番茄采摘系统",
font=self.fonts["title"],
bg=self.colors["panel"],
fg=self.colors["text"]
)
self.title_label.pack(side=tk.LEFT)
self.subtitle_label = tk.Label(
title_wrap,
text="参数配置 · 日志 · 相机 · 终端",
font=self.fonts["subtitle"],
bg=self.colors["panel"],
fg=self.colors["muted"]
)
self.subtitle_label.pack(side=tk.LEFT, padx=(14, 0), pady=(4, 0))
body_frame = ttk.Frame(main_frame, style="App.TFrame")
body_frame.pack(fill=tk.BOTH, expand=True)
# 三栏布局(侧边配置 + 中间工作区 + 终端)
paned_window = ttk.PanedWindow(body_frame, orient=tk.HORIZONTAL)
paned_window.pack(fill=tk.BOTH, expand=True)
self.paned_window = paned_window
# 左侧参数区
params_frame = ttk.Frame(paned_window, width=470, style="App.TFrame")
paned_window.add(params_frame, weight=2)
# 中间日志和相机区
center_frame = ttk.Frame(paned_window, style="App.TFrame")
paned_window.add(center_frame, weight=3)
# 最右侧终端区
terminal_frame = ttk.Frame(paned_window, width=280, style="App.TFrame")
paned_window.add(terminal_frame, weight=1)
# 参数区布局:使用可滚动容器,避免窗口高度不足时底部控件被裁切
params_canvas = tk.Canvas(
params_frame,
highlightthickness=0,
bd=0,
relief="flat",
bg=self.colors["bg"]
)
params_scrollbar = ttk.Scrollbar(params_frame, orient=tk.VERTICAL, command=params_canvas.yview)
params_canvas.configure(yscrollcommand=params_scrollbar.set)
params_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
params_canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.params_canvas = params_canvas
self.params_scrollbar = params_scrollbar
params_inner = ttk.Frame(params_canvas, style="App.TFrame")
self.params_canvas_window = params_canvas.create_window((0, 0), window=params_inner, anchor="nw")
params_inner.bind(
"<Configure>",
lambda event: params_canvas.configure(scrollregion=params_canvas.bbox("all"))
)
params_canvas.bind(
"<Configure>",
lambda event: params_canvas.itemconfigure(self.params_canvas_window, width=event.width)
)
param_card = ttk.Frame(params_inner, style="Transparent.TFrame")
param_card.pack(fill=tk.BOTH, expand=True, pady=0, padx=(4, 8))
param_grid = ttk.Frame(param_card, style="Transparent.TFrame")
param_grid.pack(fill=tk.BOTH, expand=True)
param_grid.columnconfigure(0, weight=1)
param_grid.columnconfigure(1, weight=1)
# 1. 机械臂配置
robot_group = ttk.LabelFrame(param_grid, text="机械臂配置", padding="6", style="SidebarCard.TLabelframe")
robot_group.grid(row=0, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
robot_group.columnconfigure(1, weight=1)
robot_group.columnconfigure(3, weight=1)
ttk.Label(robot_group, text="机械臂IP:").grid(row=0, column=0, sticky=tk.W, pady=2)
self.robot_ip = ttk.Entry(robot_group, width=16)
self.robot_ip.grid(row=0, column=1, sticky=tk.EW, pady=2, padx=4)
ttk.Label(robot_group, text="端口:").grid(row=0, column=2, sticky=tk.W, pady=2)
self.robot_port = ttk.Entry(robot_group, width=8)
self.robot_port.grid(row=0, column=3, sticky=tk.EW, pady=2, padx=4)
# 2. AGV配置
agv_group = ttk.LabelFrame(param_grid, text="AGV配置", padding="6", style="SidebarCard.TLabelframe")
agv_group.grid(row=1, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
agv_group.columnconfigure(1, weight=1)
agv_group.columnconfigure(3, weight=1)
ttk.Label(agv_group, text="AGV IP:").grid(row=0, column=0, sticky=tk.W, pady=2)
self.agv_ip = ttk.Entry(agv_group, width=16)
self.agv_ip.grid(row=0, column=1, sticky=tk.EW, pady=2, padx=4)
ttk.Label(agv_group, text="端口:").grid(row=0, column=2, sticky=tk.W, pady=2)
self.agv_port = ttk.Entry(agv_group, width=8)
self.agv_port.grid(row=0, column=3, sticky=tk.EW, pady=2, padx=4)
ttk.Label(agv_group, text="前进(m/s):").grid(row=1, column=0, sticky=tk.W, pady=2)
self.agv_speed_forward = ttk.Entry(agv_group, width=8)
self.agv_speed_forward.grid(row=1, column=1, sticky=tk.EW, pady=2, padx=4)
ttk.Label(agv_group, text="停止(m/s):").grid(row=1, column=2, sticky=tk.W, pady=2)
self.agv_speed_stop = ttk.Entry(agv_group, width=8)
self.agv_speed_stop.grid(row=1, column=3, sticky=tk.EW, pady=2, padx=4)
# 3. 运行参数
run_group = ttk.LabelFrame(param_grid, text="运行参数", padding="6", style="SidebarCard.TLabelframe")
run_group.grid(row=2, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
run_group.columnconfigure(1, weight=1)
run_group.columnconfigure(3, weight=1)
ttk.Label(run_group, text="总时长(s):").grid(row=0, column=0, sticky=tk.W, pady=2)
self.total_duration = ttk.Entry(run_group, width=8)
self.total_duration.grid(row=0, column=1, sticky=tk.EW, pady=2, padx=4)
ttk.Label(run_group, text="停止超时(s):").grid(row=0, column=2, sticky=tk.W, pady=2)
self.agv_stop_timeout = ttk.Entry(run_group, width=8)
self.agv_stop_timeout.grid(row=0, column=3, sticky=tk.EW, pady=2, padx=4)
# YOLO模型路径
model_frame = ttk.Frame(run_group)
model_frame.grid(row=1, column=0, columnspan=4, sticky=tk.W+tk.E, pady=2)
ttk.Label(model_frame, text="YOLO模型:").pack(side=tk.LEFT, pady=2)
self.yolo_model_path = ttk.Entry(model_frame)
self.yolo_model_path.pack(side=tk.LEFT, pady=2, padx=4, fill=tk.X, expand=True)
self.browse_btn = ttk.Button(model_frame, text="浏览", command=self.browse_model, width=5)
self.browse_btn.pack(side=tk.LEFT, padx=(3, 0), pady=2)
bg_frame = ttk.Frame(run_group)
bg_frame.grid(row=2, column=0, columnspan=4, sticky=tk.W+tk.E, pady=2)
ttk.Label(bg_frame, text="背景图:").pack(side=tk.LEFT, pady=2)
self.camera_bg_path = ttk.Entry(bg_frame)
self.camera_bg_path.pack(side=tk.LEFT, pady=2, padx=4, fill=tk.X, expand=True)
self.browse_bg_btn = ttk.Button(bg_frame, text="浏览", command=self.browse_camera_background, width=5)
self.browse_bg_btn.pack(side=tk.LEFT, padx=(3, 0), pady=2)
# 4. 放置位置
place_group = ttk.LabelFrame(param_grid, text="放置位置", padding="6", style="SidebarCard.TLabelframe")
place_group.grid(row=3, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
# 位置参数输入
place_params_frame = ttk.Frame(place_group)
place_params_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 2))
place_params_frame.columnconfigure(1, weight=1)
place_params_frame.columnconfigure(3, weight=1)
self.place_entries = []
left_labels = ["X(m):", "Y(m):", "Z(m):"]
right_labels = ["Roll:", "Pitch:", "Yaw:"]
for i, label in enumerate(left_labels):
ttk.Label(place_params_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=2, padx=(2, 4))
entry = ttk.Entry(place_params_frame, width=9)
entry.grid(row=i, column=1, sticky=tk.EW, pady=2, padx=2)
self.place_entries.append(entry)
for i, label in enumerate(right_labels):
ttk.Label(place_params_frame, text=label).grid(row=i, column=2, sticky=tk.W, pady=2, padx=(8, 4))
entry = ttk.Entry(place_params_frame, width=9)
entry.grid(row=i, column=3, sticky=tk.EW, pady=2, padx=2)
self.place_entries.append(entry)
self.save_place_btn = ttk.Button(place_group, text="保存位置", command=self.save_place_position)
self.save_place_btn.pack(pady=4)
# 5. 控制按钮
control_frame = ttk.LabelFrame(param_grid, text="系统控制", padding="6", style="SidebarCard.TLabelframe")
control_frame.grid(row=4, column=0, columnspan=2, sticky=tk.W+tk.E, pady=4, padx=4)
btn_frame = ttk.Frame(control_frame)
btn_frame.pack(fill=tk.X, pady=(3, 1))
btn_frame.columnconfigure(0, weight=1)
btn_frame.columnconfigure(1, weight=1)
btn_frame.columnconfigure(2, weight=1)
btn_frame.columnconfigure(3, weight=1)
self.start_btn = ttk.Button(btn_frame, text="启动程序", command=self.start_program, style="Primary.TButton")
self.start_btn.grid(row=0, column=0, padx=3, pady=3, sticky=tk.EW)
self.vision_test_btn = ttk.Button(btn_frame, text="视觉测试", command=self.start_vision_test, style="Secondary.TButton")
self.vision_test_btn.grid(row=0, column=1, padx=3, pady=3, sticky=tk.EW)
self.stop_btn = ttk.Button(btn_frame, text="停止程序", command=self.stop_program, state=tk.DISABLED, style="Danger.TButton")
self.stop_btn.grid(row=0, column=2, padx=3, pady=3, sticky=tk.EW)
self.save_btn = ttk.Button(btn_frame, text="保存参数", command=self.save_parameters, style="Secondary.TButton")
self.save_btn.grid(row=0, column=3, padx=3, pady=3, sticky=tk.EW)
self.scissors_enabled_check = ttk.Checkbutton(
control_frame,
text="启用末端剪刀",
variable=self.scissors_enabled,
)
self.scissors_enabled_check.pack(anchor=tk.W, padx=5, pady=(3, 1))
# 中间区域布局
# 日志区域(上方)
log_frame = ttk.LabelFrame(center_frame, text="运行日志", padding="8", style="Card.TLabelframe")
log_frame.pack(fill=tk.X, expand=False, pady=(4, 8), padx=5)
# 日志控制栏
log_control = ttk.Frame(log_frame)
log_control.pack(fill=tk.X, pady=(2, 4))
ttk.Label(log_control, text="日志显示:").pack(side=tk.LEFT, padx=5)
self.log_level = tk.StringVar(value="全部")
log_level_combo = ttk.Combobox(log_control, textvariable=self.log_level, width=10, state="readonly")
log_level_combo['values'] = ("全部", "信息", "警告", "错误")
log_level_combo.pack(side=tk.LEFT, padx=5)
log_level_combo.bind("<<ComboboxSelected>>", self.on_log_level_changed)
ttk.Button(log_control, text="清空日志", command=self.clear_log).pack(side=tk.RIGHT, padx=5)
# 日志文本框
self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=4, bd=0, relief="flat", highlightthickness=0)
self.log_text.pack(fill=tk.BOTH, expand=True, pady=(0, 2))
self.log_text.config(state=tk.DISABLED, bg="#F0F0F0", fg="#333333", insertbackground="black")
# 相机画面区域(下方)
camera_card = ttk.LabelFrame(center_frame, text="相机画面", padding="8", style="Card.TLabelframe")
camera_card.pack(fill=tk.BOTH, expand=True, pady=(0, 5), padx=5)
# 关键修改1相机显示框架取消默认背景色
self.camera_display_frame = ttk.Frame(camera_card)
self.camera_display_frame.pack(fill=tk.BOTH, expand=True)
# 绑定窗口大小变化事件(窗口拉伸时,图片自动适配)
self.camera_display_frame.bind("<Configure>", self.on_camera_frame_resize)
# 关键修改2相机标签不继承全局背景色且填充整个框架
self.camera_label = ttk.Label(self.camera_display_frame)
self.camera_label.pack(fill=tk.BOTH, expand=True)
# 终端输出区域(最右侧)
terminal_card = ttk.LabelFrame(terminal_frame, text="终端输出", padding="8", style="TerminalCard.TLabelframe")
terminal_card.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)
terminal_control = ttk.Frame(terminal_card)
terminal_control.pack(fill=tk.X, pady=5)
ttk.Label(terminal_control, text="输出类型: 标准输出 / 标准错误").pack(side=tk.LEFT, padx=5)
ttk.Button(terminal_control, text="清空终端", command=self.clear_terminal).pack(side=tk.RIGHT, padx=5)
self.terminal_text = scrolledtext.ScrolledText(
terminal_card,
wrap=tk.WORD,
font=self.fonts["mono"],
bd=0,
relief="flat",
highlightthickness=0,
bg="#111111",
fg="#D8F3DC",
insertbackground="#D8F3DC"
)
self.terminal_text.pack(fill=tk.BOTH, expand=True, pady=5)
self.terminal_text.config(state=tk.DISABLED)
# 设置样式(核心修改:取消全局标签背景色)
self.setup_styles()
self.bind_mousewheel_to_params()
self.root.after(100, self.set_initial_layout)
self.root.after(120, self.apply_font_scale)
def init_fonts(self):
"""创建一组可随窗口缩放的字体对象。"""
self.font_specs = {
"default": {"family": "微软雅黑", "size": 9},
"title": {"family": "微软雅黑", "size": 18, "weight": "bold"},
"subtitle": {"family": "微软雅黑", "size": 10},
"section": {"family": "微软雅黑", "size": 9, "weight": "bold"},
"button": {"family": "微软雅黑", "size": 9, "weight": "bold"},
"button_normal": {"family": "微软雅黑", "size": 9},
"mono": {"family": "微软雅黑", "size": 9},
}
self.font_min_sizes = {
"default": 7,
"title": 14,
"subtitle": 8,
"section": 8,
"button": 8,
"button_normal": 8,
"mono": 8,
}
self.fonts = {}
for name, spec in self.font_specs.items():
self.fonts[name] = tkfont.Font(
self.root,
family=spec["family"],
size=spec["size"],
weight=spec.get("weight", "normal"),
)
self.root.option_add("*Font", self.fonts["default"])
self.root.bind("<Configure>", self.on_root_resize)
def setup_styles(self):
"""集中配置 ttk 控件样式,避免样式定义散落在各个方法中。"""
style = ttk.Style()
available_themes = style.theme_names()
if "clam" in available_themes:
style.theme_use("clam")
elif "vista" in available_themes:
style.theme_use("vista")
style.configure("TFrame", background=self.colors["panel"])
style.configure("App.TFrame", background=self.colors["bg"])
style.configure("Transparent.TFrame", background=self.colors["bg"])
style.configure("TPanedwindow", background=self.colors["bg"])
style.configure("TLabel", background=self.colors["panel"], foreground=self.colors["text"], font=self.fonts["default"])
style.configure(
"TEntry",
fieldbackground=self.colors["panel"],
background=self.colors["panel"],
foreground=self.colors["text"],
padding=5,
font=self.fonts["default"],
relief="flat",
borderwidth=1
)
style.configure(
"TCombobox",
fieldbackground=self.colors["panel"],
background=self.colors["panel"],
foreground=self.colors["text"],
arrowsize=14,
padding=3,
font=self.fonts["default"]
)
style.configure(
"TButton",
font=self.fonts["button_normal"],
padding=(10, 6),
relief="flat",
borderwidth=0
)
style.configure(
"SidebarShell.TLabelframe",
background=self.colors["panel_soft"],
borderwidth=1,
relief="solid",
padding=6
)
style.configure(
"SidebarShell.TLabelframe.Label",
background=self.colors["panel_soft"],
foreground=self.colors["text"],
font=self.fonts["section"]
)
style.configure(
"SidebarCard.TLabelframe",
background=self.colors["panel"],
borderwidth=1,
relief="solid",
padding=6
)
style.configure(
"SidebarCard.TLabelframe.Label",
background=self.colors["panel"],
foreground=self.colors["text"],
font=self.fonts["section"]
)
style.configure(
"Card.TLabelframe",
background=self.colors["panel"],
borderwidth=1,
relief="solid",
padding=6
)
style.configure(
"Card.TLabelframe.Label",
background=self.colors["panel"],
foreground=self.colors["text"],
font=self.fonts["section"]
)
style.configure(
"TerminalCard.TLabelframe",
background=self.colors["panel"],
borderwidth=1,
relief="solid",
padding=6
)
style.configure(
"TerminalCard.TLabelframe.Label",
background=self.colors["panel"],
foreground=self.colors["text"],
font=self.fonts["section"]
)
style.configure("Primary.TButton", background=self.colors["accent"], foreground="#FFFFFF", font=self.fonts["button"])
style.map("Primary.TButton", background=[("active", "#0B5E58"), ("disabled", "#A8D7D2")], foreground=[("disabled", "#F7FAFC")])
style.configure("Danger.TButton", background="#C2410C", foreground="#FFFFFF", font=self.fonts["button"])
style.map("Danger.TButton", background=[("active", "#9A3412"), ("disabled", "#F3B99A")], foreground=[("disabled", "#FEF2F2")])
style.configure("Secondary.TButton", background=self.colors["info_soft"], foreground=self.colors["info"], font=self.fonts["button_normal"])
style.map("Secondary.TButton", background=[("active", "#C8DFF7"), ("disabled", "#E7EEF5")])
def on_root_resize(self, event):
"""窗口尺寸变化时,延迟刷新字体缩放。"""
if event.widget is not self.root or self._is_applying_font_scale:
return
if self._font_resize_job is not None:
self.root.after_cancel(self._font_resize_job)
self._font_resize_job = self.root.after(80, self.apply_font_scale)
def apply_font_scale(self):
"""根据窗口当前尺寸动态缩放字体。"""
self._font_resize_job = None
current_w = max(self.root.winfo_width(), 1)
current_h = max(self.root.winfo_height(), 1)
scale = min(current_w / self.base_window_width, current_h / self.base_window_height)
scale = max(0.72, min(1.12, scale))
self._is_applying_font_scale = True
try:
for name, font in self.fonts.items():
base_size = self.font_specs[name]["size"]
min_size = self.font_min_sizes[name]
font.configure(size=max(min_size, int(round(base_size * scale))))
if hasattr(self, "log_text"):
self.log_text.configure(font=self.fonts["default"])
if hasattr(self, "terminal_text"):
self.terminal_text.configure(font=self.fonts["mono"])
finally:
self._is_applying_font_scale = False
def set_status_badge(self, text, tone="idle"):
"""更新顶部状态徽标。"""
tone_map = {
"idle": (self.colors["info_soft"], self.colors["info"]),
"running": (self.colors["accent_soft"], self.colors["accent"]),
"warning": (self.colors["danger_soft"], self.colors["danger"]),
"error": ("#FEE2E2", "#B91C1C"),
}
bg_color, fg_color = tone_map.get(tone, tone_map["idle"])
if hasattr(self, "status_badge"):
self.status_badge.configure(text=text, bg=bg_color, fg=fg_color)
def bind_mousewheel_to_params(self):
"""让左侧参数区支持鼠标滚轮滚动。"""
def _on_mousewheel(event):
if hasattr(self, "params_canvas"):
self.params_canvas.yview_scroll(int(-event.delta / 120), "units")
def _bind(_event):
self.params_canvas.bind_all("<MouseWheel>", _on_mousewheel)
def _unbind(_event):
self.params_canvas.unbind_all("<MouseWheel>")
self.params_canvas.bind("<Enter>", _bind)
self.params_canvas.bind("<Leave>", _unbind)
def set_initial_layout(self):
"""根据当前窗口尺寸设置更稳定的初始三栏宽度。"""
try:
total_width = self.paned_window.winfo_width()
if total_width > 600:
left_width = max(480, int(total_width * 0.38))
terminal_width = max(250, int(total_width * 0.20))
self.paned_window.sashpos(0, left_width)
self.paned_window.sashpos(1, total_width - terminal_width)
except Exception:
pass
def get_resource_candidates(self, filename):
"""返回项目内可能存放资源文件的候选路径。"""
return [
self.base_dir / filename,
self.base_dir / "tools" / filename,
Path.cwd() / filename,
Path.cwd() / "tools" / filename,
]
def resolve_path(self, path_value, fallback_name=None):
"""解析绝对路径或相对项目根目录的路径。"""
if path_value:
candidate = Path(path_value).expanduser()
if not candidate.is_absolute():
candidate = self.base_dir / candidate
candidate = candidate.resolve()
if candidate.exists():
return candidate
if fallback_name:
for candidate in self.get_resource_candidates(fallback_name):
if candidate.exists():
return candidate.resolve()
return None
def load_ui_settings(self):
"""读取本地保存的 UI 配置。"""
if not self.settings_file.exists():
return {}
try:
with self.settings_file.open("r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
self.log(f"读取 UI 配置失败:{str(e)}", level="警告")
return {}
def save_ui_settings(self):
"""保存 UI 配置,方便下次启动直接复用。"""
data = {
"ROBOT_IP": self.robot_ip.get(),
"ROBOT_PORT": self.robot_port.get(),
"AGV_IP": self.agv_ip.get(),
"AGV_PORT": self.agv_port.get(),
"AGV_SPEED_FORWARD": self.agv_speed_forward.get(),
"AGV_SPEED_STOP": self.agv_speed_stop.get(),
"TOTAL_DURATION": self.total_duration.get(),
"AGV_STOP_TIMEOUT": self.agv_stop_timeout.get(),
"YOLO_MODEL_PATH": self.yolo_model_path.get(),
"CAMERA_BG_PATH": self.camera_bg_path.get(),
"SCISSORS_ENABLED": bool(self.scissors_enabled.get()),
}
with self.settings_file.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_camera_background_path(self):
"""返回当前相机背景图的有效路径。"""
return self.resolve_path(self.camera_bg_path.get().strip(), fallback_name="1.png")
def load_camera_background(self):
"""加载1.png并确保全屏显示保存原始图片用于后续缩放"""
try:
bg_path = self.get_camera_background_path()
if bg_path is None:
raise FileNotFoundError("1.png")
# 这里只保存原始图片,不直接反复对已缩放图片继续缩放,
# 这样可以减少多次窗口拉伸后的画质损失。
self.raw_bg_image = Image.open(bg_path)
self.camera_bg_path.delete(0, tk.END)
self.camera_bg_path.insert(0, str(bg_path))
self.adjust_bg_image_size()
except FileNotFoundError:
searched_paths = "\n".join(str(path) for path in self.get_resource_candidates("1.png"))
error_msg = f"未找到背景图 1.png。\n可在界面中手动选择图片,或将文件放到以下位置之一:\n{searched_paths}"
self.log(error_msg, level="警告")
messagebox.showwarning("图片缺失", error_msg)
self.raw_bg_image = None
self.camera_bg_image = None
self.camera_label.config(background="#E8E8E8") # 仅异常时显示灰色
except Exception as e:
self.log(f"加载1.png失败{str(e)}", level="错误")
self.raw_bg_image = None
self.camera_bg_image = None
self.camera_label.config(background="#E8E8E8")
def adjust_bg_image_size(self):
"""根据相机区域尺寸调整1.png大小并显示"""
if self.raw_bg_image is None:
return
# 获取当前相机显示区域的实际尺寸(窗口渲染完成后的真实尺寸)
display_w = self.camera_display_frame.winfo_width()
display_h = self.camera_display_frame.winfo_height()
# 确保尺寸有效(避免窗口未渲染完成的情况)
if display_w <= 10 or display_h <= 10:
return
# 缩放图片:填充整个相机区域(保持图片比例可改用 Image.ANTIALIAS + 计算比例)
resized_bg = self.raw_bg_image.resize((display_w, display_h), Image.LANCZOS)
# 转换为Tkinter格式并更新标签
self.camera_bg_image = ImageTk.PhotoImage(resized_bg)
self.camera_label.config(image=self.camera_bg_image)
def restore_initial_camera_view(self):
"""将相机区域恢复为初始背景图,并忽略后续残留画面刷新。"""
self.show_background_only = True
if self.raw_bg_image is None:
self.load_camera_background()
return
self.adjust_bg_image_size()
self.camera_image = None
if self.camera_bg_image is not None:
self.camera_label.config(image=self.camera_bg_image)
def on_camera_frame_resize(self, event):
"""相机区域窗口拉伸时自动调整1.png尺寸"""
# 使用 after 做轻微延迟,避免用户拖拽窗口时触发过于频繁的 resize 计算。
self.root.after(50, self.adjust_bg_image_size)
def set_background_image(self):
"""设置主窗口背景色。"""
self.root.configure(bg=self.colors["bg"])
def init_parameters(self):
"""填充界面默认参数。
这些值会作为用户第一次打开界面时的初始配置,
同时也让界面与 control.py 的默认参数保持基本一致。
"""
self.robot_ip.insert(0, "192.168.192.100")
self.robot_port.insert(0, "30004")
self.agv_ip.insert(0, "192.168.192.100")
self.agv_port.insert(0, "30104")
self.agv_speed_forward.insert(0, "-0.2")
self.agv_speed_stop.insert(0, "0.0")
self.total_duration.insert(0, "300")
self.agv_stop_timeout.insert(0, "10")
self.yolo_model_path.insert(0, "best.pt")
default_bg_path = self.resolve_path("", fallback_name="1.png")
if default_bg_path:
self.camera_bg_path.insert(0, str(default_bg_path))
import control
self.scissors_enabled.set(self.parse_bool_setting(getattr(control, "SCISSORS_ENABLED", True), default=True))
self.place_position = self.normalize_place_position(control.place_positions)
settings = self.load_ui_settings()
if settings:
self.robot_ip.delete(0, tk.END)
self.robot_ip.insert(0, str(settings.get("ROBOT_IP", "192.168.192.100")))
self.robot_port.delete(0, tk.END)
self.robot_port.insert(0, str(settings.get("ROBOT_PORT", "30004")))
self.agv_ip.delete(0, tk.END)
self.agv_ip.insert(0, str(settings.get("AGV_IP", "192.168.192.100")))
self.agv_port.delete(0, tk.END)
self.agv_port.insert(0, str(settings.get("AGV_PORT", "30104")))
self.agv_speed_forward.delete(0, tk.END)
self.agv_speed_forward.insert(0, str(settings.get("AGV_SPEED_FORWARD", "-0.2")))
self.agv_speed_stop.delete(0, tk.END)
self.agv_speed_stop.insert(0, str(settings.get("AGV_SPEED_STOP", "0.0")))
self.total_duration.delete(0, tk.END)
self.total_duration.insert(0, str(settings.get("TOTAL_DURATION", "300")))
self.agv_stop_timeout.delete(0, tk.END)
self.agv_stop_timeout.insert(0, str(settings.get("AGV_STOP_TIMEOUT", "10")))
self.yolo_model_path.delete(0, tk.END)
self.yolo_model_path.insert(0, str(settings.get("YOLO_MODEL_PATH", "best.pt")))
self.camera_bg_path.delete(0, tk.END)
saved_bg_path = settings.get("CAMERA_BG_PATH", "")
resolved_bg_path = self.resolve_path(saved_bg_path, fallback_name="1.png")
if resolved_bg_path:
self.camera_bg_path.insert(0, str(resolved_bg_path))
elif saved_bg_path:
self.camera_bg_path.insert(0, str(saved_bg_path))
self.scissors_enabled.set(
self.parse_bool_setting(settings.get("SCISSORS_ENABLED", self.scissors_enabled.get()), default=True)
)
self.update_place_fields()
def parse_bool_setting(self, value, default=True):
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in ("1", "true", "yes", "on"):
return True
if normalized in ("0", "false", "no", "off"):
return False
return default
def normalize_place_position(self, position):
"""把外部读取到的放置位统一规整为 [x,y,z,roll,pitch,yaw]。"""
if isinstance(position, (list, tuple)):
if len(position) >= 6 and not isinstance(position[0], (list, tuple)):
try:
return [float(value) for value in position[:6]]
except (TypeError, ValueError):
pass
if position and isinstance(position[0], (list, tuple)):
return self.normalize_place_position(position[0])
return [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
def update_place_fields(self, event=None):
"""刷新单一放置位的 6 个位姿输入框。"""
def update():
for i, entry in enumerate(self.place_entries):
entry.delete(0, tk.END)
entry.insert(0, str(self.place_position[i]))
self.root.after(0, update)
def save_place_position(self):
"""把当前输入框中的单一放置位姿保存回内存。"""
try:
position = [float(entry.get()) for entry in self.place_entries]
if len(position) != 6:
messagebox.showerror("错误", "请填写所有位置参数")
return
self.place_position = position
self.save_ui_settings()
messagebox.showinfo("成功", "放置位已保存")
self.log("放置位已保存")
except ValueError:
messagebox.showerror("错误", "请输入有效的数值")
self.log("保存放置位失败:无效的数值", level="错误")
def browse_model(self):
"""弹出文件选择框,选择 YOLO 模型文件。"""
filename = filedialog.askopenfilename(
title="选择YOLO模型文件",
filetypes=[("PyTorch模型", "*.pt"), ("所有文件", "*.*")]
)
if filename:
self.yolo_model_path.delete(0, tk.END)
self.yolo_model_path.insert(0, filename)
def browse_camera_background(self):
"""弹出文件选择框,选择相机背景图。"""
filename = filedialog.askopenfilename(
title="选择相机背景图",
filetypes=[("图片文件", "*.png;*.jpg;*.jpeg;*.bmp"), ("所有文件", "*.*")]
)
if filename:
self.camera_bg_path.delete(0, tk.END)
self.camera_bg_path.insert(0, filename)
self.load_camera_background()
def save_parameters(self):
"""把界面中的参数写回 control 模块。
这里采用“直接修改模块级变量”的方式,而不是写配置文件。
好处是后台启动时可以立刻读取最新参数,代价是参数只在本次进程生命周期内生效。
"""
try:
import control
params = {
"ROBOT_IP": self.robot_ip.get(),
"ROBOT_PORT": int(self.robot_port.get()),
"AGV_IP": self.agv_ip.get(),
"AGV_PORT": int(self.agv_port.get()),
"AGV_SPEED_FORWARD": float(self.agv_speed_forward.get()),
"AGV_SPEED_STOP": float(self.agv_speed_stop.get()),
"TOTAL_DURATION": int(self.total_duration.get()),
"AGV_STOP_TIMEOUT": int(self.agv_stop_timeout.get()),
"YOLO_MODEL_PATH": self.yolo_model_path.get(),
"SCISSORS_ENABLED": bool(self.scissors_enabled.get()),
"place_positions": [self.place_position]
}
for param_name, value in params.items():
if value in (None, ""):
raise ValueError(f"参数 {param_name} 不能为空")
for param_name, value in params.items():
setattr(control, param_name, value)
self.save_ui_settings()
self.log("参数保存成功")
messagebox.showinfo("成功", "参数已保存")
return True
except Exception as e:
self.log(f"参数保存失败: {str(e)}", level="错误")
messagebox.showerror("错误", f"参数保存失败: {str(e)}")
return False
@contextlib.contextmanager
def capture_console_output(self):
"""捕获后台程序的标准输出/错误输出,并转发到日志队列。"""
def push_console_output(message, channel="stdout"):
if not message:
return
normalized = message.strip()
if not normalized:
return
level = "错误" if "error" in normalized.lower() or "失败" in normalized else \
"警告" if "warning" in normalized.lower() or "警告" in normalized else "信息"
self.push_terminal_message(normalized, channel)
if channel == "stderr" or level in ("警告", "错误"):
self.log_queue.put((normalized, "错误" if channel == "stderr" else level))
old_stdout = sys.stdout
old_stderr = sys.stderr
class TeeConsole(io.TextIOBase):
def __init__(self, original_stream, push_fn, channel):
self.original_stream = original_stream
self.push_fn = push_fn
self.channel = channel
self._buffer = ""
def write(self, message):
if not message:
return 0
if self.original_stream:
self.original_stream.write(message)
self.original_stream.flush()
self._buffer += message
while "\n" in self._buffer:
line, self._buffer = self._buffer.split("\n", 1)
line = line.rstrip("\r")
if line.strip():
self.push_fn(line, self.channel)
return len(message)
def flush(self):
if self.original_stream:
self.original_stream.flush()
if self._buffer.strip():
self.push_fn(self._buffer.strip(), self.channel)
self._buffer = ""
sys.stdout = TeeConsole(old_stdout, push_console_output, "stdout")
sys.stderr = TeeConsole(old_stderr, push_console_output, "stderr")
try:
yield
finally:
try:
sys.stdout.flush()
sys.stderr.flush()
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
def start_program(self):
"""启动后台采摘程序线程。"""
try:
if self.running:
messagebox.showwarning("提示", "已有任务正在运行,请先停止当前任务")
return
if not self.save_parameters():
messagebox.showwarning("警告", "参数保存失败,无法启动程序")
return
self.running = True
self.runtime_mode = "full"
self.show_background_only = False
self.set_status_badge("运行中", "running")
self.start_btn.config(state=tk.DISABLED)
self.vision_test_btn.config(state=tk.DISABLED)
self.stop_btn.config(state=tk.NORMAL)
self.push_terminal_message("终端监听已连接,等待后台标准输出...", "system")
def main_thread():
exited_with_error = False
try:
import control
# 将 UI 的画面更新函数注入到控制模块,
# 这样 control.py 中的视觉线程就能直接把最新画面推送回来。
control.set_ui_callback(self.update_camera_frame)
with self.capture_console_output():
control.main()
except Exception as e:
exited_with_error = True
self.log(f"后台线程异常退出: {str(e)}", level="错误")
self.push_terminal_message(f"后台线程异常退出: {str(e)}", "stderr")
self.root.after(0, lambda: self.set_status_badge("异常退出", "error"))
finally:
self.running = False
self.runtime_mode = None
self.root.after(0, lambda: self.stop_btn.config(state=tk.DISABLED))
self.root.after(0, lambda: self.start_btn.config(state=tk.NORMAL))
self.root.after(0, lambda: self.vision_test_btn.config(state=tk.NORMAL))
if not exited_with_error:
self.root.after(0, lambda: self.set_status_badge("已停止", "idle"))
self.thread = threading.Thread(target=main_thread)
self.thread.daemon = True
self.thread.start()
self.log("程序启动成功")
except Exception as e:
self.set_status_badge("启动失败", "error")
self.log(f"程序启动失败: {str(e)}", level="错误")
messagebox.showerror("错误", f"程序启动失败: {str(e)}")
self.start_btn.config(state=tk.NORMAL)
self.vision_test_btn.config(state=tk.NORMAL)
self.stop_btn.config(state=tk.DISABLED)
def start_vision_test(self):
"""启动仅做相机采集和 YOLO 推理的视觉测试。"""
try:
if self.running:
messagebox.showwarning("提示", "已有任务正在运行,请先停止当前任务")
return
if not self.save_parameters():
messagebox.showwarning("警告", "参数保存失败,无法启动视觉测试")
return
self.running = True
self.runtime_mode = "vision_test"
self.vision_test_stop_event.clear()
self.show_background_only = False
self.set_status_badge("视觉测试中", "running")
self.start_btn.config(state=tk.DISABLED)
self.vision_test_btn.config(state=tk.DISABLED)
self.stop_btn.config(state=tk.NORMAL)
self.push_terminal_message("视觉测试启动:仅加载相机和 YOLO不启动机械臂。", "system")
def vision_test_thread():
pipeline = None
exited_with_error = False
try:
import control_core
with self.capture_console_output():
pipeline, align, _ = control_core.init_camera()
self.vision_test_pipeline = pipeline
model = control_core.init_tomato_detector(self.yolo_model_path.get().strip())
self.log("视觉测试已启动,可直接查看相机检测结果")
while not self.vision_test_stop_event.is_set():
try:
frames = pipeline.wait_for_frames(timeout_ms=3000)
if not frames:
continue
aligned_frames = align.process(frames)
color_frame = aligned_frames.get_color_frame()
if not color_frame:
continue
color_image = np.asanyarray(color_frame.get_data())
results = model(color_image, classes=0, conf=control_core.YOLO_DETECT_CONF, verbose=False)
annotated_image = self.build_vision_test_frame(
color_image,
results[0],
control_core.PICK_CONFIDENCE_THRESHOLD,
)
self.update_camera_frame(annotated_image)
except Exception as frame_exc:
if self.vision_test_stop_event.is_set():
break
self.log(f"视觉测试帧处理失败: {frame_exc}", level="警告")
time.sleep(0.2)
self.push_terminal_message("视觉测试已停止。", "system")
except Exception as exc:
exited_with_error = True
self.log(f"视觉测试启动失败: {exc}", level="错误")
self.push_terminal_message(f"视觉测试异常退出: {exc}", "stderr")
self.root.after(0, lambda: self.set_status_badge("视觉测试失败", "error"))
finally:
if pipeline is not None:
try:
pipeline.stop()
except Exception:
pass
self.vision_test_pipeline = None
self.running = False
self.runtime_mode = None
self.vision_test_stop_event.set()
self.root.after(0, lambda: self.stop_btn.config(state=tk.DISABLED))
self.root.after(0, lambda: self.start_btn.config(state=tk.NORMAL))
self.root.after(0, lambda: self.vision_test_btn.config(state=tk.NORMAL))
if not exited_with_error:
self.root.after(0, lambda: self.set_status_badge("已停止", "idle"))
self.thread = threading.Thread(target=vision_test_thread, daemon=True)
self.thread.start()
except Exception as e:
self.running = False
self.runtime_mode = None
self.vision_test_stop_event.set()
self.set_status_badge("启动失败", "error")
self.log(f"视觉测试启动失败: {str(e)}", level="错误")
messagebox.showerror("错误", f"视觉测试启动失败: {str(e)}")
self.start_btn.config(state=tk.NORMAL)
self.vision_test_btn.config(state=tk.NORMAL)
self.stop_btn.config(state=tk.DISABLED)
def stop_program(self):
"""停止后台程序并恢复按钮状态。"""
if not self.running and not (self.thread and self.thread.is_alive()):
self.restore_initial_camera_view()
self.start_btn.config(state=tk.NORMAL)
self.vision_test_btn.config(state=tk.NORMAL)
self.stop_btn.config(state=tk.DISABLED)
self.runtime_mode = None
self.set_status_badge("已停止", "idle")
return
self.running = False
self.set_status_badge("停止中", "warning")
self.log("正在停止程序...")
self.restore_initial_camera_view()
self.start_btn.config(state=tk.NORMAL)
self.vision_test_btn.config(state=tk.NORMAL)
self.stop_btn.config(state=tk.DISABLED)
thread_to_wait = self.thread
self.thread = None
try:
import control
control.set_ui_callback(lambda *_args, **_kwargs: None)
except Exception:
pass
try:
import control_core
control_core.set_ui_callback(lambda *_args, **_kwargs: None)
control_core.reset_runtime_state()
except Exception as e:
self.log(f"发送停止信号失败: {e}", level="警告")
self.vision_test_stop_event.set()
if self.vision_test_pipeline is not None:
try:
self.vision_test_pipeline.stop()
except Exception:
pass
finally:
self.vision_test_pipeline = None
self.runtime_mode = None
if thread_to_wait and thread_to_wait.is_alive():
def wait_for_shutdown():
thread_to_wait.join(timeout=3)
if thread_to_wait.is_alive():
self.root.after(
0,
lambda: self.log("后台线程仍未完全退出,请检查相机或控制器是否阻塞", level="警告"),
)
else:
self.root.after(0, lambda: self.log("程序已停止"))
self.root.after(0, lambda: self.set_status_badge("已停止", "idle"))
threading.Thread(target=wait_for_shutdown, daemon=True).start()
else:
self.set_status_badge("已停止", "idle")
self.log("程序已停止")
def push_terminal_message(self, message, kind="stdout"):
"""向终端输出区域追加一条消息。"""
self.terminal_queue.put((kind, message))
def log(self, message, level="信息", sync_terminal=False):
"""向日志队列追加一条消息,由 UI 主线程异步消费。"""
self.log_queue.put((message, level))
if sync_terminal:
self.push_terminal_message(f"[UI-{level}] {message}", "system")
def append_text_line(self, text_widget, text, line_count_attr, max_lines, tag=None):
"""向文本框追加一行并限制最大行数。"""
current_count = getattr(self, line_count_attr) + 1
setattr(self, line_count_attr, current_count)
if current_count > max_lines:
text_widget.config(state=tk.NORMAL)
text_widget.delete(1.0, 2.0)
text_widget.config(state=tk.DISABLED)
setattr(self, line_count_attr, current_count - 1)
text_widget.config(state=tk.NORMAL)
if tag:
text_widget.insert(tk.END, text, tag)
else:
text_widget.insert(tk.END, text)
text_widget.see(tk.END)
text_widget.config(state=tk.DISABLED)
def format_log_entry(self, timestamp, message, level):
"""格式化运行日志文本和标签。"""
tag = "info"
prefix = "信息"
if level == "警告":
tag = "warning"
prefix = "警告"
elif level == "错误":
tag = "error"
prefix = "错误"
return tag, f"[{timestamp}] {prefix}: {message}\n"
def should_display_log(self, level):
"""根据下拉框判断日志是否显示。"""
selected = self.log_level.get()
return selected == "全部" or selected == level
def on_log_level_changed(self, event=None):
"""切换日志级别时刷新日志视图。"""
self.refresh_log_view()
def refresh_log_view(self):
"""根据当前筛选条件重绘运行日志。"""
self.log_text.config(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
self.log_text.config(state=tk.DISABLED)
self.log_line_count = 0
self.log_text.tag_config("info", foreground=self.colors["text"])
self.log_text.tag_config("warning", foreground="#D97706")
self.log_text.tag_config("error", foreground="#DC2626")
for timestamp, message, level in self.log_records:
if self.should_display_log(level):
tag, display_text = self.format_log_entry(timestamp, message, level)
self.append_text_line(self.log_text, display_text, "log_line_count", MAX_LOG_LINES, tag=tag)
def process_log_queue(self):
"""定时消费日志队列和终端队列并刷新文本框。"""
while not self.log_queue.empty():
message, level = self.log_queue.get()
timestamp = time.strftime('%H:%M:%S')
self.log_records.append((timestamp, message, level))
if len(self.log_records) > MAX_LOG_LINES * 2:
self.log_records.pop(0)
self.log_text.tag_config("info", foreground=self.colors["text"])
self.log_text.tag_config("warning", foreground="#D97706")
self.log_text.tag_config("error", foreground="#DC2626")
if self.should_display_log(level):
tag, display_text = self.format_log_entry(timestamp, message, level)
self.append_text_line(self.log_text, display_text, "log_line_count", MAX_LOG_LINES, tag=tag)
self.log_queue.task_done()
while not self.terminal_queue.empty():
kind, message = self.terminal_queue.get()
prefix_map = {
"stdout": "STDOUT",
"stderr": "STDERR",
"system": "SYSTEM",
}
tag_map = {
"stdout": "terminal_stdout",
"stderr": "terminal_stderr",
"system": "terminal_system",
}
self.terminal_text.tag_config("terminal_stdout", foreground=self.colors["terminal_fg"])
self.terminal_text.tag_config("terminal_stderr", foreground=self.colors["terminal_err"])
self.terminal_text.tag_config("terminal_system", foreground=self.colors["terminal_muted"])
self.append_text_line(
self.terminal_text,
f"[{time.strftime('%H:%M:%S')}] {prefix_map.get(kind, 'OUT')}: {message}\n",
"terminal_line_count",
MAX_TERMINAL_LINES,
tag=tag_map.get(kind, "terminal_stdout")
)
self.terminal_queue.task_done()
self.root.after(200, self.process_log_queue)
def clear_log(self):
"""清空界面日志显示区域。"""
self.log_text.config(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
self.log_text.config(state=tk.DISABLED)
self.log_line_count = 0
self.log_records.clear()
self.log("日志已清空")
def clear_terminal(self):
"""清空实时终端输出区域。"""
self.terminal_text.config(state=tk.NORMAL)
self.terminal_text.delete(1.0, tk.END)
self.terminal_text.config(state=tk.DISABLED)
self.terminal_line_count = 0
def build_vision_test_frame(self, image, result, confidence_threshold):
"""生成视觉测试画面,仅显示检测框、置信度、中心点和 YOLO-pose 关键点。"""
annotated = image.copy()
frame_h, frame_w = annotated.shape[:2]
header_y = max(30, int(frame_h * 0.05))
cv2.putText(
annotated,
"Vision Test",
(12, header_y),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(38, 166, 154),
2,
)
boxes = getattr(result, "boxes", None)
keypoints = getattr(result, "keypoints", None)
keypoint_xy = getattr(keypoints, "xy", None)
keypoint_conf = getattr(keypoints, "conf", None)
tomato_index = 0
iterable_boxes = boxes if boxes is not None else []
for box_index, box in enumerate(iterable_boxes):
try:
confidence = float(box.conf[0]) if hasattr(box.conf, "__len__") else float(box.conf)
if int(box.cls) != 0 or confidence < confidence_threshold:
continue
tomato_index += 1
x1, y1, x2, y2 = [int(value) for value in box.xyxy[0].cpu().numpy()]
x1 = max(0, min(frame_w - 1, x1))
x2 = max(0, min(frame_w - 1, x2))
y1 = max(0, min(frame_h - 1, y1))
y2 = max(0, min(frame_h - 1, y2))
if x2 <= x1 or y2 <= y1:
continue
pixel_x = int((x1 + x2) / 2)
pixel_y = int((y1 + y2) / 2)
label = f"Tomato-{tomato_index} {confidence:.2f}"
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 165, 255), 2)
cv2.circle(annotated, (pixel_x, pixel_y), 5, (0, 0, 255), -1)
text_scale = 0.55
text_thickness = 2
(text_w, text_h), baseline = cv2.getTextSize(
label,
cv2.FONT_HERSHEY_SIMPLEX,
text_scale,
text_thickness,
)
text_x = max(6, min(x1, frame_w - text_w - 10))
text_y = y1 - 10
if text_y - text_h < header_y + 8:
text_y = min(frame_h - 8, y2 + text_h + 10)
box_top = max(0, text_y - text_h - baseline - 4)
box_bottom = min(frame_h, text_y + baseline + 2)
box_right = min(frame_w, text_x + text_w + 8)
cv2.rectangle(annotated, (text_x - 4, box_top), (box_right, box_bottom), (0, 165, 255), -1)
cv2.putText(
annotated,
label,
(text_x, text_y),
cv2.FONT_HERSHEY_SIMPLEX,
text_scale,
(255, 255, 255),
text_thickness,
)
if keypoint_xy is not None:
try:
current_points = keypoint_xy[box_index]
if hasattr(current_points, "cpu"):
current_points = current_points.cpu().numpy()
current_conf = None
if keypoint_conf is not None:
current_conf = keypoint_conf[box_index]
if hasattr(current_conf, "cpu"):
current_conf = current_conf.cpu().numpy()
for point_index, point in enumerate(current_points):
px, py = [int(v) for v in point[:2]]
if px <= 0 and py <= 0:
continue
if current_conf is not None and point_index < len(current_conf):
if float(current_conf[point_index]) < 0.2:
continue
px = max(0, min(frame_w - 1, px))
py = max(0, min(frame_h - 1, py))
cv2.circle(annotated, (px, py), 4, (0, 255, 255), -1)
cv2.circle(annotated, (px, py), 7, (0, 120, 255), 1)
except Exception as kp_exc:
self.log(f"视觉测试关键点绘制失败: {kp_exc}", level="警告")
except Exception as exc:
self.log(f"视觉测试标注失败: {exc}", level="警告")
return annotated
def serialize_detection_boxes_for_ui(self, boxes, image=None, fit_line_fn=None):
"""把检测框整理成 UI 叠加显示需要的结构。"""
detections = []
for index, box in enumerate(boxes, start=1):
try:
x1, y1, x2, y2 = [int(value) for value in box.xyxy[0].cpu().numpy()]
cutpoint = []
end_point = []
if image is not None and callable(fit_line_fn):
crop_image = image[y1:y2, x1:x2]
line_points = fit_line_fn(crop_image, x1, y1)
if line_points:
p1, p2 = [list(map(int, point)) for point in line_points]
cutpoint, end_point = sorted([p1, p2], key=lambda point: point[1])
detections.append(
{
"label": f"Tomato-{index}",
"confidence": float(box.conf),
"bbox": [x1, y1, x2, y2],
"cutpoint": cutpoint,
"end_point": end_point,
}
)
except Exception as exc:
self.log(f"检测框解析失败: {exc}", level="警告")
return detections
@staticmethod
def _overlay_text_rect(origin, text_size, baseline=0, padding=4):
text_x, text_y = origin
text_w, text_h = text_size
return (
text_x - padding,
text_y - text_h - baseline - padding,
text_x + text_w + padding,
text_y + baseline + padding,
)
@staticmethod
def _rects_overlap(rect_a, rect_b, padding=4):
ax1, ay1, ax2, ay2 = rect_a
bx1, by1, bx2, by2 = rect_b
return not (
ax2 + padding < bx1
or bx2 + padding < ax1
or ay2 + padding < by1
or by2 + padding < ay1
)
@staticmethod
def _clamp_text_origin(x, y, text_size, baseline, frame_w, frame_h, margin=6):
text_w, text_h = text_size
x = max(margin, min(int(x), frame_w - text_w - margin))
y = max(text_h + baseline + margin, min(int(y), frame_h - baseline - margin))
return x, y
def _place_overlay_text(self, text, anchors, occupied_rects, frame_shape, scale=0.48, thickness=1):
frame_h, frame_w = frame_shape[:2]
(text_w, text_h), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, scale, thickness)
first_origin = None
first_rect = None
for anchor_x, anchor_y in anchors:
origin = self._clamp_text_origin(anchor_x, anchor_y, (text_w, text_h), baseline, frame_w, frame_h)
rect = self._overlay_text_rect(origin, (text_w, text_h), baseline)
if first_origin is None:
first_origin, first_rect = origin, rect
if not any(self._rects_overlap(rect, used_rect, padding=6) for used_rect in occupied_rects):
return origin, rect
return first_origin, first_rect
@staticmethod
def _draw_outlined_text(image, text, origin, scale, color, thickness=1):
cv2.putText(
image,
text,
origin,
cv2.FONT_HERSHEY_SIMPLEX,
scale,
(0, 0, 0),
thickness + 2,
cv2.LINE_AA,
)
cv2.putText(
image,
text,
origin,
cv2.FONT_HERSHEY_SIMPLEX,
scale,
color,
thickness,
cv2.LINE_AA,
)
def overlay_detection_metadata(self, image, payload):
"""在冻结画面上叠加番茄串框、置信度和关键点信息。"""
if not isinstance(payload, dict):
return image
detections = payload.get("detections") or []
if not detections:
return image
annotated = image.copy()
is_freeze_frame = bool(payload.get("freeze"))
stage = payload.get("stage", "")
header_text = "Pick Freeze" if is_freeze_frame else ("Vision Test" if stage == "vision_test" else "Detection")
header_y = max(30, int(annotated.shape[0] * 0.05))
occupied_rects = []
(header_w, header_h), header_baseline = cv2.getTextSize(
header_text,
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
2,
)
occupied_rects.append(
self._overlay_text_rect((12, header_y), (header_w, header_h), header_baseline, padding=6)
)
cv2.putText(
annotated,
header_text,
(12, header_y),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(38, 166, 154),
2,
)
for index, item in enumerate(detections, start=1):
bbox = item.get("bbox") or []
cutpoint = item.get("cutpoint") or []
end_point = item.get("end_point") or []
if len(bbox) != 4:
continue
x1, y1, x2, y2 = [int(v) for v in bbox]
confidence = float(item.get("confidence", 0.0))
label = item.get("label", f"Tomato-{index}")
box_color = (41, 121, 255) if is_freeze_frame else (64, 181, 246)
cv2.rectangle(annotated, (x1, y1), (x2, y2), box_color, 2)
text = f"{label} {confidence:.2f}"
(text_w, text_h), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 2)
text_x = max(6, min(x1, annotated.shape[1] - text_w - 10))
text_y = y1 - 10
if text_y - text_h < header_y + 8:
text_y = min(annotated.shape[0] - 8, y2 + text_h + 10)
box_top = max(0, text_y - text_h - baseline - 4)
box_bottom = min(annotated.shape[0], text_y + baseline + 2)
box_right = min(annotated.shape[1], text_x + text_w + 8)
cv2.rectangle(annotated, (text_x - 4, box_top), (box_right, box_bottom), box_color, -1)
occupied_rects.append((text_x - 4, box_top, box_right, box_bottom))
cv2.putText(
annotated,
text,
(text_x, text_y),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 255, 255),
2,
)
if len(cutpoint) == 2 and len(end_point) == 2:
p1 = tuple(int(v) for v in cutpoint)
p2 = tuple(int(v) for v in end_point)
cv2.circle(annotated, p1, 5, (0, 255, 255), -1)
cv2.circle(annotated, p2, 5, (255, 215, 0), -1)
cv2.line(annotated, p1, p2, (0, 220, 0), 2)
cv2.putText(
annotated,
"cutpoint",
(p1[0] + 8, max(18, p1[1] - 8)),
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
(0, 255, 255),
1,
)
(cut_w, cut_h), cut_baseline = cv2.getTextSize(
"cutpoint",
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
1,
)
occupied_rects.append(
self._overlay_text_rect(
(p1[0] + 8, max(18, p1[1] - 8)),
(cut_w, cut_h),
cut_baseline,
)
)
cv2.putText(
annotated,
"end_point",
(p2[0] + 8, min(annotated.shape[0] - 10, p2[1] + 16)),
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
(255, 215, 0),
1,
)
(end_w, end_h), end_baseline = cv2.getTextSize(
"end_point",
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
1,
)
occupied_rects.append(
self._overlay_text_rect(
(p2[0] + 8, min(annotated.shape[0] - 10, p2[1] + 16)),
(end_w, end_h),
end_baseline,
)
)
angle_deg = item.get("angle_deg")
if angle_deg is not None:
angle_text = f"Angle: {float(angle_deg):.2f}deg"
(angle_w, angle_h), angle_baseline = cv2.getTextSize(
angle_text,
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
2,
)
angle_x = max(6, min(p2[0] + 8, annotated.shape[1] - angle_w - 6))
angle_y = max(18, min(p2[1] + 18, annotated.shape[0] - angle_baseline - 6))
occupied_rects.append(
self._overlay_text_rect(
(angle_x, angle_y),
(angle_w, angle_h),
angle_baseline,
)
)
pick_xyz = item.get("pick_xyz") or []
if is_freeze_frame and len(pick_xyz) >= 3 and len(cutpoint) == 2:
try:
xyz = [float(value) for value in pick_xyz[:3]]
p1 = tuple(int(v) for v in cutpoint)
xyz_text = f"D405 XYZ: {xyz[0]:.3f}, {xyz[1]:.3f}, {xyz[2]:.3f} m"
text_scale = 0.48
text_thickness = 1
(xyz_w, xyz_h), _xyz_baseline = cv2.getTextSize(
xyz_text,
cv2.FONT_HERSHEY_SIMPLEX,
text_scale,
text_thickness,
)
anchors = [
(x2 + 12, y1 + xyz_h + 8),
(x1 - xyz_w - 12, y1 + xyz_h + 8),
(x2 + 12, y2 - 8),
(x1 - xyz_w - 12, y2 - 8),
(p1[0] + 14, p1[1] - 18),
(p1[0] + 14, p1[1] + xyz_h + 22),
(p1[0] - xyz_w - 14, p1[1] - 18),
(p1[0] - xyz_w - 14, p1[1] + xyz_h + 22),
]
origin, xyz_rect = self._place_overlay_text(
xyz_text,
anchors,
occupied_rects,
annotated.shape,
scale=text_scale,
thickness=text_thickness,
)
if origin and xyz_rect:
leader_end = (
max(xyz_rect[0], min(p1[0], xyz_rect[2])),
max(xyz_rect[1], min(p1[1], xyz_rect[3])),
)
cv2.line(annotated, p1, leader_end, (0, 255, 255), 1, cv2.LINE_AA)
self._draw_outlined_text(
annotated,
xyz_text,
origin,
text_scale,
(0, 255, 255),
text_thickness,
)
occupied_rects.append(xyz_rect)
except Exception as xyz_exc:
self.log(f"pick xyz overlay failed: {xyz_exc}", level="警告")
return annotated
def update_camera_frame(self, image):
"""接收后台线程传来的 OpenCV 图像,并显示到 Tkinter 标签上。"""
def update():
try:
if self.show_background_only:
return
payload = image if isinstance(image, dict) else None
frame = payload.get("frame") if payload else image
if frame is None:
return
if payload:
frame = self.overlay_detection_metadata(frame, payload)
image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(image_rgb)
display_w = self.camera_display_frame.winfo_width()
display_h = self.camera_display_frame.winfo_height()
if display_w > 10 and display_h > 10:
# 这里与背景图一样,统一按当前显示区域缩放,
# 让视觉画面始终与右侧面板尺寸匹配。
pil_img = pil_img.resize((display_w, display_h), Image.LANCZOS)
self.camera_image = ImageTk.PhotoImage(image=pil_img)
self.camera_label.config(image=self.camera_image) # 覆盖1.png
except Exception as e:
self.log(f"相机画面更新失败: {str(e)}", level="错误")
self.root.after(0, update)
def signal_handler(self, sig, frame):
"""响应系统信号,优先走与手动停止一致的退出流程。"""
self.log(f"接收到信号 {sig},正在终止程序...", level="警告")
self.stop_program()
self.root.quit()
if __name__ == "__main__":
root = tk.Tk()
app = TomatoHarvestingUI(root)
root.mainloop()