From caaa4a655834d54ffb1cd37d2f3243759696cd3c Mon Sep 17 00:00:00 2001 From: pleb Date: Sun, 14 Jun 2026 10:26:22 -0700 Subject: [PATCH] Add plugin helper with agent skill for updating plugins --- .../skills/install-beatsaber-plugin/SKILL.md | 174 +++++++++++++ .../agents/openai.yaml | 4 + .gitignore | 8 + README.md | 37 +++ docs/ROADMAP.md | 80 ++++++ flake.nix | 47 ++++ locks/1.40.8.lock.toml | 18 ++ pyproject.toml | 19 ++ registry/plugins.toml | 23 ++ src/plugin_helper/__init__.py | 3 + src/plugin_helper/__main__.py | 5 + src/plugin_helper/checker.py | 66 +++++ src/plugin_helper/cli.py | 245 ++++++++++++++++++ src/plugin_helper/config.py | 24 ++ src/plugin_helper/fsutil.py | 59 +++++ src/plugin_helper/installer.py | 112 ++++++++ src/plugin_helper/instances.py | 54 ++++ src/plugin_helper/models.py | 109 ++++++++ src/plugin_helper/planner.py | 148 +++++++++++ src/plugin_helper/scanner.py | 33 +++ src/plugin_helper/state.py | 46 ++++ src/plugin_helper/userdata.py | 46 ++++ tests/test_plugin_helper.py | 244 +++++++++++++++++ 23 files changed, 1604 insertions(+) create mode 100644 .agents/skills/install-beatsaber-plugin/SKILL.md create mode 100644 .agents/skills/install-beatsaber-plugin/agents/openai.yaml create mode 100644 .gitignore create mode 100644 README.md create mode 100644 docs/ROADMAP.md create mode 100644 flake.nix create mode 100644 locks/1.40.8.lock.toml create mode 100644 pyproject.toml create mode 100644 registry/plugins.toml create mode 100644 src/plugin_helper/__init__.py create mode 100644 src/plugin_helper/__main__.py create mode 100644 src/plugin_helper/checker.py create mode 100644 src/plugin_helper/cli.py create mode 100644 src/plugin_helper/config.py create mode 100644 src/plugin_helper/fsutil.py create mode 100644 src/plugin_helper/installer.py create mode 100644 src/plugin_helper/instances.py create mode 100644 src/plugin_helper/models.py create mode 100644 src/plugin_helper/planner.py create mode 100644 src/plugin_helper/scanner.py create mode 100644 src/plugin_helper/state.py create mode 100644 src/plugin_helper/userdata.py create mode 100644 tests/test_plugin_helper.py diff --git a/.agents/skills/install-beatsaber-plugin/SKILL.md b/.agents/skills/install-beatsaber-plugin/SKILL.md new file mode 100644 index 0000000..32df2c8 --- /dev/null +++ b/.agents/skills/install-beatsaber-plugin/SKILL.md @@ -0,0 +1,174 @@ +--- +name: install-beatsaber-plugin +description: Install or update a Beat Saber plugin in the plugin-helper repo by using the local helper workflow. Use when the user asks to add, install, update, bump, lock, or manage a Beat Saber plugin release for a BSManager instance. The user must provide an explicit GitHub release URL in the prompt; if no release URL is present, ask for one and do not infer, search for, or substitute a different repository. +--- + +# Install Beat Saber Plugin + +Use the repository's own `plugin-helper` commands to manage plugins for BSManager instances. Do not manually copy release files into the game instance except to undo your own mistaken install before rerunning the helper. + +## Hard Guardrail + +Require an explicit GitHub release URL from the user's prompt before selecting any release or repository. + +- If the prompt does not contain a GitHub release URL, stop and ask the user for it. +- Do not search the web to discover a repository or "correct" release URL. +- Do not substitute a similar repo, fork, project, or package name. +- If the provided URL is a general releases page, use that repo's release API and choose the latest non-draft, non-prerelease release unless the user asks for a specific tag/version. +- If the provided URL is a tag URL, use that exact tag. + +Accepted URL shapes include: + +```text +https://github.com///releases +https://github.com///releases/tag/ +https://github.com///releases/download// +``` + +## Workflow + +1. Confirm the workspace is the `plugin-helper` repo. + + ```bash + test -f pyproject.toml && test -d src/plugin_helper && test -d registry && test -d locks + ``` + +2. Read the local helper behavior before changing files. + + Inspect at least: + + ```bash + sed -n '1,220p' README.md + sed -n '1,260p' src/plugin_helper/cli.py + sed -n '1,260p' src/plugin_helper/planner.py + sed -n '1,220p' src/plugin_helper/models.py + sed -n '1,220p' registry/plugins.toml + sed -n '1,220p' locks/.lock.toml + ``` + +3. Determine the instance. + + Prefer the instance the user names. If omitted and the working context clearly points at one lockfile, use that instance. Otherwise run: + + ```bash + PYTHONPATH=src python -m plugin_helper instances + ``` + +4. Snapshot first when requested. + + If the user asks for a one-time or pre-helper snapshot, archive the instance's `Plugins/` directory before any install: + + ```bash + mkdir -p "$HOME/archive/beatsaber" + tar -C "/" -czf "$HOME/archive/beatsaber/-Plugins-pre-helper-.tar.gz" Plugins + sha256sum "$HOME/archive/beatsaber/-Plugins-pre-helper-.tar.gz" + ``` + + Report the archive path and hash. + +5. Resolve the release from the user-provided URL only. + + For GitHub URLs, derive `/` and optional `` from the URL. Query the GitHub API directly for metadata: + + ```bash + curl -sS https://api.github.com/repos///releases + curl -sS https://api.github.com/repos///releases/tags/ + ``` + + Pick the asset that matches the Beat Saber instance/version. Prefer an exact versioned asset such as `1.40.8.zip` over broad or source archives. If multiple plausible assets remain, ask the user. + +6. Inspect the asset before selecting an install strategy. + + Download to the helper state directory: + + ```bash + mkdir -p .state/instances//downloads + curl -L --fail -o .state/instances//downloads/ "" + sha256sum .state/instances//downloads/ + ``` + + Match the checksum against GitHub's `digest` when available. Inspect zip contents: + + ```bash + unzip -l .state/instances//downloads/ + ``` + + Strategy guide: + + - `dll-to-plugins`: asset is a single `.dll` that belongs in `Plugins/`. + - `bsipa-zip`: zip top-level paths are only `IPA/`, `Libs/`, or `Plugins/`. + - `root-zip`: zip contains valid game-root paths outside the BSIPA top-level set. + - `zip-to-pending`: only when the release is intended for `IPA/Pending/`. + - `manual`: do not use for installable releases. + +7. Update the registry and lockfile. + + Add or update exactly one `[[plugins]]` entry in `registry/plugins.toml` with: + + ```toml + [[plugins]] + id = "" + name = "" + repo = "/" + asset_patterns = [""] + install_strategy = "" + category = "" + ``` + + Add or update the matching `[[plugins]]` entry in `locks/.lock.toml` with: + + ```toml + [[plugins]] + id = "" + repo = "/" + tag = "" + asset = "" + sha256 = "" + ``` + + Preserve unrelated registry and lockfile content. Do not invent dependency versions unless they are explicitly stated by the provided release notes or existing local metadata. + +8. Use the helper to validate, plan, and apply. + + Always pass `--state-dir .state` so the helper uses the repo-local downloaded asset: + + ```bash + PYTHONPATH=src python -m plugin_helper --state-dir .state check --instance + PYTHONPATH=src python -m plugin_helper --state-dir .state plan --instance --plugin + PYTHONPATH=src python -m plugin_helper --state-dir .state apply + ``` + + Before applying, read or summarize the generated plan enough to confirm it changes only the intended plugin files. + +9. Verify the result. + + Confirm the installed file hashes match the plan or archive members: + + ```bash + PYTHONPATH=src python -m plugin_helper --state-dir .state state --instance + PYTHONPATH=src python -m plugin_helper --state-dir .state check --instance + PYTHONPATH=src python -m unittest discover -s tests + ``` + + Use `PYTHONPATH=src`; plain `python -m unittest` may fail in this source-layout repo. + +10. Final response. + + Include: + + - release URL/tag/asset used + - snapshot path and hash, if created + - files changed in the repo and helper state + - live instance files changed + - backup path created by the helper + - validation commands and results + +## Mistake Recovery + +If you installed from the wrong release or repo during the current task: + +1. Restore affected live files from the helper backup created by that mistaken apply. +2. Remove mistaken downloaded assets and mistaken plan files from `.state`. +3. Correct the registry and lockfile to the user-provided release URL. +4. Rerun `check`, `plan`, and `apply` with `--state-dir .state`. +5. Keep or report only the backup relevant to the final correct apply unless the user asks for full audit history. diff --git a/.agents/skills/install-beatsaber-plugin/agents/openai.yaml b/.agents/skills/install-beatsaber-plugin/agents/openai.yaml new file mode 100644 index 0000000..fa32492 --- /dev/null +++ b/.agents/skills/install-beatsaber-plugin/agents/openai.yaml @@ -0,0 +1,4 @@ +interface: + display_name: "Install Beat Saber Plugin" + short_description: "Update Beat Saber plugins via helper" + default_prompt: "Use $install-beatsaber-plugin to install or update a Beat Saber plugin from this release URL: ." diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..420f743 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/.state/ +/.pytest_cache/ +/build/ +/dist/ +/*.egg-info/ +/src/*.egg-info/ +/__pycache__/ +*.pyc diff --git a/README.md b/README.md new file mode 100644 index 0000000..5bda6c4 --- /dev/null +++ b/README.md @@ -0,0 +1,37 @@ +# plugin-helper + +`plugin-helper` is an early Python CLI for managing Beat Saber plugins in a mounted Windows BSManager install. + +The first implementation focuses on safe local workflows: + +- discover BSManager instances +- scan existing `Plugins/` and `Libs/` files +- read checked-in registry and per-version lockfiles +- generate a machine-readable install plan from local release assets +- apply exactly that plan with backups and install state +- uninstall only files recorded in install state +- back up `UserData` separately + +Default BSManager instance root: + +```text +/home/pleb/Windows/Users/pleb/BSManager/BSInstances +``` + +Override with `--instances-root` or `PLUGIN_HELPER_INSTANCES_ROOT`. + +## Quick Start + +```sh +python -m plugin_helper instances +python -m plugin_helper scan --instance 1.40.8 +python -m plugin_helper plan --instance 1.40.8 --state-dir .state +``` + +Install assets are currently expected to already exist locally, usually under: + +```text +.state/instances//downloads/ +``` + +Future milestones will add GitHub release discovery and download. diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md new file mode 100644 index 0000000..9c98185 --- /dev/null +++ b/docs/ROADMAP.md @@ -0,0 +1,80 @@ +# plugin-helper Roadmap + +This roadmap tracks ideas that are useful but not part of the first safe CLI slice. + +## Current Direction + +The initial tool should stay conservative: + +- Python owns instance discovery, dry-run plans, activation, install state, uninstall, and `UserData` backups. +- Release assets are selected through registry and lockfile data. +- Mutating operations apply an explicit plan and record exact file hashes. +- Nix packages `plugin-helper`, but does not directly manage the mutable Beat Saber tree. + +This works well while Beat Saber is still launched from a Windows install or a mounted Windows filesystem. + +## Future: Nix-Orchestrated Plugin Sets + +Once Beat Saber is running on Linux through Steam Proton, it may make sense to let Nix orchestrate the plugin payload itself. + +The core idea: + +```text +Nix flake / plugin set + fetch exact GitHub release assets + verify hashes + unpack and normalize Plugins/, Libs/, IPA/Pending/ + produce /nix/store/...-beatsaber-plugins-/ + +plugin-helper + run nix build .#pluginSets. + compare the resulting tree to the target Beat Saber instance + create a normal dry-run plan + copy or link files into the instance + record activation state +``` + +In that model, the plugin folder effectively gets a reproducible lock: + +- `flake.lock` pins Nix inputs. +- A plugin-set definition pins plugin repositories, tags, release assets, and hashes. +- The generated Nix output is a canonical, immutable plugin tree for one Beat Saber version. +- `plugin-helper` remains the safety layer around activation and rollback. + +## Why Wait For Proton + +For the current dual-boot Windows path, a pure Nix-store plugin tree is awkward: + +- Windows cannot use `/nix/store` paths directly. +- Linux symlinks inside a mounted Windows filesystem may not behave the way native Windows Beat Saber expects. +- Some plugins may create or expect colocated mutable files. + +When running through Proton on Linux, Nix-store outputs and symlink activation become much more practical. Even then, `copy` mode should remain available for plugins that expect writable colocated files. + +## Activation Modes + +A future Nix-backed planner should support at least these activation modes: + +- `copy`: materialize files into the Beat Saber instance. Best compatibility, including mounted Windows trees. +- `symlink`: link plugin files from the Nix output. Best reproducibility and cleanup on Linux/Proton. +- `materialize`: link immutable files where safe and copy known-mutable files. + +All modes should still produce the same kind of explicit plan before applying. + +## Proposed Milestones + +1. Keep the Python safety harness stable: scan, plan, apply, uninstall, and backups. +2. Model one real plugin end to end with the current TOML lockfile and local asset planning. +3. Add a Nix function that fetches and unpacks one locked plugin asset into a normalized tree. +4. Generate a full plugin-set derivation for one Beat Saber version. +5. Teach `plugin-helper plan` to compare a Nix output tree against an instance. +6. Add `--activation-mode copy|symlink|materialize`. +7. Move compatibility and dependency metadata toward shared data that both Python and Nix can consume. + +## Open Questions + +- Should the human-edited source of truth be TOML, Nix, or TOML that generates Nix? +- How should plugin-specific unpack rules be represented without making Nix expressions too noisy? +- Which plugin files are known to need mutability after install? +- Should the Nix output include BSIPA itself, or continue assuming BSIPA is provided by the game instance manager? +- How should updates be proposed: Python querying GitHub, Nix update scripts, or both? diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..5d5e404 --- /dev/null +++ b/flake.nix @@ -0,0 +1,47 @@ +{ + description = "Beat Saber plugin-helper CLI"; + + inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + + outputs = { self, nixpkgs }: + let + system = "x86_64-linux"; + pkgs = import nixpkgs { inherit system; }; + python = pkgs.python313; + in + { + packages.${system}.default = python.pkgs.buildPythonApplication { + pname = "plugin-helper"; + version = "0.1.0"; + src = ./.; + pyproject = true; + + build-system = with python.pkgs; [ + setuptools + wheel + ]; + + nativeCheckInputs = [ ]; + checkPhase = '' + runHook preCheck + python -m unittest discover -s tests + runHook postCheck + ''; + }; + + apps.${system}.default = { + type = "app"; + program = "${self.packages.${system}.default}/bin/plugin-helper"; + }; + + devShells.${system}.default = pkgs.mkShell { + packages = [ + python + pkgs.ruff + ]; + shellHook = '' + export PYTHONPATH="$PWD/src''${PYTHONPATH:+:$PYTHONPATH}" + ''; + }; + }; +} diff --git a/locks/1.40.8.lock.toml b/locks/1.40.8.lock.toml new file mode 100644 index 0000000..e38e458 --- /dev/null +++ b/locks/1.40.8.lock.toml @@ -0,0 +1,18 @@ +beat_saber_version = "1.40.8" +instance = "1.40.8" + +# Add locked plugin selections here as release assets become managed. +# +# [[plugins]] +# id = "songcore" +# repo = "Kylemc1413/SongCore" +# tag = "v4.3.0" +# asset = "SongCore-4.3.0.zip" +# sha256 = "..." + +[[plugins]] +id = "accsaber-reloaded" +repo = "not-dexter/accsaber-reloaded-plugin" +tag = "v1.1.3" +asset = "1.40.8.zip" +sha256 = "c1b1687e8378ee7f550edcbd0811fa71f8ddbaa9f167a5d1b48d4cbf9e808af7" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..93708df --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["setuptools>=69", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "plugin-helper" +version = "0.1.0" +description = "A safe CLI for managing Beat Saber plugins in mounted BSManager instances." +readme = "README.md" +requires-python = ">=3.11" +license = "MIT" +authors = [{ name = "plugin-helper contributors" }] +dependencies = [] + +[project.scripts] +plugin-helper = "plugin_helper.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/registry/plugins.toml b/registry/plugins.toml new file mode 100644 index 0000000..0a6ca48 --- /dev/null +++ b/registry/plugins.toml @@ -0,0 +1,23 @@ +# Human-maintained plugin registry. +# +# Example: +# [[plugins]] +# id = "songcore" +# name = "SongCore" +# repo = "Kylemc1413/SongCore" +# asset_patterns = ["*SongCore*.zip"] +# install_strategy = "bsipa-zip" +# category = "library" +# +# [[plugins.dependencies]] +# id = "bs-utils" +# constraint = ">=1.0" +# required = true + +[[plugins]] +id = "accsaber-reloaded" +name = "AccSaber Reloaded" +repo = "not-dexter/accsaber-reloaded-plugin" +asset_patterns = ["1.40.8.zip"] +install_strategy = "bsipa-zip" +category = "leaderboard" diff --git a/src/plugin_helper/__init__.py b/src/plugin_helper/__init__.py new file mode 100644 index 0000000..3b74f2f --- /dev/null +++ b/src/plugin_helper/__init__.py @@ -0,0 +1,3 @@ +"""Beat Saber plugin helper.""" + +__version__ = "0.1.0" diff --git a/src/plugin_helper/__main__.py b/src/plugin_helper/__main__.py new file mode 100644 index 0000000..2f05ddc --- /dev/null +++ b/src/plugin_helper/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + + +if __name__ == "__main__": + main() diff --git a/src/plugin_helper/checker.py b/src/plugin_helper/checker.py new file mode 100644 index 0000000..f299d9c --- /dev/null +++ b/src/plugin_helper/checker.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from .fsutil import sha256_file +from .models import Lockfile, Registry +from .planner import _find_asset + + +def check_lock( + *, + instance: str, + registry: Registry, + lockfile: Lockfile, + state_root: Path, + repo_root: Path, +) -> dict[str, Any]: + entries: list[dict[str, Any]] = [] + summary = {"ok": 0, "warnings": 0, "errors": 0} + for locked in lockfile.plugins: + registry_plugin = registry.get(locked.id) + strategy = locked.install_strategy or (registry_plugin.install_strategy if registry_plugin else "manual") + messages: list[dict[str, str]] = [] + + if not registry_plugin: + messages.append({"level": "warning", "message": "missing registry entry"}) + if strategy == "manual": + messages.append({"level": "error", "message": "manual install strategy cannot be applied"}) + if not locked.asset: + messages.append({"level": "error", "message": "missing asset"}) + else: + asset_path = _find_asset(locked.asset, state_root, instance, repo_root) + if not asset_path: + messages.append({"level": "error", "message": "asset not found in downloads or repo assets"}) + elif locked.sha256 and sha256_file(asset_path) != locked.sha256: + messages.append({"level": "error", "message": "asset sha256 mismatch"}) + + if any(message["level"] == "error" for message in messages): + status = "error" + summary["errors"] += 1 + elif messages: + status = "warning" + summary["warnings"] += 1 + else: + status = "ok" + summary["ok"] += 1 + + entries.append( + { + "id": locked.id, + "repo": locked.repo or (registry_plugin.repo if registry_plugin else None), + "tag": locked.tag, + "asset": locked.asset, + "installStrategy": strategy, + "status": status, + "messages": messages, + } + ) + + return { + "instance": instance, + "beatSaberVersion": lockfile.beat_saber_version, + "summary": summary, + "plugins": entries, + } diff --git a/src/plugin_helper/cli.py b/src/plugin_helper/cli.py new file mode 100644 index 0000000..2fc51a4 --- /dev/null +++ b/src/plugin_helper/cli.py @@ -0,0 +1,245 @@ +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +from .config import instances_root, repo_root, state_root +from .checker import check_lock +from .installer import apply_plan, uninstall_plugin +from .instances import get_instance, list_instances +from .models import load_lockfile, load_registry +from .planner import create_plan +from .scanner import scan_instance +from .state import load_installed_state +from .userdata import backup_userdata + + +def _json(data: Any) -> None: + print(json.dumps(data, indent=2, sort_keys=True)) + + +def _add_common(parser: argparse.ArgumentParser, *, suppress_default: bool = False) -> None: + default = argparse.SUPPRESS if suppress_default else None + parser.add_argument("--instances-root", default=default, help="BSManager instances root") + parser.add_argument("--state-dir", default=default, help="plugin-helper state directory") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="plugin-helper") + _add_common(parser) + subcommands = parser.add_subparsers(dest="command", required=True) + + subcommands.add_parser( + "instances", + help="List discovered BSManager Beat Saber instances", + parents=[_common_parent()], + ) + + scan = subcommands.add_parser( + "scan", + help="Inspect installed Plugins, Libs, and IPA/Pending files", + parents=[_common_parent()], + ) + scan.add_argument("--instance", required=True) + scan.add_argument("--hashes", action="store_true", help="Include sha256 hashes") + scan.add_argument("--json", action="store_true", help="Print full JSON scan output") + + state = subcommands.add_parser( + "state", + help="Show recorded plugin-helper install state", + parents=[_common_parent()], + ) + state.add_argument("--instance", required=True) + + check = subcommands.add_parser( + "check", + help="Validate local registry, lockfile, and release asset readiness", + parents=[_common_parent()], + ) + check.add_argument("--instance", required=True) + check.add_argument("--registry", default="registry/plugins.toml") + check.add_argument("--lockfile") + check.add_argument("--json", action="store_true", help="Print full JSON check output") + + plan = subcommands.add_parser( + "plan", + help="Create a dry-run install plan from registry and lockfile", + parents=[_common_parent()], + ) + plan.add_argument("--instance", required=True) + plan.add_argument("--registry", default="registry/plugins.toml") + plan.add_argument("--lockfile") + plan.add_argument("--plugin", "--update", action="append", help="Plan only this locked plugin id; repeatable") + + apply = subcommands.add_parser( + "apply", + help="Apply a previously generated plan", + parents=[_common_parent()], + ) + apply.add_argument("plan") + + uninstall = subcommands.add_parser( + "uninstall", + help="Remove files recorded for a managed plugin", + parents=[_common_parent()], + ) + uninstall.add_argument("--instance", required=True) + uninstall.add_argument("plugin") + uninstall.add_argument("--force", action="store_true", help="Delete even when current file hashes differ") + + backup = subcommands.add_parser( + "backup-userdata", + help="Create a timestamped UserData backup archive", + parents=[_common_parent()], + ) + backup.add_argument("--instance", required=True) + + return parser + + +def _common_parent() -> argparse.ArgumentParser: + parent = argparse.ArgumentParser(add_help=False) + _add_common(parent, suppress_default=True) + return parent + + +def run(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + inst_root = instances_root(getattr(args, "instances_root", None)) + st_root = state_root(getattr(args, "state_dir", None)) + + try: + if args.command == "instances": + found = list_instances(inst_root) + if not found: + print(f"No Beat Saber instances found under {inst_root}") + return 1 + for item in found: + flags = [] + if item.has_plugins: + flags.append("Plugins") + if item.has_libs: + flags.append("Libs") + if item.has_userdata: + flags.append("UserData") + suffix = f" ({', '.join(flags)})" if flags else "" + print(f"{item.name}\t{item.path}{suffix}") + return 0 + + if args.command == "scan": + instance = get_instance(inst_root, args.instance) + result = scan_instance(instance.path, include_hashes=args.hashes) + if args.json: + _json(result) + else: + counts = result["counts"] + print(f"{instance.name}: {counts['files']} files") + print(f" Plugins: {counts['plugins']}") + print(f" Libs: {counts['libs']}") + print(f" IPA/Pending: {counts['pending']}") + return 0 + + if args.command == "state": + _json(load_installed_state(st_root, args.instance)) + return 0 + + if args.command == "check": + root = repo_root() + registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry) + lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml" + if not lock_path.is_absolute(): + lock_path = (root / lock_path).resolve() + result = check_lock( + instance=args.instance, + registry=load_registry(registry_path), + lockfile=load_lockfile(lock_path), + state_root=st_root, + repo_root=root, + ) + if args.json: + _json(result) + else: + summary = result["summary"] + print( + f"{args.instance}: {summary['ok']} ok, " + f"{summary['warnings']} warnings, {summary['errors']} errors" + ) + for plugin in result["plugins"]: + if plugin["status"] == "ok": + continue + print(f" {plugin['id']}: {plugin['status']}") + for message in plugin["messages"]: + print(f" {message['level']}: {message['message']}") + return 2 if result["summary"]["errors"] else 0 + + if args.command == "plan": + instance = get_instance(inst_root, args.instance) + root = repo_root() + registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry) + lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml" + if not lock_path.is_absolute(): + lock_path = (root / lock_path).resolve() + registry = load_registry(registry_path) + lockfile = load_lockfile(lock_path) + selected = set(args.plugin) if args.plugin else None + plan, path = create_plan( + instance=args.instance, + instance_path=instance.path, + beat_saber_version=lockfile.beat_saber_version, + registry=registry, + lockfile=lockfile, + state_root=st_root, + repo_root=root, + selected=selected, + ) + print(f"Wrote plan: {path}") + print(f"Changes: {len(plan['changes'])}") + for warning in plan["warnings"]: + print(f"Warning: {warning}", file=sys.stderr) + return 0 + + if args.command == "apply": + with Path(args.plan).open("r", encoding="utf-8") as handle: + plan = json.load(handle) + result = apply_plan(plan, st_root) + print(f"Applied {len(result['applied'])} file changes") + print(f"State: {result['statePath']}") + return 0 + + if args.command == "uninstall": + instance = get_instance(inst_root, args.instance) + result = uninstall_plugin(args.instance, instance.path, st_root, args.plugin, force=args.force) + print(f"Removed: {len(result['removed'])}") + if result["skipped"]: + print("Skipped:") + for item in result["skipped"]: + print(f" {item['path']}: {item['reason']}") + return 0 if result["stateUpdated"] else 2 + + if args.command == "backup-userdata": + instance = get_instance(inst_root, args.instance) + result = backup_userdata(args.instance, instance.path, st_root) + manifest = result["manifest"] + print(f"Archive: {result['archive']}") + print(f"Files: {manifest['fileCount']}") + print(f"Bytes: {manifest['totalSize']}") + return 0 + + except Exception as exc: + print(f"error: {exc}", file=sys.stderr) + return 2 + + parser.error(f"unknown command: {args.command}") + return 2 + + +def main() -> None: + raise SystemExit(run()) + + +if __name__ == "__main__": + main() diff --git a/src/plugin_helper/config.py b/src/plugin_helper/config.py new file mode 100644 index 0000000..894c06f --- /dev/null +++ b/src/plugin_helper/config.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import os +from pathlib import Path + + +DEFAULT_INSTANCES_ROOT = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances") + + +def instances_root(value: str | None = None) -> Path: + raw = value or os.environ.get("PLUGIN_HELPER_INSTANCES_ROOT") + return Path(raw).expanduser() if raw else DEFAULT_INSTANCES_ROOT + + +def state_root(value: str | None = None) -> Path: + if value: + return Path(value).expanduser() + xdg_state = os.environ.get("XDG_STATE_HOME") + base = Path(xdg_state).expanduser() if xdg_state else Path.home() / ".local" / "state" + return base / "plugin-helper" + + +def repo_root() -> Path: + return Path(__file__).resolve().parents[2] diff --git a/src/plugin_helper/fsutil.py b/src/plugin_helper/fsutil.py new file mode 100644 index 0000000..3146b7e --- /dev/null +++ b/src/plugin_helper/fsutil.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import hashlib +import json +import os +import tempfile +from pathlib import Path +from typing import Any + + +def sha256_file(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def sha256_bytes(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def atomic_write_json(path: Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", dir=path.parent) + try: + with os.fdopen(fd, "w", encoding="utf-8") as handle: + json.dump(data, handle, indent=2, sort_keys=True) + handle.write("\n") + Path(tmp_name).replace(path) + except Exception: + Path(tmp_name).unlink(missing_ok=True) + raise + + +def read_json(path: Path, default: Any) -> Any: + if not path.exists(): + return default + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def ensure_relative(path: str) -> Path: + if "\\" in path: + raise ValueError(f"unsafe relative path: {path}") + rel = Path(path) + if rel.is_absolute() or ".." in rel.parts or any(part.endswith(":") for part in rel.parts): + raise ValueError(f"unsafe relative path: {path}") + return rel + + +def ensure_inside(root: Path, target: Path) -> Path: + root_resolved = root.resolve() + target_resolved = target.resolve(strict=False) + try: + target_resolved.relative_to(root_resolved) + except ValueError as exc: + raise ValueError(f"target escapes instance root: {target}") from exc + return target_resolved diff --git a/src/plugin_helper/installer.py b/src/plugin_helper/installer.py new file mode 100644 index 0000000..d650194 --- /dev/null +++ b/src/plugin_helper/installer.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import shutil +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from zipfile import ZipFile + +from .fsutil import ensure_inside, ensure_relative, sha256_bytes, sha256_file +from .state import backups_dir, load_installed_state, save_installed_state + + +def _timestamp() -> str: + return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + + +def _backup_existing(instance_path: Path, backup_root: Path, rel_target: str) -> str | None: + target = ensure_inside(instance_path, instance_path / ensure_relative(rel_target)) + if not target.exists(): + return None + backup_path = backup_root / rel_target + backup_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(target, backup_path) + return str(backup_path) + + +def apply_plan(plan: dict[str, Any], state_root: Path) -> dict[str, Any]: + instance = plan["instance"] + instance_path = Path(plan["instancePath"]) + if not instance_path.is_dir(): + raise FileNotFoundError(f"instance path does not exist: {instance_path}") + + backup_root = backups_dir(state_root, instance) / f"apply-{_timestamp()}" + installed_state = load_installed_state(state_root, instance) + installed_state.setdefault("beatSaberVersion", plan.get("beatSaberVersion")) + installed_state.setdefault("plugins", {}) + + applied: list[dict[str, Any]] = [] + for change in plan.get("changes", []): + source = Path(change["source"]) + if sha256_file(source) != change["sourceSha256"]: + raise ValueError(f"source hash changed: {source}") + rel_target = ensure_relative(change["target"]).as_posix() + target = ensure_inside(instance_path, instance_path / rel_target) + backup = _backup_existing(instance_path, backup_root, rel_target) + target.parent.mkdir(parents=True, exist_ok=True) + + if change["action"] == "copy": + shutil.copy2(source, target) + elif change["action"] == "extract": + with ZipFile(source) as archive: + data = archive.read(change["archiveMember"]) + if sha256_bytes(data) != change["sha256"]: + raise ValueError(f"archive member hash changed: {change['archiveMember']}") + target.write_bytes(data) + else: + raise ValueError(f"unsupported action: {change['action']}") + + actual_sha = sha256_file(target) + if actual_sha != change["sha256"]: + raise ValueError(f"installed file hash mismatch: {rel_target}") + + plugin_state = installed_state["plugins"].setdefault( + change["plugin"], + { + "installedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "files": [], + }, + ) + plugin_state["files"] = [ + item for item in plugin_state.get("files", []) if item.get("path") != rel_target + ] + plugin_state["files"].append( + { + "path": rel_target, + "sha256": actual_sha, + "size": target.stat().st_size, + } + ) + applied.append({"path": rel_target, "plugin": change["plugin"], "backup": backup}) + + save_installed_state(state_root, instance, installed_state) + return {"applied": applied, "statePath": str(state_root / "instances" / instance / "installed.json")} + + +def uninstall_plugin(instance: str, instance_path: Path, state_root: Path, plugin_id: str, force: bool = False) -> dict[str, Any]: + installed_state = load_installed_state(state_root, instance) + plugin_state = installed_state.get("plugins", {}).get(plugin_id) + if not plugin_state: + raise KeyError(f"plugin is not recorded in install state: {plugin_id}") + + removed: list[str] = [] + skipped: list[dict[str, str]] = [] + for item in plugin_state.get("files", []): + rel_path = ensure_relative(item["path"]).as_posix() + target = ensure_inside(instance_path, instance_path / rel_path) + if not target.exists(): + removed.append(rel_path) + continue + current_sha = sha256_file(target) + if current_sha != item.get("sha256") and not force: + skipped.append({"path": rel_path, "reason": "hash mismatch"}) + continue + target.unlink() + removed.append(rel_path) + + if skipped and not force: + return {"removed": removed, "skipped": skipped, "stateUpdated": False} + + installed_state.get("plugins", {}).pop(plugin_id, None) + save_installed_state(state_root, instance, installed_state) + return {"removed": removed, "skipped": skipped, "stateUpdated": True} diff --git a/src/plugin_helper/instances.py b/src/plugin_helper/instances.py new file mode 100644 index 0000000..7e87b88 --- /dev/null +++ b/src/plugin_helper/instances.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class Instance: + name: str + path: Path + has_plugins: bool + has_libs: bool + has_userdata: bool + + +def looks_like_instance(path: Path) -> bool: + return ( + (path / "Beat Saber_Data").is_dir() + or (path / "Beat Saber.exe").exists() + or (path / "Plugins").is_dir() + or (path / "UserData").is_dir() + ) + + +def list_instances(root: Path) -> list[Instance]: + if not root.exists(): + return [] + instances: list[Instance] = [] + for child in sorted(root.iterdir(), key=lambda item: item.name): + if not child.is_dir() or not looks_like_instance(child): + continue + instances.append( + Instance( + name=child.name, + path=child, + has_plugins=(child / "Plugins").is_dir(), + has_libs=(child / "Libs").is_dir(), + has_userdata=(child / "UserData").is_dir(), + ) + ) + return instances + + +def get_instance(root: Path, name: str) -> Instance: + path = root / name + if not path.is_dir() or not looks_like_instance(path): + raise FileNotFoundError(f"Beat Saber instance not found: {path}") + return Instance( + name=name, + path=path, + has_plugins=(path / "Plugins").is_dir(), + has_libs=(path / "Libs").is_dir(), + has_userdata=(path / "UserData").is_dir(), + ) diff --git a/src/plugin_helper/models.py b/src/plugin_helper/models.py new file mode 100644 index 0000000..cabf8d2 --- /dev/null +++ b/src/plugin_helper/models.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import tomllib +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +VALID_STRATEGIES = {"dll-to-plugins", "zip-to-pending", "bsipa-zip", "root-zip", "manual"} + + +@dataclass(frozen=True) +class Dependency: + id: str + constraint: str | None = None + required: bool = True + + +@dataclass(frozen=True) +class RegistryPlugin: + id: str + name: str + repo: str | None + asset_patterns: tuple[str, ...] = () + install_strategy: str = "manual" + category: str | None = None + dependencies: tuple[Dependency, ...] = () + + +@dataclass(frozen=True) +class Registry: + plugins: dict[str, RegistryPlugin] = field(default_factory=dict) + + def get(self, plugin_id: str) -> RegistryPlugin | None: + return self.plugins.get(plugin_id) + + +@dataclass(frozen=True) +class LockedPlugin: + id: str + repo: str | None + tag: str | None + asset: str | None + sha256: str | None + install_strategy: str | None = None + reason: str | None = None + + +@dataclass(frozen=True) +class Lockfile: + beat_saber_version: str + instance: str + plugins: tuple[LockedPlugin, ...] + + +def _load_toml(path: Path) -> dict[str, Any]: + with path.open("rb") as handle: + return tomllib.load(handle) + + +def load_registry(path: Path) -> Registry: + if not path.exists(): + return Registry() + data = _load_toml(path) + plugins: dict[str, RegistryPlugin] = {} + for item in data.get("plugins", []): + dependencies = tuple( + Dependency( + id=dep["id"], + constraint=dep.get("constraint"), + required=dep.get("required", True), + ) + for dep in item.get("dependencies", []) + ) + strategy = item.get("install_strategy", "manual") + if strategy not in VALID_STRATEGIES: + raise ValueError(f"{path}: invalid install_strategy for {item['id']}: {strategy}") + plugin = RegistryPlugin( + id=item["id"], + name=item.get("name", item["id"]), + repo=item.get("repo"), + asset_patterns=tuple(item.get("asset_patterns", [])), + install_strategy=strategy, + category=item.get("category"), + dependencies=dependencies, + ) + plugins[plugin.id] = plugin + return Registry(plugins) + + +def load_lockfile(path: Path) -> Lockfile: + data = _load_toml(path) + plugins = tuple( + LockedPlugin( + id=item["id"], + repo=item.get("repo"), + tag=item.get("tag"), + asset=item.get("asset"), + sha256=item.get("sha256"), + install_strategy=item.get("install_strategy"), + reason=item.get("reason"), + ) + for item in data.get("plugins", []) + ) + return Lockfile( + beat_saber_version=data["beat_saber_version"], + instance=data.get("instance", data["beat_saber_version"]), + plugins=plugins, + ) diff --git a/src/plugin_helper/planner.py b/src/plugin_helper/planner.py new file mode 100644 index 0000000..fd98feb --- /dev/null +++ b/src/plugin_helper/planner.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import fnmatch +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from zipfile import ZipFile + +from .fsutil import ensure_relative, sha256_bytes, sha256_file +from .models import Lockfile, Registry, VALID_STRATEGIES +from .state import downloads_dir, plans_dir + + +ALLOWED_BSIPA_TOP_LEVEL = {"IPA", "Libs", "Plugins"} + + +def _now_slug() -> str: + return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + + +def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path) -> Path | None: + candidates = [ + Path(asset).expanduser(), + downloads_dir(state_root, instance) / asset, + repo_root / "assets" / asset, + repo_root / "locks" / "assets" / asset, + ] + for candidate in candidates: + if candidate.exists() and candidate.is_file(): + return candidate + return None + + +def _zip_members(asset_path: Path) -> list[tuple[str, int, str]]: + result: list[tuple[str, int, str]] = [] + with ZipFile(asset_path) as archive: + for info in archive.infolist(): + if info.is_dir(): + continue + rel = ensure_relative(info.filename).as_posix() + data = archive.read(info) + result.append((rel, len(data), sha256_bytes(data))) + return result + + +def _target_for_member(strategy: str, member: str) -> str: + rel = ensure_relative(member).as_posix() + if strategy == "zip-to-pending": + return f"IPA/Pending/{rel}" + if strategy == "root-zip": + return rel + if strategy == "bsipa-zip": + top = rel.split("/", 1)[0] + if top not in ALLOWED_BSIPA_TOP_LEVEL: + raise ValueError(f"bsipa-zip member has unsupported top-level path: {rel}") + return rel + raise ValueError(f"unsupported zip strategy: {strategy}") + + +def _asset_matches_patterns(name: str, patterns: tuple[str, ...]) -> bool: + return not patterns or any(fnmatch.fnmatch(name, pattern) for pattern in patterns) + + +def create_plan( + *, + instance: str, + instance_path: Path, + beat_saber_version: str, + registry: Registry, + lockfile: Lockfile, + state_root: Path, + repo_root: Path, + selected: set[str] | None = None, +) -> tuple[dict[str, Any], Path]: + selected_ids = selected or {plugin.id for plugin in lockfile.plugins} + changes: list[dict[str, Any]] = [] + warnings: list[str] = [] + + for locked in lockfile.plugins: + if locked.id not in selected_ids: + continue + registry_plugin = registry.get(locked.id) + strategy = locked.install_strategy or (registry_plugin.install_strategy if registry_plugin else "manual") + if strategy not in VALID_STRATEGIES: + raise ValueError(f"{locked.id}: invalid install strategy: {strategy}") + if strategy == "manual": + raise ValueError(f"{locked.id}: install_strategy is manual; add a concrete registry rule first") + if not locked.asset: + raise ValueError(f"{locked.id}: lock entry has no asset") + if registry_plugin and not _asset_matches_patterns(Path(locked.asset).name, registry_plugin.asset_patterns): + warnings.append(f"{locked.id}: asset does not match registry patterns") + + asset_path = _find_asset(locked.asset, state_root, instance, repo_root) + if not asset_path: + raise FileNotFoundError( + f"{locked.id}: asset not found: {locked.asset}; put it in {downloads_dir(state_root, instance)}" + ) + asset_sha = sha256_file(asset_path) + if locked.sha256 and locked.sha256 != asset_sha: + raise ValueError(f"{locked.id}: asset sha256 mismatch for {asset_path}") + + if strategy == "dll-to-plugins": + if asset_path.suffix.lower() != ".dll": + raise ValueError(f"{locked.id}: dll-to-plugins expects a .dll asset") + changes.append( + { + "plugin": locked.id, + "action": "copy", + "source": str(asset_path), + "sourceSha256": asset_sha, + "target": f"Plugins/{asset_path.name}", + "size": asset_path.stat().st_size, + "sha256": asset_sha, + } + ) + continue + + if asset_path.suffix.lower() != ".zip": + raise ValueError(f"{locked.id}: {strategy} expects a .zip asset") + for member, size, member_sha in _zip_members(asset_path): + changes.append( + { + "plugin": locked.id, + "action": "extract", + "source": str(asset_path), + "sourceSha256": asset_sha, + "archiveMember": member, + "target": _target_for_member(strategy, member), + "size": size, + "sha256": member_sha, + } + ) + + plan = { + "schemaVersion": 1, + "createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "instance": instance, + "instancePath": str(instance_path), + "beatSaberVersion": beat_saber_version, + "warnings": warnings, + "changes": changes, + } + plan_path = plans_dir(state_root, instance) / f"plan-{_now_slug()}.json" + with plan_path.open("w", encoding="utf-8") as handle: + json.dump(plan, handle, indent=2, sort_keys=True) + handle.write("\n") + return plan, plan_path diff --git a/src/plugin_helper/scanner.py b/src/plugin_helper/scanner.py new file mode 100644 index 0000000..d0789a0 --- /dev/null +++ b/src/plugin_helper/scanner.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from .fsutil import sha256_file + + +SCAN_DIRS = ("Plugins", "Libs", "IPA/Pending") + + +def scan_instance(instance_path: Path, include_hashes: bool = False) -> dict[str, Any]: + files: list[dict[str, Any]] = [] + for dirname in SCAN_DIRS: + root = instance_path / dirname + if not root.exists(): + continue + for path in sorted(item for item in root.rglob("*") if item.is_file()): + rel = path.relative_to(instance_path).as_posix() + entry: dict[str, Any] = {"path": rel, "size": path.stat().st_size} + if include_hashes: + entry["sha256"] = sha256_file(path) + files.append(entry) + return { + "instancePath": str(instance_path), + "files": files, + "counts": { + "files": len(files), + "plugins": sum(1 for item in files if item["path"].startswith("Plugins/")), + "libs": sum(1 for item in files if item["path"].startswith("Libs/")), + "pending": sum(1 for item in files if item["path"].startswith("IPA/Pending/")), + }, + } diff --git a/src/plugin_helper/state.py b/src/plugin_helper/state.py new file mode 100644 index 0000000..a5d1c82 --- /dev/null +++ b/src/plugin_helper/state.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from .fsutil import atomic_write_json, read_json + + +def instance_state_dir(state_root: Path, instance: str) -> Path: + return state_root / "instances" / instance + + +def installed_state_path(state_root: Path, instance: str) -> Path: + return instance_state_dir(state_root, instance) / "installed.json" + + +def load_installed_state(state_root: Path, instance: str) -> dict[str, Any]: + return read_json( + installed_state_path(state_root, instance), + {"instance": instance, "plugins": {}}, + ) + + +def save_installed_state(state_root: Path, instance: str, state: dict[str, Any]) -> None: + state.setdefault("instance", instance) + state["updatedAt"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + atomic_write_json(installed_state_path(state_root, instance), state) + + +def plans_dir(state_root: Path, instance: str) -> Path: + path = instance_state_dir(state_root, instance) / "plans" + path.mkdir(parents=True, exist_ok=True) + return path + + +def downloads_dir(state_root: Path, instance: str) -> Path: + path = instance_state_dir(state_root, instance) / "downloads" + path.mkdir(parents=True, exist_ok=True) + return path + + +def backups_dir(state_root: Path, instance: str) -> Path: + path = instance_state_dir(state_root, instance) / "backups" + path.mkdir(parents=True, exist_ok=True) + return path diff --git a/src/plugin_helper/userdata.py b/src/plugin_helper/userdata.py new file mode 100644 index 0000000..20f1a5b --- /dev/null +++ b/src/plugin_helper/userdata.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import json +import tarfile +from datetime import datetime, timezone +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any + +from .fsutil import sha256_file +from .state import backups_dir + + +def backup_userdata(instance: str, instance_path: Path, state_root: Path) -> dict[str, Any]: + source = instance_path / "UserData" + if not source.is_dir(): + raise FileNotFoundError(f"UserData directory not found: {source}") + + created_at = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + destination = backups_dir(state_root, instance) / f"userdata-{created_at}.tar.gz" + files: list[dict[str, Any]] = [] + total_size = 0 + for path in sorted(item for item in source.rglob("*") if item.is_file()): + rel = path.relative_to(instance_path).as_posix() + size = path.stat().st_size + total_size += size + files.append({"path": rel, "size": size, "sha256": sha256_file(path)}) + + manifest = { + "createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "instance": instance, + "source": str(source), + "fileCount": len(files), + "totalSize": total_size, + "files": files, + } + + destination.parent.mkdir(parents=True, exist_ok=True) + with tarfile.open(destination, "w:gz") as archive: + archive.add(source, arcname="UserData") + with NamedTemporaryFile("w", encoding="utf-8", suffix=".json") as handle: + json.dump(manifest, handle, indent=2, sort_keys=True) + handle.write("\n") + handle.flush() + archive.add(handle.name, arcname="manifest.json") + return {"archive": str(destination), "manifest": manifest} diff --git a/tests/test_plugin_helper.py b/tests/test_plugin_helper.py new file mode 100644 index 0000000..cec9d78 --- /dev/null +++ b/tests/test_plugin_helper.py @@ -0,0 +1,244 @@ +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from zipfile import ZipFile + +from plugin_helper.checker import check_lock +from plugin_helper.fsutil import sha256_file +from plugin_helper.installer import apply_plan, uninstall_plugin +from plugin_helper.instances import get_instance, list_instances +from plugin_helper.models import Lockfile, LockedPlugin, Registry, RegistryPlugin +from plugin_helper.planner import create_plan +from plugin_helper.scanner import scan_instance +from plugin_helper.state import downloads_dir, load_installed_state +from plugin_helper.userdata import backup_userdata + + +class PluginHelperTests(unittest.TestCase): + def test_instances_and_scan(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + inst = root / "1.40.8" + (inst / "Beat Saber_Data").mkdir(parents=True) + (inst / "Plugins").mkdir() + (inst / "Libs").mkdir() + (inst / "UserData").mkdir() + (inst / "Plugins" / "Example.dll").write_bytes(b"dll") + (root / "not-an-instance").mkdir() + + instances = list_instances(root) + self.assertEqual([item.name for item in instances], ["1.40.8"]) + self.assertEqual(get_instance(root, "1.40.8").path, inst) + + scan = scan_instance(inst, include_hashes=True) + self.assertEqual(scan["counts"]["plugins"], 1) + self.assertEqual(scan["files"][0]["path"], "Plugins/Example.dll") + self.assertIn("sha256", scan["files"][0]) + + def test_plan_apply_and_uninstall_dll(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + work = Path(tmp) + instance = work / "instances" / "1.40.8" + state = work / "state" + instance.mkdir(parents=True) + (instance / "Beat Saber_Data").mkdir() + (instance / "Plugins").mkdir() + + asset = downloads_dir(state, "1.40.8") / "Example.dll" + asset.write_bytes(b"managed dll") + + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo="owner/example", + asset_patterns=("*.dll",), + install_strategy="dll-to-plugins", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo="owner/example", + tag="v1.0.0", + asset="Example.dll", + sha256=sha256_file(asset), + ), + ), + ) + + plan, plan_path = create_plan( + instance="1.40.8", + instance_path=instance, + beat_saber_version="1.40.8", + registry=registry, + lockfile=lockfile, + state_root=state, + repo_root=work, + ) + self.assertTrue(plan_path.exists()) + self.assertEqual(plan["changes"][0]["target"], "Plugins/Example.dll") + + result = apply_plan(plan, state) + self.assertEqual(len(result["applied"]), 1) + self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"managed dll") + installed = load_installed_state(state, "1.40.8") + self.assertIn("example", installed["plugins"]) + + removed = uninstall_plugin("1.40.8", instance, state, "example") + self.assertEqual(removed["removed"], ["Plugins/Example.dll"]) + self.assertFalse((instance / "Plugins" / "Example.dll").exists()) + + def test_zip_to_pending_targets_ipa_pending(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + work = Path(tmp) + instance = work / "instances" / "1.40.8" + state = work / "state" + instance.mkdir(parents=True) + (instance / "Beat Saber_Data").mkdir() + asset = downloads_dir(state, "1.40.8") / "Example.zip" + with ZipFile(asset, "w") as archive: + archive.writestr("Plugins/Example.dll", b"dll") + + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo=None, + install_strategy="zip-to-pending", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo=None, + tag=None, + asset="Example.zip", + sha256=sha256_file(asset), + ), + ), + ) + + plan, _ = create_plan( + instance="1.40.8", + instance_path=instance, + beat_saber_version="1.40.8", + registry=registry, + lockfile=lockfile, + state_root=state, + repo_root=work, + ) + self.assertEqual(plan["changes"][0]["target"], "IPA/Pending/Plugins/Example.dll") + apply_plan(plan, state) + self.assertEqual((instance / "IPA" / "Pending" / "Plugins" / "Example.dll").read_bytes(), b"dll") + + def test_zip_member_cannot_escape_instance(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + work = Path(tmp) + instance = work / "instances" / "1.40.8" + state = work / "state" + instance.mkdir(parents=True) + (instance / "Beat Saber_Data").mkdir() + asset = downloads_dir(state, "1.40.8") / "Bad.zip" + with ZipFile(asset, "w") as archive: + archive.writestr("../Bad.dll", b"dll") + + registry = Registry( + { + "bad": RegistryPlugin( + id="bad", + name="Bad", + repo=None, + install_strategy="zip-to-pending", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="bad", + repo=None, + tag=None, + asset="Bad.zip", + sha256=sha256_file(asset), + ), + ), + ) + + with self.assertRaises(ValueError): + create_plan( + instance="1.40.8", + instance_path=instance, + beat_saber_version="1.40.8", + registry=registry, + lockfile=lockfile, + state_root=state, + repo_root=work, + ) + + def test_userdata_backup_contains_manifest(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + instance = root / "1.40.8" + state = root / "state" + (instance / "UserData").mkdir(parents=True) + (instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8") + + result = backup_userdata("1.40.8", instance, state) + self.assertTrue(Path(result["archive"]).exists()) + self.assertEqual(result["manifest"]["fileCount"], 1) + + def test_check_reports_missing_asset(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + work = Path(tmp) + state = work / "state" + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo=None, + install_strategy="dll-to-plugins", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo=None, + tag="v1.0.0", + asset="Missing.dll", + sha256=None, + ), + ), + ) + result = check_lock( + instance="1.40.8", + registry=registry, + lockfile=lockfile, + state_root=state, + repo_root=work, + ) + self.assertEqual(result["summary"]["errors"], 1) + self.assertEqual(result["plugins"][0]["status"], "error") + + +if __name__ == "__main__": + unittest.main()