All checks were successful
Python CI / test (push) Successful in 1m28s
Lint / lint (push) Successful in 1m4s
Tests / test (3.11) (push) Successful in 1m41s
Tests / test (3.12) (push) Successful in 1m42s
Tests / test (3.13) (push) Successful in 1m35s
Tests / test (3.14) (push) Successful in 1m15s
825 lines
26 KiB
Python
825 lines
26 KiB
Python
"""
|
|
Tests for async_backend module
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from unittest.mock import Mock, MagicMock, patch, call
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
|
|
class TestLoadPriority:
    """Checks on the integer values carried by the LoadPriority members."""

    def test_load_priority_values(self):
        """Each LoadPriority member exposes the expected ``.value``."""
        from pyPhotoAlbum.async_backend import LoadPriority

        expected_by_name = {"LOW": 0, "NORMAL": 1, "HIGH": 2, "URGENT": 3}

        for member_name, expected in expected_by_name.items():
            member = getattr(LoadPriority, member_name)
            assert member.value == expected

    def test_load_priority_ordering(self):
        """Member values increase strictly from LOW through URGENT."""
        from pyPhotoAlbum.async_backend import LoadPriority

        ascending = [
            LoadPriority.LOW.value,
            LoadPriority.NORMAL.value,
            LoadPriority.HIGH.value,
            LoadPriority.URGENT.value,
        ]

        # Compare every adjacent pair: LOW<NORMAL, NORMAL<HIGH, HIGH<URGENT.
        for smaller, larger in zip(ascending, ascending[1:]):
            assert smaller < larger
|
|
|
|
|
|
class TestGetImageDimensions:
    """Tests for the ``get_image_dimensions`` function."""

    @staticmethod
    def _write_rgb_image(directory, filename, size, color):
        """Save a solid-colour RGB image under *directory* and return its path as ``str``."""
        destination = directory / filename
        Image.new("RGB", size, color=color).save(destination)
        return str(destination)

    def test_get_image_dimensions_simple(self, tmp_path):
        """Without ``max_size`` the stored width/height are reported as-is."""
        from pyPhotoAlbum.async_backend import get_image_dimensions

        image_file = self._write_rgb_image(tmp_path, "test.jpg", (800, 600), "red")

        assert get_image_dimensions(image_file) == (800, 600)

    def test_get_image_dimensions_with_max_size_width_larger(self, tmp_path):
        """A wide image is expected to scale so its width equals ``max_size``."""
        from pyPhotoAlbum.async_backend import get_image_dimensions

        image_file = self._write_rgb_image(tmp_path, "wide.jpg", (1000, 500), "blue")

        scaled = get_image_dimensions(image_file, max_size=300)

        # 1000x500 fitted inside 300 keeps the 2:1 aspect ratio.
        assert scaled == (300, 150)

    def test_get_image_dimensions_with_max_size_height_larger(self, tmp_path):
        """A tall image is expected to scale so its height equals ``max_size``."""
        from pyPhotoAlbum.async_backend import get_image_dimensions

        image_file = self._write_rgb_image(tmp_path, "tall.jpg", (500, 1000), "green")

        scaled = get_image_dimensions(image_file, max_size=300)

        # 500x1000 fitted inside 300 keeps the 1:2 aspect ratio.
        assert scaled == (150, 300)

    def test_get_image_dimensions_already_smaller_than_max(self, tmp_path):
        """An image already within ``max_size`` is expected to be left unscaled."""
        from pyPhotoAlbum.async_backend import get_image_dimensions

        image_file = self._write_rgb_image(tmp_path, "small.jpg", (200, 150), "yellow")

        unscaled = get_image_dimensions(image_file, max_size=300)

        assert unscaled == (200, 150)

    def test_get_image_dimensions_invalid_file(self):
        """A path that does not exist is expected to yield ``None``."""
        from pyPhotoAlbum.async_backend import get_image_dimensions

        assert get_image_dimensions("/nonexistent/file.jpg") is None

    def test_get_image_dimensions_not_an_image(self, tmp_path):
        """A readable file that is not an image is expected to yield ``None``."""
        from pyPhotoAlbum.async_backend import get_image_dimensions

        plain_text = tmp_path / "not_image.txt"
        plain_text.write_text("This is not an image")

        assert get_image_dimensions(str(plain_text)) is None
|
|
|
|
|
|
class TestLoadRequest:
    """Tests for constructing and comparing ``LoadRequest`` objects."""

    def test_load_request_creation(self):
        """Constructor keyword arguments are readable back as attributes."""
        from pyPhotoAlbum.async_backend import LoadRequest, LoadPriority

        image_path = Path("/test/image.jpg")
        payload = {"test": "data"}

        request = LoadRequest(
            priority=LoadPriority.HIGH,
            request_id=1,
            path=image_path,
            target_size=(300, 300),
            callback=None,
            user_data=payload,
        )

        # ``callback`` is passed but, as in the original test, not asserted on.
        assert request.priority == LoadPriority.HIGH
        assert request.request_id == 1
        assert request.path == Path("/test/image.jpg")
        assert request.target_size == (300, 300)
        assert request.user_data == {"test": "data"}

    def test_load_request_ordering_by_priority(self):
        """A LOW-priority request compares less than a HIGH-priority one."""
        from pyPhotoAlbum.async_backend import LoadRequest, LoadPriority

        low_request = LoadRequest(
            priority=LoadPriority.LOW, request_id=1, path=Path("/a.jpg")
        )
        high_request = LoadRequest(
            priority=LoadPriority.HIGH, request_id=2, path=Path("/b.jpg")
        )

        # LOW has value 0 and HIGH has value 2, so LOW sorts first.
        # NOTE(review): the original comment states the loader's queue is a
        # min-heap, meaning LOW would be dequeued before HIGH — confirm that
        # this is the intended scheduling order.
        assert low_request < high_request

    def test_load_request_ordering_by_id_when_same_priority(self):
        """With equal priority, the smaller ``request_id`` compares less."""
        from pyPhotoAlbum.async_backend import LoadRequest, LoadPriority

        earlier = LoadRequest(
            priority=LoadPriority.NORMAL, request_id=1, path=Path("/a.jpg")
        )
        later = LoadRequest(
            priority=LoadPriority.NORMAL, request_id=2, path=Path("/b.jpg")
        )

        assert earlier < later
|
|
|
|
|
|
class TestImageCache:
    """Tests for the ``ImageCache`` class."""

    def test_image_cache_init(self):
        """The megabyte limit is stored in bytes and the cache starts empty."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache(max_memory_mb=256)

        assert cache.max_memory_bytes == 256 * 1024 * 1024
        assert cache.current_memory_bytes == 0

    def test_image_cache_estimate_image_size_rgba(self):
        """An RGBA image is estimated at 4 bytes per pixel."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img = Image.new("RGBA", (100, 100))

        size = cache._estimate_image_size(img)

        # 100 * 100 * 4 bytes (RGBA)
        assert size == 40000

    def test_image_cache_estimate_image_size_rgb(self):
        """An RGB image is estimated at 3 bytes per pixel."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img = Image.new("RGB", (100, 100))

        size = cache._estimate_image_size(img)

        # 100 * 100 * 3 bytes (RGB)
        assert size == 30000

    def test_image_cache_make_key_without_size(self):
        """Without a target size the key is just the path string."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        key = cache._make_key(Path("/test/image.jpg"))

        assert key == "/test/image.jpg"

    def test_image_cache_make_key_with_size(self):
        """With a target size the key gains a ``:WxH`` suffix."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        key = cache._make_key(Path("/test/image.jpg"), (300, 300))

        assert key == "/test/image.jpg:300x300"

    def test_image_cache_put_and_get(self):
        """An image stored with ``put`` comes back from ``get`` with same size/mode."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img = Image.new("RGB", (100, 100), color="red")
        path = Path("/test/image.jpg")

        cache.put(path, img)
        cached_img = cache.get(path)

        assert cached_img is not None
        assert cached_img.size == img.size
        assert cached_img.mode == img.mode

    def test_image_cache_get_returns_copy(self):
        """Mutating the object returned by ``get`` must not alter the cached image."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img = Image.new("RGB", (100, 100))
        path = Path("/test/image.jpg")

        cache.put(path, img)
        cached_img = cache.get(path)

        # Paint one pixel red on the object handed back to us.
        cached_img.putpixel((0, 0), (255, 0, 0))

        # A second fetch should not see that modification.
        cached_img2 = cache.get(path)
        assert cached_img2.getpixel((0, 0)) != (255, 0, 0)

    def test_image_cache_miss(self):
        """Looking up a path that was never stored returns ``None``."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        cached_img = cache.get(Path("/nonexistent.jpg"))

        assert cached_img is None

    def test_image_cache_different_sizes_different_keys(self):
        """The same path stored under different target sizes yields separate entries."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img1 = Image.new("RGB", (100, 100), color="red")
        img2 = Image.new("RGB", (50, 50), color="blue")
        path = Path("/test/image.jpg")

        cache.put(path, img1, target_size=None)
        cache.put(path, img2, target_size=(50, 50))

        cached_full = cache.get(path, target_size=None)
        cached_small = cache.get(path, target_size=(50, 50))

        assert cached_full.size == (100, 100)
        assert cached_small.size == (50, 50)

    def test_image_cache_lru_eviction(self):
        """Storing a second image that exceeds the limit evicts the first."""
        from pyPhotoAlbum.async_backend import ImageCache

        # 1 MB limit: room for one 500x500 RGB image (~750KB) but not two.
        cache = ImageCache(max_memory_mb=1)

        img1 = Image.new("RGB", (500, 500))  # ~750KB
        img2 = Image.new("RGB", (500, 500))  # ~750KB

        cache.put(Path("/img1.jpg"), img1)
        assert cache.get(Path("/img1.jpg")) is not None

        # Adding img2 pushes the total past the limit.
        cache.put(Path("/img2.jpg"), img2)

        # img1 is expected to have been evicted to make room for img2.
        assert cache.get(Path("/img1.jpg")) is None
        assert cache.get(Path("/img2.jpg")) is not None

    def test_image_cache_update_existing(self):
        """A second ``put`` for the same key replaces the stored image."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img1 = Image.new("RGB", (100, 100), color="red")
        img2 = Image.new("RGB", (200, 200), color="blue")
        path = Path("/test/image.jpg")

        cache.put(path, img1)
        cache.put(path, img2)  # Update

        cached = cache.get(path)
        assert cached.size == (200, 200)

    def test_image_cache_clear(self):
        """``clear`` drops every entry and resets the memory counter."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img = Image.new("RGB", (100, 100))

        cache.put(Path("/img1.jpg"), img)
        cache.put(Path("/img2.jpg"), img)

        cache.clear()

        assert cache.current_memory_bytes == 0
        assert cache.get(Path("/img1.jpg")) is None
        assert cache.get(Path("/img2.jpg")) is None

    def test_image_cache_get_stats(self):
        """``get_stats`` reports item count, memory use, limit and utilization."""
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache(max_memory_mb=100)
        img = Image.new("RGB", (100, 100))

        cache.put(Path("/img1.jpg"), img)
        cache.put(Path("/img2.jpg"), img)

        stats = cache.get_stats()

        assert stats["items"] == 2
        assert stats["memory_mb"] > 0
        assert stats["max_memory_mb"] == 100
        assert 0 <= stats["utilization"] <= 100

    def test_image_cache_thread_safety(self):
        """Concurrent ``put``/``get`` calls from several threads must not raise.

        Exceptions raised inside a ``threading.Thread`` do not propagate to the
        thread that calls ``join()``, so each worker is wrapped to record any
        exception; the test then fails explicitly if one occurred.
        """
        from pyPhotoAlbum.async_backend import ImageCache

        cache = ImageCache()
        img = Image.new("RGB", (50, 50))
        worker_errors = []

        def put_images(start):
            for i in range(start, start + 10):
                cache.put(Path(f"/img{i}.jpg"), img)

        def get_images(start):
            for i in range(start, start + 10):
                cache.get(Path(f"/img{i}.jpg"))

        def run_guarded(worker, start):
            # Relay any failure to the main thread instead of losing it.
            try:
                worker(start)
            except Exception as exc:
                worker_errors.append(exc)

        threads = []
        for i in range(5):
            t1 = threading.Thread(target=run_guarded, args=(put_images, i * 10))
            t2 = threading.Thread(target=run_guarded, args=(get_images, i * 10))
            threads.extend([t1, t2])

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not worker_errors, f"worker threads raised: {worker_errors!r}"
        assert cache.current_memory_bytes >= 0
|
|
|
|
|
|
class TestAsyncImageLoader:
    """Tests for the ``AsyncImageLoader`` class.

    Every test that calls ``loader.start()`` stops the loader in a ``finally``
    clause, so a failing assertion cannot leave the loader's thread running
    into subsequent tests.
    """

    def test_async_image_loader_init(self):
        """Constructor stores the given cache and worker count; not shut down."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader, ImageCache

        cache = ImageCache()
        loader = AsyncImageLoader(cache=cache, max_workers=2)

        assert loader.cache is cache
        assert loader.max_workers == 2
        assert loader._shutdown is False

    def test_async_image_loader_init_creates_cache(self):
        """A loader built without a cache still ends up with one."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()

        assert loader.cache is not None

    def test_async_image_loader_start(self):
        """After ``start`` the loop and a live loop thread are present."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()
        loader.start()
        try:
            # Give the background thread time to come up.
            time.sleep(0.1)

            assert loader._loop is not None
            assert loader._loop_thread is not None
            assert loader._loop_thread.is_alive()
        finally:
            loader.stop()

    def test_async_image_loader_start_twice(self):
        """A second ``start`` keeps the original loop thread object."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()
        loader.start()
        try:
            time.sleep(0.1)
            thread1 = loader._loop_thread

            # The repeated start must leave the existing thread in place.
            loader.start()
            time.sleep(0.1)

            assert loader._loop_thread is thread1
        finally:
            loader.stop()

    def test_async_image_loader_stop(self):
        """``stop`` sets the ``_shutdown`` flag."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()
        loader.start()
        time.sleep(0.1)

        loader.stop()
        time.sleep(0.2)

        assert loader._shutdown is True

    def test_async_image_loader_load_and_process_image(self, tmp_path):
        """``_load_and_process_image`` returns an RGBA image for an RGB JPEG."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        img = Image.new("RGB", (800, 600), color="blue")
        img_path = tmp_path / "test.jpg"
        img.save(img_path)

        loader = AsyncImageLoader()
        result = loader._load_and_process_image(img_path, None)

        assert result is not None
        assert result.mode == "RGBA"  # Should be converted to RGBA

    def test_async_image_loader_load_and_process_image_with_resize(self, tmp_path):
        """With a target size the result fits inside that bounding box."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        img = Image.new("RGB", (2000, 1500), color="green")
        img_path = tmp_path / "large.jpg"
        img.save(img_path)

        loader = AsyncImageLoader()
        result = loader._load_and_process_image(img_path, (500, 500))

        assert result is not None
        # Both dimensions must fit within 500x500.
        assert result.size[0] <= 500
        assert result.size[1] <= 500

    def test_async_image_loader_emit_loaded(self, qtbot):
        """``_emit_loaded`` fires ``image_loaded`` once with path and user data."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()
        signal_received = []

        def on_loaded(path, img, user_data):
            signal_received.append((path, img, user_data))

        loader.image_loaded.connect(on_loaded)

        mock_img = Mock()
        user_data = {"test": "data"}

        loader._emit_loaded(Path("/test.jpg"), mock_img, user_data)

        assert len(signal_received) == 1
        assert signal_received[0][0] == Path("/test.jpg")
        assert signal_received[0][2] == user_data

    def test_async_image_loader_emit_failed(self, qtbot):
        """``_emit_failed`` fires ``load_failed`` once with path and message."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()
        signal_received = []

        def on_failed(path, error, user_data):
            signal_received.append((path, error, user_data))

        loader.load_failed.connect(on_failed)

        user_data = {"test": "data"}

        loader._emit_failed(Path("/test.jpg"), "Error message", user_data)

        assert len(signal_received) == 1
        assert signal_received[0][0] == Path("/test.jpg")
        assert signal_received[0][1] == "Error message"

    def test_async_image_loader_request_load_not_started(self):
        """``request_load`` on a loader that was never started returns False."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader, LoadPriority

        loader = AsyncImageLoader()

        result = loader.request_load(Path("/test.jpg"), priority=LoadPriority.HIGH)

        assert result is False

    def test_async_image_loader_request_load_success(self, tmp_path):
        """``request_load`` on a started loader returns True for a new path."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader, LoadPriority

        img = Image.new("RGB", (100, 100), color="red")
        img_path = tmp_path / "test.jpg"
        img.save(img_path)

        loader = AsyncImageLoader()
        loader.start()
        try:
            time.sleep(0.1)

            result = loader.request_load(img_path, priority=LoadPriority.HIGH)

            assert result is True
        finally:
            loader.stop()

    def test_async_image_loader_request_load_duplicate(self, tmp_path):
        """Requesting the same path twice in a row: first True, second False."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader, LoadPriority

        img = Image.new("RGB", (100, 100))
        img_path = tmp_path / "test.jpg"
        img.save(img_path)

        loader = AsyncImageLoader()
        loader.start()
        try:
            time.sleep(0.1)

            result1 = loader.request_load(img_path, priority=LoadPriority.HIGH)
            result2 = loader.request_load(img_path, priority=LoadPriority.HIGH)

            assert result1 is True
            assert result2 is False  # Already pending
        finally:
            loader.stop()

    def test_async_image_loader_cancel_pending(self, tmp_path):
        """``cancel_load`` right after ``request_load`` is expected to return True."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader, LoadPriority

        img = Image.new("RGB", (100, 100))
        img_path = tmp_path / "test.jpg"
        img.save(img_path)

        loader = AsyncImageLoader()
        loader.start()
        try:
            time.sleep(0.1)

            loader.request_load(img_path, priority=LoadPriority.LOW)
            # NOTE(review): this presumably relies on the request still being
            # pending when cancel_load runs — confirm it cannot complete first.
            result = loader.cancel_load(img_path)

            assert result is True
        finally:
            loader.stop()

    def test_async_image_loader_cancel_nonexistent(self):
        """Cancelling a path that was never requested returns False."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()

        result = loader.cancel_load(Path("/nonexistent.jpg"))

        assert result is False

    def test_async_image_loader_get_stats(self):
        """``get_stats`` exposes ``pending``, ``active`` and ``cache`` keys."""
        from pyPhotoAlbum.async_backend import AsyncImageLoader

        loader = AsyncImageLoader()

        stats = loader.get_stats()

        assert "pending" in stats
        assert "active" in stats
        assert "cache" in stats
|
|
|
|
|
|
class TestAsyncPDFGenerator:
    """Tests for the ``AsyncPDFGenerator`` class.

    Every test that calls ``generator.start()`` stops the generator in a
    ``finally`` clause, so a failing assertion cannot leave the generator's
    thread running into subsequent tests.
    """

    def test_async_pdf_generator_init(self):
        """Constructor stores the given cache and worker count; not shut down."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator, ImageCache

        cache = ImageCache()
        generator = AsyncPDFGenerator(image_cache=cache, max_workers=1)

        assert generator.image_cache is cache
        assert generator.max_workers == 1
        assert generator._shutdown is False

    def test_async_pdf_generator_init_creates_cache(self):
        """A generator built without a cache still ends up with one."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()

        assert generator.image_cache is not None

    def test_async_pdf_generator_start(self):
        """After ``start`` the loop and a live loop thread are present."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        generator.start()
        try:
            time.sleep(0.1)

            assert generator._loop is not None
            assert generator._loop_thread is not None
            assert generator._loop_thread.is_alive()
        finally:
            generator.stop()

    def test_async_pdf_generator_start_twice(self):
        """A second ``start`` keeps the original loop thread object."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        generator.start()
        try:
            time.sleep(0.1)
            thread1 = generator._loop_thread

            # The repeated start must leave the existing thread in place.
            generator.start()
            time.sleep(0.1)

            assert generator._loop_thread is thread1
        finally:
            generator.stop()

    def test_async_pdf_generator_stop(self):
        """``stop`` sets the ``_shutdown`` flag."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        generator.start()
        time.sleep(0.1)

        generator.stop()
        time.sleep(0.2)

        assert generator._shutdown is True

    def test_async_pdf_generator_export_not_started(self):
        """``export_pdf`` on a generator that was never started returns False."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        mock_project = Mock()

        result = generator.export_pdf(mock_project, "/output.pdf")

        assert result is False

    def test_async_pdf_generator_export_already_exporting(self):
        """``export_pdf`` returns False while another export is not yet done."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        generator.start()
        try:
            time.sleep(0.1)

            mock_project = Mock()

            # Simulate an in-flight export whose ``done()`` reports False.
            generator._current_export = Mock()
            generator._current_export.done.return_value = False

            result = generator.export_pdf(mock_project, "/output.pdf")

            assert result is False
        finally:
            generator.stop()

    def test_async_pdf_generator_cancel_export(self):
        """``cancel_export`` flags cancellation and cancels the current export."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()

        # Simulate an in-flight export whose ``done()`` reports False.
        generator._current_export = Mock()
        generator._current_export.done.return_value = False

        generator.cancel_export()

        assert generator._cancel_requested is True
        generator._current_export.cancel.assert_called_once()

    def test_async_pdf_generator_is_exporting_true(self):
        """``is_exporting`` is True while the current export is not done."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        generator._current_export = Mock()
        generator._current_export.done.return_value = False

        assert generator.is_exporting() is True

    def test_async_pdf_generator_is_exporting_false(self):
        """``is_exporting`` is False on a freshly constructed generator."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()

        assert generator.is_exporting() is False

    def test_async_pdf_generator_get_stats(self):
        """``get_stats`` exposes ``exporting`` and ``cache`` keys."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()

        stats = generator.get_stats()

        assert "exporting" in stats
        assert "cache" in stats

    def test_async_pdf_generator_export_with_cache_uses_cache(self):
        """``_export_with_cache`` runs the exporter once and reports success.

        NOTE(review): despite the test name, nothing here asserts that a cached
        image was actually used; it only checks the call completes and that
        ``exporter.export`` was invoked exactly once.
        """
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()

        # Stand-in exporter whose ``export`` reports success with no warnings.
        mock_exporter = Mock()
        mock_exporter.export.return_value = (True, [])

        def mock_progress(current, total, msg):
            return True

        # ``Image`` and ``patch`` come from the module-level imports.
        with patch("PIL.Image.open") as patched_open:
            patched_open.return_value = Image.new("RGBA", (50, 50), color="blue")

            success, warnings = generator._export_with_cache(
                mock_exporter, "/fake/output.pdf", mock_progress
            )

        assert success is True
        mock_exporter.export.assert_called_once()

    def test_async_pdf_generator_progress_signal(self, qtbot):
        """A connected slot receives the arguments emitted on ``progress_updated``."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        signal_received = []

        def on_progress(current, total, message):
            signal_received.append((current, total, message))

        generator.progress_updated.connect(on_progress)

        generator.progress_updated.emit(5, 10, "Processing page 5")

        assert len(signal_received) == 1
        assert signal_received[0] == (5, 10, "Processing page 5")

    def test_async_pdf_generator_complete_signal(self, qtbot):
        """A connected slot receives the arguments emitted on ``export_complete``."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        signal_received = []

        def on_complete(success, warnings):
            signal_received.append((success, warnings))

        generator.export_complete.connect(on_complete)

        generator.export_complete.emit(True, ["warning1"])

        assert len(signal_received) == 1
        assert signal_received[0] == (True, ["warning1"])

    def test_async_pdf_generator_failed_signal(self, qtbot):
        """A connected slot receives the message emitted on ``export_failed``."""
        from pyPhotoAlbum.async_backend import AsyncPDFGenerator

        generator = AsyncPDFGenerator()
        signal_received = []

        def on_failed(error_msg):
            signal_received.append(error_msg)

        generator.export_failed.connect(on_failed)

        generator.export_failed.emit("Export failed")

        assert len(signal_received) == 1
        assert signal_received[0] == "Export failed"
|