All checks were successful
Python CI / test (push) Successful in 1m20s
Lint / lint (push) Successful in 1m4s
Tests / test (3.11) (push) Successful in 1m27s
Tests / test (3.12) (push) Successful in 2m25s
Tests / test (3.13) (push) Successful in 2m52s
Tests / test (3.14) (push) Successful in 1m9s
249 lines
9.0 KiB
Python
249 lines
9.0 KiB
Python
"""
|
|
Async project loader for pyPhotoAlbum
|
|
|
|
Loads projects asynchronously with progress updates to prevent UI freezing.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import zipfile
|
|
import tempfile
|
|
from typing import Optional, Tuple
|
|
from pathlib import Path
|
|
from PyQt6.QtCore import QThread, pyqtSignal
|
|
|
|
from pyPhotoAlbum.project import Project
|
|
from pyPhotoAlbum.models import ImageData, set_asset_resolution_context
|
|
from pyPhotoAlbum.version_manager import (
|
|
CURRENT_DATA_VERSION,
|
|
check_version_compatibility,
|
|
VersionCompatibility,
|
|
DataMigration,
|
|
)
|
|
|
|
|
|
class AsyncProjectLoader(QThread):
|
|
"""
|
|
Async worker thread for loading projects from ZIP files.
|
|
|
|
Signals:
|
|
progress_updated(int, int, str): Emitted with (current, total, message)
|
|
load_complete(Project): Emitted when loading succeeds
|
|
load_failed(str): Emitted when loading fails with error message
|
|
"""
|
|
|
|
progress_updated = pyqtSignal(int, int, str) # current, total, message
|
|
load_complete = pyqtSignal(object) # Project instance
|
|
load_failed = pyqtSignal(str) # error message
|
|
|
|
def __init__(self, zip_path: str, extract_to: Optional[str] = None):
|
|
super().__init__()
|
|
self.zip_path = zip_path
|
|
self.extract_to = extract_to
|
|
self._cancelled = False
|
|
|
|
def cancel(self):
|
|
"""Cancel the loading operation"""
|
|
self._cancelled = True
|
|
|
|
def run(self):
|
|
"""Run the async loading operation"""
|
|
try:
|
|
if not os.path.exists(self.zip_path):
|
|
self.load_failed.emit(f"ZIP file not found: {self.zip_path}")
|
|
return
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Starting
|
|
self.progress_updated.emit(0, 100, "Preparing to load...")
|
|
|
|
# Track if we created a temp directory
|
|
temp_dir_obj = None
|
|
|
|
# Determine extraction directory
|
|
if self.extract_to is None:
|
|
zip_basename = os.path.splitext(os.path.basename(self.zip_path))[0]
|
|
temp_dir_obj = tempfile.TemporaryDirectory(prefix=f"pyPhotoAlbum_{zip_basename}_")
|
|
extract_to = temp_dir_obj.name
|
|
else:
|
|
os.makedirs(self.extract_to, exist_ok=True)
|
|
extract_to = self.extract_to
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Extracting ZIP
|
|
self.progress_updated.emit(10, 100, "Extracting project files...")
|
|
|
|
# Extract ZIP contents with progress
|
|
with zipfile.ZipFile(self.zip_path, "r") as zipf:
|
|
file_list = zipf.namelist()
|
|
total_files = len(file_list)
|
|
|
|
for i, filename in enumerate(file_list):
|
|
if self._cancelled:
|
|
return
|
|
|
|
zipf.extract(filename, extract_to)
|
|
|
|
# Update progress every 10 files or on last file
|
|
if i % 10 == 0 or i == total_files - 1:
|
|
progress = 10 + int((i / total_files) * 30) # 10-40%
|
|
self.progress_updated.emit(progress, 100, f"Extracting files... ({i + 1}/{total_files})")
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Loading project data
|
|
self.progress_updated.emit(45, 100, "Loading project data...")
|
|
|
|
# Load project.json
|
|
project_json_path = os.path.join(extract_to, "project.json")
|
|
if not os.path.exists(project_json_path):
|
|
self.load_failed.emit("Invalid project file: project.json not found")
|
|
return
|
|
|
|
with open(project_json_path, "r") as f:
|
|
project_data = json.load(f)
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Checking version
|
|
self.progress_updated.emit(55, 100, "Checking version compatibility...")
|
|
|
|
# Check version compatibility
|
|
file_version = project_data.get("data_version", project_data.get("serialization_version", "1.0"))
|
|
|
|
is_compatible, error_msg = check_version_compatibility(file_version, self.zip_path)
|
|
if not is_compatible:
|
|
self.load_failed.emit(error_msg)
|
|
return
|
|
|
|
# Apply migrations if needed
|
|
if VersionCompatibility.needs_migration(file_version):
|
|
self.progress_updated.emit(60, 100, f"Migrating from version {file_version}...")
|
|
try:
|
|
project_data = DataMigration.migrate(project_data, file_version, CURRENT_DATA_VERSION)
|
|
except Exception as e:
|
|
self.load_failed.emit(f"Migration failed: {str(e)}")
|
|
return
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Creating project
|
|
self.progress_updated.emit(70, 100, "Creating project...")
|
|
|
|
# Create new project
|
|
project_name = project_data.get("name", "Untitled Project")
|
|
project = Project(name=project_name, folder_path=extract_to)
|
|
|
|
# Deserialize project data
|
|
project.deserialize(project_data)
|
|
|
|
# Update folder path to extraction location
|
|
project.folder_path = extract_to
|
|
project.asset_manager.project_folder = extract_to
|
|
project.asset_manager.assets_folder = os.path.join(extract_to, "assets")
|
|
|
|
# Attach temporary directory to project (if we created one)
|
|
if temp_dir_obj is not None:
|
|
project._temp_dir = temp_dir_obj
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Normalizing paths
|
|
self.progress_updated.emit(85, 100, "Normalizing asset paths...")
|
|
|
|
# Normalize asset paths
|
|
self._normalize_asset_paths(project, extract_to)
|
|
|
|
# Progress: Setting up asset resolution
|
|
self.progress_updated.emit(95, 100, "Setting up asset resolution...")
|
|
|
|
# Set asset resolution context
|
|
# Only set project folder - search paths are reserved for healing functionality
|
|
set_asset_resolution_context(extract_to)
|
|
|
|
if self._cancelled:
|
|
return
|
|
|
|
# Progress: Complete
|
|
self.progress_updated.emit(100, 100, "Loading complete!")
|
|
|
|
# Emit success
|
|
self.load_complete.emit(project)
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error loading project: {str(e)}"
|
|
self.load_failed.emit(error_msg)
|
|
|
|
def _normalize_asset_paths(self, project: Project, project_folder: str):
|
|
"""
|
|
Normalize asset paths in a loaded project to be relative to the project folder.
|
|
"""
|
|
normalized_count = 0
|
|
|
|
for page in project.pages:
|
|
for element in page.layout.elements:
|
|
if isinstance(element, ImageData) and element.image_path:
|
|
original_path = element.image_path
|
|
|
|
# Skip if already a simple relative path
|
|
if not os.path.isabs(original_path) and not original_path.startswith("./projects/"):
|
|
continue
|
|
|
|
# Pattern 1: "./projects/XXX/assets/filename.jpg" -> "assets/filename.jpg"
|
|
if "/assets/" in original_path:
|
|
parts = original_path.split("/assets/")
|
|
if len(parts) == 2:
|
|
new_path = os.path.join("assets", parts[1])
|
|
element.image_path = new_path
|
|
normalized_count += 1
|
|
continue
|
|
|
|
# Pattern 2: Absolute path - try to make it relative
|
|
if os.path.isabs(original_path):
|
|
try:
|
|
new_path = os.path.relpath(original_path, project_folder)
|
|
element.image_path = new_path
|
|
normalized_count += 1
|
|
except ValueError:
|
|
pass
|
|
|
|
if normalized_count > 0:
|
|
print(f"Normalized {normalized_count} asset paths")
|
|
|
|
|
|
def load_from_zip_async(
|
|
zip_path: str, extract_to: Optional[str] = None, progress_callback=None, complete_callback=None, error_callback=None
|
|
) -> AsyncProjectLoader:
|
|
"""
|
|
Load a project from a ZIP file asynchronously.
|
|
|
|
Args:
|
|
zip_path: Path to the ZIP file to load
|
|
extract_to: Optional directory to extract to. If None, uses a temporary directory.
|
|
progress_callback: Optional callback(current, total, message) for progress updates
|
|
complete_callback: Optional callback(project) when loading completes
|
|
error_callback: Optional callback(error_msg) when loading fails
|
|
|
|
Returns:
|
|
AsyncProjectLoader instance (already started)
|
|
"""
|
|
loader = AsyncProjectLoader(zip_path, extract_to)
|
|
|
|
if progress_callback:
|
|
loader.progress_updated.connect(progress_callback)
|
|
if complete_callback:
|
|
loader.load_complete.connect(complete_callback)
|
|
if error_callback:
|
|
loader.load_failed.connect(error_callback)
|
|
|
|
loader.start()
|
|
return loader
|