Files
plugin-helper/src/plugin_helper/cli.py
T
2026-07-01 11:44:03 -07:00

603 lines
24 KiB
Python

from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
from .config import Profile, repo_root, resolve_runtime_config
from .bootstrap import run_bootstrap
from .bsipa import check_bsipa_health
from .checker import check_lock
from .github import fetch_releases
from .installer import apply_plan, disable_plugin, uninstall_plugin
from .instances import get_instance, list_instances
from .models import load_lockfile, load_registry
from .operations import enable_disabled_plugin
from .planner import create_plan
from .reports import installed_plugins_report, print_installed_plugins
from .scanner import scan_instance
from .state import load_installed_state
from .updates import check_updates
from .userdata import restore_windows_data_repo, sync_windows_data_repo
def _json(data: Any) -> None:
print(json.dumps(data, indent=2, sort_keys=True))
def print_updates(report: dict[str, Any]) -> None:
plugins = report["plugins"]
summary = report["summary"]
print(
f"{report['instance']} updates: "
f"{summary['updates']} available, {summary['current']} current, "
f"{summary['warnings']} warnings, {summary['errors']} errors"
)
if not plugins:
return
headers = ("Plugin", "Current", "Latest", "Asset", "Status")
rows = [
(
f"{plugin['name']} ({plugin['id']})",
plugin.get("currentTag") or "(none)",
plugin.get("latestTag") or "(unknown)",
plugin.get("latestAsset") or plugin.get("currentAsset") or "(unknown)",
plugin["status"],
)
for plugin in plugins
]
widths = [
max(len(headers[index]), *(len(row[index]) for row in rows))
for index in range(len(headers))
]
print(" ".join(label.ljust(widths[index]) for index, label in enumerate(headers)))
print(" ".join("-" * width for width in widths))
for row in rows:
print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row)))
def _add_common(parser: argparse.ArgumentParser, *, suppress_default: bool = False) -> None:
default = argparse.SUPPRESS if suppress_default else None
parser.add_argument("--instances-root", default=default, help="BSManager instances root")
parser.add_argument("--state-dir", default=default, help="plugin-helper state directory")
parser.add_argument("--profile", default=default, help="Configured profile id from plugin-helper.local.toml")
parser.add_argument("--config", default=default, help="Path to plugin-helper TOML config")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="plugin-helper")
_add_common(parser)
subcommands = parser.add_subparsers(dest="command", required=True)
subcommands.add_parser(
"instances",
help="List discovered BSManager Beat Saber instances",
parents=[_common_parent()],
)
subcommands.add_parser(
"menu",
help="Open an interactive instance/action menu",
parents=[_common_parent()],
)
scan = subcommands.add_parser(
"scan",
help="Inspect installed Plugins, Libs, and IPA/Pending files",
parents=[_common_parent()],
)
scan.add_argument("--instance", required=True)
scan.add_argument("--hashes", action="store_true", help="Include sha256 hashes")
scan.add_argument("--json", action="store_true", help="Print full JSON scan output")
state = subcommands.add_parser(
"state",
help="Show recorded plugin-helper install state",
parents=[_common_parent()],
)
state.add_argument("--instance", required=True)
installed = subcommands.add_parser(
"installed",
help="List plugins installed by plugin-helper with locked release versions",
parents=[_common_parent()],
)
installed.add_argument("--instance", required=True)
installed.add_argument("--registry", default="registry/plugins.toml")
installed.add_argument("--lockfile")
installed.add_argument("--json", action="store_true", help="Print full JSON output")
check = subcommands.add_parser(
"check",
help="Validate local registry, lockfile, and release asset readiness",
parents=[_common_parent()],
)
check.add_argument("--instance", required=True)
check.add_argument("--registry", default="registry/plugins.toml")
check.add_argument("--lockfile")
check.add_argument("--json", action="store_true", help="Print full JSON check output")
bootstrap = subcommands.add_parser(
"bootstrap",
help="Install locked BSIPA, run IPA.exe -n through Proton, and record bootstrap files",
parents=[_common_parent()],
)
bootstrap.add_argument("--instance", required=True)
bootstrap.add_argument("--registry", default="registry/plugins.toml")
bootstrap.add_argument("--lockfile")
bootstrap.add_argument("--proton", help="Path to Proton executable")
bootstrap.add_argument("--json", action="store_true", help="Print full JSON bootstrap output")
bootstrap_check = subcommands.add_parser(
"bootstrap-check",
help="Verify recorded BSIPA bootstrap state and latest IPA log",
parents=[_common_parent()],
)
bootstrap_check.add_argument("--instance", required=True)
bootstrap_check.add_argument("--json", action="store_true", help="Print full JSON bootstrap health output")
updates = subcommands.add_parser(
"updates",
help="Check GitHub for newer matching releases for locked plugins",
parents=[_common_parent()],
)
updates.add_argument("--instance", required=True)
updates.add_argument("--registry", default="registry/plugins.toml")
updates.add_argument("--lockfile")
updates.add_argument("--plugin", action="append", help="Check only this locked plugin id; repeatable")
updates.add_argument("--include-prerelease", action="store_true", help="Include prerelease GitHub releases")
updates.add_argument("--json", action="store_true", help="Print full JSON update output")
plan = subcommands.add_parser(
"plan",
help="Create a dry-run install plan from registry and lockfile",
parents=[_common_parent()],
)
plan.add_argument("--instance", required=True)
plan.add_argument("--registry", default="registry/plugins.toml")
plan.add_argument("--lockfile")
plan.add_argument("--plugin", "--update", action="append", help="Plan only this locked plugin id; repeatable")
apply = subcommands.add_parser(
"apply",
help="Apply a previously generated plan",
parents=[_common_parent()],
)
apply.add_argument("plan")
uninstall = subcommands.add_parser(
"uninstall",
help="Remove files recorded for a managed plugin",
parents=[_common_parent()],
)
uninstall.add_argument("--instance", required=True)
uninstall.add_argument("plugin")
uninstall.add_argument("--force", action="store_true", help="Delete even when current file hashes differ")
disable = subcommands.add_parser(
"disable",
help="Remove a managed plugin's files from the game but keep state/assets for re-enable",
parents=[_common_parent()],
)
disable.add_argument("--instance", required=True)
disable.add_argument("plugin")
disable.add_argument("--force", action="store_true", help="Disable even when current file hashes differ")
enable = subcommands.add_parser(
"enable",
help="Reinstall a disabled locked plugin from local assets",
parents=[_common_parent()],
)
enable.add_argument("--instance", required=True)
enable.add_argument("--registry", default="registry/plugins.toml")
enable.add_argument("--lockfile")
enable.add_argument("plugin")
backup = subcommands.add_parser(
"backup-userdata",
help="Copy UserData and Windows AppData into the adjacent backups repo",
parents=[_common_parent()],
)
backup.add_argument("--instance", required=True)
backup.add_argument("--backup-root", default="../backups/beat-saber", help="Backup directory")
backup.add_argument("--appdata-path", help="Override Beat Saber Windows AppData path")
backup.add_argument("--no-appdata", action="store_true", help="Only copy UserData")
restore = subcommands.add_parser(
"restore-userdata",
help="Restore UserData and AppData from the backups repo into an instance",
parents=[_common_parent()],
)
restore.add_argument("--instance", required=True)
restore.add_argument("--backup-root", default="../backups/beat-saber", help="Backup directory")
restore.add_argument("--appdata-path", help="Override Beat Saber AppData destination path")
restore.add_argument("--no-appdata", action="store_true", help="Only restore UserData")
return parser
def _common_parent() -> argparse.ArgumentParser:
parent = argparse.ArgumentParser(add_help=False)
_add_common(parent, suppress_default=True)
return parent
def _menu_profiles(
*,
profiles: tuple[Profile, ...],
selected_profile: Profile | None,
instances_roots: list[Path],
state_root: Path,
use_config_profiles: bool,
) -> list[Profile]:
if selected_profile:
return [
Profile(
id=selected_profile.id if len(instances_roots) == 1 else f"{selected_profile.id}-{index}",
label=selected_profile.label,
instances_root=root,
state_dir=state_root,
)
for index, root in enumerate(instances_roots, start=1)
]
if use_config_profiles and profiles:
return list(profiles)
return [
Profile(
id=f"root-{index}",
label=str(root),
instances_root=root,
state_dir=state_root,
)
for index, root in enumerate(instances_roots, start=1)
]
def _run_menu(
*,
runtime: Any,
explicit_instances_root: bool,
explicit_state_dir: bool,
) -> int:
try:
from .tui import InstallationChoice, PluginHelperTui
except ImportError as exc:
print(f"Textual is required for the menu TUI: {exc}")
print("Install project dependencies, for example: python -m pip install -e .")
return 1
profiles = _menu_profiles(
profiles=runtime.profiles,
selected_profile=runtime.selected_profile,
instances_roots=runtime.instances_roots,
state_root=runtime.state_root,
use_config_profiles=runtime.config_loaded and not explicit_instances_root and not explicit_state_dir,
)
choices: list[InstallationChoice] = []
for profile in profiles:
for instance in list_instances(profile.instances_root):
choices.append(
InstallationChoice(
profile_id=profile.id,
profile_label=profile.label,
instance_name=instance.name,
instance_path=instance.path,
state_root=profile.state_dir,
)
)
if not choices:
searched = ", ".join(str(profile.instances_root) for profile in profiles)
print(f"No Beat Saber instances found under {searched}")
return 1
setup_hint = None
if not runtime.config_loaded and not runtime.selected_profile and not explicit_instances_root and not explicit_state_dir:
setup_hint = (
f"No {runtime.config_path.name} found; using default roots and default state. "
f"Copy plugin-helper.toml.example to {runtime.config_path.name} to map each install to a state dir."
)
app = PluginHelperTui(choices=choices, repo_root=repo_root(), setup_hint=setup_hint)
result = app.run()
return int(result or 0)
def run(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
try:
explicit_instances_root = getattr(args, "instances_root", None) is not None
explicit_state_dir = getattr(args, "state_dir", None) is not None
runtime = resolve_runtime_config(
instances_root_value=getattr(args, "instances_root", None),
state_dir_value=getattr(args, "state_dir", None),
profile_id=getattr(args, "profile", None),
config_path=getattr(args, "config", None),
)
inst_roots = runtime.instances_roots
st_root = runtime.state_root
if args.command == "instances":
found = list_instances(inst_roots)
if not found:
print(f"No Beat Saber instances found under {', '.join(str(root) for root in inst_roots)}")
return 1
for item in found:
flags = []
if item.has_plugins:
flags.append("Plugins")
if item.has_libs:
flags.append("Libs")
if item.has_userdata:
flags.append("UserData")
suffix = f" ({', '.join(flags)})" if flags else ""
print(f"{item.name}\t{item.path}{suffix}")
return 0
if args.command == "menu":
return _run_menu(
runtime=runtime,
explicit_instances_root=explicit_instances_root,
explicit_state_dir=explicit_state_dir,
)
if args.command == "scan":
instance = get_instance(inst_roots, args.instance)
result = scan_instance(instance.path, include_hashes=args.hashes)
if args.json:
_json(result)
else:
counts = result["counts"]
print(f"{instance.name}: {counts['files']} files")
print(f" Plugins: {counts['plugins']}")
print(f" Libs: {counts['libs']}")
print(f" IPA/Pending: {counts['pending']}")
return 0
if args.command == "state":
_json(load_installed_state(st_root, args.instance))
return 0
if args.command == "installed":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = installed_plugins_report(
installed_state=load_installed_state(st_root, args.instance),
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
)
if args.json:
_json(result)
else:
print_installed_plugins(result)
return 0
if args.command == "check":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = check_lock(
instance=args.instance,
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
state_root=st_root,
repo_root=root,
)
if args.json:
_json(result)
else:
summary = result["summary"]
print(
f"{args.instance}: {summary['ok']} ok, "
f"{summary['warnings']} warnings, {summary['errors']} errors"
)
for plugin in result["plugins"]:
if plugin["status"] == "ok":
continue
print(f" {plugin['id']}: {plugin['status']}")
for message in plugin["messages"]:
print(f" {message['level']}: {message['message']}")
return 2 if result["summary"]["errors"] else 0
if args.command == "updates":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = check_updates(
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
fetch_releases=fetch_releases,
selected=set(args.plugin) if args.plugin else None,
include_prerelease=args.include_prerelease,
)
if args.json:
_json(result)
else:
print_updates(result)
return 2 if result["summary"]["errors"] else 0
if args.command == "bootstrap":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
lockfile = load_lockfile(lock_path)
result = run_bootstrap(
instance=args.instance,
instance_path=instance.path,
beat_saber_version=lockfile.beat_saber_version,
registry=load_registry(registry_path),
lockfile=lockfile,
state_root=st_root,
repo_root=root,
proton=Path(args.proton).expanduser() if args.proton else None,
progress=lambda message: print(f" {message}", flush=True),
)
if args.json:
_json(result)
else:
delta = result["delta"]
print(f"Bootstrap state: {result['statePath']}")
print(f"Plan: {result['planPath']}")
print(f"IPA.exe -n exit: {result['ipaExitCode']}")
print(
"Bootstrap files: "
f"{len(delta['created'])} created, {len(delta['mutated'])} mutated, "
f"{len(delta['removed'])} removed"
)
print(f"Health: {'ok' if result['health']['ok'] else 'error'}")
for message in result["health"]["messages"]:
print(f" {message}")
return 0 if result["health"]["ok"] else 2
if args.command == "bootstrap-check":
instance = get_instance(inst_roots, args.instance)
result = check_bsipa_health(instance.path, st_root, args.instance)
if args.json:
_json(result)
else:
print(f"BSIPA bootstrap: {'ok' if result['ok'] else 'error'}")
print(f"State: {result['statePath']}")
print(f"Log: {result['logPath']}")
for message in result["messages"]:
print(f" {message}")
return 0 if result["ok"] else 2
if args.command == "plan":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
registry = load_registry(registry_path)
lockfile = load_lockfile(lock_path)
selected = set(args.plugin) if args.plugin else None
plan, path = create_plan(
instance=args.instance,
instance_path=instance.path,
beat_saber_version=lockfile.beat_saber_version,
registry=registry,
lockfile=lockfile,
state_root=st_root,
repo_root=root,
selected=selected,
)
print(f"Wrote plan: {path}")
print(f"Changes: {len(plan['changes'])}")
for warning in plan["warnings"]:
print(f"Warning: {warning}", file=sys.stderr)
return 0
if args.command == "apply":
with Path(args.plan).open("r", encoding="utf-8") as handle:
plan = json.load(handle)
result = apply_plan(plan, st_root)
print(f"Applied {len(result['applied'])} file changes")
print(f"State: {result['statePath']}")
return 0
if args.command == "uninstall":
instance = get_instance(inst_roots, args.instance)
result = uninstall_plugin(args.instance, instance.path, st_root, args.plugin, force=args.force)
print(f"Removed: {len(result['removed'])}")
if result["skipped"]:
print("Skipped:")
for item in result["skipped"]:
print(f" {item['path']}: {item['reason']}")
return 0 if result["stateUpdated"] else 2
if args.command == "disable":
instance = get_instance(inst_roots, args.instance)
result = disable_plugin(args.instance, instance.path, st_root, args.plugin, force=args.force)
print(f"Disabled: {args.plugin}")
print(f"Removed: {len(result['removed'])}")
if result["skipped"]:
print("Skipped:")
for item in result["skipped"]:
print(f" {item['path']}: {item['reason']}")
return 0 if result["stateUpdated"] else 2
if args.command == "enable":
instance = get_instance(inst_roots, args.instance)
result = enable_disabled_plugin(
instance=args.instance,
instance_path=instance.path,
state_root=st_root,
plugin_id=args.plugin,
registry=args.registry,
lockfile=args.lockfile,
)
print(f"Enabled: {args.plugin}")
print(f"Plan: {result['planPath']}")
print(f"Applied: {len(result['applied'])}")
print(f"State: {result['statePath']}")
return 0
if args.command == "backup-userdata":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
backup_root = Path(args.backup_root).expanduser()
if not backup_root.is_absolute():
backup_root = (root / backup_root).resolve()
result = sync_windows_data_repo(
instance=args.instance,
instance_path=instance.path,
backup_root=backup_root,
appdata_path=Path(args.appdata_path).expanduser() if args.appdata_path else None,
include_appdata=not args.no_appdata,
)
print(f"Backup root: {result['backupRoot']}")
for item in result["copied"]:
print(f"{item['label']}: {item['fileCount']} files")
print(f" {item['source']} -> {item['destination']}")
return 0
if args.command == "restore-userdata":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
backup_root = Path(args.backup_root).expanduser()
if not backup_root.is_absolute():
backup_root = (root / backup_root).resolve()
result = restore_windows_data_repo(
instance=args.instance,
instance_path=instance.path,
backup_root=backup_root,
appdata_path=Path(args.appdata_path).expanduser() if args.appdata_path else None,
include_appdata=not args.no_appdata,
)
print(f"Backup root: {result['backupRoot']}")
for item in result["restored"]:
print(f"{item['label']}: {item['fileCount']} files")
print(f" {item['source']} -> {item['destination']}")
if item["snapshot"]:
print(f" previous: {item['snapshot']}")
return 0
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
parser.error(f"unknown command: {args.command}")
return 2
def main() -> None:
raise SystemExit(run())
if __name__ == "__main__":
main()