Files
wow/logistics_manager.py

419 lines
16 KiB
Python
Raw Normal View History

2026-03-18 09:04:37 +08:00
import json
import math
2026-05-07 08:33:10 +08:00
import os
2026-03-18 09:04:37 +08:00
import time
from hardware_control import hw_ctrl
2026-03-18 09:04:37 +08:00
# 修理商所在位置(游戏坐标),按实际位置修改
VENDOR_POS = (30.08, 71.51)
# 到达判定距离(与 coordinate_patrol 一致)
VENDOR_ARRIVAL_THRESHOLD = 0.1
# 修理商所在位置配置文件
VENDOR_FILE = 'vendor.json'
class LogisticsManager:
def __init__(self, route_file=None):
self.need_repair = False
self.bag_full = False
self.is_returning = False
self.route_file = route_file or VENDOR_FILE
2026-04-10 14:36:04 +08:00
self.bag_full_hearthstone = False # 包满时用炉石回城而非走路修理
self.hearthstone_key = "b" # 炉石按键
self.hearthstone_cast_sec = 12.0 # 炉石施法等待秒数
2026-05-07 08:33:10 +08:00
self.enable_bag_full_mail = False
self.logistics_full_auto = False
self.bag_slot_threshold = 2
self.durability_threshold = 0.2
2026-05-07 08:33:10 +08:00
self.mailbox_route_file = os.path.join("recorder", "mailbox.json")
self.mailbox_interact_key = "8"
self.mail_recipient_key = ""
self.mail_send_key = "f8"
self.mailbox_open_wait_sec = 2.0
self.mail_send_wait_sec = 60.0
self.repair_target_key = "8"
self.repair_interact_key = "4"
self.repair_interact_wait_sec = 2.0
2026-05-07 08:33:10 +08:00
def _resolve_path(self, path):
if not path:
return ""
path = str(path)
if os.path.isabs(path):
return path
cwd_path = os.path.abspath(path)
if os.path.exists(cwd_path):
return cwd_path
return os.path.join(os.path.dirname(os.path.abspath(__file__)), path)
def _sleep_with_stop(self, seconds, stop_check=None):
end_at = time.time() + max(0.0, float(seconds))
while time.time() < end_at:
if callable(stop_check) and stop_check():
return False
time.sleep(min(0.1, end_at - time.time()))
return True
def _read_position(self, get_state):
if not callable(get_state):
return None
st = get_state()
if not st:
return None
try:
x = st.get("x")
y = st.get("y")
if x is None or y is None:
return None
return float(x), float(y)
except Exception:
return None
2026-03-18 09:04:37 +08:00
def _state_bag_full(self, state):
try:
return int(state.get("free_slots", 0) or 0) < int(self.bag_slot_threshold)
except Exception:
return False
def _state_durability_low(self, state):
try:
threshold = float(self.durability_threshold)
if threshold > 1.0:
threshold = threshold / 100.0
durability = state.get("durability", 1.0)
if durability is None:
return False
return float(durability) < threshold
except Exception:
return False
def get_pending_tasks(self, state):
tasks = []
bag_full = self._state_bag_full(state)
durability_low = self._state_durability_low(state)
if bag_full and self.enable_bag_full_mail:
tasks.append("mail")
if durability_low:
tasks.append("repair")
if not tasks and bag_full and self.bag_full_hearthstone:
tasks.append("hearthstone_stop")
return tasks
2026-03-18 09:04:37 +08:00
def check_logistics(self, state):
"""
state['free_slots']: 剩余空格数量
state['durability']: 0.0 ~ 1.0 的耐久度
"""
# 触发阈值:空格少于配置值,或耐久度低于配置值。
if self.get_pending_tasks(state):
2026-03-18 09:04:37 +08:00
if not self.is_returning:
print(f">>> [后勤警告] 背包/耐久不足!触发回城程序。")
self.is_returning = True
else:
self.is_returning = False
def use_hearthstone_and_stop(self, get_state=None):
"""按炉石按键并等待施法完成,带有坐标校验的重试机制。"""
max_retries = 3
success = False
for i in range(max_retries):
start_pos = self._read_position(get_state)
print(f">>> [后勤] 第 {i+1} 次尝试使用炉石(按键: {self.hearthstone_key}...")
# 先按一下 S 确保停止移动,防止移动中按炉石失败
hw_ctrl.press('s')
time.sleep(0.5)
hw_ctrl.press(self.hearthstone_key)
# 等待施法过程
print(f">>> [后勤] 正在等待炉石 {self.hearthstone_cast_sec}s...")
time.sleep(float(self.hearthstone_cast_sec))
if get_state and start_pos is not None:
for _ in range(5):
end_pos = self._read_position(get_state)
if end_pos is None:
time.sleep(0.5)
continue
dist = math.dist(start_pos, end_pos)
# 如果坐标发生了明显跳变(大于 2.0),证明回城成功
if dist > 2.0:
print(f">>> [后勤] 炉石回城成功!位置跳变距离: {dist:.2f}")
success = True
break
time.sleep(0.5)
if success:
break
print(f">>> [后勤] 炉石似乎失败(位置未变化),准备重试...")
elif get_state:
st_now = get_state()
if st_now is None:
# 获取不到状态可能已经卡死或窗口关闭,默认成功以退出循环
success = True
break
else:
success = True
break
else:
# 如果没法校验坐标,就只执行一次
success = True
break
if not success:
print(">>> [后勤] 警告:多次尝试炉石回城均未检测到位置跳变!")
2026-04-10 14:36:04 +08:00
self.is_returning = False
return success
2026-04-10 14:36:04 +08:00
2026-03-18 09:04:37 +08:00
def return_home(self):
"""执行回城动作"""
# 1. 停止当前巡逻
# 2. 寻找安全点或直接使用炉石
print(">>> 正在释放炉石...")
hw_ctrl.press('7') # 假设炉石在 7 号键
2026-03-18 09:04:37 +08:00
time.sleep(15) # 等待炉石施法
def handle_town_visit(self, state, patrol):
"""回城流程:用 state 取当前坐标与朝向,调用 patrol.navigate_to_point 前往修理商;到达后按 F3 交互"""
is_arrived = patrol.navigate_to_point(
state, VENDOR_POS,
arrival_threshold=VENDOR_ARRIVAL_THRESHOLD,
)
if is_arrived:
print(">>> 到达修理商,执行交互宏")
self._do_vendor_interact()
self.is_returning = False
def _do_vendor_interact(self):
"""执行与修理商/背包的交互按键。"""
hw_ctrl.press(self.repair_target_key)
2026-03-18 09:04:37 +08:00
time.sleep(0.5)
hw_ctrl.press(self.repair_interact_key)
time.sleep(float(self.repair_interact_wait_sec))
2026-03-18 09:04:37 +08:00
def run_bag_full_mail_flow(self, get_state, patrol, stop_check=None, use_hearthstone=True):
2026-05-07 08:33:10 +08:00
"""
包满邮寄第一版流程
炉石 -> 跑到邮箱路线终点 -> 交互打开邮箱 -> MailboxCourier 宏键 -> 等待后停止
Python 不判断邮件是否发完发送细节交给游戏内插件
"""
route_file = self._resolve_path(self.mailbox_route_file)
if not route_file or not os.path.exists(route_file):
print(f">>> [后勤-邮箱] 邮箱路线不存在,已停止: {route_file}")
self.is_returning = False
return False
if callable(stop_check) and stop_check():
self.is_returning = False
return False
if use_hearthstone:
ok = self.use_hearthstone_and_stop(get_state=get_state)
if not ok:
print(">>> [后勤-邮箱] 炉石失败,未继续跑邮箱。")
self.is_returning = False
return False
2026-05-07 08:33:10 +08:00
try:
with open(route_file, "r", encoding="utf-8") as f:
path = json.load(f)
except Exception as exc:
print(f">>> [后勤-邮箱] 邮箱路线读取失败: {exc}")
self.is_returning = False
return False
if not path:
print(f">>> [后勤-邮箱] 邮箱路线为空,已停止: {route_file}")
self.is_returning = False
return False
print(f">>> [后勤-邮箱] 开始跑邮箱路线: {route_file}")
old_enable_mount = getattr(patrol, "enable_mount", None)
if old_enable_mount is not None:
patrol.enable_mount = False
try:
ok = patrol.navigate_path(
get_state,
path,
forward=True,
arrival_threshold=VENDOR_ARRIVAL_THRESHOLD,
snap_to_nearest=True,
)
2026-05-07 08:33:10 +08:00
finally:
if old_enable_mount is not None:
patrol.enable_mount = old_enable_mount
if not ok:
print(">>> [后勤-邮箱] 邮箱路线未完成,已停止。")
self.is_returning = False
return False
if callable(stop_check) and stop_check():
self.is_returning = False
return False
patrol.stop_all()
print(f">>> [后勤-邮箱] 到达邮箱附近,按交互键: {self.mailbox_interact_key}")
hw_ctrl.press(self.mailbox_interact_key)
if not self._sleep_with_stop(self.mailbox_open_wait_sec, stop_check=stop_check):
self.is_returning = False
return False
if self.mail_recipient_key:
print(f">>> [后勤-邮箱] 触发 MailboxCourier 收件人宏: {self.mail_recipient_key}")
hw_ctrl.press(self.mail_recipient_key)
if not self._sleep_with_stop(0.5, stop_check=stop_check):
self.is_returning = False
return False
print(f">>> [后勤-邮箱] 触发 MailboxCourier 发送宏: {self.mail_send_key}")
hw_ctrl.press(self.mail_send_key)
if not self._sleep_with_stop(self.mail_send_wait_sec, stop_check=stop_check):
self.is_returning = False
return False
print(">>> [后勤-邮箱] 邮寄宏已触发,流程结束。")
self.is_returning = False
return True
def run_repair_flow(self, get_state, patrol, stop_check=None, use_hearthstone=True):
"""
耐久低修理流程
炉石 -> 从修理路线最近点接入 -> 到达 NPC -> 按目标键 -> 按交互/修理键
"""
route_file = self._resolve_path(self.route_file)
if not route_file or not os.path.exists(route_file):
print(f">>> [后勤-修理] 修理路线不存在,已停止: {route_file}")
self.is_returning = False
return False
if callable(stop_check) and stop_check():
self.is_returning = False
return False
if use_hearthstone:
ok = self.use_hearthstone_and_stop(get_state=get_state)
if not ok:
print(">>> [后勤-修理] 炉石失败,未继续跑修理路线。")
self.is_returning = False
return False
try:
with open(route_file, "r", encoding="utf-8") as f:
path = json.load(f)
except Exception as exc:
print(f">>> [后勤-修理] 修理路线读取失败: {exc}")
self.is_returning = False
return False
if not path:
print(f">>> [后勤-修理] 修理路线为空,已停止: {route_file}")
self.is_returning = False
return False
print(f">>> [后勤-修理] 开始跑修理路线: {route_file}")
ok = patrol.navigate_path(
get_state,
path,
forward=True,
arrival_threshold=VENDOR_ARRIVAL_THRESHOLD,
snap_to_nearest=True,
)
if not ok:
print(">>> [后勤-修理] 修理路线未完成,已停止。")
self.is_returning = False
return False
if callable(stop_check) and stop_check():
self.is_returning = False
return False
patrol.stop_all()
print(f">>> [后勤-修理] 到达 NPC按目标键: {self.repair_target_key}")
hw_ctrl.press(self.repair_target_key)
if not self._sleep_with_stop(0.5, stop_check=stop_check):
self.is_returning = False
return False
print(f">>> [后勤-修理] 按修理交互键: {self.repair_interact_key}")
hw_ctrl.press(self.repair_interact_key)
if not self._sleep_with_stop(self.repair_interact_wait_sec, stop_check=stop_check):
self.is_returning = False
return False
print(">>> [后勤-修理] 修理流程结束。")
self.is_returning = False
return True
def run_pending_tasks(self, tasks, get_state, patrol, stop_check=None):
completed = []
already_homed = False
for task in tasks:
if callable(stop_check) and stop_check():
self.is_returning = False
return False, completed
if task == "mail":
ok = self.run_bag_full_mail_flow(
get_state,
patrol,
stop_check=stop_check,
use_hearthstone=not already_homed,
)
if not ok:
return False, completed
already_homed = True
completed.append(task)
continue
if task == "repair":
ok = self.run_repair_flow(
get_state,
patrol,
stop_check=stop_check,
use_hearthstone=not already_homed,
)
if not ok:
return False, completed
already_homed = True
completed.append(task)
continue
if task == "hearthstone_stop":
ok = self.use_hearthstone_and_stop(get_state=get_state)
if not ok:
return False, completed
already_homed = True
completed.append(task)
self.is_returning = False
return True, completed
2026-03-18 09:04:37 +08:00
def run_route1_round(self, get_state, patrol, route_file=None):
"""
读取 route1.json 路径先正向走完执行交互84再反向走完然后结束
get_state: 可调用对象返回当前状态 dict x, y, facing
patrol: CoordinatePatrol 实例用于 navigate_path
route_file: 路径 JSON 文件路径默认使用 __init__ 中的 route_file
"""
route_file = route_file or self.route_file
with open(route_file, "r", encoding="utf-8") as f:
path = json.load(f)
if not path:
print(">>> [后勤] route1 为空,跳过")
return
print(">>> [后勤] 开始 route1 正向")
ok = patrol.navigate_path(get_state, path, forward=True, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD)
if not ok:
print(">>> [后勤] 正向未完成,中止")
return
print(">>> [后勤] 正向到达,执行交互")
self._do_vendor_interact()
print(">>> [后勤] 开始 route1 反向")
ok = patrol.navigate_path(get_state, path, forward=False, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD)
if not ok:
print(">>> [后勤] 反向未完成")
return
print(">>> [后勤] route1 往返结束")