Add BeatMods parsing and userdata restore

This commit is contained in:
pleb
2026-06-29 10:56:57 -07:00
parent 17bd736e59
commit 1f35f6b078
10 changed files with 448 additions and 22 deletions
+75
View File
@@ -0,0 +1,75 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class BeatModsEntry:
name: str
mod_id: int | None
git_url: str | None
category: str | None
version_id: int | None
mod_version: str | None
zip_hash: str | None
dependencies: tuple[int, ...]
raw: dict[str, Any]
def extract_mods(payload: Any) -> list[dict[str, Any]]:
"""Return the list of BeatMods records from either current or legacy shapes."""
if isinstance(payload, dict) and isinstance(payload.get("mods"), list):
payload = payload["mods"]
if not isinstance(payload, list):
raise ValueError("BeatMods payload must be a list or an object with a mods list")
return [item for item in payload if isinstance(item, dict)]
def normalize_entry(entry: dict[str, Any]) -> BeatModsEntry:
"""Normalize BeatMods' nested {mod, latest} response and older flat records."""
mod = entry.get("mod") if isinstance(entry.get("mod"), dict) else entry
latest = entry.get("latest") if isinstance(entry.get("latest"), dict) else entry
dependencies = latest.get("dependencies", ())
return BeatModsEntry(
name=str(mod.get("name") or ""),
mod_id=_int_or_none(mod.get("id")),
git_url=_str_or_none(mod.get("gitUrl")),
category=_str_or_none(mod.get("category")),
version_id=_int_or_none(latest.get("id")),
mod_version=_str_or_none(latest.get("modVersion")),
zip_hash=_str_or_none(latest.get("zipHash")),
dependencies=tuple(_dependency_id(dep) for dep in dependencies if _dependency_id(dep) is not None),
raw=entry,
)
def normalize_mods(payload: Any) -> list[BeatModsEntry]:
return [normalize_entry(entry) for entry in extract_mods(payload)]
def by_version_id(entries: list[BeatModsEntry]) -> dict[int, BeatModsEntry]:
return {entry.version_id: entry for entry in entries if entry.version_id is not None}
def _dependency_id(dependency: Any) -> int | None:
if isinstance(dependency, dict):
return _int_or_none(dependency.get("id"))
return _int_or_none(dependency)
def _int_or_none(value: Any) -> int | None:
if isinstance(value, bool) or value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _str_or_none(value: Any) -> str | None:
if value is None:
return None
text = str(value)
return text if text else None
+34 -2
View File
@@ -19,7 +19,7 @@ from .planner import create_plan
from .scanner import scan_instance
from .state import load_installed_state
from .updates import check_updates
from .userdata import sync_windows_data_repo
from .userdata import restore_windows_data_repo, sync_windows_data_repo
def _json(data: Any) -> None:
@@ -246,6 +246,16 @@ def build_parser() -> argparse.ArgumentParser:
backup.add_argument("--appdata-path", help="Override Beat Saber Windows AppData path")
backup.add_argument("--no-appdata", action="store_true", help="Only copy UserData")
restore = subcommands.add_parser(
"restore-userdata",
help="Restore UserData and AppData from the backups repo into an instance",
parents=[_common_parent()],
)
restore.add_argument("--instance", required=True)
restore.add_argument("--backup-root", default="../backups/beat-saber", help="Backup directory")
restore.add_argument("--appdata-path", help="Override Beat Saber AppData destination path")
restore.add_argument("--no-appdata", action="store_true", help="Only restore UserData")
return parser
@@ -300,7 +310,8 @@ def _run_menu(inst_roots: list[Path], st_root: Path, input_func: Callable[[str],
("bootstrap-check", "Check BSIPA bootstrap", "Verifies recorded bootstrap state and the latest BSIPA log evidence."),
("plan", "Create install plan", "Writes a dry-run JSON plan for locked plugin files before anything is applied."),
("apply", "Apply a plan by path", "Installs exactly the file changes from a previously generated plan JSON."),
("backup-userdata", "Back up UserData", "Creates a timestamped archive of UserData before risky changes."),
("backup-userdata", "Back up UserData", "Copies UserData and AppData into the adjacent backups repo."),
("restore-userdata", "Restore UserData", "Restores UserData and AppData from the backups repo into an instance."),
("change", "Choose another version", "Returns to the Beat Saber version picker."),
]
@@ -574,6 +585,27 @@ def run(argv: list[str] | None = None) -> int:
print(f" {item['source']} -> {item['destination']}")
return 0
if args.command == "restore-userdata":
instance = get_instance(inst_roots, args.instance)
root = repo_root()
backup_root = Path(args.backup_root).expanduser()
if not backup_root.is_absolute():
backup_root = (root / backup_root).resolve()
result = restore_windows_data_repo(
instance=args.instance,
instance_path=instance.path,
backup_root=backup_root,
appdata_path=Path(args.appdata_path).expanduser() if args.appdata_path else None,
include_appdata=not args.no_appdata,
)
print(f"Backup root: {result['backupRoot']}")
for item in result["restored"]:
print(f"{item['label']}: {item['fileCount']} files")
print(f" {item['source']} -> {item['destination']}")
if item["snapshot"]:
print(f" previous: {item['snapshot']}")
return 0
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
+71 -1
View File
@@ -84,6 +84,19 @@ def infer_windows_appdata_path(instance_path: Path) -> Path:
return profile / "AppData" / "LocalLow" / "Hyperbolic Magnetism" / "Beat Saber"
def infer_proton_appdata_path() -> Path:
return (
Path.home()
/ ".local/share/BSManager/SharedContent/compatdata/pfx/drive_c/users/steamuser/AppData/LocalLow/Hyperbolic Magnetism/Beat Saber"
)
def infer_appdata_path(instance_path: Path) -> Path:
if "Users" in instance_path.resolve().parts:
return infer_windows_appdata_path(instance_path)
return infer_proton_appdata_path()
def sync_windows_data_repo(
*,
instance: str,
@@ -96,7 +109,7 @@ def sync_windows_data_repo(
("UserData", instance_path / "UserData", backup_root / "UserData"),
]
if include_appdata:
appdata = appdata_path or infer_windows_appdata_path(instance_path)
appdata = appdata_path or infer_appdata_path(instance_path)
sources.append(("AppData", appdata, backup_root / "AppData"))
for label, source, _ in sources:
@@ -145,6 +158,63 @@ def sync_windows_data_repo(
}
def restore_windows_data_repo(
*,
instance: str,
instance_path: Path,
backup_root: Path,
appdata_path: Path | None = None,
include_appdata: bool = True,
) -> dict[str, Any]:
descriptor_path = backup_root / "backup-descriptor.json"
if descriptor_path.is_file():
descriptor = json.loads(descriptor_path.read_text(encoding="utf-8"))
descriptor_instance = descriptor.get("instance")
if descriptor_instance and descriptor_instance != instance:
raise ValueError(
f"backup descriptor instance {descriptor_instance!r} does not match requested instance {instance!r}"
)
created_at = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
restores: list[tuple[str, Path, Path]] = [
("UserData", backup_root / "UserData", instance_path / "UserData"),
]
if include_appdata:
appdata = appdata_path or infer_appdata_path(instance_path)
restores.append(("AppData", backup_root / "AppData", appdata))
for label, source, _ in restores:
if not source.is_dir():
raise FileNotFoundError(f"{label} backup not found: {source}")
restored: list[dict[str, Any]] = []
snapshots: list[dict[str, Any]] = []
for label, source, destination in restores:
snapshot: Path | None = None
if destination.exists():
snapshot = destination.parent / f"{destination.name}.pre-restore-{created_at}"
destination.rename(snapshot)
snapshots.append({"label": label, "path": str(snapshot)})
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(source, destination, symlinks=True)
restored.append(
{
"label": label,
"source": str(source),
"destination": str(destination),
"fileCount": sum(1 for item in destination.rglob("*") if item.is_file()),
"snapshot": str(snapshot) if snapshot else None,
}
)
return {
"backupRoot": str(backup_root),
"restored": restored,
"snapshots": snapshots,
}
def _ignore_backup_paths(source_root: Path, skipped: list[str]) -> Callable[[str, list[str]], set[str]]:
def ignore(current_dir: str, names: list[str]) -> set[str]:
ignored: set[str] = set()