pyPhotoAlbum/pyPhotoAlbum/async_backend.py
2025-11-11 15:34:04 +01:00

704 lines
24 KiB
Python

"""
Async backend for non-blocking image loading and PDF generation.
This module provides:
- AsyncImageLoader: Load and process images in background
- AsyncPDFGenerator: Generate PDFs without blocking UI
- ImageCache: Intelligent caching with LRU eviction
- WorkerPool: Thread pool for CPU-bound operations
"""
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional, Callable, Dict, Any, Tuple
from collections import OrderedDict
import threading
from PIL import Image
from PyQt6.QtCore import QObject, pyqtSignal
# Module-wide logger, named after this module so handlers can filter on it.
logger = logging.getLogger(name=__name__)
class LoadPriority(Enum):
    """
    Priority levels for load requests.

    Higher values are more urgent. LoadRequest orders itself so that a more
    urgent request sorts *before* a less urgent one, which is what a min-heap
    (heapq / asyncio.PriorityQueue) needs in order to serve it first.
    """
    LOW = 0  # Offscreen, not visible
    NORMAL = 1  # Potentially visible soon
    HIGH = 2  # Visible on screen
    URGENT = 3  # User is actively interacting with


@dataclass(order=True)
class LoadRequest:
    """
    Request to load and process an image.

    Ordering and equality are defined only by ``sort_key``, which is
    ``(-priority, request_id)``:
    - a higher priority sorts first;
    - within one priority, the earlier (lower) ``request_id`` sorts first.

    Comparing ``priority`` directly is not possible because plain Enum
    members do not support ``<`` and would raise TypeError inside a heap.
    """
    priority: LoadPriority = field(compare=False)
    request_id: int = field(compare=False)  # Tie-breaker for same priority
    path: Path = field(compare=False)
    target_size: Optional[Tuple[int, int]] = field(default=None, compare=False)
    callback: Optional[Callable] = field(default=None, compare=False)
    user_data: Any = field(default=None, compare=False)
    # Derived in __post_init__; the only field used by ==, <, <=, >, >=.
    sort_key: Tuple[int, int] = field(init=False, repr=False, compare=True)

    def __post_init__(self):
        # Accept a LoadPriority member or a plain int. The value is negated so
        # a min-heap pops the most urgent request first.
        level = getattr(self.priority, "value", self.priority)
        self.sort_key = (-level, self.request_id)
class ImageCache:
    """
    Thread-safe LRU cache for PIL images with memory management.

    Caches both original images and scaled variants to avoid redundant
    processing. Entries are keyed by path plus an optional target size. The
    cache stores private copies and hands out copies, so callers can never
    mutate what is cached.
    """

    def __init__(self, max_memory_mb: int = 512):
        """
        Initialize cache.

        Args:
            max_memory_mb: Maximum memory to use for cached images (default 512MB)
        """
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.current_memory_bytes = 0
        # key -> (image, estimated bytes); iteration order runs from least to
        # most recently used
        self._cache: OrderedDict[str, Tuple[Image.Image, int]] = OrderedDict()
        self._lock = threading.Lock()
        logger.info(f"ImageCache initialized with {max_memory_mb}MB limit")

    def _estimate_image_size(self, img: Image.Image) -> int:
        """Estimate memory size of PIL image in bytes (width * height * bytes per pixel)."""
        # RGBA = 4 bytes, RGB = 3 bytes, L = 1 byte, LA = 2 bytes; other modes assume 4
        mode_sizes = {'RGBA': 4, 'RGB': 3, 'L': 1, 'LA': 2}
        bytes_per_pixel = mode_sizes.get(img.mode, 4)
        return img.width * img.height * bytes_per_pixel

    def _make_key(self, path: Path, target_size: Optional[Tuple[int, int]] = None) -> str:
        """Create cache key from path and optional target size."""
        if target_size:
            return f"{path}:{target_size[0]}x{target_size[1]}"
        return str(path)

    def get(self, path: Path, target_size: Optional[Tuple[int, int]] = None) -> Optional[Image.Image]:
        """
        Get image from cache.

        Args:
            path: Path to image file
            target_size: Optional target size (width, height)

        Returns:
            A copy of the cached PIL Image, or None if not found
        """
        key = self._make_key(path, target_size)
        with self._lock:
            entry = self._cache.get(key)
            if entry is not None:
                self._cache.move_to_end(key)  # Mark as most recently used
                logger.debug(f"Cache HIT: {key}")
                return entry[0].copy()  # Return copy to avoid external modifications
        logger.debug(f"Cache MISS: {key}")
        return None

    def put(self, path: Path, img: Image.Image, target_size: Optional[Tuple[int, int]] = None):
        """
        Add image to cache with LRU eviction.

        An image whose estimated size exceeds the whole budget is not cached.
        Storing it would evict every other entry and still exceed the limit.
        Any older entry under the same key is dropped so it cannot be served stale.

        Args:
            path: Path to image file
            img: PIL Image to cache
            target_size: Optional target size used for this variant
        """
        key = self._make_key(path, target_size)
        img_size = self._estimate_image_size(img)

        if img_size > self.max_memory_bytes:
            with self._lock:
                stale = self._cache.pop(key, None)
                if stale is not None:
                    self.current_memory_bytes -= stale[1]
            logger.debug(f"Cache SKIP: {key} ({img_size / 1024 / 1024:.1f}MB exceeds "
                         f"{self.max_memory_bytes / 1024 / 1024:.1f}MB limit)")
            return

        # Copy before taking the lock: copying a large image is slow, and
        # `img` belongs to the caller, not to the cache.
        snapshot = img.copy()

        with self._lock:
            # Remove if already exists (update size)
            previous = self._cache.pop(key, None)
            if previous is not None:
                self.current_memory_bytes -= previous[1]

            # Evict LRU items until the new image fits
            while self._cache and self.current_memory_bytes + img_size > self.max_memory_bytes:
                evicted_key, (_, evicted_size) = self._cache.popitem(last=False)
                self.current_memory_bytes -= evicted_size
                logger.debug(f"Cache EVICT: {evicted_key} ({evicted_size / 1024 / 1024:.1f}MB)")

            self._cache[key] = (snapshot, img_size)
            self.current_memory_bytes += img_size
            logger.debug(f"Cache PUT: {key} ({img_size / 1024 / 1024:.1f}MB) "
                         f"[Total: {self.current_memory_bytes / 1024 / 1024:.1f}MB / "
                         f"{self.max_memory_bytes / 1024 / 1024:.1f}MB, "
                         f"Items: {len(self._cache)}]")

    def clear(self):
        """Clear entire cache."""
        with self._lock:
            self._cache.clear()
            self.current_memory_bytes = 0
        logger.info("Cache cleared")

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics (utilization is a percentage; 0.0 when the limit is 0)."""
        with self._lock:
            if self.max_memory_bytes > 0:
                utilization = (self.current_memory_bytes / self.max_memory_bytes) * 100
            else:
                utilization = 0.0
            return {
                'items': len(self._cache),
                'memory_mb': self.current_memory_bytes / 1024 / 1024,
                'max_memory_mb': self.max_memory_bytes / 1024 / 1024,
                'utilization': utilization
            }
class AsyncImageLoader(QObject):
    """
    Asynchronous image loader with priority queue and caching.

    Loads images in background threads and emits signals when complete.
    Supports concurrent loading, priority-based scheduling, and cancellation.

    At most ``max_workers`` loads are dispatched at a time; the rest wait in
    the priority queue. A more urgent request queued later therefore still
    starts before less urgent requests that have not been dispatched yet.

    Example:
        loader = AsyncImageLoader()
        loader.image_loaded.connect(on_image_ready)
        loader.start()
        loader.request_load(Path("photo.jpg"), priority=LoadPriority.HIGH)
    """

    # Signals for Qt integration
    image_loaded = pyqtSignal(object, object, object)  # (path, image, user_data)
    load_failed = pyqtSignal(object, str, object)  # (path, error_msg, user_data)

    def __init__(self, cache: Optional[ImageCache] = None, max_workers: int = 4):
        """
        Initialize async image loader.

        Args:
            cache: ImageCache instance (creates new if None)
            max_workers: Maximum concurrent worker threads (default 4)
        """
        super().__init__()
        self.cache = cache if cache is not None else ImageCache()
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers,
                                           thread_name_prefix="ImageLoader")

        # Priority queue and tracking. Queue entries are
        # (-priority value, request_id, LoadRequest) tuples. request_id is
        # unique, so the heap never has to compare LoadRequest/Enum objects.
        self._queue: Optional[asyncio.PriorityQueue] = None  # Created on the loop thread
        self._pending_requests: Dict[Path, LoadRequest] = {}
        self._active_tasks: Dict[Path, asyncio.Task] = {}
        self._next_request_id = 0
        self._lock = threading.Lock()
        self._shutdown = False

        # Event loop for async operations
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_thread: Optional[threading.Thread] = None
        self._processor_task: Optional[asyncio.Task] = None
        # Set by the loop thread once _loop and _queue are ready for use
        self._ready = threading.Event()

        logger.info(f"AsyncImageLoader initialized with {max_workers} workers")

    def start(self):
        """Start the async backend event loop and wait until it accepts requests."""
        if self._loop_thread is not None:
            logger.warning("AsyncImageLoader already started")
            return

        self._shutdown = False
        self._loop_thread = threading.Thread(target=self._run_event_loop,
                                             daemon=True,
                                             name="AsyncImageLoader-EventLoop")
        self._loop_thread.start()
        # _loop/_queue are created on the new thread. Without this wait, a
        # request_load() issued right after start() could find them unset.
        if not self._ready.wait(timeout=5.0):
            logger.warning("AsyncImageLoader event loop did not become ready in time")
        logger.info("AsyncImageLoader event loop started")

    def stop(self):
        """Stop the async backend and cleanup resources. Calling it again is a no-op."""
        loop = self._loop
        if loop is None:
            return

        logger.info("Stopping AsyncImageLoader...")
        self._shutdown = True

        if not loop.is_closed():
            try:
                # Wait for cancellation to complete so the loop is not stopped
                # while tasks are still pending.
                asyncio.run_coroutine_threadsafe(self._cancel_all_tasks(), loop).result(timeout=5.0)
            except Exception as e:
                logger.warning(f"Error while cancelling image loads: {e}")
            try:
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError:
                pass  # Loop was closed in the meantime; nothing left to stop

        # Wait for thread to finish
        if self._loop_thread is not None:
            self._loop_thread.join(timeout=5.0)

        # Shutdown executor (waits for loads already running in worker threads)
        self.executor.shutdown(wait=True)
        self._loop = None
        logger.info("AsyncImageLoader stopped")

    def _run_event_loop(self):
        """Run asyncio event loop in background thread."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Create the queue and the processor before publishing the loop,
            # so request_load() never sees a loop without a queue.
            self._queue = asyncio.PriorityQueue()
            self._processor_task = loop.create_task(self._process_queue())
            self._loop = loop
            self._ready.set()
            loop.run_forever()
        finally:
            # Use the local reference: stop() may already have reset self._loop.
            loop.close()

    async def _process_queue(self):
        """
        Dispatch queued requests, keeping at most ``max_workers`` loads in flight.

        A request is taken from the priority queue only when a slot is free.
        The priority queue, not the executor's FIFO, decides what runs next.
        """
        logger.info("Queue processor started")
        slots = asyncio.Semaphore(self.max_workers)
        try:
            while not self._shutdown:
                await slots.acquire()
                try:
                    _, _, request = await self._queue.get()
                    task = self._dispatch_request(request)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"Queue processor error: {e}", exc_info=True)
                    task = None

                if task is None:
                    # Cancelled/superseded request (or an error): free the slot now
                    slots.release()
                    continue

                def _on_done(finished, req=request):
                    # Runs even for tasks cancelled before they started, whose
                    # coroutine body (and its finally) never executes.
                    slots.release()
                    self._forget_request(req, finished)

                task.add_done_callback(_on_done)
        finally:
            logger.info("Queue processor stopped")

    def _dispatch_request(self, request: LoadRequest) -> Optional[asyncio.Task]:
        """Start a load task for ``request`` unless it was cancelled or superseded (loop thread only)."""
        with self._lock:
            # Identity check: cancel_load() followed by request_load() for the
            # same path leaves the old request in the queue. Only the request
            # that is currently registered may run.
            if self._pending_requests.get(request.path) is not request:
                return None
            task = asyncio.create_task(self._load_image(request))
            self._active_tasks[request.path] = task
            return task

    def _forget_request(self, request: LoadRequest, task: Optional[asyncio.Task]):
        """Drop tracking entries for ``request``/``task`` if they are still the registered ones."""
        with self._lock:
            if self._pending_requests.get(request.path) is request:
                del self._pending_requests[request.path]
            if task is not None and self._active_tasks.get(request.path) is task:
                del self._active_tasks[request.path]

    async def _cancel_all_tasks(self):
        """Cancel the queue processor and all active loading tasks, then wait for them."""
        with self._lock:
            tasks = list(self._active_tasks.values())
            self._active_tasks.clear()
            self._pending_requests.clear()

        if self._processor_task is not None:
            tasks.append(self._processor_task)

        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _load_image(self, request: LoadRequest):
        """
        Load and process image asynchronously.

        Emits image_loaded on success or load_failed on error. Cancellation
        propagates without emitting anything.

        Args:
            request: LoadRequest containing path, size, and callback info
        """
        path = request.path
        target_size = request.target_size

        try:
            # Check cache first
            cached_img = self.cache.get(path, target_size)
            if cached_img is not None:
                logger.debug(f"Loaded from cache: {path}")
                self._emit_loaded(path, cached_img, request.user_data)
                return

            # Load in thread pool (I/O bound)
            loop = asyncio.get_running_loop()
            img = await loop.run_in_executor(
                self.executor,
                self._load_and_process_image,
                path,
                target_size
            )

            # Cache result
            self.cache.put(path, img, target_size)

            # Emit success signal
            self._emit_loaded(path, img, request.user_data)
            logger.debug(f"Loaded: {path} (size: {img.size})")

        except asyncio.CancelledError:
            raise  # Cancelled loads are reported by nobody; never as failures
        except Exception as e:
            logger.error(f"Failed to load {path}: {e}", exc_info=True)
            self._emit_failed(path, str(e), request.user_data)

        finally:
            # Cleanup tracking right away, so a follow-up request for the same
            # path is not rejected as a duplicate.
            self._forget_request(request, asyncio.current_task())

    def _load_and_process_image(self, path: Path, target_size: Optional[Tuple[int, int]]) -> Image.Image:
        """
        Load image from disk and process (runs in thread pool).

        Args:
            path: Path to image file
            target_size: Optional target size for downsampling

        Returns:
            Processed RGBA PIL Image, fully loaded and detached from the file
        """
        # The context manager closes the file handle Image.open keeps open for lazy loading.
        # convert()/copy() force a full load, so the result outlives the file.
        with Image.open(path) as source:
            img = source.convert('RGBA') if source.mode != 'RGBA' else source.copy()

        # Downsample if target size specified.
        # NOTE(review): resizes to exactly target_size (aspect ratio is not preserved),
        # matching previous behaviour; confirm callers pass aspect-correct sizes.
        if target_size:
            current_size = img.size
            if current_size[0] > target_size[0] or current_size[1] > target_size[1]:
                img = img.resize(target_size, Image.Resampling.LANCZOS)
                logger.debug(f"Downsampled {path}: {current_size} -> {target_size}")

        return img

    def _emit_loaded(self, path: Path, img: Image.Image, user_data: Any):
        """Emit image_loaded signal (thread-safe)."""
        self.image_loaded.emit(path, img, user_data)

    def _emit_failed(self, path: Path, error_msg: str, user_data: Any):
        """Emit load_failed signal (thread-safe)."""
        self.load_failed.emit(path, error_msg, user_data)

    def request_load(self,
                     path: Path,
                     priority: LoadPriority = LoadPriority.NORMAL,
                     target_size: Optional[Tuple[int, int]] = None,
                     user_data: Any = None) -> bool:
        """
        Request image load with specified priority.

        Args:
            path: Path to image file
            priority: Load priority level
            target_size: Optional target size (width, height) for downsampling
            user_data: Optional user data passed to callback

        Returns:
            True if request was queued, False if already pending/active or the
            backend is not running
        """
        loop = self._loop
        if loop is None or self._shutdown:
            logger.warning("Cannot request load: backend not started")
            return False

        path = Path(path)

        with self._lock:
            # Skip if already pending or active
            if path in self._pending_requests or path in self._active_tasks:
                logger.debug(f"Load already pending: {path}")
                return False

            request = LoadRequest(
                priority=priority,
                request_id=self._next_request_id,
                path=path,
                target_size=target_size,
                user_data=user_data
            )
            self._next_request_id += 1
            self._pending_requests[path] = request

        # Negated priority: the min-heap pops the most urgent entry first.
        # request_id keeps FIFO order within one priority level.
        entry = (-priority.value, request.request_id, request)
        try:
            loop.call_soon_threadsafe(self._queue.put_nowait, entry)
        except RuntimeError:
            # Loop closed between the checks above and now
            with self._lock:
                if self._pending_requests.get(path) is request:
                    del self._pending_requests[path]
            logger.warning("Cannot request load: backend not started")
            return False

        logger.debug(f"Queued load: {path} (priority: {priority.name})")
        return True

    def cancel_load(self, path: Path) -> bool:
        """
        Cancel a pending or in-flight image load.

        Args:
            path: Path to image file

        Returns:
            True if load was cancelled, False if not found
        """
        path = Path(path)

        with self._lock:
            request = self._pending_requests.pop(path, None)
            task = self._active_tasks.pop(path, None)

        if task is not None:
            # asyncio.Task.cancel() is not thread-safe; run it on the loop thread.
            loop = self._loop
            if loop is not None:
                try:
                    loop.call_soon_threadsafe(task.cancel)
                except RuntimeError:
                    pass  # Loop already closed; the task can no longer run
            logger.debug(f"Cancelled active load: {path}")
            return True

        if request is not None:
            # Still queued: the processor skips it because it is no longer registered.
            logger.debug(f"Cancelled pending load: {path}")
            return True

        return False

    def get_stats(self) -> Dict[str, Any]:
        """Get loader statistics."""
        with self._lock:
            return {
                'pending': len(self._pending_requests),
                'active': len(self._active_tasks),
                'cache': self.cache.get_stats()
            }
class AsyncPDFGenerator(QObject):
    """
    Asynchronous PDF generator that doesn't block the UI.

    Generates PDFs in background thread with progress updates.
    Uses shared ImageCache to avoid redundant image loading.

    Example:
        generator = AsyncPDFGenerator(image_cache)
        generator.progress_updated.connect(on_progress)
        generator.export_complete.connect(on_complete)
        generator.start()
        generator.export_pdf(project, "output.pdf")
    """

    # Signals for Qt integration
    progress_updated = pyqtSignal(int, int, str)  # (current, total, message)
    export_complete = pyqtSignal(bool, list)  # (success, warnings)
    export_failed = pyqtSignal(str)  # (error_message)

    # Image.open is patched process-wide for the duration of an export. This
    # class-level lock serializes patching across all generator instances, so
    # save/restore pairs can never interleave and leave the patch installed.
    _image_open_patch_lock = threading.Lock()

    def __init__(self, image_cache: Optional[ImageCache] = None, max_workers: int = 2):
        """
        Initialize async PDF generator.

        Args:
            image_cache: Shared ImageCache instance (creates new if None)
            max_workers: Maximum concurrent workers for PDF generation (default 2)
        """
        super().__init__()
        self.image_cache = image_cache if image_cache is not None else ImageCache()
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers,
                                           thread_name_prefix="PDFGenerator")

        # Export state.
        # _current_export: concurrent.futures.Future from run_coroutine_threadsafe.
        # _cancel_event: per-export cancel flag, read by that export's progress callback.
        self._current_export = None
        self._cancel_event: Optional[threading.Event] = None
        self._lock = threading.Lock()
        self._shutdown = False

        # Event loop for async operations
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_thread: Optional[threading.Thread] = None
        # Set by the loop thread once _loop is ready for use
        self._ready = threading.Event()

        logger.info(f"AsyncPDFGenerator initialized with {max_workers} workers")

    def start(self):
        """Start the async PDF generator event loop and wait until it accepts exports."""
        if self._loop_thread is not None:
            logger.warning("AsyncPDFGenerator already started")
            return

        self._shutdown = False
        self._loop_thread = threading.Thread(target=self._run_event_loop,
                                             daemon=True,
                                             name="AsyncPDFGenerator-EventLoop")
        self._loop_thread.start()
        # _loop is created on the new thread. Without this wait, export_pdf()
        # issued right after start() would be rejected.
        if not self._ready.wait(timeout=5.0):
            logger.warning("AsyncPDFGenerator event loop did not become ready in time")
        logger.info("AsyncPDFGenerator event loop started")

    def stop(self):
        """Stop the async PDF generator and cleanup resources. Calling it again is a no-op."""
        loop = self._loop
        if loop is None:
            return

        logger.info("Stopping AsyncPDFGenerator...")
        self._shutdown = True

        # Make the running export's progress callback return False. The exporter
        # is expected to abort on that (behaviour of PDFExporter, not shown
        # here), so executor.shutdown(wait=True) below need not wait for a
        # full export.
        with self._lock:
            if self._cancel_event is not None:
                self._cancel_event.set()

        if not loop.is_closed():
            try:
                # Let cancelled tasks run their handlers before the loop stops.
                asyncio.run_coroutine_threadsafe(self._cancel_pending_tasks(), loop).result(timeout=5.0)
            except Exception as e:
                logger.warning(f"Error while cancelling PDF export: {e}")
            try:
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError:
                pass  # Loop was closed in the meantime; nothing left to stop

        # Wait for thread to finish
        if self._loop_thread is not None:
            self._loop_thread.join(timeout=5.0)

        # Shutdown executor
        self.executor.shutdown(wait=True)
        self._loop = None
        logger.info("AsyncPDFGenerator stopped")

    def _run_event_loop(self):
        """Run asyncio event loop in background thread."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            self._loop = loop
            self._ready.set()
            loop.run_forever()
        finally:
            # Use the local reference: stop() may already have reset self._loop.
            loop.close()

    async def _cancel_pending_tasks(self):
        """Cancel every other task on this loop and wait for them to finish."""
        current = asyncio.current_task()
        others = [task for task in asyncio.all_tasks() if task is not current]
        for task in others:
            task.cancel()
        if others:
            await asyncio.gather(*others, return_exceptions=True)

    def export_pdf(self, project, output_path: str, export_dpi: int = 300) -> bool:
        """
        Request PDF export (non-blocking).

        Args:
            project: Project instance to export
            output_path: Path where PDF should be saved
            export_dpi: Target DPI for images (default 300)

        Returns:
            True if export started, False if already exporting or backend not started
        """
        loop = self._loop
        if loop is None or self._shutdown:
            logger.warning("Cannot export: backend not started")
            return False

        with self._lock:
            if self._current_export is not None and not self._current_export.done():
                logger.warning("Export already in progress")
                return False

            # Fresh flag per export. A cancelled export still finishing in the
            # executor keeps seeing its own (set) flag, and is not un-cancelled
            # by the next export.
            cancel_event = threading.Event()
            self._cancel_event = cancel_event

            # Submit export task
            self._current_export = asyncio.run_coroutine_threadsafe(
                self._export_pdf_async(project, output_path, export_dpi, cancel_event),
                loop
            )

        logger.info(f"PDF export started: {output_path}")
        return True

    def cancel_export(self):
        """Request cancellation of current export."""
        with self._lock:
            if self._cancel_event is not None:
                self._cancel_event.set()
            if self._current_export is not None and not self._current_export.done():
                self._current_export.cancel()
        logger.info("PDF export cancellation requested")

    async def _export_pdf_async(self, project, output_path: str, export_dpi: int,
                                cancel_event: Optional[threading.Event] = None):
        """
        Perform PDF export asynchronously.

        Always ends with exactly one of export_complete or export_failed.

        Args:
            project: Project to export
            output_path: Output PDF file path
            export_dpi: Export DPI setting
            cancel_event: Cancel flag for this export (a fresh one if None)
        """
        if cancel_event is None:
            cancel_event = threading.Event()

        try:
            # Import PDF exporter (lazy import to avoid circular dependencies)
            from pyPhotoAlbum.pdf_exporter import PDFExporter

            exporter = PDFExporter(project, export_dpi=export_dpi)

            # Progress callback wrapper (called on the executor thread)
            def progress_callback(current, total, message):
                if cancel_event.is_set():
                    return False  # Signal cancellation
                self.progress_updated.emit(current, total, message)
                return True

            # Run export in thread pool
            loop = asyncio.get_running_loop()
            success, warnings = await loop.run_in_executor(
                self.executor,
                self._export_with_cache,
                exporter,
                output_path,
                progress_callback
            )

            if cancel_event.is_set():
                # Cancelled after the exporter returned. Still emit a terminal
                # signal so listeners are not left waiting.
                logger.info("PDF export cancelled")
                self.export_failed.emit("Export cancelled")
            else:
                self.export_complete.emit(success, warnings)
                logger.info(f"PDF export completed: {output_path} (warnings: {len(warnings)})")

        except asyncio.CancelledError:
            logger.info("PDF export cancelled by user")
            self.export_failed.emit("Export cancelled")
        except Exception as e:
            logger.error(f"PDF export failed: {e}", exc_info=True)
            self.export_failed.emit(str(e))
        finally:
            with self._lock:
                # Only clear state that still belongs to this export; a newer
                # export may already have replaced it after a cancel.
                if self._cancel_event is cancel_event:
                    self._current_export = None

    def _export_with_cache(self, exporter, output_path: str, progress_callback) -> Tuple[bool, list]:
        """
        Run PDF export with image cache integration.

        This method temporarily replaces PIL's Image.open with a cache-aware
        wrapper. The replacement only serves calls made on this export thread
        with a str/PathLike argument. Other threads (e.g. image-loader workers)
        and file-object opens are passed straight to the real Image.open.

        Args:
            exporter: PDFExporter instance
            output_path: Output file path
            progress_callback: Progress callback function

        Returns:
            Tuple of (success, warnings)
        """
        import os  # local import: only needed for the PathLike check below

        export_thread = threading.get_ident()

        with AsyncPDFGenerator._image_open_patch_lock:
            # Store original Image.open
            original_open = Image.open

            def cached_open(fp, *args, **kwargs):
                if threading.get_ident() != export_thread or not isinstance(fp, (str, os.PathLike)):
                    return original_open(fp, *args, **kwargs)

                cache_path = Path(fp)
                # Try cache first
                cached_img = self.image_cache.get(cache_path)
                if cached_img is not None:
                    logger.debug(f"PDF using cached image: {fp}")
                    return cached_img

                # Load fully (closing the file handle) and cache as RGBA
                with original_open(fp, *args, **kwargs) as source:
                    img = source.convert('RGBA') if source.mode != 'RGBA' else source.copy()
                self.image_cache.put(cache_path, img)
                return img

            # Temporarily patch Image.open
            Image.open = cached_open
            try:
                return exporter.export(output_path, progress_callback)
            finally:
                # Restore original
                Image.open = original_open

    def _is_exporting_locked(self) -> bool:
        """Return whether an export is running; caller must hold self._lock."""
        export = self._current_export
        return export is not None and not export.done()

    def is_exporting(self) -> bool:
        """Check if export is currently in progress."""
        with self._lock:
            return self._is_exporting_locked()

    def get_stats(self) -> Dict[str, Any]:
        """Get generator statistics."""
        # _lock is not reentrant, so use the *_locked helper rather than is_exporting().
        with self._lock:
            exporting = self._is_exporting_locked()
        return {
            'exporting': exporting,
            'cache': self.image_cache.get_stats()
        }