Use md5 to only store unique content
All checks were successful
Python CI / test (push) Successful in 3m12s
Lint / lint (push) Successful in 1m38s
Tests / test (3.11) (push) Successful in 2m26s
Tests / test (3.12) (push) Successful in 3m13s
Tests / test (3.13) (push) Successful in 3m9s
Tests / test (3.14) (push) Successful in 1m20s

This commit is contained in:
Duncan Tourolle 2025-12-31 12:46:01 +01:00
parent 254a95d83c
commit 6a791b1397
4 changed files with 911 additions and 2 deletions

View File

@ -2,12 +2,37 @@
Asset management system for pyPhotoAlbum with automatic reference counting
"""
import hashlib
import os
import shutil
from typing import Dict, Optional
from typing import Dict, List, Optional, Tuple
from pathlib import Path
def compute_file_md5(file_path: str) -> Optional[str]:
    """
    Compute the MD5 digest of a file's content.

    The digest is only used as a content fingerprint for asset
    de-duplication, never for security, so the hash object is created with
    ``usedforsecurity=False``. This keeps the function usable on
    interpreters whose crypto policy blocks MD5 for security purposes.

    Args:
        file_path: Path to the file

    Returns:
        MD5 hash as hex string, or None if the file doesn't exist or
        cannot be read
    """
    if not os.path.exists(file_path):
        return None

    hash_md5 = hashlib.md5(usedforsecurity=False)
    try:
        with open(file_path, "rb") as f:
            # Stream in fixed-size chunks so large images are never held in memory at once
            while chunk := f.read(8192):
                hash_md5.update(chunk)
    except OSError as e:
        # Only I/O can fail here (permissions, path is a directory, file vanished, ...)
        print(f"AssetManager: Error computing MD5 for {file_path}: {e}")
        return None
    return hash_md5.hexdigest()
class AssetManager:
"""Manages project assets with automatic reference counting and cleanup"""
@ -21,6 +46,7 @@ class AssetManager:
self.project_folder = project_folder
self.assets_folder = os.path.join(project_folder, "assets")
self.reference_counts: Dict[str, int] = {} # {relative_path: count}
self.asset_hashes: Dict[str, str] = {} # {relative_path: md5_hash}
# Create assets folder if it doesn't exist
os.makedirs(self.assets_folder, exist_ok=True)
@ -143,9 +169,262 @@ class AssetManager:
def serialize(self) -> Dict:
"""Serialize asset manager state"""
return {"reference_counts": self.reference_counts}
return {
"reference_counts": self.reference_counts,
"asset_hashes": self.asset_hashes,
}
def deserialize(self, data: Dict):
"""Deserialize asset manager state"""
self.reference_counts = data.get("reference_counts", {})
self.asset_hashes = data.get("asset_hashes", {})
print(f"AssetManager: Loaded {len(self.reference_counts)} asset references")
def compute_asset_hash(self, asset_path: str) -> Optional[str]:
    """
    Hash a single asset and remember the result in ``asset_hashes``.

    Args:
        asset_path: Relative path to the asset

    Returns:
        MD5 hash as hex string, or None if computation fails (in which
        case the cache is left untouched)
    """
    digest = compute_file_md5(self.get_absolute_path(asset_path))
    if not digest:
        # Nothing to cache; hand the falsy result straight back
        return digest

    self.asset_hashes[asset_path] = digest
    return digest
def compute_all_hashes(self) -> Dict[str, str]:
"""
Compute MD5 hashes for all assets in the assets folder.
Returns:
Dictionary mapping relative paths to MD5 hashes
"""
self.asset_hashes.clear()
if not os.path.exists(self.assets_folder):
return self.asset_hashes
for root, dirs, files in os.walk(self.assets_folder):
for filename in files:
file_path = os.path.join(root, filename)
relative_path = os.path.relpath(file_path, self.project_folder)
md5_hash = compute_file_md5(file_path)
if md5_hash:
self.asset_hashes[relative_path] = md5_hash
print(f"AssetManager: Computed hashes for {len(self.asset_hashes)} assets")
return self.asset_hashes
def find_duplicates(self) -> Dict[str, List[str]]:
"""
Find duplicate assets based on MD5 hash.
Returns:
Dictionary mapping MD5 hash to list of asset paths with that hash.
Only includes hashes with more than one file.
"""
# Compute hashes if not already done
if not self.asset_hashes:
self.compute_all_hashes()
# Group assets by hash
hash_to_paths: Dict[str, List[str]] = {}
for path, md5_hash in self.asset_hashes.items():
if md5_hash not in hash_to_paths:
hash_to_paths[md5_hash] = []
hash_to_paths[md5_hash].append(path)
# Filter to only duplicates (more than one file with same hash)
duplicates = {h: paths for h, paths in hash_to_paths.items() if len(paths) > 1}
if duplicates:
total_dups = sum(len(paths) - 1 for paths in duplicates.values())
print(f"AssetManager: Found {total_dups} duplicate files in {len(duplicates)} groups")
return duplicates
def deduplicate_assets(self, update_references_callback=None) -> Tuple[int, int]:
"""
Remove duplicate assets, keeping one canonical copy and updating references.
Args:
update_references_callback: Optional callback function that takes
(old_path, new_path) to update external references (e.g., ImageData elements)
Returns:
Tuple of (files_removed, bytes_saved)
"""
duplicates = self.find_duplicates()
if not duplicates:
print("AssetManager: No duplicates found")
return (0, 0)
files_removed = 0
bytes_saved = 0
for md5_hash, paths in duplicates.items():
# Sort paths to get consistent canonical path (first alphabetically)
paths.sort()
canonical_path = paths[0]
# Remove duplicates and update references
for dup_path in paths[1:]:
full_dup_path = self.get_absolute_path(dup_path)
# Get file size before deletion
try:
file_size = os.path.getsize(full_dup_path)
except OSError:
file_size = 0
# Update references if callback provided
if update_references_callback:
update_references_callback(dup_path, canonical_path)
# Transfer reference count to canonical path
if dup_path in self.reference_counts:
dup_refs = self.reference_counts[dup_path]
if canonical_path in self.reference_counts:
self.reference_counts[canonical_path] += dup_refs
else:
self.reference_counts[canonical_path] = dup_refs
del self.reference_counts[dup_path]
# Delete the duplicate file
try:
if os.path.exists(full_dup_path):
os.remove(full_dup_path)
files_removed += 1
bytes_saved += file_size
print(f"AssetManager: Removed duplicate {dup_path} (kept {canonical_path})")
except Exception as e:
print(f"AssetManager: Error removing duplicate {dup_path}: {e}")
# Remove from hash tracking
if dup_path in self.asset_hashes:
del self.asset_hashes[dup_path]
print(f"AssetManager: Deduplication complete - removed {files_removed} files, saved {bytes_saved} bytes")
return (files_removed, bytes_saved)
def get_duplicate_stats(self) -> Tuple[int, int, int]:
"""
Get statistics about duplicate assets without modifying anything.
Returns:
Tuple of (duplicate_groups, total_duplicate_files, estimated_bytes_to_save)
"""
duplicates = self.find_duplicates()
if not duplicates:
return (0, 0, 0)
duplicate_groups = len(duplicates)
total_duplicate_files = sum(len(paths) - 1 for paths in duplicates.values())
# Calculate bytes that would be saved
bytes_to_save = 0
for paths in duplicates.values():
# Skip the first (canonical) file, count size of the rest
for dup_path in paths[1:]:
full_path = self.get_absolute_path(dup_path)
try:
bytes_to_save += os.path.getsize(full_path)
except OSError:
pass
return (duplicate_groups, total_duplicate_files, bytes_to_save)
def find_unused_assets(self) -> List[str]:
"""
Find assets that exist in the assets folder but have no references.
Returns:
List of relative paths to unused assets
"""
unused = []
if not os.path.exists(self.assets_folder):
return unused
# Get all files in assets folder
for root, dirs, files in os.walk(self.assets_folder):
for filename in files:
file_path = os.path.join(root, filename)
relative_path = os.path.relpath(file_path, self.project_folder)
# Check if this asset has any references
ref_count = self.reference_counts.get(relative_path, 0)
if ref_count <= 0:
unused.append(relative_path)
if unused:
print(f"AssetManager: Found {len(unused)} unused assets")
return unused
def get_unused_stats(self) -> Tuple[int, int]:
"""
Get statistics about unused assets without modifying anything.
Returns:
Tuple of (unused_file_count, total_bytes)
"""
unused = self.find_unused_assets()
if not unused:
return (0, 0)
total_bytes = 0
for asset_path in unused:
full_path = self.get_absolute_path(asset_path)
try:
total_bytes += os.path.getsize(full_path)
except OSError:
pass
return (len(unused), total_bytes)
def remove_unused_assets(self) -> Tuple[int, int]:
"""
Remove all unused assets from the assets folder.
Returns:
Tuple of (files_removed, bytes_freed)
"""
unused = self.find_unused_assets()
if not unused:
print("AssetManager: No unused assets to remove")
return (0, 0)
files_removed = 0
bytes_freed = 0
for asset_path in unused:
full_path = self.get_absolute_path(asset_path)
try:
file_size = os.path.getsize(full_path)
except OSError:
file_size = 0
try:
if os.path.exists(full_path):
os.remove(full_path)
files_removed += 1
bytes_freed += file_size
print(f"AssetManager: Removed unused asset {asset_path}")
# Clean up tracking
if asset_path in self.reference_counts:
del self.reference_counts[asset_path]
if asset_path in self.asset_hashes:
del self.asset_hashes[asset_path]
except Exception as e:
print(f"AssetManager: Error removing unused asset {asset_path}: {e}")
print(f"AssetManager: Removed {files_removed} unused assets, freed {bytes_freed} bytes")
return (files_removed, bytes_freed)

View File

@ -640,6 +640,165 @@ class FileOperationsMixin:
else:
self.show_status("PDF export failed to start", 3000)
@ribbon_action(label="Clean Assets", tooltip="Find and remove duplicate or unused image files", tab="Home", group="File")
def clean_assets(self):
    """
    Find and remove duplicate and unused asset files to save space.

    Steps, in order:
        1. Recompute all asset hashes and collect duplicate / unused
           statistics while a modal progress dialog is shown.
        2. If nothing can be cleaned, inform the user and stop.
        3. Show a dialog with one pre-checked checkbox per cleanup type
           that has something to remove, plus Cancel / Clean Selected.
        4. Run the selected cleanups, re-pointing ImageData elements from
           removed duplicates to the kept file.
        5. If any file was removed, mark the project dirty, refresh the
           view and report the result; otherwise only update the status.
    """
    # Imported locally; the remaining Qt widgets used below (QDialog, QLabel, ...)
    # are presumably imported at module level — not visible in this hunk.
    from PyQt6.QtWidgets import QProgressDialog, QCheckBox
    from PyQt6.QtCore import Qt

    # Helper to format bytes (1024-based; MB is the largest unit used)
    def format_bytes(num_bytes):
        if num_bytes >= 1024 * 1024:
            return f"{num_bytes / (1024 * 1024):.1f} MB"
        elif num_bytes >= 1024:
            return f"{num_bytes / 1024:.1f} KB"
        else:
            return f"{num_bytes} bytes"

    # Scan for issues with progress dialog
    progress = QProgressDialog("Scanning assets...", "Cancel", 0, 100, self)
    progress.setWindowTitle("Clean Assets")
    progress.setWindowModality(Qt.WindowModality.WindowModal)
    progress.setValue(10)

    # Compute hashes for duplicate detection (always a full rescan, so the
    # statistics below are based on the files currently on disk)
    self.project.asset_manager.compute_all_hashes()
    progress.setValue(40)

    # Cancellation is only checked here, after hashing has finished
    if progress.wasCanceled():
        return

    # Get duplicate stats
    dup_groups, dup_files, dup_bytes = self.project.asset_manager.get_duplicate_stats()
    progress.setValue(60)

    # Get unused stats
    unused_files, unused_bytes = self.project.asset_manager.get_unused_stats()
    progress.setValue(80)

    progress.close()

    # Check if there's anything to clean
    if dup_files == 0 and unused_files == 0:
        QMessageBox.information(
            self,
            "Assets Clean",
            "No duplicate or unused files were found in your project assets."
        )
        return

    # Build dialog with checkboxes for each cleanup type
    dialog = QDialog(self)
    dialog.setWindowTitle("Clean Assets")
    dialog.setMinimumWidth(450)

    layout = QVBoxLayout()

    # Info label
    info_label = QLabel("Select which cleanup operations to perform:")
    layout.addWidget(info_label)

    # Duplicates checkbox (stays None when there are no duplicates, which
    # the later "if dup_checkbox and ..." test relies on)
    dup_checkbox = None
    if dup_files > 0:
        dup_checkbox = QCheckBox(
            f"Remove {dup_files} duplicate file(s) in {dup_groups} group(s) "
            f"(saves {format_bytes(dup_bytes)})"
        )
        dup_checkbox.setChecked(True)
        dup_checkbox.setToolTip(
            "Duplicate files have identical content but different names.\n"
            "Image references will be automatically updated to use the kept file."
        )
        layout.addWidget(dup_checkbox)

    # Unused checkbox (same None convention as the duplicates checkbox)
    unused_checkbox = None
    if unused_files > 0:
        unused_checkbox = QCheckBox(
            f"Remove {unused_files} unused file(s) (saves {format_bytes(unused_bytes)})"
        )
        unused_checkbox.setChecked(True)
        unused_checkbox.setToolTip(
            "Unused files exist in the assets folder but are not referenced\n"
            "by any image element in your project."
        )
        layout.addWidget(unused_checkbox)

    # Summary
    # NOTE(review): a file that is both a duplicate and unreferenced appears in
    # both statistics, so these totals may count it twice — confirm intended.
    total_files = dup_files + unused_files
    total_bytes = dup_bytes + unused_bytes
    summary_label = QLabel(f"\nTotal potential savings: {format_bytes(total_bytes)} from {total_files} file(s)")
    summary_label.setStyleSheet("font-weight: bold;")
    layout.addWidget(summary_label)

    # Buttons: Cancel rejects the dialog, Clean Selected accepts it
    button_layout = QHBoxLayout()
    cancel_btn = QPushButton("Cancel")
    cancel_btn.clicked.connect(dialog.reject)
    clean_btn = QPushButton("Clean Selected")
    clean_btn.clicked.connect(dialog.accept)
    clean_btn.setDefault(True)

    button_layout.addStretch()
    button_layout.addWidget(cancel_btn)
    button_layout.addWidget(clean_btn)
    layout.addLayout(button_layout)

    dialog.setLayout(layout)

    # Anything other than an accepted dialog aborts without touching files
    if dialog.exec() != QDialog.DialogCode.Accepted:
        return

    # Perform selected cleanups
    total_removed = 0
    total_saved = 0

    # Remove duplicates if selected
    if dup_checkbox and dup_checkbox.isChecked():
        def update_image_references(old_path: str, new_path: str):
            """Update all ImageData elements that reference the old path"""
            from pyPhotoAlbum.models import ImageData

            # Only elements whose image_path equals old_path exactly are re-pointed
            for page in self.project.pages:
                for element in page.layout.elements:
                    if isinstance(element, ImageData) and element.image_path == old_path:
                        element.image_path = new_path
                        element.mark_modified()
                        print(f"Updated image reference: {old_path} -> {new_path}")

        removed, saved = self.project.asset_manager.deduplicate_assets(
            update_references_callback=update_image_references
        )
        total_removed += removed
        total_saved += saved

    # Remove unused if selected (runs after de-duplication, so it sees the
    # reference counts as updated by the step above)
    if unused_checkbox and unused_checkbox.isChecked():
        removed, saved = self.project.asset_manager.remove_unused_assets()
        total_removed += removed
        total_saved += saved

    if total_removed > 0:
        # Mark project as dirty since we modified it
        self.project.mark_dirty()

        # Update view
        self.update_view()

        # Show result
        QMessageBox.information(
            self,
            "Cleanup Complete",
            f"Removed {total_removed} file(s).\n\n"
            f"Saved {format_bytes(total_saved)} of disk space.\n\n"
            f"Remember to save your project to preserve these changes."
        )

        self.show_status(f"Asset cleanup complete: removed {total_removed} files, saved {format_bytes(total_saved)}")
    else:
        self.show_status("No files were removed")
@ribbon_action(label="About", tooltip="About pyPhotoAlbum and data format version", tab="Home", group="File")
def show_about(self):
"""Show about dialog with version information"""

View File

@ -835,6 +835,8 @@ class ThumbnailBrowserDock(QDockWidget):
def _apply_sort(self):
"""Apply current sort mode to image files."""
if not hasattr(self.gl_widget, 'image_files') or not self.gl_widget.image_files:
return
if self.current_sort == "name":
# Sort by filename only (not full path)
self.gl_widget.image_files.sort(key=lambda p: p.name.lower())

469
tests/test_asset_manager.py Normal file
View File

@ -0,0 +1,469 @@
"""
Tests for AssetManager functionality including deduplication and unused asset detection
"""
import os
import pytest
import tempfile
import shutil
from PIL import Image
from pyPhotoAlbum.asset_manager import AssetManager, compute_file_md5
class TestComputeFileMd5:
    """Behavioural checks for the module-level compute_file_md5 helper"""

    def test_compute_md5_existing_file(self, tmp_path):
        """A readable file hashes to the well-known digest of its content"""
        target = tmp_path / "test.txt"
        target.write_text("Hello, World!")

        digest = compute_file_md5(str(target))

        assert digest is not None
        # Reference MD5 of the text "Hello, World!"
        assert digest == "65a8e27d8879283831b664bd8b7f0ad4"

    def test_compute_md5_nonexistent_file(self):
        """A path that does not exist yields None"""
        assert compute_file_md5("/nonexistent/path/file.txt") is None

    def test_compute_md5_same_content_same_hash(self, tmp_path):
        """Two files with byte-identical content share one digest"""
        payload = b"Test content for hashing"
        first = tmp_path / "file1.bin"
        second = tmp_path / "file2.bin"
        for target in (first, second):
            target.write_bytes(payload)

        first_digest = compute_file_md5(str(first))
        second_digest = compute_file_md5(str(second))

        assert first_digest == second_digest

    def test_compute_md5_different_content_different_hash(self, tmp_path):
        """Files with differing content get differing digests"""
        first = tmp_path / "file1.txt"
        second = tmp_path / "file2.txt"
        first.write_text("Content A")
        second.write_text("Content B")

        first_digest = compute_file_md5(str(first))
        second_digest = compute_file_md5(str(second))

        assert first_digest != second_digest
class TestAssetManagerDeduplication:
    """Exercises hashing, duplicate discovery and duplicate removal on AssetManager"""

    @pytest.fixture
    def asset_manager(self, tmp_path):
        """Provide an AssetManager rooted in a fresh temporary project folder"""
        project_folder = str(tmp_path / "test_project")
        os.makedirs(project_folder)
        manager = AssetManager(project_folder)
        return manager

    @pytest.fixture
    def create_test_image(self):
        """Provide a factory that writes a solid-colour image to a given path"""

        def _create(path, color="red", size=(100, 100)):
            Image.new("RGB", size, color=color).save(path)
            return path

        return _create

    @staticmethod
    def _save_source_image(tmp_path, color, size=(50, 50)):
        """Write a solid-colour source.png into tmp_path and return its path as a string"""
        source = tmp_path / "source.png"
        Image.new("RGB", size, color=color).save(str(source))
        return str(source)

    def test_compute_all_hashes_empty_folder(self, asset_manager):
        """An empty assets folder produces an empty hash table"""
        assert len(asset_manager.compute_all_hashes()) == 0

    def test_compute_all_hashes_with_files(self, asset_manager, create_test_image):
        """Every file in the assets folder gets an entry keyed by relative path"""
        create_test_image(os.path.join(asset_manager.assets_folder, "image1.png"), color="red")
        create_test_image(os.path.join(asset_manager.assets_folder, "image2.png"), color="blue")

        table = asset_manager.compute_all_hashes()

        assert len(table) == 2
        assert "assets/image1.png" in table
        assert "assets/image2.png" in table

    def test_find_duplicates_no_duplicates(self, asset_manager, create_test_image):
        """Images with different content are not reported as duplicates"""
        create_test_image(os.path.join(asset_manager.assets_folder, "image1.png"), color="red")
        create_test_image(os.path.join(asset_manager.assets_folder, "image2.png"), color="blue")

        groups = asset_manager.find_duplicates()

        assert len(groups) == 0

    def test_find_duplicates_with_duplicates(self, asset_manager, tmp_path):
        """Two byte-identical copies form exactly one duplicate group"""
        source = self._save_source_image(tmp_path, "green")

        # Same bytes, two different names inside the assets folder
        for name in ("dup1.png", "dup2.png"):
            shutil.copy(source, os.path.join(asset_manager.assets_folder, name))

        groups = asset_manager.find_duplicates()

        assert len(groups) == 1  # One group of duplicates
        for members in groups.values():
            assert len(members) == 2
            assert "assets/dup1.png" in members
            assert "assets/dup2.png" in members

    def test_get_duplicate_stats_no_duplicates(self, asset_manager, create_test_image):
        """Statistics are all zero when nothing is duplicated"""
        create_test_image(os.path.join(asset_manager.assets_folder, "image1.png"), color="red")

        stats = asset_manager.get_duplicate_stats()
        groups, files, bytes_to_save = stats

        assert groups == 0
        assert files == 0
        assert bytes_to_save == 0

    def test_get_duplicate_stats_with_duplicates(self, asset_manager, tmp_path):
        """Three identical copies count as one group with two removable files"""
        source = self._save_source_image(tmp_path, "purple", size=(100, 100))

        # Three copies in total, i.e. two redundant ones
        for i in range(3):
            shutil.copy(source, os.path.join(asset_manager.assets_folder, f"image{i}.png"))

        groups, files, bytes_to_save = asset_manager.get_duplicate_stats()

        assert groups == 1  # One group
        assert files == 2  # Two extra copies to remove
        assert bytes_to_save > 0

    def test_deduplicate_assets_removes_files(self, asset_manager, tmp_path):
        """Deduplication deletes the redundant copies from disk"""
        source = self._save_source_image(tmp_path, "yellow")

        for i in range(3):
            shutil.copy(source, os.path.join(asset_manager.assets_folder, f"image{i}.png"))
            asset_manager.reference_counts[f"assets/image{i}.png"] = 1

        # Sanity check: all three copies are present beforehand
        assert len(os.listdir(asset_manager.assets_folder)) == 3

        files_removed, bytes_saved = asset_manager.deduplicate_assets()

        assert files_removed == 2
        assert bytes_saved > 0

        # Exactly one copy survives
        assert len(os.listdir(asset_manager.assets_folder)) == 1

    def test_deduplicate_assets_updates_callback(self, asset_manager, tmp_path):
        """The callback receives (removed path, kept path) for each duplicate"""
        source = self._save_source_image(tmp_path, "cyan")

        for name in ("a_first.png", "b_second.png"):
            shutil.copy(source, os.path.join(asset_manager.assets_folder, name))

        # Record every invocation of the callback
        seen = []

        def track_callback(old_path, new_path):
            seen.append((old_path, new_path))

        asset_manager.deduplicate_assets(update_references_callback=track_callback)

        # One duplicate, hence one call
        assert len(seen) == 1
        # b_second.png should be remapped to a_first.png (alphabetical order)
        assert seen[0] == ("assets/b_second.png", "assets/a_first.png")

    def test_deduplicate_assets_transfers_reference_counts(self, asset_manager, tmp_path):
        """Reference counts of removed duplicates are added to the kept file"""
        source = self._save_source_image(tmp_path, "magenta")

        for name in ("a_first.png", "b_second.png"):
            shutil.copy(source, os.path.join(asset_manager.assets_folder, name))

        asset_manager.reference_counts["assets/a_first.png"] = 2
        asset_manager.reference_counts["assets/b_second.png"] = 3

        asset_manager.deduplicate_assets()

        # 2 + 3 references now all point at the kept file
        assert asset_manager.reference_counts.get("assets/a_first.png") == 5
        assert "assets/b_second.png" not in asset_manager.reference_counts

    def test_serialize_includes_hashes(self, asset_manager, create_test_image):
        """Serialized state carries the hash table"""
        create_test_image(os.path.join(asset_manager.assets_folder, "image1.png"), color="red")
        asset_manager.compute_all_hashes()

        payload = asset_manager.serialize()

        assert "asset_hashes" in payload
        assert "assets/image1.png" in payload["asset_hashes"]

    def test_deserialize_restores_hashes(self, asset_manager):
        """Deserializing brings the hash table back"""
        asset_manager.deserialize(
            {
                "reference_counts": {"assets/test.png": 1},
                "asset_hashes": {"assets/test.png": "abc123hash"},
            }
        )

        assert asset_manager.asset_hashes.get("assets/test.png") == "abc123hash"

    def test_compute_asset_hash_single_file(self, asset_manager, create_test_image):
        """Hashing one asset returns the digest and caches it"""
        create_test_image(os.path.join(asset_manager.assets_folder, "single.png"), color="orange")

        digest = asset_manager.compute_asset_hash("assets/single.png")

        assert digest is not None
        assert "assets/single.png" in asset_manager.asset_hashes
        assert asset_manager.asset_hashes["assets/single.png"] == digest
class TestAssetManagerIntegration:
    """End-to-end check combining asset import with deduplication"""

    @pytest.fixture
    def asset_manager(self, tmp_path):
        """Provide an AssetManager rooted in a fresh temporary project folder"""
        project_folder = str(tmp_path / "test_project")
        os.makedirs(project_folder)
        manager = AssetManager(project_folder)
        return manager

    def test_import_then_deduplicate(self, asset_manager, tmp_path):
        """Importing one image twice yields two files that deduplication collapses to one"""
        origin = tmp_path / "source.png"
        Image.new("RGB", (80, 80), color="navy").save(str(origin))

        # Same source imported twice
        first = asset_manager.import_asset(str(origin))
        second = asset_manager.import_asset(str(origin))

        assert first != second  # Should have different names due to collision handling

        # Both imports landed on disk
        for imported in (first, second):
            assert os.path.exists(asset_manager.get_absolute_path(imported))

        groups = asset_manager.find_duplicates()
        assert len(groups) == 1

        files_removed, _ = asset_manager.deduplicate_assets()
        assert files_removed == 1

        # A single file is left in the assets folder
        remaining = os.listdir(asset_manager.assets_folder)
        assert len(remaining) == 1
class TestAssetManagerUnused:
    """Covers detection, statistics and removal of unreferenced assets"""

    @pytest.fixture
    def asset_manager(self, tmp_path):
        """Provide an AssetManager rooted in a fresh temporary project folder"""
        project_folder = str(tmp_path / "test_project")
        os.makedirs(project_folder)
        manager = AssetManager(project_folder)
        return manager

    @pytest.fixture
    def create_test_image(self):
        """Provide a factory that writes a solid-colour image to a given path"""

        def _create(path, color="red", size=(100, 100)):
            Image.new("RGB", size, color=color).save(path)
            return path

        return _create

    def test_find_unused_assets_empty_folder(self, asset_manager):
        """Nothing is reported for an empty assets folder"""
        assert len(asset_manager.find_unused_assets()) == 0

    def test_find_unused_assets_all_referenced(self, asset_manager, create_test_image):
        """Files with a positive reference count are never reported"""
        create_test_image(os.path.join(asset_manager.assets_folder, "image1.png"), color="red")
        create_test_image(os.path.join(asset_manager.assets_folder, "image2.png"), color="blue")

        # Both files are in use
        asset_manager.reference_counts["assets/image1.png"] = 1
        asset_manager.reference_counts["assets/image2.png"] = 2

        orphans = asset_manager.find_unused_assets()

        assert len(orphans) == 0

    def test_find_unused_assets_some_unreferenced(self, asset_manager, create_test_image):
        """Only the file without a reference entry is reported"""
        create_test_image(os.path.join(asset_manager.assets_folder, "used.png"), color="red")
        create_test_image(os.path.join(asset_manager.assets_folder, "unused.png"), color="blue")

        # A single file is in use
        asset_manager.reference_counts["assets/used.png"] = 1

        orphans = asset_manager.find_unused_assets()

        assert len(orphans) == 1
        assert "assets/unused.png" in orphans

    def test_find_unused_assets_zero_reference_count(self, asset_manager, create_test_image):
        """An explicit count of zero is treated the same as no entry"""
        create_test_image(os.path.join(asset_manager.assets_folder, "orphan.png"), color="red")

        asset_manager.reference_counts["assets/orphan.png"] = 0

        orphans = asset_manager.find_unused_assets()

        assert len(orphans) == 1
        assert "assets/orphan.png" in orphans

    def test_get_unused_stats_no_unused(self, asset_manager, create_test_image):
        """Statistics are zero when every asset is referenced"""
        create_test_image(os.path.join(asset_manager.assets_folder, "image.png"), color="red")
        asset_manager.reference_counts["assets/image.png"] = 1

        stats = asset_manager.get_unused_stats()
        count, total_bytes = stats

        assert count == 0
        assert total_bytes == 0

    def test_get_unused_stats_with_unused(self, asset_manager, create_test_image):
        """Unreferenced files are counted and their sizes summed"""
        create_test_image(os.path.join(asset_manager.assets_folder, "unused1.png"), color="red")
        create_test_image(os.path.join(asset_manager.assets_folder, "unused2.png"), color="blue")

        # Neither file has a reference entry
        count, total_bytes = asset_manager.get_unused_stats()

        assert count == 2
        assert total_bytes > 0

    def test_remove_unused_assets_removes_files(self, asset_manager, create_test_image):
        """Removal deletes the unreferenced file and leaves the referenced one"""
        kept = os.path.join(asset_manager.assets_folder, "used.png")
        doomed = os.path.join(asset_manager.assets_folder, "unused.png")
        create_test_image(kept, color="red")
        create_test_image(doomed, color="blue")

        asset_manager.reference_counts["assets/used.png"] = 1

        files_removed, bytes_freed = asset_manager.remove_unused_assets()

        assert files_removed == 1
        assert bytes_freed > 0

        # Disk state after removal
        assert os.path.exists(kept)
        assert not os.path.exists(doomed)

    def test_remove_unused_assets_no_unused(self, asset_manager, create_test_image):
        """Removal is a no-op when every asset is referenced"""
        kept = os.path.join(asset_manager.assets_folder, "used.png")
        create_test_image(kept, color="red")
        asset_manager.reference_counts["assets/used.png"] = 1

        files_removed, bytes_freed = asset_manager.remove_unused_assets()

        assert files_removed == 0
        assert bytes_freed == 0
        assert os.path.exists(kept)

    def test_remove_unused_assets_cleans_tracking(self, asset_manager, create_test_image):
        """Removal also drops the reference-count and hash entries of the deleted file"""
        create_test_image(os.path.join(asset_manager.assets_folder, "orphan.png"), color="red")

        # Zero references plus a cached hash
        asset_manager.reference_counts["assets/orphan.png"] = 0
        asset_manager.asset_hashes["assets/orphan.png"] = "somehash"

        asset_manager.remove_unused_assets()

        assert "assets/orphan.png" not in asset_manager.reference_counts
        assert "assets/orphan.png" not in asset_manager.asset_hashes

    def test_remove_unused_preserves_referenced(self, asset_manager, create_test_image):
        """Among several files, only the unreferenced ones are deleted"""
        for i in range(5):
            create_test_image(os.path.join(asset_manager.assets_folder, f"image{i}.png"), color="red")

        # Files 0, 2 and 4 are in use
        asset_manager.reference_counts["assets/image0.png"] = 1
        asset_manager.reference_counts["assets/image2.png"] = 3
        asset_manager.reference_counts["assets/image4.png"] = 1

        files_removed, _ = asset_manager.remove_unused_assets()

        assert files_removed == 2  # image1 and image3

        # Referenced files survive
        for survivor in ("image0.png", "image2.png", "image4.png"):
            assert os.path.exists(os.path.join(asset_manager.assets_folder, survivor))

        # Unreferenced files are gone
        for removed in ("image1.png", "image3.png"):
            assert not os.path.exists(os.path.join(asset_manager.assets_folder, removed))