import hashlib
import os
import shutil
from typing import Dict, List, Optional, Tuple
from pathlib import Path


def compute_file_md5(file_path: str, chunk_size: int = 8192) -> Optional[str]:
    """
    Compute MD5 hash of a file.

    The file is read in fixed-size chunks so it is never loaded into memory
    in one piece.

    Args:
        file_path: Path to the file
        chunk_size: Number of bytes read per chunk (must be positive)

    Returns:
        MD5 hash as hex string, or None if the path is not an existing
        regular file or the file cannot be read

    Raises:
        ValueError: If chunk_size is not positive
    """
    if chunk_size <= 0:
        # read(0) returns b"" at once, which would silently produce the
        # hash of an empty file instead of the real content hash.
        raise ValueError("chunk_size must be a positive integer")

    # isfile() also rejects directories, which exists() would accept only
    # for open() to fail on them a few lines later.
    if not os.path.isfile(file_path):
        return None

    hash_md5 = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                hash_md5.update(chunk)
    except OSError as e:
        # Only I/O failures are expected here; any other exception is a
        # programming error and is allowed to surface.
        print(f"AssetManager: Error computing MD5 for {file_path}: {e}")
        return None
    return hash_md5.hexdigest()


# --- AssetManager methods ---


def serialize(self) -> Dict:
    """
    Serialize asset manager state.

    Returns:
        Dictionary with "reference_counts" and "asset_hashes" entries. Both
        are shallow copies, so editing the result does not alter the manager.
    """
    return {
        "reference_counts": dict(self.reference_counts),
        "asset_hashes": dict(self.asset_hashes),
    }


def deserialize(self, data: Dict):
    """
    Deserialize asset manager state.

    Args:
        data: Dictionary as produced by serialize(). Missing or null entries
            are treated as empty, so older project files without
            "asset_hashes" still load.
    """
    # `or {}` covers an explicit null in the stored data; dict() copies so the
    # manager never shares a mutable dict with the caller's data.
    self.reference_counts = dict(data.get("reference_counts") or {})
    self.asset_hashes = dict(data.get("asset_hashes") or {})
    print(f"AssetManager: Loaded {len(self.reference_counts)} asset references")


def compute_asset_hash(self, asset_path: str) -> Optional[str]:
    """
    Compute and cache the MD5 hash for an asset.

    Args:
        asset_path: Relative path to the asset

    Returns:
        MD5 hash as hex string, or None if computation fails
    """
    full_path = self.get_absolute_path(asset_path)
    md5_hash = compute_file_md5(full_path)
    if md5_hash is None:
        # The file is missing or unreadable: a previously cached hash no
        # longer describes anything on disk, so it must not linger.
        self.asset_hashes.pop(asset_path, None)
    else:
        self.asset_hashes[asset_path] = md5_hash
    return md5_hash
def compute_all_hashes(self) -> Dict[str, str]:
    """
    Compute MD5 hashes for all assets in the assets folder.

    The cached hashes are discarded and rebuilt from the files currently on
    disk. Files whose hash cannot be computed are left out.

    Returns:
        Dictionary mapping relative paths to MD5 hashes (this is the
        manager's own cache, not a copy)
    """
    self.asset_hashes.clear()

    if not os.path.exists(self.assets_folder):
        return self.asset_hashes

    for root, _dirs, files in os.walk(self.assets_folder):
        for filename in files:
            file_path = os.path.join(root, filename)
            md5_hash = compute_file_md5(file_path)
            if md5_hash is not None:
                # Keys are relative to the project folder, matching the keys
                # used for reference_counts lookups elsewhere in the class.
                relative_path = os.path.relpath(file_path, self.project_folder)
                self.asset_hashes[relative_path] = md5_hash

    print(f"AssetManager: Computed hashes for {len(self.asset_hashes)} assets")
    return self.asset_hashes


def find_duplicates(self) -> Dict[str, List[str]]:
    """
    Find duplicate assets based on MD5 hash.

    Hashes are computed first if none are cached. Cached entries whose file
    is no longer on disk are ignored (the cache itself is not modified), so a
    stale cache cannot report a vanished file as a duplicate.

    Returns:
        Dictionary mapping MD5 hash to list of asset paths with that hash.
        Only includes hashes with more than one file. Each list is sorted,
        so its first entry is the alphabetically first path.
    """
    # Compute hashes if not already done
    if not self.asset_hashes:
        self.compute_all_hashes()

    # Group assets by hash, skipping cache entries with no file behind them
    hash_to_paths: Dict[str, List[str]] = {}
    for path, md5_hash in self.asset_hashes.items():
        if not os.path.isfile(self.get_absolute_path(path)):
            continue
        hash_to_paths.setdefault(md5_hash, []).append(path)

    # Keep only hashes shared by more than one file; sorting makes the
    # order of each group deterministic.
    duplicates = {h: sorted(paths) for h, paths in hash_to_paths.items() if len(paths) > 1}

    if duplicates:
        total_dups = sum(len(paths) - 1 for paths in duplicates.values())
        print(f"AssetManager: Found {total_dups} duplicate files in {len(duplicates)} groups")

    return duplicates
def deduplicate_assets(self, update_references_callback=None) -> Tuple[int, int]:
    """
    Remove duplicate assets, keeping one canonical copy and updating references.

    Within each group reported by find_duplicates(), the alphabetically first
    path whose file exists is kept. Every other file is deleted only after a
    byte-for-byte comparison with the kept copy succeeds; cached hashes alone
    are never trusted, because they may predate changes on disk.

    Args:
        update_references_callback: Optional callback function that takes
            (old_path, new_path) to update external references (e.g., ImageData elements).
            It is called before the duplicate file is deleted, so references
            never point at a file that has already been removed.

    Returns:
        Tuple of (files_removed, bytes_saved)
    """
    # Function-scope import: filecmp is needed only by this method.
    import filecmp

    duplicates = self.find_duplicates()
    if not duplicates:
        print("AssetManager: No duplicates found")
        return (0, 0)

    files_removed = 0
    bytes_saved = 0

    for paths in duplicates.values():
        # Only files that are still on disk can be kept or removed. A cached
        # hash without a file is stale and is dropped from the cache.
        candidates = []
        for path in sorted(paths):
            if os.path.isfile(self.get_absolute_path(path)):
                candidates.append(path)
            else:
                self.asset_hashes.pop(path, None)
        if len(candidates) < 2:
            continue

        # Sorted order gives a consistent canonical path (first alphabetically)
        canonical_path = candidates[0]
        full_canonical_path = self.get_absolute_path(canonical_path)

        for dup_path in candidates[1:]:
            full_dup_path = self.get_absolute_path(dup_path)

            # Confirm the contents really match and note the size before
            # anything is changed.
            try:
                identical = filecmp.cmp(full_canonical_path, full_dup_path, shallow=False)
                file_size = os.path.getsize(full_dup_path)
            except OSError as e:
                print(f"AssetManager: Error comparing duplicate {dup_path}: {e}")
                continue

            if not identical:
                # The cached hash no longer matches the file; forget it so the
                # file is not reported as a duplicate of canonical_path again.
                print(f"AssetManager: Skipping {dup_path}: content differs from {canonical_path}")
                self.asset_hashes.pop(dup_path, None)
                continue

            # Update references if callback provided
            if update_references_callback:
                update_references_callback(dup_path, canonical_path)

            # Transfer reference count to canonical path
            if dup_path in self.reference_counts:
                self.reference_counts[canonical_path] = (
                    self.reference_counts.get(canonical_path, 0)
                    + self.reference_counts.pop(dup_path)
                )

            # Delete the duplicate file. On failure the file is still there,
            # so its hash entry is kept and a later run can retry.
            try:
                os.remove(full_dup_path)
            except OSError as e:
                print(f"AssetManager: Error removing duplicate {dup_path}: {e}")
                continue

            files_removed += 1
            bytes_saved += file_size
            self.asset_hashes.pop(dup_path, None)
            print(f"AssetManager: Removed duplicate {dup_path} (kept {canonical_path})")

    print(f"AssetManager: Deduplication complete - removed {files_removed} files, saved {bytes_saved} bytes")
    return (files_removed, bytes_saved)
def get_duplicate_stats(self) -> Tuple[int, int, int]:
    """
    Get statistics about duplicate assets without deleting anything.

    Returns:
        Tuple of (duplicate_groups, total_duplicate_files, estimated_bytes_to_save)
    """
    duplicates = self.find_duplicates()
    if not duplicates:
        return (0, 0, 0)

    total_duplicate_files = 0
    bytes_to_save = 0
    for paths in duplicates.values():
        # deduplicate_assets keeps the alphabetically first path, so the
        # estimate skips that same path rather than whichever came first.
        for dup_path in sorted(paths)[1:]:
            total_duplicate_files += 1
            try:
                bytes_to_save += os.path.getsize(self.get_absolute_path(dup_path))
            except OSError:
                # Best-effort estimate: a file that cannot be sized adds 0.
                pass

    return (len(duplicates), total_duplicate_files, bytes_to_save)


def find_unused_assets(self) -> List[str]:
    """
    Find assets that exist in the assets folder but have no references.

    A file counts as unused when its path relative to the project folder is
    absent from reference_counts or has a count of zero or less.

    Returns:
        Sorted list of relative paths to unused assets
    """
    unused: List[str] = []

    if not os.path.exists(self.assets_folder):
        return unused

    # Get all files in assets folder
    for root, _dirs, files in os.walk(self.assets_folder):
        for filename in files:
            relative_path = os.path.relpath(os.path.join(root, filename), self.project_folder)
            # NOTE(review): this lookup assumes reference_counts keys use the
            # same path separator that os.path.relpath produces - confirm
            # against the code that creates those keys.
            if self.reference_counts.get(relative_path, 0) <= 0:
                unused.append(relative_path)

    # os.walk order depends on the file system; sort for a stable result.
    unused.sort()

    if unused:
        print(f"AssetManager: Found {len(unused)} unused assets")

    return unused
def get_unused_stats(self) -> Tuple[int, int]:
    """
    Get statistics about unused assets without modifying anything.

    Returns:
        Tuple of (unused_file_count, total_bytes)
    """
    unused = self.find_unused_assets()

    total_bytes = 0
    for asset_path in unused:
        try:
            total_bytes += os.path.getsize(self.get_absolute_path(asset_path))
        except OSError:
            # Best-effort total: a file that cannot be sized adds 0.
            pass

    return (len(unused), total_bytes)


def remove_unused_assets(self) -> Tuple[int, int]:
    """
    Remove all unused assets from the assets folder.

    For every path reported by find_unused_assets() the file is deleted and
    its entries in reference_counts and asset_hashes are dropped. If the file
    is already gone only the entries are dropped. If deletion fails for any
    other reason the entries are kept, since the file is still on disk.

    Returns:
        Tuple of (files_removed, bytes_freed)
    """
    unused = self.find_unused_assets()
    if not unused:
        print("AssetManager: No unused assets to remove")
        return (0, 0)

    files_removed = 0
    bytes_freed = 0

    for asset_path in unused:
        full_path = self.get_absolute_path(asset_path)

        try:
            file_size = os.path.getsize(full_path)
        except OSError:
            file_size = 0

        # Attempt the removal directly instead of checking exists() first,
        # which would leave a window for the file to vanish in between.
        try:
            os.remove(full_path)
        except FileNotFoundError:
            # Nothing was freed, but the tracking entries are still stale.
            pass
        except OSError as e:
            print(f"AssetManager: Error removing unused asset {asset_path}: {e}")
            continue
        else:
            files_removed += 1
            bytes_freed += file_size
            print(f"AssetManager: Removed unused asset {asset_path}")

        # Clean up tracking
        self.reference_counts.pop(asset_path, None)
        self.asset_hashes.pop(asset_path, None)

    print(f"AssetManager: Removed {files_removed} unused assets, freed {bytes_freed} bytes")
    return (files_removed, bytes_freed)
# --- FileOperationsMixin method ---
# NOTE(review): QMessageBox, QDialog, QVBoxLayout, QHBoxLayout, QLabel and
# QPushButton are used below without a local import; presumably they are
# imported at module level, which is not visible here - confirm.
@ribbon_action(label="Clean Assets", tooltip="Find and remove duplicate or unused image files", tab="Home", group="File")
def clean_assets(self):
    """
    Find and remove duplicate and unused asset files to save space.

    Scans the project's assets, reports how many duplicate and unused files
    were found, lets the user pick which of the two cleanups to run, runs
    them, and reports how many files were removed. When at least one file
    was removed the project is marked dirty and the view is refreshed.
    """
    from PyQt6.QtWidgets import QProgressDialog, QCheckBox
    from PyQt6.QtCore import Qt

    # Helper to format bytes as a human-readable size (1024-based units)
    def format_bytes(num_bytes):
        if num_bytes >= 1024 * 1024:
            return f"{num_bytes / (1024 * 1024):.1f} MB"
        elif num_bytes >= 1024:
            return f"{num_bytes / 1024:.1f} KB"
        else:
            return f"{num_bytes} bytes"

    # Scan for issues with progress dialog
    progress = QProgressDialog("Scanning assets...", "Cancel", 0, 100, self)
    progress.setWindowTitle("Clean Assets")
    progress.setWindowModality(Qt.WindowModality.WindowModal)
    progress.setValue(10)

    # Compute hashes for duplicate detection. This rebuilds the hash cache
    # from disk, so the duplicate stats below reflect the current files.
    self.project.asset_manager.compute_all_hashes()
    progress.setValue(40)

    # Cancellation is only checked here, after hashing has finished.
    if progress.wasCanceled():
        return

    # Get duplicate stats
    dup_groups, dup_files, dup_bytes = self.project.asset_manager.get_duplicate_stats()
    progress.setValue(60)

    # Get unused stats
    unused_files, unused_bytes = self.project.asset_manager.get_unused_stats()
    progress.setValue(80)

    progress.close()

    # Check if there's anything to clean
    if dup_files == 0 and unused_files == 0:
        QMessageBox.information(
            self,
            "Assets Clean",
            "No duplicate or unused files were found in your project assets."
        )
        return

    # Build dialog with checkboxes for each cleanup type
    dialog = QDialog(self)
    dialog.setWindowTitle("Clean Assets")
    dialog.setMinimumWidth(450)

    layout = QVBoxLayout()

    # Info label
    info_label = QLabel("Select which cleanup operations to perform:")
    layout.addWidget(info_label)

    # Duplicates checkbox (only created when duplicates were found; stays
    # None otherwise, which the checks further down rely on)
    dup_checkbox = None
    if dup_files > 0:
        dup_checkbox = QCheckBox(
            f"Remove {dup_files} duplicate file(s) in {dup_groups} group(s) "
            f"(saves {format_bytes(dup_bytes)})"
        )
        dup_checkbox.setChecked(True)
        dup_checkbox.setToolTip(
            "Duplicate files have identical content but different names.\n"
            "Image references will be automatically updated to use the kept file."
        )
        layout.addWidget(dup_checkbox)

    # Unused checkbox (only created when unused files were found)
    unused_checkbox = None
    if unused_files > 0:
        unused_checkbox = QCheckBox(
            f"Remove {unused_files} unused file(s) (saves {format_bytes(unused_bytes)})"
        )
        unused_checkbox.setChecked(True)
        unused_checkbox.setToolTip(
            "Unused files exist in the assets folder but are not referenced\n"
            "by any image element in your project."
        )
        layout.addWidget(unused_checkbox)

    # Summary
    # NOTE(review): the two figures are simply added. A file that is both a
    # duplicate and unreferenced appears in both, so this total can be higher
    # than what the cleanup actually removes - confirm whether that matters.
    total_files = dup_files + unused_files
    total_bytes = dup_bytes + unused_bytes
    summary_label = QLabel(f"\nTotal potential savings: {format_bytes(total_bytes)} from {total_files} file(s)")
    summary_label.setStyleSheet("font-weight: bold;")
    layout.addWidget(summary_label)

    # Buttons
    button_layout = QHBoxLayout()
    cancel_btn = QPushButton("Cancel")
    cancel_btn.clicked.connect(dialog.reject)
    clean_btn = QPushButton("Clean Selected")
    clean_btn.clicked.connect(dialog.accept)
    clean_btn.setDefault(True)

    button_layout.addStretch()
    button_layout.addWidget(cancel_btn)
    button_layout.addWidget(clean_btn)
    layout.addLayout(button_layout)

    dialog.setLayout(layout)

    # Anything other than "Clean Selected" leaves the project untouched.
    if dialog.exec() != QDialog.DialogCode.Accepted:
        return

    # Perform selected cleanups
    total_removed = 0
    total_saved = 0

    # Remove duplicates if selected. Duplicates are handled before unused
    # files, so reference counts are merged onto the kept copies before the
    # unused scan runs.
    if dup_checkbox and dup_checkbox.isChecked():
        def update_image_references(old_path: str, new_path: str):
            """Update all ImageData elements that reference the old path"""
            from pyPhotoAlbum.models import ImageData

            for page in self.project.pages:
                for element in page.layout.elements:
                    if isinstance(element, ImageData) and element.image_path == old_path:
                        element.image_path = new_path
                        element.mark_modified()
                        print(f"Updated image reference: {old_path} -> {new_path}")

        removed, saved = self.project.asset_manager.deduplicate_assets(
            update_references_callback=update_image_references
        )
        total_removed += removed
        total_saved += saved

    # Remove unused if selected
    if unused_checkbox and unused_checkbox.isChecked():
        removed, saved = self.project.asset_manager.remove_unused_assets()
        total_removed += removed
        total_saved += saved

    if total_removed > 0:
        # Mark project as dirty since we modified it
        self.project.mark_dirty()

        # Update view
        self.update_view()

        # Show result
        QMessageBox.information(
            self,
            "Cleanup Complete",
            f"Removed {total_removed} file(s).\n\n"
            f"Saved {format_bytes(total_saved)} of disk space.\n\n"
            f"Remember to save your project to preserve these changes."
        )

        self.show_status(f"Asset cleanup complete: removed {total_removed} files, saved {format_bytes(total_saved)}")
    else:
        self.show_status("No files were removed")
f"Removed {total_removed} file(s).\n\n" + f"Saved {format_bytes(total_saved)} of disk space.\n\n" + f"Remember to save your project to preserve these changes." + ) + + self.show_status(f"Asset cleanup complete: removed {total_removed} files, saved {format_bytes(total_saved)}") + else: + self.show_status("No files were removed") + @ribbon_action(label="About", tooltip="About pyPhotoAlbum and data format version", tab="Home", group="File") def show_about(self): """Show about dialog with version information""" diff --git a/pyPhotoAlbum/thumbnail_browser.py b/pyPhotoAlbum/thumbnail_browser.py index ede5993..3420aaf 100644 --- a/pyPhotoAlbum/thumbnail_browser.py +++ b/pyPhotoAlbum/thumbnail_browser.py @@ -835,6 +835,8 @@ class ThumbnailBrowserDock(QDockWidget): def _apply_sort(self): """Apply current sort mode to image files.""" + if not hasattr(self.gl_widget, 'image_files') or not self.gl_widget.image_files: + return if self.current_sort == "name": # Sort by filename only (not full path) self.gl_widget.image_files.sort(key=lambda p: p.name.lower()) diff --git a/tests/test_asset_manager.py b/tests/test_asset_manager.py new file mode 100644 index 0000000..a60905e --- /dev/null +++ b/tests/test_asset_manager.py @@ -0,0 +1,469 @@ +""" +Tests for AssetManager functionality including deduplication and unused asset detection +""" + +import os +import pytest +import tempfile +import shutil +from PIL import Image + +from pyPhotoAlbum.asset_manager import AssetManager, compute_file_md5 + + +class TestComputeFileMd5: + """Tests for the compute_file_md5 function""" + + def test_compute_md5_existing_file(self, tmp_path): + """Test MD5 computation for an existing file""" + # Create a test file + test_file = tmp_path / "test.txt" + test_file.write_text("Hello, World!") + + md5_hash = compute_file_md5(str(test_file)) + assert md5_hash is not None + # Known MD5 for "Hello, World!" 
+ assert md5_hash == "65a8e27d8879283831b664bd8b7f0ad4" + + def test_compute_md5_nonexistent_file(self): + """Test MD5 computation returns None for non-existent file""" + md5_hash = compute_file_md5("/nonexistent/path/file.txt") + assert md5_hash is None + + def test_compute_md5_same_content_same_hash(self, tmp_path): + """Test that identical content produces identical hashes""" + content = b"Test content for hashing" + + file1 = tmp_path / "file1.bin" + file2 = tmp_path / "file2.bin" + file1.write_bytes(content) + file2.write_bytes(content) + + hash1 = compute_file_md5(str(file1)) + hash2 = compute_file_md5(str(file2)) + + assert hash1 == hash2 + + def test_compute_md5_different_content_different_hash(self, tmp_path): + """Test that different content produces different hashes""" + file1 = tmp_path / "file1.txt" + file2 = tmp_path / "file2.txt" + file1.write_text("Content A") + file2.write_text("Content B") + + hash1 = compute_file_md5(str(file1)) + hash2 = compute_file_md5(str(file2)) + + assert hash1 != hash2 + + +class TestAssetManagerDeduplication: + """Tests for AssetManager deduplication methods""" + + @pytest.fixture + def asset_manager(self, tmp_path): + """Create an AssetManager with a temporary project folder""" + project_folder = str(tmp_path / "test_project") + os.makedirs(project_folder) + return AssetManager(project_folder) + + @pytest.fixture + def create_test_image(self): + """Factory fixture for creating test images""" + def _create(path, color="red", size=(100, 100)): + img = Image.new("RGB", size, color=color) + img.save(path) + return path + return _create + + def test_compute_all_hashes_empty_folder(self, asset_manager): + """Test hash computation on empty assets folder""" + hashes = asset_manager.compute_all_hashes() + assert len(hashes) == 0 + + def test_compute_all_hashes_with_files(self, asset_manager, create_test_image): + """Test hash computation with files in assets folder""" + # Create some test images + img1 = 
os.path.join(asset_manager.assets_folder, "image1.png") + img2 = os.path.join(asset_manager.assets_folder, "image2.png") + create_test_image(img1, color="red") + create_test_image(img2, color="blue") + + hashes = asset_manager.compute_all_hashes() + + assert len(hashes) == 2 + assert "assets/image1.png" in hashes + assert "assets/image2.png" in hashes + + def test_find_duplicates_no_duplicates(self, asset_manager, create_test_image): + """Test finding duplicates when there are none""" + img1 = os.path.join(asset_manager.assets_folder, "image1.png") + img2 = os.path.join(asset_manager.assets_folder, "image2.png") + create_test_image(img1, color="red") + create_test_image(img2, color="blue") + + duplicates = asset_manager.find_duplicates() + assert len(duplicates) == 0 + + def test_find_duplicates_with_duplicates(self, asset_manager, tmp_path): + """Test finding actual duplicate files""" + # Create a source image + source_img = tmp_path / "source.png" + img = Image.new("RGB", (50, 50), color="green") + img.save(str(source_img)) + + # Copy the same image twice to assets folder + dup1 = os.path.join(asset_manager.assets_folder, "dup1.png") + dup2 = os.path.join(asset_manager.assets_folder, "dup2.png") + shutil.copy(str(source_img), dup1) + shutil.copy(str(source_img), dup2) + + duplicates = asset_manager.find_duplicates() + + assert len(duplicates) == 1 # One group of duplicates + # The group should contain both files + for paths in duplicates.values(): + assert len(paths) == 2 + assert "assets/dup1.png" in paths + assert "assets/dup2.png" in paths + + def test_get_duplicate_stats_no_duplicates(self, asset_manager, create_test_image): + """Test duplicate stats when there are no duplicates""" + img1 = os.path.join(asset_manager.assets_folder, "image1.png") + create_test_image(img1, color="red") + + groups, files, bytes_to_save = asset_manager.get_duplicate_stats() + + assert groups == 0 + assert files == 0 + assert bytes_to_save == 0 + + def 
test_get_duplicate_stats_with_duplicates(self, asset_manager, tmp_path): + """Test duplicate stats with actual duplicates""" + # Create a source image + source_img = tmp_path / "source.png" + img = Image.new("RGB", (100, 100), color="purple") + img.save(str(source_img)) + + # Copy to assets folder 3 times (creates 2 duplicates) + for i in range(3): + dest = os.path.join(asset_manager.assets_folder, f"image{i}.png") + shutil.copy(str(source_img), dest) + + groups, files, bytes_to_save = asset_manager.get_duplicate_stats() + + assert groups == 1 # One group + assert files == 2 # Two extra copies to remove + assert bytes_to_save > 0 + + def test_deduplicate_assets_removes_files(self, asset_manager, tmp_path): + """Test that deduplication actually removes duplicate files""" + # Create a source image + source_img = tmp_path / "source.png" + img = Image.new("RGB", (50, 50), color="yellow") + img.save(str(source_img)) + + # Copy to assets folder 3 times + for i in range(3): + dest = os.path.join(asset_manager.assets_folder, f"image{i}.png") + shutil.copy(str(source_img), dest) + asset_manager.reference_counts[f"assets/image{i}.png"] = 1 + + # Count files before + files_before = len(os.listdir(asset_manager.assets_folder)) + assert files_before == 3 + + # Run deduplication + files_removed, bytes_saved = asset_manager.deduplicate_assets() + + # Check results + assert files_removed == 2 + assert bytes_saved > 0 + + # Count files after + files_after = len(os.listdir(asset_manager.assets_folder)) + assert files_after == 1 + + def test_deduplicate_assets_updates_callback(self, asset_manager, tmp_path): + """Test that deduplication calls the update callback correctly""" + # Create a source image + source_img = tmp_path / "source.png" + img = Image.new("RGB", (50, 50), color="cyan") + img.save(str(source_img)) + + # Copy to assets folder + dest1 = os.path.join(asset_manager.assets_folder, "a_first.png") + dest2 = os.path.join(asset_manager.assets_folder, "b_second.png") + 
shutil.copy(str(source_img), dest1) + shutil.copy(str(source_img), dest2) + + # Track callback invocations + callback_calls = [] + + def track_callback(old_path, new_path): + callback_calls.append((old_path, new_path)) + + # Run deduplication + asset_manager.deduplicate_assets(update_references_callback=track_callback) + + # Callback should have been called for the duplicate + assert len(callback_calls) == 1 + # b_second.png should be remapped to a_first.png (alphabetical order) + assert callback_calls[0] == ("assets/b_second.png", "assets/a_first.png") + + def test_deduplicate_assets_transfers_reference_counts(self, asset_manager, tmp_path): + """Test that reference counts are properly transferred during deduplication""" + # Create a source image + source_img = tmp_path / "source.png" + img = Image.new("RGB", (50, 50), color="magenta") + img.save(str(source_img)) + + # Copy to assets folder + dest1 = os.path.join(asset_manager.assets_folder, "a_first.png") + dest2 = os.path.join(asset_manager.assets_folder, "b_second.png") + shutil.copy(str(source_img), dest1) + shutil.copy(str(source_img), dest2) + + # Set reference counts + asset_manager.reference_counts["assets/a_first.png"] = 2 + asset_manager.reference_counts["assets/b_second.png"] = 3 + + # Run deduplication + asset_manager.deduplicate_assets() + + # Check reference counts were merged + assert asset_manager.reference_counts.get("assets/a_first.png") == 5 + assert "assets/b_second.png" not in asset_manager.reference_counts + + def test_serialize_includes_hashes(self, asset_manager, create_test_image): + """Test that serialization includes asset hashes""" + img1 = os.path.join(asset_manager.assets_folder, "image1.png") + create_test_image(img1, color="red") + asset_manager.compute_all_hashes() + + data = asset_manager.serialize() + + assert "asset_hashes" in data + assert "assets/image1.png" in data["asset_hashes"] + + def test_deserialize_restores_hashes(self, asset_manager): + """Test that deserialization 
restores asset hashes""" + test_data = { + "reference_counts": {"assets/test.png": 1}, + "asset_hashes": {"assets/test.png": "abc123hash"} + } + + asset_manager.deserialize(test_data) + + assert asset_manager.asset_hashes.get("assets/test.png") == "abc123hash" + + def test_compute_asset_hash_single_file(self, asset_manager, create_test_image): + """Test computing hash for a single asset""" + img_path = os.path.join(asset_manager.assets_folder, "single.png") + create_test_image(img_path, color="orange") + + hash_result = asset_manager.compute_asset_hash("assets/single.png") + + assert hash_result is not None + assert "assets/single.png" in asset_manager.asset_hashes + assert asset_manager.asset_hashes["assets/single.png"] == hash_result + + +class TestAssetManagerIntegration: + """Integration tests for AssetManager with import and deduplication""" + + @pytest.fixture + def asset_manager(self, tmp_path): + """Create an AssetManager with a temporary project folder""" + project_folder = str(tmp_path / "test_project") + os.makedirs(project_folder) + return AssetManager(project_folder) + + def test_import_then_deduplicate(self, asset_manager, tmp_path): + """Test importing duplicate images and then deduplicating""" + # Create a source image + source_img = tmp_path / "source.png" + img = Image.new("RGB", (80, 80), color="navy") + img.save(str(source_img)) + + # Import the same image twice + path1 = asset_manager.import_asset(str(source_img)) + path2 = asset_manager.import_asset(str(source_img)) + + assert path1 != path2 # Should have different names due to collision handling + + # Check both files exist + assert os.path.exists(asset_manager.get_absolute_path(path1)) + assert os.path.exists(asset_manager.get_absolute_path(path2)) + + # Find duplicates + duplicates = asset_manager.find_duplicates() + assert len(duplicates) == 1 + + # Deduplicate + files_removed, _ = asset_manager.deduplicate_assets() + assert files_removed == 1 + + # Only one file should remain + 
files_in_assets = os.listdir(asset_manager.assets_folder) + assert len(files_in_assets) == 1 + + +class TestAssetManagerUnused: + """Tests for AssetManager unused asset detection and removal""" + + @pytest.fixture + def asset_manager(self, tmp_path): + """Create an AssetManager with a temporary project folder""" + project_folder = str(tmp_path / "test_project") + os.makedirs(project_folder) + return AssetManager(project_folder) + + @pytest.fixture + def create_test_image(self): + """Factory fixture for creating test images""" + def _create(path, color="red", size=(100, 100)): + img = Image.new("RGB", size, color=color) + img.save(path) + return path + return _create + + def test_find_unused_assets_empty_folder(self, asset_manager): + """Test finding unused assets in empty folder""" + unused = asset_manager.find_unused_assets() + assert len(unused) == 0 + + def test_find_unused_assets_all_referenced(self, asset_manager, create_test_image): + """Test finding unused assets when all are referenced""" + img1 = os.path.join(asset_manager.assets_folder, "image1.png") + img2 = os.path.join(asset_manager.assets_folder, "image2.png") + create_test_image(img1, color="red") + create_test_image(img2, color="blue") + + # Add references for both + asset_manager.reference_counts["assets/image1.png"] = 1 + asset_manager.reference_counts["assets/image2.png"] = 2 + + unused = asset_manager.find_unused_assets() + assert len(unused) == 0 + + def test_find_unused_assets_some_unreferenced(self, asset_manager, create_test_image): + """Test finding unused assets when some have no references""" + img1 = os.path.join(asset_manager.assets_folder, "used.png") + img2 = os.path.join(asset_manager.assets_folder, "unused.png") + create_test_image(img1, color="red") + create_test_image(img2, color="blue") + + # Only reference one + asset_manager.reference_counts["assets/used.png"] = 1 + + unused = asset_manager.find_unused_assets() + assert len(unused) == 1 + assert "assets/unused.png" in unused + 
+ def test_find_unused_assets_zero_reference_count(self, asset_manager, create_test_image): + """Test that zero reference count is considered unused""" + img = os.path.join(asset_manager.assets_folder, "orphan.png") + create_test_image(img, color="red") + + # Set reference count to 0 + asset_manager.reference_counts["assets/orphan.png"] = 0 + + unused = asset_manager.find_unused_assets() + assert len(unused) == 1 + assert "assets/orphan.png" in unused + + def test_get_unused_stats_no_unused(self, asset_manager, create_test_image): + """Test unused stats when all assets are referenced""" + img = os.path.join(asset_manager.assets_folder, "image.png") + create_test_image(img, color="red") + asset_manager.reference_counts["assets/image.png"] = 1 + + count, total_bytes = asset_manager.get_unused_stats() + assert count == 0 + assert total_bytes == 0 + + def test_get_unused_stats_with_unused(self, asset_manager, create_test_image): + """Test unused stats with unreferenced files""" + img1 = os.path.join(asset_manager.assets_folder, "unused1.png") + img2 = os.path.join(asset_manager.assets_folder, "unused2.png") + create_test_image(img1, color="red") + create_test_image(img2, color="blue") + + # No references for either file + + count, total_bytes = asset_manager.get_unused_stats() + assert count == 2 + assert total_bytes > 0 + + def test_remove_unused_assets_removes_files(self, asset_manager, create_test_image): + """Test that unused assets are actually removed""" + used_path = os.path.join(asset_manager.assets_folder, "used.png") + unused_path = os.path.join(asset_manager.assets_folder, "unused.png") + create_test_image(used_path, color="red") + create_test_image(unused_path, color="blue") + + # Only reference the used file + asset_manager.reference_counts["assets/used.png"] = 1 + + # Remove unused + files_removed, bytes_freed = asset_manager.remove_unused_assets() + + assert files_removed == 1 + assert bytes_freed > 0 + + # Check files on disk + assert 
os.path.exists(used_path) + assert not os.path.exists(unused_path) + + def test_remove_unused_assets_no_unused(self, asset_manager, create_test_image): + """Test removing unused when all assets are referenced""" + img = os.path.join(asset_manager.assets_folder, "used.png") + create_test_image(img, color="red") + asset_manager.reference_counts["assets/used.png"] = 1 + + files_removed, bytes_freed = asset_manager.remove_unused_assets() + + assert files_removed == 0 + assert bytes_freed == 0 + assert os.path.exists(img) + + def test_remove_unused_assets_cleans_tracking(self, asset_manager, create_test_image): + """Test that removing unused assets cleans up internal tracking""" + img = os.path.join(asset_manager.assets_folder, "orphan.png") + create_test_image(img, color="red") + + # Set up tracking with zero refs and a hash + asset_manager.reference_counts["assets/orphan.png"] = 0 + asset_manager.asset_hashes["assets/orphan.png"] = "somehash" + + asset_manager.remove_unused_assets() + + # Tracking should be cleaned up + assert "assets/orphan.png" not in asset_manager.reference_counts + assert "assets/orphan.png" not in asset_manager.asset_hashes + + def test_remove_unused_preserves_referenced(self, asset_manager, create_test_image): + """Test that removing unused preserves all referenced assets""" + # Create several files + for i in range(5): + img = os.path.join(asset_manager.assets_folder, f"image{i}.png") + create_test_image(img, color="red") + + # Reference only some of them + asset_manager.reference_counts["assets/image0.png"] = 1 + asset_manager.reference_counts["assets/image2.png"] = 3 + asset_manager.reference_counts["assets/image4.png"] = 1 + + files_removed, _ = asset_manager.remove_unused_assets() + + assert files_removed == 2 # image1 and image3 + + # Check that referenced files still exist + assert os.path.exists(os.path.join(asset_manager.assets_folder, "image0.png")) + assert os.path.exists(os.path.join(asset_manager.assets_folder, "image2.png")) + 
assert os.path.exists(os.path.join(asset_manager.assets_folder, "image4.png")) + + # Check that unreferenced files are gone + assert not os.path.exists(os.path.join(asset_manager.assets_folder, "image1.png")) + assert not os.path.exists(os.path.join(asset_manager.assets_folder, "image3.png"))