Files
plugin-helper/tests/test_plugin_helper.py
T
2026-07-01 13:43:39 -07:00

1226 lines
47 KiB
Python

from __future__ import annotations
import json
import tempfile
import unittest
import os
from io import StringIO
from pathlib import Path
from unittest.mock import patch
from zipfile import ZipFile
from rich.text import Text
from textual.coordinate import Coordinate
from textual.widgets import DataTable
from plugin_helper.bootstrap import _run_ipa
from plugin_helper.beatmods import by_version_id, normalize_mods
from plugin_helper.checker import check_lock
from plugin_helper.cli import installed_plugins_report, run
from plugin_helper.config import load_local_config, resolve_runtime_config
from plugin_helper.fsutil import sha256_file
from plugin_helper.installer import apply_plan, disable_plugin, uninstall_plugin
from plugin_helper.instances import get_instance, list_instances
from plugin_helper.models import Lockfile, LockedPlugin, Registry, RegistryPlugin
from plugin_helper.planner import create_plan
from plugin_helper.scanner import scan_bootstrap_files, scan_instance
from plugin_helper.state import downloads_dir, load_installed_state, plugin_downloads_dir, save_bootstrap_state, save_installed_state
from plugin_helper.tui import InstallationChoice, PluginHelperTui
from plugin_helper.updates import check_updates
from plugin_helper.userdata import (
backup_userdata,
infer_appdata_path,
infer_proton_appdata_path,
infer_windows_appdata_path,
restore_windows_data_repo,
sync_windows_data_repo,
)
class PluginHelperTests(unittest.TestCase):
def test_normalize_beatmods_current_nested_response(self) -> None:
payload = {
"mods": [
{
"mod": {
"id": 10,
"name": "Example",
"gitUrl": "https://github.com/example/mod",
"category": "library",
},
"latest": {
"id": 1234,
"modVersion": "1.2.3",
"zipHash": "abc123",
"dependencies": [2561, {"id": "2567"}],
},
}
]
}
entries = normalize_mods(payload)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].name, "Example")
self.assertEqual(entries[0].mod_id, 10)
self.assertEqual(entries[0].version_id, 1234)
self.assertEqual(entries[0].dependencies, (2561, 2567))
self.assertEqual(by_version_id(entries)[1234].zip_hash, "abc123")
def test_normalize_beatmods_legacy_flat_response(self) -> None:
entries = normalize_mods(
[
{
"id": "12",
"name": "FlatExample",
"gitUrl": "",
"category": "mods",
"modVersion": "2.0.0",
"zipHash": "def456",
"dependencies": [{"id": 44}, "45", None],
}
]
)
self.assertEqual(entries[0].name, "FlatExample")
self.assertEqual(entries[0].mod_id, 12)
self.assertEqual(entries[0].version_id, 12)
self.assertIsNone(entries[0].git_url)
self.assertEqual(entries[0].dependencies, (44, 45))
def test_instances_and_scan(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
inst = root / "1.40.8"
(inst / "Beat Saber_Data").mkdir(parents=True)
(inst / "Plugins").mkdir()
(inst / "Libs").mkdir()
(inst / "UserData").mkdir()
(inst / "Plugins" / "Example.dll").write_bytes(b"dll")
(root / "not-an-instance").mkdir()
instances = list_instances(root)
self.assertEqual([item.name for item in instances], ["1.40.8"])
self.assertEqual(get_instance(root, "1.40.8").path, inst)
scan = scan_instance(inst, include_hashes=True)
self.assertEqual(scan["counts"]["plugins"], 1)
self.assertEqual(scan["files"][0]["path"], "Plugins/Example.dll")
self.assertIn("sha256", scan["files"][0])
def test_multi_root_instances_and_ambiguous_lookup(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
windows = root / "windows"
local = root / "local"
win_inst = windows / "1.44.1"
local_inst = local / "1.44.1"
(win_inst / "Beat Saber_Data").mkdir(parents=True)
(local_inst / "Beat Saber_Data").mkdir(parents=True)
instances = list_instances([windows, local])
self.assertEqual(len(instances), 2)
self.assertEqual({item.path for item in instances}, {win_inst, local_inst})
with self.assertRaisesRegex(ValueError, "ambiguous"):
get_instance([windows, local], "1.44.1")
def test_local_config_loads_top_level_paths(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
config = root / "plugin-helper.local.toml"
config.write_text(
"""
instances_root = "~/BSInstances"
state_dir = ".state"
""".lstrip(),
encoding="utf-8",
)
local_config, loaded_path, loaded = load_local_config(config, root=root)
self.assertTrue(loaded)
self.assertEqual(loaded_path, config)
self.assertEqual(local_config.instances_roots, [Path("~/BSInstances").expanduser()])
self.assertEqual(local_config.state_root, root / ".state")
def test_runtime_explicit_overrides_env_and_config(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "plugin-helper.local.toml").write_text(
"""
instances_root = "config-root"
state_dir = "config-state"
""".lstrip(),
encoding="utf-8",
)
with patch.dict(
os.environ,
{
"PLUGIN_HELPER_INSTANCES_ROOT": str(root / "env-root"),
"PLUGIN_HELPER_STATE_DIR": str(root / "env-state"),
},
clear=True,
):
runtime = resolve_runtime_config(
instances_root_value=str(root / "explicit-root"),
state_dir_value="explicit-state",
root=root,
)
self.assertEqual(runtime.instances_roots, [root / "explicit-root"])
self.assertEqual(runtime.state_root, root / "explicit-state")
def test_runtime_env_overrides_config(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "plugin-helper.local.toml").write_text(
"""
instances_root = "config-root"
state_dir = "config-state"
""".lstrip(),
encoding="utf-8",
)
with patch.dict(
os.environ,
{
"PLUGIN_HELPER_INSTANCES_ROOT": f"{root / 'env-root-a'}{os.pathsep}{root / 'env-root-b'}",
"PLUGIN_HELPER_STATE_DIR": str(root / "env-state"),
},
clear=True,
):
runtime = resolve_runtime_config(root=root)
self.assertEqual(runtime.instances_roots, [root / "env-root-a", root / "env-root-b"])
self.assertEqual(runtime.state_root, root / "env-state")
def test_runtime_uses_local_config_before_defaults(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "plugin-helper.local.toml").write_text(
"""
instances_root = "config-root"
state_dir = "config-state"
""".lstrip(),
encoding="utf-8",
)
with patch.dict(os.environ, {}, clear=True):
runtime = resolve_runtime_config(root=root)
self.assertEqual(runtime.instances_roots, [root / "config-root"])
self.assertEqual(runtime.state_root, root / "config-state")
def test_runtime_default_state_uses_xdg_state_home(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
xdg = root / "xdg-state"
with patch.dict(os.environ, {"XDG_STATE_HOME": str(xdg)}, clear=True):
runtime = resolve_runtime_config(root=root)
self.assertEqual(runtime.state_root, xdg / "plugin-helper")
def test_runtime_default_state_uses_home_local_state(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
with patch.dict(os.environ, {}, clear=True):
runtime = resolve_runtime_config(root=root)
self.assertEqual(runtime.state_root, Path.home() / ".local" / "state" / "plugin-helper")
def test_no_args_prints_help_when_not_interactive(self) -> None:
output = StringIO()
with patch("sys.stdin.isatty", return_value=False), patch("sys.stdout", output):
status = run([])
self.assertEqual(status, 2)
self.assertIn("usage: plugin-helper", output.getvalue())
self.assertIn("menu", output.getvalue())
def test_no_command_defaults_to_menu_when_interactive(self) -> None:
with (
patch("sys.stdin.isatty", return_value=True),
patch("sys.stdout.isatty", return_value=True),
patch("plugin_helper.cli._run_menu", return_value=0) as run_menu,
):
status = run([])
self.assertEqual(status, 0)
run_menu.assert_called_once()
def test_run_ipa_timeout_returns_control(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
result = _run_ipa(
command=["python", "-c", "import time; time.sleep(30)"],
instance_path=Path(tmp),
timeout_seconds=1,
)
self.assertTrue(result["timedOut"])
self.assertNotEqual(result["returncode"], 0)
def test_plan_apply_and_uninstall_dll(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance = work / "instances" / "1.40.8"
state = work / "state"
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
(instance / "Plugins").mkdir()
asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.dll"
asset.write_bytes(b"managed dll")
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo="owner/example",
asset_patterns=("*.dll",),
install_strategy="dll-to-plugins",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo="owner/example",
tag="v1.0.0",
asset="Example.dll",
sha256=sha256_file(asset),
),
),
)
plan, plan_path = create_plan(
instance="1.40.8",
instance_path=instance,
beat_saber_version="1.40.8",
registry=registry,
lockfile=lockfile,
state_root=state,
repo_root=work,
)
self.assertTrue(plan_path.exists())
self.assertEqual(plan["changes"][0]["target"], "Plugins/Example.dll")
result = apply_plan(plan, state)
self.assertEqual(len(result["applied"]), 1)
self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"managed dll")
installed = load_installed_state(state, "1.40.8")
self.assertIn("example", installed["plugins"])
removed = uninstall_plugin("1.40.8", instance, state, "example")
self.assertEqual(removed["removed"], ["Plugins/Example.dll"])
self.assertFalse((instance / "Plugins" / "Example.dll").exists())
def test_disable_plugin_removes_files_but_keeps_disabled_state(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance = work / "instances" / "1.40.8"
state = work / "state"
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
(instance / "Plugins").mkdir()
target = instance / "Plugins" / "Example.dll"
target.write_bytes(b"managed dll")
installed = {
"instance": "1.40.8",
"plugins": {
"example": {
"installedAt": "2026-06-14T17:18:40Z",
"files": [
{
"path": "Plugins/Example.dll",
"sha256": sha256_file(target),
"size": target.stat().st_size,
}
],
}
},
}
from plugin_helper.state import save_installed_state
save_installed_state(state, "1.40.8", installed)
result = disable_plugin("1.40.8", instance, state, "example")
self.assertEqual(result["removed"], ["Plugins/Example.dll"])
self.assertFalse(target.exists())
updated = load_installed_state(state, "1.40.8")
self.assertNotIn("example", updated["plugins"])
self.assertIn("example", updated["disabledPlugins"])
self.assertEqual(updated["disabledPlugins"]["example"]["files"][0]["path"], "Plugins/Example.dll")
def test_enable_command_reinstalls_disabled_plugin_from_asset(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance_root = work / "instances"
instance = instance_root / "1.40.8"
state = work / "state"
registry_dir = work / "registry"
locks_dir = work / "locks"
registry_dir.mkdir()
locks_dir.mkdir()
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
(instance / "Plugins").mkdir()
asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.dll"
asset.write_bytes(b"managed dll")
(registry_dir / "plugins.toml").write_text(
"""
[[plugins]]
id = "example"
name = "Example"
repo = "owner/example"
asset_patterns = ["*.dll"]
install_strategy = "dll-to-plugins"
""".lstrip(),
encoding="utf-8",
)
(locks_dir / "1.40.8.lock.toml").write_text(
f"""
beat_saber_version = "1.40.8"
instance = "1.40.8"
[[plugins]]
id = "example"
repo = "owner/example"
tag = "v1.0.0"
asset = "Example.dll"
sha256 = "{sha256_file(asset)}"
""".lstrip(),
encoding="utf-8",
)
disabled = {
"instance": "1.40.8",
"plugins": {},
"disabledPlugins": {
"example": {
"installedAt": "2026-06-14T17:18:40Z",
"disabledAt": "2026-06-14T17:20:00Z",
"files": [
{
"path": "Plugins/Example.dll",
"sha256": sha256_file(asset),
"size": asset.stat().st_size,
}
],
}
},
}
from plugin_helper.state import save_installed_state
save_installed_state(state, "1.40.8", disabled)
with patch("plugin_helper.operations.repo_root", return_value=work):
status = run(
[
"--instances-root",
str(instance_root),
"--state-dir",
str(state),
"enable",
"--instance",
"1.40.8",
"example",
]
)
self.assertEqual(status, 0)
self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"managed dll")
updated = load_installed_state(state, "1.40.8")
self.assertIn("example", updated["plugins"])
self.assertNotIn("example", updated["disabledPlugins"])
def test_zip_to_pending_targets_ipa_pending(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance = work / "instances" / "1.40.8"
state = work / "state"
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.zip"
with ZipFile(asset, "w") as archive:
archive.writestr("Plugins/Example.dll", b"dll")
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo=None,
install_strategy="zip-to-pending",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo=None,
tag=None,
asset="Example.zip",
sha256=sha256_file(asset),
),
),
)
plan, _ = create_plan(
instance="1.40.8",
instance_path=instance,
beat_saber_version="1.40.8",
registry=registry,
lockfile=lockfile,
state_root=state,
repo_root=work,
)
self.assertEqual(plan["changes"][0]["target"], "IPA/Pending/Plugins/Example.dll")
apply_plan(plan, state)
self.assertEqual((instance / "IPA" / "Pending" / "Plugins" / "Example.dll").read_bytes(), b"dll")
def test_plan_still_finds_legacy_flat_downloads(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance = work / "instances" / "1.40.8"
state = work / "state"
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
asset = downloads_dir(state, "1.40.8") / "Example.dll"
asset.write_bytes(b"legacy flat download")
plan, _ = create_plan(
instance="1.40.8",
instance_path=instance,
beat_saber_version="1.40.8",
registry=Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo=None,
install_strategy="dll-to-plugins",
)
}
),
lockfile=Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo=None,
tag=None,
asset="Example.dll",
sha256=sha256_file(asset),
),
),
),
state_root=state,
repo_root=work,
)
self.assertEqual(plan["changes"][0]["source"], str(asset))
def test_zip_member_cannot_escape_instance(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance = work / "instances" / "1.40.8"
state = work / "state"
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
asset = plugin_downloads_dir(state, "1.40.8", "bad") / "Bad.zip"
with ZipFile(asset, "w") as archive:
archive.writestr("../Bad.dll", b"dll")
registry = Registry(
{
"bad": RegistryPlugin(
id="bad",
name="Bad",
repo=None,
install_strategy="zip-to-pending",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="bad",
repo=None,
tag=None,
asset="Bad.zip",
sha256=sha256_file(asset),
),
),
)
with self.assertRaises(ValueError):
create_plan(
instance="1.40.8",
instance_path=instance,
beat_saber_version="1.40.8",
registry=registry,
lockfile=lockfile,
state_root=state,
repo_root=work,
)
def test_scan_bootstrap_files_includes_root_ipa_and_bsipa_dirs(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
instance = Path(tmp)
(instance / "IPA" / "Backups").mkdir(parents=True)
(instance / "Libs").mkdir()
(instance / "winhttp.dll").write_bytes(b"proxy")
(instance / "IPA.exe").write_bytes(b"ipa")
(instance / "IPA.exe.config").write_bytes(b"config")
(instance / "IPA" / "Backups" / "Beat Saber.exe.bak").write_bytes(b"backup")
(instance / "Libs" / "0Harmony.dll").write_bytes(b"harmony")
paths = [item["path"] for item in scan_bootstrap_files(instance)]
self.assertEqual(
paths,
[
"IPA.exe",
"IPA.exe.config",
"IPA/Backups/Beat Saber.exe.bak",
"Libs/0Harmony.dll",
"winhttp.dll",
],
)
def test_plan_requires_healthy_bootstrap_for_locked_bsipa_dependencies(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
instance = work / "instances" / "1.44.1"
state = work / "state"
instance.mkdir(parents=True)
(instance / "Beat Saber_Data").mkdir()
asset = plugin_downloads_dir(state, "1.44.1", "example") / "Example.dll"
asset.write_bytes(b"managed dll")
registry = Registry(
{
"bsipa": RegistryPlugin(
id="bsipa",
name="BSIPA",
repo=None,
install_strategy="root-zip",
),
"example": RegistryPlugin(
id="example",
name="Example",
repo=None,
install_strategy="dll-to-plugins",
),
}
)
lockfile = Lockfile(
beat_saber_version="1.44.1",
instance="1.44.1",
plugins=(
LockedPlugin(id="bsipa", repo=None, tag="4.3.7", asset="BSIPA.zip", sha256=None),
LockedPlugin(
id="example",
repo=None,
tag="v1.0.0",
asset="Example.dll",
sha256=sha256_file(asset),
),
),
)
with self.assertRaisesRegex(ValueError, "BSIPA bootstrap is not healthy"):
create_plan(
instance="1.44.1",
instance_path=instance,
beat_saber_version="1.44.1",
registry=registry,
lockfile=lockfile,
state_root=state,
repo_root=work,
selected={"example"},
)
(instance / "IPA").mkdir()
(instance / "Libs").mkdir()
(instance / "Logs").mkdir()
(instance / "IPA.exe").write_bytes(b"ipa")
(instance / "winhttp.dll").write_bytes(b"proxy")
(instance / "Logs" / "_latest.log").write_text("Beat Saber IPA (BSIPA): 4.3.7\n", encoding="utf-8")
save_bootstrap_state(state, "1.44.1", {"files": scan_bootstrap_files(instance)})
plan, _ = create_plan(
instance="1.44.1",
instance_path=instance,
beat_saber_version="1.44.1",
registry=registry,
lockfile=lockfile,
state_root=state,
repo_root=work,
selected={"example"},
)
self.assertEqual(plan["changes"][0]["target"], "Plugins/Example.dll")
def test_userdata_backup_contains_manifest(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
instance = root / "1.40.8"
state = root / "state"
(instance / "UserData").mkdir(parents=True)
(instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
result = backup_userdata("1.40.8", instance, state)
self.assertTrue(Path(result["archive"]).exists())
self.assertEqual(result["manifest"]["fileCount"], 1)
def test_infer_windows_appdata_path_from_mounted_instance(self) -> None:
instance = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances/1.44.1")
self.assertEqual(
infer_windows_appdata_path(instance),
Path("/home/pleb/Windows/Users/pleb/AppData/LocalLow/Hyperbolic Magnetism/Beat Saber"),
)
def test_sync_windows_data_repo_copies_into_stable_backup_root(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
instance = root / "Users" / "pleb" / "BSManager" / "BSInstances" / "1.44.1"
appdata = root / "Users" / "pleb" / "AppData" / "LocalLow" / "Hyperbolic Magnetism" / "Beat Saber"
backup_repo = root / "backup"
(instance / "UserData").mkdir(parents=True)
(instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
(instance / "UserData" / "BeatLeader" / "Replays").mkdir(parents=True)
(instance / "UserData" / "BeatLeader" / "Replays" / "big.bsor").write_text("replay", encoding="utf-8")
(instance / "UserData" / "ScoreSaber" / "Replays").mkdir(parents=True)
(instance / "UserData" / "ScoreSaber" / "Replays" / "big.bsor").write_text("replay", encoding="utf-8")
(instance / "UserData" / "BeatSaberPlus" / "Cache").mkdir(parents=True)
(instance / "UserData" / "BeatSaberPlus" / "Cache" / "cached.dat").write_text("cache", encoding="utf-8")
(instance / "UserData" / "BeatSaverNotifier.json").write_text('{"refreshToken":"secret"}', encoding="utf-8")
(instance / "UserData" / "Accsaber").mkdir(parents=True)
(instance / "UserData" / "Accsaber" / "PlayerScoreCache.json").write_text("{}", encoding="utf-8")
appdata.mkdir(parents=True)
(appdata / "Player.log").write_text("log", encoding="utf-8")
(appdata / "settings.cfg").write_text("settings", encoding="utf-8")
result = sync_windows_data_repo(
instance="1.44.1",
instance_path=instance,
backup_root=backup_repo,
)
self.assertEqual(result["backupRoot"], str(backup_repo))
self.assertEqual((backup_repo / "UserData" / "settings.json").read_text(), "{}")
self.assertFalse((backup_repo / "UserData" / "BeatLeader" / "Replays").exists())
self.assertFalse((backup_repo / "UserData" / "ScoreSaber" / "Replays").exists())
self.assertFalse((backup_repo / "UserData" / "BeatSaberPlus" / "Cache").exists())
self.assertFalse((backup_repo / "UserData" / "BeatSaverNotifier.json").exists())
self.assertFalse((backup_repo / "UserData" / "Accsaber" / "PlayerScoreCache.json").exists())
self.assertFalse((backup_repo / "AppData" / "Player.log").exists())
self.assertEqual((backup_repo / "AppData" / "settings.cfg").read_text(), "settings")
descriptor = json.loads((backup_repo / "backup-descriptor.json").read_text(encoding="utf-8"))
self.assertEqual(descriptor["instance"], "1.44.1")
self.assertEqual(descriptor["sources"][0]["source"], str(instance / "UserData"))
self.assertIn("BeatLeader/Replays", descriptor["skipped"])
self.assertIn("*.log", descriptor["excludePatterns"])
def test_infer_proton_appdata_path(self) -> None:
self.assertEqual(
infer_proton_appdata_path(),
Path.home()
/ ".local/share/BSManager/SharedContent/compatdata/pfx/drive_c/users/steamuser/AppData/LocalLow/Hyperbolic Magnetism/Beat Saber",
)
def test_infer_appdata_path_uses_windows_or_proton(self) -> None:
windows_instance = Path("/home/pleb/Windows/Users/pleb/BSManager/BSInstances/1.44.1")
linux_instance = Path("/home/pleb/.local/share/BSManager/BSInstances/1.44.1")
self.assertEqual(infer_appdata_path(windows_instance), infer_windows_appdata_path(windows_instance))
self.assertEqual(infer_appdata_path(linux_instance), infer_proton_appdata_path())
def test_restore_windows_data_repo_roundtrip(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
instance = root / "Users" / "pleb" / "BSManager" / "BSInstances" / "1.44.1"
appdata = root / "Users" / "pleb" / "AppData" / "LocalLow" / "Hyperbolic Magnetism" / "Beat Saber"
backup_repo = root / "backup"
(instance / "UserData").mkdir(parents=True)
(instance / "UserData" / "settings.json").write_text('{"saved": true}', encoding="utf-8")
appdata.mkdir(parents=True)
(appdata / "settings.cfg").write_text("settings", encoding="utf-8")
sync_windows_data_repo(
instance="1.44.1",
instance_path=instance,
backup_root=backup_repo,
)
(instance / "UserData" / "settings.json").write_text('{"saved": false}', encoding="utf-8")
(appdata / "settings.cfg").write_text("changed", encoding="utf-8")
result = restore_windows_data_repo(
instance="1.44.1",
instance_path=instance,
backup_root=backup_repo,
)
self.assertEqual((instance / "UserData" / "settings.json").read_text(), '{"saved": true}')
self.assertEqual((appdata / "settings.cfg").read_text(), "settings")
self.assertEqual(len(result["restored"]), 2)
self.assertTrue(all(item["snapshot"] for item in result["restored"]))
def test_restore_windows_data_repo_rejects_instance_mismatch(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
instance = root / "1.44.1"
backup_repo = root / "backup"
(instance / "UserData").mkdir(parents=True)
(instance / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
(backup_repo / "UserData").mkdir(parents=True)
(backup_repo / "UserData" / "settings.json").write_text("{}", encoding="utf-8")
(backup_repo / "backup-descriptor.json").write_text(
json.dumps({"instance": "1.40.8"}) + "\n",
encoding="utf-8",
)
with self.assertRaisesRegex(ValueError, "does not match"):
restore_windows_data_repo(
instance="1.44.1",
instance_path=instance,
backup_root=backup_repo,
include_appdata=False,
)
def test_restore_userdata_cli(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
instances_root = root / "instances"
instance = instances_root / "1.44.1"
backup_repo = root / "backup"
(instance / "UserData").mkdir(parents=True)
(instance / "UserData" / "settings.json").write_text('{"restored": true}', encoding="utf-8")
(backup_repo / "UserData").mkdir(parents=True)
(backup_repo / "UserData" / "settings.json").write_text('{"restored": true}', encoding="utf-8")
status = run(
[
"--instances-root",
str(instances_root),
"--state-dir",
str(root / "state"),
"restore-userdata",
"--instance",
"1.44.1",
"--backup-root",
str(backup_repo),
"--no-appdata",
]
)
self.assertEqual(status, 0)
self.assertEqual((instance / "UserData" / "settings.json").read_text(), '{"restored": true}')
def test_check_reports_missing_asset(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
work = Path(tmp)
state = work / "state"
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo=None,
install_strategy="dll-to-plugins",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo=None,
tag="v1.0.0",
asset="Missing.dll",
sha256=None,
),
),
)
result = check_lock(
instance="1.40.8",
registry=registry,
lockfile=lockfile,
state_root=state,
repo_root=work,
)
self.assertEqual(result["summary"]["errors"], 1)
self.assertEqual(result["plugins"][0]["status"], "error")
def test_installed_plugins_report_includes_locked_version(self) -> None:
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example Plugin",
repo="owner/example",
install_strategy="dll-to-plugins",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo="owner/example",
tag="v1.2.3",
asset="Example.dll",
sha256="abc123",
),
),
)
report = installed_plugins_report(
installed_state={
"instance": "1.40.8",
"plugins": {
"example": {
"installedAt": "2026-06-14T17:18:40Z",
"files": [{"path": "Plugins/Example.dll"}],
}
},
},
registry=registry,
lockfile=lockfile,
)
self.assertEqual(report["plugins"][0]["name"], "Example Plugin")
self.assertEqual(report["plugins"][0]["version"], "v1.2.3")
self.assertEqual(report["plugins"][0]["asset"], "Example.dll")
self.assertEqual(report["plugins"][0]["fileCount"], 1)
def test_update_check_reports_current_matching_asset(self) -> None:
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo="owner/example",
asset_patterns=("1.40.8.zip",),
install_strategy="bsipa-zip",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo="owner/example",
tag="v1.1.0",
asset="1.40.8.zip",
sha256="abc123",
),
),
)
result = check_updates(
registry=registry,
lockfile=lockfile,
fetch_releases=lambda repo: [
{
"tag_name": "v1.1.0",
"published_at": "2026-06-10T00:00:00Z",
"assets": [{"name": "1.40.8.zip", "digest": "sha256:abc123"}],
}
],
)
self.assertEqual(result["summary"]["current"], 1)
self.assertEqual(result["plugins"][0]["status"], "current")
self.assertEqual(result["plugins"][0]["latestAssetSha256"], "abc123")
def test_update_check_reports_new_matching_release(self) -> None:
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo="owner/example",
asset_patterns=("*.zip",),
install_strategy="bsipa-zip",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo="owner/example",
tag="v1.1.0",
asset="1.40.8.zip",
sha256="abc123",
),
),
)
result = check_updates(
registry=registry,
lockfile=lockfile,
fetch_releases=lambda repo: [
{
"tag_name": "v1.2.0",
"published_at": "2026-06-12T00:00:00Z",
"assets": [
{"name": "1.29.1.zip"},
{"name": "1.40.8.zip", "browser_download_url": "https://example.invalid/asset"},
],
},
{
"tag_name": "v1.1.0",
"published_at": "2026-06-10T00:00:00Z",
"assets": [{"name": "1.40.8.zip"}],
},
],
)
self.assertEqual(result["summary"]["updates"], 1)
self.assertEqual(result["plugins"][0]["status"], "update")
self.assertEqual(result["plugins"][0]["latestTag"], "v1.2.0")
self.assertEqual(result["plugins"][0]["latestAsset"], "1.40.8.zip")
def test_update_check_reports_replaced_asset_digest(self) -> None:
registry = Registry(
{
"example": RegistryPlugin(
id="example",
name="Example",
repo="owner/example",
asset_patterns=("1.40.8.zip",),
install_strategy="bsipa-zip",
)
}
)
lockfile = Lockfile(
beat_saber_version="1.40.8",
instance="1.40.8",
plugins=(
LockedPlugin(
id="example",
repo="owner/example",
tag="v1.1.0",
asset="1.40.8.zip",
sha256="old",
),
),
)
result = check_updates(
registry=registry,
lockfile=lockfile,
fetch_releases=lambda repo: [
{
"tag_name": "v1.1.0",
"published_at": "2026-06-10T00:00:00Z",
"assets": [{"name": "1.40.8.zip", "digest": "sha256:new"}],
}
],
)
self.assertEqual(result["summary"]["updates"], 1)
self.assertEqual(result["plugins"][0]["status"], "update")
self.assertEqual(result["plugins"][0]["latestAssetSha256"], "new")
def _make_tui_fixture(root: Path, *, disabled: bool = False, hash_mismatch: bool = False) -> tuple[PluginHelperTui, Path, Path]:
repo = root / "repo"
instance_root = root / "instances"
instance = instance_root / "1.40.8"
state = root / "state"
(repo / "registry").mkdir(parents=True)
(repo / "locks").mkdir()
(instance / "Beat Saber_Data").mkdir(parents=True)
(instance / "Plugins").mkdir()
asset = plugin_downloads_dir(state, "1.40.8", "example") / "Example.dll"
asset.write_bytes(b"managed dll")
(repo / "registry" / "plugins.toml").write_text(
"""
[[plugins]]
id = "example"
name = "Example"
repo = "owner/example"
asset_patterns = ["*.dll"]
install_strategy = "dll-to-plugins"
""".lstrip(),
encoding="utf-8",
)
(repo / "locks" / "1.40.8.lock.toml").write_text(
f"""
beat_saber_version = "1.40.8"
instance = "1.40.8"
[[plugins]]
id = "example"
repo = "owner/example"
tag = "v1.0.0"
asset = "Example.dll"
sha256 = "{sha256_file(asset)}"
""".lstrip(),
encoding="utf-8",
)
target = instance / "Plugins" / "Example.dll"
if disabled:
plugins = {}
disabled_plugins = {
"example": {
"installedAt": "2026-06-14T17:18:40Z",
"disabledAt": "2026-06-14T17:20:00Z",
"files": [{"path": "Plugins/Example.dll", "sha256": sha256_file(asset), "size": asset.stat().st_size}],
}
}
else:
target.write_bytes(b"changed dll" if hash_mismatch else b"managed dll")
plugins = {
"example": {
"installedAt": "2026-06-14T17:18:40Z",
"files": [{"path": "Plugins/Example.dll", "sha256": sha256_file(asset), "size": asset.stat().st_size}],
}
}
disabled_plugins = {}
save_installed_state(
state,
"1.40.8",
{"instance": "1.40.8", "plugins": plugins, "disabledPlugins": disabled_plugins},
)
choice = InstallationChoice(
install_id="test",
install_label="Test Install",
instance_name="1.40.8",
instance_path=instance,
state_root=state,
)
return PluginHelperTui(choices=[choice], repo_root=repo), instance, state
class PluginHelperTuiTests(unittest.IsolatedAsyncioTestCase):
async def test_installation_picker_shows_duplicate_instances_with_state_dirs(self) -> None:
choices = [
InstallationChoice(
install_id="linux",
install_label="Linux",
instance_name="1.44.1",
instance_path=Path("/tmp/linux/1.44.1"),
state_root=Path("/tmp/state-linux"),
),
InstallationChoice(
install_id="windows",
install_label="Windows",
instance_name="1.44.1",
instance_path=Path("/tmp/windows/1.44.1"),
state_root=Path("/tmp/state-windows"),
),
]
app = PluginHelperTui(choices=choices, repo_root=Path("/tmp/repo"))
async with app.run_test():
table = app.query_one(DataTable)
self.assertEqual(table.row_count, 2)
self.assertEqual(app.mode, "installations")
async def test_space_disables_enabled_plugin_without_id_prompt(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
app, instance, state = _make_tui_fixture(Path(tmp))
async with app.run_test() as pilot:
await pilot.press("enter")
self.assertEqual(app.plugin_rows[0]["status"], "enabled")
table = app.query_one(DataTable)
self.assertEqual(table.get_cell_at(Coordinate(0, 0)), Text("[x]", no_wrap=True))
await pilot.press("space")
self.assertFalse((instance / "Plugins" / "Example.dll").exists())
updated = load_installed_state(state, "1.40.8")
self.assertNotIn("example", updated["plugins"])
self.assertIn("example", updated["disabledPlugins"])
async def test_space_enables_disabled_plugin_from_asset(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
app, instance, state = _make_tui_fixture(Path(tmp), disabled=True)
async with app.run_test() as pilot:
await pilot.press("enter")
self.assertEqual(app.plugin_rows[0]["status"], "disabled")
await pilot.press("space")
self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"managed dll")
updated = load_installed_state(state, "1.40.8")
self.assertIn("example", updated["plugins"])
self.assertNotIn("example", updated["disabledPlugins"])
async def test_disable_all_disables_enabled_plugins(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
app, instance, state = _make_tui_fixture(Path(tmp))
async with app.run_test() as pilot:
await pilot.press("enter")
await pilot.press("d")
self.assertFalse((instance / "Plugins" / "Example.dll").exists())
updated = load_installed_state(state, "1.40.8")
self.assertNotIn("example", updated["plugins"])
self.assertIn("example", updated["disabledPlugins"])
self.assertIn("Disabled 1 plugins", app.status_message)
async def test_enable_all_enables_disabled_plugins(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
app, instance, state = _make_tui_fixture(Path(tmp), disabled=True)
async with app.run_test() as pilot:
await pilot.press("enter")
await pilot.press("e")
self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"managed dll")
updated = load_installed_state(state, "1.40.8")
self.assertIn("example", updated["plugins"])
self.assertNotIn("example", updated["disabledPlugins"])
self.assertIn("Enabled 1 plugins", app.status_message)
async def test_space_reports_hash_mismatch_without_state_update(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
app, instance, state = _make_tui_fixture(Path(tmp), hash_mismatch=True)
async with app.run_test() as pilot:
await pilot.press("enter")
await pilot.press("space")
self.assertEqual((instance / "Plugins" / "Example.dll").read_bytes(), b"changed dll")
updated = load_installed_state(state, "1.40.8")
self.assertIn("example", updated["plugins"])
self.assertNotIn("example", updated.get("disabledPlugins", {}))
self.assertIn("hash mismatch", app.status_message)
if __name__ == "__main__":
unittest.main()