114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
|
|
"""
|
|||
|
|
飞行模式逻辑:
|
|||
|
|
- 从坐标 JSON 读取最后一个坐标点作为目的地
|
|||
|
|
- 每帧调用 `CoordinatePatrol.simple_flight_navigate()` 执行:
|
|||
|
|
1) 上马(未上马则先上马,mount_key/mount_hold_sec 从 game_state_config.json 读取)
|
|||
|
|
2) 起飞
|
|||
|
|
3) 飞向目的地坐标
|
|||
|
|
4) 降落
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import logging
|
|||
|
|
|
|||
|
|
from coordinate_patrol import CoordinatePatrol
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Module-level logger named after this module; used for diagnostics such as
# route-file read failures.
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _load_last_waypoint_from_json(path):
|
|||
|
|
"""读取与巡逻相同的 JSON 格式:[[x,y], ...],返回最后一个有效点 (x, y)。"""
|
|||
|
|
if not path:
|
|||
|
|
return None
|
|||
|
|
try:
|
|||
|
|
with open(path, "r", encoding="utf-8") as f:
|
|||
|
|
data = json.load(f)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning("读取航线 JSON 失败: %s", e)
|
|||
|
|
return None
|
|||
|
|
if not isinstance(data, list):
|
|||
|
|
return None
|
|||
|
|
last = None
|
|||
|
|
for item in data:
|
|||
|
|
if isinstance(item, (list, tuple)) and len(item) >= 2:
|
|||
|
|
try:
|
|||
|
|
last = (float(item[0]), float(item[1]))
|
|||
|
|
except (TypeError, ValueError):
|
|||
|
|
continue
|
|||
|
|
return last
|
|||
|
|
|
|||
|
|
|
|||
|
|
class FlightModeBot:
    """Flight-mode controller; each ``execute_logic`` call advances one step.

    The destination is the last valid point of the route JSON file. The actual
    navigation is delegated to ``CoordinatePatrol.simple_flight_navigate``.

    Attributes set by ``__init__``:
        destination: ``(x, y)`` tuple, or ``None`` when the file has no usable point.
        done: ``True`` once the flight finished or when there is no destination.
        takeoff_key / takeoff_hold_sec: key and hold time forwarded for take-off.
        land_key / land_hold_sec: key and hold time forwarded for landing.
    """

    def __init__(
        self,
        json_path,
        log_callback=None,
        *,
        takeoff_key="space",
        takeoff_hold_sec=2.0,
        land_key="p",
        land_hold_sec=2.0,
    ):
        """Load the destination and prepare the underlying patrol helper.

        Args:
            json_path: Route JSON file; its last valid point is the destination.
            log_callback: Optional callable receiving user-facing messages.
                Defaults to a no-op.
            takeoff_key: Key forwarded as ``takeoff_key`` when navigating.
            takeoff_hold_sec: Hold duration for the take-off key (coerced to float).
            land_key: Key forwarded as ``land_key`` when navigating.
            land_hold_sec: Hold duration for the landing key (coerced to float).
        """
        self._log = log_callback or (lambda _m: None)
        self.destination = _load_last_waypoint_from_json(json_path)
        self.done = False

        # Mount key and timings come from the layout config returned by
        # game_state.load_layout_config() (game_state_config.json according to
        # the original notes). Imported lazily so a missing module only
        # triggers the fallback below.
        try:
            from game_state import load_layout_config

            cfg = load_layout_config()
            mount_key = str(cfg.get("mount_key", "x") or "x")
            mount_hold_sec = float(cfg.get("mount_hold_sec", 1.6))
            mount_retry_after_sec = float(cfg.get("mount_retry_after_sec", 2.0))
        except Exception as exc:
            # Still best-effort, but no longer silent: record why the built-in
            # defaults are being used so a broken config can be diagnosed.
            logger.warning("读取上马配置失败,使用默认值: %s", exc)
            mount_key, mount_hold_sec, mount_retry_after_sec = "x", 1.6, 2.0

        # The patrol object is only used for simple_flight_navigate (and the
        # mount handling inside it) plus a few shared parameters, so it gets a
        # placeholder waypoint rather than the real destination.
        self._patrol = CoordinatePatrol(
            waypoints=[(0.0, 0.0)] if self.destination else [],
            mount_key=mount_key,
            mount_hold_sec=mount_hold_sec,
            mount_retry_after_sec=mount_retry_after_sec,
        )

        self.takeoff_key = takeoff_key
        self.takeoff_hold_sec = float(takeoff_hold_sec)
        self.land_key = land_key
        self.land_hold_sec = float(land_hold_sec)

        if not self.destination:
            # Nothing to fly to: report it and mark the bot as finished.
            self._log("❌ 飞行模式:JSON 中无有效坐标点")
            self.done = True

    def execute_logic(self, state):
        """Advance the flight by one step using the current game state.

        Does nothing when the bot is already done or ``state`` is ``None``.
        Without a destination the bot is simply marked done. Otherwise the
        step is delegated to ``simple_flight_navigate``; a truthy result is
        treated as "arrived", after which all patrol input is stopped, the
        bot is marked done and a completion message is logged.

        Args:
            state: Current game state object, passed through unchanged to
                ``CoordinatePatrol.simple_flight_navigate``.
        """
        if self.done:
            return
        if state is None:
            return
        if not self.destination:
            self.done = True
            return

        arrived = self._patrol.simple_flight_navigate(
            state,
            self.destination[0],
            self.destination[1],
            takeoff_key=self.takeoff_key,
            takeoff_hold_sec=self.takeoff_hold_sec,
            land_key=self.land_key,
            land_hold_sec=self.land_hold_sec,
            # The mount key is reused as the dismount key.
            dismount_key=self._patrol.mount_key,
            arrival_threshold=self._patrol.arrival_threshold,
            mount_wait_sec=self._patrol.mount_hold_sec,
        )

        if arrived:
            self._patrol.stop_all()
            self.done = True
            self._log("✅ 飞行模式:已到达目的地并降落完成")
|
|||
|
|
|