十阶段干预完整流程 从 SpaceMouse 硬件读取到 Policy 接管全链路如下SpaceMouse 硬件 → 干预检测 → 动作替换 → 机器人执行 → transition 构建 → 双 buffer 插入 → ZMQ 传输 → Learner 50/50 采样 → 模型更新 → ZMQ 广播参数 → Actor 参数替换 → Policy 接管阶段 1SpaceMouse 硬件读取后台进程持续运行步骤类.方法输入→输出1.1SpaceMouseExpert.__init__()调用pyspacemouse.open()打开设备启动_read_spacemouse守护进程1.2_read_spacemouse()循环调用read_all()→ 写入共享字典latest_data[action](6,)和latest_data[buttons]1.3get_action()从共享内存读取 → 返回(expert_a(6,), buttons[2])阶段 2干预检测与动作替换步骤类.方法输入→输出2.1SpacemouseIntervention.__init__()创建 SpaceMouseExpert初始化按钮状态2.2SpacemouseIntervention.step(action)入口调用self.action(action)→ 调用self.env.step(new_action)如果被替换则info[intervene_action] new_action2.3SpacemouseIntervention.action(action)干预判定核心: ① 调用expert.get_action()②norm(expert_a) 0.001→ 位移/旋转干预 ③ 左键 → gripper_actionU(-1,-0.9) 关夹爪 ④ 右键 → gripper_actionU(0.9,1) 开夹爪 ⑤ 拼接为 (7,) ⑥ 返回(expert_a, True)或(action, False)阶段 3动作发送到机器人步骤类.方法输入→输出3.1FrankaEnv.step(action)输入已是干预替换后的 (7,) 动作 → clip → 解析 xyz/rotation/gripper3.2_send_gripper_command(pos)二值模式:pos -0.5→ POST/close_gripper,pos ≥ 0.5→ POST/open_gripper带冷却时间3.3_send_pos_command(pos)POST/pose发送目标位姿含安全盒裁剪 clip_safety_box3.4_update_currpos()POST/getstate获取最新状态3.5_get_obs()获取图像 状态观测 → 返回(ob, reward, done, False, {succeed: reward})阶段 4Actor 循环——干预标识与 transition 构建步骤函数/方法关键逻辑4.1策略采样agent.sample_actions(obs, seedkey, argmaxFalse)→actions(7,)4.2环境交互env.step(actions)→ 经过阶段 2–3 → 返回(next_obs, reward, done, truncated, info)4.3提取干预动作if intervene_action in info: actions info.pop(intervene_action)4.4干预统计intervention_steps 1若非连续干预则intervention_count 14.5构建 transitiontransition {observations, actions, next_observations, rewards, masks, dones} 可选grasp_penalty4.6双 buffer 插入data_store.insert(transition)所有 intvn_data_store.insert(transition)仅干预阶段 5Actor 端数据暂存步骤类.方法关键逻辑5.1QueuedDataStore.__init__(capacity50000)Actor 端创建两个实例data_store和intvn_data_store5.2insert(data)线程安全Lockseq_id追加到_data_queue5.3get_latest_data(from_id)返回from_id之后的所有增量数据List[dict]阶段 6网络传输Actor → Learner步骤类.方法关键逻辑6.1TrainerClient.__init__注册{actor_env: data_store, actor_env_intvn: intvn_data_store}6.2TrainerClient.update()Episode 结束触发遍历每个 store6.3get_server_last_update_id(name)ZMQ REQ-REP 获取 Learner 端最后收到的 ID6.4update_datastore(name, from_id)get_latest_data(from_id)获取增量 → 调用_update_ds()6.5—构造 ZMQ 消息{type: datastore, store_name, payload: {data, last_id}}→ REQ-REP 发送阶段 7Learner 端接收与 Buffer 写入步骤类.方法关键逻辑7.1TrainerServer 收到消息解析store_nameactor_env 或 actor_env_intvn和batch_data7.2–7.5batch_insert(batch_data)加锁 → 遍历 →MemoryEfficientReplayBuffer.insert(data_dict)→ 处理帧堆叠分离 pixel keys/非 pixel keys首帧复制 num_stack 次 → 递归写入预分配 numpy 数组Learner 端两个 Bufferreplay_buffer容量 200K对应 actor_env存所有在线经验和demo_buffer容量 200K对应 actor_env_intvn存仅干预经验。阶段 8Learner 采样与模型更新步骤函数/方法关键逻辑8.1–8.4创建采样迭代器replay_iterator(batch_size//2)demo_iterator(batch_size//2)→concat_batches(batch, demo_batch, axis0)8.5CTA 比率训练cta_ratio - 1次仅 critic 更新 1 次全网络更新8.6–8.9agent.update(batch, networks_to_update)loss_fns()→apply_loss_fns()→ 梯度更新 →target_update()actions[..., :-1]标准 SAC critic loss夹爪动作映射{0,1,2}grasp_rewards rewards grasp_penaltyDQN 式目标阶段 9参数发布与 Actor 更新步骤类.方法关键逻辑9.1–9.2发布触发server.publish_network(agent.state.params)每steps_per_update步 → BroadcastServer.broadcast(payload) → ZMQ 广播9.3–9.4TrainerClient.recv_network_callback(update_params)BroadcastClient 监听 →update_params(params)→agent.replace(stateagent.state.replace(paramsparams))阶段 10Policy 接管步骤类.方法关键逻辑10.1sample_actions(obs, seed, argmax)forward_policy(obs)→ 6D 连续动作 forward_grasp_critic(obs)→ 3 Q 值 → argmax → 映射 →concat([ee_actions, grasp_action])→ (7,)10.2—下一步循环使用更新后的参数采样若人类不干预则 Policy 完全接管