""" 战斗引擎模块,负责核心战斗逻辑实现 """ import time import logging import pyautogui from config import GameConfig from text_finder import TextFinder from combat_detector import CombatDetector from player_movement import PlayerMovement from game_state import GameStateReader class CombatEngine: # 巡逻路径:依次移动到这些坐标,循环往复 PATROL_WAYPOINTS = [ (23.8, 71.0), (27.0, 79.0), (31.0, 72.0) ] # 每走完 PATROL_STEP 坐标单位后检测一次目标;值越小检测越频繁但移动越碎 PATROL_STEP = 3.0 def __init__(self, config: GameConfig): self.config = config self.screen_width, self.screen_height = pyautogui.size() self._setup_safety_measures() self.text_finder = TextFinder() self.combat_detector = CombatDetector(config) self.player_movement = PlayerMovement() self.game_state_reader = GameStateReader() self.logger = logging.getLogger(__name__) def _setup_safety_measures(self) -> None: """设置安全防护措施""" pyautogui.FAILSAFE = True pyautogui.PAUSE = self.config.action_delay def _find_target_on_screen(self) -> bool: """在屏幕上寻找怪物目标""" try: # 先按3键 pyautogui.press('3') # 等待一段时间,确保画面更新 time.sleep(0.5) pyautogui.press('4') time.sleep(3) if self.combat_detector.is_in_combat(): return True return False except Exception as e: raise RuntimeError(f"目标识别失败: {str(e)}") def _execute_attack(self) -> None: """执行攻击动作""" try: # 等待1秒 time.sleep(0.5) if(self.combat_detector.is_in_combat()): while(self.combat_detector.is_in_combat()): pyautogui.press('2') # time.sleep(1) pyautogui.press('4') else: # time.sleep(1) # 怪物死亡,按4键拾取物品 pyautogui.press('4') time.sleep(0.5) pyautogui.press('4') except Exception as e: raise RuntimeError(f"攻击执行失败: {str(e)}") def run_combat_loop(self) -> None: """战斗主循环。 按顺序巡逻 PATROL_WAYPOINTS 中的坐标: - 每移动一小段后检测是否存在目标; - 找到目标立即停止移动,执行攻击,打完后继续巡逻; - 未找到目标则继续向当前巡逻点前进,到达后切换下一个。 """ waypoint_index = 0 while True: try: # 获取并打印游戏状态(血量、法力、战斗、目标) state = self.game_state_reader.get_game_state() self.logger.info( f"游戏状态 | 血量:{state['hp_percent']}% 法力:{state['mp_percent']}% " f"战斗中:{state['in_combat']} 有目标:{state['has_target']}" ) # target_x, target_y = self.PATROL_WAYPOINTS[waypoint_index] # self.logger.info(f"前往巡逻点 ({target_x}, {target_y})") # # 分段移动:每次最多走 MOVE_STEP_DURATION 秒,间隔检测目标 # target_found = self._move_until_target_or_arrived(target_x, target_y) # if target_found: # self.logger.info("发现目标,停止移动,开始攻击") # self._execute_attack() # self.logger.info("攻击结束,继续巡逻") # else: # self.logger.info(f"已到达巡逻点 ({target_x}, {target_y}),切换下一个") # waypoint_index = (waypoint_index + 1) % len(self.PATROL_WAYPOINTS) except KeyboardInterrupt: raise except Exception as e: if self.config.auto_recovery: time.sleep(self.config.error_recovery_delay) continue raise def _move_until_target_or_arrived(self, target_x: float, target_y: float) -> bool: """向目标坐标移动,期间每走一小段就检测一次是否存在目标。 到达判定使用轴对齐方式(与 player_movement.move_to 保持一致), 避免欧氏距离与轴对齐判定不一致导致的死循环。 Returns: bool: 找到目标返回 True;到达目标坐标未找到目标返回 False """ import math from player_position import PlayerPosition tol = self.config.move_tolerance pos_reader = PlayerPosition() while True: pos = pos_reader.get_position_with_retry() if pos is None: self.logger.warning("无法读取坐标,中止移动") return False dx = target_x - pos[0] dy = target_y - pos[1] # 轴对齐到达判定(与 player_movement.move_to 内部逻辑一致) if abs(dx) <= tol and abs(dy) <= tol: self.logger.info(f"已到达巡逻点 ({target_x}, {target_y})") return False distance = math.sqrt(dx ** 2 + dy ** 2) # 检测目标(每移动一小段检测一次) if self._find_target_on_screen(): return True # 向目标方向走最多 PATROL_STEP 坐标单位,走完后再检测一次目标 step = min(distance, self.PATROL_STEP) ratio = step / distance mid_x = pos[0] + dx * ratio mid_y = pos[1] + dy * ratio self.logger.info(f"移动小段 → ({mid_x:.2f}, {mid_y:.2f}),剩余距离 {distance:.2f}") self.player_movement.move_to_position(mid_x, mid_y, tolerance=tol)