Files
wow/coordinate_patrol.py
2026-03-23 16:05:27 +08:00

528 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
坐标巡逻模块:按航点列表循环巡逻,朝向计算与 player_movement 约定一致atan2(dx,-dy));内含卡死检测与脱困。
"""
import math
import random
import time
import logging
import pyautogui
from stuck_handler import StuckHandler
# 转向与死区常量(度)
ANGLE_THRESHOLD_DEG = 15.0 # 朝向容差,超过此值才按 A/D 转向
ANGLE_DEADZONE_DEG = 5.0 # 死区,此范围内不按 A/D、只按 W减少左右微调
# 随机行为跳跃概率每帧触发概率0.005 ≈ 平均 200 帧一次)
RANDOM_JUMP_PROB = 0.05
class CoordinatePatrol:
"""按航点坐标巡逻,用 pyautogui 按键转向与前进。"""
def __init__(
self,
waypoints,
arrival_threshold=0.5,
angle_threshold_deg=ANGLE_THRESHOLD_DEG,
angle_deadzone_deg=ANGLE_DEADZONE_DEG,
random_jump_prob=RANDOM_JUMP_PROB,
mount_key="x",
mount_hold_sec=1.6,
mount_retry_after_sec=2.0,
):
"""
Args:
waypoints: 航点列表,游戏坐标格式 [(x1, y1), (x2, y2), ...],如 [(26.0, 78.0), (30.0, 82.0)]
arrival_threshold: 到达判定距离(游戏坐标单位),默认 0.5
angle_threshold_deg: 朝向容差(度),超过此值才按 A/D 转向,默认 ANGLE_THRESHOLD_DEG
angle_deadzone_deg: 转向死区(度),此范围内不按 A/D、只按 W默认 ANGLE_DEADZONE_DEG
mount_key / mount_hold_sec / mount_retry_after_sec: 未上马时先上马(与 game_state_config / GUI 一致)
"""
self.waypoints = waypoints
self.current_index = 0
self.arrival_threshold = arrival_threshold
self.angle_threshold_deg = angle_threshold_deg
self.angle_deadzone_deg = angle_deadzone_deg
self.random_jump_prob = float(random_jump_prob)
self.logger = logging.getLogger(__name__)
self.last_turn_end_time = 0 # 最近一次结束 A/D 转向的时间,供卡死检测排除原地转向
self.stuck_handler = StuckHandler()
self._next_mount_allowed = 0.0
self.mount_key = str(mount_key).strip() or "x"
self.mount_hold_sec = float(mount_hold_sec)
self.mount_retry_after_sec = float(mount_retry_after_sec)
def _ensure_mounted(self, state):
"""
已上马返回 True。
未上马则松开移动键、按住上马键 MOUNT_HOLD_SEC本帧不走路返回 False。
state 无 mounted 字段时视为无法判断,不拦巡逻(兼容未开 LogicBeacon
"""
if "mounted" not in state:
return True
if state.get("mounted"):
return True
self.stop_all()
now = time.time()
if now < self._next_mount_allowed:
return False
self.logger.info(
f">>> 未上马,先按 {self.mount_key} {self.mount_hold_sec:.1f}s 上马"
)
pyautogui.keyDown(self.mount_key)
time.sleep(self.mount_hold_sec)
pyautogui.keyUp(self.mount_key)
self._next_mount_allowed = time.time() + self.mount_retry_after_sec
return False
@staticmethod
def get_distance(p1, p2):
return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)
def snap_to_forward_waypoint(self, current_pos, max_ahead=10, max_dist=10.0, skip_current=True):
"""
根据当前位置,将巡逻索引 current_index 智能“接回”前方最近的航点。
典型场景:战斗结束后,角色可能偏离路径,希望从前进方向上的最近航点继续巡逻,
而不是跑回战斗前的旧航点。
Args:
current_pos: 当前坐标 (x, y),游戏坐标。
max_ahead: 向前最多查看多少个航点(以 current_index 为起点),默认 10。
max_dist: 认为“可以接回”的最大距离阈值,超过则不调整索引,单位同坐标,默认 10.0。
skip_current: 是否跳过 current_index 本身,直接从下一个点开始找,避免立刻又去原来的点。
Returns:
bool: True 表示已调整 current_indexFalse 表示未调整(距离太远或数据不完整)。
"""
if current_pos is None:
return False
if not self.waypoints:
return False
try:
cx, cy = float(current_pos[0]), float(current_pos[1])
except Exception:
return False
n = len(self.waypoints)
if n == 0:
return False
start_idx = (self.current_index + (1 if skip_current else 0)) % n
best_idx = None
best_dist = None
# 在 current_index 之后的若干个点里寻找最近的一个
for offset in range(max_ahead + 1):
idx = (start_idx + offset) % n
wp = self.waypoints[idx]
d = self.get_distance((cx, cy), (float(wp[0]), float(wp[1])))
if (best_dist is None) or (d < best_dist):
best_dist = d
best_idx = idx
if best_idx is None or best_dist is None:
return False
if best_dist > float(max_dist):
# 离前方所有点都太远,则不强行跳索引,维持原行为
return False
# 将当前巡逻索引接到前方最近的点或其后一个点
self.current_index = best_idx
self.reset_stuck()
self.logger.info(
f">>> 战斗结束智能接回巡逻current_pos={current_pos}, "
f"接入点索引={best_idx}, 距离={best_dist:.2f}"
)
return True
def get_target_heading_deg(self, current_pos, target_pos):
"""计算朝向目标所需的游戏朝向。dx_actual ∝ sin(heading)dy_actual ∝ -cos(heading) → 目标朝向 = atan2(dx, -dy)。"""
dx = target_pos[0] - current_pos[0]
dy = target_pos[1] - current_pos[1]
if dx == 0 and dy == 0:
return 0.0
return math.degrees(math.atan2(dx, -dy)) % 360
def _turn_duration_from_angle(self, angle_diff_deg):
"""
专门为“边走边转”设计的非线性脉冲函数。
逻辑:
- 大于 45°给予一个较长脉冲快速切入方向。
- 15° 到 45°线性修正。
- 小于 15°极小脉冲防止在直线上摇摆。
"""
abs_angle = abs(angle_diff_deg)
# 基础转向系数 (针对边走边转优化,值越小越不容易画龙)
# 推荐范围 0.002 - 0.0035
scale_factor = 0.0025
if abs_angle > 45:
# 大角度:给予一个明显的转向力,但不超过 0.2s 避免丢失 W 的推力感
base_duration = min(abs_angle * scale_factor * 1.2, 0.20)
elif abs_angle > 15:
# 中角度:标准线性修正
base_duration = abs_angle * scale_factor
else:
# 小角度微调:使用极短脉冲 (物理按键触发的底线)
base_duration = 0.04
# 随机化:模拟人工轻点按键的不规则性
# 这里的随机范围要小,否则会导致修正量不稳定
jitter = random.uniform(0.005, 0.015)
final_duration = base_duration + jitter
# 最终安全检查:严禁超过 0.25s,否则会造成明显的顿挫
return min(final_duration, 0.25)
def reset_stuck(self):
"""重置卡死检测状态(切航点、停止巡逻、死亡/后勤/战斗时由外部调用)。"""
self.stuck_handler.reset()
def navigate(self, state):
"""
优化版导航:支持边走边转,非阻塞平滑移动。
"""
if state is None:
self.stop_all()
return
# 1. 基础状态解析与校验
x, y = state.get("x"), state.get("y")
heading = state.get("facing")
if x is None or y is None or heading is None:
self.logger.warning("state 数据不完整,跳过导航帧")
self.stop_all()
return
if not self._ensure_mounted(state):
return
# 2. 卡死检测:位移检测逻辑保持在巡逻中
if self.stuck_handler.check_stuck(state, self.last_turn_end_time):
self.logger.error("!!! 检测到卡死,启动脱困程序 !!!")
self.stop_all()
self.stuck_handler.resolve_stuck()
return
curr_pos = (float(x), float(y))
heading = float(heading)
target_pos = self.waypoints[self.current_index]
dist = self.get_distance(curr_pos, target_pos)
# 3. 到达判定(到点后平滑切换下一个航点,不再完全刹车)
if dist < self.arrival_threshold:
self.logger.info(f">>> 到达航点 {self.current_index},切换至下一目标(平滑过渡)")
self.current_index = (self.current_index + 1) % len(self.waypoints)
# 重置卡死检测,避免在航点附近因为轻微抖动被误判卡死
self.reset_stuck()
# 不再调用 stop_all(),让 W 保持按下,下一帧直接朝下一个航点继续前进
return
# 4. 计算航向逻辑
target_heading = self.get_target_heading_deg(curr_pos, target_pos)
angle_diff = (target_heading - heading + 180) % 360 - 180
abs_diff = abs(angle_diff)
# --- [平滑移动核心:边走边转控制层] ---
# A. 始终保持前进动力
pyautogui.keyDown("w")
# B. 转向决策逻辑
if abs_diff <= self.angle_deadzone_deg:
if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次
self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困")
pyautogui.press("space")
# 在死区内:绝对直线行驶,松开所有转向键
pyautogui.keyUp("a")
pyautogui.keyUp("d")
elif abs_diff > self.angle_threshold_deg:
# 超出阈值:执行平滑脉冲转向
turn_key = "d" if angle_diff > 0 else "a"
other_key = "a" if angle_diff > 0 else "d"
# 确保不会同时按下左右键
pyautogui.keyUp(other_key)
# 获取针对平滑移动优化的短脉冲时长
duration = self._turn_duration_from_angle(abs_diff)
# 关键:执行短促转向脉冲
pyautogui.keyDown(turn_key)
# 这里的 sleep 极短 (建议 < 0.1s),不会造成移动卡顿
time.sleep(duration)
pyautogui.keyUp(turn_key)
self.last_turn_end_time = time.time()
# self.logger.debug(f"修正航向: {turn_key} | 角度差: {angle_diff:.1f}°")
else:
# 在死区与阈值之间:为了平滑,不进行任何按键动作,仅靠 W 前进
pyautogui.keyUp("a")
pyautogui.keyUp("d")
return
def navigate_to_point(self, state, target_pos, arrival_threshold=None):
"""
优化版:平滑导航,支持边走边转。
"""
if state is None:
self.stop_all()
return False
current_pos = (state.get('x'), state.get('y'))
current_facing = state.get('facing')
if None in current_pos or current_facing is None:
self.stop_all()
return False
if not self._ensure_mounted(state):
return False
# 1. 卡死检测
if self.stuck_handler.check_stuck(state, self.last_turn_end_time):
self.stop_all()
self.stuck_handler.resolve_stuck()
return False
current_pos = (float(current_pos[0]), float(current_pos[1]))
current_facing = float(current_facing)
threshold = arrival_threshold if arrival_threshold is not None else self.arrival_threshold
# 2. 距离判断
dist = self.get_distance(current_pos, target_pos)
if dist < threshold:
self.stop_all()
return True
# 3. 计算角度差
target_heading = self.get_target_heading_deg(current_pos, target_pos)
angle_diff = (target_heading - current_facing + 180) % 360 - 180
abs_diff = abs(angle_diff)
# --- 平滑移动核心逻辑 ---
# 只要没到终点,始终保持前进 W 键按下
pyautogui.keyDown("w")
# 4. 根据偏角决定转向动作
if abs_diff <= self.angle_deadzone_deg:
if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次
self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困")
pyautogui.press("space")
# 在死区内,确保转向键松开,直线前进
pyautogui.keyUp("a")
pyautogui.keyUp("d")
else:
# 在死区外,需要修正方向
turn_key = "d" if angle_diff > 0 else "a"
other_key = "a" if angle_diff > 0 else "d"
# 松开反方向键
pyautogui.keyUp(other_key)
# 计算本次修正的时长
# 边走边转时duration 应比原地转向更短,建议使用 0.05s - 0.1s 的短脉冲
duration = self._turn_duration_from_angle(abs_diff)
# 限制单次修正时间,防止转过头导致“画龙”
safe_duration = min(duration, 0.15)
pyautogui.keyDown(turn_key)
time.sleep(safe_duration) # 短暂物理延迟确保按键生效
pyautogui.keyUp(turn_key)
self.last_turn_end_time = time.time()
return False
def simple_flight_navigate(
self,
state,
target_x,
target_y,
*,
takeoff_key="space",
takeoff_hold_sec=2.0,
land_key="p",
land_hold_sec=4.0,
dismount_key=None,
arrival_threshold=0.003,
turn_deadzone_deg=5.0,
mount_wait_sec=2.5,
):
"""
极简飞行导航:只做 1.上马 2.起飞 3.飞向坐标 4.降落。
说明:
- 当前项目的飞行状态使用 `state['flight']`0/0.5/1并换算到 0..255 再按阈值判断阶段。
- 只做“极简控制”,不包含卡死检测与复杂脱困逻辑。
"""
if state is None:
return False
curr_x, curr_y = state.get("x"), state.get("y")
curr_facing = state.get("facing")
if None in (curr_x, curr_y, curr_facing):
return False
# 统一 mount 逻辑:优先使用 mounted 字段
if "mounted" in state and state.get("mounted") is False:
# 用与其它逻辑一致的上马方式,按 game_state_config 的 mount_key/mount_hold_sec
ok = self._ensure_mounted(state)
return False if not ok else False
# 归一化飞行信号:把 state['flight'] 映射为 0..255
flight_raw = state.get("flight", None)
f_signal = None
if isinstance(flight_raw, (int, float)):
# 若是三档 0/0.5/1则换算到 0..255
if float(flight_raw) <= 1.0:
f_signal = float(flight_raw) * 255.0
else:
f_signal = float(flight_raw)
# 没有飞行信息:只能退化为“先尝试上马,再尝试起飞”
if f_signal is None:
ok = self._ensure_mounted(state)
if not ok:
return False
pyautogui.keyDown(str(takeoff_key))
time.sleep(float(takeoff_hold_sec))
pyautogui.keyUp(str(takeoff_key))
return False
# 计算距离
dist = math.sqrt((target_x - float(curr_x)) ** 2 + (target_y - float(curr_y)) ** 2)
dismount_key = str(dismount_key).strip() if dismount_key else self.mount_key
dismount_key = dismount_key or "x"
# --- 1. 上马阶段兼容你的阈值写法f_signal < 10 ---
if f_signal < 10:
# 退化:直接按 mount_key 一次并等待
self.logger.info(">>> 飞行阶段检测:未上马,先按 %s", self.mount_key)
pyautogui.press(self.mount_key)
time.sleep(float(mount_wait_sec))
return False
# --- 2. 起飞阶段兼容你的阈值写法100~200 ---
if 100 < f_signal < 200:
self.logger.info(">>> 起飞:按住 %s %.1f", takeoff_key, float(takeoff_hold_sec))
pyautogui.keyDown(str(takeoff_key))
time.sleep(float(takeoff_hold_sec))
pyautogui.keyUp(str(takeoff_key))
return False
# --- 3. 巡航与降落判定(兼容你的阈值写法:>240 ---
if f_signal > 240:
if dist > float(arrival_threshold):
# A. 始终按住 W 前进
pyautogui.keyDown("w")
# B. 修正方向(使用与本模块一致的朝向计算)
target_heading = self.get_target_heading_deg((float(curr_x), float(curr_y)), (float(target_x), float(target_y)))
angle_diff = (target_heading - float(curr_facing) + 180) % 360 - 180
abs_diff = abs(angle_diff)
# 在死区内:确保转向键松开,减少左右抖动
if abs_diff <= float(turn_deadzone_deg):
pyautogui.keyUp("a")
pyautogui.keyUp("d")
return False
# 防止“上一个修正刚结束,下一帧立刻反向再修正”造成摆头
now = time.time()
last_t = getattr(self, "_last_simple_flight_turn_time", 0.0)
if now - last_t < 0.08:
return False
key = "d" if angle_diff > 0 else "a"
other_key = "a" if key == "d" else "d"
pyautogui.keyUp(other_key)
pyautogui.keyDown(key)
# 修正脉冲时长随角度误差变化,避免固定 0.05s 过冲/欠冲
duration = self._turn_duration_from_angle(abs_diff)
safe_duration = min(float(duration), 0.12)
# 误差只略大于死区时,进一步缩短,减少接近目标时的来回摆
if abs_diff < float(turn_deadzone_deg) * 1.8:
safe_duration = min(safe_duration, 0.04)
time.sleep(safe_duration)
pyautogui.keyUp(key)
setattr(self, "_last_simple_flight_turn_time", now)
return False
# C. 到达目标,执行降落
pyautogui.keyUp("w")
self.logger.info(">>> 到达目标,执行降落:按住 %s %.1f", land_key, float(land_hold_sec))
pyautogui.keyDown(str(land_key))
time.sleep(float(land_hold_sec))
pyautogui.keyUp(str(land_key))
pyautogui.press(dismount_key)
self.logger.info(">>> 降落完成:已按 %s", dismount_key)
return True
return False
def navigate_path(self, get_state, path, forward=True, arrival_threshold=None):
"""
按 path 依次走完所有点后返回。每次调用都必须传入 path。
get_state: 可调用对象,每次调用返回当前状态 dict需包含 'x','y','facing'
若含 'mounted'LogicBeacon路径开始前会先上马每段 navigate_to_point 也会校验。
path: [[x,y], ...] 或 [(x,y), ...](如 json.load 得到),本次要走的全部航点。
forward: True=正序0→nFalse=倒序n→0
阻塞直到走完 path 中所有点或出错,走完返回 True。内含卡死检测。
"""
if not path:
self.stop_all()
return True
points = [(float(p[0]), float(p[1])) for p in path]
if not forward:
points = points[::-1]
# 路径开始前:若 state 含 mounted 且未上马,先完成上马再逐点移动
while True:
state = get_state()
if state is None:
self.stop_all()
return False
if self._ensure_mounted(state):
break
time.sleep(0.05)
for i, target in enumerate(points):
self.logger.info(f">>> 路径点 {i + 1}/{len(points)}: {target}")
while True:
state = get_state()
if state is None:
self.stop_all()
return False
arrived = self.navigate_to_point(state, target, arrival_threshold)
if arrived:
break
time.sleep(0.05)
self.stop_all()
self.logger.info(">>> 路径走完")
return True
def press_key(self, key):
"""只按下指定键,抬起其他移动键。"""
for k in ("w", "a", "d"):
if k == key:
pyautogui.keyDown(k)
else:
pyautogui.keyUp(k)
def stop_all(self):
for key in ["w", "a", "s", "d"]:
pyautogui.keyUp(key)