Files
plugin-helper/src/plugin_helper/cli.py
T
2026-06-28 14:36:14 -07:00

591 lines
24 KiB
Python

from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Callable
from .config import instances_roots, repo_root, state_root
from .bootstrap import run_bootstrap
from .bsipa import check_bsipa_health
from .checker import check_lock
from .github import fetch_releases
from .installer import apply_plan, uninstall_plugin
from .instances import get_instance, list_instances
from .models import load_lockfile, load_registry
from .models import Lockfile, Registry
from .planner import create_plan
from .scanner import scan_instance
from .state import load_installed_state
from .updates import check_updates
from .userdata import sync_windows_data_repo
def _json(data: Any) -> None:
print(json.dumps(data, indent=2, sort_keys=True))
def installed_plugins_report(
*,
installed_state: dict[str, Any],
registry: Registry,
lockfile: Lockfile,
) -> dict[str, Any]:
locked_by_id = {plugin.id: plugin for plugin in lockfile.plugins}
plugins: list[dict[str, Any]] = []
for plugin_id, plugin_state in sorted(installed_state.get("plugins", {}).items()):
registry_plugin = registry.get(plugin_id)
locked = locked_by_id.get(plugin_id)
files = plugin_state.get("files", [])
plugins.append(
{
"id": plugin_id,
"name": registry_plugin.name if registry_plugin else plugin_id,
"version": locked.tag if locked and locked.tag else "(not locked)",
"asset": locked.asset if locked and locked.asset else "(unknown)",
"repo": (locked.repo if locked and locked.repo else None)
or (registry_plugin.repo if registry_plugin else None)
or "(unknown)",
"installedAt": plugin_state.get("installedAt", "(unknown)"),
"fileCount": len(files),
"files": files,
}
)
return {
"instance": installed_state.get("instance", lockfile.instance),
"beatSaberVersion": installed_state.get("beatSaberVersion", lockfile.beat_saber_version),
"plugins": plugins,
}
def print_installed_plugins(report: dict[str, Any]) -> None:
plugins = report["plugins"]
print(f"{report['instance']} managed plugins ({len(plugins)})")
if not plugins:
print("No plugins have been installed by plugin-helper yet.")
return
headers = ("Plugin", "Version", "Asset", "Files", "Installed")
rows = [
(
f"{plugin['name']} ({plugin['id']})",
plugin["version"],
plugin["asset"],
str(plugin["fileCount"]),
plugin["installedAt"],
)
for plugin in plugins
]
widths = [
max(len(headers[index]), *(len(row[index]) for row in rows))
for index in range(len(headers))
]
header = " ".join(label.ljust(widths[index]) for index, label in enumerate(headers))
print(header)
print(" ".join("-" * width for width in widths))
for row in rows:
print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row)))
def print_updates(report: dict[str, Any]) -> None:
plugins = report["plugins"]
summary = report["summary"]
print(
f"{report['instance']} updates: "
f"{summary['updates']} available, {summary['current']} current, "
f"{summary['warnings']} warnings, {summary['errors']} errors"
)
if not plugins:
return
headers = ("Plugin", "Current", "Latest", "Asset", "Status")
rows = [
(
f"{plugin['name']} ({plugin['id']})",
plugin.get("currentTag") or "(none)",
plugin.get("latestTag") or "(unknown)",
plugin.get("latestAsset") or plugin.get("currentAsset") or "(unknown)",
plugin["status"],
)
for plugin in plugins
]
widths = [
max(len(headers[index]), *(len(row[index]) for row in rows))
for index in range(len(headers))
]
print(" ".join(label.ljust(widths[index]) for index, label in enumerate(headers)))
print(" ".join("-" * width for width in widths))
for row in rows:
print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row)))
def _add_common(parser: argparse.ArgumentParser, *, suppress_default: bool = False) -> None:
default = argparse.SUPPRESS if suppress_default else None
parser.add_argument("--instances-root", default=default, help="BSManager instances root")
parser.add_argument("--state-dir", default=default, help="plugin-helper state directory")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="plugin-helper")
_add_common(parser)
subcommands = parser.add_subparsers(dest="command", required=True)
subcommands.add_parser(
"instances",
help="List discovered BSManager Beat Saber instances",
parents=[_common_parent()],
)
subcommands.add_parser(
"menu",
help="Open an interactive instance/action menu",
parents=[_common_parent()],
)
scan = subcommands.add_parser(
"scan",
help="Inspect installed Plugins, Libs, and IPA/Pending files",
parents=[_common_parent()],
)
scan.add_argument("--instance", required=True)
scan.add_argument("--hashes", action="store_true", help="Include sha256 hashes")
scan.add_argument("--json", action="store_true", help="Print full JSON scan output")
state = subcommands.add_parser(
"state",
help="Show recorded plugin-helper install state",
parents=[_common_parent()],
)
state.add_argument("--instance", required=True)
installed = subcommands.add_parser(
"installed",
help="List plugins installed by plugin-helper with locked release versions",
parents=[_common_parent()],
)
installed.add_argument("--instance", required=True)
installed.add_argument("--registry", default="registry/plugins.toml")
installed.add_argument("--lockfile")
installed.add_argument("--json", action="store_true", help="Print full JSON output")
check = subcommands.add_parser(
"check",
help="Validate local registry, lockfile, and release asset readiness",
parents=[_common_parent()],
)
check.add_argument("--instance", required=True)
check.add_argument("--registry", default="registry/plugins.toml")
check.add_argument("--lockfile")
check.add_argument("--json", action="store_true", help="Print full JSON check output")
bootstrap = subcommands.add_parser(
"bootstrap",
help="Install locked BSIPA, run IPA.exe -n through Proton, and record bootstrap files",
parents=[_common_parent()],
)
bootstrap.add_argument("--instance", required=True)
bootstrap.add_argument("--registry", default="registry/plugins.toml")
bootstrap.add_argument("--lockfile")
bootstrap.add_argument("--proton", help="Path to Proton executable")
bootstrap.add_argument("--json", action="store_true", help="Print full JSON bootstrap output")
bootstrap_check = subcommands.add_parser(
"bootstrap-check",
help="Verify recorded BSIPA bootstrap state and latest IPA log",
parents=[_common_parent()],
)
bootstrap_check.add_argument("--instance", required=True)
bootstrap_check.add_argument("--json", action="store_true", help="Print full JSON bootstrap health output")
updates = subcommands.add_parser(
"updates",
help="Check GitHub for newer matching releases for locked plugins",
parents=[_common_parent()],
)
updates.add_argument("--instance", required=True)
updates.add_argument("--registry", default="registry/plugins.toml")
updates.add_argument("--lockfile")
updates.add_argument("--plugin", action="append", help="Check only this locked plugin id; repeatable")
updates.add_argument("--include-prerelease", action="store_true", help="Include prerelease GitHub releases")
updates.add_argument("--json", action="store_true", help="Print full JSON update output")
plan = subcommands.add_parser(
"plan",
help="Create a dry-run install plan from registry and lockfile",
parents=[_common_parent()],
)
plan.add_argument("--instance", required=True)
plan.add_argument("--registry", default="registry/plugins.toml")
plan.add_argument("--lockfile")
plan.add_argument("--plugin", "--update", action="append", help="Plan only this locked plugin id; repeatable")
apply = subcommands.add_parser(
"apply",
help="Apply a previously generated plan",
parents=[_common_parent()],
)
apply.add_argument("plan")
uninstall = subcommands.add_parser(
"uninstall",
help="Remove files recorded for a managed plugin",
parents=[_common_parent()],
)
uninstall.add_argument("--instance", required=True)
uninstall.add_argument("plugin")
uninstall.add_argument("--force", action="store_true", help="Delete even when current file hashes differ")
backup = subcommands.add_parser(
"backup-userdata",
help="Copy UserData and Windows AppData into the adjacent backups repo",
parents=[_common_parent()],
)
backup.add_argument("--instance", required=True)
backup.add_argument("--backup-root", default="../backups/beat-saber", help="Backup directory")
backup.add_argument("--appdata-path", help="Override Beat Saber Windows AppData path")
backup.add_argument("--no-appdata", action="store_true", help="Only copy UserData")
return parser
def _common_parent() -> argparse.ArgumentParser:
parent = argparse.ArgumentParser(add_help=False)
_add_common(parent, suppress_default=True)
return parent
def _ask_choice(
*,
title: str,
choices: list[tuple[str, str] | tuple[str, str, str]],
input_func: Callable[[str], str] | None = None,
) -> str | None:
ask = input_func or input
print()
print(title)
for index, choice in enumerate(choices, start=1):
label = choice[1]
print(f" {index}. {label}")
if len(choice) > 2:
print(f" {choice[2]}")
print(" q. Quit")
while True:
answer = ask("> ").strip().lower()
if answer in {"q", "quit", "exit"}:
return None
if answer.isdigit():
index = int(answer)
if 1 <= index <= len(choices):
return choices[index - 1][0]
print("Choose a listed number, or q to quit.")
def _run_menu(inst_roots: list[Path], st_root: Path, input_func: Callable[[str], str] | None = None) -> int:
ask = input_func or input
instances = list_instances(inst_roots)
if not instances:
print(f"No Beat Saber instances found under {', '.join(str(root) for root in inst_roots)}")
return 1
instances_by_choice = {str(index): item for index, item in enumerate(instances, start=1)}
instance_choices = [(str(index), f"{item.name} {item.path}") for index, item in enumerate(instances, start=1)]
action_choices = [
("installed", "Show managed installs", "Lists plugins recorded in plugin-helper state with locked versions and files."),
("updates", "Check locked plugin updates", "Looks at GitHub releases for newer assets matching locked plugins."),
("scan", "Scan installed files", "Counts files currently present in Plugins/, Libs/, and IPA/Pending/."),
("check", "Check lockfile and assets", "Validates registry entries, lockfile data, local assets, and SHA-256 values."),
("bootstrap", "Bootstrap BSIPA", "Fetches the locked BSIPA archive, installs it, runs IPA.exe -n, and records bootstrap files."),
("bootstrap-check", "Check BSIPA bootstrap", "Verifies recorded bootstrap state and the latest BSIPA log evidence."),
("plan", "Create install plan", "Writes a dry-run JSON plan for locked plugin files before anything is applied."),
("apply", "Apply a plan by path", "Installs exactly the file changes from a previously generated plan JSON."),
("backup-userdata", "Back up UserData", "Creates a timestamped archive of UserData before risky changes."),
("change", "Choose another version", "Returns to the Beat Saber version picker."),
]
selected_instance_key = _ask_choice(
title="Choose Beat Saber version",
choices=instance_choices,
input_func=ask,
)
if selected_instance_key is None:
return 0
selected_instance = instances_by_choice[selected_instance_key]
while True:
selected_action = _ask_choice(
title=f"Choose action for {selected_instance.name}",
choices=action_choices,
input_func=ask,
)
if selected_action is None:
return 0
if selected_action == "change":
selected_instance_key = _ask_choice(
title="Choose Beat Saber version",
choices=instance_choices,
input_func=ask,
)
if selected_instance_key is None:
return 0
selected_instance = instances_by_choice[selected_instance_key]
continue
command = [
"--instances-root",
str(selected_instance.path.parent),
"--state-dir",
str(st_root),
selected_action,
]
if selected_action == "apply":
plan_path = ask("Plan path> ").strip()
if not plan_path:
print("No plan path entered.")
continue
command.append(plan_path)
else:
command.extend(["--instance", selected_instance.name])
print()
status = run(command)
print(f"Command exited with status {status}")
def run(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
inst_roots = instances_roots(getattr(args, "instances_root", None))
st_root = state_root(getattr(args, "state_dir", None))
try:
if args.command == "instances":
found = list_instances(inst_roots)
if not found:
print(f"No Beat Saber instances found under {', '.join(str(root) for root in inst_roots)}")
return 1
for item in found:
flags = []
if item.has_plugins:
flags.append("Plugins")
if item.has_libs:
flags.append("Libs")
if item.has_userdata:
flags.append("UserData")
suffix = f" ({', '.join(flags)})" if flags else ""
print(f"{item.name}\t{item.path}{suffix}")
return 0
if args.command == "menu":
return _run_menu(inst_roots, st_root)
if args.command == "scan":
instance = get_instance(inst_roots, args.instance)
result = scan_instance(instance.path, include_hashes=args.hashes)
if args.json:
_json(result)
else:
counts = result["counts"]
print(f"{instance.name}: {counts['files']} files")
print(f" Plugins: {counts['plugins']}")
print(f" Libs: {counts['libs']}")
print(f" IPA/Pending: {counts['pending']}")
return 0
if args.command == "state":
_json(load_installed_state(st_root, args.instance))
return 0
if args.command == "installed":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = installed_plugins_report(
installed_state=load_installed_state(st_root, args.instance),
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
)
if args.json:
_json(result)
else:
print_installed_plugins(result)
return 0
if args.command == "check":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = check_lock(
instance=args.instance,
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
state_root=st_root,
repo_root=root,
)
if args.json:
_json(result)
else:
summary = result["summary"]
print(
f"{args.instance}: {summary['ok']} ok, "
f"{summary['warnings']} warnings, {summary['errors']} errors"
)
for plugin in result["plugins"]:
if plugin["status"] == "ok":
continue
print(f" {plugin['id']}: {plugin['status']}")
for message in plugin["messages"]:
print(f" {message['level']}: {message['message']}")
return 2 if result["summary"]["errors"] else 0
if args.command == "updates":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = check_updates(
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
fetch_releases=fetch_releases,
selected=set(args.plugin) if args.plugin else None,
include_prerelease=args.include_prerelease,
)
if args.json:
_json(result)
else:
print_updates(result)
return 2 if result["summary"]["errors"] else 0
if args.command == "bootstrap":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
lockfile = load_lockfile(lock_path)
result = run_bootstrap(
instance=args.instance,
instance_path=instance.path,
beat_saber_version=lockfile.beat_saber_version,
registry=load_registry(registry_path),
lockfile=lockfile,
state_root=st_root,
repo_root=root,
proton=Path(args.proton).expanduser() if args.proton else None,
progress=lambda message: print(f" {message}", flush=True),
)
if args.json:
_json(result)
else:
delta = result["delta"]
print(f"Bootstrap state: {result['statePath']}")
print(f"Plan: {result['planPath']}")
print(f"IPA.exe -n exit: {result['ipaExitCode']}")
print(
"Bootstrap files: "
f"{len(delta['created'])} created, {len(delta['mutated'])} mutated, "
f"{len(delta['removed'])} removed"
)
print(f"Health: {'ok' if result['health']['ok'] else 'error'}")
for message in result["health"]["messages"]:
print(f" {message}")
return 0 if result["health"]["ok"] else 2
if args.command == "bootstrap-check":
instance = get_instance(inst_roots, args.instance)
result = check_bsipa_health(instance.path, st_root, args.instance)
if args.json:
_json(result)
else:
print(f"BSIPA bootstrap: {'ok' if result['ok'] else 'error'}")
print(f"State: {result['statePath']}")
print(f"Log: {result['logPath']}")
for message in result["messages"]:
print(f" {message}")
return 0 if result["ok"] else 2
if args.command == "plan":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
registry = load_registry(registry_path)
lockfile = load_lockfile(lock_path)
selected = set(args.plugin) if args.plugin else None
plan, path = create_plan(
instance=args.instance,
instance_path=instance.path,
beat_saber_version=lockfile.beat_saber_version,
registry=registry,
lockfile=lockfile,
state_root=st_root,
repo_root=root,
selected=selected,
)
print(f"Wrote plan: {path}")
print(f"Changes: {len(plan['changes'])}")
for warning in plan["warnings"]:
print(f"Warning: {warning}", file=sys.stderr)
return 0
if args.command == "apply":
with Path(args.plan).open("r", encoding="utf-8") as handle:
plan = json.load(handle)
result = apply_plan(plan, st_root)
print(f"Applied {len(result['applied'])} file changes")
print(f"State: {result['statePath']}")
return 0
if args.command == "uninstall":
instance = get_instance(inst_roots, args.instance)
result = uninstall_plugin(args.instance, instance.path, st_root, args.plugin, force=args.force)
print(f"Removed: {len(result['removed'])}")
if result["skipped"]:
print("Skipped:")
for item in result["skipped"]:
print(f" {item['path']}: {item['reason']}")
return 0 if result["stateUpdated"] else 2
if args.command == "backup-userdata":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
backup_root = Path(args.backup_root).expanduser()
if not backup_root.is_absolute():
backup_root = (root / backup_root).resolve()
result = sync_windows_data_repo(
instance=args.instance,
instance_path=instance.path,
backup_root=backup_root,
appdata_path=Path(args.appdata_path).expanduser() if args.appdata_path else None,
include_appdata=not args.no_appdata,
)
print(f"Backup root: {result['backupRoot']}")
for item in result["copied"]:
print(f"{item['label']}: {item['fileCount']} files")
print(f" {item['source']} -> {item['destination']}")
return 0
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
parser.error(f"unknown command: {args.command}")
return 2
def main() -> None:
raise SystemExit(run())
if __name__ == "__main__":
main()