""" 坐标巡逻模块:按航点列表循环巡逻,朝向计算与 player_movement 约定一致(atan2(dx,-dy));内含卡死检测与脱困。 """ import math import random import time import logging import pyautogui from stuck_handler import StuckHandler # 转向与死区常量(度) ANGLE_THRESHOLD_DEG = 15.0 # 朝向容差,超过此值才按 A/D 转向 ANGLE_DEADZONE_DEG = 5.0 # 死区,此范围内不按 A/D、只按 W,减少左右微调 # 随机行为:跳跃概率(每帧触发概率,0.005 ≈ 平均 200 帧一次) RANDOM_JUMP_PROB = 0.05 class CoordinatePatrol: """按航点坐标巡逻,用 pyautogui 按键转向与前进。""" def __init__( self, waypoints, arrival_threshold=0.5, angle_threshold_deg=ANGLE_THRESHOLD_DEG, angle_deadzone_deg=ANGLE_DEADZONE_DEG, random_jump_prob=RANDOM_JUMP_PROB, mount_key="x", mount_hold_sec=1.6, mount_retry_after_sec=2.0, ): """ Args: waypoints: 航点列表,游戏坐标格式 [(x1, y1), (x2, y2), ...],如 [(26.0, 78.0), (30.0, 82.0)] arrival_threshold: 到达判定距离(游戏坐标单位),默认 0.5 angle_threshold_deg: 朝向容差(度),超过此值才按 A/D 转向,默认 ANGLE_THRESHOLD_DEG angle_deadzone_deg: 转向死区(度),此范围内不按 A/D、只按 W,默认 ANGLE_DEADZONE_DEG mount_key / mount_hold_sec / mount_retry_after_sec: 未上马时先上马(与 game_state_config / GUI 一致) """ self.waypoints = waypoints self.current_index = 0 self.arrival_threshold = arrival_threshold self.angle_threshold_deg = angle_threshold_deg self.angle_deadzone_deg = angle_deadzone_deg self.random_jump_prob = float(random_jump_prob) self.logger = logging.getLogger(__name__) self.last_turn_end_time = 0 # 最近一次结束 A/D 转向的时间,供卡死检测排除原地转向 self.stuck_handler = StuckHandler() self._next_mount_allowed = 0.0 self.mount_key = str(mount_key).strip() or "x" self.mount_hold_sec = float(mount_hold_sec) self.mount_retry_after_sec = float(mount_retry_after_sec) def _ensure_mounted(self, state): """ 已上马返回 True。 未上马则松开移动键、按住上马键 MOUNT_HOLD_SEC,本帧不走路;返回 False。 state 无 mounted 字段时视为无法判断,不拦巡逻(兼容未开 LogicBeacon)。 """ if "mounted" not in state: return True if state.get("mounted"): return True self.stop_all() now = time.time() if now < self._next_mount_allowed: return False self.logger.info( f">>> 未上马,先按 {self.mount_key} {self.mount_hold_sec:.1f}s 上马" ) pyautogui.keyDown(self.mount_key) time.sleep(self.mount_hold_sec) pyautogui.keyUp(self.mount_key) self._next_mount_allowed = time.time() + self.mount_retry_after_sec return False @staticmethod def get_distance(p1, p2): return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) def snap_to_forward_waypoint(self, current_pos, max_ahead=10, max_dist=10.0, skip_current=True): """ 根据当前位置,将巡逻索引 current_index 智能“接回”前方最近的航点。 典型场景:战斗结束后,角色可能偏离路径,希望从前进方向上的最近航点继续巡逻, 而不是跑回战斗前的旧航点。 Args: current_pos: 当前坐标 (x, y),游戏坐标。 max_ahead: 向前最多查看多少个航点(以 current_index 为起点),默认 10。 max_dist: 认为“可以接回”的最大距离阈值,超过则不调整索引,单位同坐标,默认 10.0。 skip_current: 是否跳过 current_index 本身,直接从下一个点开始找,避免立刻又去原来的点。 Returns: bool: True 表示已调整 current_index;False 表示未调整(距离太远或数据不完整)。 """ if current_pos is None: return False if not self.waypoints: return False try: cx, cy = float(current_pos[0]), float(current_pos[1]) except Exception: return False n = len(self.waypoints) if n == 0: return False start_idx = (self.current_index + (1 if skip_current else 0)) % n best_idx = None best_dist = None # 在 current_index 之后的若干个点里寻找最近的一个 for offset in range(max_ahead + 1): idx = (start_idx + offset) % n wp = self.waypoints[idx] d = self.get_distance((cx, cy), (float(wp[0]), float(wp[1]))) if (best_dist is None) or (d < best_dist): best_dist = d best_idx = idx if best_idx is None or best_dist is None: return False if best_dist > float(max_dist): # 离前方所有点都太远,则不强行跳索引,维持原行为 return False # 将当前巡逻索引接到前方最近的点或其后一个点 self.current_index = best_idx self.reset_stuck() self.logger.info( f">>> 战斗结束智能接回巡逻:current_pos={current_pos}, " f"接入点索引={best_idx}, 距离={best_dist:.2f}" ) return True def get_target_heading_deg(self, current_pos, target_pos): """计算朝向目标所需的游戏朝向(度)。dx_actual ∝ sin(heading),dy_actual ∝ -cos(heading) → 目标朝向 = atan2(dx, -dy)。""" dx = target_pos[0] - current_pos[0] dy = target_pos[1] - current_pos[1] if dx == 0 and dy == 0: return 0.0 return math.degrees(math.atan2(dx, -dy)) % 360 def _turn_duration_from_angle(self, angle_diff_deg): """ 专门为“边走边转”设计的非线性脉冲函数。 逻辑: - 大于 45°:给予一个较长脉冲,快速切入方向。 - 15° 到 45°:线性修正。 - 小于 15°:极小脉冲,防止在直线上摇摆。 """ abs_angle = abs(angle_diff_deg) # 基础转向系数 (针对边走边转优化,值越小越不容易画龙) # 推荐范围 0.002 - 0.0035 scale_factor = 0.0025 if abs_angle > 45: # 大角度:给予一个明显的转向力,但不超过 0.2s 避免丢失 W 的推力感 base_duration = min(abs_angle * scale_factor * 1.2, 0.20) elif abs_angle > 15: # 中角度:标准线性修正 base_duration = abs_angle * scale_factor else: # 小角度微调:使用极短脉冲 (物理按键触发的底线) base_duration = 0.04 # 随机化:模拟人工轻点按键的不规则性 # 这里的随机范围要小,否则会导致修正量不稳定 jitter = random.uniform(0.005, 0.015) final_duration = base_duration + jitter # 最终安全检查:严禁超过 0.25s,否则会造成明显的顿挫 return min(final_duration, 0.25) def reset_stuck(self): """重置卡死检测状态(切航点、停止巡逻、死亡/后勤/战斗时由外部调用)。""" self.stuck_handler.reset() def navigate(self, state): """ 优化版导航:支持边走边转,非阻塞平滑移动。 """ if state is None: self.stop_all() return # 1. 基础状态解析与校验 x, y = state.get("x"), state.get("y") heading = state.get("facing") if x is None or y is None or heading is None: self.logger.warning("state 数据不完整,跳过导航帧") self.stop_all() return if not self._ensure_mounted(state): return # 2. 卡死检测:位移检测逻辑保持在巡逻中 if self.stuck_handler.check_stuck(state, self.last_turn_end_time): self.logger.error("!!! 检测到卡死,启动脱困程序 !!!") self.stop_all() self.stuck_handler.resolve_stuck() return curr_pos = (float(x), float(y)) heading = float(heading) target_pos = self.waypoints[self.current_index] dist = self.get_distance(curr_pos, target_pos) # 3. 到达判定(到点后平滑切换下一个航点,不再完全刹车) if dist < self.arrival_threshold: self.logger.info(f">>> 到达航点 {self.current_index},切换至下一目标(平滑过渡)") self.current_index = (self.current_index + 1) % len(self.waypoints) # 重置卡死检测,避免在航点附近因为轻微抖动被误判卡死 self.reset_stuck() # 不再调用 stop_all(),让 W 保持按下,下一帧直接朝下一个航点继续前进 return # 4. 计算航向逻辑 target_heading = self.get_target_heading_deg(curr_pos, target_pos) angle_diff = (target_heading - heading + 180) % 360 - 180 abs_diff = abs(angle_diff) # --- [平滑移动核心:边走边转控制层] --- # A. 始终保持前进动力 pyautogui.keyDown("w") # B. 转向决策逻辑 if abs_diff <= self.angle_deadzone_deg: if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次 self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困") pyautogui.press("space") # 在死区内:绝对直线行驶,松开所有转向键 pyautogui.keyUp("a") pyautogui.keyUp("d") elif abs_diff > self.angle_threshold_deg: # 超出阈值:执行平滑脉冲转向 turn_key = "d" if angle_diff > 0 else "a" other_key = "a" if angle_diff > 0 else "d" # 确保不会同时按下左右键 pyautogui.keyUp(other_key) # 获取针对平滑移动优化的短脉冲时长 duration = self._turn_duration_from_angle(abs_diff) # 关键:执行短促转向脉冲 pyautogui.keyDown(turn_key) # 这里的 sleep 极短 (建议 < 0.1s),不会造成移动卡顿 time.sleep(duration) pyautogui.keyUp(turn_key) self.last_turn_end_time = time.time() # self.logger.debug(f"修正航向: {turn_key} | 角度差: {angle_diff:.1f}°") else: # 在死区与阈值之间:为了平滑,不进行任何按键动作,仅靠 W 前进 pyautogui.keyUp("a") pyautogui.keyUp("d") return def navigate_to_point(self, state, target_pos, arrival_threshold=None): """ 优化版:平滑导航,支持边走边转。 """ if state is None: self.stop_all() return False current_pos = (state.get('x'), state.get('y')) current_facing = state.get('facing') if None in current_pos or current_facing is None: self.stop_all() return False if not self._ensure_mounted(state): return False # 1. 卡死检测 if self.stuck_handler.check_stuck(state, self.last_turn_end_time): self.stop_all() self.stuck_handler.resolve_stuck() return False current_pos = (float(current_pos[0]), float(current_pos[1])) current_facing = float(current_facing) threshold = arrival_threshold if arrival_threshold is not None else self.arrival_threshold # 2. 距离判断 dist = self.get_distance(current_pos, target_pos) if dist < threshold: self.stop_all() return True # 3. 计算角度差 target_heading = self.get_target_heading_deg(current_pos, target_pos) angle_diff = (target_heading - current_facing + 180) % 360 - 180 abs_diff = abs(angle_diff) # --- 平滑移动核心逻辑 --- # 只要没到终点,始终保持前进 W 键按下 pyautogui.keyDown("w") # 4. 根据偏角决定转向动作 if abs_diff <= self.angle_deadzone_deg: if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次 self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困") pyautogui.press("space") # 在死区内,确保转向键松开,直线前进 pyautogui.keyUp("a") pyautogui.keyUp("d") else: # 在死区外,需要修正方向 turn_key = "d" if angle_diff > 0 else "a" other_key = "a" if angle_diff > 0 else "d" # 松开反方向键 pyautogui.keyUp(other_key) # 计算本次修正的时长 # 边走边转时,duration 应比原地转向更短,建议使用 0.05s - 0.1s 的短脉冲 duration = self._turn_duration_from_angle(abs_diff) # 限制单次修正时间,防止转过头导致“画龙” safe_duration = min(duration, 0.15) pyautogui.keyDown(turn_key) time.sleep(safe_duration) # 短暂物理延迟确保按键生效 pyautogui.keyUp(turn_key) self.last_turn_end_time = time.time() return False def simple_flight_navigate( self, state, target_x, target_y, *, takeoff_key="space", takeoff_hold_sec=2.0, land_key="p", land_hold_sec=4.0, dismount_key=None, arrival_threshold=0.003, turn_deadzone_deg=5.0, mount_wait_sec=2.5, ): """ 极简飞行导航:只做 1.上马 2.起飞 3.飞向坐标 4.降落。 说明: - 当前项目的飞行状态使用 `state['flight']`(0/0.5/1),并换算到 0..255 再按阈值判断阶段。 - 只做“极简控制”,不包含卡死检测与复杂脱困逻辑。 """ if state is None: return False curr_x, curr_y = state.get("x"), state.get("y") curr_facing = state.get("facing") if None in (curr_x, curr_y, curr_facing): return False # 统一 mount 逻辑:优先使用 mounted 字段 if "mounted" in state and state.get("mounted") is False: # 用与其它逻辑一致的上马方式,按 game_state_config 的 mount_key/mount_hold_sec ok = self._ensure_mounted(state) return False if not ok else False # 归一化飞行信号:把 state['flight'] 映射为 0..255 flight_raw = state.get("flight", None) f_signal = None if isinstance(flight_raw, (int, float)): # 若是三档 0/0.5/1,则换算到 0..255 if float(flight_raw) <= 1.0: f_signal = float(flight_raw) * 255.0 else: f_signal = float(flight_raw) # 没有飞行信息:只能退化为“先尝试上马,再尝试起飞” if f_signal is None: ok = self._ensure_mounted(state) if not ok: return False pyautogui.keyDown(str(takeoff_key)) time.sleep(float(takeoff_hold_sec)) pyautogui.keyUp(str(takeoff_key)) return False # 计算距离 dist = math.sqrt((target_x - float(curr_x)) ** 2 + (target_y - float(curr_y)) ** 2) dismount_key = str(dismount_key).strip() if dismount_key else self.mount_key dismount_key = dismount_key or "x" # --- 1. 上马阶段(兼容你的阈值写法:f_signal < 10) --- if f_signal < 10: # 退化:直接按 mount_key 一次并等待 self.logger.info(">>> 飞行阶段检测:未上马,先按 %s", self.mount_key) pyautogui.press(self.mount_key) time.sleep(float(mount_wait_sec)) return False # --- 2. 起飞阶段(兼容你的阈值写法:100~200) --- if 100 < f_signal < 200: self.logger.info(">>> 起飞:按住 %s %.1f 秒", takeoff_key, float(takeoff_hold_sec)) pyautogui.keyDown(str(takeoff_key)) time.sleep(float(takeoff_hold_sec)) pyautogui.keyUp(str(takeoff_key)) return False # --- 3. 巡航与降落判定(兼容你的阈值写法:>240) --- if f_signal > 240: if dist > float(arrival_threshold): # A. 始终按住 W 前进 pyautogui.keyDown("w") # B. 修正方向(使用与本模块一致的朝向计算) target_heading = self.get_target_heading_deg((float(curr_x), float(curr_y)), (float(target_x), float(target_y))) angle_diff = (target_heading - float(curr_facing) + 180) % 360 - 180 abs_diff = abs(angle_diff) # 在死区内:确保转向键松开,减少左右抖动 if abs_diff <= float(turn_deadzone_deg): pyautogui.keyUp("a") pyautogui.keyUp("d") return False # 防止“上一个修正刚结束,下一帧立刻反向再修正”造成摆头 now = time.time() last_t = getattr(self, "_last_simple_flight_turn_time", 0.0) if now - last_t < 0.08: return False key = "d" if angle_diff > 0 else "a" other_key = "a" if key == "d" else "d" pyautogui.keyUp(other_key) pyautogui.keyDown(key) # 修正脉冲时长随角度误差变化,避免固定 0.05s 过冲/欠冲 duration = self._turn_duration_from_angle(abs_diff) safe_duration = min(float(duration), 0.12) # 误差只略大于死区时,进一步缩短,减少接近目标时的来回摆 if abs_diff < float(turn_deadzone_deg) * 1.8: safe_duration = min(safe_duration, 0.04) time.sleep(safe_duration) pyautogui.keyUp(key) setattr(self, "_last_simple_flight_turn_time", now) return False # C. 到达目标,执行降落 pyautogui.keyUp("w") self.logger.info(">>> 到达目标,执行降落:按住 %s %.1f 秒", land_key, float(land_hold_sec)) pyautogui.keyDown(str(land_key)) time.sleep(float(land_hold_sec)) pyautogui.keyUp(str(land_key)) pyautogui.press(dismount_key) self.logger.info(">>> 降落完成:已按 %s", dismount_key) return True return False def navigate_path(self, get_state, path, forward=True, arrival_threshold=None): """ 按 path 依次走完所有点后返回。每次调用都必须传入 path。 get_state: 可调用对象,每次调用返回当前状态 dict,需包含 'x','y','facing'。 若含 'mounted'(LogicBeacon),路径开始前会先上马,每段 navigate_to_point 也会校验。 path: [[x,y], ...] 或 [(x,y), ...](如 json.load 得到),本次要走的全部航点。 forward: True=正序(0→n),False=倒序(n→0)。 阻塞直到走完 path 中所有点或出错,走完返回 True。内含卡死检测。 """ if not path: self.stop_all() return True points = [(float(p[0]), float(p[1])) for p in path] if not forward: points = points[::-1] # 路径开始前:若 state 含 mounted 且未上马,先完成上马再逐点移动 while True: state = get_state() if state is None: self.stop_all() return False if self._ensure_mounted(state): break time.sleep(0.05) for i, target in enumerate(points): self.logger.info(f">>> 路径点 {i + 1}/{len(points)}: {target}") while True: state = get_state() if state is None: self.stop_all() return False arrived = self.navigate_to_point(state, target, arrival_threshold) if arrived: break time.sleep(0.05) self.stop_all() self.logger.info(">>> 路径走完") return True def press_key(self, key): """只按下指定键,抬起其他移动键。""" for k in ("w", "a", "d"): if k == key: pyautogui.keyDown(k) else: pyautogui.keyUp(k) def stop_all(self): for key in ["w", "a", "s", "d"]: pyautogui.keyUp(key)