Add Ghostbox hardware and logistics automation

This commit is contained in:
王鹏
2026-05-07 15:19:59 +08:00
parent bf26de3244
commit a48ed597b4
14 changed files with 1287 additions and 323 deletions

BIN
Ghostbox_ddl/gbild64.dll Normal file

Binary file not shown.

260
Ghostbox_ddl/ghostbox.py Normal file
View File

@@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
import platform
from ctypes import *
import os
# 加载DLL
script_dir = os.path.dirname(os.path.abspath(__file__))
if platform.architecture()[0] == "64bit":
dll_path = os.path.join(script_dir, "gbild64.dll")
else:
dll_path = os.path.join(script_dir, "gbild32.dll")
dll = windll.LoadLibrary(dll_path)
# 设置接口返回值类型
dll.getmodel.restype = c_char_p
dll.getserialnumber.restype = c_char_p
dll.getproductiondate.restype = c_char_p
dll.getfirmwareversion.restype = c_char_p
dll.getclientscreenresolution.restype = c_char_p
dll.readstring.restype = c_char_p
dll.encryptstring.restype = c_char_p
dll.decryptstring.restype = c_char_p
dll.getproductname.restype = c_char_p
dll.sdktype.restype = c_char_p
dll.sdkversion.restype = c_char_p
# ================ 设备操作
# 打开设备(根据设备序号)
def opendevice(index=0):
return dll.opendevice(index)
# 打开设备根据设备ID
def opendevicebyid(vid,pid):
return dll.opendevicebyid(vid,pid)
# 打开设备(根据设备路径)
def opendevicebypath(path):
return dll.opendevicebypath(bytes(path,"utf-8"))
# 检查设备是否连接
def isconnected():
return dll.isconnected()
# 关闭设备
def closedevice():
return dll.closedevice()
# 复位设备
def resetdevice():
return dll.resetdevice()
# ================ 设备信息
# 获取设备型号
def getmodel():
return dll.getmodel().decode("utf-8")
# 获取设备序列号
def getserialnumber():
return dll.getserialnumber().decode("utf-8")
# 获取设备生产日期
def getproductiondate():
return dll.getproductiondate().decode("utf-8")
# 获取设备固件版本号
def getfirmwareversion():
return dll.getfirmwareversion().decode("utf-8")
# ================ 键盘操作
# 按下键
def presskeybyname(keyn):
return dll.presskeybyname(bytes(keyn,"utf-8"))
def presskeybyvalue(keyv):
return dll.presskeybyvalue(keyv)
# 释放键
def releasekeybyname(keyn):
return dll.releasekeybyname(bytes(keyn,"utf-8"))
def releasekeybyvalue(keyv):
return dll.releasekeybyvalue(keyv)
# 按下并释放键
def pressandreleasekeybyname(keyn):
return dll.pressandreleasekeybyname(bytes(keyn,"utf-8"))
def pressandreleasekeybyvalue(keyv):
return dll.pressandreleasekeybyvalue(keyv)
# 判断键盘按键状态
def iskeypressedbyname(keyn):
return dll.iskeypressedbyname(bytes(keyn,"utf-8"))
def iskeypressedbyvalue(keyv):
return dll.iskeypressedbyvalue(keyv)
# 释放所有键盘按键
def releaseallkey():
return dll.releaseallkey()
# 组合按键
def combinationkey(keys):
return dll.combinationkey(bytes(keys,"utf-8"))
# 输入字符串
def inputstring(str):
return dll.inputstring(bytes(str,"utf-8"))
# 获取大写锁定状态
def getcapslock():
return dll.getcapslock()
# 获取数字键盘锁定状态
def getnumlock():
return dll.getnumlock()
# 设置是否区分大小写
def setcasesensitive(cs):
return dll.setcasesensitive(cs)
# 设置按键延时
def setpresskeydelay(maxd,mind):
return dll.setpresskeydelay(maxd,mind)
# 设置输入字符串间隔时间
def setinputstringintervaltime(maxd,mind):
return dll.setinputstringintervaltime(maxd,mind)
# 设置中文输入模式
def setchineseinputmode(mode):
return dll.setchineseinputmode(mode)
# ================ 鼠标操作
# 按下鼠标键
def pressmousebutton(mbtn):
return dll.pressmousebutton(mbtn)
# 释放鼠标键
def releasemousebutton(mbtn):
return dll.releasemousebutton(mbtn)
# 按下并释放鼠标键
def pressandreleasemousebutton(mbtn):
return dll.pressandreleasemousebutton(mbtn)
# 判断鼠标按键状态
def ismousebuttonpressed(mbtn):
return dll.ismousebuttonpressed(mbtn)
# 释放所有鼠标按键
def releaseallmousebutton():
return dll.releaseallmousebutton()
# 相对移动鼠标
def movemouserelative(x,y):
return dll.movemouserelative(x,y)
# 移动鼠标到指定坐标
def movemouseto(x,y):
return dll.movemouseto(x,y)
# 获取鼠标当前位置
def getmousex():
return dll.getmousex()
def getmousey():
return dll.getmousey()
# 移动鼠标滚轮
def movemousewheel(z):
return dll.movemousewheel(z)
# 设置鼠标按键延时
def setpressmousebuttondelay(maxd,mind):
return dll.setpressmousebuttondelay(maxd,mind)
# 设置鼠标移动延时
def setmousemovementdelay(maxd,mind):
return dll.setmousemovementdelay(maxd,mind)
# 设置鼠标移动速度
def setmousemovementspeed(speedvalue):
return dll.setmousemovementspeed(speedvalue)
# 设置鼠标移动模式
def setmousemovementmode(modevalue):
return dll.setmousemovementmode(modevalue)
# ================ 双机设备接口
# 设置鼠标当前位置(针对不支持绝对值的鼠标)
def setmouseposition(x,y):
return dll.setmouseposition(x,y)
# 设置鼠标绝对位置(针对支持绝对值的鼠标)
def setmouseabsoluteposition(x,y):
return dll.setmouseabsoluteposition(x,y)
# 设置鼠标逻辑位置(不移动鼠标,只更新内部当前坐标)
def setmouselogicalposition(x,y):
return dll.setmouselogicalposition(x,y)
# 相对移动鼠标(只移动鼠标,不更新内部当前坐标)
def movemouserelative_ncc(x,y):
return dll.movemouserelative_ncc(x,y)
# 设置被控端屏幕分辨率
def setclientscreenresolution(width,height):
return dll.setclientscreenresolution(width,height)
# 获取被控端屏幕分辨率
def getclientscreenresolution():
return dll.getclientscreenresolution().decode("utf-8")
# 选择被控端对于APV系列的一控多模式指定键盘鼠标操作对应的设备
def selectsubdevice(addr):
return dll.selectsubdevice(addr)
# 修改被控端设备ID devtype: 0 主控端1 被控端键盘U22 被控端鼠标U23 被控端复合键鼠U1
def setslavedeviceid(vid,pid,devtype):
return dll.setslavedeviceid(vid,pid,devtype)
# ================ 加密狗操作
# 初始化加密狗
def initializedongle():
return dll.initializedongle()
# 设置读密码
def setreadpassword(writepwd,newpwd):
return dll.setreadpassword(bytes(writepwd,"utf-8"),bytes(newpwd,"utf-8"))
# 设置写密码
def setwritepassword(oldpwd,newpwd):
return dll.setwritepassword(bytes(oldpwd,"utf-8"),bytes(newpwd,"utf-8"))
# 从设备读字符串
def readstring(readpwd,addr,count):
return dll.readstring(bytes(readpwd,"utf-8"),addr,count).decode("utf-8", errors="ignore")
# 将字符串写入设备
def writestring(writepwd,str,addr):
return dll.writestring(bytes(writepwd,"utf-8"),bytes(str,"utf-8"),addr)
# 设置密钥
def setencryptionkey(writepwd,cipher):
return dll.setencryptionkey(bytes(writepwd,"utf-8"),bytes(cipher,"utf-8"))
# 加密字符串
def encryptstring(str):
return dll.encryptstring(bytes(str,"utf-8")).decode("utf-8")
# 解密字符串
def decryptstring(str):
return dll.decryptstring(bytes(str,"utf-8")).decode("utf-8")
# ================ 电源控制接口
# 按下电源按钮
def presspowerbutton():
return dll.presspowerbutton()
# 释放电源按钮
def releasepowerbutton():
return dll.releasepowerbutton()
# 按下并释放电源按钮
def pressandreleasepowerbutton():
return dll.pressandreleasepowerbutton()
# 获取电源工作状态
def getpowerstatus():
return dll.getpowerstatus()
# ================ 设备定义接口
# 修改设备速度
def setspeed(speed):
return dll.setspeed()
# 修改设备ID
def setdeviceid(vid,pid):
return dll.setdeviceid(vid,pid)
# 恢复设备默认ID
def restoredeviceid():
return dll.restoredeviceid()
# 获取当前设备VID
def getvid():
return dll.getvid()
# 获取当前设备PID
def getpid():
return dll.getpid()
# 设置产品名称
def setproductname(name):
return dll.setproductname(bytes(name,"gbk"))
# 获取产品名称
def getproductname():
return dll.getproductname().decode("gbk")
# ================ 其他操作
# 获取当前SDK库类型
def sdktype():
return dll.sdktype().decode("utf-8")
# 获取当前SDK库版本
def sdkversion():
return dll.sdkversion().decode("utf-8")

View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import ghostbox as gb
import time
print(f"{gb.sdktype()}({gb.sdkversion()})")
# 打开设备
print(gb.opendevice())
#print(gb.opendevicebyid(0x5188,0x1801))
#print(gb.opendevicebypath("\\\\?\\hid#vid_5188&pid_1801&mi_02#7&28610698&0&0000#{4d1e55b2-f16f-11cf-88cb-001111000030}"))
# 获取设备型号
print(gb.getmodel())
# 获取硬件版本号
print(gb.getfirmwareversion())
# 按win键
# print(gb.pressandreleasekeybyname("win"))
# 移动鼠标
# gb.setmousemovementmode(3)
# gb.setmousemovementdelay(3,5)
# print(gb.movemouseto(100,100))
gb.presskeybyname("right")
gb.presskeybyname("up")
time.sleep(3)
gb.releaseallkey()
# gb.combinationkey("win+r")
# time.sleep(2)
# print(gb.getcapslock())
# gb.inputstring("xxxDDDff")

View File

@@ -287,6 +287,7 @@ class AutoBotMove:
)
self.distance_interact_brake_sec = 0.08
self._last_mouse_path_scale_signature = None
self.logistics_resume_cooldown_until = 0.0
# stop_check: 返回 True 表示需要立即停止(用于中断阻塞中的后勤/路线导航)
self._stop_check = stop_check if callable(stop_check) else (lambda: False)
if waypoints is None:
@@ -327,6 +328,12 @@ class AutoBotMove:
self.logistics_manager = LogisticsManager(vendor_file)
self.logistics_manager.bag_full_hearthstone = bool(layout.get("bag_full_hearthstone", False))
self.logistics_manager.hearthstone_key = str(layout.get("hearthstone_key", "b") or "b")
self.logistics_manager.hearthstone_cast_sec = float(layout.get("hearthstone_wait_sec", 12.0))
self.logistics_manager.logistics_full_auto = bool(layout.get("logistics_full_auto", False))
self.logistics_manager.bag_slot_threshold = int(layout.get("bag_slot_threshold", 2))
self.logistics_manager.durability_threshold = float(layout.get("durability_threshold", 0.2))
self.logistics_manager.repair_target_key = str(layout.get("repair_target_key", "8") or "8")
self.logistics_manager.repair_interact_key = str(layout.get("repair_interact_key", "4") or "4")
self.logistics_manager.enable_bag_full_mail = bool(layout.get("enable_bag_full_mail", False))
self.logistics_manager.mailbox_route_file = str(
mailbox_route_path
@@ -342,6 +349,25 @@ class AutoBotMove:
def _has_prepare_route(self) -> bool:
return bool(self.prepare_route_waypoints)
def _reset_prepare_route_for_logistics_resume(self):
self.prepare_route_index = 0
self.prepare_route_completed = not bool(self.prepare_route_waypoints)
self.is_running_prepare_route = bool(self.prepare_route_waypoints)
self.prepare_route_snapped = False
self.target_acquired_time = None
self.last_turn_signal_time = 0.0
self.last_interaction_time = 0.0
self.distance_interact_brake_pending = False
self.distance_interact_pause_until = 0.0
self.logistics_resume_cooldown_until = time.time() + 5.0
self.last_tab_time = time.time() + 1.0
self.patrol_controller.stop_all()
self.patrol_controller.reset_stuck()
if self.prepare_route_waypoints:
print(">>> [后勤] 全自动续跑:重置准备路线,准备返回巡逻。")
else:
print(">>> [后勤] 全自动续跑:未配置准备路线,直接恢复巡逻。")
def _snap_prepare_route_to_nearest(self, state):
if self.prepare_route_snapped or self.prepare_route_completed or not self._has_prepare_route():
return
@@ -768,40 +794,46 @@ class AutoBotMove:
self.death_manager.run_to_corpse(state, get_state)
return
# 2. 后勤检查(脱战时):空格或耐久不足则回城
effective_target = self._is_effective_target(state)
in_combat_or_target = bool(state['combat'] or effective_target)
# 2. 后勤检查:只在脱战且无有效目标时触发,避免战斗中突然回城/修理/邮寄。
if not in_combat_or_target and time.time() >= self.logistics_resume_cooldown_until:
self.logistics_manager.check_logistics(state)
else:
self.logistics_manager.is_returning = False
if self.logistics_manager.is_returning:
if self.is_moving:
self.patrol_controller.stop_all()
self.is_moving = False
self.patrol_controller.reset_stuck()
bag_full_now = int(state.get('free_slots', 0) or 0) < 2
if bag_full_now and self.logistics_manager.enable_bag_full_mail:
tasks = self.logistics_manager.get_pending_tasks(state)
get_state_fn = (lambda: None if self._should_stop() else parse_game_state())
self.logistics_manager.run_bag_full_mail_flow(
ok, completed_tasks = self.logistics_manager.run_pending_tasks(
tasks,
get_state_fn,
self.patrol_controller,
stop_check=self._should_stop,
)
can_resume = (
ok
and self.logistics_manager.logistics_full_auto
and any(task in ("mail", "repair") for task in completed_tasks)
)
if can_resume:
self._reset_prepare_route_for_logistics_resume()
self.is_moving = False
return
if callable(getattr(self, '_on_hearthstone_stop', None)):
self._on_hearthstone_stop()
return
# 勾选"包满炉石回城":只有真正包满时才炉石;耐久低仍走修理路线
if bag_full_now and self.logistics_manager.bag_full_hearthstone:
get_state_fn = (lambda: None if self._should_stop() else parse_game_state())
self.logistics_manager.use_hearthstone_and_stop(get_state=get_state_fn)
if callable(getattr(self, '_on_hearthstone_stop', None)):
self._on_hearthstone_stop()
return
# 中断策略:一旦 GUI 停止,后续 get_state 返回 None使 navigate_path 立即退出。
get_state = (lambda: None if self._should_stop() else parse_game_state())
self.logistics_manager.run_route1_round(get_state, self.patrol_controller)
return
# 3. 战斗/有目标:停止移动,执行攻击逻辑;仅在「跑向怪」短窗口内做卡死检测
effective_target = self._is_effective_target(state)
# 核心修改:只要还在战斗中,就不算“完全脱战”,即使当前没目标(可能正在 Tab 找下一个)
in_combat_or_target = bool(state['combat'] or effective_target)
if in_combat_or_target:
# 被动进战时立即打断进食等待,转入正常战斗流程

View File

@@ -6,6 +6,7 @@ added_files = [
('recorder\\*.json', 'recorder'),
('game_state_config.json', '.'),
('ddl', 'ddl'),
('Ghostbox_ddl', 'Ghostbox_ddl'),
('images', 'images'),
('loot_path.json', '.'),
]

View File

@@ -28,6 +28,10 @@ Write-Host "Copying ddl folder to dist..." -ForegroundColor Cyan
if (Test-Path "dist\ddl") { Remove-Item "dist\ddl" -Recurse -Force -ErrorAction SilentlyContinue }
if (Test-Path "ddl") { Copy-Item "ddl" "dist\ddl" -Recurse -Force }
Write-Host "Copying Ghostbox_ddl folder to dist..." -ForegroundColor Cyan
if (Test-Path "dist\Ghostbox_ddl") { Remove-Item "dist\Ghostbox_ddl" -Recurse -Force -ErrorAction SilentlyContinue }
if (Test-Path "Ghostbox_ddl") { Copy-Item "Ghostbox_ddl" "dist\Ghostbox_ddl" -Recurse -Force }
Write-Host "Copying images folder to dist..." -ForegroundColor Cyan
if (Test-Path "dist\images") { Remove-Item "dist\images" -Recurse -Force -ErrorAction SilentlyContinue }
if (Test-Path "images") { Copy-Item "images" "dist\images" -Recurse -Force }

View File

@@ -6,6 +6,7 @@ added_files = [
('recorder\\*.json', 'recorder'),
('game_state_config.json', '.'),
('ddl', 'ddl'),
('Ghostbox_ddl', 'Ghostbox_ddl'),
('images', 'images'),
('loot_path.json', '.'),
]

View File

@@ -490,7 +490,14 @@ class CoordinatePatrol:
return False
def navigate_path(self, get_state, path, forward=True, arrival_threshold=None):
def navigate_path(
self,
get_state,
path,
forward=True,
arrival_threshold=None,
snap_to_nearest=False,
):
"""
按 path 依次走完所有点后返回。每次调用都必须传入 path。
get_state: 可调用对象,每次调用返回当前状态 dict需包含 'x','y','facing'
@@ -520,6 +527,26 @@ class CoordinatePatrol:
break
time.sleep(poll_sleep_sec)
if snap_to_nearest and points:
current_pos = None
try:
current_pos = (float(state.get("x")), float(state.get("y")))
except Exception:
current_pos = None
if current_pos is not None:
best_idx = 0
best_dist = None
for idx, point in enumerate(points):
dist = self.get_distance(current_pos, point)
if best_dist is None or dist < best_dist:
best_idx = idx
best_dist = dist
if best_idx > 0:
self.logger.info(
f">>> 路径智能接入最近点 {best_idx + 1}/{len(points)},距离 {best_dist:.2f}"
)
points = points[best_idx:]
for i, target in enumerate(points):
self.logger.info(f">>> 路径点 {i + 1}/{len(points)}: {target}")
while True:

View File

@@ -30,7 +30,13 @@ _DEFAULTS = {
"mount_retry_after_sec": 2.0,
# 炉石回城
"hearthstone_key": "b",
"hearthstone_wait_sec": 12.0,
"bag_full_hearthstone": False,
"logistics_full_auto": False,
"bag_slot_threshold": 2,
"durability_threshold": 0.2,
"repair_target_key": "8",
"repair_interact_key": "4",
# 包满炉石后跑邮箱,并触发 MailboxCourier 插件宏
"enable_bag_full_mail": False,
"mailbox_route_json": "recorder/mailbox.json",

View File

@@ -12,5 +12,18 @@
"mount_hold_sec": 2.0,
"mount_retry_after_sec": 2.0,
"hearthstone_key": "6",
"bag_full_hearthstone": true
"hearthstone_wait_sec": 12.0,
"bag_full_hearthstone": true,
"logistics_full_auto": false,
"bag_slot_threshold": 2,
"durability_threshold": 0.2,
"repair_target_key": "8",
"repair_interact_key": "4",
"enable_bag_full_mail": false,
"mailbox_route_json": "recorder/mailbox.json",
"mailbox_interact_key": "4",
"mail_recipient_key": "f7",
"mail_send_key": "f8",
"mailbox_open_wait_sec": 2.0,
"mail_send_wait_sec": 60.0
}

View File

@@ -1,3 +1,4 @@
import importlib.util
import json
import os
import random
@@ -20,35 +21,489 @@ else:
MAIN_CONFIG_FILE = "wow_multikey_qt.json"
BACKEND_TIANYA = "tianya"
BACKEND_GHOST = "ghost"
BACKEND_DIRECTINPUT = "directinput"
TIANYA_DLL_PATH = "ddl/wyhkm.dll"
GHOSTBOX_MODULE_PATH = "Ghostbox_ddl/ghostbox.py"
class _TianyaBackend:
name = BACKEND_TIANYA
def __init__(self, controller, dll_path=TIANYA_DLL_PATH):
self._controller = controller
self._dll_path_setting = dll_path
self._wyhkm = None
self.dll_path = None
def configure(self, dll_path=None):
if dll_path:
self._dll_path_setting = dll_path
def label(self):
return self.name if self.is_available() else f"{self.name}_unavailable"
def is_available(self):
if self._wyhkm:
try:
if self._wyhkm.IsOpen(0):
return True
except Exception:
self._wyhkm = None
self.dll_path = self._controller._resolve_resource_path(self._dll_path_setting)
if not self.dll_path:
self._controller._log(f">>> [tianya_control] DLL not found: {self._dll_path_setting}")
self._wyhkm = None
return False
try:
hkmdll = windll.LoadLibrary(self.dll_path)
hkmdll.DllInstall.argtypes = (c_long, c_longlong)
if hkmdll.DllInstall(1, 2) < 0:
self._controller._log(">>> [tianya_control] DllInstall failed")
self._wyhkm = None
return False
pythoncom.CoInitialize()
self._wyhkm = win32com.client.Dispatch("wyp.hkm")
dev_id = self._wyhkm.SearchDevice(0x2612, 0x1701, 0)
if dev_id == -1:
self._controller._log(">>> [tianya_control] Device not found (Index 0)")
self._wyhkm = None
return False
if not self._wyhkm.Open(dev_id, 0):
self._controller._log(">>> [tianya_control] Open device failed")
self._wyhkm = None
return False
self._wyhkm.SetMode(1, 1)
self._wyhkm.SetMode(2, 1)
self._wyhkm.SetKeyInterval(30, 50)
self._wyhkm.SetMouseInterval(30, 50)
self._controller._log(f">>> [tianya_control] Device opened and initialized: {dev_id}")
return True
except Exception as exc:
self._controller._log(f">>> [tianya_control] Init failed: {exc}")
self._wyhkm = None
return False
def _normalize_key(self, key_str):
return str(key_str).strip().upper()
def delay_rnd(self, min_ms, max_ms):
if not self.is_available():
return False
try:
self._wyhkm.DelayRnd(int(min_ms), int(max_ms))
return True
except Exception:
return False
def key_down(self, key_str):
if not self.is_available():
return False
try:
self._wyhkm.KeyDown(self._normalize_key(key_str))
return True
except Exception as exc:
self._controller._log(f">>> [tianya_control] KeyDown failed ({key_str}): {exc}")
return False
def key_up(self, key_str):
if not self.is_available():
return False
try:
self._wyhkm.KeyUp(self._normalize_key(key_str))
return True
except Exception as exc:
self._controller._log(f">>> [tianya_control] KeyUp failed ({key_str}): {exc}")
return False
def key_press(self, key_str):
if not self.is_available():
return False
try:
self._wyhkm.KeyPress(self._normalize_key(key_str))
return True
except Exception as exc:
self._controller._log(f">>> [tianya_control] KeyPress failed ({key_str}): {exc}")
return False
def move_to(self, x, y):
if not self.is_available():
return False
try:
return bool(self._wyhkm.MoveTo(int(x), int(y)))
except Exception as exc:
self._controller._log(f">>> [tianya_control] MoveTo failed ({x}, {y}): {exc}")
return False
def move_r(self, dx, dy):
if not self.is_available():
return False
try:
return bool(self._wyhkm.MoveR(int(dx), int(dy)))
except Exception as exc:
self._controller._log(f">>> [tianya_control] MoveR failed ({dx}, {dy}): {exc}")
return False
def left_click(self):
if not self.is_available():
return False
try:
return bool(self._wyhkm.LeftClick())
except Exception as exc:
self._controller._log(f">>> [tianya_control] LeftClick failed: {exc}")
return False
def right_click(self):
if not self.is_available():
return False
try:
return bool(self._wyhkm.RightClick())
except Exception as exc:
self._controller._log(f">>> [tianya_control] RightClick failed: {exc}")
return False
def left_down(self):
if not self.is_available():
return False
try:
return bool(self._wyhkm.LeftDown())
except Exception as exc:
self._controller._log(f">>> [tianya_control] LeftDown failed: {exc}")
return False
def left_up(self):
if not self.is_available():
return False
try:
return bool(self._wyhkm.LeftUp())
except Exception as exc:
self._controller._log(f">>> [tianya_control] LeftUp failed: {exc}")
return False
class _GhostboxBackend:
name = BACKEND_GHOST
LEFT_BUTTON = 1
RIGHT_BUTTON = 2
def __init__(self, controller, module_path=GHOSTBOX_MODULE_PATH):
self._controller = controller
self._module_path_setting = module_path
self._module = None
self.module_path = None
def configure(self, module_path=None):
if module_path and module_path != self._module_path_setting:
self._module_path_setting = module_path
self._module = None
def label(self):
return self.name if self.is_available() else f"{self.name}_unavailable"
def _load_module(self):
if self._module is not None:
return self._module
self.module_path = self._controller._resolve_resource_path(self._module_path_setting)
if not self.module_path:
self._controller._log(f">>> [ghost_control] module not found: {self._module_path_setting}")
return None
try:
spec = importlib.util.spec_from_file_location("ghostbox_runtime", self.module_path)
if spec is None or spec.loader is None:
self._controller._log(f">>> [ghost_control] load spec failed: {self.module_path}")
return None
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
self._module = module
return module
except Exception as exc:
self._controller._log(f">>> [ghost_control] load failed: {exc}")
self._module = None
return None
def is_available(self):
gb = self._load_module()
if gb is None:
return False
try:
if gb.isconnected():
return True
except Exception:
pass
try:
result = gb.opendevice()
connected = bool(gb.isconnected())
if not connected:
self._controller._log(f">>> [ghost_control] Open device failed: result={result}")
return False
try:
gb.setpresskeydelay(30, 50)
gb.setpressmousebuttondelay(30, 50)
gb.setmousemovementdelay(3, 5)
except Exception:
pass
self._controller._log(">>> [ghost_control] Device opened and initialized")
return True
except Exception as exc:
self._controller._log(f">>> [ghost_control] Init failed: {exc}")
return False
def _normalize_key(self, key_str):
return str(key_str).strip().lower()
def _call(self, action, func, *args):
if not self.is_available():
return False
try:
func(*args)
return True
except Exception as exc:
self._controller._log(f">>> [ghost_control] {action} failed: {exc}")
return False
def delay_rnd(self, min_ms, max_ms):
low = min(int(min_ms), int(max_ms))
high = max(int(min_ms), int(max_ms))
time.sleep(random.uniform(low, high) / 1000.0)
return True
def key_down(self, key_str):
gb = self._load_module()
if gb is None:
return False
return self._call(f"KeyDown({key_str})", gb.presskeybyname, self._normalize_key(key_str))
def key_up(self, key_str):
gb = self._load_module()
if gb is None:
return False
return self._call(f"KeyUp({key_str})", gb.releasekeybyname, self._normalize_key(key_str))
def key_press(self, key_str):
gb = self._load_module()
if gb is None:
return False
return self._call(f"KeyPress({key_str})", gb.pressandreleasekeybyname, self._normalize_key(key_str))
def move_to(self, x, y):
gb = self._load_module()
if gb is None:
return False
return self._call(f"MoveTo({x}, {y})", gb.movemouseto, int(x), int(y))
def move_r(self, dx, dy):
gb = self._load_module()
if gb is None:
return False
return self._call(f"MoveR({dx}, {dy})", gb.movemouserelative, int(dx), int(dy))
def left_click(self):
gb = self._load_module()
if gb is None:
return False
return self._call("LeftClick", gb.pressandreleasemousebutton, self.LEFT_BUTTON)
def right_click(self):
gb = self._load_module()
if gb is None:
return False
return self._call("RightClick", gb.pressandreleasemousebutton, self.RIGHT_BUTTON)
def left_down(self):
gb = self._load_module()
if gb is None:
return False
return self._call("LeftDown", gb.pressmousebutton, self.LEFT_BUTTON)
def left_up(self):
gb = self._load_module()
if gb is None:
return False
return self._call("LeftUp", gb.releasemousebutton, self.LEFT_BUTTON)
class _DirectInputBackend:
name = BACKEND_DIRECTINPUT
def __init__(self, controller):
self._controller = controller
self._last_error = None
def label(self):
return self.name if self.is_available() else f"{self.name}_unavailable"
def is_available(self):
if pydirectinput is not None:
return True
if self._last_error != _PYDIRECTINPUT_IMPORT_ERROR:
self._last_error = _PYDIRECTINPUT_IMPORT_ERROR
self._controller._log(f">>> [input_control] pydirectinput unavailable: {_PYDIRECTINPUT_IMPORT_ERROR}")
return False
def _normalize_key(self, key_str):
return str(key_str).strip().lower()
def delay_rnd(self, min_ms, max_ms):
if not self.is_available():
return False
low = min(int(min_ms), int(max_ms))
high = max(int(min_ms), int(max_ms))
time.sleep(random.uniform(low, high) / 1000.0)
return True
def key_down(self, key_str):
if not self.is_available():
return False
try:
pydirectinput.keyDown(self._normalize_key(key_str))
return True
except Exception as exc:
self._controller._log(f">>> [directinput] keyDown failed ({key_str}): {exc}")
return False
def key_up(self, key_str):
if not self.is_available():
return False
try:
pydirectinput.keyUp(self._normalize_key(key_str))
return True
except Exception as exc:
self._controller._log(f">>> [directinput] keyUp failed ({key_str}): {exc}")
return False
def key_press(self, key_str):
if not self.is_available():
return False
try:
pydirectinput.press(self._normalize_key(key_str))
return True
except Exception as exc:
self._controller._log(f">>> [directinput] press failed ({key_str}): {exc}")
return False
def move_to(self, x, y):
if not self.is_available():
return False
try:
pydirectinput.moveTo(int(x), int(y))
return True
except Exception as exc:
self._controller._log(f">>> [directinput] moveTo failed ({x}, {y}): {exc}")
return False
def move_r(self, dx, dy):
if not self.is_available():
return False
try:
pydirectinput.moveRel(int(dx), int(dy))
return True
except Exception as exc:
self._controller._log(f">>> [directinput] moveRel failed ({dx}, {dy}): {exc}")
return False
def left_click(self):
if not self.is_available():
return False
try:
pydirectinput.click(button="left")
return True
except Exception as exc:
self._controller._log(f">>> [directinput] left click failed: {exc}")
return False
def right_click(self):
if not self.is_available():
return False
try:
pydirectinput.click(button="right")
return True
except Exception as exc:
self._controller._log(f">>> [directinput] right click failed: {exc}")
return False
def left_down(self):
if not self.is_available():
return False
try:
pydirectinput.mouseDown(button="left")
return True
except Exception as exc:
self._controller._log(f">>> [directinput] left mouseDown failed: {exc}")
return False
def left_up(self):
if not self.is_available():
return False
try:
pydirectinput.mouseUp(button="left")
return True
except Exception as exc:
self._controller._log(f">>> [directinput] left mouseUp failed: {exc}")
return False
class HardwareController:
_instance = None
_wyhkm = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(HardwareController, cls).__new__(cls)
return cls._instance
def __init__(self, dll_path="ddl/wyhkm.dll", use_hardware_input=None):
def __init__(
self,
dll_path=TIANYA_DLL_PATH,
use_hardware_input=None,
use_tianya_box=None,
use_ghost_box=None,
ghostbox_module_path=GHOSTBOX_MODULE_PATH,
backend=None,
):
if getattr(self, "_bootstrapped", False):
if dll_path:
self._dll_path_setting = dll_path
if use_hardware_input is not None:
self.configure(use_hardware_input=use_hardware_input, dll_path=dll_path)
self._tianya_backend.configure(dll_path=dll_path)
self._ghost_backend.configure(module_path=ghostbox_module_path)
if backend is not None or use_hardware_input is not None or use_tianya_box is not None or use_ghost_box is not None:
self.configure(
use_hardware_input=use_hardware_input,
use_tianya_box=use_tianya_box,
use_ghost_box=use_ghost_box,
dll_path=dll_path,
ghostbox_module_path=ghostbox_module_path,
backend=backend,
)
return
self._bootstrapped = True
self._dll_path_setting = dll_path
self._backend = "hardware"
self._backend = BACKEND_TIANYA
self._last_backend_log = None
self._last_directinput_error = None
self.dll_path = None
self._tianya_backend = _TianyaBackend(self, dll_path)
self._ghost_backend = _GhostboxBackend(self, ghostbox_module_path)
self._directinput_backend = _DirectInputBackend(self)
if use_hardware_input is None:
use_hardware_input = self._load_backend_preference(default=True)
self.configure(use_hardware_input=use_hardware_input, dll_path=dll_path)
if backend is None and use_hardware_input is None and use_tianya_box is None and use_ghost_box is None:
use_tianya_box, use_ghost_box = self._load_backend_preference(default_tianya=True)
self.configure(
use_hardware_input=use_hardware_input,
use_tianya_box=use_tianya_box,
use_ghost_box=use_ghost_box,
backend=backend,
)
def _runtime_base_dir(self):
if getattr(sys, "frozen", False):
@@ -100,18 +555,21 @@ class HardwareController:
return candidate
return os.path.join(self._runtime_base_dir(), MAIN_CONFIG_FILE)
def _load_backend_preference(self, default=True):
def _load_backend_preference(self, default_tianya=True):
config_path = self._config_path()
if not os.path.exists(config_path):
return default
return default_tianya, False
try:
with open(config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
bot_cfg = (cfg or {}).get("bot") or {}
return bool(bot_cfg.get("use_hardware_input", default))
has_new_flags = "use_tianya_box" in bot_cfg or "use_ghost_box" in bot_cfg
if has_new_flags:
return bool(bot_cfg.get("use_tianya_box", False)), bool(bot_cfg.get("use_ghost_box", False))
return bool(bot_cfg.get("use_hardware_input", default_tianya)), False
except Exception as exc:
self._log(f">>> [input_control] load config failed: {exc}")
return default
return default_tianya, False
def _log(self, message):
print(message)
@@ -122,68 +580,28 @@ class HardwareController:
except Exception:
pass
def _normalize_hardware_key(self, key_str):
return str(key_str).strip().upper()
def _normalize_backend(self, backend):
if backend in (BACKEND_TIANYA, "hardware"):
return BACKEND_TIANYA
if backend == BACKEND_GHOST:
return BACKEND_GHOST
if backend == BACKEND_DIRECTINPUT:
return BACKEND_DIRECTINPUT
return BACKEND_DIRECTINPUT
def _normalize_directinput_key(self, key_str):
return str(key_str).strip().lower()
def _select_backend(self, use_tianya_box, use_ghost_box):
if bool(use_tianya_box):
return BACKEND_TIANYA
if bool(use_ghost_box):
return BACKEND_GHOST
return BACKEND_DIRECTINPUT
def _directinput_available(self):
if pydirectinput is not None:
return True
if self._last_directinput_error != _PYDIRECTINPUT_IMPORT_ERROR:
self._last_directinput_error = _PYDIRECTINPUT_IMPORT_ERROR
self._log(f">>> [input_control] pydirectinput unavailable: {_PYDIRECTINPUT_IMPORT_ERROR}")
return False
def _ensure_hardware_ready(self):
if self._wyhkm:
try:
if self._wyhkm.IsOpen(0):
return True
except Exception:
self._wyhkm = None
self.dll_path = self._resolve_resource_path(self._dll_path_setting)
if not self.dll_path:
self._log(f">>> [hardware_control] DLL not found: {self._dll_path_setting}")
self._wyhkm = None
return False
try:
hkmdll = windll.LoadLibrary(self.dll_path)
hkmdll.DllInstall.argtypes = (c_long, c_longlong)
if hkmdll.DllInstall(1, 2) < 0:
self._log(">>> [hardware_control] DllInstall failed")
self._wyhkm = None
return False
pythoncom.CoInitialize()
self._wyhkm = win32com.client.Dispatch("wyp.hkm")
dev_id = self._wyhkm.SearchDevice(0x2612, 0x1701, 0)
if dev_id == -1:
self._log(">>> [hardware_control] Device not found (Index 0)")
self._wyhkm = None
return False
if not self._wyhkm.Open(dev_id, 0):
self._log(">>> [hardware_control] Open device failed")
self._wyhkm = None
return False
self._wyhkm.SetMode(1, 1)
self._wyhkm.SetMode(2, 1)
self._wyhkm.SetKeyInterval(30, 50)
self._wyhkm.SetMouseInterval(30, 50)
self._log(f">>> [hardware_control] Device opened and initialized: {dev_id}")
return True
except Exception as exc:
self._log(f">>> [hardware_control] Init failed: {exc}")
self._wyhkm = None
return False
def _current_backend(self):
if self._backend == BACKEND_TIANYA:
return self._tianya_backend
if self._backend == BACKEND_GHOST:
return self._ghost_backend
return self._directinput_backend
def _log_backend(self):
label = self.backend_label()
@@ -191,92 +609,62 @@ class HardwareController:
self._last_backend_log = label
self._log(f">>> [input_control] backend={label}")
def configure(self, use_hardware_input=True, dll_path=None):
if dll_path:
self._dll_path_setting = dll_path
self._backend = "hardware" if bool(use_hardware_input) else "directinput"
if self._backend == "hardware":
self._ensure_hardware_ready()
def configure(
self,
use_hardware_input=None,
use_tianya_box=None,
use_ghost_box=None,
dll_path=None,
ghostbox_module_path=None,
backend=None,
):
self._tianya_backend.configure(dll_path=dll_path)
self._ghost_backend.configure(module_path=ghostbox_module_path)
if backend is not None:
self._backend = self._normalize_backend(backend)
else:
self._directinput_available()
if use_tianya_box is None and use_ghost_box is None:
if use_hardware_input is None:
use_tianya_box, use_ghost_box = self._load_backend_preference(default_tianya=True)
else:
use_tianya_box, use_ghost_box = bool(use_hardware_input), False
else:
use_tianya_box = bool(use_tianya_box)
use_ghost_box = bool(use_ghost_box)
if use_hardware_input is not None and not bool(use_hardware_input):
use_tianya_box, use_ghost_box = False, False
self._backend = self._select_backend(use_tianya_box, use_ghost_box)
self._log_backend()
return self._backend
def uses_hardware_input(self):
return self._backend == "hardware"
return self._backend in (BACKEND_TIANYA, BACKEND_GHOST)
def uses_tianya_box(self):
return self._backend == BACKEND_TIANYA
def uses_ghost_box(self):
return self._backend == BACKEND_GHOST
def backend_label(self):
if self.uses_hardware_input():
return "hardware" if self._ensure_hardware_ready() else "hardware_unavailable"
return "directinput" if self._directinput_available() else "directinput_unavailable"
return self._current_backend().label()
def is_available(self):
if self.uses_hardware_input():
return self._ensure_hardware_ready()
return self._directinput_available()
return self._current_backend().is_available()
def delay_rnd(self, min_ms, max_ms):
if self.uses_hardware_input() and self.is_available():
try:
self._wyhkm.DelayRnd(int(min_ms), int(max_ms))
except Exception:
pass
return
if self._directinput_available():
low = min(int(min_ms), int(max_ms))
high = max(int(min_ms), int(max_ms))
time.sleep(random.uniform(low, high) / 1000.0)
return self._current_backend().delay_rnd(min_ms, max_ms)
def key_down(self, key_str):
if self.uses_hardware_input():
if self.is_available():
try:
self._wyhkm.KeyDown(self._normalize_hardware_key(key_str))
return True
except Exception as exc:
self._log(f">>> [hardware_control] KeyDown failed ({key_str}): {exc}")
return False
if self._directinput_available():
try:
pydirectinput.keyDown(self._normalize_directinput_key(key_str))
return True
except Exception as exc:
self._log(f">>> [directinput] keyDown failed ({key_str}): {exc}")
return False
return self._current_backend().key_down(key_str)
def key_up(self, key_str):
if self.uses_hardware_input():
if self.is_available():
try:
self._wyhkm.KeyUp(self._normalize_hardware_key(key_str))
return True
except Exception as exc:
self._log(f">>> [hardware_control] KeyUp failed ({key_str}): {exc}")
return False
if self._directinput_available():
try:
pydirectinput.keyUp(self._normalize_directinput_key(key_str))
return True
except Exception as exc:
self._log(f">>> [directinput] keyUp failed ({key_str}): {exc}")
return False
return self._current_backend().key_up(key_str)
def key_press(self, key_str):
if self.uses_hardware_input():
if self.is_available():
try:
self._wyhkm.KeyPress(self._normalize_hardware_key(key_str))
return True
except Exception as exc:
self._log(f">>> [hardware_control] KeyPress failed ({key_str}): {exc}")
return False
if self._directinput_available():
try:
pydirectinput.press(self._normalize_directinput_key(key_str))
return True
except Exception as exc:
self._log(f">>> [directinput] press failed ({key_str}): {exc}")
return False
return self._current_backend().key_press(key_str)
def keyDown(self, key_str):
return self.key_down(key_str)
@@ -288,103 +676,25 @@ class HardwareController:
return self.key_press(key_str)
def move_to(self, x, y):
if self.uses_hardware_input():
if self.is_available():
try:
return bool(self._wyhkm.MoveTo(int(x), int(y)))
except Exception as exc:
self._log(f">>> [hardware_control] MoveTo failed ({x}, {y}): {exc}")
return False
if self._directinput_available():
try:
pydirectinput.moveTo(int(x), int(y))
return True
except Exception as exc:
self._log(f">>> [directinput] moveTo failed ({x}, {y}): {exc}")
return False
return self._current_backend().move_to(x, y)
def move_r(self, dx, dy):
if self.uses_hardware_input():
if self.is_available():
try:
return bool(self._wyhkm.MoveR(int(dx), int(dy)))
except Exception as exc:
self._log(f">>> [hardware_control] MoveR failed ({dx}, {dy}): {exc}")
return False
if self._directinput_available():
try:
pydirectinput.moveRel(int(dx), int(dy))
return True
except Exception as exc:
self._log(f">>> [directinput] moveRel failed ({dx}, {dy}): {exc}")
return False
return self._current_backend().move_r(dx, dy)
def MoveR(self, dx, dy):
return self.move_r(dx, dy)
def left_click(self):
if self.uses_hardware_input():
if self.is_available():
try:
return bool(self._wyhkm.LeftClick())
except Exception as exc:
self._log(f">>> [hardware_control] LeftClick failed: {exc}")
return False
if self._directinput_available():
try:
pydirectinput.click(button="left")
return True
except Exception as exc:
self._log(f">>> [directinput] left click failed: {exc}")
return False
return self._current_backend().left_click()
def right_click(self):
if self.uses_hardware_input():
if self.is_available():
try:
return bool(self._wyhkm.RightClick())
except Exception as exc:
self._log(f">>> [hardware_control] RightClick failed: {exc}")
return False
if self._directinput_available():
try:
pydirectinput.click(button="right")
return True
except Exception as exc:
self._log(f">>> [directinput] right click failed: {exc}")
return False
return self._current_backend().right_click()
def left_down(self):
if self.uses_hardware_input():
if self.is_available():
try:
return bool(self._wyhkm.LeftDown())
except Exception as exc:
self._log(f">>> [hardware_control] LeftDown failed: {exc}")
return False
if self._directinput_available():
try:
pydirectinput.mouseDown(button="left")
return True
except Exception as exc:
self._log(f">>> [directinput] left mouseDown failed: {exc}")
return False
return self._current_backend().left_down()
def left_up(self):
if self.uses_hardware_input():
if self.is_available():
try:
return bool(self._wyhkm.LeftUp())
except Exception as exc:
self._log(f">>> [hardware_control] LeftUp failed: {exc}")
return False
if self._directinput_available():
try:
pydirectinput.mouseUp(button="left")
return True
except Exception as exc:
self._log(f">>> [directinput] left mouseUp failed: {exc}")
return False
return self._current_backend().left_up()
hw_ctrl = HardwareController()

View File

@@ -20,14 +20,20 @@ class LogisticsManager:
self.route_file = route_file or VENDOR_FILE
self.bag_full_hearthstone = False # 包满时用炉石回城而非走路修理
self.hearthstone_key = "b" # 炉石按键
self.hearthstone_cast_sec = 10.0 # 炉石施法等待秒数
self.hearthstone_cast_sec = 12.0 # 炉石施法等待秒数
self.enable_bag_full_mail = False
self.logistics_full_auto = False
self.bag_slot_threshold = 2
self.durability_threshold = 0.2
self.mailbox_route_file = os.path.join("recorder", "mailbox.json")
self.mailbox_interact_key = "8"
self.mail_recipient_key = ""
self.mail_send_key = "f8"
self.mailbox_open_wait_sec = 2.0
self.mail_send_wait_sec = 60.0
self.repair_target_key = "8"
self.repair_interact_key = "4"
self.repair_interact_wait_sec = 2.0
def _resolve_path(self, path):
if not path:
@@ -48,13 +54,59 @@ class LogisticsManager:
time.sleep(min(0.1, end_at - time.time()))
return True
def _read_position(self, get_state):
if not callable(get_state):
return None
st = get_state()
if not st:
return None
try:
x = st.get("x")
y = st.get("y")
if x is None or y is None:
return None
return float(x), float(y)
except Exception:
return None
def _state_bag_full(self, state):
try:
return int(state.get("free_slots", 0) or 0) < int(self.bag_slot_threshold)
except Exception:
return False
def _state_durability_low(self, state):
try:
threshold = float(self.durability_threshold)
if threshold > 1.0:
threshold = threshold / 100.0
durability = state.get("durability", 1.0)
if durability is None:
return False
return float(durability) < threshold
except Exception:
return False
def get_pending_tasks(self, state):
tasks = []
bag_full = self._state_bag_full(state)
durability_low = self._state_durability_low(state)
if bag_full and self.enable_bag_full_mail:
tasks.append("mail")
if durability_low:
tasks.append("repair")
if not tasks and bag_full and self.bag_full_hearthstone:
tasks.append("hearthstone_stop")
return tasks
def check_logistics(self, state):
"""
state['free_slots']: 剩余空格数量
state['durability']: 0.0 ~ 1.0 的耐久度
"""
# 触发阈值:空格少于 2 个,或耐久度低于 20%
if state['free_slots'] < 2 or state['durability'] < 0.2:
# 触发阈值:空格少于配置值,或耐久度低于配置值。
if self.get_pending_tasks(state):
if not self.is_returning:
print(f">>> [后勤警告] 背包/耐久不足!触发回城程序。")
self.is_returning = True
@@ -67,11 +119,7 @@ class LogisticsManager:
success = False
for i in range(max_retries):
start_pos = None
if get_state:
st = get_state()
if st:
start_pos = (st.get('x'), st.get('y'))
start_pos = self._read_position(get_state)
print(f">>> [后勤] 第 {i+1} 次尝试使用炉石(按键: {self.hearthstone_key}...")
# 先按一下 S 确保停止移动,防止移动中按炉石失败
@@ -80,25 +128,34 @@ class LogisticsManager:
hw_ctrl.press(self.hearthstone_key)
# 等待施法过程
print(f">>> [后勤] 正在等待施法 {self.hearthstone_cast_sec}s...")
time.sleep(self.hearthstone_cast_sec + 2.0) # 多等 2 秒保险
print(f">>> [后勤] 正在等待炉石 {self.hearthstone_cast_sec}s...")
time.sleep(float(self.hearthstone_cast_sec))
if get_state and start_pos and start_pos[0] is not None:
st_now = get_state()
if st_now:
end_pos = (st_now.get('x'), st_now.get('y'))
if get_state and start_pos is not None:
for _ in range(5):
end_pos = self._read_position(get_state)
if end_pos is None:
time.sleep(0.5)
continue
dist = math.dist(start_pos, end_pos)
# 如果坐标发生了明显跳变(大于 2.0),证明回城成功
if dist > 2.0:
print(f">>> [后勤] 炉石回城成功!位置跳变距离: {dist:.2f}")
success = True
break
else:
time.sleep(0.5)
if success:
break
print(f">>> [后勤] 炉石似乎失败(位置未变化),准备重试...")
else:
elif get_state:
st_now = get_state()
if st_now is None:
# 获取不到状态可能已经卡死或窗口关闭,默认成功以退出循环
success = True
break
else:
success = True
break
else:
# 如果没法校验坐标,就只执行一次
success = True
@@ -130,13 +187,13 @@ class LogisticsManager:
self.is_returning = False
def _do_vendor_interact(self):
"""执行与修理商/背包的交互按键8、4"""
hw_ctrl.press("8")
"""执行与修理商/背包的交互按键。"""
hw_ctrl.press(self.repair_target_key)
time.sleep(0.5)
hw_ctrl.press("4")
time.sleep(2)
hw_ctrl.press(self.repair_interact_key)
time.sleep(float(self.repair_interact_wait_sec))
def run_bag_full_mail_flow(self, get_state, patrol, stop_check=None):
def run_bag_full_mail_flow(self, get_state, patrol, stop_check=None, use_hearthstone=True):
"""
包满邮寄第一版流程:
炉石 -> 跑到邮箱路线终点 -> 交互打开邮箱 -> 按 MailboxCourier 宏键 -> 等待后停止。
@@ -152,6 +209,7 @@ class LogisticsManager:
self.is_returning = False
return False
if use_hearthstone:
ok = self.use_hearthstone_and_stop(get_state=get_state)
if not ok:
print(">>> [后勤-邮箱] 炉石失败,未继续跑邮箱。")
@@ -176,7 +234,13 @@ class LogisticsManager:
if old_enable_mount is not None:
patrol.enable_mount = False
try:
ok = patrol.navigate_path(get_state, path, forward=True, arrival_threshold=VENDOR_ARRIVAL_THRESHOLD)
ok = patrol.navigate_path(
get_state,
path,
forward=True,
arrival_threshold=VENDOR_ARRIVAL_THRESHOLD,
snap_to_nearest=True,
)
finally:
if old_enable_mount is not None:
patrol.enable_mount = old_enable_mount
@@ -213,6 +277,119 @@ class LogisticsManager:
self.is_returning = False
return True
def run_repair_flow(self, get_state, patrol, stop_check=None, use_hearthstone=True):
"""
耐久低修理流程:
炉石 -> 从修理路线最近点接入 -> 到达 NPC -> 按目标键 -> 按交互/修理键。
"""
route_file = self._resolve_path(self.route_file)
if not route_file or not os.path.exists(route_file):
print(f">>> [后勤-修理] 修理路线不存在,已停止: {route_file}")
self.is_returning = False
return False
if callable(stop_check) and stop_check():
self.is_returning = False
return False
if use_hearthstone:
ok = self.use_hearthstone_and_stop(get_state=get_state)
if not ok:
print(">>> [后勤-修理] 炉石失败,未继续跑修理路线。")
self.is_returning = False
return False
try:
with open(route_file, "r", encoding="utf-8") as f:
path = json.load(f)
except Exception as exc:
print(f">>> [后勤-修理] 修理路线读取失败: {exc}")
self.is_returning = False
return False
if not path:
print(f">>> [后勤-修理] 修理路线为空,已停止: {route_file}")
self.is_returning = False
return False
print(f">>> [后勤-修理] 开始跑修理路线: {route_file}")
ok = patrol.navigate_path(
get_state,
path,
forward=True,
arrival_threshold=VENDOR_ARRIVAL_THRESHOLD,
snap_to_nearest=True,
)
if not ok:
print(">>> [后勤-修理] 修理路线未完成,已停止。")
self.is_returning = False
return False
if callable(stop_check) and stop_check():
self.is_returning = False
return False
patrol.stop_all()
print(f">>> [后勤-修理] 到达 NPC按目标键: {self.repair_target_key}")
hw_ctrl.press(self.repair_target_key)
if not self._sleep_with_stop(0.5, stop_check=stop_check):
self.is_returning = False
return False
print(f">>> [后勤-修理] 按修理交互键: {self.repair_interact_key}")
hw_ctrl.press(self.repair_interact_key)
if not self._sleep_with_stop(self.repair_interact_wait_sec, stop_check=stop_check):
self.is_returning = False
return False
print(">>> [后勤-修理] 修理流程结束。")
self.is_returning = False
return True
def run_pending_tasks(self, tasks, get_state, patrol, stop_check=None):
completed = []
already_homed = False
for task in tasks:
if callable(stop_check) and stop_check():
self.is_returning = False
return False, completed
if task == "mail":
ok = self.run_bag_full_mail_flow(
get_state,
patrol,
stop_check=stop_check,
use_hearthstone=not already_homed,
)
if not ok:
return False, completed
already_homed = True
completed.append(task)
continue
if task == "repair":
ok = self.run_repair_flow(
get_state,
patrol,
stop_check=stop_check,
use_hearthstone=not already_homed,
)
if not ok:
return False, completed
already_homed = True
completed.append(task)
continue
if task == "hearthstone_stop":
ok = self.use_hearthstone_and_stop(get_state=get_state)
if not ok:
return False, completed
already_homed = True
completed.append(task)
self.is_returning = False
return True, completed
def run_route1_round(self, get_state, patrol, route_file=None):
"""
读取 route1.json 路径先正向走完执行交互8、4再反向走完然后结束。

View File

@@ -327,7 +327,9 @@ class GameLoopWorker(QThread):
release_spirit_key=None,
resurrect_key=None,
enable_mouse_loot=True,
use_hardware_input=True,
use_hardware_input=None,
use_tianya_box=None,
use_ghost_box=None,
turn_error_key=None,
turn_error_hold_sec=None,
distance_interact_pause_sec=None,
@@ -374,7 +376,12 @@ class GameLoopWorker(QThread):
self.release_spirit_key = release_spirit_key
self.resurrect_key = resurrect_key
self.enable_mouse_loot = enable_mouse_loot
self.use_hardware_input = bool(use_hardware_input)
if use_tianya_box is None and use_ghost_box is None:
self.use_tianya_box = True if use_hardware_input is None else bool(use_hardware_input)
self.use_ghost_box = False
else:
self.use_tianya_box = bool(use_tianya_box)
self.use_ghost_box = bool(use_ghost_box)
self.turn_error_key = (turn_error_key or "s").strip().lower() or "s"
try:
self.turn_error_hold_sec = float(turn_error_hold_sec)
@@ -393,10 +400,14 @@ class GameLoopWorker(QThread):
self.log_signal.emit("❌ 无法导入 game_state 模块")
return
hw_ctrl.configure(use_hardware_input=self.use_hardware_input)
hw_ctrl.configure(use_tianya_box=self.use_tianya_box, use_ghost_box=self.use_ghost_box)
backend_text = {
"hardware": "硬件",
"hardware_unavailable": "硬件(不可用)",
"hardware": "天涯盒子",
"hardware_unavailable": "天涯盒子(不可用)",
"tianya": "天涯盒子",
"tianya_unavailable": "天涯盒子(不可用)",
"ghost": "幽灵盒子",
"ghost_unavailable": "幽灵盒子(不可用)",
"directinput": "pydirectinput",
"directinput_unavailable": "pydirectinput(不可用)",
}.get(hw_ctrl.backend_label(), "未知")
@@ -495,14 +506,19 @@ class GameLoopWorker(QThread):
enable_mount=bool(layout.get("enable_mount", True)),
)
self.logistics_manager = LogisticsManager(route_file=self.vendor_path)
self.logistics_manager.hearthstone_key = str(layout.get("hearthstone_key", "b") or "b")
self.logistics_manager.hearthstone_cast_sec = float(layout.get("hearthstone_wait_sec", 12.0))
self.logistics_manager.repair_target_key = str(layout.get("repair_target_key", "8") or "8")
self.logistics_manager.repair_interact_key = str(layout.get("repair_interact_key", "4") or "4")
self.log_signal.emit(f"⛑️ 回城修理:开始执行路径({os.path.basename(self.vendor_path)}")
get_state = lambda: parse_game_state() if self.running else None
ok = self.logistics_manager.run_route1_round(
ok = self.logistics_manager.run_repair_flow(
get_state,
self.patrol_controller,
route_file=self.vendor_path,
stop_check=lambda: not self.running,
use_hearthstone=True,
)
if ok:
self.log_signal.emit("✅ 回城修理:路径完成")
@@ -1069,6 +1085,18 @@ class WoWMultiKeyGUI(QMainWindow):
params_layout.addWidget(basic_group)
# 硬件参数
hardware_group = QGroupBox("硬件参数")
hardware_layout = QHBoxLayout(hardware_group)
self.gs_use_tianya_box = QCheckBox("使用天涯盒子")
self.gs_use_tianya_box.setChecked(True)
self.gs_use_ghost_box = QCheckBox("使用幽灵盒子")
self.gs_use_ghost_box.setChecked(False)
hardware_layout.addWidget(self.gs_use_tianya_box)
hardware_layout.addWidget(self.gs_use_ghost_box)
hardware_layout.addStretch()
params_layout.addWidget(hardware_group)
# 游戏参数
game_group = QGroupBox("游戏参数")
game_grid = QGridLayout(game_group)
@@ -1140,6 +1168,29 @@ class WoWMultiKeyGUI(QMainWindow):
self.gs_mail_send_wait.setSingleStep(1.0)
self.gs_mail_send_wait.setValue(60.0)
self.gs_mail_send_wait.setSuffix("")
self.gs_logistics_full_auto = QCheckBox("后勤完成后继续自动巡逻")
self.gs_logistics_full_auto.setChecked(False)
self.gs_bag_slot_threshold = QSpinBox()
self.gs_bag_slot_threshold.setRange(0, 100)
self.gs_bag_slot_threshold.setValue(2)
self.gs_bag_slot_threshold.setSuffix("")
self.gs_durability_threshold = QSpinBox()
self.gs_durability_threshold.setRange(1, 100)
self.gs_durability_threshold.setValue(20)
self.gs_durability_threshold.setSuffix(" %")
self.gs_hearthstone_wait = QDoubleSpinBox()
self.gs_hearthstone_wait.setRange(1.0, 60.0)
self.gs_hearthstone_wait.setSingleStep(0.5)
self.gs_hearthstone_wait.setValue(12.0)
self.gs_hearthstone_wait.setSuffix("")
self.gs_repair_target_key = QLineEdit()
self.gs_repair_target_key.setPlaceholderText("如 8")
self.gs_repair_target_key.setMaxLength(16)
self.gs_repair_target_key.setText("8")
self.gs_repair_interact_key = QLineEdit()
self.gs_repair_interact_key.setPlaceholderText("如 4")
self.gs_repair_interact_key.setMaxLength(16)
self.gs_repair_interact_key.setText("4")
self.gs_enable_mount = QCheckBox("启用上马")
self.gs_enable_mount.setChecked(True)
self.gs_mount_key = QLineEdit()
@@ -1166,8 +1217,6 @@ class WoWMultiKeyGUI(QMainWindow):
self.gs_resurrect_key.setText("0")
self.gs_enable_mouse_loot = QCheckBox("启用扫雷拾取")
self.gs_enable_mouse_loot.setChecked(True)
self.gs_use_hardware_input = QCheckBox("启用硬件控制(关闭则用 pydirectinput")
self.gs_use_hardware_input.setChecked(True)
# 网格填充
game_grid.addWidget(QLabel("剥皮等待时间:"), 0, 0)
@@ -1192,31 +1241,42 @@ class WoWMultiKeyGUI(QMainWindow):
game_grid.addWidget(QLabel("炉石按键:"), 4, 0)
game_grid.addWidget(self.gs_hearthstone_key, 4, 1)
game_grid.addWidget(QLabel("释放灵魂按键:"), 4, 2)
game_grid.addWidget(self.gs_release_spirit_key, 4, 3)
game_grid.addWidget(QLabel("炉石等待:"), 4, 2)
game_grid.addWidget(self.gs_hearthstone_wait, 4, 3)
game_grid.addWidget(QLabel("需转身按键:"), 5, 0)
game_grid.addWidget(self.turn_error_key_edit, 5, 1)
game_grid.addWidget(QLabel("需转身按住时长:"), 6, 0)
game_grid.addWidget(self.turn_error_hold_spin, 6, 1)
game_grid.addWidget(QLabel("释放灵魂按键:"), 5, 0)
game_grid.addWidget(self.gs_release_spirit_key, 5, 1)
game_grid.addWidget(QLabel("复活按键:"), 5, 2)
game_grid.addWidget(self.gs_resurrect_key, 5, 3)
game_grid.addWidget(QLabel("需转身按键:"), 6, 0)
game_grid.addWidget(self.turn_error_key_edit, 6, 1)
game_grid.addWidget(QLabel("需转身按住时长:"), 6, 2)
game_grid.addWidget(self.turn_error_hold_spin, 6, 3)
game_grid.addWidget(QLabel("距离远暂停技能时长:"), 7, 0)
game_grid.addWidget(self.distance_interact_pause_spin, 7, 1)
game_grid.addWidget(self.gs_use_hardware_input, 8, 0, 1, 2)
game_grid.addWidget(QLabel("复活按键:"), 6, 2)
game_grid.addWidget(self.gs_resurrect_key, 6, 3)
game_grid.addWidget(self.gs_bag_full_hearthstone, 9, 0, 1, 2)
game_grid.addWidget(self.gs_enable_bag_full_mail, 9, 2, 1, 2)
game_grid.addWidget(QLabel("邮箱交互键:"), 10, 0)
game_grid.addWidget(self.gs_mailbox_interact_key, 10, 1)
game_grid.addWidget(QLabel("收信人按键:"), 10, 2)
game_grid.addWidget(self.gs_mail_recipient_key, 10, 3)
game_grid.addWidget(QLabel("邮寄宏按键:"), 11, 0)
game_grid.addWidget(self.gs_mail_send_key, 11, 1)
game_grid.addWidget(QLabel("邮箱打开等待:"), 11, 2)
game_grid.addWidget(self.gs_mailbox_open_wait, 11, 3)
game_grid.addWidget(QLabel("邮寄后等待:"), 12, 0)
game_grid.addWidget(self.gs_mail_send_wait, 12, 1)
game_grid.addWidget(self.gs_bag_full_hearthstone, 8, 0, 1, 2)
game_grid.addWidget(self.gs_enable_bag_full_mail, 8, 2, 1, 2)
game_grid.addWidget(QLabel("邮箱交互键:"), 9, 0)
game_grid.addWidget(self.gs_mailbox_interact_key, 9, 1)
game_grid.addWidget(QLabel("收信人按键:"), 9, 2)
game_grid.addWidget(self.gs_mail_recipient_key, 9, 3)
game_grid.addWidget(QLabel("邮寄宏按键:"), 10, 0)
game_grid.addWidget(self.gs_mail_send_key, 10, 1)
game_grid.addWidget(QLabel("邮箱打开等待:"), 10, 2)
game_grid.addWidget(self.gs_mailbox_open_wait, 10, 3)
game_grid.addWidget(QLabel("邮寄后等待:"), 11, 0)
game_grid.addWidget(self.gs_mail_send_wait, 11, 1)
game_grid.addWidget(self.gs_logistics_full_auto, 11, 2, 1, 2)
game_grid.addWidget(QLabel("包格触发阈值:"), 12, 0)
game_grid.addWidget(self.gs_bag_slot_threshold, 12, 1)
game_grid.addWidget(QLabel("耐久触发阈值:"), 12, 2)
game_grid.addWidget(self.gs_durability_threshold, 12, 3)
game_grid.addWidget(QLabel("修理目标键:"), 13, 0)
game_grid.addWidget(self.gs_repair_target_key, 13, 1)
game_grid.addWidget(QLabel("修理交互键:"), 13, 2)
game_grid.addWidget(self.gs_repair_interact_key, 13, 3)
params_layout.addWidget(game_group)
@@ -1263,6 +1323,15 @@ class WoWMultiKeyGUI(QMainWindow):
self.gs_mail_send_key.setText(str(cfg.get('mail_send_key', 'f8') or 'f8'))
self.gs_mailbox_open_wait.setValue(float(cfg.get('mailbox_open_wait_sec', 2.0)))
self.gs_mail_send_wait.setValue(float(cfg.get('mail_send_wait_sec', 60.0)))
self.gs_logistics_full_auto.setChecked(bool(cfg.get('logistics_full_auto', False)))
self.gs_bag_slot_threshold.setValue(int(cfg.get('bag_slot_threshold', 2)))
durability_threshold = float(cfg.get('durability_threshold', 0.2))
if durability_threshold <= 1.0:
durability_threshold *= 100.0
self.gs_durability_threshold.setValue(int(round(durability_threshold)))
self.gs_hearthstone_wait.setValue(float(cfg.get('hearthstone_wait_sec', 12.0)))
self.gs_repair_target_key.setText(str(cfg.get('repair_target_key', '8') or '8'))
self.gs_repair_interact_key.setText(str(cfg.get('repair_interact_key', '4') or '4'))
self.gs_mount_retry.setValue(float(cfg.get('mount_retry_after_sec', 2.0)))
self.gs_release_spirit_key.setText(str(cfg.get('release_spirit_key', '9') or '9'))
self.gs_resurrect_key.setText(str(cfg.get('resurrect_key', '0') or '0'))
@@ -1279,7 +1348,15 @@ class WoWMultiKeyGUI(QMainWindow):
self.turn_error_hold_spin.setValue(float(bot_cfg.get('turn_error_hold_sec', 0.8)))
self.distance_interact_pause_spin.setValue(float(bot_cfg.get('distance_interact_pause_sec', 1.0)))
self.gs_enable_mouse_loot.setChecked(bool(bot_cfg.get('enable_mouse_loot', True)))
self.gs_use_hardware_input.setChecked(bool(bot_cfg.get('use_hardware_input', True)))
has_box_flags = ('use_tianya_box' in bot_cfg) or ('use_ghost_box' in bot_cfg)
if has_box_flags:
use_tianya_box = bool(bot_cfg.get('use_tianya_box', False))
use_ghost_box = bool(bot_cfg.get('use_ghost_box', False))
else:
use_tianya_box = bool(bot_cfg.get('use_hardware_input', True))
use_ghost_box = False
self.gs_use_tianya_box.setChecked(use_tianya_box)
self.gs_use_ghost_box.setChecked(use_ghost_box)
except Exception:
self.skinning_wait_spin.setValue(1.5)
self.food_key_edit.setText('f1')
@@ -1289,7 +1366,8 @@ class WoWMultiKeyGUI(QMainWindow):
self.turn_error_hold_spin.setValue(0.8)
self.distance_interact_pause_spin.setValue(1.0)
self.gs_enable_mouse_loot.setChecked(True)
self.gs_use_hardware_input.setChecked(True)
self.gs_use_tianya_box.setChecked(True)
self.gs_use_ghost_box.setChecked(False)
def _save_params_config(self):
"""保存「参数配置」界面到 game_state_config.json多分辨率并写入 wow_multikey_qt.jsonbot 参数)"""
@@ -1314,6 +1392,12 @@ class WoWMultiKeyGUI(QMainWindow):
cfg['mail_send_key'] = self.gs_mail_send_key.text().strip() or 'f8'
cfg['mailbox_open_wait_sec'] = float(self.gs_mailbox_open_wait.value())
cfg['mail_send_wait_sec'] = float(self.gs_mail_send_wait.value())
cfg['logistics_full_auto'] = self.gs_logistics_full_auto.isChecked()
cfg['bag_slot_threshold'] = int(self.gs_bag_slot_threshold.value())
cfg['durability_threshold'] = float(self.gs_durability_threshold.value()) / 100.0
cfg['hearthstone_wait_sec'] = float(self.gs_hearthstone_wait.value())
cfg['repair_target_key'] = self.gs_repair_target_key.text().strip() or '8'
cfg['repair_interact_key'] = self.gs_repair_interact_key.text().strip() or '4'
cfg['release_spirit_key'] = (self.gs_release_spirit_key.text().strip() or '9')
cfg['resurrect_key'] = (self.gs_resurrect_key.text().strip() or '0')
path = save_layout_config(cfg)
@@ -1328,10 +1412,14 @@ class WoWMultiKeyGUI(QMainWindow):
self.config['bot']['turn_error_hold_sec'] = float(self.turn_error_hold_spin.value())
self.config['bot']['distance_interact_pause_sec'] = float(self.distance_interact_pause_spin.value())
self.config['bot']['enable_mouse_loot'] = self.gs_enable_mouse_loot.isChecked()
self.config['bot']['use_hardware_input'] = self.gs_use_hardware_input.isChecked()
use_tianya_box = self.gs_use_tianya_box.isChecked()
use_ghost_box = self.gs_use_ghost_box.isChecked()
self.config['bot']['use_tianya_box'] = use_tianya_box
self.config['bot']['use_ghost_box'] = use_ghost_box
self.config['bot']['use_hardware_input'] = use_tianya_box or use_ghost_box
self._save_main_config()
from hardware_control import hw_ctrl
hw_ctrl.configure(use_hardware_input=self.gs_use_hardware_input.isChecked())
hw_ctrl.configure(use_tianya_box=use_tianya_box, use_ghost_box=use_ghost_box)
self.log(f"✅ 参数配置已保存至 {path},并更新 bot 参数")
QMessageBox.information(self, "保存成功", f"参数配置已保存至:\n{path}\n\nBot 参数已写入:\n{self.config_path}")
@@ -1824,7 +1912,8 @@ class WoWMultiKeyGUI(QMainWindow):
resurrect_key = '0'
enable_mouse_loot = self.gs_enable_mouse_loot.isChecked()
use_hardware_input = self.gs_use_hardware_input.isChecked()
use_tianya_box = self.gs_use_tianya_box.isChecked()
use_ghost_box = self.gs_use_ghost_box.isChecked()
self.game_worker = GameLoopWorker(
mode, waypoints_path=waypoints_path, prepare_route_path=prepare_route_path, vendor_path=vendor_path,
@@ -1847,7 +1936,8 @@ class WoWMultiKeyGUI(QMainWindow):
release_spirit_key=release_spirit_key,
resurrect_key=resurrect_key,
enable_mouse_loot=enable_mouse_loot,
use_hardware_input=use_hardware_input,
use_tianya_box=use_tianya_box,
use_ghost_box=use_ghost_box,
turn_error_key=turn_error_key,
turn_error_hold_sec=turn_error_hold_sec,
distance_interact_pause_sec=distance_interact_pause_sec,
@@ -1880,7 +1970,13 @@ class WoWMultiKeyGUI(QMainWindow):
min_dist = float(self.record_min_distance_spin.value())
except Exception:
min_dist = 0.5
self.game_worker = GameLoopWorker(mode='record', record_filename=name, record_min_distance=min_dist)
self.game_worker = GameLoopWorker(
mode='record',
record_filename=name,
record_min_distance=min_dist,
use_tianya_box=self.gs_use_tianya_box.isChecked(),
use_ghost_box=self.gs_use_ghost_box.isChecked(),
)
self.game_worker.state_signal.connect(self.record_state_label.setText)
self.game_worker.log_signal.connect(self.log)
self.game_worker.finished.connect(self._on_record_finished)

View File

@@ -11,6 +11,12 @@
"food_key": "f1",
"eat_hp_threshold": 30,
"eat_max_wait_sec": 30.0,
"enable_mouse_loot": true
"enable_mouse_loot": true,
"use_tianya_box": true,
"use_ghost_box": false,
"use_hardware_input": true,
"turn_error_key": "4",
"turn_error_hold_sec": 0.5,
"distance_interact_pause_sec": 1.0
}
}