Add plugin update and install reporting

This commit is contained in:
pleb
2026-06-28 12:12:57 -07:00
parent 5a9e873de4
commit 931c1d4f73
9 changed files with 618 additions and 42 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ def check_lock(
if not locked.asset:
messages.append({"level": "error", "message": "missing asset"})
else:
asset_path = _find_asset(locked.asset, state_root, instance, repo_root)
asset_path = _find_asset(locked.asset, state_root, instance, repo_root, locked.id)
if not asset_path:
messages.append({"level": "error", "message": "asset not found in downloads or repo assets"})
elif locked.sha256 and sha256_file(asset_path) != locked.sha256:
+155
View File
@@ -8,12 +8,15 @@ from typing import Any
from .config import instances_root, repo_root, state_root
from .checker import check_lock
from .github import fetch_releases
from .installer import apply_plan, uninstall_plugin
from .instances import get_instance, list_instances
from .models import load_lockfile, load_registry
from .models import Lockfile, Registry
from .planner import create_plan
from .scanner import scan_instance
from .state import load_installed_state
from .updates import check_updates
from .userdata import backup_userdata
@@ -21,6 +24,100 @@ def _json(data: Any) -> None:
print(json.dumps(data, indent=2, sort_keys=True))
def installed_plugins_report(
*,
installed_state: dict[str, Any],
registry: Registry,
lockfile: Lockfile,
) -> dict[str, Any]:
locked_by_id = {plugin.id: plugin for plugin in lockfile.plugins}
plugins: list[dict[str, Any]] = []
for plugin_id, plugin_state in sorted(installed_state.get("plugins", {}).items()):
registry_plugin = registry.get(plugin_id)
locked = locked_by_id.get(plugin_id)
files = plugin_state.get("files", [])
plugins.append(
{
"id": plugin_id,
"name": registry_plugin.name if registry_plugin else plugin_id,
"version": locked.tag if locked and locked.tag else "(not locked)",
"asset": locked.asset if locked and locked.asset else "(unknown)",
"repo": (locked.repo if locked and locked.repo else None)
or (registry_plugin.repo if registry_plugin else None)
or "(unknown)",
"installedAt": plugin_state.get("installedAt", "(unknown)"),
"fileCount": len(files),
"files": files,
}
)
return {
"instance": installed_state.get("instance", lockfile.instance),
"beatSaberVersion": installed_state.get("beatSaberVersion", lockfile.beat_saber_version),
"plugins": plugins,
}
def print_installed_plugins(report: dict[str, Any]) -> None:
plugins = report["plugins"]
print(f"{report['instance']} managed plugins ({len(plugins)})")
if not plugins:
print("No plugins have been installed by plugin-helper yet.")
return
headers = ("Plugin", "Version", "Asset", "Files", "Installed")
rows = [
(
f"{plugin['name']} ({plugin['id']})",
plugin["version"],
plugin["asset"],
str(plugin["fileCount"]),
plugin["installedAt"],
)
for plugin in plugins
]
widths = [
max(len(headers[index]), *(len(row[index]) for row in rows))
for index in range(len(headers))
]
header = " ".join(label.ljust(widths[index]) for index, label in enumerate(headers))
print(header)
print(" ".join("-" * width for width in widths))
for row in rows:
print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row)))
def print_updates(report: dict[str, Any]) -> None:
plugins = report["plugins"]
summary = report["summary"]
print(
f"{report['instance']} updates: "
f"{summary['updates']} available, {summary['current']} current, "
f"{summary['warnings']} warnings, {summary['errors']} errors"
)
if not plugins:
return
headers = ("Plugin", "Current", "Latest", "Asset", "Status")
rows = [
(
f"{plugin['name']} ({plugin['id']})",
plugin.get("currentTag") or "(none)",
plugin.get("latestTag") or "(unknown)",
plugin.get("latestAsset") or plugin.get("currentAsset") or "(unknown)",
plugin["status"],
)
for plugin in plugins
]
widths = [
max(len(headers[index]), *(len(row[index]) for row in rows))
for index in range(len(headers))
]
print(" ".join(label.ljust(widths[index]) for index, label in enumerate(headers)))
print(" ".join("-" * width for width in widths))
for row in rows:
print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row)))
def _add_common(parser: argparse.ArgumentParser, *, suppress_default: bool = False) -> None:
default = argparse.SUPPRESS if suppress_default else None
parser.add_argument("--instances-root", default=default, help="BSManager instances root")
@@ -54,6 +151,16 @@ def build_parser() -> argparse.ArgumentParser:
)
state.add_argument("--instance", required=True)
installed = subcommands.add_parser(
"installed",
help="List plugins installed by plugin-helper with locked release versions",
parents=[_common_parent()],
)
installed.add_argument("--instance", required=True)
installed.add_argument("--registry", default="registry/plugins.toml")
installed.add_argument("--lockfile")
installed.add_argument("--json", action="store_true", help="Print full JSON output")
check = subcommands.add_parser(
"check",
help="Validate local registry, lockfile, and release asset readiness",
@@ -64,6 +171,18 @@ def build_parser() -> argparse.ArgumentParser:
check.add_argument("--lockfile")
check.add_argument("--json", action="store_true", help="Print full JSON check output")
updates = subcommands.add_parser(
"updates",
help="Check GitHub for newer matching releases for locked plugins",
parents=[_common_parent()],
)
updates.add_argument("--instance", required=True)
updates.add_argument("--registry", default="registry/plugins.toml")
updates.add_argument("--lockfile")
updates.add_argument("--plugin", action="append", help="Check only this locked plugin id; repeatable")
updates.add_argument("--include-prerelease", action="store_true", help="Include prerelease GitHub releases")
updates.add_argument("--json", action="store_true", help="Print full JSON update output")
plan = subcommands.add_parser(
"plan",
help="Create a dry-run install plan from registry and lockfile",
@@ -147,6 +266,23 @@ def run(argv: list[str] | None = None) -> int:
_json(load_installed_state(st_root, args.instance))
return 0
if args.command == "installed":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = installed_plugins_report(
installed_state=load_installed_state(st_root, args.instance),
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
)
if args.json:
_json(result)
else:
print_installed_plugins(result)
return 0
if args.command == "check":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
@@ -176,6 +312,25 @@ def run(argv: list[str] | None = None) -> int:
print(f" {message['level']}: {message['message']}")
return 2 if result["summary"]["errors"] else 0
if args.command == "updates":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = check_updates(
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
fetch_releases=fetch_releases,
selected=set(args.plugin) if args.plugin else None,
include_prerelease=args.include_prerelease,
)
if args.json:
_json(result)
else:
print_updates(result)
return 2 if result["summary"]["errors"] else 0
if args.command == "plan":
instance = get_instance(inst_root, args.instance)
root = repo_root()
+22
View File
@@ -0,0 +1,22 @@
from __future__ import annotations
import json
import os
from typing import Any
from urllib.request import Request, urlopen
def fetch_releases(repo: str) -> list[dict[str, Any]]:
token = os.environ.get("GITHUB_TOKEN")
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "plugin-helper",
}
if token:
headers["Authorization"] = f"Bearer {token}"
request = Request(f"https://api.github.com/repos/{repo}/releases", headers=headers)
with urlopen(request, timeout=20) as response:
data = json.loads(response.read().decode("utf-8"))
if not isinstance(data, list):
raise ValueError(f"unexpected GitHub releases response for {repo}")
return data
+8 -5
View File
@@ -9,7 +9,7 @@ from zipfile import ZipFile
from .fsutil import ensure_relative, sha256_bytes, sha256_file
from .models import Lockfile, Registry, VALID_STRATEGIES
from .state import downloads_dir, plans_dir
from .state import downloads_dir, plans_dir, plugin_downloads_dir
ALLOWED_BSIPA_TOP_LEVEL = {"IPA", "Libs", "Plugins"}
@@ -19,13 +19,15 @@ def _now_slug() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path) -> Path | None:
def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path, plugin_id: str | None = None) -> Path | None:
candidates = [
Path(asset).expanduser(),
downloads_dir(state_root, instance) / asset,
repo_root / "assets" / asset,
repo_root / "locks" / "assets" / asset,
]
if plugin_id:
candidates.insert(1, plugin_downloads_dir(state_root, instance, plugin_id) / asset)
candidates.insert(2 if plugin_id else 1, downloads_dir(state_root, instance) / asset)
for candidate in candidates:
if candidate.exists() and candidate.is_file():
return candidate
@@ -91,10 +93,11 @@ def create_plan(
if registry_plugin and not _asset_matches_patterns(Path(locked.asset).name, registry_plugin.asset_patterns):
warnings.append(f"{locked.id}: asset does not match registry patterns")
asset_path = _find_asset(locked.asset, state_root, instance, repo_root)
asset_path = _find_asset(locked.asset, state_root, instance, repo_root, locked.id)
if not asset_path:
raise FileNotFoundError(
f"{locked.id}: asset not found: {locked.asset}; put it in {downloads_dir(state_root, instance)}"
f"{locked.id}: asset not found: {locked.asset}; put it in "
f"{plugin_downloads_dir(state_root, instance, locked.id)}"
)
asset_sha = sha256_file(asset_path)
if locked.sha256 and locked.sha256 != asset_sha:
+6
View File
@@ -40,6 +40,12 @@ def downloads_dir(state_root: Path, instance: str) -> Path:
return path
def plugin_downloads_dir(state_root: Path, instance: str, plugin_id: str) -> Path:
path = downloads_dir(state_root, instance) / plugin_id
path.mkdir(parents=True, exist_ok=True)
return path
def backups_dir(state_root: Path, instance: str) -> Path:
path = instance_state_dir(state_root, instance) / "backups"
path.mkdir(parents=True, exist_ok=True)
+179
View File
@@ -0,0 +1,179 @@
from __future__ import annotations
import fnmatch
import re
from typing import Any, Callable
from .models import Lockfile, Registry
FetchReleases = Callable[[str], list[dict[str, Any]]]
def _release_tag(release: dict[str, Any]) -> str:
return str(release.get("tag_name") or "")
def _release_assets(release: dict[str, Any]) -> list[dict[str, Any]]:
assets = release.get("assets") or []
return [asset for asset in assets if isinstance(asset, dict)]
def _asset_name(asset: dict[str, Any]) -> str:
return str(asset.get("name") or "")
def _semver_key(tag: str) -> tuple[int, tuple[int, ...], str]:
match = re.search(r"(\d+(?:\.\d+){0,3})", tag)
if not match:
return (0, (), tag)
return (1, tuple(int(part) for part in match.group(1).split(".")), tag)
def _sorted_releases(releases: list[dict[str, Any]], include_prerelease: bool) -> list[dict[str, Any]]:
candidates = [
release
for release in releases
if not release.get("draft") and (include_prerelease or not release.get("prerelease"))
]
return sorted(
candidates,
key=lambda release: (
str(release.get("published_at") or release.get("created_at") or ""),
_semver_key(_release_tag(release)),
),
reverse=True,
)
def _matching_assets(
release: dict[str, Any],
*,
asset_patterns: tuple[str, ...],
current_asset: str | None,
beat_saber_version: str,
) -> list[dict[str, Any]]:
assets = _release_assets(release)
if asset_patterns:
assets = [
asset
for asset in assets
if any(fnmatch.fnmatch(_asset_name(asset), pattern) for pattern in asset_patterns)
]
if not assets:
return []
exact_version = [asset for asset in assets if _asset_name(asset) == f"{beat_saber_version}.zip"]
if exact_version:
return exact_version
if current_asset:
same_name = [asset for asset in assets if _asset_name(asset) == current_asset]
if same_name:
return same_name
contains_version = [asset for asset in assets if beat_saber_version in _asset_name(asset)]
return contains_version or assets
def _find_latest_matching_release(
releases: list[dict[str, Any]],
*,
asset_patterns: tuple[str, ...],
current_asset: str | None,
beat_saber_version: str,
include_prerelease: bool,
) -> tuple[dict[str, Any], dict[str, Any]] | None:
for release in _sorted_releases(releases, include_prerelease):
assets = _matching_assets(
release,
asset_patterns=asset_patterns,
current_asset=current_asset,
beat_saber_version=beat_saber_version,
)
if assets:
return release, assets[0]
return None
def check_updates(
*,
registry: Registry,
lockfile: Lockfile,
fetch_releases: FetchReleases,
selected: set[str] | None = None,
include_prerelease: bool = False,
) -> dict[str, Any]:
selected_ids = selected or {plugin.id for plugin in lockfile.plugins}
plugins: list[dict[str, Any]] = []
summary = {"current": 0, "updates": 0, "warnings": 0, "errors": 0}
for locked in lockfile.plugins:
if locked.id not in selected_ids:
continue
registry_plugin = registry.get(locked.id)
repo = locked.repo or (registry_plugin.repo if registry_plugin else None)
entry: dict[str, Any] = {
"id": locked.id,
"name": registry_plugin.name if registry_plugin else locked.id,
"repo": repo,
"currentTag": locked.tag,
"currentAsset": locked.asset,
"currentSha256": locked.sha256,
"latestTag": None,
"latestAsset": None,
"latestAssetSha256": None,
"status": "unknown",
"messages": [],
}
if not repo:
entry["status"] = "warning"
entry["messages"].append("missing repository")
summary["warnings"] += 1
plugins.append(entry)
continue
try:
releases = fetch_releases(repo)
except Exception as exc:
entry["status"] = "error"
entry["messages"].append(str(exc))
summary["errors"] += 1
plugins.append(entry)
continue
match = _find_latest_matching_release(
releases,
asset_patterns=registry_plugin.asset_patterns if registry_plugin else (),
current_asset=locked.asset,
beat_saber_version=lockfile.beat_saber_version,
include_prerelease=include_prerelease,
)
if not match:
entry["status"] = "warning"
entry["messages"].append("no matching release asset found")
summary["warnings"] += 1
plugins.append(entry)
continue
latest_release, latest_asset = match
entry["latestTag"] = _release_tag(latest_release)
entry["latestAsset"] = _asset_name(latest_asset)
entry["latestAssetSha256"] = str(latest_asset.get("digest") or "").removeprefix("sha256:") or None
entry["latestUrl"] = latest_release.get("html_url")
entry["latestAssetUrl"] = latest_asset.get("browser_download_url")
same_release = locked.tag == entry["latestTag"] and locked.asset == entry["latestAsset"]
same_hash = not entry["latestAssetSha256"] or locked.sha256 == entry["latestAssetSha256"]
if same_release and same_hash:
entry["status"] = "current"
summary["current"] += 1
else:
entry["status"] = "update"
summary["updates"] += 1
plugins.append(entry)
return {
"instance": lockfile.instance,
"beatSaberVersion": lockfile.beat_saber_version,
"includePrerelease": include_prerelease,
"summary": summary,
"plugins": plugins,
}