first commit

This commit is contained in:
王鹏
2026-03-18 09:04:37 +08:00
commit b7719b377d
121 changed files with 116104 additions and 0 deletions

349
coordinate_patrol.py Normal file
View File

@@ -0,0 +1,349 @@
"""
坐标巡逻模块:按航点列表循环巡逻,朝向计算与 player_movement 约定一致atan2(dx,-dy));内含卡死检测与脱困。
"""
import math
import random
import time
import logging
import pyautogui
from stuck_handler import StuckHandler
# 转向与死区常量(度)
ANGLE_THRESHOLD_DEG = 15.0 # 朝向容差,超过此值才按 A/D 转向
ANGLE_DEADZONE_DEG = 5.0 # 死区,此范围内不按 A/D、只按 W减少左右微调
# 随机行为跳跃概率每帧触发概率0.005 ≈ 平均 200 帧一次)
RANDOM_JUMP_PROB = 0.05
class CoordinatePatrol:
"""按航点坐标巡逻,用 pyautogui 按键转向与前进。"""
def __init__(
self,
waypoints,
arrival_threshold=0.5,
angle_threshold_deg=ANGLE_THRESHOLD_DEG,
angle_deadzone_deg=ANGLE_DEADZONE_DEG,
random_jump_prob=RANDOM_JUMP_PROB,
):
"""
Args:
waypoints: 航点列表,游戏坐标格式 [(x1, y1), (x2, y2), ...],如 [(26.0, 78.0), (30.0, 82.0)]
arrival_threshold: 到达判定距离(游戏坐标单位),默认 0.5
angle_threshold_deg: 朝向容差(度),超过此值才按 A/D 转向,默认 ANGLE_THRESHOLD_DEG
angle_deadzone_deg: 转向死区(度),此范围内不按 A/D、只按 W默认 ANGLE_DEADZONE_DEG
"""
self.waypoints = waypoints
self.current_index = 0
self.arrival_threshold = arrival_threshold
self.angle_threshold_deg = angle_threshold_deg
self.angle_deadzone_deg = angle_deadzone_deg
self.random_jump_prob = float(random_jump_prob)
self.logger = logging.getLogger(__name__)
self.last_turn_end_time = 0 # 最近一次结束 A/D 转向的时间,供卡死检测排除原地转向
self.stuck_handler = StuckHandler()
@staticmethod
def get_distance(p1, p2):
return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)
def snap_to_forward_waypoint(self, current_pos, max_ahead=10, max_dist=10.0, skip_current=True):
"""
根据当前位置,将巡逻索引 current_index 智能“接回”前方最近的航点。
典型场景:战斗结束后,角色可能偏离路径,希望从前进方向上的最近航点继续巡逻,
而不是跑回战斗前的旧航点。
Args:
current_pos: 当前坐标 (x, y),游戏坐标。
max_ahead: 向前最多查看多少个航点(以 current_index 为起点),默认 10。
max_dist: 认为“可以接回”的最大距离阈值,超过则不调整索引,单位同坐标,默认 10.0。
skip_current: 是否跳过 current_index 本身,直接从下一个点开始找,避免立刻又去原来的点。
Returns:
bool: True 表示已调整 current_indexFalse 表示未调整(距离太远或数据不完整)。
"""
if current_pos is None:
return False
if not self.waypoints:
return False
try:
cx, cy = float(current_pos[0]), float(current_pos[1])
except Exception:
return False
n = len(self.waypoints)
if n == 0:
return False
start_idx = (self.current_index + (1 if skip_current else 0)) % n
best_idx = None
best_dist = None
# 在 current_index 之后的若干个点里寻找最近的一个
for offset in range(max_ahead + 1):
idx = (start_idx + offset) % n
wp = self.waypoints[idx]
d = self.get_distance((cx, cy), (float(wp[0]), float(wp[1])))
if (best_dist is None) or (d < best_dist):
best_dist = d
best_idx = idx
if best_idx is None or best_dist is None:
return False
if best_dist > float(max_dist):
# 离前方所有点都太远,则不强行跳索引,维持原行为
return False
# 将当前巡逻索引接到前方最近的点或其后一个点
self.current_index = best_idx
self.reset_stuck()
self.logger.info(
f">>> 战斗结束智能接回巡逻current_pos={current_pos}, "
f"接入点索引={best_idx}, 距离={best_dist:.2f}"
)
return True
def get_target_heading_deg(self, current_pos, target_pos):
"""计算朝向目标所需的游戏朝向。dx_actual ∝ sin(heading)dy_actual ∝ -cos(heading) → 目标朝向 = atan2(dx, -dy)。"""
dx = target_pos[0] - current_pos[0]
dy = target_pos[1] - current_pos[1]
if dx == 0 and dy == 0:
return 0.0
return math.degrees(math.atan2(dx, -dy)) % 360
def _turn_duration_from_angle(self, angle_diff_deg):
"""
专门为“边走边转”设计的非线性脉冲函数。
逻辑:
- 大于 45°给予一个较长脉冲快速切入方向。
- 15° 到 45°线性修正。
- 小于 15°极小脉冲防止在直线上摇摆。
"""
abs_angle = abs(angle_diff_deg)
# 基础转向系数 (针对边走边转优化,值越小越不容易画龙)
# 推荐范围 0.002 - 0.0035
scale_factor = 0.0025
if abs_angle > 45:
# 大角度:给予一个明显的转向力,但不超过 0.2s 避免丢失 W 的推力感
base_duration = min(abs_angle * scale_factor * 1.2, 0.20)
elif abs_angle > 15:
# 中角度:标准线性修正
base_duration = abs_angle * scale_factor
else:
# 小角度微调:使用极短脉冲 (物理按键触发的底线)
base_duration = 0.04
# 随机化:模拟人工轻点按键的不规则性
# 这里的随机范围要小,否则会导致修正量不稳定
jitter = random.uniform(0.005, 0.015)
final_duration = base_duration + jitter
# 最终安全检查:严禁超过 0.25s,否则会造成明显的顿挫
return min(final_duration, 0.25)
def reset_stuck(self):
"""重置卡死检测状态(切航点、停止巡逻、死亡/后勤/战斗时由外部调用)。"""
self.stuck_handler.reset()
def navigate(self, state):
"""
优化版导航:支持边走边转,非阻塞平滑移动。
"""
if state is None:
self.stop_all()
return
# 1. 基础状态解析与校验
x, y = state.get("x"), state.get("y")
heading = state.get("facing")
if x is None or y is None or heading is None:
self.logger.warning("state 数据不完整,跳过导航帧")
self.stop_all()
return
# 2. 卡死检测:位移检测逻辑保持在巡逻中
if self.stuck_handler.check_stuck(state, self.last_turn_end_time):
self.logger.error("!!! 检测到卡死,启动脱困程序 !!!")
self.stop_all()
self.stuck_handler.resolve_stuck()
return
curr_pos = (float(x), float(y))
heading = float(heading)
target_pos = self.waypoints[self.current_index]
dist = self.get_distance(curr_pos, target_pos)
# 3. 到达判定(到点后平滑切换下一个航点,不再完全刹车)
if dist < self.arrival_threshold:
self.logger.info(f">>> 到达航点 {self.current_index},切换至下一目标(平滑过渡)")
self.current_index = (self.current_index + 1) % len(self.waypoints)
# 重置卡死检测,避免在航点附近因为轻微抖动被误判卡死
self.reset_stuck()
# 不再调用 stop_all(),让 W 保持按下,下一帧直接朝下一个航点继续前进
return
# 4. 计算航向逻辑
target_heading = self.get_target_heading_deg(curr_pos, target_pos)
angle_diff = (target_heading - heading + 180) % 360 - 180
abs_diff = abs(angle_diff)
# --- [平滑移动核心:边走边转控制层] ---
# A. 始终保持前进动力
pyautogui.keyDown("w")
# B. 转向决策逻辑
if abs_diff <= self.angle_deadzone_deg:
if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次
self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困")
pyautogui.press("space")
# 在死区内:绝对直线行驶,松开所有转向键
pyautogui.keyUp("a")
pyautogui.keyUp("d")
elif abs_diff > self.angle_threshold_deg:
# 超出阈值:执行平滑脉冲转向
turn_key = "d" if angle_diff > 0 else "a"
other_key = "a" if angle_diff > 0 else "d"
# 确保不会同时按下左右键
pyautogui.keyUp(other_key)
# 获取针对平滑移动优化的短脉冲时长
duration = self._turn_duration_from_angle(abs_diff)
# 关键:执行短促转向脉冲
pyautogui.keyDown(turn_key)
# 这里的 sleep 极短 (建议 < 0.1s),不会造成移动卡顿
time.sleep(duration)
pyautogui.keyUp(turn_key)
self.last_turn_end_time = time.time()
# self.logger.debug(f"修正航向: {turn_key} | 角度差: {angle_diff:.1f}°")
else:
# 在死区与阈值之间:为了平滑,不进行任何按键动作,仅靠 W 前进
pyautogui.keyUp("a")
pyautogui.keyUp("d")
return
def navigate_to_point(self, state, target_pos, arrival_threshold=None):
"""
优化版:平滑导航,支持边走边转。
"""
if state is None:
self.stop_all()
return False
current_pos = (state.get('x'), state.get('y'))
current_facing = state.get('facing')
if None in current_pos or current_facing is None:
self.stop_all()
return False
# 1. 卡死检测
if self.stuck_handler.check_stuck(state, self.last_turn_end_time):
self.stop_all()
self.stuck_handler.resolve_stuck()
return False
current_pos = (float(current_pos[0]), float(current_pos[1]))
current_facing = float(current_facing)
threshold = arrival_threshold if arrival_threshold is not None else self.arrival_threshold
# 2. 距离判断
dist = self.get_distance(current_pos, target_pos)
if dist < threshold:
self.stop_all()
return True
# 3. 计算角度差
target_heading = self.get_target_heading_deg(current_pos, target_pos)
angle_diff = (target_heading - current_facing + 180) % 360 - 180
abs_diff = abs(angle_diff)
# --- 平滑移动核心逻辑 ---
# 只要没到终点,始终保持前进 W 键按下
pyautogui.keyDown("w")
# 4. 根据偏角决定转向动作
if abs_diff <= self.angle_deadzone_deg:
if random.random() < self.random_jump_prob: # 频率建议设低,约每 200 帧跳一次
self.logger.info(">>> 随机跳跃:模拟真人并辅助脱困")
pyautogui.press("space")
# 在死区内,确保转向键松开,直线前进
pyautogui.keyUp("a")
pyautogui.keyUp("d")
else:
# 在死区外,需要修正方向
turn_key = "d" if angle_diff > 0 else "a"
other_key = "a" if angle_diff > 0 else "d"
# 松开反方向键
pyautogui.keyUp(other_key)
# 计算本次修正的时长
# 边走边转时duration 应比原地转向更短,建议使用 0.05s - 0.1s 的短脉冲
duration = self._turn_duration_from_angle(abs_diff)
# 限制单次修正时间,防止转过头导致“画龙”
safe_duration = min(duration, 0.15)
pyautogui.keyDown(turn_key)
time.sleep(safe_duration) # 短暂物理延迟确保按键生效
pyautogui.keyUp(turn_key)
self.last_turn_end_time = time.time()
return False
def navigate_path(self, get_state, path, forward=True, arrival_threshold=None):
"""
按 path 依次走完所有点后返回。每次调用都必须传入 path。
get_state: 可调用对象,每次调用返回当前状态 dict需包含 'x','y','facing'
path: [[x,y], ...] 或 [(x,y), ...](如 json.load 得到),本次要走的全部航点。
forward: True=正序0→nFalse=倒序n→0
阻塞直到走完 path 中所有点或出错,走完返回 True。内含卡死检测。
"""
if not path:
self.stop_all()
return True
points = [(float(p[0]), float(p[1])) for p in path]
if not forward:
points = points[::-1]
for i, target in enumerate(points):
self.logger.info(f">>> 路径点 {i + 1}/{len(points)}: {target}")
while True:
state = get_state()
if state is None:
self.stop_all()
return False
arrived = self.navigate_to_point(state, target, arrival_threshold)
if arrived:
break
time.sleep(0.05)
self.stop_all()
self.logger.info(">>> 路径走完")
return True
def press_key(self, key):
"""只按下指定键,抬起其他移动键。"""
for k in ("w", "a", "d"):
if k == key:
pyautogui.keyDown(k)
else:
pyautogui.keyUp(k)
def stop_all(self):
for key in ["w", "a", "s", "d"]:
pyautogui.keyUp(key)