Files
plugin-helper/src/plugin_helper/planner.py
T
2026-06-28 14:36:14 -07:00

162 lines
6.2 KiB
Python

from __future__ import annotations
import fnmatch
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from zipfile import ZipFile
from .fsutil import ensure_relative, sha256_bytes, sha256_file
from .models import Lockfile, Registry, VALID_STRATEGIES
from .bsipa import BSIPA_PLUGIN_ID, check_bsipa_health
from .state import downloads_dir, plans_dir, plugin_downloads_dir
# Top-level directory names a "bsipa-zip" archive member is allowed to start
# with; _target_for_member rejects members whose first path component is not
# in this set.
ALLOWED_BSIPA_TOP_LEVEL = {"IPA", "Libs", "Plugins"}
def _now_slug() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path, plugin_id: str | None = None) -> Path | None:
    """Locate *asset* on disk and return the first existing regular file.

    Locations are probed in this order:

    1. ``asset`` itself, with ``~`` expanded;
    2. the per-plugin downloads directory (only when *plugin_id* is given);
    3. the instance downloads directory;
    4. ``<repo_root>/assets``;
    5. ``<repo_root>/locks/assets``.

    Returns ``None`` when none of the locations holds a regular file.
    """
    search_order: list[Path] = [Path(asset).expanduser()]
    if plugin_id:
        search_order.append(plugin_downloads_dir(state_root, instance, plugin_id) / asset)
    search_order.append(downloads_dir(state_root, instance) / asset)
    search_order.append(repo_root / "assets" / asset)
    search_order.append(repo_root / "locks" / "assets" / asset)

    # is_file() is already False for paths that do not exist.
    return next((location for location in search_order if location.is_file()), None)
def _zip_members(asset_path: Path) -> list[tuple[str, int, str]]:
    """List the file entries of the zip archive at *asset_path*.

    Directory entries are skipped. Each returned tuple is
    ``(relative_posix_path, byte_length, sha256_of_content)``, where the path
    has been passed through ``ensure_relative`` and the digest comes from
    ``sha256_bytes`` over the member's decompressed bytes.
    """
    entries: list[tuple[str, int, str]] = []
    with ZipFile(asset_path) as archive:
        file_infos = (entry for entry in archive.infolist() if not entry.is_dir())
        for entry in file_infos:
            # Normalise the name before reading so a rejected path fails
            # before any member data is read.
            member_path = ensure_relative(entry.filename).as_posix()
            payload = archive.read(entry)
            entries.append((member_path, len(payload), sha256_bytes(payload)))
    return entries
def _target_for_member(strategy: str, member: str) -> str:
    """Map an archive *member* to its install target for a zip *strategy*.

    * ``zip-to-pending`` places the member under ``IPA/Pending/``.
    * ``root-zip`` keeps the member's relative path unchanged.
    * ``bsipa-zip`` keeps the relative path but only accepts members whose
      first path component is in ``ALLOWED_BSIPA_TOP_LEVEL``.

    Raises:
        ValueError: for an unknown strategy, or a ``bsipa-zip`` member with a
            disallowed top-level directory.
    """
    normalized = ensure_relative(member).as_posix()

    if strategy == "zip-to-pending":
        return f"IPA/Pending/{normalized}"

    if strategy not in ("root-zip", "bsipa-zip"):
        raise ValueError(f"unsupported zip strategy: {strategy}")

    if strategy == "bsipa-zip":
        first_component, _, _ = normalized.partition("/")
        if first_component not in ALLOWED_BSIPA_TOP_LEVEL:
            raise ValueError(f"bsipa-zip member has unsupported top-level path: {normalized}")

    return normalized
def _asset_matches_patterns(name: str, patterns: tuple[str, ...]) -> bool:
return not patterns or any(fnmatch.fnmatch(name, pattern) for pattern in patterns)
def create_plan(
    *,
    instance: str,
    instance_path: Path,
    beat_saber_version: str,
    registry: Registry,
    lockfile: Lockfile,
    state_root: Path,
    repo_root: Path,
    selected: set[str] | None = None,
    require_bootstrap: bool = True,
) -> tuple[dict[str, Any], Path]:
    """Build an install plan for locked plugins and write it to disk as JSON.

    For every selected lock entry the asset is located, its sha256 is checked
    against the lock (when the lock carries one), and one change record is
    emitted per file to install: a single ``copy`` for ``dll-to-plugins``, or
    one ``extract`` per archive member for the zip strategies.

    Args:
        instance: Instance name, used to resolve state directories.
        instance_path: Path recorded in the plan as ``instancePath`` and
            passed to the BSIPA health check.
        beat_saber_version: Recorded in the plan as ``beatSaberVersion``.
        registry: Looked up per plugin for a fallback install strategy and
            for asset-name patterns.
        lockfile: Source of the plugin lock entries to plan.
        state_root: Root used to resolve downloads and plans directories.
        repo_root: Repository root searched for assets.
        selected: Plugin ids to plan. ``None`` or an empty set plans every
            locked plugin.
        require_bootstrap: When true, and the lockfile contains the BSIPA
            entry, and anything other than BSIPA is selected, the BSIPA
            health check must pass before planning.

    Returns:
        ``(plan, plan_path)``: the plan dictionary and the path of the JSON
        file it was written to.

    Raises:
        ValueError: BSIPA is unhealthy; a strategy is invalid or ``manual``;
            a lock entry has no asset; the asset sha256 does not match; or
            the asset suffix does not fit the strategy.
        FileNotFoundError: a locked asset could not be found on disk.
    """
    locked_ids = {plugin.id for plugin in lockfile.plugins}
    # A missing or empty selection means "plan everything that is locked".
    selected_ids = selected or locked_ids
    changes: list[dict[str, Any]] = []
    # Selected ids without a lock entry would otherwise vanish from the plan
    # without a trace (e.g. a mistyped plugin id), so record them as warnings.
    warnings: list[str] = [
        f"{plugin_id}: selected plugin is not in the lockfile; skipped"
        for plugin_id in sorted(set(selected_ids) - locked_ids)
    ]

    has_locked_bsipa = BSIPA_PLUGIN_ID in locked_ids
    planning_ordinary_plugins = any(plugin_id != BSIPA_PLUGIN_ID for plugin_id in selected_ids)
    if require_bootstrap and has_locked_bsipa and planning_ordinary_plugins:
        health = check_bsipa_health(instance_path, state_root, instance)
        if not health["ok"]:
            joined = "; ".join(health["messages"])
            raise ValueError(f"BSIPA bootstrap is not healthy; run bootstrap first: {joined}")

    for locked in lockfile.plugins:
        if locked.id not in selected_ids:
            continue

        # The lock entry's strategy wins; the registry is only a fallback.
        registry_plugin = registry.get(locked.id)
        strategy = locked.install_strategy or (registry_plugin.install_strategy if registry_plugin else "manual")
        if strategy not in VALID_STRATEGIES:
            raise ValueError(f"{locked.id}: invalid install strategy: {strategy}")
        if strategy == "manual":
            raise ValueError(f"{locked.id}: install_strategy is manual; add a concrete registry rule first")
        if not locked.asset:
            raise ValueError(f"{locked.id}: lock entry has no asset")

        # A pattern mismatch is only advisory; planning continues.
        if registry_plugin and not _asset_matches_patterns(Path(locked.asset).name, registry_plugin.asset_patterns):
            warnings.append(f"{locked.id}: asset does not match registry patterns")

        asset_path = _find_asset(locked.asset, state_root, instance, repo_root, locked.id)
        if not asset_path:
            raise FileNotFoundError(
                f"{locked.id}: asset not found: {locked.asset}; put it in "
                f"{plugin_downloads_dir(state_root, instance, locked.id)}"
            )

        asset_sha = sha256_file(asset_path)
        if locked.sha256 and locked.sha256 != asset_sha:
            raise ValueError(f"{locked.id}: asset sha256 mismatch for {asset_path}")

        if strategy == "dll-to-plugins":
            if asset_path.suffix.lower() != ".dll":
                raise ValueError(f"{locked.id}: dll-to-plugins expects a .dll asset")
            # The asset is the installed file, so source and target digests match.
            changes.append(
                {
                    "plugin": locked.id,
                    "action": "copy",
                    "source": str(asset_path),
                    "sourceSha256": asset_sha,
                    "target": f"Plugins/{asset_path.name}",
                    "size": asset_path.stat().st_size,
                    "sha256": asset_sha,
                }
            )
            continue

        # Every remaining strategy unpacks a zip archive member by member.
        if asset_path.suffix.lower() != ".zip":
            raise ValueError(f"{locked.id}: {strategy} expects a .zip asset")
        for member, size, member_sha in _zip_members(asset_path):
            changes.append(
                {
                    "plugin": locked.id,
                    "action": "extract",
                    "source": str(asset_path),
                    "sourceSha256": asset_sha,
                    "archiveMember": member,
                    "target": _target_for_member(strategy, member),
                    "size": size,
                    "sha256": member_sha,
                }
            )

    plan = {
        "schemaVersion": 1,
        "createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "instance": instance,
        "instancePath": str(instance_path),
        "beatSaberVersion": beat_saber_version,
        "warnings": warnings,
        "changes": changes,
    }
    plan_path = plans_dir(state_root, instance) / f"plan-{_now_slug()}.json"
    with plan_path.open("w", encoding="utf-8") as handle:
        json.dump(plan, handle, indent=2, sort_keys=True)
        handle.write("\n")
    return plan, plan_path