Files
wow/coordinate_patrol.py
2026-03-23 09:50:08 +08:00

398 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
坐标巡逻模块:按航点列表循环巡逻,朝向计算与 player_movement 约定一致atan2(dx,-dy));内含卡死检测与脱困。
"""
import math
import random
import time
import logging
import pyautogui
from stuck_handler import StuckHandler
# 转向与死区常量(度)
ANGLE_THRESHOLD_DEG = 15.0 # 朝向容差,超过此值才按 A/D 转向
ANGLE_DEADZONE_DEG = 5.0 # 死区,此范围内不按 A/D、只按 W减少左右微调
# 随机行为跳跃概率每帧触发概率0.005 ≈ 平均 200 帧一次)
RANDOM_JUMP_PROB = 0.05
class CoordinatePatrol:
"""按航点坐标巡逻,用 pyautogui 按键转向与前进。"""
def __init__(
self,
waypoints,
arrival_threshold=0.5,
angle_threshold_deg=ANGLE_THRESHOLD_DEG,
angle_deadzone_deg=ANGLE_DEADZONE_DEG,
random_jump_prob=RANDOM_JUMP_PROB,
mount_key="x",
mount_hold_sec=1.6,
mount_retry_after_sec=2.0,
):
"""
Args:
waypoints: 航点列表,游戏坐标格式 [(x1, y1), (x2, y2), ...],如 [(26.0, 78.0), (30.0, 82.0)]
arrival_threshold: 到达判定距离(游戏坐标单位),默认 0.5
angle_threshold_deg: 朝向容差(度),超过此值才按 A/D 转向,默认 ANGLE_THRESHOLD_DEG
angle_deadzone_deg: 转向死区(度),此范围内不按 A/D、只按 W默认 ANGLE_DEADZONE_DEG
mount_key / mount_hold_sec / mount_retry_after_sec: 未上马时先上马(与 game_state_config / GUI 一致)
"""
self.waypoints = waypoints
self.current_index = 0
self.arrival_threshold = arrival_threshold
self.angle_threshold_deg = angle_threshold_deg
self.angle_deadzone_deg = angle_deadzone_deg
self.random_jump_prob = float(random_jump_prob)
self.logger = logging.getLogger(__name__)
self.last_turn_end_time = 0 # 最近一次结束 A/D 转向的时间,供卡死检测排除原地转向
self.stuck_handler = StuckHandler()
self._next_mount_allowed = 0.0
self.mount_key = str(mount_key).strip() or "x"
self.mount_hold_sec = float(mount_hold_sec)
self.mount_retry_after_sec = float(mount_retry_after_sec)
def _ensure_mounted(self, state):
"""
已上马返回 True。
未上马则松开移动键、按住上马键 MOUNT_HOLD_SEC本帧不走路返回 False。
state 无 mounted 字段时视为无法判断,不拦巡逻(兼容未开 LogicBeacon
"""
if "mounted" not in state:
return True
if state.get("mounted"):
return True
self.stop_all()
now = time.time()
if now < self._next_mount_allowed:
return False
self.logger.info(
f">>> 未上马,先按 {self.mount_key} {self.mount_hold_sec:.1f}s 上马"
)
pyautogui.keyDown(self.mount_key)
time.sleep(self.mount_hold_sec)
pyautogui.keyUp(self.mount_key)
self._next_mount_allowed = time.time() + self.mount_retry_after_sec
return False
@staticmethod
def get_distance(p1, p2):
return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)
def snap_to_forward_waypoint(self, current_pos, max_ahead=10, max_dist=10.0, skip_current=True):
"""
根据当前位置,将巡逻索引 current_index 智能“接回”前方最近的航点。
典型场景:战斗结束后,角色可能偏离路径,希望从前进方向上的最近航点继续巡逻,
而不是跑回战斗前的旧航点。
Args:
current_pos: 当前坐标 (x, y),游戏坐标。
max_ahead: 向前最多查看多少个航点(以 current_index 为起点),默认 10。
max_dist: 认为“可以接回”的最大距离阈值,超过则不调整索引,单位同坐标,默认 10.0。
skip_current: 是否跳过 current_index 本身,直接从下一个点开始找,避免立刻又去原来的点。
Returns:
bool: True 表示已调整 current_indexFalse 表示未调整(距离太远或数据不完整)。
"""
if current_pos is None:
return False
if not self.waypoints:
return False
try:
cx, cy = float(current_pos[0]), float(current_pos[1])
except Exception:
return False
n = len(self.waypoints)
if n == 0:
return False
start_idx = (self.current_index + (1 if skip_current else 0)) % n
best_idx = None
best_dist = None
# 在 current_index 之后的若干个点里寻找最近的一个
for offset in range(max_ahead + 1):
idx = (start_idx + offset) % n
wp = self.waypoints[idx]
d = self.get_distance((cx, cy), (float(wp[0]), float(wp[1])))
if (best_dist is None) or (d < best_dist):
best_dist = d
best_idx = idx
if best_idx is None or best_dist is None:
return False
if best_dist > float(max_dist):
# 离前方所有点都太远,则不强行跳索引,维持原行为
return False
# 将当前巡逻索引接到前方最近的点或其后一个点
self.current_index = best_idx
self.reset_stuck()
self.logger.info(
f">>> 战斗结束智能接回巡逻current_pos={current_pos}, "
f"接入点索引={best_idx}, 距离={best_dist:.2f}"
)
return True
def get_target_heading_deg(self, current_pos, target_pos):
"""计算朝向目标所需的游戏朝向。dx_actual ∝ sin(heading)dy_actual ∝ -cos(heading) → 目标朝向 = atan2(dx, -dy)。"""
dx = target_pos[0] - current_pos[0]
dy = target_pos[1] - current_pos[1]
if dx == 0 and dy == 0:
return 0.0
return math.degrees(math.atan2(dx, -dy)) % 360
def _turn_duration_from_angle(self, angle_diff_deg):
"""
专门为“边走边转”设计的非线性脉冲函数。
逻辑:
- 大于 45°给予一个较长脉冲快速切入方向。
- 15° 到 45°线性修正。
- 小于 15°极小脉冲防止在直线上摇摆。
"""
abs_angle = abs(angle_diff_deg)
# 基础转向系数 (针对边走边转优化,值越小越不容易画龙)
# 推荐范围 0.002 - 0.0035
scale_factor = 0.0025
if abs_angle > 45:
# 大角度:给予一个明显的转向力,但不超过 0.2s 避免丢失 W 的推力感
base_duration = min(abs_angle * scale_factor * 1.2, 0.20)
elif abs_angle > 15:
# 中角度:标准线性修正
base_duration = abs_angle * scale_factor
else:
# 小角度微调:使用极短脉冲 (物理按键触发的底线)
base_duration = 0.04
# 随机化:模拟人工轻点按键的不规则性
# 这里的随机范围要小,否则会导致修正量不稳定
jitter = random.uniform(0.005, 0.015)
final_duration = base_duration + jitter
# 最终安全检查:严禁超过 0.25s,否则会造成明显的顿挫
return min(final_duration, 0.25)
def reset_stuck(self):
"""重置卡死检测状态(切航点、停止巡逻、死亡/后勤/战斗时由外部调用)。"""
self.stuck_handler.reset()
def navigate(self, state):
"""
优化版导航:支持边走边转,非阻塞平滑移动。
"""
if state is None:
self.stop_all()
return
# 1. 基础状态解析与校验
x, y = state.get("x"), state.get("y")
heading = state.get("facing")
if x is None or y is None or heading is None:
self.logger.warning("state 数据不完整,跳过导航帧")
self.stop_all()
return
if not self._ensure_mounted(state):
return
# 2. 卡死检测:位移检测逻辑保持在巡逻中
if self.stuck_handler.check_stuck(state, self.last_turn_end_time):
self.logger.error("!!! 检测到卡死,启动脱困程序 !!!")
self.stop_all()
self.stuck_handler.resolve_stuck()
return
curr_pos = (float(x), float(y))
heading = float(heading)
target_pos = self.waypoints[self.current_index]
dist = self.get_distance(curr_pos, target_pos)
# 3. 到达判定(到点后平滑切换下一个航点,不再完全刹车)
if dist < self.arrival_threshold:
self.logger.info(f">>> 到达航点 {self.current_index},切换至下一目标(平滑过渡)")
self.current_index = (self.current_index + 1) % len(self.waypoints)
# 重置卡死检测,避免在航点附近因为轻微抖动被误判卡死
self.reset_stuck()
# 不再调用 stop_all(),让 W 保持按下,下一帧直接朝下一个航点继续前进
return
# 4. 计算航向逻辑
target_heading = self.get_target_heading_deg(curr_pos, target_pos)
angle_diff = (target_heading - heading + 180) % 360 - 180
abs_diff = abs(angle_diff)
# --- [平滑移动核心:边走边转控制层] ---
# A. 始终保持前进动力
pyautogui.keyDown("w")
# B. 转向决策逻辑
if abs_diff <= self.angle_deadzone_deg:
if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次
self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困")
pyautogui.press("space")
# 在死区内:绝对直线行驶,松开所有转向键
pyautogui.keyUp("a")
pyautogui.keyUp("d")
elif abs_diff > self.angle_threshold_deg:
# 超出阈值:执行平滑脉冲转向
turn_key = "d" if angle_diff > 0 else "a"
other_key = "a" if angle_diff > 0 else "d"
# 确保不会同时按下左右键
pyautogui.keyUp(other_key)
# 获取针对平滑移动优化的短脉冲时长
duration = self._turn_duration_from_angle(abs_diff)
# 关键:执行短促转向脉冲
pyautogui.keyDown(turn_key)
# 这里的 sleep 极短 (建议 < 0.1s),不会造成移动卡顿
time.sleep(duration)
pyautogui.keyUp(turn_key)
self.last_turn_end_time = time.time()
# self.logger.debug(f"修正航向: {turn_key} | 角度差: {angle_diff:.1f}°")
else:
# 在死区与阈值之间:为了平滑,不进行任何按键动作,仅靠 W 前进
pyautogui.keyUp("a")
pyautogui.keyUp("d")
return
def navigate_to_point(self, state, target_pos, arrival_threshold=None):
"""
优化版:平滑导航,支持边走边转。
"""
if state is None:
self.stop_all()
return False
current_pos = (state.get('x'), state.get('y'))
current_facing = state.get('facing')
if None in current_pos or current_facing is None:
self.stop_all()
return False
if not self._ensure_mounted(state):
return False
# 1. 卡死检测
if self.stuck_handler.check_stuck(state, self.last_turn_end_time):
self.stop_all()
self.stuck_handler.resolve_stuck()
return False
current_pos = (float(current_pos[0]), float(current_pos[1]))
current_facing = float(current_facing)
threshold = arrival_threshold if arrival_threshold is not None else self.arrival_threshold
# 2. 距离判断
dist = self.get_distance(current_pos, target_pos)
if dist < threshold:
self.stop_all()
return True
# 3. 计算角度差
target_heading = self.get_target_heading_deg(current_pos, target_pos)
angle_diff = (target_heading - current_facing + 180) % 360 - 180
abs_diff = abs(angle_diff)
# --- 平滑移动核心逻辑 ---
# 只要没到终点,始终保持前进 W 键按下
pyautogui.keyDown("w")
# 4. 根据偏角决定转向动作
if abs_diff <= self.angle_deadzone_deg:
if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次
self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困")
pyautogui.press("space")
# 在死区内,确保转向键松开,直线前进
pyautogui.keyUp("a")
pyautogui.keyUp("d")
else:
# 在死区外,需要修正方向
turn_key = "d" if angle_diff > 0 else "a"
other_key = "a" if angle_diff > 0 else "d"
# 松开反方向键
pyautogui.keyUp(other_key)
# 计算本次修正的时长
# 边走边转时duration 应比原地转向更短,建议使用 0.05s - 0.1s 的短脉冲
duration = self._turn_duration_from_angle(abs_diff)
# 限制单次修正时间,防止转过头导致“画龙”
safe_duration = min(duration, 0.15)
pyautogui.keyDown(turn_key)
time.sleep(safe_duration) # 短暂物理延迟确保按键生效
pyautogui.keyUp(turn_key)
self.last_turn_end_time = time.time()
return False
def navigate_path(self, get_state, path, forward=True, arrival_threshold=None):
"""
按 path 依次走完所有点后返回。每次调用都必须传入 path。
get_state: 可调用对象,每次调用返回当前状态 dict需包含 'x','y','facing'
若含 'mounted'LogicBeacon路径开始前会先上马每段 navigate_to_point 也会校验。
path: [[x,y], ...] 或 [(x,y), ...](如 json.load 得到),本次要走的全部航点。
forward: True=正序0→nFalse=倒序n→0
阻塞直到走完 path 中所有点或出错,走完返回 True。内含卡死检测。
"""
if not path:
self.stop_all()
return True
points = [(float(p[0]), float(p[1])) for p in path]
if not forward:
points = points[::-1]
# 路径开始前:若 state 含 mounted 且未上马,先完成上马再逐点移动
while True:
state = get_state()
if state is None:
self.stop_all()
return False
if self._ensure_mounted(state):
break
time.sleep(0.05)
for i, target in enumerate(points):
self.logger.info(f">>> 路径点 {i + 1}/{len(points)}: {target}")
while True:
state = get_state()
if state is None:
self.stop_all()
return False
arrived = self.navigate_to_point(state, target, arrival_threshold)
if arrived:
break
time.sleep(0.05)
self.stop_all()
self.logger.info(">>> 路径走完")
return True
def press_key(self, key):
"""只按下指定键,抬起其他移动键。"""
for k in ("w", "a", "d"):
if k == key:
pyautogui.keyDown(k)
else:
pyautogui.keyUp(k)
def stop_all(self):
for key in ["w", "a", "s", "d"]:
pyautogui.keyUp(key)