855 lines
33 KiB
Python
855 lines
33 KiB
Python
from __future__ import annotations

import json
import os
import sys
import tempfile
import unittest
from io import StringIO
from pathlib import Path
from unittest.mock import patch
from zipfile import ZipFile

from plugin_helper.beatmods import by_version_id, normalize_mods
from plugin_helper.bootstrap import _run_ipa
from plugin_helper.checker import check_lock
from plugin_helper.cli import installed_plugins_report, run
from plugin_helper.fsutil import sha256_file
from plugin_helper.installer import apply_plan, uninstall_plugin
from plugin_helper.instances import get_instance, list_instances
from plugin_helper.models import Lockfile, LockedPlugin, Registry, RegistryPlugin
from plugin_helper.planner import create_plan
from plugin_helper.scanner import scan_bootstrap_files, scan_instance
from plugin_helper.state import downloads_dir, load_installed_state, plugin_downloads_dir, save_bootstrap_state
from plugin_helper.updates import check_updates
from plugin_helper.userdata import (
    backup_userdata,
    infer_appdata_path,
    infer_proton_appdata_path,
    infer_windows_appdata_path,
    restore_windows_data_repo,
    sync_windows_data_repo,
)
|
|
|
|
|
|
class PluginHelperTests(unittest.TestCase):
    """Tests for the ``plugin_helper`` package.

    Covers BeatMods payload normalisation, instance discovery and scanning,
    the interactive CLI menu, plan creation / apply / uninstall, bootstrap
    file scanning, user-data backup and restore, lockfile checks, the
    installed-plugins report and release update checks.

    Every test that touches the filesystem works inside a
    ``tempfile.TemporaryDirectory`` so nothing leaks between tests.
    """

    def test_normalize_beatmods_current_nested_response(self) -> None:
        """A nested ``{"mod": ..., "latest": ...}`` payload yields one entry.

        Dependencies given as a bare int and as ``{"id": "<str>"}`` are both
        expected to come back as ints.
        """
        payload = {
            "mods": [
                {
                    "mod": {
                        "id": 10,
                        "name": "Example",
                        "gitUrl": "https://github.com/example/mod",
                        "category": "library",
                    },
                    "latest": {
                        "id": 1234,
                        "modVersion": "1.2.3",
                        "zipHash": "abc123",
                        "dependencies": [2561, {"id": "2567"}],
                    },
                }
            ]
        }

        entries = normalize_mods(payload)

        self.assertEqual(len(entries), 1)
        self.assertEqual(entries[0].name, "Example")
        self.assertEqual(entries[0].mod_id, 10)
        self.assertEqual(entries[0].version_id, 1234)
        self.assertEqual(entries[0].dependencies, (2561, 2567))
        self.assertEqual(by_version_id(entries)[1234].zip_hash, "abc123")

    def test_normalize_beatmods_legacy_flat_response(self) -> None:
        """A flat list payload is accepted as well.

        The single ``id`` is expected to serve as both mod id and version id,
        an empty ``gitUrl`` to become ``None``, and a ``None`` dependency to
        be dropped.
        """
        entries = normalize_mods(
            [
                {
                    "id": "12",
                    "name": "FlatExample",
                    "gitUrl": "",
                    "category": "mods",
                    "modVersion": "2.0.0",
                    "zipHash": "def456",
                    "dependencies": [{"id": 44}, "45", None],
                }
            ]
        )

        self.assertEqual(entries[0].name, "FlatExample")
        self.assertEqual(entries[0].mod_id, 12)
        self.assertEqual(entries[0].version_id, 12)
        self.assertIsNone(entries[0].git_url)
        self.assertEqual(entries[0].dependencies, (44, 45))

    def test_instances_and_scan(self) -> None:
        """Only directories with ``Beat Saber_Data`` are listed; scan finds the plugin."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            inst = root / "1.40.8"
            (inst / "Beat Saber_Data").mkdir(parents=True)
            (inst / "Plugins").mkdir()
            (inst / "Libs").mkdir()
            (inst / "UserData").mkdir()
            (inst / "Plugins" / "Example.dll").write_bytes(b"dll")
            # Sibling directory without game data: must not be reported.
            (root / "not-an-instance").mkdir()

            instances = list_instances(root)
            self.assertEqual([item.name for item in instances], ["1.40.8"])
            self.assertEqual(get_instance(root, "1.40.8").path, inst)

            scan = scan_instance(inst, include_hashes=True)
            self.assertEqual(scan["counts"]["plugins"], 1)
            self.assertEqual(scan["files"][0]["path"], "Plugins/Example.dll")
            self.assertIn("sha256", scan["files"][0])

    def test_multi_root_instances_and_ambiguous_lookup(self) -> None:
        """Two roots holding the same instance name list both and make lookup ambiguous."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            windows = root / "windows"
            local = root / "local"
            win_inst = windows / "1.44.1"
            local_inst = local / "1.44.1"
            (win_inst / "Beat Saber_Data").mkdir(parents=True)
            (local_inst / "Beat Saber_Data").mkdir(parents=True)

            instances = list_instances([windows, local])

            self.assertEqual(len(instances), 2)
            self.assertEqual({item.path for item in instances}, {win_inst, local_inst})
            with self.assertRaisesRegex(ValueError, "ambiguous"):
                get_instance([windows, local], "1.44.1")

    def test_menu_selects_instance_and_action(self) -> None:
        """Drive the ``menu`` command with scripted answers ``1``, ``3``, ``q``."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            instance = root / "instances" / "1.40.8"
            state = root / "state"
            (instance / "Beat Saber_Data").mkdir(parents=True)
            (instance / "Plugins").mkdir()

            # Scripted stdin: each input() call consumes the next answer.
            answers = iter(["1", "3", "q"])
            output = StringIO()
            with patch("builtins.input", side_effect=lambda _: next(answers)), patch("sys.stdout", output):
                status = run(
                    [
                        "--instances-root",
                        str(root / "instances"),
                        "--state-dir",
                        str(state),
                        "menu",
                    ]
                )

            self.assertEqual(status, 0)
            self.assertIn("Counts files currently present", output.getvalue())

    def test_menu_routes_duplicate_instance_names_by_selected_root(self) -> None:
        """Choosing entry ``2`` must act on the second root's same-named instance.

        Only the second instance contains a plugin file, so the expected
        ``"1.44.1: 1 files"`` output can only come from that one.
        """
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            first_root = root / "a-root"
            second_root = root / "z-root"
            first = first_root / "1.44.1"
            second = second_root / "1.44.1"
            state = root / "state"
            (first / "Beat Saber_Data").mkdir(parents=True)
            (second / "Beat Saber_Data").mkdir(parents=True)
            (second / "Plugins").mkdir()
            (second / "Plugins" / "Example.dll").write_bytes(b"dll")

            answers = iter(["2", "3", "q"])
            output = StringIO()
            with patch("builtins.input", side_effect=lambda _: next(answers)), patch("sys.stdout", output):
                status = run(
                    [
                        "--instances-root",
                        # Multiple roots are passed as one os.pathsep-joined value.
                        os.pathsep.join([str(first_root), str(second_root)]),
                        "--state-dir",
                        str(state),
                        "menu",
                    ]
                )

            self.assertEqual(status, 0)
            self.assertIn("1.44.1: 1 files", output.getvalue())

    def test_run_ipa_timeout_returns_control(self) -> None:
        """A command outliving ``timeout_seconds`` is reported as timed out."""
        with tempfile.TemporaryDirectory() as tmp:
            result = _run_ipa(
                # Use the running interpreter: a bare "python" is not on PATH
                # on systems that only ship "python3" or inside some venvs.
                command=[sys.executable, "-c", "import time; time.sleep(30)"],
                instance_path=Path(tmp),
                timeout_seconds=1,
            )

            self.assertTrue(result["timedOut"])
            self.assertNotEqual(result["returncode"], 0)

    def test_plan_apply_and_uninstall_dll(self) -> None:
        """Full lifecycle for a ``dll-to-plugins`` plugin: plan, apply, uninstall."""
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            instance = work / "instances" / "1.40.8"
            state = work / "state"
            instance.mkdir(parents=True)
            (instance / "Beat Saber_Data").mkdir()
            (instance / "Plugins").mkdir()

            asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.dll"
            asset.write_bytes(b"managed dll")

            registry = Registry(
                {
                    "example": RegistryPlugin(
                        id="example",
                        name="Example",
                        repo="owner/example",
                        asset_patterns=("*.dll",),
                        install_strategy="dll-to-plugins",
                    )
                }
            )
            lockfile = Lockfile(
                beat_saber_version="1.40.8",
                instance="1.40.8",
                plugins=(
                    LockedPlugin(
                        id="example",
                        repo="owner/example",
                        tag="v1.0.0",
                        asset="Example.dll",
                        sha256=sha256_file(asset),
                    ),
                ),
            )

            plan, plan_path = create_plan(
                instance="1.40.8",
                instance_path=instance,
                beat_saber_version="1.40.8",
                registry=registry,
                lockfile=lockfile,
                state_root=state,
                repo_root=work,
            )
            self.assertTrue(plan_path.exists())
            self.assertEqual(plan["changes"][0]["target"], "Plugins/Example.dll")

            result = apply_plan(plan, state)
            self.assertEqual(len(result["applied"]), 1)
            self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"managed dll")
            installed = load_installed_state(state, "1.40.8")
            self.assertIn("example", installed["plugins"])

            removed = uninstall_plugin("1.40.8", instance, state, "example")
            self.assertEqual(removed["removed"], ["Plugins/Example.dll"])
            self.assertFalse((instance / "Plugins" / "Example.dll").exists())

    def test_zip_to_pending_targets_ipa_pending(self) -> None:
        """``zip-to-pending`` archives are planned and applied under ``IPA/Pending``."""
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            instance = work / "instances" / "1.40.8"
            state = work / "state"
            instance.mkdir(parents=True)
            (instance / "Beat Saber_Data").mkdir()
            asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.zip"
            with ZipFile(asset, "w") as archive:
                archive.writestr("Plugins/Example.dll", b"dll")

            registry = Registry(
                {
                    "example": RegistryPlugin(
                        id="example",
                        name="Example",
                        repo=None,
                        install_strategy="zip-to-pending",
                    )
                }
            )
            lockfile = Lockfile(
                beat_saber_version="1.40.8",
                instance="1.40.8",
                plugins=(
                    LockedPlugin(
                        id="example",
                        repo=None,
                        tag=None,
                        asset="Example.zip",
                        sha256=sha256_file(asset),
                    ),
                ),
            )

            plan, _ = create_plan(
                instance="1.40.8",
                instance_path=instance,
                beat_saber_version="1.40.8",
                registry=registry,
                lockfile=lockfile,
                state_root=state,
                repo_root=work,
            )
            self.assertEqual(plan["changes"][0]["target"], "IPA/Pending/Plugins/Example.dll")
            apply_plan(plan, state)
            self.assertEqual((instance / "IPA" / "Pending" / "Plugins" / "Example.dll").read_bytes(), b"dll")

    def test_plan_still_finds_legacy_flat_downloads(self) -> None:
        """An asset directly in ``downloads_dir`` (no per-plugin subdir) is still used."""
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            instance = work / "instances" / "1.40.8"
            state = work / "state"
            instance.mkdir(parents=True)
            (instance / "Beat Saber_Data").mkdir()
            asset = downloads_dir(state, "1.40.8") / "Example.dll"
            asset.write_bytes(b"legacy flat download")

            plan, _ = create_plan(
                instance="1.40.8",
                instance_path=instance,
                beat_saber_version="1.40.8",
                registry=Registry(
                    {
                        "example": RegistryPlugin(
                            id="example",
                            name="Example",
                            repo=None,
                            install_strategy="dll-to-plugins",
                        )
                    }
                ),
                lockfile=Lockfile(
                    beat_saber_version="1.40.8",
                    instance="1.40.8",
                    plugins=(
                        LockedPlugin(
                            id="example",
                            repo=None,
                            tag=None,
                            asset="Example.dll",
                            sha256=sha256_file(asset),
                        ),
                    ),
                ),
                state_root=state,
                repo_root=work,
            )

            self.assertEqual(plan["changes"][0]["source"], str(asset))

    def test_zip_member_cannot_escape_instance(self) -> None:
        """A zip member named ``../Bad.dll`` makes ``create_plan`` raise ``ValueError``."""
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            instance = work / "instances" / "1.40.8"
            state = work / "state"
            instance.mkdir(parents=True)
            (instance / "Beat Saber_Data").mkdir()
            asset = plugin_downloads_dir(state, "1.40.8", "bad") / "Bad.zip"
            with ZipFile(asset, "w") as archive:
                # Path-traversal member: would land outside the instance.
                archive.writestr("../Bad.dll", b"dll")

            registry = Registry(
                {
                    "bad": RegistryPlugin(
                        id="bad",
                        name="Bad",
                        repo=None,
                        install_strategy="zip-to-pending",
                    )
                }
            )
            lockfile = Lockfile(
                beat_saber_version="1.40.8",
                instance="1.40.8",
                plugins=(
                    LockedPlugin(
                        id="bad",
                        repo=None,
                        tag=None,
                        asset="Bad.zip",
                        sha256=sha256_file(asset),
                    ),
                ),
            )

            with self.assertRaises(ValueError):
                create_plan(
                    instance="1.40.8",
                    instance_path=instance,
                    beat_saber_version="1.40.8",
                    registry=registry,
                    lockfile=lockfile,
                    state_root=state,
                    repo_root=work,
                )

    def test_scan_bootstrap_files_includes_root_ipa_and_bsipa_dirs(self) -> None:
        """Root-level IPA files plus ``IPA/`` and ``Libs/`` contents are reported in this order."""
        with tempfile.TemporaryDirectory() as tmp:
            instance = Path(tmp)
            (instance / "IPA" / "Backups").mkdir(parents=True)
            (instance / "Libs").mkdir()
            (instance / "winhttp.dll").write_bytes(b"proxy")
            (instance / "IPA.exe").write_bytes(b"ipa")
            (instance / "IPA.exe.config").write_bytes(b"config")
            (instance / "IPA" / "Backups" / "Beat Saber.exe.bak").write_bytes(b"backup")
            (instance / "Libs" / "0Harmony.dll").write_bytes(b"harmony")

            paths = [item["path"] for item in scan_bootstrap_files(instance)]

            self.assertEqual(
                paths,
                [
                    "IPA.exe",
                    "IPA.exe.config",
                    "IPA/Backups/Beat Saber.exe.bak",
                    "Libs/0Harmony.dll",
                    "winhttp.dll",
                ],
            )

    def test_plan_requires_healthy_bootstrap_for_locked_bsipa_dependencies(self) -> None:
        """Planning a plugin fails while the locked BSIPA bootstrap is unhealthy.

        The same ``create_plan`` call is made twice: first against a bare
        instance (expected to raise), then after IPA files, a log line naming
        the locked BSIPA version and a saved bootstrap state are in place.
        """
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            instance = work / "instances" / "1.44.1"
            state = work / "state"
            instance.mkdir(parents=True)
            (instance / "Beat Saber_Data").mkdir()
            asset = plugin_downloads_dir(state, "1.44.1", "example") / "Example.dll"
            asset.write_bytes(b"managed dll")

            registry = Registry(
                {
                    "bsipa": RegistryPlugin(
                        id="bsipa",
                        name="BSIPA",
                        repo=None,
                        install_strategy="root-zip",
                    ),
                    "example": RegistryPlugin(
                        id="example",
                        name="Example",
                        repo=None,
                        install_strategy="dll-to-plugins",
                    ),
                }
            )
            lockfile = Lockfile(
                beat_saber_version="1.44.1",
                instance="1.44.1",
                plugins=(
                    LockedPlugin(id="bsipa", repo=None, tag="4.3.7", asset="BSIPA.zip", sha256=None),
                    LockedPlugin(
                        id="example",
                        repo=None,
                        tag="v1.0.0",
                        asset="Example.dll",
                        sha256=sha256_file(asset),
                    ),
                ),
            )

            with self.assertRaisesRegex(ValueError, "BSIPA bootstrap is not healthy"):
                create_plan(
                    instance="1.44.1",
                    instance_path=instance,
                    beat_saber_version="1.44.1",
                    registry=registry,
                    lockfile=lockfile,
                    state_root=state,
                    repo_root=work,
                    selected={"example"},
                )

            # Put the bootstrap files, version log line and saved state in place.
            (instance / "IPA").mkdir()
            (instance / "Libs").mkdir()
            (instance / "Logs").mkdir()
            (instance / "IPA.exe").write_bytes(b"ipa")
            (instance / "winhttp.dll").write_bytes(b"proxy")
            (instance / "Logs" / "_latest.log").write_text("Beat Saber IPA (BSIPA): 4.3.7\n", encoding="utf-8")
            save_bootstrap_state(state, "1.44.1", {"files": scan_bootstrap_files(instance)})

            plan, _ = create_plan(
                instance="1.44.1",
                instance_path=instance,
                beat_saber_version="1.44.1",
                registry=registry,
                lockfile=lockfile,
                state_root=state,
                repo_root=work,
                selected={"example"},
            )
            self.assertEqual(plan["changes"][0]["target"], "Plugins/Example.dll")

    def test_userdata_backup_contains_manifest(self) -> None:
        """``backup_userdata`` writes an archive and reports a one-file manifest."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            instance = root / "1.40.8"
            state = root / "state"
            (instance / "UserData").mkdir(parents=True)
            (instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8")

            result = backup_userdata("1.40.8", instance, state)
            self.assertTrue(Path(result["archive"]).exists())
            self.assertEqual(result["manifest"]["fileCount"], 1)

    def test_infer_windows_appdata_path_from_mounted_instance(self) -> None:
        """The AppData path is derived from the ``Users/<name>`` part of the instance path."""
        instance = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances/1.44.1")

        self.assertEqual(
            infer_windows_appdata_path(instance),
            Path("/home/pleb/Windows/Users/pleb/AppData/LocalLow/Hyperbolic Magnetism/Beat Saber"),
        )

    def test_sync_windows_data_repo_copies_into_stable_backup_root(self) -> None:
        """Sync copies settings but skips replays, caches, token files and logs.

        Also checks the ``backup-descriptor.json`` written into the backup root.
        """
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            instance = root / "Users" / "pleb" / "BSManager" / "BSInstances" / "1.44.1"
            appdata = root / "Users" / "pleb" / "AppData" / "LocalLow" / "Hyperbolic Magnetism" / "Beat Saber"
            backup_repo = root / "backup"
            (instance / "UserData").mkdir(parents=True)
            (instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
            (instance / "UserData" / "BeatLeader" / "Replays").mkdir(parents=True)
            (instance / "UserData" / "BeatLeader" / "Replays" / "big.bsor").write_text("replay", encoding="utf-8")
            (instance / "UserData" / "ScoreSaber" / "Replays").mkdir(parents=True)
            (instance / "UserData" / "ScoreSaber" / "Replays" / "big.bsor").write_text("replay", encoding="utf-8")
            (instance / "UserData" / "BeatSaberPlus" / "Cache").mkdir(parents=True)
            (instance / "UserData" / "BeatSaberPlus" / "Cache" / "cached.dat").write_text("cache", encoding="utf-8")
            (instance / "UserData" / "BeatSaverNotifier.json").write_text('{"refreshToken":"secret"}', encoding="utf-8")
            (instance / "UserData" / "Accsaber").mkdir(parents=True)
            (instance / "UserData" / "Accsaber" / "PlayerScoreCache.json").write_text("{}", encoding="utf-8")
            appdata.mkdir(parents=True)
            (appdata / "Player.log").write_text("log", encoding="utf-8")
            (appdata / "settings.cfg").write_text("settings", encoding="utf-8")

            result = sync_windows_data_repo(
                instance="1.44.1",
                instance_path=instance,
                backup_root=backup_repo,
            )

            self.assertEqual(result["backupRoot"], str(backup_repo))
            self.assertEqual((backup_repo / "UserData" / "settings.json").read_text(encoding="utf-8"), "{}")
            self.assertFalse((backup_repo / "UserData" / "BeatLeader" / "Replays").exists())
            self.assertFalse((backup_repo / "UserData" / "ScoreSaber" / "Replays").exists())
            self.assertFalse((backup_repo / "UserData" / "BeatSaberPlus" / "Cache").exists())
            self.assertFalse((backup_repo / "UserData" / "BeatSaverNotifier.json").exists())
            self.assertFalse((backup_repo / "UserData" / "Accsaber" / "PlayerScoreCache.json").exists())
            self.assertFalse((backup_repo / "AppData" / "Player.log").exists())
            self.assertEqual((backup_repo / "AppData" / "settings.cfg").read_text(encoding="utf-8"), "settings")
            descriptor = json.loads((backup_repo / "backup-descriptor.json").read_text(encoding="utf-8"))
            self.assertEqual(descriptor["instance"], "1.44.1")
            self.assertEqual(descriptor["sources"][0]["source"], str(instance / "UserData"))
            self.assertIn("BeatLeader/Replays", descriptor["skipped"])
            self.assertIn("*.log", descriptor["excludePatterns"])

    def test_infer_proton_appdata_path(self) -> None:
        """The Proton AppData path lives under the user's BSManager compatdata prefix."""
        self.assertEqual(
            infer_proton_appdata_path(),
            Path.home()
            / ".local/share/BSManager/SharedContent/compatdata/pfx/drive_c/users/steamuser/AppData/LocalLow/Hyperbolic Magnetism/Beat Saber",
        )

    def test_infer_appdata_path_uses_windows_or_proton(self) -> None:
        """``infer_appdata_path`` picks the Windows or Proton result depending on the instance path."""
        windows_instance = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances/1.44.1")
        linux_instance = Path("/home/pleb/.local/share/BSManager/BSInstances/1.44.1")

        self.assertEqual(infer_appdata_path(windows_instance), infer_windows_appdata_path(windows_instance))
        self.assertEqual(infer_appdata_path(linux_instance), infer_proton_appdata_path())

    def test_restore_windows_data_repo_roundtrip(self) -> None:
        """Sync, modify the live files, restore: the synced contents come back."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            instance = root / "Users" / "pleb" / "BSManager" / "BSInstances" / "1.44.1"
            appdata = root / "Users" / "pleb" / "AppData" / "LocalLow" / "Hyperbolic Magnetism" / "Beat Saber"
            backup_repo = root / "backup"
            (instance / "UserData").mkdir(parents=True)
            (instance / "UserData" / "settings.json").write_text('{"saved": true}', encoding="utf-8")
            appdata.mkdir(parents=True)
            (appdata / "settings.cfg").write_text("settings", encoding="utf-8")

            sync_windows_data_repo(
                instance="1.44.1",
                instance_path=instance,
                backup_root=backup_repo,
            )

            # Diverge from the backup so a successful restore is observable.
            (instance / "UserData" / "settings.json").write_text('{"saved": false}', encoding="utf-8")
            (appdata / "settings.cfg").write_text("changed", encoding="utf-8")

            result = restore_windows_data_repo(
                instance="1.44.1",
                instance_path=instance,
                backup_root=backup_repo,
            )

            self.assertEqual(
                (instance / "UserData" / "settings.json").read_text(encoding="utf-8"),
                '{"saved": true}',
            )
            self.assertEqual((appdata / "settings.cfg").read_text(encoding="utf-8"), "settings")
            self.assertEqual(len(result["restored"]), 2)
            self.assertTrue(all(item["snapshot"] for item in result["restored"]))

    def test_restore_windows_data_repo_rejects_instance_mismatch(self) -> None:
        """A descriptor naming another instance makes restore raise ``ValueError``."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            instance = root / "1.44.1"
            backup_repo = root / "backup"
            (instance / "UserData").mkdir(parents=True)
            (instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
            (backup_repo / "UserData").mkdir(parents=True)
            (backup_repo / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
            (backup_repo / "backup-descriptor.json").write_text(
                json.dumps({"instance": "1.40.8"}) + "\n",
                encoding="utf-8",
            )

            with self.assertRaisesRegex(ValueError, "does not match"):
                restore_windows_data_repo(
                    instance="1.44.1",
                    instance_path=instance,
                    backup_root=backup_repo,
                    include_appdata=False,
                )

    def test_restore_userdata_cli(self) -> None:
        """The ``restore-userdata`` command copies the backup over the instance file."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            instances_root = root / "instances"
            instance = instances_root / "1.44.1"
            backup_repo = root / "backup"
            (instance / "UserData").mkdir(parents=True)
            # Seed the live file with content that differs from the backup;
            # otherwise the final assertion would pass even if nothing were
            # restored.
            (instance / "UserData" / "settings.json").write_text('{"restored": false}', encoding="utf-8")
            (backup_repo / "UserData").mkdir(parents=True)
            (backup_repo / "UserData" / "settings.json").write_text('{"restored": true}', encoding="utf-8")

            status = run(
                [
                    "--instances-root",
                    str(instances_root),
                    "--state-dir",
                    str(root / "state"),
                    "restore-userdata",
                    "--instance",
                    "1.44.1",
                    "--backup-root",
                    str(backup_repo),
                    "--no-appdata",
                ]
            )

            self.assertEqual(status, 0)
            self.assertEqual(
                (instance / "UserData" / "settings.json").read_text(encoding="utf-8"),
                '{"restored": true}',
            )

    def test_check_reports_missing_asset(self) -> None:
        """A locked asset that was never downloaded is reported as one error."""
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            state = work / "state"
            registry = Registry(
                {
                    "example": RegistryPlugin(
                        id="example",
                        name="Example",
                        repo=None,
                        install_strategy="dll-to-plugins",
                    )
                }
            )
            lockfile = Lockfile(
                beat_saber_version="1.40.8",
                instance="1.40.8",
                plugins=(
                    LockedPlugin(
                        id="example",
                        repo=None,
                        tag="v1.0.0",
                        asset="Missing.dll",
                        sha256=None,
                    ),
                ),
            )
            result = check_lock(
                instance="1.40.8",
                registry=registry,
                lockfile=lockfile,
                state_root=state,
                repo_root=work,
            )
            self.assertEqual(result["summary"]["errors"], 1)
            self.assertEqual(result["plugins"][0]["status"], "error")

    def test_installed_plugins_report_includes_locked_version(self) -> None:
        """The report merges installed state with the registry name and locked tag/asset."""
        registry = Registry(
            {
                "example": RegistryPlugin(
                    id="example",
                    name="Example Plugin",
                    repo="owner/example",
                    install_strategy="dll-to-plugins",
                )
            }
        )
        lockfile = Lockfile(
            beat_saber_version="1.40.8",
            instance="1.40.8",
            plugins=(
                LockedPlugin(
                    id="example",
                    repo="owner/example",
                    tag="v1.2.3",
                    asset="Example.dll",
                    sha256="abc123",
                ),
            ),
        )
        report = installed_plugins_report(
            installed_state={
                "instance": "1.40.8",
                "plugins": {
                    "example": {
                        "installedAt": "2026-06-14T17:18:40Z",
                        "files": [{"path": "Plugins/Example.dll"}],
                    }
                },
            },
            registry=registry,
            lockfile=lockfile,
        )

        self.assertEqual(report["plugins"][0]["name"], "Example Plugin")
        self.assertEqual(report["plugins"][0]["version"], "v1.2.3")
        self.assertEqual(report["plugins"][0]["asset"], "Example.dll")
        self.assertEqual(report["plugins"][0]["fileCount"], 1)

    def test_update_check_reports_current_matching_asset(self) -> None:
        """Same tag and same asset digest as the lock is reported as ``current``."""
        registry = Registry(
            {
                "example": RegistryPlugin(
                    id="example",
                    name="Example",
                    repo="owner/example",
                    asset_patterns=("1.40.8.zip",),
                    install_strategy="bsipa-zip",
                )
            }
        )
        lockfile = Lockfile(
            beat_saber_version="1.40.8",
            instance="1.40.8",
            plugins=(
                LockedPlugin(
                    id="example",
                    repo="owner/example",
                    tag="v1.1.0",
                    asset="1.40.8.zip",
                    sha256="abc123",
                ),
            ),
        )

        result = check_updates(
            registry=registry,
            lockfile=lockfile,
            # Stubbed release feed: no network access in tests.
            fetch_releases=lambda repo: [
                {
                    "tag_name": "v1.1.0",
                    "published_at": "2026-06-10T00:00:00Z",
                    "assets": [{"name": "1.40.8.zip", "digest": "sha256:abc123"}],
                }
            ],
        )

        self.assertEqual(result["summary"]["current"], 1)
        self.assertEqual(result["plugins"][0]["status"], "current")
        self.assertEqual(result["plugins"][0]["latestAssetSha256"], "abc123")

    def test_update_check_reports_new_matching_release(self) -> None:
        """A newer release is reported as ``update``, with the asset matching the lock chosen."""
        registry = Registry(
            {
                "example": RegistryPlugin(
                    id="example",
                    name="Example",
                    repo="owner/example",
                    asset_patterns=("*.zip",),
                    install_strategy="bsipa-zip",
                )
            }
        )
        lockfile = Lockfile(
            beat_saber_version="1.40.8",
            instance="1.40.8",
            plugins=(
                LockedPlugin(
                    id="example",
                    repo="owner/example",
                    tag="v1.1.0",
                    asset="1.40.8.zip",
                    sha256="abc123",
                ),
            ),
        )

        result = check_updates(
            registry=registry,
            lockfile=lockfile,
            fetch_releases=lambda repo: [
                {
                    "tag_name": "v1.2.0",
                    "published_at": "2026-06-12T00:00:00Z",
                    "assets": [
                        # Also matches "*.zip" but is for another game version.
                        {"name": "1.29.1.zip"},
                        {"name": "1.40.8.zip", "browser_download_url": "https://example.invalid/asset"},
                    ],
                },
                {
                    "tag_name": "v1.1.0",
                    "published_at": "2026-06-10T00:00:00Z",
                    "assets": [{"name": "1.40.8.zip"}],
                },
            ],
        )

        self.assertEqual(result["summary"]["updates"], 1)
        self.assertEqual(result["plugins"][0]["status"], "update")
        self.assertEqual(result["plugins"][0]["latestTag"], "v1.2.0")
        self.assertEqual(result["plugins"][0]["latestAsset"], "1.40.8.zip")

    def test_update_check_reports_replaced_asset_digest(self) -> None:
        """Same tag but a different asset digest than the lock is reported as ``update``."""
        registry = Registry(
            {
                "example": RegistryPlugin(
                    id="example",
                    name="Example",
                    repo="owner/example",
                    asset_patterns=("1.40.8.zip",),
                    install_strategy="bsipa-zip",
                )
            }
        )
        lockfile = Lockfile(
            beat_saber_version="1.40.8",
            instance="1.40.8",
            plugins=(
                LockedPlugin(
                    id="example",
                    repo="owner/example",
                    tag="v1.1.0",
                    asset="1.40.8.zip",
                    sha256="old",
                ),
            ),
        )

        result = check_updates(
            registry=registry,
            lockfile=lockfile,
            fetch_releases=lambda repo: [
                {
                    "tag_name": "v1.1.0",
                    "published_at": "2026-06-10T00:00:00Z",
                    "assets": [{"name": "1.40.8.zip", "digest": "sha256:new"}],
                }
            ],
        )

        self.assertEqual(result["summary"]["updates"], 1)
        self.assertEqual(result["plugins"][0]["status"], "update")
        self.assertEqual(result["plugins"][0]["latestAssetSha256"], "new")
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|