Compare commits

..

2 Commits

Author SHA1 Message Date
7d8d2d42f8 Use pythons tempdir
Some checks failed
Python CI / test (push) Successful in 1m13s
Lint / lint (push) Successful in 1m16s
Tests / test (3.10) (push) Failing after 57s
Tests / test (3.11) (push) Failing after 53s
Tests / test (3.9) (push) Failing after 56s
2025-11-11 17:00:45 +01:00
099ccb4c6b asnyc background 2025-11-11 15:34:04 +01:00
15 changed files with 1782 additions and 142 deletions

View File

@ -444,3 +444,182 @@ class AlignmentManager:
element.size = (new_width, new_height) element.size = (new_width, new_height)
return (element, old_pos, old_size) return (element, old_pos, old_size)
@staticmethod
def maximize_pattern(
elements: List[BaseLayoutElement],
page_size: Tuple[float, float],
min_gap: float = 2.0,
max_iterations: int = 100,
growth_rate: float = 0.05
) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
"""
Maximize element sizes using a crystal growth algorithm.
Elements grow until they are close to borders or each other.
Args:
elements: List of elements to maximize
page_size: (width, height) of the page in mm
min_gap: Minimum gap to maintain between elements and borders (in mm)
max_iterations: Maximum number of growth iterations
growth_rate: Percentage to grow each iteration (0.05 = 5%)
Returns:
List of (element, old_position, old_size) tuples for undo
"""
if not elements:
return []
page_width, page_height = page_size
changes = []
# Record initial states
for elem in elements:
changes.append((elem, elem.position, elem.size))
# Helper function to check if element would collide with boundaries or other elements
def check_collision(elem_idx: int, new_size: Tuple[float, float]) -> bool:
elem = elements[elem_idx]
x, y = elem.position
w, h = new_size
# Check page boundaries
if x < min_gap or y < min_gap:
return True
if x + w > page_width - min_gap:
return True
if y + h > page_height - min_gap:
return True
# Check collision with other elements
for i, other in enumerate(elements):
if i == elem_idx:
continue
other_x, other_y = other.position
other_w, other_h = other.size
# Calculate distances between rectangles
horizontal_gap = max(
other_x - (x + w), # Other is to the right
x - (other_x + other_w) # Other is to the left
)
vertical_gap = max(
other_y - (y + h), # Other is below
y - (other_y + other_h) # Other is above
)
# If rectangles overlap or are too close in both dimensions
if horizontal_gap < min_gap and vertical_gap < min_gap:
return True
return False
# Helper function to get the maximum scale factor for an element
def get_max_scale(elem_idx: int, current_scale: float) -> float:
elem = elements[elem_idx]
old_size = changes[elem_idx][2]
# Binary search for maximum scale
low, high = current_scale, current_scale * 3.0
best_scale = current_scale
for _ in range(20): # Binary search iterations
mid = (low + high) / 2.0
test_size = (old_size[0] * mid, old_size[1] * mid)
if check_collision(elem_idx, test_size):
high = mid
else:
best_scale = mid
low = mid
if high - low < 0.001:
break
return best_scale
# Growth algorithm - iterative expansion
scales = [1.0] * len(elements)
for iteration in range(max_iterations):
any_growth = False
for i, elem in enumerate(elements):
old_size = changes[i][2]
# Try to grow this element
new_scale = scales[i] * (1.0 + growth_rate)
new_size = (old_size[0] * new_scale, old_size[1] * new_scale)
if not check_collision(i, new_size):
scales[i] = new_scale
elem.size = new_size
any_growth = True
else:
# Can't grow uniformly, try to find maximum possible scale
max_scale = get_max_scale(i, scales[i])
if max_scale > scales[i]:
scales[i] = max_scale
elem.size = (old_size[0] * max_scale, old_size[1] * max_scale)
any_growth = True
# If no element could grow, we're done
if not any_growth:
break
# Optional: Center elements slightly within their constrained space
for elem in elements:
x, y = elem.position
w, h = elem.size
# Calculate available space on each side
space_left = x - min_gap
space_right = (page_width - min_gap) - (x + w)
space_top = y - min_gap
space_bottom = (page_height - min_gap) - (y + h)
# Micro-adjust position to center in available space
if space_left >= 0 and space_right >= 0:
adjust_x = (space_right - space_left) / 4.0 # Gentle centering
new_x = max(min_gap, min(page_width - w - min_gap, x + adjust_x))
# Verify this doesn't cause collision
old_pos = elem.position
elem.position = (new_x, y)
collision = False
for other in elements:
if other is elem:
continue
ox, oy = other.position
ow, oh = other.size
if (abs((new_x + w/2) - (ox + ow/2)) < (w + ow)/2 + min_gap and
abs((y + h/2) - (oy + oh/2)) < (h + oh)/2 + min_gap):
collision = True
break
if collision:
elem.position = old_pos
if space_top >= 0 and space_bottom >= 0:
adjust_y = (space_bottom - space_top) / 4.0
new_y = max(min_gap, min(page_height - h - min_gap, y + adjust_y))
old_pos = elem.position
elem.position = (elem.position[0], new_y)
collision = False
for other in elements:
if other is elem:
continue
ox, oy = other.position
ow, oh = other.size
if (abs((elem.position[0] + w/2) - (ox + ow/2)) < (w + ow)/2 + min_gap and
abs((new_y + h/2) - (oy + oh/2)) < (h + oh)/2 + min_gap):
collision = True
break
if collision:
elem.position = old_pos
return changes

View File

@ -0,0 +1,703 @@
"""
Async backend for non-blocking image loading and PDF generation.
This module provides:
- AsyncImageLoader: Load and process images in background
- AsyncPDFGenerator: Generate PDFs without blocking UI
- ImageCache: Intelligent caching with LRU eviction
- WorkerPool: Thread pool for CPU-bound operations
"""
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional, Callable, Dict, Any, Tuple
from collections import OrderedDict
import threading
from PIL import Image
from PyQt6.QtCore import QObject, pyqtSignal
logger = logging.getLogger(__name__)
class LoadPriority(Enum):
"""Priority levels for load requests."""
LOW = 0 # Offscreen, not visible
NORMAL = 1 # Potentially visible soon
HIGH = 2 # Visible on screen
URGENT = 3 # User is actively interacting with
@dataclass(order=True)
class LoadRequest:
"""Request to load and process an image."""
priority: LoadPriority = field(compare=True)
request_id: int = field(compare=True) # Tie-breaker for same priority
path: Path = field(compare=False)
target_size: Optional[Tuple[int, int]] = field(default=None, compare=False)
callback: Optional[Callable] = field(default=None, compare=False)
user_data: Any = field(default=None, compare=False)
class ImageCache:
"""
Thread-safe LRU cache for PIL images with memory management.
Caches both original images and scaled variants to avoid redundant processing.
"""
def __init__(self, max_memory_mb: int = 512):
"""
Initialize cache.
Args:
max_memory_mb: Maximum memory to use for cached images (default 512MB)
"""
self.max_memory_bytes = max_memory_mb * 1024 * 1024
self.current_memory_bytes = 0
self._cache: OrderedDict[str, Tuple[Image.Image, int]] = OrderedDict()
self._lock = threading.Lock()
logger.info(f"ImageCache initialized with {max_memory_mb}MB limit")
def _estimate_image_size(self, img: Image.Image) -> int:
"""Estimate memory size of PIL image in bytes."""
# PIL images are typically width * height * bytes_per_pixel
# RGBA = 4 bytes, RGB = 3 bytes, L = 1 byte
mode_sizes = {'RGBA': 4, 'RGB': 3, 'L': 1, 'LA': 2}
bytes_per_pixel = mode_sizes.get(img.mode, 4)
return img.width * img.height * bytes_per_pixel
def _make_key(self, path: Path, target_size: Optional[Tuple[int, int]] = None) -> str:
"""Create cache key from path and optional target size."""
if target_size:
return f"{path}:{target_size[0]}x{target_size[1]}"
return str(path)
def get(self, path: Path, target_size: Optional[Tuple[int, int]] = None) -> Optional[Image.Image]:
"""
Get image from cache.
Args:
path: Path to image file
target_size: Optional target size (width, height)
Returns:
Cached PIL Image or None if not found
"""
key = self._make_key(path, target_size)
with self._lock:
if key in self._cache:
# Move to end (most recently used)
img, size = self._cache.pop(key)
self._cache[key] = (img, size)
logger.debug(f"Cache HIT: {key}")
return img.copy() # Return copy to avoid external modifications
logger.debug(f"Cache MISS: {key}")
return None
def put(self, path: Path, img: Image.Image, target_size: Optional[Tuple[int, int]] = None):
"""
Add image to cache with LRU eviction.
Args:
path: Path to image file
img: PIL Image to cache
target_size: Optional target size used for this variant
"""
key = self._make_key(path, target_size)
img_size = self._estimate_image_size(img)
with self._lock:
# Remove if already exists (update size)
if key in self._cache:
_, old_size = self._cache.pop(key)
self.current_memory_bytes -= old_size
# Evict LRU items if needed
while (self.current_memory_bytes + img_size > self.max_memory_bytes
and len(self._cache) > 0):
evicted_key, (evicted_img, evicted_size) = self._cache.popitem(last=False)
self.current_memory_bytes -= evicted_size
logger.debug(f"Cache EVICT: {evicted_key} ({evicted_size / 1024 / 1024:.1f}MB)")
# Add new image
self._cache[key] = (img.copy(), img_size)
self.current_memory_bytes += img_size
logger.debug(f"Cache PUT: {key} ({img_size / 1024 / 1024:.1f}MB) "
f"[Total: {self.current_memory_bytes / 1024 / 1024:.1f}MB / "
f"{self.max_memory_bytes / 1024 / 1024:.1f}MB, "
f"Items: {len(self._cache)}]")
def clear(self):
"""Clear entire cache."""
with self._lock:
self._cache.clear()
self.current_memory_bytes = 0
logger.info("Cache cleared")
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
with self._lock:
return {
'items': len(self._cache),
'memory_mb': self.current_memory_bytes / 1024 / 1024,
'max_memory_mb': self.max_memory_bytes / 1024 / 1024,
'utilization': (self.current_memory_bytes / self.max_memory_bytes) * 100
}
class AsyncImageLoader(QObject):
"""
Asynchronous image loader with priority queue and caching.
Loads images in background threads and emits signals when complete.
Supports concurrent loading, priority-based scheduling, and cancellation.
Example:
loader = AsyncImageLoader()
loader.image_loaded.connect(on_image_ready)
loader.start()
loader.request_load(Path("photo.jpg"), priority=LoadPriority.HIGH)
"""
# Signals for Qt integration
image_loaded = pyqtSignal(object, object, object) # (path, image, user_data)
load_failed = pyqtSignal(object, str, object) # (path, error_msg, user_data)
def __init__(self, cache: Optional[ImageCache] = None, max_workers: int = 4):
"""
Initialize async image loader.
Args:
cache: ImageCache instance (creates new if None)
max_workers: Maximum concurrent worker threads (default 4)
"""
super().__init__()
self.cache = cache or ImageCache()
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers,
thread_name_prefix="ImageLoader")
# Priority queue and tracking
self._queue: asyncio.PriorityQueue = None # Created when event loop starts
self._pending_requests: Dict[Path, LoadRequest] = {}
self._active_tasks: Dict[Path, asyncio.Task] = {}
self._next_request_id = 0
self._lock = threading.Lock()
self._shutdown = False
# Event loop for async operations
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None
logger.info(f"AsyncImageLoader initialized with {max_workers} workers")
def start(self):
"""Start the async backend event loop."""
if self._loop_thread is not None:
logger.warning("AsyncImageLoader already started")
return
self._shutdown = False
self._loop_thread = threading.Thread(target=self._run_event_loop,
daemon=True,
name="AsyncImageLoader-EventLoop")
self._loop_thread.start()
logger.info("AsyncImageLoader event loop started")
def stop(self):
"""Stop the async backend and cleanup resources."""
if self._loop is None:
return
logger.info("Stopping AsyncImageLoader...")
self._shutdown = True
# Cancel all active tasks
if self._loop and not self._loop.is_closed():
asyncio.run_coroutine_threadsafe(self._cancel_all_tasks(), self._loop)
# Stop the event loop
self._loop.call_soon_threadsafe(self._loop.stop)
# Wait for thread to finish
if self._loop_thread:
self._loop_thread.join(timeout=5.0)
# Shutdown executor
self.executor.shutdown(wait=True)
logger.info("AsyncImageLoader stopped")
def _run_event_loop(self):
"""Run asyncio event loop in background thread."""
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
# Create priority queue
self._queue = asyncio.PriorityQueue()
# Start queue processor as background task
self._loop.create_task(self._process_queue())
# Run event loop forever (until stopped)
self._loop.run_forever()
# Cleanup after loop stops
self._loop.close()
async def _process_queue(self):
"""Process load requests from priority queue."""
logger.info("Queue processor started")
while not self._shutdown:
try:
# Wait for request with timeout to check shutdown flag
request = await asyncio.wait_for(self._queue.get(), timeout=0.5)
# Skip if already cancelled
if request.path not in self._pending_requests:
continue
# Process request
task = asyncio.create_task(self._load_image(request))
self._active_tasks[request.path] = task
except asyncio.TimeoutError:
continue # Check shutdown flag
except Exception as e:
logger.error(f"Queue processor error: {e}", exc_info=True)
logger.info("Queue processor stopped")
async def _cancel_all_tasks(self):
"""Cancel all active loading tasks."""
tasks = list(self._active_tasks.values())
for task in tasks:
task.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._active_tasks.clear()
self._pending_requests.clear()
async def _load_image(self, request: LoadRequest):
"""
Load and process image asynchronously.
Args:
request: LoadRequest containing path, size, and callback info
"""
path = request.path
target_size = request.target_size
try:
# Check cache first
cached_img = self.cache.get(path, target_size)
if cached_img is not None:
logger.debug(f"Loaded from cache: {path}")
self._emit_loaded(path, cached_img, request.user_data)
return
# Load in thread pool (I/O bound)
loop = asyncio.get_event_loop()
img = await loop.run_in_executor(
self.executor,
self._load_and_process_image,
path,
target_size
)
# Cache result
self.cache.put(path, img, target_size)
# Emit success signal
self._emit_loaded(path, img, request.user_data)
logger.debug(f"Loaded: {path} (size: {img.size})")
except Exception as e:
logger.error(f"Failed to load {path}: {e}", exc_info=True)
self._emit_failed(path, str(e), request.user_data)
finally:
# Cleanup tracking
with self._lock:
self._pending_requests.pop(path, None)
self._active_tasks.pop(path, None)
def _load_and_process_image(self, path: Path, target_size: Optional[Tuple[int, int]]) -> Image.Image:
"""
Load image from disk and process (runs in thread pool).
Args:
path: Path to image file
target_size: Optional target size for downsampling
Returns:
Processed PIL Image
"""
# Load image
img = Image.open(path)
# Convert to RGBA for consistency
if img.mode != 'RGBA':
img = img.convert('RGBA')
# Downsample if target size specified
if target_size:
current_size = img.size
if current_size[0] > target_size[0] or current_size[1] > target_size[1]:
img = img.resize(target_size, Image.Resampling.LANCZOS)
logger.debug(f"Downsampled {path}: {current_size} -> {target_size}")
return img
def _emit_loaded(self, path: Path, img: Image.Image, user_data: Any):
"""Emit image_loaded signal (thread-safe)."""
self.image_loaded.emit(path, img, user_data)
def _emit_failed(self, path: Path, error_msg: str, user_data: Any):
"""Emit load_failed signal (thread-safe)."""
self.load_failed.emit(path, error_msg, user_data)
def request_load(self,
path: Path,
priority: LoadPriority = LoadPriority.NORMAL,
target_size: Optional[Tuple[int, int]] = None,
user_data: Any = None) -> bool:
"""
Request image load with specified priority.
Args:
path: Path to image file
priority: Load priority level
target_size: Optional target size (width, height) for downsampling
user_data: Optional user data passed to callback
Returns:
True if request was queued, False if already pending/active
"""
if not self._loop or self._shutdown:
logger.warning("Cannot request load: backend not started")
return False
path = Path(path)
with self._lock:
# Skip if already pending or active
if path in self._pending_requests or path in self._active_tasks:
logger.debug(f"Load already pending: {path}")
return False
# Create request
request = LoadRequest(
priority=priority,
request_id=self._next_request_id,
path=path,
target_size=target_size,
user_data=user_data
)
self._next_request_id += 1
# Track as pending
self._pending_requests[path] = request
# Submit to queue (thread-safe)
asyncio.run_coroutine_threadsafe(
self._queue.put(request),
self._loop
)
logger.debug(f"Queued load: {path} (priority: {priority.name})")
return True
def cancel_load(self, path: Path) -> bool:
"""
Cancel pending image load.
Args:
path: Path to image file
Returns:
True if load was cancelled, False if not found
"""
path = Path(path)
with self._lock:
# Remove from pending
if path in self._pending_requests:
del self._pending_requests[path]
logger.debug(f"Cancelled pending load: {path}")
return True
# Cancel active task
if path in self._active_tasks:
task = self._active_tasks[path]
task.cancel()
logger.debug(f"Cancelled active load: {path}")
return True
return False
def get_stats(self) -> Dict[str, Any]:
"""Get loader statistics."""
with self._lock:
return {
'pending': len(self._pending_requests),
'active': len(self._active_tasks),
'cache': self.cache.get_stats()
}
class AsyncPDFGenerator(QObject):
"""
Asynchronous PDF generator that doesn't block the UI.
Generates PDFs in background thread with progress updates.
Uses shared ImageCache to avoid redundant image loading.
Example:
generator = AsyncPDFGenerator(image_cache)
generator.progress_updated.connect(on_progress)
generator.export_complete.connect(on_complete)
generator.start()
generator.export_pdf(project, "output.pdf")
"""
# Signals for Qt integration
progress_updated = pyqtSignal(int, int, str) # (current, total, message)
export_complete = pyqtSignal(bool, list) # (success, warnings)
export_failed = pyqtSignal(str) # (error_message)
def __init__(self, image_cache: Optional[ImageCache] = None, max_workers: int = 2):
"""
Initialize async PDF generator.
Args:
image_cache: Shared ImageCache instance (creates new if None)
max_workers: Maximum concurrent workers for PDF generation (default 2)
"""
super().__init__()
self.image_cache = image_cache or ImageCache()
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers,
thread_name_prefix="PDFGenerator")
# Export state
self._current_export: Optional[asyncio.Task] = None
self._cancel_requested = False
self._lock = threading.Lock()
self._shutdown = False
# Event loop for async operations
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None
logger.info(f"AsyncPDFGenerator initialized with {max_workers} workers")
def start(self):
"""Start the async PDF generator event loop."""
if self._loop_thread is not None:
logger.warning("AsyncPDFGenerator already started")
return
self._shutdown = False
self._loop_thread = threading.Thread(target=self._run_event_loop,
daemon=True,
name="AsyncPDFGenerator-EventLoop")
self._loop_thread.start()
logger.info("AsyncPDFGenerator event loop started")
def stop(self):
"""Stop the async PDF generator and cleanup resources."""
if self._loop is None:
return
logger.info("Stopping AsyncPDFGenerator...")
self._shutdown = True
# Cancel active export
if self._current_export and not self._current_export.done():
self._current_export.cancel()
# Stop the event loop
if self._loop and not self._loop.is_closed():
self._loop.call_soon_threadsafe(self._loop.stop)
# Wait for thread to finish
if self._loop_thread:
self._loop_thread.join(timeout=5.0)
# Shutdown executor
self.executor.shutdown(wait=True)
logger.info("AsyncPDFGenerator stopped")
def _run_event_loop(self):
"""Run asyncio event loop in background thread."""
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
# Run event loop forever (until stopped)
self._loop.run_forever()
# Cleanup after loop stops
self._loop.close()
def export_pdf(self, project, output_path: str, export_dpi: int = 300) -> bool:
"""
Request PDF export (non-blocking).
Args:
project: Project instance to export
output_path: Path where PDF should be saved
export_dpi: Target DPI for images (default 300)
Returns:
True if export started, False if already exporting or backend not started
"""
if not self._loop or self._shutdown:
logger.warning("Cannot export: backend not started")
return False
with self._lock:
if self._current_export and not self._current_export.done():
logger.warning("Export already in progress")
return False
self._cancel_requested = False
# Submit export task
self._current_export = asyncio.run_coroutine_threadsafe(
self._export_pdf_async(project, output_path, export_dpi),
self._loop
)
logger.info(f"PDF export started: {output_path}")
return True
def cancel_export(self):
"""Request cancellation of current export."""
with self._lock:
self._cancel_requested = True
if self._current_export and not self._current_export.done():
self._current_export.cancel()
logger.info("PDF export cancellation requested")
async def _export_pdf_async(self, project, output_path: str, export_dpi: int):
"""
Perform PDF export asynchronously.
Args:
project: Project to export
output_path: Output PDF file path
export_dpi: Export DPI setting
"""
try:
# Import PDF exporter (lazy import to avoid circular dependencies)
from pyPhotoAlbum.pdf_exporter import PDFExporter
# Create exporter
exporter = PDFExporter(project, export_dpi=export_dpi)
# Progress callback wrapper
def progress_callback(current, total, message):
if self._cancel_requested:
return False # Signal cancellation
self.progress_updated.emit(current, total, message)
return True
# Run export in thread pool
loop = asyncio.get_event_loop()
success, warnings = await loop.run_in_executor(
self.executor,
self._export_with_cache,
exporter,
output_path,
progress_callback
)
# Emit completion signal
if not self._cancel_requested:
self.export_complete.emit(success, warnings)
logger.info(f"PDF export completed: {output_path} (warnings: {len(warnings)})")
else:
logger.info("PDF export cancelled")
except asyncio.CancelledError:
logger.info("PDF export cancelled by user")
self.export_failed.emit("Export cancelled")
except Exception as e:
logger.error(f"PDF export failed: {e}", exc_info=True)
self.export_failed.emit(str(e))
finally:
with self._lock:
self._current_export = None
def _export_with_cache(self, exporter, output_path: str, progress_callback) -> Tuple[bool, list]:
"""
Run PDF export with image cache integration.
This method patches the exporter to use our cached images.
Args:
exporter: PDFExporter instance
output_path: Output file path
progress_callback: Progress callback function
Returns:
Tuple of (success, warnings)
"""
# Store original Image.open
original_open = Image.open
# Patch Image.open to use cache
def cached_open(path, *args, **kwargs):
# Try cache first
cached_img = self.image_cache.get(Path(path))
if cached_img:
logger.debug(f"PDF using cached image: {path}")
return cached_img
# Load and cache
img = original_open(path, *args, **kwargs)
if img.mode != 'RGBA':
img = img.convert('RGBA')
self.image_cache.put(Path(path), img)
return img
# Temporarily patch Image.open
try:
Image.open = cached_open
return exporter.export(output_path, progress_callback)
finally:
# Restore original
Image.open = original_open
def is_exporting(self) -> bool:
"""Check if export is currently in progress."""
with self._lock:
return (self._current_export is not None
and not self._current_export.done())
def get_stats(self) -> Dict[str, Any]:
"""Get generator statistics."""
with self._lock:
return {
'exporting': self.is_exporting(),
'cache': self.image_cache.get_stats()
}

View File

@ -16,9 +16,11 @@ from pyPhotoAlbum.mixins.element_manipulation import ElementManipulationMixin
from pyPhotoAlbum.mixins.element_selection import ElementSelectionMixin from pyPhotoAlbum.mixins.element_selection import ElementSelectionMixin
from pyPhotoAlbum.mixins.mouse_interaction import MouseInteractionMixin from pyPhotoAlbum.mixins.mouse_interaction import MouseInteractionMixin
from pyPhotoAlbum.mixins.interaction_undo import UndoableInteractionMixin from pyPhotoAlbum.mixins.interaction_undo import UndoableInteractionMixin
from pyPhotoAlbum.mixins.async_loading import AsyncLoadingMixin
class GLWidget( class GLWidget(
AsyncLoadingMixin,
ViewportMixin, ViewportMixin,
RenderingMixin, RenderingMixin,
AssetDropMixin, AssetDropMixin,
@ -33,6 +35,7 @@ class GLWidget(
"""OpenGL widget for pyPhotoAlbum rendering and user interaction """OpenGL widget for pyPhotoAlbum rendering and user interaction
This widget orchestrates multiple mixins to provide: This widget orchestrates multiple mixins to provide:
- Async image loading (non-blocking)
- Viewport control (zoom, pan) - Viewport control (zoom, pan)
- Page rendering (OpenGL) - Page rendering (OpenGL)
- Element selection and manipulation - Element selection and manipulation
@ -46,6 +49,9 @@ class GLWidget(
def __init__(self, parent=None): def __init__(self, parent=None):
super().__init__(parent) super().__init__(parent)
# Initialize async loading system
self._init_async_loading()
# Initialize OpenGL # Initialize OpenGL
self.setFormat(self.format()) self.setFormat(self.format())
self.setUpdateBehavior(QOpenGLWidget.UpdateBehavior.NoPartialUpdate) self.setUpdateBehavior(QOpenGLWidget.UpdateBehavior.NoPartialUpdate)
@ -54,6 +60,12 @@ class GLWidget(
self.setMouseTracking(True) self.setMouseTracking(True)
self.setAcceptDrops(True) self.setAcceptDrops(True)
def closeEvent(self, event):
"""Handle widget close event."""
# Cleanup async loading
self._cleanup_async_loading()
super().closeEvent(event)
def keyPressEvent(self, event): def keyPressEvent(self, event):
"""Handle key press events""" """Handle key press events"""
if event.key() == Qt.Key.Key_Delete or event.key() == Qt.Key.Key_Backspace: if event.key() == Qt.Key.Key_Delete or event.key() == Qt.Key.Key_Backspace:

View File

@ -0,0 +1,256 @@
"""
Async loading mixin for non-blocking image loading and PDF generation.
"""
from pathlib import Path
from typing import Optional
import logging
from PyQt6.QtCore import QObject
from pyPhotoAlbum.async_backend import AsyncImageLoader, AsyncPDFGenerator, ImageCache, LoadPriority
logger = logging.getLogger(__name__)
class AsyncLoadingMixin:
"""
Mixin to add async loading capabilities to GLWidget.
Provides non-blocking image loading and PDF generation with
progressive updates and shared caching.
"""
def _init_async_loading(self):
"""Initialize async loading components."""
logger.info("Initializing async loading system...")
# Create shared image cache (512MB)
self.image_cache = ImageCache(max_memory_mb=512)
# Create async image loader
self.async_image_loader = AsyncImageLoader(cache=self.image_cache, max_workers=4)
self.async_image_loader.image_loaded.connect(self._on_image_loaded)
self.async_image_loader.load_failed.connect(self._on_image_load_failed)
self.async_image_loader.start()
# Create async PDF generator
self.async_pdf_generator = AsyncPDFGenerator(image_cache=self.image_cache, max_workers=2)
self.async_pdf_generator.progress_updated.connect(self._on_pdf_progress)
self.async_pdf_generator.export_complete.connect(self._on_pdf_complete)
self.async_pdf_generator.export_failed.connect(self._on_pdf_failed)
self.async_pdf_generator.start()
logger.info("Async loading system initialized")
def _cleanup_async_loading(self):
"""Cleanup async loading components."""
logger.info("Cleaning up async loading system...")
if hasattr(self, 'async_image_loader'):
self.async_image_loader.stop()
if hasattr(self, 'async_pdf_generator'):
self.async_pdf_generator.stop()
if hasattr(self, 'image_cache'):
self.image_cache.clear()
logger.info("Async loading system cleaned up")
def _on_image_loaded(self, path: Path, image, user_data):
"""
Handle image loaded callback.
Args:
path: Path to loaded image
image: Loaded PIL Image
user_data: User data (ImageData element)
"""
logger.debug(f"Image loaded callback: {path}")
if user_data and hasattr(user_data, '_on_async_image_loaded'):
user_data._on_async_image_loaded(image)
# Trigger re-render to show newly loaded image
self.update()
def _on_image_load_failed(self, path: Path, error_msg: str, user_data):
"""
Handle image load failure.
Args:
path: Path that failed to load
error_msg: Error message
user_data: User data (ImageData element)
"""
logger.warning(f"Image load failed: {path} - {error_msg}")
if user_data and hasattr(user_data, '_on_async_image_load_failed'):
user_data._on_async_image_load_failed(error_msg)
def _on_pdf_progress(self, current: int, total: int, message: str):
"""
Handle PDF export progress updates.
Args:
current: Current progress (pages completed)
total: Total pages
message: Progress message
"""
logger.debug(f"PDF progress: {current}/{total} - {message}")
# Update progress dialog if it exists
if hasattr(self, '_pdf_progress_dialog') and self._pdf_progress_dialog:
self._pdf_progress_dialog.setValue(current)
self._pdf_progress_dialog.setLabelText(message)
def _on_pdf_complete(self, success: bool, warnings: list):
"""
Handle PDF export completion.
Args:
success: Whether export succeeded
warnings: List of warning messages
"""
logger.info(f"PDF export complete: success={success}, warnings={len(warnings)}")
# Close progress dialog
if hasattr(self, '_pdf_progress_dialog') and self._pdf_progress_dialog:
self._pdf_progress_dialog.close()
self._pdf_progress_dialog = None
# Show completion message
main_window = self.window()
if hasattr(main_window, 'show_status'):
if success:
if warnings:
main_window.show_status(
f"PDF exported successfully with {len(warnings)} warnings",
5000
)
else:
main_window.show_status("PDF exported successfully", 3000)
else:
main_window.show_status("PDF export failed", 5000)
def _on_pdf_failed(self, error_msg: str):
"""
Handle PDF export failure.
Args:
error_msg: Error message
"""
logger.error(f"PDF export failed: {error_msg}")
# Close progress dialog
if hasattr(self, '_pdf_progress_dialog') and self._pdf_progress_dialog:
self._pdf_progress_dialog.close()
self._pdf_progress_dialog = None
# Show error message
main_window = self.window()
if hasattr(main_window, 'show_status'):
main_window.show_status(f"PDF export failed: {error_msg}", 5000)
def request_image_load(self, image_data, priority: LoadPriority = LoadPriority.NORMAL):
"""
Request async load for an ImageData element.
Args:
image_data: ImageData element to load
priority: Load priority level
"""
if not hasattr(self, 'async_image_loader'):
logger.warning("Async image loader not initialized")
return
if not image_data.image_path:
return
# Resolve path
from pyPhotoAlbum.models import get_asset_search_paths
import os
image_full_path = image_data.image_path
if not os.path.isabs(image_data.image_path):
project_folder, search_paths = get_asset_search_paths()
possible_paths = []
if project_folder:
possible_paths.append(os.path.join(project_folder, image_data.image_path))
for search_path in search_paths:
possible_paths.append(os.path.join(search_path, image_data.image_path))
for path in possible_paths:
if os.path.exists(path):
image_full_path = path
break
# Calculate target size (max 2048px like original)
target_size = (2048, 2048) # Will be downsampled if larger
# Request load
self.async_image_loader.request_load(
Path(image_full_path),
priority=priority,
target_size=target_size,
user_data=image_data # Pass element for callback
)
def export_pdf_async(self, project, output_path: str, export_dpi: int = 300):
"""
Export PDF asynchronously without blocking UI.
Args:
project: Project to export
output_path: Output PDF file path
export_dpi: Export DPI (default 300)
"""
if not hasattr(self, 'async_pdf_generator'):
logger.warning("Async PDF generator not initialized")
return False
# Create progress dialog
from PyQt6.QtWidgets import QProgressDialog
from PyQt6.QtCore import Qt
total_pages = sum(
1 if page.is_cover else (2 if page.is_double_spread else 1)
for page in project.pages
)
self._pdf_progress_dialog = QProgressDialog(
"Exporting to PDF...",
"Cancel",
0,
total_pages,
self
)
self._pdf_progress_dialog.setWindowModality(Qt.WindowModality.WindowModal)
self._pdf_progress_dialog.setWindowTitle("PDF Export")
self._pdf_progress_dialog.canceled.connect(self._on_pdf_cancel)
self._pdf_progress_dialog.show()
# Start async export
return self.async_pdf_generator.export_pdf(project, output_path, export_dpi)
def _on_pdf_cancel(self):
"""Handle PDF export cancellation."""
logger.info("User requested PDF export cancellation")
if hasattr(self, 'async_pdf_generator'):
self.async_pdf_generator.cancel_export()
def get_async_stats(self) -> dict:
"""Get async loading system statistics."""
stats = {}
if hasattr(self, 'async_image_loader'):
stats['image_loader'] = self.async_image_loader.get_stats()
if hasattr(self, 'async_pdf_generator'):
stats['pdf_generator'] = self.async_pdf_generator.get_stats()
return stats

View File

@ -4,7 +4,7 @@ Alignment operations mixin for pyPhotoAlbum
from pyPhotoAlbum.decorators import ribbon_action from pyPhotoAlbum.decorators import ribbon_action
from pyPhotoAlbum.alignment import AlignmentManager from pyPhotoAlbum.alignment import AlignmentManager
from pyPhotoAlbum.commands import AlignElementsCommand from pyPhotoAlbum.commands import AlignElementsCommand, ResizeElementsCommand
class AlignmentOperationsMixin: class AlignmentOperationsMixin:
@ -139,3 +139,32 @@ class AlignmentOperationsMixin:
self.project.history.execute(cmd) self.project.history.execute(cmd)
self.update_view() self.update_view()
self.show_status(f"Aligned {len(elements)} elements to vertical center", 2000) self.show_status(f"Aligned {len(elements)} elements to vertical center", 2000)
@ribbon_action(
label="Maximize Pattern",
tooltip="Maximize selected elements using crystal growth algorithm",
tab="Arrange",
group="Size",
requires_selection=True,
min_selection=1
)
def maximize_pattern(self):
"""Maximize selected elements until they are close to borders or each other"""
elements = self._get_selected_elements_list()
if not self.require_selection(min_count=1):
return
# Get page size from current page
page = self.get_current_page()
if not page:
self.show_warning("No Page", "Please create a page first.")
return
page_size = page.layout.size
changes = AlignmentManager.maximize_pattern(elements, page_size)
if changes:
cmd = ResizeElementsCommand(changes)
self.project.history.execute(cmd)
self.update_view()
self.show_status(f"Maximized {len(elements)} element(s) using pattern growth", 2000)

View File

@ -142,6 +142,10 @@ class FileOperationsMixin:
working_dpi = working_dpi_spinbox.value() working_dpi = working_dpi_spinbox.value()
export_dpi = export_dpi_spinbox.value() export_dpi = export_dpi_spinbox.value()
# Cleanup old project if it exists
if hasattr(self, 'project') and self.project:
self.project.cleanup()
# Create project with custom settings # Create project with custom settings
self.project = Project(project_name) self.project = Project(project_name)
self.project.page_size_mm = (width_mm, height_mm) self.project.page_size_mm = (width_mm, height_mm)
@ -182,6 +186,10 @@ class FileOperationsMixin:
if file_path: if file_path:
print(f"Opening project: {file_path}") print(f"Opening project: {file_path}")
# Cleanup old project if it exists
if hasattr(self, 'project') and self.project:
self.project.cleanup()
# Load project from ZIP # Load project from ZIP
project, error = load_from_zip(file_path) project, error = load_from_zip(file_path)
@ -485,11 +493,7 @@ class FileOperationsMixin:
group="Export" group="Export"
) )
def export_pdf(self): def export_pdf(self):
"""Export project to PDF""" """Export project to PDF using async backend (non-blocking)"""
from PyQt6.QtWidgets import QProgressDialog
from PyQt6.QtCore import Qt
from pyPhotoAlbum.pdf_exporter import PDFExporter
# Check if we have pages to export # Check if we have pages to export
if not self.project or not self.project.pages: if not self.project or not self.project.pages:
self.show_status("No pages to export") self.show_status("No pages to export")
@ -510,41 +514,12 @@ class FileOperationsMixin:
if not file_path.lower().endswith('.pdf'): if not file_path.lower().endswith('.pdf'):
file_path += '.pdf' file_path += '.pdf'
# Calculate total pages for progress # Use async PDF export (non-blocking, UI stays responsive)
total_pages = sum(2 if page.is_double_spread else 1 for page in self.project.pages) success = self.gl_widget.export_pdf_async(self.project, file_path, export_dpi=300)
# Create progress dialog
progress = QProgressDialog("Exporting to PDF...", "Cancel", 0, total_pages, self)
progress.setWindowModality(Qt.WindowModality.WindowModal)
progress.setMinimumDuration(0)
progress.setValue(0)
# Progress callback
def update_progress(current, total, message):
progress.setLabelText(message)
progress.setValue(current)
if progress.wasCanceled():
return False
return True
# Export to PDF
exporter = PDFExporter(self.project)
success, warnings = exporter.export(file_path, update_progress)
progress.close()
if success: if success:
message = f"PDF exported successfully to {file_path}" self.show_status("PDF export started...", 2000)
if warnings:
message += f"\n\nWarnings:\n" + "\n".join(warnings)
self.show_status(message)
print(message)
else: else:
error_message = f"PDF export failed" self.show_status("PDF export failed to start", 3000)
if warnings:
error_message += f":\n" + "\n".join(warnings)
self.show_status(error_message)
print(error_message)
@ribbon_action( @ribbon_action(
label="About", label="About",

View File

@ -99,7 +99,7 @@ class SizeOperationsMixin:
element = next(iter(self.gl_widget.selected_elements)) element = next(iter(self.gl_widget.selected_elements))
# Fit to page width # Fit to page width
page_width = page.size[0] page_width = page.layout.size[0]
change = AlignmentManager.fit_to_page_width(element, page_width) change = AlignmentManager.fit_to_page_width(element, page_width)
if change: if change:
@ -130,7 +130,7 @@ class SizeOperationsMixin:
element = next(iter(self.gl_widget.selected_elements)) element = next(iter(self.gl_widget.selected_elements))
# Fit to page height # Fit to page height
page_height = page.size[1] page_height = page.layout.size[1]
change = AlignmentManager.fit_to_page_height(element, page_height) change = AlignmentManager.fit_to_page_height(element, page_height)
if change: if change:
@ -161,8 +161,8 @@ class SizeOperationsMixin:
element = next(iter(self.gl_widget.selected_elements)) element = next(iter(self.gl_widget.selected_elements))
# Fit to page # Fit to page
page_width = page.size[0] page_width = page.layout.size[0]
page_height = page.size[1] page_height = page.layout.size[1]
change = AlignmentManager.fit_to_page(element, page_width, page_height) change = AlignmentManager.fit_to_page(element, page_width, page_height)
if change: if change:

View File

@ -66,6 +66,8 @@ class RenderingMixin:
self._page_renderers.append((renderer, page)) self._page_renderers.append((renderer, page))
renderer.begin_render() renderer.begin_render()
# Pass widget reference for async loading
page.layout._parent_widget = self
page.layout.render(dpi=dpi) page.layout.render(dpi=dpi)
renderer.end_render() renderer.end_render()

View File

@ -62,6 +62,10 @@ class ImageData(BaseLayoutElement):
self.image_path = image_path self.image_path = image_path
self.crop_info = crop_info or (0, 0, 1, 1) # Default: no crop self.crop_info = crop_info or (0, 0, 1, 1) # Default: no crop
# Async loading state
self._async_loading = False
self._async_load_requested = False
def render(self): def render(self):
"""Render the image using OpenGL""" """Render the image using OpenGL"""
from OpenGL.GL import (glBegin, glEnd, glVertex2f, glColor3f, glColor4f, GL_QUADS, GL_LINE_LOOP, from OpenGL.GL import (glBegin, glEnd, glVertex2f, glColor3f, glColor4f, GL_QUADS, GL_LINE_LOOP,
@ -258,6 +262,57 @@ class ImageData(BaseLayoutElement):
self.image_path = data.get("image_path", "") self.image_path = data.get("image_path", "")
self.crop_info = tuple(data.get("crop_info", (0, 0, 1, 1))) self.crop_info = tuple(data.get("crop_info", (0, 0, 1, 1)))
def _on_async_image_loaded(self, pil_image):
"""
Callback when async image loading completes.
Args:
pil_image: Loaded PIL Image (already RGBA, already resized)
"""
from OpenGL.GL import (glGenTextures, glBindTexture, glTexImage2D, GL_TEXTURE_2D,
glTexParameteri, GL_TEXTURE_MIN_FILTER, GL_TEXTURE_MAG_FILTER,
GL_LINEAR, GL_RGBA, GL_UNSIGNED_BYTE, glDeleteTextures)
try:
# Delete old texture if it exists
if hasattr(self, '_texture_id') and self._texture_id:
glDeleteTextures([self._texture_id])
# Create GPU texture from pre-processed PIL image
img_data = pil_image.tobytes()
texture_id = glGenTextures(1)
glBindTexture(GL_TEXTURE_2D, texture_id)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_LINEAR)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
glTexImage2D(GL_TEXTURE_2D, 0, GL_RGBA, pil_image.width, pil_image.height,
0, GL_RGBA, GL_UNSIGNED_BYTE, img_data)
# Cache texture
self._texture_id = texture_id
self._texture_path = self.image_path
self._img_width = pil_image.width
self._img_height = pil_image.height
self._async_loading = False
print(f"ImageData: Async loaded texture for {self.image_path}")
except Exception as e:
print(f"ImageData: Error creating texture from async loaded image: {e}")
self._texture_id = None
self._async_loading = False
def _on_async_image_load_failed(self, error_msg: str):
"""
Callback when async image loading fails.
Args:
error_msg: Error message
"""
print(f"ImageData: Async load failed for {self.image_path}: {error_msg}")
self._async_loading = False
self._async_load_requested = False
class PlaceholderData(BaseLayoutElement): class PlaceholderData(BaseLayoutElement):
"""Class to store placeholder data""" """Class to store placeholder data"""

View File

@ -89,7 +89,27 @@ class PageLayout:
glEnd() glEnd()
# Render elements in list order (list position = z-order) # Render elements in list order (list position = z-order)
# For ImageData elements, request async loading if available
for element in self.elements: for element in self.elements:
# Check if this is an ImageData element that needs async loading
if isinstance(element, ImageData) and not hasattr(element, '_texture_id'):
# Try to get async loader from a parent widget
if hasattr(self, '_async_loader'):
loader = self._async_loader
elif hasattr(self, '_parent_widget') and hasattr(self._parent_widget, 'async_image_loader'):
loader = self._parent_widget.async_image_loader
else:
loader = None
# Request async load if loader is available and not already requested
if loader and not element._async_load_requested:
from pyPhotoAlbum.async_backend import LoadPriority
# Determine priority based on visibility (HIGH for now, can be refined)
if hasattr(self._parent_widget, 'request_image_load'):
self._parent_widget.request_image_load(element, priority=LoadPriority.HIGH)
element._async_load_requested = True
element._async_loading = True
element.render() element.render()
# Draw page border LAST (on top of everything) # Draw page border LAST (on top of everything)

View File

@ -348,6 +348,52 @@ class PDFExporter:
(side == 'right' and element_center_mm >= split_line_mm): (side == 'right' and element_center_mm >= split_line_mm):
self._render_element(c, element, x_offset_mm, page_width_pt, page_height_pt, page_number) self._render_element(c, element, x_offset_mm, page_width_pt, page_height_pt, page_number)
def _resolve_image_path(self, image_path: str) -> Optional[str]:
"""
Resolve an image path, handling both absolute and relative paths.
Uses the same logic as ImageData.render() for consistency.
Args:
image_path: The image path (absolute or relative)
Returns:
Resolved absolute path if found, None otherwise
"""
if not image_path:
return None
# If already absolute and exists, return it
if os.path.isabs(image_path) and os.path.exists(image_path):
return image_path
# For relative paths, try resolution using the same logic as ImageData
from pyPhotoAlbum.models import get_asset_search_paths
project_folder, search_paths = get_asset_search_paths()
possible_paths = []
# Try project folder first if available
if project_folder:
possible_paths.append(os.path.join(project_folder, image_path))
# Try additional search paths
for search_path in search_paths:
possible_paths.append(os.path.join(search_path, image_path))
# Fallback paths for compatibility
possible_paths.extend([
image_path, # Try as-is
os.path.join(os.getcwd(), image_path), # Relative to CWD
os.path.join(os.path.dirname(os.getcwd()), image_path), # Parent of CWD
])
# Find first existing path
for path in possible_paths:
if os.path.exists(path):
return path
return None
def _render_image(self, c: canvas.Canvas, image_element: 'ImageData', x_pt: float, def _render_image(self, c: canvas.Canvas, image_element: 'ImageData', x_pt: float,
y_pt: float, width_pt: float, height_pt: float, page_number: int, y_pt: float, width_pt: float, height_pt: float, page_number: int,
crop_left: float = 0.0, crop_right: float = 1.0, crop_left: float = 0.0, crop_right: float = 1.0,
@ -365,16 +411,19 @@ class PDFExporter:
original_width_pt: Original element width in points (before splitting, for aspect ratio) original_width_pt: Original element width in points (before splitting, for aspect ratio)
original_height_pt: Original element height in points (before splitting, for aspect ratio) original_height_pt: Original element height in points (before splitting, for aspect ratio)
""" """
# Resolve image path (handles both absolute and relative paths)
image_full_path = self._resolve_image_path(image_element.image_path)
# Check if image exists # Check if image exists
if not image_element.image_path or not os.path.exists(image_element.image_path): if not image_full_path:
warning = f"Page {page_number}: Image not found: {image_element.image_path}" warning = f"Page {page_number}: Image not found: {image_element.image_path}"
print(f"WARNING: {warning}") print(f"WARNING: {warning}")
self.warnings.append(warning) self.warnings.append(warning)
return return
try: try:
# Load image # Load image using resolved path
img = Image.open(image_element.image_path) img = Image.open(image_full_path)
img = img.convert('RGBA') img = img.convert('RGBA')
# Apply element's crop_info (from the element's own cropping) # Apply element's crop_info (from the element's own cropping)

View File

@ -119,6 +119,10 @@ class Project:
# Embedded templates - templates that travel with the project # Embedded templates - templates that travel with the project
self.embedded_templates: Dict[str, Dict[str, Any]] = {} self.embedded_templates: Dict[str, Dict[str, Any]] = {}
# Temporary directory management (if loaded from .ppz)
# Using TemporaryDirectory instance that auto-cleans on deletion
self._temp_dir = None
# Initialize asset manager # Initialize asset manager
self.asset_manager = AssetManager(self.folder_path) self.asset_manager = AssetManager(self.folder_path)
@ -360,3 +364,22 @@ class Project:
else: else:
self.history = CommandHistory(max_history=100) self.history = CommandHistory(max_history=100)
self.history.asset_manager = self.asset_manager self.history.asset_manager = self.asset_manager
def cleanup(self):
"""
Cleanup project resources, including temporary directories.
Should be called when the project is closed or no longer needed.
"""
if self._temp_dir is not None:
try:
# Let TemporaryDirectory clean itself up
temp_path = self._temp_dir.name
self._temp_dir.cleanup()
self._temp_dir = None
print(f"Cleaned up temporary project directory: {temp_path}")
except Exception as e:
print(f"Warning: Failed to cleanup temporary directory: {e}")
def __del__(self):
"""Destructor to ensure cleanup happens when project is deleted."""
self.cleanup()

View File

@ -124,8 +124,8 @@ def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Opti
Args: Args:
zip_path: Path to the ZIP file to load zip_path: Path to the ZIP file to load
extract_to: Optional directory to extract to. If None, uses a directory extract_to: Optional directory to extract to. If None, uses a temporary
based on the ZIP filename in ./projects/ directory that will be cleaned up when the project is closed.
Returns: Returns:
Tuple of (project: Optional[Project], error_message: Optional[str]) Tuple of (project: Optional[Project], error_message: Optional[str])
@ -134,14 +134,19 @@ def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Opti
if not os.path.exists(zip_path): if not os.path.exists(zip_path):
return None, f"ZIP file not found: {zip_path}" return None, f"ZIP file not found: {zip_path}"
# Track if we created a temp directory
temp_dir_obj = None
# Determine extraction directory # Determine extraction directory
if extract_to is None: if extract_to is None:
# Extract to ./projects/{zipname}/ # Create a temporary directory using TemporaryDirectory
# This will be attached to the Project and auto-cleaned on deletion
zip_basename = os.path.splitext(os.path.basename(zip_path))[0] zip_basename = os.path.splitext(os.path.basename(zip_path))[0]
extract_to = os.path.join("./projects", zip_basename) temp_dir_obj = tempfile.TemporaryDirectory(prefix=f"pyPhotoAlbum_{zip_basename}_")
extract_to = temp_dir_obj.name
# Create extraction directory else:
os.makedirs(extract_to, exist_ok=True) # Create extraction directory if it doesn't exist
os.makedirs(extract_to, exist_ok=True)
# Extract ZIP contents # Extract ZIP contents
with zipfile.ZipFile(zip_path, 'r') as zipf: with zipfile.ZipFile(zip_path, 'r') as zipf:
@ -189,6 +194,12 @@ def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Opti
project.asset_manager.project_folder = extract_to project.asset_manager.project_folder = extract_to
project.asset_manager.assets_folder = os.path.join(extract_to, "assets") project.asset_manager.assets_folder = os.path.join(extract_to, "assets")
# Attach temporary directory to project (if we created one)
# The TemporaryDirectory will auto-cleanup when the project is deleted
if temp_dir_obj is not None:
project._temp_dir = temp_dir_obj
print(f"Project loaded to temporary directory: {extract_to}")
# Normalize asset paths in all ImageData elements # Normalize asset paths in all ImageData elements
# This fixes old projects that have absolute or wrong relative paths # This fixes old projects that have absolute or wrong relative paths
_normalize_asset_paths(project, extract_to) _normalize_asset_paths(project, extract_to)

134
test_async_nonblocking.py Executable file
View File

@ -0,0 +1,134 @@
#!/usr/bin/env python
"""
Test to verify async loading doesn't block the main thread.
This test demonstrates that the UI remains responsive during image loading.
"""
import time
import sys
from pathlib import Path
from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer
from pyPhotoAlbum.async_backend import AsyncImageLoader, ImageCache, LoadPriority
def test_nonblocking_load():
"""Test that async image loading doesn't block the main thread"""
print("Testing non-blocking async image loading...")
# Track if main thread stays responsive
main_thread_ticks = []
def main_thread_tick():
"""This should continue running during async loads"""
main_thread_ticks.append(time.time())
print(f"✓ Main thread tick {len(main_thread_ticks)} (responsive!)")
# Create Qt application
app = QApplication(sys.argv)
# Create async loader
cache = ImageCache(max_memory_mb=128)
loader = AsyncImageLoader(cache=cache, max_workers=2)
# Track loaded images
loaded_images = []
def on_image_loaded(path, image, user_data):
loaded_images.append(path)
print(f"✓ Loaded: {path} (size: {image.size})")
def on_load_failed(path, error_msg, user_data):
print(f"✗ Failed: {path} - {error_msg}")
loader.image_loaded.connect(on_image_loaded)
loader.load_failed.connect(on_load_failed)
# Start the async loader
loader.start()
print("✓ Async loader started")
# Request some image loads (these would normally block for 50-500ms each)
test_images = [
Path("assets/sample1.jpg"),
Path("assets/sample2.jpg"),
Path("assets/sample3.jpg"),
]
print(f"\nRequesting {len(test_images)} image loads...")
for img_path in test_images:
loader.request_load(img_path, priority=LoadPriority.HIGH)
print(f" → Queued: {img_path}")
print("\nMain thread should remain responsive while images load in background...")
# Setup main thread ticker (should run continuously)
ticker = QTimer()
ticker.timeout.connect(main_thread_tick)
ticker.start(100) # Tick every 100ms
# Setup test timeout
def check_completion():
elapsed = time.time() - start_time
if len(loaded_images) >= len(test_images):
print(f"\n✓ All images loaded in {elapsed:.2f}s")
print(f"✓ Main thread ticked {len(main_thread_ticks)} times during loading")
if len(main_thread_ticks) >= 3:
print("✓ SUCCESS: Main thread remained responsive!")
else:
print("✗ FAIL: Main thread was blocked!")
# Cleanup
ticker.stop()
loader.stop()
app.quit()
elif elapsed > 10.0:
print(f"\n✗ Timeout: Only loaded {len(loaded_images)}/{len(test_images)} images")
ticker.stop()
loader.stop()
app.quit()
# Check completion every 200ms
completion_timer = QTimer()
completion_timer.timeout.connect(check_completion)
completion_timer.start(200)
start_time = time.time()
# Run Qt event loop (this should NOT block)
app.exec()
print("\nTest completed!")
# Report results
print(f"\nResults:")
print(f" Images loaded: {len(loaded_images)}/{len(test_images)}")
print(f" Main thread ticks: {len(main_thread_ticks)}")
print(f" Cache stats: {cache.get_stats()}")
return len(main_thread_ticks) >= 3 # Success if main thread ticked at least 3 times
if __name__ == "__main__":
print("=" * 60)
print("Async Non-Blocking Test")
print("=" * 60)
print()
success = test_nonblocking_load()
print()
print("=" * 60)
if success:
print("✓ TEST PASSED: Async loading is non-blocking")
else:
print("✗ TEST FAILED: Main thread was blocked")
print("=" * 60)
sys.exit(0 if success else 1)

View File

@ -419,3 +419,195 @@ class TestAlignmentManager:
assert abs(gap1 - 50) < 0.01 assert abs(gap1 - 50) < 0.01
assert abs(gap2 - 50) < 0.01 assert abs(gap2 - 50) < 0.01
def test_maximize_pattern_empty_list(self):
"""Test maximize_pattern with empty list"""
changes = AlignmentManager.maximize_pattern([], (297, 210))
assert changes == []
def test_maximize_pattern_single_element(self):
"""Test maximize_pattern with single element"""
# Small element in the middle of the page
elem = ImageData(x=100, y=80, width=20, height=15)
page_size = (297, 210) # A4 landscape in mm
changes = AlignmentManager.maximize_pattern([elem], page_size, min_gap=2.0)
# Element should grow significantly
assert elem.size[0] > 20
assert elem.size[1] > 15
# Should maintain aspect ratio
original_aspect = 20 / 15
new_aspect = elem.size[0] / elem.size[1]
assert abs(original_aspect - new_aspect) < 0.01
# Should not exceed page boundaries (with min_gap)
assert elem.position[0] >= 2.0
assert elem.position[1] >= 2.0
assert elem.position[0] + elem.size[0] <= 297 - 2.0
assert elem.position[1] + elem.size[1] <= 210 - 2.0
# Check undo information
assert len(changes) == 1
assert changes[0][0] == elem
assert changes[0][1] == (100, 80) # old position
assert changes[0][2] == (20, 15) # old size
def test_maximize_pattern_two_elements_horizontal(self):
"""Test maximize_pattern with two elements side by side"""
elem1 = ImageData(x=50, y=80, width=20, height=20)
elem2 = ImageData(x=200, y=80, width=20, height=20)
page_size = (297, 210) # A4 landscape in mm
changes = AlignmentManager.maximize_pattern([elem1, elem2], page_size, min_gap=2.0)
# Both elements should grow
assert elem1.size[0] > 20 and elem1.size[1] > 20
assert elem2.size[0] > 20 and elem2.size[1] > 20
# Elements should not overlap (min_gap = 2.0)
gap_x = max(
elem2.position[0] - (elem1.position[0] + elem1.size[0]),
elem1.position[0] - (elem2.position[0] + elem2.size[0])
)
gap_y = max(
elem2.position[1] - (elem1.position[1] + elem1.size[1]),
elem1.position[1] - (elem2.position[1] + elem2.size[1])
)
# Either horizontal or vertical gap should be >= min_gap
assert gap_x >= 2.0 or gap_y >= 2.0
# Both elements should respect page boundaries
for elem in [elem1, elem2]:
assert elem.position[0] >= 2.0
assert elem.position[1] >= 2.0
assert elem.position[0] + elem.size[0] <= 297 - 2.0
assert elem.position[1] + elem.size[1] <= 210 - 2.0
def test_maximize_pattern_three_elements_grid(self):
"""Test maximize_pattern with three elements in a grid pattern"""
elem1 = ImageData(x=50, y=50, width=15, height=15)
elem2 = ImageData(x=150, y=50, width=15, height=15)
elem3 = ImageData(x=100, y=120, width=15, height=15)
page_size = (297, 210) # A4 landscape in mm
changes = AlignmentManager.maximize_pattern([elem1, elem2, elem3], page_size, min_gap=2.0)
# All elements should grow
for elem in [elem1, elem2, elem3]:
assert elem.size[0] > 15
assert elem.size[1] > 15
# Check no overlaps with min_gap
elements = [elem1, elem2, elem3]
for i, elem_a in enumerate(elements):
for j, elem_b in enumerate(elements):
if i >= j:
continue
# Calculate gaps between rectangles
gap_x = max(
elem_b.position[0] - (elem_a.position[0] + elem_a.size[0]),
elem_a.position[0] - (elem_b.position[0] + elem_b.size[0])
)
gap_y = max(
elem_b.position[1] - (elem_a.position[1] + elem_a.size[1]),
elem_a.position[1] - (elem_b.position[1] + elem_b.size[1])
)
# At least one gap should be >= min_gap
assert gap_x >= 2.0 or gap_y >= 2.0
# Check undo information
assert len(changes) == 3
def test_maximize_pattern_respects_boundaries(self):
"""Test that maximize_pattern respects page boundaries"""
elem = ImageData(x=10, y=10, width=10, height=10)
page_size = (100, 100)
min_gap = 5.0
changes = AlignmentManager.maximize_pattern([elem], page_size, min_gap=min_gap)
# Element should not exceed boundaries
assert elem.position[0] >= min_gap
assert elem.position[1] >= min_gap
assert elem.position[0] + elem.size[0] <= page_size[0] - min_gap
assert elem.position[1] + elem.size[1] <= page_size[1] - min_gap
def test_maximize_pattern_maintains_aspect_ratio(self):
"""Test that maximize_pattern maintains element aspect ratios"""
elem1 = ImageData(x=50, y=50, width=30, height=20) # 3:2 aspect
elem2 = ImageData(x=150, y=50, width=20, height=30) # 2:3 aspect
page_size = (297, 210)
original_aspect1 = elem1.size[0] / elem1.size[1]
original_aspect2 = elem2.size[0] / elem2.size[1]
changes = AlignmentManager.maximize_pattern([elem1, elem2], page_size, min_gap=2.0)
# Aspect ratios should be maintained
new_aspect1 = elem1.size[0] / elem1.size[1]
new_aspect2 = elem2.size[0] / elem2.size[1]
assert abs(original_aspect1 - new_aspect1) < 0.01
assert abs(original_aspect2 - new_aspect2) < 0.01
def test_maximize_pattern_with_constrained_space(self):
"""Test maximize_pattern when elements are tightly packed"""
# Create 4 elements in corners with limited space
elem1 = ImageData(x=10, y=10, width=10, height=10)
elem2 = ImageData(x=140, y=10, width=10, height=10)
elem3 = ImageData(x=10, y=90, width=10, height=10)
elem4 = ImageData(x=140, y=90, width=10, height=10)
page_size = (160, 110)
changes = AlignmentManager.maximize_pattern(
[elem1, elem2, elem3, elem4],
page_size,
min_gap=2.0
)
# All elements should grow
for elem in [elem1, elem2, elem3, elem4]:
assert elem.size[0] > 10
assert elem.size[1] > 10
# Verify no overlaps
elements = [elem1, elem2, elem3, elem4]
for i, elem_a in enumerate(elements):
for j, elem_b in enumerate(elements):
if i >= j:
continue
gap_x = max(
elem_b.position[0] - (elem_a.position[0] + elem_a.size[0]),
elem_a.position[0] - (elem_b.position[0] + elem_b.size[0])
)
gap_y = max(
elem_b.position[1] - (elem_a.position[1] + elem_a.size[1]),
elem_a.position[1] - (elem_b.position[1] + elem_b.size[1])
)
assert gap_x >= 2.0 or gap_y >= 2.0
def test_maximize_pattern_with_different_element_types(self):
"""Test maximize_pattern works with different element types"""
elem1 = ImageData(x=50, y=50, width=20, height=20)
elem2 = PlaceholderData(placeholder_type="image", x=150, y=50, width=20, height=20)
elem3 = TextBoxData(text_content="Test", x=100, y=120, width=20, height=20)
page_size = (297, 210)
changes = AlignmentManager.maximize_pattern([elem1, elem2, elem3], page_size, min_gap=2.0)
# All elements should grow
assert elem1.size[0] > 20
assert elem2.size[0] > 20
assert elem3.size[0] > 20
# Check undo information has correct element types
assert isinstance(changes[0][0], ImageData)
assert isinstance(changes[1][0], PlaceholderData)
assert isinstance(changes[2][0], TextBoxData)