161 lines
5.9 KiB
Python
161 lines
5.9 KiB
Python
|
|
"""
|
|||
|
|
战斗引擎模块,负责核心战斗逻辑实现
|
|||
|
|
"""
|
|||
|
|
import time
|
|||
|
|
import logging
|
|||
|
|
import pyautogui
|
|||
|
|
from config import GameConfig
|
|||
|
|
from text_finder import TextFinder
|
|||
|
|
from combat_detector import CombatDetector
|
|||
|
|
from player_movement import PlayerMovement
|
|||
|
|
from game_state import GameStateReader
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CombatEngine:
|
|||
|
|
|
|||
|
|
# 巡逻路径:依次移动到这些坐标,循环往复
|
|||
|
|
PATROL_WAYPOINTS = [
|
|||
|
|
(23.8, 71.0),
|
|||
|
|
(27.0, 79.0),
|
|||
|
|
(31.0, 72.0)
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 每走完 PATROL_STEP 坐标单位后检测一次目标;值越小检测越频繁但移动越碎
|
|||
|
|
PATROL_STEP = 3.0
|
|||
|
|
def __init__(self, config: GameConfig):
|
|||
|
|
self.config = config
|
|||
|
|
self.screen_width, self.screen_height = pyautogui.size()
|
|||
|
|
self._setup_safety_measures()
|
|||
|
|
self.text_finder = TextFinder()
|
|||
|
|
self.combat_detector = CombatDetector(config)
|
|||
|
|
self.player_movement = PlayerMovement()
|
|||
|
|
self.game_state_reader = GameStateReader()
|
|||
|
|
self.logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
def _setup_safety_measures(self) -> None:
|
|||
|
|
"""设置安全防护措施"""
|
|||
|
|
pyautogui.FAILSAFE = True
|
|||
|
|
pyautogui.PAUSE = self.config.action_delay
|
|||
|
|
|
|||
|
|
def _find_target_on_screen(self) -> bool:
|
|||
|
|
"""在屏幕上寻找怪物目标"""
|
|||
|
|
try:
|
|||
|
|
# 先按3键
|
|||
|
|
pyautogui.press('3')
|
|||
|
|
# 等待一段时间,确保画面更新
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
pyautogui.press('4')
|
|||
|
|
time.sleep(3)
|
|||
|
|
if self.combat_detector.is_in_combat():
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
return False
|
|||
|
|
except Exception as e:
|
|||
|
|
raise RuntimeError(f"目标识别失败: {str(e)}")
|
|||
|
|
|
|||
|
|
def _execute_attack(self) -> None:
|
|||
|
|
"""执行攻击动作"""
|
|||
|
|
try:
|
|||
|
|
# 等待1秒
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
|
|||
|
|
if(self.combat_detector.is_in_combat()):
|
|||
|
|
while(self.combat_detector.is_in_combat()):
|
|||
|
|
pyautogui.press('2')
|
|||
|
|
# time.sleep(1)
|
|||
|
|
pyautogui.press('4')
|
|||
|
|
else:
|
|||
|
|
# time.sleep(1)
|
|||
|
|
# 怪物死亡,按4键拾取物品
|
|||
|
|
pyautogui.press('4')
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
pyautogui.press('4')
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
raise RuntimeError(f"攻击执行失败: {str(e)}")
|
|||
|
|
|
|||
|
|
def run_combat_loop(self) -> None:
|
|||
|
|
"""战斗主循环。
|
|||
|
|
|
|||
|
|
按顺序巡逻 PATROL_WAYPOINTS 中的坐标:
|
|||
|
|
- 每移动一小段后检测是否存在目标;
|
|||
|
|
- 找到目标立即停止移动,执行攻击,打完后继续巡逻;
|
|||
|
|
- 未找到目标则继续向当前巡逻点前进,到达后切换下一个。
|
|||
|
|
"""
|
|||
|
|
waypoint_index = 0
|
|||
|
|
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
# 获取并打印游戏状态(血量、法力、战斗、目标)
|
|||
|
|
state = self.game_state_reader.get_game_state()
|
|||
|
|
self.logger.info(
|
|||
|
|
f"游戏状态 | 血量:{state['hp_percent']}% 法力:{state['mp_percent']}% "
|
|||
|
|
f"战斗中:{state['in_combat']} 有目标:{state['has_target']}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# target_x, target_y = self.PATROL_WAYPOINTS[waypoint_index]
|
|||
|
|
# self.logger.info(f"前往巡逻点 ({target_x}, {target_y})")
|
|||
|
|
|
|||
|
|
# # 分段移动:每次最多走 MOVE_STEP_DURATION 秒,间隔检测目标
|
|||
|
|
# target_found = self._move_until_target_or_arrived(target_x, target_y)
|
|||
|
|
|
|||
|
|
# if target_found:
|
|||
|
|
# self.logger.info("发现目标,停止移动,开始攻击")
|
|||
|
|
# self._execute_attack()
|
|||
|
|
# self.logger.info("攻击结束,继续巡逻")
|
|||
|
|
# else:
|
|||
|
|
# self.logger.info(f"已到达巡逻点 ({target_x}, {target_y}),切换下一个")
|
|||
|
|
# waypoint_index = (waypoint_index + 1) % len(self.PATROL_WAYPOINTS)
|
|||
|
|
|
|||
|
|
except KeyboardInterrupt:
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
if self.config.auto_recovery:
|
|||
|
|
time.sleep(self.config.error_recovery_delay)
|
|||
|
|
continue
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
def _move_until_target_or_arrived(self, target_x: float, target_y: float) -> bool:
|
|||
|
|
"""向目标坐标移动,期间每走一小段就检测一次是否存在目标。
|
|||
|
|
|
|||
|
|
到达判定使用轴对齐方式(与 player_movement.move_to 保持一致),
|
|||
|
|
避免欧氏距离与轴对齐判定不一致导致的死循环。
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
bool: 找到目标返回 True;到达目标坐标未找到目标返回 False
|
|||
|
|
"""
|
|||
|
|
import math
|
|||
|
|
from player_position import PlayerPosition
|
|||
|
|
|
|||
|
|
tol = self.config.move_tolerance
|
|||
|
|
pos_reader = PlayerPosition()
|
|||
|
|
|
|||
|
|
while True:
|
|||
|
|
pos = pos_reader.get_position_with_retry()
|
|||
|
|
if pos is None:
|
|||
|
|
self.logger.warning("无法读取坐标,中止移动")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
dx = target_x - pos[0]
|
|||
|
|
dy = target_y - pos[1]
|
|||
|
|
|
|||
|
|
# 轴对齐到达判定(与 player_movement.move_to 内部逻辑一致)
|
|||
|
|
if abs(dx) <= tol and abs(dy) <= tol:
|
|||
|
|
self.logger.info(f"已到达巡逻点 ({target_x}, {target_y})")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
distance = math.sqrt(dx ** 2 + dy ** 2)
|
|||
|
|
|
|||
|
|
# 检测目标(每移动一小段检测一次)
|
|||
|
|
if self._find_target_on_screen():
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
# 向目标方向走最多 PATROL_STEP 坐标单位,走完后再检测一次目标
|
|||
|
|
step = min(distance, self.PATROL_STEP)
|
|||
|
|
ratio = step / distance
|
|||
|
|
mid_x = pos[0] + dx * ratio
|
|||
|
|
mid_y = pos[1] + dy * ratio
|
|||
|
|
|
|||
|
|
self.logger.info(f"移动小段 → ({mid_x:.2f}, {mid_y:.2f}),剩余距离 {distance:.2f}")
|
|||
|
|
self.player_movement.move_to_position(mid_x, mid_y, tolerance=tol)
|