import json import math import time from hardware_control import hw_ctrl # 修理商所在位置(游戏坐标),按实际位置修改 VENDOR_POS = (30.08, 71.51) # 到达判定距离(与 coordinate_patrol 一致) VENDOR_ARRIVAL_THRESHOLD = 0.1 # 修理商所在位置配置文件 VENDOR_FILE = 'vendor.json' class LogisticsManager: def __init__(self, route_file=None): self.need_repair = False self.bag_full = False self.is_returning = False self.route_file = route_file or VENDOR_FILE self.bag_full_hearthstone = False # 包满时用炉石回城而非走路修理 self.hearthstone_key = "b" # 炉石按键 self.hearthstone_cast_sec = 10.0 # 炉石施法等待秒数 def check_logistics(self, state): """ state['free_slots']: 剩余空格数量 state['durability']: 0.0 ~ 1.0 的耐久度 """ # 触发阈值:空格少于 2 个,或耐久度低于 20% if state['free_slots'] < 2 or state['durability'] < 0.2: if not self.is_returning: print(f">>> [后勤警告] 背包/耐久不足!触发回城程序。") self.is_returning = True else: self.is_returning = False def use_hearthstone_and_stop(self, get_state=None): """按炉石按键并等待施法完成,带有坐标校验的重试机制。""" max_retries = 3 success = False for i in range(max_retries): start_pos = None if get_state: st = get_state() if st: start_pos = (st.get('x'), st.get('y')) print(f">>> [后勤] 第 {i+1} 次尝试使用炉石(按键: {self.hearthstone_key})...") # 先按一下 S 确保停止移动,防止移动中按炉石失败 hw_ctrl.press('s') time.sleep(0.5) hw_ctrl.press(self.hearthstone_key) # 等待施法过程 print(f">>> [后勤] 正在等待施法 {self.hearthstone_cast_sec}s...") time.sleep(self.hearthstone_cast_sec + 2.0) # 多等 2 秒保险 if get_state and start_pos and start_pos[0] is not None: st_now = get_state() if st_now: end_pos = (st_now.get('x'), st_now.get('y')) dist = math.dist(start_pos, end_pos) # 如果坐标发生了明显跳变(大于 2.0),证明回城成功 if dist > 2.0: print(f">>> [后勤] 炉石回城成功!位置跳变距离: {dist:.2f}") success = True break else: print(f">>> [后勤] 炉石似乎失败(位置未变化),准备重试...") else: # 获取不到状态可能已经卡死或窗口关闭,默认成功以退出循环 success = True break else: # 如果没法校验坐标,就只执行一次 success = True break if not success: print(">>> [后勤] 警告:多次尝试炉石回城均未检测到位置跳变!") self.is_returning = False return success def return_home(self): """执行回城动作""" # 1. 停止当前巡逻 # 2. 寻找安全点或直接使用炉石 print(">>> 正在释放炉石...") hw_ctrl.press('7') # 假设炉石在 7 号键 time.sleep(15) # 等待炉石施法 def handle_town_visit(self, state, patrol): """回城流程:用 state 取当前坐标与朝向,调用 patrol.navigate_to_point 前往修理商;到达后按 F3 交互""" is_arrived = patrol.navigate_to_point( state, VENDOR_POS, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD, ) if is_arrived: print(">>> 到达修理商,执行交互宏") self._do_vendor_interact() self.is_returning = False def _do_vendor_interact(self): """执行与修理商/背包的交互按键(8、4)。""" hw_ctrl.press("8") time.sleep(0.5) hw_ctrl.press("4") time.sleep(2) def run_route1_round(self, get_state, patrol, route_file=None): """ 读取 route1.json 路径,先正向走完,执行交互(8、4),再反向走完,然后结束。 get_state: 可调用对象,返回当前状态 dict(含 x, y, facing)。 patrol: CoordinatePatrol 实例,用于 navigate_path。 route_file: 路径 JSON 文件路径,默认使用 __init__ 中的 route_file。 """ route_file = route_file or self.route_file with open(route_file, "r", encoding="utf-8") as f: path = json.load(f) if not path: print(">>> [后勤] route1 为空,跳过") return print(">>> [后勤] 开始 route1 正向") ok = patrol.navigate_path(get_state, path, forward=True, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD) if not ok: print(">>> [后勤] 正向未完成,中止") return print(">>> [后勤] 正向到达,执行交互") self._do_vendor_interact() print(">>> [后勤] 开始 route1 反向") ok = patrol.navigate_path(get_state, path, forward=False, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD) if not ok: print(">>> [后勤] 反向未完成") return print(">>> [后勤] route1 往返结束")