From 931c1d4f73738a5f50ccfaa019450576fc7f9bfa Mon Sep 17 00:00:00 2001 From: pleb Date: Sun, 28 Jun 2026 12:12:57 -0700 Subject: [PATCH] Add plugin update and install reporting --- .../skills/install-beatsaber-plugin/SKILL.md | 33 +-- README.md | 19 +- src/plugin_helper/checker.py | 2 +- src/plugin_helper/cli.py | 155 ++++++++++++ src/plugin_helper/github.py | 22 ++ src/plugin_helper/planner.py | 13 +- src/plugin_helper/state.py | 6 + src/plugin_helper/updates.py | 179 ++++++++++++++ tests/test_plugin_helper.py | 231 +++++++++++++++++- 9 files changed, 618 insertions(+), 42 deletions(-) create mode 100644 src/plugin_helper/github.py create mode 100644 src/plugin_helper/updates.py diff --git a/.agents/skills/install-beatsaber-plugin/SKILL.md b/.agents/skills/install-beatsaber-plugin/SKILL.md index 32df2c8..f9c3472 100644 --- a/.agents/skills/install-beatsaber-plugin/SKILL.md +++ b/.agents/skills/install-beatsaber-plugin/SKILL.md @@ -54,19 +54,7 @@ https://github.com///releases/download// PYTHONPATH=src python -m plugin_helper instances ``` -4. Snapshot first when requested. - - If the user asks for a one-time or pre-helper snapshot, archive the instance's `Plugins/` directory before any install: - - ```bash - mkdir -p "$HOME/archive/beatsaber" - tar -C "/" -czf "$HOME/archive/beatsaber/-Plugins-pre-helper-.tar.gz" Plugins - sha256sum "$HOME/archive/beatsaber/-Plugins-pre-helper-.tar.gz" - ``` - - Report the archive path and hash. - -5. Resolve the release from the user-provided URL only. +4. Resolve the release from the user-provided URL only. For GitHub URLs, derive `/` and optional `` from the URL. Query the GitHub API directly for metadata: @@ -77,20 +65,20 @@ https://github.com///releases/download// Pick the asset that matches the Beat Saber instance/version. Prefer an exact versioned asset such as `1.40.8.zip` over broad or source archives. If multiple plausible assets remain, ask the user. -6. Inspect the asset before selecting an install strategy. +5. Inspect the asset before selecting an install strategy. Download to the helper state directory: ```bash - mkdir -p .state/instances//downloads - curl -L --fail -o .state/instances//downloads/ "" - sha256sum .state/instances//downloads/ + mkdir -p .state/instances//downloads/ + curl -L --fail -o .state/instances//downloads// "" + sha256sum .state/instances//downloads// ``` Match the checksum against GitHub's `digest` when available. Inspect zip contents: ```bash - unzip -l .state/instances//downloads/ + unzip -l .state/instances//downloads// ``` Strategy guide: @@ -101,7 +89,7 @@ https://github.com///releases/download// - `zip-to-pending`: only when the release is intended for `IPA/Pending/`. - `manual`: do not use for installable releases. -7. Update the registry and lockfile. +6. Update the registry and lockfile. Add or update exactly one `[[plugins]]` entry in `registry/plugins.toml` with: @@ -128,7 +116,7 @@ https://github.com///releases/download// Preserve unrelated registry and lockfile content. Do not invent dependency versions unless they are explicitly stated by the provided release notes or existing local metadata. -8. Use the helper to validate, plan, and apply. +7. Use the helper to validate, plan, and apply. Always pass `--state-dir .state` so the helper uses the repo-local downloaded asset: @@ -140,7 +128,7 @@ https://github.com///releases/download// Before applying, read or summarize the generated plan enough to confirm it changes only the intended plugin files. -9. Verify the result. +8. Verify the result. Confirm the installed file hashes match the plan or archive members: @@ -152,12 +140,11 @@ https://github.com///releases/download// Use `PYTHONPATH=src`; plain `python -m unittest` may fail in this source-layout repo. -10. Final response. +9. Final response. Include: - release URL/tag/asset used - - snapshot path and hash, if created - files changed in the repo and helper state - live instance files changed - backup path created by the helper diff --git a/README.md b/README.md index 5bda6c4..9e2d4bf 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,8 @@ The first implementation focuses on safe local workflows: - scan existing `Plugins/` and `Libs/` files - read checked-in registry and per-version lockfiles - generate a machine-readable install plan from local release assets -- apply exactly that plan with backups and install state +- apply exactly that plan and record install state - uninstall only files recorded in install state -- back up `UserData` separately Default BSManager instance root: @@ -20,18 +19,20 @@ Default BSManager instance root: Override with `--instances-root` or `PLUGIN_HELPER_INSTANCES_ROOT`. -## Quick Start +## Commands + +Run from the repo root with `PYTHONPATH=src` unless installed. ```sh -python -m plugin_helper instances -python -m plugin_helper scan --instance 1.40.8 -python -m plugin_helper plan --instance 1.40.8 --state-dir .state +PYTHONPATH=src python -m plugin_helper instances +PYTHONPATH=src python -m plugin_helper --state-dir .state installed --instance 1.40.8 +PYTHONPATH=src python -m plugin_helper updates --instance 1.40.8 +PYTHONPATH=src python -m plugin_helper scan --instance 1.40.8 +PYTHONPATH=src python -m plugin_helper --state-dir .state plan --instance 1.40.8 ``` Install assets are currently expected to already exist locally, usually under: ```text -.state/instances//downloads/ +.state/instances//downloads// ``` - -Future milestones will add GitHub release discovery and download. diff --git a/src/plugin_helper/checker.py b/src/plugin_helper/checker.py index f299d9c..a983511 100644 --- a/src/plugin_helper/checker.py +++ b/src/plugin_helper/checker.py @@ -30,7 +30,7 @@ def check_lock( if not locked.asset: messages.append({"level": "error", "message": "missing asset"}) else: - asset_path = _find_asset(locked.asset, state_root, instance, repo_root) + asset_path = _find_asset(locked.asset, state_root, instance, repo_root, locked.id) if not asset_path: messages.append({"level": "error", "message": "asset not found in downloads or repo assets"}) elif locked.sha256 and sha256_file(asset_path) != locked.sha256: diff --git a/src/plugin_helper/cli.py b/src/plugin_helper/cli.py index 2fc51a4..ba7c818 100644 --- a/src/plugin_helper/cli.py +++ b/src/plugin_helper/cli.py @@ -8,12 +8,15 @@ from typing import Any from .config import instances_root, repo_root, state_root from .checker import check_lock +from .github import fetch_releases from .installer import apply_plan, uninstall_plugin from .instances import get_instance, list_instances from .models import load_lockfile, load_registry +from .models import Lockfile, Registry from .planner import create_plan from .scanner import scan_instance from .state import load_installed_state +from .updates import check_updates from .userdata import backup_userdata @@ -21,6 +24,100 @@ def _json(data: Any) -> None: print(json.dumps(data, indent=2, sort_keys=True)) +def installed_plugins_report( + *, + installed_state: dict[str, Any], + registry: Registry, + lockfile: Lockfile, +) -> dict[str, Any]: + locked_by_id = {plugin.id: plugin for plugin in lockfile.plugins} + plugins: list[dict[str, Any]] = [] + for plugin_id, plugin_state in sorted(installed_state.get("plugins", {}).items()): + registry_plugin = registry.get(plugin_id) + locked = locked_by_id.get(plugin_id) + files = plugin_state.get("files", []) + plugins.append( + { + "id": plugin_id, + "name": registry_plugin.name if registry_plugin else plugin_id, + "version": locked.tag if locked and locked.tag else "(not locked)", + "asset": locked.asset if locked and locked.asset else "(unknown)", + "repo": (locked.repo if locked and locked.repo else None) + or (registry_plugin.repo if registry_plugin else None) + or "(unknown)", + "installedAt": plugin_state.get("installedAt", "(unknown)"), + "fileCount": len(files), + "files": files, + } + ) + return { + "instance": installed_state.get("instance", lockfile.instance), + "beatSaberVersion": installed_state.get("beatSaberVersion", lockfile.beat_saber_version), + "plugins": plugins, + } + + +def print_installed_plugins(report: dict[str, Any]) -> None: + plugins = report["plugins"] + print(f"{report['instance']} managed plugins ({len(plugins)})") + if not plugins: + print("No plugins have been installed by plugin-helper yet.") + return + + headers = ("Plugin", "Version", "Asset", "Files", "Installed") + rows = [ + ( + f"{plugin['name']} ({plugin['id']})", + plugin["version"], + plugin["asset"], + str(plugin["fileCount"]), + plugin["installedAt"], + ) + for plugin in plugins + ] + widths = [ + max(len(headers[index]), *(len(row[index]) for row in rows)) + for index in range(len(headers)) + ] + header = " ".join(label.ljust(widths[index]) for index, label in enumerate(headers)) + print(header) + print(" ".join("-" * width for width in widths)) + for row in rows: + print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row))) + + +def print_updates(report: dict[str, Any]) -> None: + plugins = report["plugins"] + summary = report["summary"] + print( + f"{report['instance']} updates: " + f"{summary['updates']} available, {summary['current']} current, " + f"{summary['warnings']} warnings, {summary['errors']} errors" + ) + if not plugins: + return + + headers = ("Plugin", "Current", "Latest", "Asset", "Status") + rows = [ + ( + f"{plugin['name']} ({plugin['id']})", + plugin.get("currentTag") or "(none)", + plugin.get("latestTag") or "(unknown)", + plugin.get("latestAsset") or plugin.get("currentAsset") or "(unknown)", + plugin["status"], + ) + for plugin in plugins + ] + widths = [ + max(len(headers[index]), *(len(row[index]) for row in rows)) + for index in range(len(headers)) + ] + print(" ".join(label.ljust(widths[index]) for index, label in enumerate(headers))) + print(" ".join("-" * width for width in widths)) + for row in rows: + print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row))) + + def _add_common(parser: argparse.ArgumentParser, *, suppress_default: bool = False) -> None: default = argparse.SUPPRESS if suppress_default else None parser.add_argument("--instances-root", default=default, help="BSManager instances root") @@ -54,6 +151,16 @@ def build_parser() -> argparse.ArgumentParser: ) state.add_argument("--instance", required=True) + installed = subcommands.add_parser( + "installed", + help="List plugins installed by plugin-helper with locked release versions", + parents=[_common_parent()], + ) + installed.add_argument("--instance", required=True) + installed.add_argument("--registry", default="registry/plugins.toml") + installed.add_argument("--lockfile") + installed.add_argument("--json", action="store_true", help="Print full JSON output") + check = subcommands.add_parser( "check", help="Validate local registry, lockfile, and release asset readiness", @@ -64,6 +171,18 @@ def build_parser() -> argparse.ArgumentParser: check.add_argument("--lockfile") check.add_argument("--json", action="store_true", help="Print full JSON check output") + updates = subcommands.add_parser( + "updates", + help="Check GitHub for newer matching releases for locked plugins", + parents=[_common_parent()], + ) + updates.add_argument("--instance", required=True) + updates.add_argument("--registry", default="registry/plugins.toml") + updates.add_argument("--lockfile") + updates.add_argument("--plugin", action="append", help="Check only this locked plugin id; repeatable") + updates.add_argument("--include-prerelease", action="store_true", help="Include prerelease GitHub releases") + updates.add_argument("--json", action="store_true", help="Print full JSON update output") + plan = subcommands.add_parser( "plan", help="Create a dry-run install plan from registry and lockfile", @@ -147,6 +266,23 @@ def run(argv: list[str] | None = None) -> int: _json(load_installed_state(st_root, args.instance)) return 0 + if args.command == "installed": + root = repo_root() + registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry) + lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml" + if not lock_path.is_absolute(): + lock_path = (root / lock_path).resolve() + result = installed_plugins_report( + installed_state=load_installed_state(st_root, args.instance), + registry=load_registry(registry_path), + lockfile=load_lockfile(lock_path), + ) + if args.json: + _json(result) + else: + print_installed_plugins(result) + return 0 + if args.command == "check": root = repo_root() registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry) @@ -176,6 +312,25 @@ def run(argv: list[str] | None = None) -> int: print(f" {message['level']}: {message['message']}") return 2 if result["summary"]["errors"] else 0 + if args.command == "updates": + root = repo_root() + registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry) + lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml" + if not lock_path.is_absolute(): + lock_path = (root / lock_path).resolve() + result = check_updates( + registry=load_registry(registry_path), + lockfile=load_lockfile(lock_path), + fetch_releases=fetch_releases, + selected=set(args.plugin) if args.plugin else None, + include_prerelease=args.include_prerelease, + ) + if args.json: + _json(result) + else: + print_updates(result) + return 2 if result["summary"]["errors"] else 0 + if args.command == "plan": instance = get_instance(inst_root, args.instance) root = repo_root() diff --git a/src/plugin_helper/github.py b/src/plugin_helper/github.py new file mode 100644 index 0000000..7ecdd64 --- /dev/null +++ b/src/plugin_helper/github.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import json +import os +from typing import Any +from urllib.request import Request, urlopen + + +def fetch_releases(repo: str) -> list[dict[str, Any]]: + token = os.environ.get("GITHUB_TOKEN") + headers = { + "Accept": "application/vnd.github+json", + "User-Agent": "plugin-helper", + } + if token: + headers["Authorization"] = f"Bearer {token}" + request = Request(f"https://api.github.com/repos/{repo}/releases", headers=headers) + with urlopen(request, timeout=20) as response: + data = json.loads(response.read().decode("utf-8")) + if not isinstance(data, list): + raise ValueError(f"unexpected GitHub releases response for {repo}") + return data diff --git a/src/plugin_helper/planner.py b/src/plugin_helper/planner.py index fd98feb..1a96870 100644 --- a/src/plugin_helper/planner.py +++ b/src/plugin_helper/planner.py @@ -9,7 +9,7 @@ from zipfile import ZipFile from .fsutil import ensure_relative, sha256_bytes, sha256_file from .models import Lockfile, Registry, VALID_STRATEGIES -from .state import downloads_dir, plans_dir +from .state import downloads_dir, plans_dir, plugin_downloads_dir ALLOWED_BSIPA_TOP_LEVEL = {"IPA", "Libs", "Plugins"} @@ -19,13 +19,15 @@ def _now_slug() -> str: return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") -def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path) -> Path | None: +def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path, plugin_id: str | None = None) -> Path | None: candidates = [ Path(asset).expanduser(), - downloads_dir(state_root, instance) / asset, repo_root / "assets" / asset, repo_root / "locks" / "assets" / asset, ] + if plugin_id: + candidates.insert(1, plugin_downloads_dir(state_root, instance, plugin_id) / asset) + candidates.insert(2 if plugin_id else 1, downloads_dir(state_root, instance) / asset) for candidate in candidates: if candidate.exists() and candidate.is_file(): return candidate @@ -91,10 +93,11 @@ def create_plan( if registry_plugin and not _asset_matches_patterns(Path(locked.asset).name, registry_plugin.asset_patterns): warnings.append(f"{locked.id}: asset does not match registry patterns") - asset_path = _find_asset(locked.asset, state_root, instance, repo_root) + asset_path = _find_asset(locked.asset, state_root, instance, repo_root, locked.id) if not asset_path: raise FileNotFoundError( - f"{locked.id}: asset not found: {locked.asset}; put it in {downloads_dir(state_root, instance)}" + f"{locked.id}: asset not found: {locked.asset}; put it in " + f"{plugin_downloads_dir(state_root, instance, locked.id)}" ) asset_sha = sha256_file(asset_path) if locked.sha256 and locked.sha256 != asset_sha: diff --git a/src/plugin_helper/state.py b/src/plugin_helper/state.py index a5d1c82..2da1b4d 100644 --- a/src/plugin_helper/state.py +++ b/src/plugin_helper/state.py @@ -40,6 +40,12 @@ def downloads_dir(state_root: Path, instance: str) -> Path: return path +def plugin_downloads_dir(state_root: Path, instance: str, plugin_id: str) -> Path: + path = downloads_dir(state_root, instance) / plugin_id + path.mkdir(parents=True, exist_ok=True) + return path + + def backups_dir(state_root: Path, instance: str) -> Path: path = instance_state_dir(state_root, instance) / "backups" path.mkdir(parents=True, exist_ok=True) diff --git a/src/plugin_helper/updates.py b/src/plugin_helper/updates.py new file mode 100644 index 0000000..935a13b --- /dev/null +++ b/src/plugin_helper/updates.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +import fnmatch +import re +from typing import Any, Callable + +from .models import Lockfile, Registry + + +FetchReleases = Callable[[str], list[dict[str, Any]]] + + +def _release_tag(release: dict[str, Any]) -> str: + return str(release.get("tag_name") or "") + + +def _release_assets(release: dict[str, Any]) -> list[dict[str, Any]]: + assets = release.get("assets") or [] + return [asset for asset in assets if isinstance(asset, dict)] + + +def _asset_name(asset: dict[str, Any]) -> str: + return str(asset.get("name") or "") + + +def _semver_key(tag: str) -> tuple[int, tuple[int, ...], str]: + match = re.search(r"(\d+(?:\.\d+){0,3})", tag) + if not match: + return (0, (), tag) + return (1, tuple(int(part) for part in match.group(1).split(".")), tag) + + +def _sorted_releases(releases: list[dict[str, Any]], include_prerelease: bool) -> list[dict[str, Any]]: + candidates = [ + release + for release in releases + if not release.get("draft") and (include_prerelease or not release.get("prerelease")) + ] + return sorted( + candidates, + key=lambda release: ( + str(release.get("published_at") or release.get("created_at") or ""), + _semver_key(_release_tag(release)), + ), + reverse=True, + ) + + +def _matching_assets( + release: dict[str, Any], + *, + asset_patterns: tuple[str, ...], + current_asset: str | None, + beat_saber_version: str, +) -> list[dict[str, Any]]: + assets = _release_assets(release) + if asset_patterns: + assets = [ + asset + for asset in assets + if any(fnmatch.fnmatch(_asset_name(asset), pattern) for pattern in asset_patterns) + ] + if not assets: + return [] + + exact_version = [asset for asset in assets if _asset_name(asset) == f"{beat_saber_version}.zip"] + if exact_version: + return exact_version + if current_asset: + same_name = [asset for asset in assets if _asset_name(asset) == current_asset] + if same_name: + return same_name + contains_version = [asset for asset in assets if beat_saber_version in _asset_name(asset)] + return contains_version or assets + + +def _find_latest_matching_release( + releases: list[dict[str, Any]], + *, + asset_patterns: tuple[str, ...], + current_asset: str | None, + beat_saber_version: str, + include_prerelease: bool, +) -> tuple[dict[str, Any], dict[str, Any]] | None: + for release in _sorted_releases(releases, include_prerelease): + assets = _matching_assets( + release, + asset_patterns=asset_patterns, + current_asset=current_asset, + beat_saber_version=beat_saber_version, + ) + if assets: + return release, assets[0] + return None + + +def check_updates( + *, + registry: Registry, + lockfile: Lockfile, + fetch_releases: FetchReleases, + selected: set[str] | None = None, + include_prerelease: bool = False, +) -> dict[str, Any]: + selected_ids = selected or {plugin.id for plugin in lockfile.plugins} + plugins: list[dict[str, Any]] = [] + summary = {"current": 0, "updates": 0, "warnings": 0, "errors": 0} + + for locked in lockfile.plugins: + if locked.id not in selected_ids: + continue + registry_plugin = registry.get(locked.id) + repo = locked.repo or (registry_plugin.repo if registry_plugin else None) + entry: dict[str, Any] = { + "id": locked.id, + "name": registry_plugin.name if registry_plugin else locked.id, + "repo": repo, + "currentTag": locked.tag, + "currentAsset": locked.asset, + "currentSha256": locked.sha256, + "latestTag": None, + "latestAsset": None, + "latestAssetSha256": None, + "status": "unknown", + "messages": [], + } + if not repo: + entry["status"] = "warning" + entry["messages"].append("missing repository") + summary["warnings"] += 1 + plugins.append(entry) + continue + + try: + releases = fetch_releases(repo) + except Exception as exc: + entry["status"] = "error" + entry["messages"].append(str(exc)) + summary["errors"] += 1 + plugins.append(entry) + continue + + match = _find_latest_matching_release( + releases, + asset_patterns=registry_plugin.asset_patterns if registry_plugin else (), + current_asset=locked.asset, + beat_saber_version=lockfile.beat_saber_version, + include_prerelease=include_prerelease, + ) + if not match: + entry["status"] = "warning" + entry["messages"].append("no matching release asset found") + summary["warnings"] += 1 + plugins.append(entry) + continue + + latest_release, latest_asset = match + entry["latestTag"] = _release_tag(latest_release) + entry["latestAsset"] = _asset_name(latest_asset) + entry["latestAssetSha256"] = str(latest_asset.get("digest") or "").removeprefix("sha256:") or None + entry["latestUrl"] = latest_release.get("html_url") + entry["latestAssetUrl"] = latest_asset.get("browser_download_url") + same_release = locked.tag == entry["latestTag"] and locked.asset == entry["latestAsset"] + same_hash = not entry["latestAssetSha256"] or locked.sha256 == entry["latestAssetSha256"] + if same_release and same_hash: + entry["status"] = "current" + summary["current"] += 1 + else: + entry["status"] = "update" + summary["updates"] += 1 + plugins.append(entry) + + return { + "instance": lockfile.instance, + "beatSaberVersion": lockfile.beat_saber_version, + "includePrerelease": include_prerelease, + "summary": summary, + "plugins": plugins, + } diff --git a/tests/test_plugin_helper.py b/tests/test_plugin_helper.py index cec9d78..b309f88 100644 --- a/tests/test_plugin_helper.py +++ b/tests/test_plugin_helper.py @@ -6,13 +6,15 @@ from pathlib import Path from zipfile import ZipFile from plugin_helper.checker import check_lock +from plugin_helper.cli import installed_plugins_report from plugin_helper.fsutil import sha256_file from plugin_helper.installer import apply_plan, uninstall_plugin from plugin_helper.instances import get_instance, list_instances from plugin_helper.models import Lockfile, LockedPlugin, Registry, RegistryPlugin from plugin_helper.planner import create_plan from plugin_helper.scanner import scan_instance -from plugin_helper.state import downloads_dir, load_installed_state +from plugin_helper.state import downloads_dir, load_installed_state, plugin_downloads_dir +from plugin_helper.updates import check_updates from plugin_helper.userdata import backup_userdata @@ -46,7 +48,7 @@ class PluginHelperTests(unittest.TestCase): (instance / "Beat Saber_Data").mkdir() (instance / "Plugins").mkdir() - asset = downloads_dir(state, "1.40.8") / "Example.dll" + asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.dll" asset.write_bytes(b"managed dll") registry = Registry( @@ -103,7 +105,7 @@ class PluginHelperTests(unittest.TestCase): state = work / "state" instance.mkdir(parents=True) (instance / "Beat Saber_Data").mkdir() - asset = downloads_dir(state, "1.40.8") / "Example.zip" + asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.zip" with ZipFile(asset, "w") as archive: archive.writestr("Plugins/Example.dll", b"dll") @@ -144,6 +146,49 @@ class PluginHelperTests(unittest.TestCase): apply_plan(plan, state) self.assertEqual((instance / "IPA" / "Pending" / "Plugins" / "Example.dll").read_bytes(), b"dll") + def test_plan_still_finds_legacy_flat_downloads(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + work = Path(tmp) + instance = work / "instances" / "1.40.8" + state = work / "state" + instance.mkdir(parents=True) + (instance / "Beat Saber_Data").mkdir() + asset = downloads_dir(state, "1.40.8") / "Example.dll" + asset.write_bytes(b"legacy flat download") + + plan, _ = create_plan( + instance="1.40.8", + instance_path=instance, + beat_saber_version="1.40.8", + registry=Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo=None, + install_strategy="dll-to-plugins", + ) + } + ), + lockfile=Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo=None, + tag=None, + asset="Example.dll", + sha256=sha256_file(asset), + ), + ), + ), + state_root=state, + repo_root=work, + ) + + self.assertEqual(plan["changes"][0]["source"], str(asset)) + def test_zip_member_cannot_escape_instance(self) -> None: with tempfile.TemporaryDirectory() as tmp: work = Path(tmp) @@ -151,7 +196,7 @@ class PluginHelperTests(unittest.TestCase): state = work / "state" instance.mkdir(parents=True) (instance / "Beat Saber_Data").mkdir() - asset = downloads_dir(state, "1.40.8") / "Bad.zip" + asset = plugin_downloads_dir(state, "1.40.8", "bad") / "Bad.zip" with ZipFile(asset, "w") as archive: archive.writestr("../Bad.dll", b"dll") @@ -239,6 +284,184 @@ class PluginHelperTests(unittest.TestCase): self.assertEqual(result["summary"]["errors"], 1) self.assertEqual(result["plugins"][0]["status"], "error") + def test_installed_plugins_report_includes_locked_version(self) -> None: + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example Plugin", + repo="owner/example", + install_strategy="dll-to-plugins", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo="owner/example", + tag="v1.2.3", + asset="Example.dll", + sha256="abc123", + ), + ), + ) + report = installed_plugins_report( + installed_state={ + "instance": "1.40.8", + "plugins": { + "example": { + "installedAt": "2026-06-14T17:18:40Z", + "files": [{"path": "Plugins/Example.dll"}], + } + }, + }, + registry=registry, + lockfile=lockfile, + ) + + self.assertEqual(report["plugins"][0]["name"], "Example Plugin") + self.assertEqual(report["plugins"][0]["version"], "v1.2.3") + self.assertEqual(report["plugins"][0]["asset"], "Example.dll") + self.assertEqual(report["plugins"][0]["fileCount"], 1) + + def test_update_check_reports_current_matching_asset(self) -> None: + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo="owner/example", + asset_patterns=("1.40.8.zip",), + install_strategy="bsipa-zip", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo="owner/example", + tag="v1.1.0", + asset="1.40.8.zip", + sha256="abc123", + ), + ), + ) + + result = check_updates( + registry=registry, + lockfile=lockfile, + fetch_releases=lambda repo: [ + { + "tag_name": "v1.1.0", + "published_at": "2026-06-10T00:00:00Z", + "assets": [{"name": "1.40.8.zip", "digest": "sha256:abc123"}], + } + ], + ) + + self.assertEqual(result["summary"]["current"], 1) + self.assertEqual(result["plugins"][0]["status"], "current") + self.assertEqual(result["plugins"][0]["latestAssetSha256"], "abc123") + + def test_update_check_reports_new_matching_release(self) -> None: + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo="owner/example", + asset_patterns=("*.zip",), + install_strategy="bsipa-zip", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo="owner/example", + tag="v1.1.0", + asset="1.40.8.zip", + sha256="abc123", + ), + ), + ) + + result = check_updates( + registry=registry, + lockfile=lockfile, + fetch_releases=lambda repo: [ + { + "tag_name": "v1.2.0", + "published_at": "2026-06-12T00:00:00Z", + "assets": [ + {"name": "1.29.1.zip"}, + {"name": "1.40.8.zip", "browser_download_url": "https://example.invalid/asset"}, + ], + }, + { + "tag_name": "v1.1.0", + "published_at": "2026-06-10T00:00:00Z", + "assets": [{"name": "1.40.8.zip"}], + }, + ], + ) + + self.assertEqual(result["summary"]["updates"], 1) + self.assertEqual(result["plugins"][0]["status"], "update") + self.assertEqual(result["plugins"][0]["latestTag"], "v1.2.0") + self.assertEqual(result["plugins"][0]["latestAsset"], "1.40.8.zip") + + def test_update_check_reports_replaced_asset_digest(self) -> None: + registry = Registry( + { + "example": RegistryPlugin( + id="example", + name="Example", + repo="owner/example", + asset_patterns=("1.40.8.zip",), + install_strategy="bsipa-zip", + ) + } + ) + lockfile = Lockfile( + beat_saber_version="1.40.8", + instance="1.40.8", + plugins=( + LockedPlugin( + id="example", + repo="owner/example", + tag="v1.1.0", + asset="1.40.8.zip", + sha256="old", + ), + ), + ) + + result = check_updates( + registry=registry, + lockfile=lockfile, + fetch_releases=lambda repo: [ + { + "tag_name": "v1.1.0", + "published_at": "2026-06-10T00:00:00Z", + "assets": [{"name": "1.40.8.zip", "digest": "sha256:new"}], + } + ], + ) + + self.assertEqual(result["summary"]["updates"], 1) + self.assertEqual(result["plugins"][0]["status"], "update") + self.assertEqual(result["plugins"][0]["latestAssetSha256"], "new") + if __name__ == "__main__": unittest.main()