from pyaubo_sdk import RpcClient, RtdeClient import time import threading ROBOT_IP = "192.168.192.100" RPC_PORT = 30004 RTDE_PORT = 30010 USERNAME = "aubo" PASSWORD = "123456" PRINT_INTERVAL = 2.0 latest_joint_positions = None latest_tcp_pose = None lock = threading.Lock() def main(): global latest_joint_positions, latest_tcp_pose rpc = RpcClient() rtde = RtdeClient() topic_id = None try: # 1. RPC 连接 rpc.connect(ROBOT_IP, RPC_PORT) rpc.login(USERNAME, PASSWORD) robot_names = rpc.getRobotNames() if not robot_names: print("未找到机器人") return print(f"已连接机器人: {robot_names[0]}") # 2. RTDE 连接 + 登录 rtde.connect(ROBOT_IP, RTDE_PORT) rtde.login(USERNAME, PASSWORD) # 3. 设置 RTDE 订阅项 names = ["R1_actual_q", "R1_actual_TCP_pose"] topic_id = rtde.setTopic(False, names, 50, 0) # 4. 回调函数 def callback(parser): global latest_joint_positions, latest_tcp_pose joint_positions = parser.popVectorDouble() tcp_pose = parser.popVectorDouble() with lock: latest_joint_positions = joint_positions latest_tcp_pose = tcp_pose # 5. 订阅 rtde.subscribe(topic_id, callback) print(f"开始监听(每 {PRINT_INTERVAL:.0f}s 输出一次)... Ctrl+C 退出") while True: time.sleep(PRINT_INTERVAL) with lock: joints = latest_joint_positions[:] if latest_joint_positions is not None else None tcp = latest_tcp_pose[:] if latest_tcp_pose is not None else None if joints is not None and tcp is not None: joints_fmt = [round(j, 4) for j in joints] tcp_fmt = [round(t, 4) for t in tcp] print("\n" + "=" * 60) print(f"关节角 (rad): {joints_fmt}") print(f"末端位姿 : {tcp_fmt}") print("=" * 60) else: print("尚未收到完整数据...") except KeyboardInterrupt: print("\n程序退出") except Exception as e: print(f"\n运行出错: {e}") finally: try: if topic_id is not None: rtde.removeTopic(False, topic_id) except Exception: pass try: rtde.disconnect() except Exception: pass try: rpc.logout() except Exception: pass try: rpc.disconnect() except Exception: pass if __name__ == "__main__": main()