From 8ad2a3dd35ebefdc3b2ac6c967a2cc77885cec98 Mon Sep 17 00:00:00 2001 From: pleb Date: Sun, 28 Jun 2026 14:35:10 -0700 Subject: [PATCH] Add BSIPA bootstrap support --- src/plugin_helper/bootstrap.py | 255 +++++++++++++++++++++++++++++++++ src/plugin_helper/bsipa.py | 47 ++++++ src/plugin_helper/config.py | 13 +- src/plugin_helper/instances.py | 46 +++++- src/plugin_helper/planner.py | 10 ++ src/plugin_helper/scanner.py | 31 ++++ src/plugin_helper/state.py | 14 ++ 7 files changed, 412 insertions(+), 4 deletions(-) create mode 100644 src/plugin_helper/bootstrap.py create mode 100644 src/plugin_helper/bsipa.py diff --git a/src/plugin_helper/bootstrap.py b/src/plugin_helper/bootstrap.py new file mode 100644 index 0000000..52a709b --- /dev/null +++ b/src/plugin_helper/bootstrap.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +import os +import signal +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable +from urllib.request import Request, urlopen + +from .bsipa import BSIPA_PLUGIN_ID, check_bsipa_health +from .fsutil import sha256_file +from .installer import apply_plan +from .models import LockedPlugin, Lockfile, Registry +from .planner import create_plan +from .scanner import scan_bootstrap_files +from .state import bootstrap_state_path, plugin_downloads_dir, save_bootstrap_state + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def _default_proton() -> Path: + return Path.home() / ".local/share/Steam/steamapps/common/Proton - Experimental/proton" + + +def _proton_env(instance_path: Path) -> dict[str, str]: + env = os.environ.copy() + env.update( + { + "SteamAppId": "620980", + "SteamOverlayGameId": "620980", + "SteamGameId": "620980", + "WINEDLLOVERRIDES": "winhttp=n,b", + "STEAM_COMPAT_DATA_PATH": str(Path.home() / ".local/share/BSManager/SharedContent/compatdata"), + "STEAM_COMPAT_INSTALL_PATH": str(instance_path), + "STEAM_COMPAT_CLIENT_INSTALL_PATH": str(Path.home() / ".local/share/Steam"), + "STEAM_COMPAT_APP_ID": "620980", + "SteamEnv": "1", + } + ) + return env + + +def _files_delta(before: list[dict[str, Any]], after: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: + before_by_path = {item["path"]: item for item in before} + after_by_path = {item["path"]: item for item in after} + created = [after_by_path[path] for path in sorted(after_by_path.keys() - before_by_path.keys())] + removed = [before_by_path[path] for path in sorted(before_by_path.keys() - after_by_path.keys())] + mutated = [ + { + "path": path, + "before": before_by_path[path], + "after": after_by_path[path], + } + for path in sorted(before_by_path.keys() & after_by_path.keys()) + if before_by_path[path].get("sha256") != after_by_path[path].get("sha256") + ] + return {"created": created, "mutated": mutated, "removed": removed} + + +def _github_headers() -> dict[str, str]: + headers = { + "Accept": "application/vnd.github+json", + "User-Agent": "plugin-helper", + } + token = os.environ.get("GITHUB_TOKEN") + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + +def _fetch_github_release_asset(locked: LockedPlugin, destination: Path) -> dict[str, Any]: + if not locked.repo or not locked.tag or not locked.asset: + raise ValueError("locked BSIPA entry needs repo, tag, and asset to fetch its archive") + release_url = f"https://api.github.com/repos/{locked.repo}/releases/tags/{locked.tag}" + request = Request(release_url, headers=_github_headers()) + with urlopen(request, timeout=20) as response: + release = response.read().decode("utf-8") + + import json + + data = json.loads(release) + assets = data.get("assets", []) + selected = next((asset for asset in assets if asset.get("name") == locked.asset), None) + if not selected: + raise FileNotFoundError(f"{locked.id}: release {locked.tag} does not contain asset {locked.asset}") + download_url = selected.get("browser_download_url") + if not download_url: + raise ValueError(f"{locked.id}: GitHub asset has no browser_download_url") + + destination.parent.mkdir(parents=True, exist_ok=True) + asset_request = Request(download_url, headers={"User-Agent": "plugin-helper"}) + with urlopen(asset_request, timeout=60) as response: + destination.write_bytes(response.read()) + actual_sha = sha256_file(destination) + if locked.sha256 and actual_sha != locked.sha256: + destination.unlink(missing_ok=True) + raise ValueError(f"{locked.id}: fetched asset sha256 mismatch") + return { + "repo": locked.repo, + "tag": locked.tag, + "asset": locked.asset, + "url": download_url, + "path": str(destination), + "sha256": actual_sha, + } + + +def fetch_locked_bsipa_archive(lockfile: Lockfile, state_root: Path) -> dict[str, Any]: + locked = next((plugin for plugin in lockfile.plugins if plugin.id == BSIPA_PLUGIN_ID), None) + if not locked: + raise ValueError("lockfile does not include a bsipa entry") + if not locked.asset: + raise ValueError("locked BSIPA entry has no asset") + destination = plugin_downloads_dir(state_root, lockfile.instance, BSIPA_PLUGIN_ID) / locked.asset + if destination.is_file(): + actual_sha = sha256_file(destination) + if locked.sha256 and actual_sha != locked.sha256: + raise ValueError(f"{locked.id}: existing asset sha256 mismatch: {destination}") + return { + "asset": locked.asset, + "path": str(destination), + "sha256": actual_sha, + "cached": True, + } + result = _fetch_github_release_asset(locked, destination) + result["cached"] = False + return result + + +def _run_ipa( + *, + command: list[str], + instance_path: Path, + timeout_seconds: int, +) -> dict[str, Any]: + process = subprocess.Popen( + command, + cwd=instance_path, + env=_proton_env(instance_path), + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True, + ) + try: + stdout, stderr = process.communicate(timeout=timeout_seconds) + timed_out = False + except subprocess.TimeoutExpired: + timed_out = True + os.killpg(process.pid, signal.SIGTERM) + try: + stdout, stderr = process.communicate(timeout=5) + except subprocess.TimeoutExpired: + os.killpg(process.pid, signal.SIGKILL) + stdout, stderr = process.communicate() + return { + "returncode": process.returncode, + "stdout": stdout, + "stderr": stderr, + "timedOut": timed_out, + "timeoutSeconds": timeout_seconds, + } + + +def run_bootstrap( + *, + instance: str, + instance_path: Path, + beat_saber_version: str, + registry: Registry, + lockfile: Lockfile, + state_root: Path, + repo_root: Path, + proton: Path | None = None, + progress: Callable[[str], None] | None = None, + ipa_timeout_seconds: int = 120, +) -> dict[str, Any]: + tell = progress or (lambda _message: None) + tell("Fetching locked BSIPA archive") + fetched = fetch_locked_bsipa_archive(lockfile, state_root) + if fetched.get("cached"): + tell(f"Using cached BSIPA archive: {fetched['path']}") + else: + tell(f"Downloaded BSIPA archive: {fetched['path']}") + + tell("Scanning bootstrap files before install") + before = scan_bootstrap_files(instance_path) + tell("Creating BSIPA bootstrap plan") + plan, plan_path = create_plan( + instance=instance, + instance_path=instance_path, + beat_saber_version=beat_saber_version, + registry=registry, + lockfile=lockfile, + state_root=state_root, + repo_root=repo_root, + selected={BSIPA_PLUGIN_ID}, + require_bootstrap=False, + ) + if not plan["changes"]: + raise ValueError("BSIPA bootstrap plan has no changes") + + tell(f"Applying BSIPA bootstrap plan with {len(plan['changes'])} file changes") + apply_result = apply_plan(plan, state_root) + + ipa = instance_path / "IPA.exe" + if not ipa.is_file(): + raise FileNotFoundError(f"BSIPA archive did not install IPA.exe: {ipa}") + + proton_path = proton or _default_proton() + if not proton_path.is_file(): + raise FileNotFoundError(f"Proton executable not found: {proton_path}") + + command = [str(proton_path), "run", str(ipa), "-n"] + tell(f"Running IPA.exe -n through Proton; timeout {ipa_timeout_seconds}s") + completed = _run_ipa( + command=command, + instance_path=instance_path, + timeout_seconds=ipa_timeout_seconds, + ) + tell("Scanning bootstrap files after IPA.exe -n") + after = scan_bootstrap_files(instance_path) + delta = _files_delta(before, after) + state = { + "schemaVersion": 1, + "instance": instance, + "beatSaberVersion": beat_saber_version, + "bootstrappedAt": _now_iso(), + "plugin": BSIPA_PLUGIN_ID, + "archive": fetched, + "planPath": str(plan_path), + "applied": apply_result["applied"], + "proton": str(proton_path), + "command": command, + "ipaExitCode": completed["returncode"], + "ipaTimedOut": completed["timedOut"], + "ipaTimeoutSeconds": completed["timeoutSeconds"], + "ipaStdout": completed["stdout"][-20000:], + "ipaStderr": completed["stderr"][-20000:], + "files": after, + "delta": delta, + "health": {}, + } + save_bootstrap_state(state_root, instance, state) + state["health"] = check_bsipa_health(instance_path, state_root, instance) + save_bootstrap_state(state_root, instance, state) + state["statePath"] = str(bootstrap_state_path(state_root, instance)) + if completed["timedOut"]: + raise TimeoutError(f"IPA.exe -n timed out after {ipa_timeout_seconds}s; state written to {state['statePath']}") + if completed["returncode"] != 0: + raise RuntimeError(f"IPA.exe -n failed with exit code {completed['returncode']}; state written to {state['statePath']}") + return state diff --git a/src/plugin_helper/bsipa.py b/src/plugin_helper/bsipa.py new file mode 100644 index 0000000..385c12e --- /dev/null +++ b/src/plugin_helper/bsipa.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from .fsutil import sha256_file +from .state import bootstrap_state_path, load_bootstrap_state + + +BSIPA_PLUGIN_ID = "bsipa" + + +def latest_log_path(instance_path: Path) -> Path: + return instance_path / "Logs" / "_latest.log" + + +def check_bsipa_health(instance_path: Path, state_root: Path, instance: str) -> dict[str, Any]: + state = load_bootstrap_state(state_root, instance) + messages: list[str] = [] + + required = ["IPA.exe", "winhttp.dll"] + for rel in required: + if not (instance_path / rel).is_file(): + messages.append(f"missing {rel}") + for rel in ("IPA", "Libs"): + if not (instance_path / rel).is_dir(): + messages.append(f"missing {rel}/") + + log_path = latest_log_path(instance_path) + if not log_path.is_file(): + messages.append("missing Logs/_latest.log") + else: + text = log_path.read_text(encoding="utf-8", errors="replace") + if "Beat Saber IPA (BSIPA):" not in text and "Beat Saber IPA" not in text: + messages.append("Logs/_latest.log does not show BSIPA startup") + + if not state: + messages.append(f"missing bootstrap state: {bootstrap_state_path(state_root, instance)}") + + return { + "ok": not messages, + "messages": messages, + "statePath": str(bootstrap_state_path(state_root, instance)), + "logPath": str(log_path), + "logSha256": sha256_file(log_path) if log_path.is_file() else None, + "bootstrapRecordedAt": state.get("updatedAt"), + } diff --git a/src/plugin_helper/config.py b/src/plugin_helper/config.py index 894c06f..9b6f5e2 100644 --- a/src/plugin_helper/config.py +++ b/src/plugin_helper/config.py @@ -4,12 +4,21 @@ import os from pathlib import Path -DEFAULT_INSTANCES_ROOT = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances") +WINDOWS_INSTANCES_ROOT = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances") +LOCAL_INSTANCES_ROOT = Path.home() / ".local/share/BSManager/BSInstances" +DEFAULT_INSTANCES_ROOTS = (WINDOWS_INSTANCES_ROOT, LOCAL_INSTANCES_ROOT) +DEFAULT_INSTANCES_ROOT = WINDOWS_INSTANCES_ROOT def instances_root(value: str | None = None) -> Path: + return instances_roots(value)[0] + + +def instances_roots(value: str | None = None) -> list[Path]: raw = value or os.environ.get("PLUGIN_HELPER_INSTANCES_ROOT") - return Path(raw).expanduser() if raw else DEFAULT_INSTANCES_ROOT + if raw: + return [Path(item).expanduser() for item in raw.split(os.pathsep) if item] + return list(DEFAULT_INSTANCES_ROOTS) def state_root(value: str | None = None) -> Path: diff --git a/src/plugin_helper/instances.py b/src/plugin_helper/instances.py index 7e87b88..4630108 100644 --- a/src/plugin_helper/instances.py +++ b/src/plugin_helper/instances.py @@ -2,6 +2,7 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path +from collections.abc import Sequence @dataclass(frozen=True) @@ -13,6 +14,9 @@ class Instance: has_userdata: bool +RootInput = Path | Sequence[Path] + + def looks_like_instance(path: Path) -> bool: return ( (path / "Beat Saber_Data").is_dir() @@ -22,7 +26,13 @@ def looks_like_instance(path: Path) -> bool: ) -def list_instances(root: Path) -> list[Instance]: +def _root_list(root: RootInput) -> list[Path]: + if isinstance(root, Path): + return [root] + return list(root) + + +def _list_instances_one(root: Path) -> list[Instance]: if not root.exists(): return [] instances: list[Instance] = [] @@ -41,7 +51,20 @@ def list_instances(root: Path) -> list[Instance]: return instances -def get_instance(root: Path, name: str) -> Instance: +def list_instances(root: RootInput) -> list[Instance]: + instances: list[Instance] = [] + seen_paths: set[Path] = set() + for item in _root_list(root): + for instance in _list_instances_one(item): + resolved = instance.path.resolve(strict=False) + if resolved in seen_paths: + continue + seen_paths.add(resolved) + instances.append(instance) + return sorted(instances, key=lambda item: (item.name, str(item.path))) + + +def _get_instance_one(root: Path, name: str) -> Instance: path = root / name if not path.is_dir() or not looks_like_instance(path): raise FileNotFoundError(f"Beat Saber instance not found: {path}") @@ -52,3 +75,22 @@ def get_instance(root: Path, name: str) -> Instance: has_libs=(path / "Libs").is_dir(), has_userdata=(path / "UserData").is_dir(), ) + + +def get_instance(root: RootInput, name: str) -> Instance: + if isinstance(root, Path): + return _get_instance_one(root, name) + + matches: list[Instance] = [] + for item in _root_list(root): + try: + matches.append(_get_instance_one(item, name)) + except FileNotFoundError: + continue + if not matches: + searched = ", ".join(str(item) for item in _root_list(root)) + raise FileNotFoundError(f"Beat Saber instance not found: {name} under {searched}") + if len(matches) > 1: + paths = ", ".join(str(item.path) for item in matches) + raise ValueError(f"Beat Saber instance name is ambiguous: {name}; matches: {paths}") + return matches[0] diff --git a/src/plugin_helper/planner.py b/src/plugin_helper/planner.py index 1a96870..04620d7 100644 --- a/src/plugin_helper/planner.py +++ b/src/plugin_helper/planner.py @@ -9,6 +9,7 @@ from zipfile import ZipFile from .fsutil import ensure_relative, sha256_bytes, sha256_file from .models import Lockfile, Registry, VALID_STRATEGIES +from .bsipa import BSIPA_PLUGIN_ID, check_bsipa_health from .state import downloads_dir, plans_dir, plugin_downloads_dir @@ -74,11 +75,20 @@ def create_plan( state_root: Path, repo_root: Path, selected: set[str] | None = None, + require_bootstrap: bool = True, ) -> tuple[dict[str, Any], Path]: selected_ids = selected or {plugin.id for plugin in lockfile.plugins} changes: list[dict[str, Any]] = [] warnings: list[str] = [] + has_locked_bsipa = any(plugin.id == BSIPA_PLUGIN_ID for plugin in lockfile.plugins) + planning_ordinary_plugins = any(plugin_id != BSIPA_PLUGIN_ID for plugin_id in selected_ids) + if require_bootstrap and has_locked_bsipa and planning_ordinary_plugins: + health = check_bsipa_health(instance_path, state_root, instance) + if not health["ok"]: + joined = "; ".join(health["messages"]) + raise ValueError(f"BSIPA bootstrap is not healthy; run bootstrap first: {joined}") + for locked in lockfile.plugins: if locked.id not in selected_ids: continue diff --git a/src/plugin_helper/scanner.py b/src/plugin_helper/scanner.py index d0789a0..62c5251 100644 --- a/src/plugin_helper/scanner.py +++ b/src/plugin_helper/scanner.py @@ -7,6 +7,8 @@ from .fsutil import sha256_file SCAN_DIRS = ("Plugins", "Libs", "IPA/Pending") +BOOTSTRAP_DIRS = ("Libs", "IPA") +BOOTSTRAP_ROOT_GLOBS = ("winhttp.dll", "IPA.exe*") def scan_instance(instance_path: Path, include_hashes: bool = False) -> dict[str, Any]: @@ -31,3 +33,32 @@ def scan_instance(instance_path: Path, include_hashes: bool = False) -> dict[str "pending": sum(1 for item in files if item["path"].startswith("IPA/Pending/")), }, } + + +def scan_bootstrap_files(instance_path: Path) -> list[dict[str, Any]]: + files_by_path: dict[str, dict[str, Any]] = {} + + for pattern in BOOTSTRAP_ROOT_GLOBS: + for path in sorted(instance_path.glob(pattern)): + if not path.is_file(): + continue + rel = path.relative_to(instance_path).as_posix() + files_by_path[rel] = { + "path": rel, + "size": path.stat().st_size, + "sha256": sha256_file(path), + } + + for dirname in BOOTSTRAP_DIRS: + root = instance_path / dirname + if not root.exists(): + continue + for path in sorted(item for item in root.rglob("*") if item.is_file()): + rel = path.relative_to(instance_path).as_posix() + files_by_path[rel] = { + "path": rel, + "size": path.stat().st_size, + "sha256": sha256_file(path), + } + + return [files_by_path[path] for path in sorted(files_by_path)] diff --git a/src/plugin_helper/state.py b/src/plugin_helper/state.py index 2da1b4d..b08bb01 100644 --- a/src/plugin_helper/state.py +++ b/src/plugin_helper/state.py @@ -15,6 +15,10 @@ def installed_state_path(state_root: Path, instance: str) -> Path: return instance_state_dir(state_root, instance) / "installed.json" +def bootstrap_state_path(state_root: Path, instance: str) -> Path: + return instance_state_dir(state_root, instance) / "bootstrap.json" + + def load_installed_state(state_root: Path, instance: str) -> dict[str, Any]: return read_json( installed_state_path(state_root, instance), @@ -22,6 +26,16 @@ def load_installed_state(state_root: Path, instance: str) -> dict[str, Any]: ) +def load_bootstrap_state(state_root: Path, instance: str) -> dict[str, Any]: + return read_json(bootstrap_state_path(state_root, instance), {}) + + +def save_bootstrap_state(state_root: Path, instance: str, state: dict[str, Any]) -> None: + state.setdefault("instance", instance) + state["updatedAt"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + atomic_write_json(bootstrap_state_path(state_root, instance), state) + + def save_installed_state(state_root: Path, instance: str, state: dict[str, Any]) -> None: state.setdefault("instance", instance) state["updatedAt"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")