Add plugin helper with agent skill for updating plugins

This commit is contained in:
pleb
2026-06-14 10:26:22 -07:00
parent a9881ebec4
commit caaa4a6558
23 changed files with 1604 additions and 0 deletions
+3
View File
@@ -0,0 +1,3 @@
"""Beat Saber plugin helper."""
__version__ = "0.1.0"
+5
View File
@@ -0,0 +1,5 @@
from .cli import main
if __name__ == "__main__":
main()
+66
View File
@@ -0,0 +1,66 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from .fsutil import sha256_file
from .models import Lockfile, Registry
from .planner import _find_asset
def check_lock(
*,
instance: str,
registry: Registry,
lockfile: Lockfile,
state_root: Path,
repo_root: Path,
) -> dict[str, Any]:
entries: list[dict[str, Any]] = []
summary = {"ok": 0, "warnings": 0, "errors": 0}
for locked in lockfile.plugins:
registry_plugin = registry.get(locked.id)
strategy = locked.install_strategy or (registry_plugin.install_strategy if registry_plugin else "manual")
messages: list[dict[str, str]] = []
if not registry_plugin:
messages.append({"level": "warning", "message": "missing registry entry"})
if strategy == "manual":
messages.append({"level": "error", "message": "manual install strategy cannot be applied"})
if not locked.asset:
messages.append({"level": "error", "message": "missing asset"})
else:
asset_path = _find_asset(locked.asset, state_root, instance, repo_root)
if not asset_path:
messages.append({"level": "error", "message": "asset not found in downloads or repo assets"})
elif locked.sha256 and sha256_file(asset_path) != locked.sha256:
messages.append({"level": "error", "message": "asset sha256 mismatch"})
if any(message["level"] == "error" for message in messages):
status = "error"
summary["errors"] += 1
elif messages:
status = "warning"
summary["warnings"] += 1
else:
status = "ok"
summary["ok"] += 1
entries.append(
{
"id": locked.id,
"repo": locked.repo or (registry_plugin.repo if registry_plugin else None),
"tag": locked.tag,
"asset": locked.asset,
"installStrategy": strategy,
"status": status,
"messages": messages,
}
)
return {
"instance": instance,
"beatSaberVersion": lockfile.beat_saber_version,
"summary": summary,
"plugins": entries,
}
+245
View File
@@ -0,0 +1,245 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
from .config import instances_root, repo_root, state_root
from .checker import check_lock
from .installer import apply_plan, uninstall_plugin
from .instances import get_instance, list_instances
from .models import load_lockfile, load_registry
from .planner import create_plan
from .scanner import scan_instance
from .state import load_installed_state
from .userdata import backup_userdata
def _json(data: Any) -> None:
print(json.dumps(data, indent=2, sort_keys=True))
def _add_common(parser: argparse.ArgumentParser, *, suppress_default: bool = False) -> None:
default = argparse.SUPPRESS if suppress_default else None
parser.add_argument("--instances-root", default=default, help="BSManager instances root")
parser.add_argument("--state-dir", default=default, help="plugin-helper state directory")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="plugin-helper")
_add_common(parser)
subcommands = parser.add_subparsers(dest="command", required=True)
subcommands.add_parser(
"instances",
help="List discovered BSManager Beat Saber instances",
parents=[_common_parent()],
)
scan = subcommands.add_parser(
"scan",
help="Inspect installed Plugins, Libs, and IPA/Pending files",
parents=[_common_parent()],
)
scan.add_argument("--instance", required=True)
scan.add_argument("--hashes", action="store_true", help="Include sha256 hashes")
scan.add_argument("--json", action="store_true", help="Print full JSON scan output")
state = subcommands.add_parser(
"state",
help="Show recorded plugin-helper install state",
parents=[_common_parent()],
)
state.add_argument("--instance", required=True)
check = subcommands.add_parser(
"check",
help="Validate local registry, lockfile, and release asset readiness",
parents=[_common_parent()],
)
check.add_argument("--instance", required=True)
check.add_argument("--registry", default="registry/plugins.toml")
check.add_argument("--lockfile")
check.add_argument("--json", action="store_true", help="Print full JSON check output")
plan = subcommands.add_parser(
"plan",
help="Create a dry-run install plan from registry and lockfile",
parents=[_common_parent()],
)
plan.add_argument("--instance", required=True)
plan.add_argument("--registry", default="registry/plugins.toml")
plan.add_argument("--lockfile")
plan.add_argument("--plugin", "--update", action="append", help="Plan only this locked plugin id; repeatable")
apply = subcommands.add_parser(
"apply",
help="Apply a previously generated plan",
parents=[_common_parent()],
)
apply.add_argument("plan")
uninstall = subcommands.add_parser(
"uninstall",
help="Remove files recorded for a managed plugin",
parents=[_common_parent()],
)
uninstall.add_argument("--instance", required=True)
uninstall.add_argument("plugin")
uninstall.add_argument("--force", action="store_true", help="Delete even when current file hashes differ")
backup = subcommands.add_parser(
"backup-userdata",
help="Create a timestamped UserData backup archive",
parents=[_common_parent()],
)
backup.add_argument("--instance", required=True)
return parser
def _common_parent() -> argparse.ArgumentParser:
parent = argparse.ArgumentParser(add_help=False)
_add_common(parent, suppress_default=True)
return parent
def run(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
inst_root = instances_root(getattr(args, "instances_root", None))
st_root = state_root(getattr(args, "state_dir", None))
try:
if args.command == "instances":
found = list_instances(inst_root)
if not found:
print(f"No Beat Saber instances found under {inst_root}")
return 1
for item in found:
flags = []
if item.has_plugins:
flags.append("Plugins")
if item.has_libs:
flags.append("Libs")
if item.has_userdata:
flags.append("UserData")
suffix = f" ({', '.join(flags)})" if flags else ""
print(f"{item.name}\t{item.path}{suffix}")
return 0
if args.command == "scan":
instance = get_instance(inst_root, args.instance)
result = scan_instance(instance.path, include_hashes=args.hashes)
if args.json:
_json(result)
else:
counts = result["counts"]
print(f"{instance.name}: {counts['files']} files")
print(f" Plugins: {counts['plugins']}")
print(f" Libs: {counts['libs']}")
print(f" IPA/Pending: {counts['pending']}")
return 0
if args.command == "state":
_json(load_installed_state(st_root, args.instance))
return 0
if args.command == "check":
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
result = check_lock(
instance=args.instance,
registry=load_registry(registry_path),
lockfile=load_lockfile(lock_path),
state_root=st_root,
repo_root=root,
)
if args.json:
_json(result)
else:
summary = result["summary"]
print(
f"{args.instance}: {summary['ok']} ok, "
f"{summary['warnings']} warnings, {summary['errors']} errors"
)
for plugin in result["plugins"]:
if plugin["status"] == "ok":
continue
print(f" {plugin['id']}: {plugin['status']}")
for message in plugin["messages"]:
print(f" {message['level']}: {message['message']}")
return 2 if result["summary"]["errors"] else 0
if args.command == "plan":
instance = get_instance(inst_root, args.instance)
root = repo_root()
registry_path = (root / args.registry).resolve() if not Path(args.registry).is_absolute() else Path(args.registry)
lock_path = Path(args.lockfile) if args.lockfile else root / "locks" / f"{args.instance}.lock.toml"
if not lock_path.is_absolute():
lock_path = (root / lock_path).resolve()
registry = load_registry(registry_path)
lockfile = load_lockfile(lock_path)
selected = set(args.plugin) if args.plugin else None
plan, path = create_plan(
instance=args.instance,
instance_path=instance.path,
beat_saber_version=lockfile.beat_saber_version,
registry=registry,
lockfile=lockfile,
state_root=st_root,
repo_root=root,
selected=selected,
)
print(f"Wrote plan: {path}")
print(f"Changes: {len(plan['changes'])}")
for warning in plan["warnings"]:
print(f"Warning: {warning}", file=sys.stderr)
return 0
if args.command == "apply":
with Path(args.plan).open("r", encoding="utf-8") as handle:
plan = json.load(handle)
result = apply_plan(plan, st_root)
print(f"Applied {len(result['applied'])} file changes")
print(f"State: {result['statePath']}")
return 0
if args.command == "uninstall":
instance = get_instance(inst_root, args.instance)
result = uninstall_plugin(args.instance, instance.path, st_root, args.plugin, force=args.force)
print(f"Removed: {len(result['removed'])}")
if result["skipped"]:
print("Skipped:")
for item in result["skipped"]:
print(f" {item['path']}: {item['reason']}")
return 0 if result["stateUpdated"] else 2
if args.command == "backup-userdata":
instance = get_instance(inst_root, args.instance)
result = backup_userdata(args.instance, instance.path, st_root)
manifest = result["manifest"]
print(f"Archive: {result['archive']}")
print(f"Files: {manifest['fileCount']}")
print(f"Bytes: {manifest['totalSize']}")
return 0
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
parser.error(f"unknown command: {args.command}")
return 2
def main() -> None:
raise SystemExit(run())
if __name__ == "__main__":
main()
+24
View File
@@ -0,0 +1,24 @@
from __future__ import annotations
import os
from pathlib import Path
DEFAULT_INSTANCES_ROOT = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances")
def instances_root(value: str | None = None) -> Path:
raw = value or os.environ.get("PLUGIN_HELPER_INSTANCES_ROOT")
return Path(raw).expanduser() if raw else DEFAULT_INSTANCES_ROOT
def state_root(value: str | None = None) -> Path:
if value:
return Path(value).expanduser()
xdg_state = os.environ.get("XDG_STATE_HOME")
base = Path(xdg_state).expanduser() if xdg_state else Path.home() / ".local" / "state"
return base / "plugin-helper"
def repo_root() -> Path:
return Path(__file__).resolve().parents[2]
+59
View File
@@ -0,0 +1,59 @@
from __future__ import annotations
import hashlib
import json
import os
import tempfile
from pathlib import Path
from typing import Any
def sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def atomic_write_json(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", dir=path.parent)
try:
with os.fdopen(fd, "w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2, sort_keys=True)
handle.write("\n")
Path(tmp_name).replace(path)
except Exception:
Path(tmp_name).unlink(missing_ok=True)
raise
def read_json(path: Path, default: Any) -> Any:
if not path.exists():
return default
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def ensure_relative(path: str) -> Path:
if "\\" in path:
raise ValueError(f"unsafe relative path: {path}")
rel = Path(path)
if rel.is_absolute() or ".." in rel.parts or any(part.endswith(":") for part in rel.parts):
raise ValueError(f"unsafe relative path: {path}")
return rel
def ensure_inside(root: Path, target: Path) -> Path:
root_resolved = root.resolve()
target_resolved = target.resolve(strict=False)
try:
target_resolved.relative_to(root_resolved)
except ValueError as exc:
raise ValueError(f"target escapes instance root: {target}") from exc
return target_resolved
+112
View File
@@ -0,0 +1,112 @@
from __future__ import annotations
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from zipfile import ZipFile
from .fsutil import ensure_inside, ensure_relative, sha256_bytes, sha256_file
from .state import backups_dir, load_installed_state, save_installed_state
def _timestamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def _backup_existing(instance_path: Path, backup_root: Path, rel_target: str) -> str | None:
target = ensure_inside(instance_path, instance_path / ensure_relative(rel_target))
if not target.exists():
return None
backup_path = backup_root / rel_target
backup_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(target, backup_path)
return str(backup_path)
def apply_plan(plan: dict[str, Any], state_root: Path) -> dict[str, Any]:
instance = plan["instance"]
instance_path = Path(plan["instancePath"])
if not instance_path.is_dir():
raise FileNotFoundError(f"instance path does not exist: {instance_path}")
backup_root = backups_dir(state_root, instance) / f"apply-{_timestamp()}"
installed_state = load_installed_state(state_root, instance)
installed_state.setdefault("beatSaberVersion", plan.get("beatSaberVersion"))
installed_state.setdefault("plugins", {})
applied: list[dict[str, Any]] = []
for change in plan.get("changes", []):
source = Path(change["source"])
if sha256_file(source) != change["sourceSha256"]:
raise ValueError(f"source hash changed: {source}")
rel_target = ensure_relative(change["target"]).as_posix()
target = ensure_inside(instance_path, instance_path / rel_target)
backup = _backup_existing(instance_path, backup_root, rel_target)
target.parent.mkdir(parents=True, exist_ok=True)
if change["action"] == "copy":
shutil.copy2(source, target)
elif change["action"] == "extract":
with ZipFile(source) as archive:
data = archive.read(change["archiveMember"])
if sha256_bytes(data) != change["sha256"]:
raise ValueError(f"archive member hash changed: {change['archiveMember']}")
target.write_bytes(data)
else:
raise ValueError(f"unsupported action: {change['action']}")
actual_sha = sha256_file(target)
if actual_sha != change["sha256"]:
raise ValueError(f"installed file hash mismatch: {rel_target}")
plugin_state = installed_state["plugins"].setdefault(
change["plugin"],
{
"installedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"files": [],
},
)
plugin_state["files"] = [
item for item in plugin_state.get("files", []) if item.get("path") != rel_target
]
plugin_state["files"].append(
{
"path": rel_target,
"sha256": actual_sha,
"size": target.stat().st_size,
}
)
applied.append({"path": rel_target, "plugin": change["plugin"], "backup": backup})
save_installed_state(state_root, instance, installed_state)
return {"applied": applied, "statePath": str(state_root / "instances" / instance / "installed.json")}
def uninstall_plugin(instance: str, instance_path: Path, state_root: Path, plugin_id: str, force: bool = False) -> dict[str, Any]:
installed_state = load_installed_state(state_root, instance)
plugin_state = installed_state.get("plugins", {}).get(plugin_id)
if not plugin_state:
raise KeyError(f"plugin is not recorded in install state: {plugin_id}")
removed: list[str] = []
skipped: list[dict[str, str]] = []
for item in plugin_state.get("files", []):
rel_path = ensure_relative(item["path"]).as_posix()
target = ensure_inside(instance_path, instance_path / rel_path)
if not target.exists():
removed.append(rel_path)
continue
current_sha = sha256_file(target)
if current_sha != item.get("sha256") and not force:
skipped.append({"path": rel_path, "reason": "hash mismatch"})
continue
target.unlink()
removed.append(rel_path)
if skipped and not force:
return {"removed": removed, "skipped": skipped, "stateUpdated": False}
installed_state.get("plugins", {}).pop(plugin_id, None)
save_installed_state(state_root, instance, installed_state)
return {"removed": removed, "skipped": skipped, "stateUpdated": True}
+54
View File
@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class Instance:
name: str
path: Path
has_plugins: bool
has_libs: bool
has_userdata: bool
def looks_like_instance(path: Path) -> bool:
return (
(path / "Beat Saber_Data").is_dir()
or (path / "Beat Saber.exe").exists()
or (path / "Plugins").is_dir()
or (path / "UserData").is_dir()
)
def list_instances(root: Path) -> list[Instance]:
if not root.exists():
return []
instances: list[Instance] = []
for child in sorted(root.iterdir(), key=lambda item: item.name):
if not child.is_dir() or not looks_like_instance(child):
continue
instances.append(
Instance(
name=child.name,
path=child,
has_plugins=(child / "Plugins").is_dir(),
has_libs=(child / "Libs").is_dir(),
has_userdata=(child / "UserData").is_dir(),
)
)
return instances
def get_instance(root: Path, name: str) -> Instance:
path = root / name
if not path.is_dir() or not looks_like_instance(path):
raise FileNotFoundError(f"Beat Saber instance not found: {path}")
return Instance(
name=name,
path=path,
has_plugins=(path / "Plugins").is_dir(),
has_libs=(path / "Libs").is_dir(),
has_userdata=(path / "UserData").is_dir(),
)
+109
View File
@@ -0,0 +1,109 @@
from __future__ import annotations
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
VALID_STRATEGIES = {"dll-to-plugins", "zip-to-pending", "bsipa-zip", "root-zip", "manual"}
@dataclass(frozen=True)
class Dependency:
id: str
constraint: str | None = None
required: bool = True
@dataclass(frozen=True)
class RegistryPlugin:
id: str
name: str
repo: str | None
asset_patterns: tuple[str, ...] = ()
install_strategy: str = "manual"
category: str | None = None
dependencies: tuple[Dependency, ...] = ()
@dataclass(frozen=True)
class Registry:
plugins: dict[str, RegistryPlugin] = field(default_factory=dict)
def get(self, plugin_id: str) -> RegistryPlugin | None:
return self.plugins.get(plugin_id)
@dataclass(frozen=True)
class LockedPlugin:
id: str
repo: str | None
tag: str | None
asset: str | None
sha256: str | None
install_strategy: str | None = None
reason: str | None = None
@dataclass(frozen=True)
class Lockfile:
beat_saber_version: str
instance: str
plugins: tuple[LockedPlugin, ...]
def _load_toml(path: Path) -> dict[str, Any]:
with path.open("rb") as handle:
return tomllib.load(handle)
def load_registry(path: Path) -> Registry:
if not path.exists():
return Registry()
data = _load_toml(path)
plugins: dict[str, RegistryPlugin] = {}
for item in data.get("plugins", []):
dependencies = tuple(
Dependency(
id=dep["id"],
constraint=dep.get("constraint"),
required=dep.get("required", True),
)
for dep in item.get("dependencies", [])
)
strategy = item.get("install_strategy", "manual")
if strategy not in VALID_STRATEGIES:
raise ValueError(f"{path}: invalid install_strategy for {item['id']}: {strategy}")
plugin = RegistryPlugin(
id=item["id"],
name=item.get("name", item["id"]),
repo=item.get("repo"),
asset_patterns=tuple(item.get("asset_patterns", [])),
install_strategy=strategy,
category=item.get("category"),
dependencies=dependencies,
)
plugins[plugin.id] = plugin
return Registry(plugins)
def load_lockfile(path: Path) -> Lockfile:
data = _load_toml(path)
plugins = tuple(
LockedPlugin(
id=item["id"],
repo=item.get("repo"),
tag=item.get("tag"),
asset=item.get("asset"),
sha256=item.get("sha256"),
install_strategy=item.get("install_strategy"),
reason=item.get("reason"),
)
for item in data.get("plugins", [])
)
return Lockfile(
beat_saber_version=data["beat_saber_version"],
instance=data.get("instance", data["beat_saber_version"]),
plugins=plugins,
)
+148
View File
@@ -0,0 +1,148 @@
from __future__ import annotations
import fnmatch
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from zipfile import ZipFile
from .fsutil import ensure_relative, sha256_bytes, sha256_file
from .models import Lockfile, Registry, VALID_STRATEGIES
from .state import downloads_dir, plans_dir
ALLOWED_BSIPA_TOP_LEVEL = {"IPA", "Libs", "Plugins"}
def _now_slug() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def _find_asset(asset: str, state_root: Path, instance: str, repo_root: Path) -> Path | None:
candidates = [
Path(asset).expanduser(),
downloads_dir(state_root, instance) / asset,
repo_root / "assets" / asset,
repo_root / "locks" / "assets" / asset,
]
for candidate in candidates:
if candidate.exists() and candidate.is_file():
return candidate
return None
def _zip_members(asset_path: Path) -> list[tuple[str, int, str]]:
result: list[tuple[str, int, str]] = []
with ZipFile(asset_path) as archive:
for info in archive.infolist():
if info.is_dir():
continue
rel = ensure_relative(info.filename).as_posix()
data = archive.read(info)
result.append((rel, len(data), sha256_bytes(data)))
return result
def _target_for_member(strategy: str, member: str) -> str:
rel = ensure_relative(member).as_posix()
if strategy == "zip-to-pending":
return f"IPA/Pending/{rel}"
if strategy == "root-zip":
return rel
if strategy == "bsipa-zip":
top = rel.split("/", 1)[0]
if top not in ALLOWED_BSIPA_TOP_LEVEL:
raise ValueError(f"bsipa-zip member has unsupported top-level path: {rel}")
return rel
raise ValueError(f"unsupported zip strategy: {strategy}")
def _asset_matches_patterns(name: str, patterns: tuple[str, ...]) -> bool:
return not patterns or any(fnmatch.fnmatch(name, pattern) for pattern in patterns)
def create_plan(
*,
instance: str,
instance_path: Path,
beat_saber_version: str,
registry: Registry,
lockfile: Lockfile,
state_root: Path,
repo_root: Path,
selected: set[str] | None = None,
) -> tuple[dict[str, Any], Path]:
selected_ids = selected or {plugin.id for plugin in lockfile.plugins}
changes: list[dict[str, Any]] = []
warnings: list[str] = []
for locked in lockfile.plugins:
if locked.id not in selected_ids:
continue
registry_plugin = registry.get(locked.id)
strategy = locked.install_strategy or (registry_plugin.install_strategy if registry_plugin else "manual")
if strategy not in VALID_STRATEGIES:
raise ValueError(f"{locked.id}: invalid install strategy: {strategy}")
if strategy == "manual":
raise ValueError(f"{locked.id}: install_strategy is manual; add a concrete registry rule first")
if not locked.asset:
raise ValueError(f"{locked.id}: lock entry has no asset")
if registry_plugin and not _asset_matches_patterns(Path(locked.asset).name, registry_plugin.asset_patterns):
warnings.append(f"{locked.id}: asset does not match registry patterns")
asset_path = _find_asset(locked.asset, state_root, instance, repo_root)
if not asset_path:
raise FileNotFoundError(
f"{locked.id}: asset not found: {locked.asset}; put it in {downloads_dir(state_root, instance)}"
)
asset_sha = sha256_file(asset_path)
if locked.sha256 and locked.sha256 != asset_sha:
raise ValueError(f"{locked.id}: asset sha256 mismatch for {asset_path}")
if strategy == "dll-to-plugins":
if asset_path.suffix.lower() != ".dll":
raise ValueError(f"{locked.id}: dll-to-plugins expects a .dll asset")
changes.append(
{
"plugin": locked.id,
"action": "copy",
"source": str(asset_path),
"sourceSha256": asset_sha,
"target": f"Plugins/{asset_path.name}",
"size": asset_path.stat().st_size,
"sha256": asset_sha,
}
)
continue
if asset_path.suffix.lower() != ".zip":
raise ValueError(f"{locked.id}: {strategy} expects a .zip asset")
for member, size, member_sha in _zip_members(asset_path):
changes.append(
{
"plugin": locked.id,
"action": "extract",
"source": str(asset_path),
"sourceSha256": asset_sha,
"archiveMember": member,
"target": _target_for_member(strategy, member),
"size": size,
"sha256": member_sha,
}
)
plan = {
"schemaVersion": 1,
"createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"instance": instance,
"instancePath": str(instance_path),
"beatSaberVersion": beat_saber_version,
"warnings": warnings,
"changes": changes,
}
plan_path = plans_dir(state_root, instance) / f"plan-{_now_slug()}.json"
with plan_path.open("w", encoding="utf-8") as handle:
json.dump(plan, handle, indent=2, sort_keys=True)
handle.write("\n")
return plan, plan_path
+33
View File
@@ -0,0 +1,33 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from .fsutil import sha256_file
SCAN_DIRS = ("Plugins", "Libs", "IPA/Pending")
def scan_instance(instance_path: Path, include_hashes: bool = False) -> dict[str, Any]:
files: list[dict[str, Any]] = []
for dirname in SCAN_DIRS:
root = instance_path / dirname
if not root.exists():
continue
for path in sorted(item for item in root.rglob("*") if item.is_file()):
rel = path.relative_to(instance_path).as_posix()
entry: dict[str, Any] = {"path": rel, "size": path.stat().st_size}
if include_hashes:
entry["sha256"] = sha256_file(path)
files.append(entry)
return {
"instancePath": str(instance_path),
"files": files,
"counts": {
"files": len(files),
"plugins": sum(1 for item in files if item["path"].startswith("Plugins/")),
"libs": sum(1 for item in files if item["path"].startswith("Libs/")),
"pending": sum(1 for item in files if item["path"].startswith("IPA/Pending/")),
},
}
+46
View File
@@ -0,0 +1,46 @@
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .fsutil import atomic_write_json, read_json
def instance_state_dir(state_root: Path, instance: str) -> Path:
return state_root / "instances" / instance
def installed_state_path(state_root: Path, instance: str) -> Path:
return instance_state_dir(state_root, instance) / "installed.json"
def load_installed_state(state_root: Path, instance: str) -> dict[str, Any]:
return read_json(
installed_state_path(state_root, instance),
{"instance": instance, "plugins": {}},
)
def save_installed_state(state_root: Path, instance: str, state: dict[str, Any]) -> None:
state.setdefault("instance", instance)
state["updatedAt"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
atomic_write_json(installed_state_path(state_root, instance), state)
def plans_dir(state_root: Path, instance: str) -> Path:
path = instance_state_dir(state_root, instance) / "plans"
path.mkdir(parents=True, exist_ok=True)
return path
def downloads_dir(state_root: Path, instance: str) -> Path:
path = instance_state_dir(state_root, instance) / "downloads"
path.mkdir(parents=True, exist_ok=True)
return path
def backups_dir(state_root: Path, instance: str) -> Path:
path = instance_state_dir(state_root, instance) / "backups"
path.mkdir(parents=True, exist_ok=True)
return path
+46
View File
@@ -0,0 +1,46 @@
from __future__ import annotations
import json
import tarfile
from datetime import datetime, timezone
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from .fsutil import sha256_file
from .state import backups_dir
def backup_userdata(instance: str, instance_path: Path, state_root: Path) -> dict[str, Any]:
source = instance_path / "UserData"
if not source.is_dir():
raise FileNotFoundError(f"UserData directory not found: {source}")
created_at = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
destination = backups_dir(state_root, instance) / f"userdata-{created_at}.tar.gz"
files: list[dict[str, Any]] = []
total_size = 0
for path in sorted(item for item in source.rglob("*") if item.is_file()):
rel = path.relative_to(instance_path).as_posix()
size = path.stat().st_size
total_size += size
files.append({"path": rel, "size": size, "sha256": sha256_file(path)})
manifest = {
"createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"instance": instance,
"source": str(source),
"fileCount": len(files),
"totalSize": total_size,
"files": files,
}
destination.parent.mkdir(parents=True, exist_ok=True)
with tarfile.open(destination, "w:gz") as archive:
archive.add(source, arcname="UserData")
with NamedTemporaryFile("w", encoding="utf-8", suffix=".json") as handle:
json.dump(manifest, handle, indent=2, sort_keys=True)
handle.write("\n")
handle.flush()
archive.add(handle.name, arcname="manifest.json")
return {"archive": str(destination), "manifest": manifest}