import json import math import os import time from hardware_control import hw_ctrl # 修理商所在位置(游戏坐标),按实际位置修改 VENDOR_POS = (30.08, 71.51) # 到达判定距离(与 coordinate_patrol 一致) VENDOR_ARRIVAL_THRESHOLD = 0.1 # 修理商所在位置配置文件 VENDOR_FILE = 'vendor.json' class LogisticsManager: def __init__(self, route_file=None): self.need_repair = False self.bag_full = False self.is_returning = False self.route_file = route_file or VENDOR_FILE self.bag_full_hearthstone = False # 包满时用炉石回城而非走路修理 self.hearthstone_key = "b" # 炉石按键 self.hearthstone_cast_sec = 10.0 # 炉石施法等待秒数 self.enable_bag_full_mail = False self.mailbox_route_file = os.path.join("recorder", "mailbox.json") self.mailbox_interact_key = "8" self.mail_recipient_key = "" self.mail_send_key = "f8" self.mailbox_open_wait_sec = 2.0 self.mail_send_wait_sec = 60.0 def _resolve_path(self, path): if not path: return "" path = str(path) if os.path.isabs(path): return path cwd_path = os.path.abspath(path) if os.path.exists(cwd_path): return cwd_path return os.path.join(os.path.dirname(os.path.abspath(__file__)), path) def _sleep_with_stop(self, seconds, stop_check=None): end_at = time.time() + max(0.0, float(seconds)) while time.time() < end_at: if callable(stop_check) and stop_check(): return False time.sleep(min(0.1, end_at - time.time())) return True def check_logistics(self, state): """ state['free_slots']: 剩余空格数量 state['durability']: 0.0 ~ 1.0 的耐久度 """ # 触发阈值:空格少于 2 个,或耐久度低于 20% if state['free_slots'] < 2 or state['durability'] < 0.2: if not self.is_returning: print(f">>> [后勤警告] 背包/耐久不足!触发回城程序。") self.is_returning = True else: self.is_returning = False def use_hearthstone_and_stop(self, get_state=None): """按炉石按键并等待施法完成,带有坐标校验的重试机制。""" max_retries = 3 success = False for i in range(max_retries): start_pos = None if get_state: st = get_state() if st: start_pos = (st.get('x'), st.get('y')) print(f">>> [后勤] 第 {i+1} 次尝试使用炉石(按键: {self.hearthstone_key})...") # 先按一下 S 确保停止移动,防止移动中按炉石失败 hw_ctrl.press('s') time.sleep(0.5) hw_ctrl.press(self.hearthstone_key) # 等待施法过程 print(f">>> [后勤] 正在等待施法 {self.hearthstone_cast_sec}s...") time.sleep(self.hearthstone_cast_sec + 2.0) # 多等 2 秒保险 if get_state and start_pos and start_pos[0] is not None: st_now = get_state() if st_now: end_pos = (st_now.get('x'), st_now.get('y')) dist = math.dist(start_pos, end_pos) # 如果坐标发生了明显跳变(大于 2.0),证明回城成功 if dist > 2.0: print(f">>> [后勤] 炉石回城成功!位置跳变距离: {dist:.2f}") success = True break else: print(f">>> [后勤] 炉石似乎失败(位置未变化),准备重试...") else: # 获取不到状态可能已经卡死或窗口关闭,默认成功以退出循环 success = True break else: # 如果没法校验坐标,就只执行一次 success = True break if not success: print(">>> [后勤] 警告:多次尝试炉石回城均未检测到位置跳变!") self.is_returning = False return success def return_home(self): """执行回城动作""" # 1. 停止当前巡逻 # 2. 寻找安全点或直接使用炉石 print(">>> 正在释放炉石...") hw_ctrl.press('7') # 假设炉石在 7 号键 time.sleep(15) # 等待炉石施法 def handle_town_visit(self, state, patrol): """回城流程:用 state 取当前坐标与朝向,调用 patrol.navigate_to_point 前往修理商;到达后按 F3 交互""" is_arrived = patrol.navigate_to_point( state, VENDOR_POS, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD, ) if is_arrived: print(">>> 到达修理商,执行交互宏") self._do_vendor_interact() self.is_returning = False def _do_vendor_interact(self): """执行与修理商/背包的交互按键(8、4)。""" hw_ctrl.press("8") time.sleep(0.5) hw_ctrl.press("4") time.sleep(2) def run_bag_full_mail_flow(self, get_state, patrol, stop_check=None): """ 包满邮寄第一版流程: 炉石 -> 跑到邮箱路线终点 -> 交互打开邮箱 -> 按 MailboxCourier 宏键 -> 等待后停止。 Python 不判断邮件是否发完,发送细节交给游戏内插件。 """ route_file = self._resolve_path(self.mailbox_route_file) if not route_file or not os.path.exists(route_file): print(f">>> [后勤-邮箱] 邮箱路线不存在,已停止: {route_file}") self.is_returning = False return False if callable(stop_check) and stop_check(): self.is_returning = False return False ok = self.use_hearthstone_and_stop(get_state=get_state) if not ok: print(">>> [后勤-邮箱] 炉石失败,未继续跑邮箱。") self.is_returning = False return False try: with open(route_file, "r", encoding="utf-8") as f: path = json.load(f) except Exception as exc: print(f">>> [后勤-邮箱] 邮箱路线读取失败: {exc}") self.is_returning = False return False if not path: print(f">>> [后勤-邮箱] 邮箱路线为空,已停止: {route_file}") self.is_returning = False return False print(f">>> [后勤-邮箱] 开始跑邮箱路线: {route_file}") old_enable_mount = getattr(patrol, "enable_mount", None) if old_enable_mount is not None: patrol.enable_mount = False try: ok = patrol.navigate_path(get_state, path, forward=True, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD) finally: if old_enable_mount is not None: patrol.enable_mount = old_enable_mount if not ok: print(">>> [后勤-邮箱] 邮箱路线未完成,已停止。") self.is_returning = False return False if callable(stop_check) and stop_check(): self.is_returning = False return False patrol.stop_all() print(f">>> [后勤-邮箱] 到达邮箱附近,按交互键: {self.mailbox_interact_key}") hw_ctrl.press(self.mailbox_interact_key) if not self._sleep_with_stop(self.mailbox_open_wait_sec, stop_check=stop_check): self.is_returning = False return False if self.mail_recipient_key: print(f">>> [后勤-邮箱] 触发 MailboxCourier 收件人宏: {self.mail_recipient_key}") hw_ctrl.press(self.mail_recipient_key) if not self._sleep_with_stop(0.5, stop_check=stop_check): self.is_returning = False return False print(f">>> [后勤-邮箱] 触发 MailboxCourier 发送宏: {self.mail_send_key}") hw_ctrl.press(self.mail_send_key) if not self._sleep_with_stop(self.mail_send_wait_sec, stop_check=stop_check): self.is_returning = False return False print(">>> [后勤-邮箱] 邮寄宏已触发,流程结束。") self.is_returning = False return True def run_route1_round(self, get_state, patrol, route_file=None): """ 读取 route1.json 路径,先正向走完,执行交互(8、4),再反向走完,然后结束。 get_state: 可调用对象,返回当前状态 dict(含 x, y, facing)。 patrol: CoordinatePatrol 实例,用于 navigate_path。 route_file: 路径 JSON 文件路径,默认使用 __init__ 中的 route_file。 """ route_file = route_file or self.route_file with open(route_file, "r", encoding="utf-8") as f: path = json.load(f) if not path: print(">>> [后勤] route1 为空,跳过") return print(">>> [后勤] 开始 route1 正向") ok = patrol.navigate_path(get_state, path, forward=True, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD) if not ok: print(">>> [后勤] 正向未完成,中止") return print(">>> [后勤] 正向到达,执行交互") self._do_vendor_interact() print(">>> [后勤] 开始 route1 反向") ok = patrol.navigate_path(get_state, path, forward=False, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD) if not ok: print(">>> [后勤] 反向未完成") return print(">>> [后勤] route1 往返结束")