Compare commits
No commits in common. "7d8d2d42f85ad9f71fc6058f94efc94193d3dfa5" and "aa3eacf627104008d37d2468230a5837c89774eb" have entirely different histories.
7d8d2d42f8
...
aa3eacf627
@ -8,323 +8,323 @@ from pyPhotoAlbum.models import BaseLayoutElement
|
|||||||
|
|
||||||
class AlignmentManager:
|
class AlignmentManager:
|
||||||
"""Manages alignment and distribution operations on multiple elements"""
|
"""Manages alignment and distribution operations on multiple elements"""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_bounds(elements: List[BaseLayoutElement]) -> Tuple[float, float, float, float]:
|
def get_bounds(elements: List[BaseLayoutElement]) -> Tuple[float, float, float, float]:
|
||||||
"""
|
"""
|
||||||
Get the bounding box of multiple elements.
|
Get the bounding box of multiple elements.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(min_x, min_y, max_x, max_y)
|
(min_x, min_y, max_x, max_y)
|
||||||
"""
|
"""
|
||||||
if not elements:
|
if not elements:
|
||||||
return (0, 0, 0, 0)
|
return (0, 0, 0, 0)
|
||||||
|
|
||||||
min_x = min(elem.position[0] for elem in elements)
|
min_x = min(elem.position[0] for elem in elements)
|
||||||
min_y = min(elem.position[1] for elem in elements)
|
min_y = min(elem.position[1] for elem in elements)
|
||||||
max_x = max(elem.position[0] + elem.size[0] for elem in elements)
|
max_x = max(elem.position[0] + elem.size[0] for elem in elements)
|
||||||
max_y = max(elem.position[1] + elem.size[1] for elem in elements)
|
max_y = max(elem.position[1] + elem.size[1] for elem in elements)
|
||||||
|
|
||||||
return (min_x, min_y, max_x, max_y)
|
return (min_x, min_y, max_x, max_y)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def align_left(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def align_left(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Align all elements to the leftmost element.
|
Align all elements to the leftmost element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
min_x = min(elem.position[0] for elem in elements)
|
min_x = min(elem.position[0] for elem in elements)
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements:
|
for elem in elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
elem.position = (min_x, elem.position[1])
|
elem.position = (min_x, elem.position[1])
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def align_right(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def align_right(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Align all elements to the rightmost element.
|
Align all elements to the rightmost element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
max_right = max(elem.position[0] + elem.size[0] for elem in elements)
|
max_right = max(elem.position[0] + elem.size[0] for elem in elements)
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements:
|
for elem in elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
new_x = max_right - elem.size[0]
|
new_x = max_right - elem.size[0]
|
||||||
elem.position = (new_x, elem.position[1])
|
elem.position = (new_x, elem.position[1])
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def align_top(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def align_top(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Align all elements to the topmost element.
|
Align all elements to the topmost element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
min_y = min(elem.position[1] for elem in elements)
|
min_y = min(elem.position[1] for elem in elements)
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements:
|
for elem in elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
elem.position = (elem.position[0], min_y)
|
elem.position = (elem.position[0], min_y)
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def align_bottom(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def align_bottom(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Align all elements to the bottommost element.
|
Align all elements to the bottommost element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
max_bottom = max(elem.position[1] + elem.size[1] for elem in elements)
|
max_bottom = max(elem.position[1] + elem.size[1] for elem in elements)
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements:
|
for elem in elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
new_y = max_bottom - elem.size[1]
|
new_y = max_bottom - elem.size[1]
|
||||||
elem.position = (elem.position[0], new_y)
|
elem.position = (elem.position[0], new_y)
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def align_horizontal_center(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def align_horizontal_center(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Align all elements to horizontal center.
|
Align all elements to horizontal center.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Calculate average center
|
# Calculate average center
|
||||||
centers = [elem.position[0] + elem.size[0] / 2 for elem in elements]
|
centers = [elem.position[0] + elem.size[0] / 2 for elem in elements]
|
||||||
avg_center = sum(centers) / len(centers)
|
avg_center = sum(centers) / len(centers)
|
||||||
|
|
||||||
changes = []
|
changes = []
|
||||||
for elem in elements:
|
for elem in elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
new_x = avg_center - elem.size[0] / 2
|
new_x = avg_center - elem.size[0] / 2
|
||||||
elem.position = (new_x, elem.position[1])
|
elem.position = (new_x, elem.position[1])
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def align_vertical_center(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def align_vertical_center(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Align all elements to vertical center.
|
Align all elements to vertical center.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Calculate average center
|
# Calculate average center
|
||||||
centers = [elem.position[1] + elem.size[1] / 2 for elem in elements]
|
centers = [elem.position[1] + elem.size[1] / 2 for elem in elements]
|
||||||
avg_center = sum(centers) / len(centers)
|
avg_center = sum(centers) / len(centers)
|
||||||
|
|
||||||
changes = []
|
changes = []
|
||||||
for elem in elements:
|
for elem in elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
new_y = avg_center - elem.size[1] / 2
|
new_y = avg_center - elem.size[1] / 2
|
||||||
elem.position = (elem.position[0], new_y)
|
elem.position = (elem.position[0], new_y)
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_same_size(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
def make_same_size(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Make all elements the same size as the first element.
|
Make all elements the same size as the first element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position, old_size) tuples for undo
|
List of (element, old_position, old_size) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
target_size = elements[0].size
|
target_size = elements[0].size
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements[1:]:
|
for elem in elements[1:]:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
old_size = elem.size
|
old_size = elem.size
|
||||||
elem.size = target_size
|
elem.size = target_size
|
||||||
changes.append((elem, old_pos, old_size))
|
changes.append((elem, old_pos, old_size))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_same_width(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
def make_same_width(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Make all elements the same width as the first element.
|
Make all elements the same width as the first element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position, old_size) tuples for undo
|
List of (element, old_position, old_size) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
target_width = elements[0].size[0]
|
target_width = elements[0].size[0]
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements[1:]:
|
for elem in elements[1:]:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
old_size = elem.size
|
old_size = elem.size
|
||||||
elem.size = (target_width, elem.size[1])
|
elem.size = (target_width, elem.size[1])
|
||||||
changes.append((elem, old_pos, old_size))
|
changes.append((elem, old_pos, old_size))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_same_height(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
def make_same_height(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Make all elements the same height as the first element.
|
Make all elements the same height as the first element.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position, old_size) tuples for undo
|
List of (element, old_position, old_size) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 2:
|
if len(elements) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
target_height = elements[0].size[1]
|
target_height = elements[0].size[1]
|
||||||
changes = []
|
changes = []
|
||||||
|
|
||||||
for elem in elements[1:]:
|
for elem in elements[1:]:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
old_size = elem.size
|
old_size = elem.size
|
||||||
elem.size = (elem.size[0], target_height)
|
elem.size = (elem.size[0], target_height)
|
||||||
changes.append((elem, old_pos, old_size))
|
changes.append((elem, old_pos, old_size))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def distribute_horizontally(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def distribute_horizontally(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Distribute elements evenly across horizontal span.
|
Distribute elements evenly across horizontal span.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 3:
|
if len(elements) < 3:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Sort by x position
|
# Sort by x position
|
||||||
sorted_elements = sorted(elements, key=lambda e: e.position[0])
|
sorted_elements = sorted(elements, key=lambda e: e.position[0])
|
||||||
|
|
||||||
# Get leftmost and rightmost positions
|
# Get leftmost and rightmost positions
|
||||||
min_x = sorted_elements[0].position[0]
|
min_x = sorted_elements[0].position[0]
|
||||||
max_x = sorted_elements[-1].position[0]
|
max_x = sorted_elements[-1].position[0]
|
||||||
|
|
||||||
# Calculate spacing between centers
|
# Calculate spacing between centers
|
||||||
total_span = max_x - min_x
|
total_span = max_x - min_x
|
||||||
spacing = total_span / (len(sorted_elements) - 1)
|
spacing = total_span / (len(sorted_elements) - 1)
|
||||||
|
|
||||||
changes = []
|
changes = []
|
||||||
for i, elem in enumerate(sorted_elements):
|
for i, elem in enumerate(sorted_elements):
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
new_x = min_x + (i * spacing)
|
new_x = min_x + (i * spacing)
|
||||||
elem.position = (new_x, elem.position[1])
|
elem.position = (new_x, elem.position[1])
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def distribute_vertically(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def distribute_vertically(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Distribute elements evenly across vertical span.
|
Distribute elements evenly across vertical span.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 3:
|
if len(elements) < 3:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Sort by y position
|
# Sort by y position
|
||||||
sorted_elements = sorted(elements, key=lambda e: e.position[1])
|
sorted_elements = sorted(elements, key=lambda e: e.position[1])
|
||||||
|
|
||||||
# Get topmost and bottommost positions
|
# Get topmost and bottommost positions
|
||||||
min_y = sorted_elements[0].position[1]
|
min_y = sorted_elements[0].position[1]
|
||||||
max_y = sorted_elements[-1].position[1]
|
max_y = sorted_elements[-1].position[1]
|
||||||
|
|
||||||
# Calculate spacing between centers
|
# Calculate spacing between centers
|
||||||
total_span = max_y - min_y
|
total_span = max_y - min_y
|
||||||
spacing = total_span / (len(sorted_elements) - 1)
|
spacing = total_span / (len(sorted_elements) - 1)
|
||||||
|
|
||||||
changes = []
|
changes = []
|
||||||
for i, elem in enumerate(sorted_elements):
|
for i, elem in enumerate(sorted_elements):
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
new_y = min_y + (i * spacing)
|
new_y = min_y + (i * spacing)
|
||||||
elem.position = (elem.position[0], new_y)
|
elem.position = (elem.position[0], new_y)
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def space_horizontally(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def space_horizontally(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
Distribute elements with equal spacing between them horizontally.
|
Distribute elements with equal spacing between them horizontally.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of (element, old_position) tuples for undo
|
List of (element, old_position) tuples for undo
|
||||||
"""
|
"""
|
||||||
if len(elements) < 3:
|
if len(elements) < 3:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Sort by x position
|
# Sort by x position
|
||||||
sorted_elements = sorted(elements, key=lambda e: e.position[0])
|
sorted_elements = sorted(elements, key=lambda e: e.position[0])
|
||||||
|
|
||||||
# Get leftmost and rightmost boundaries
|
# Get leftmost and rightmost boundaries
|
||||||
min_x = sorted_elements[0].position[0]
|
min_x = sorted_elements[0].position[0]
|
||||||
max_right = sorted_elements[-1].position[0] + sorted_elements[-1].size[0]
|
max_right = sorted_elements[-1].position[0] + sorted_elements[-1].size[0]
|
||||||
|
|
||||||
# Calculate total width of all elements
|
# Calculate total width of all elements
|
||||||
total_width = sum(elem.size[0] for elem in sorted_elements)
|
total_width = sum(elem.size[0] for elem in sorted_elements)
|
||||||
|
|
||||||
# Calculate available space and spacing
|
# Calculate available space and spacing
|
||||||
available_space = max_right - min_x - total_width
|
available_space = max_right - min_x - total_width
|
||||||
spacing = available_space / (len(sorted_elements) - 1)
|
spacing = available_space / (len(sorted_elements) - 1)
|
||||||
|
|
||||||
changes = []
|
changes = []
|
||||||
current_x = min_x
|
current_x = min_x
|
||||||
|
|
||||||
for elem in sorted_elements:
|
for elem in sorted_elements:
|
||||||
old_pos = elem.position
|
old_pos = elem.position
|
||||||
elem.position = (current_x, elem.position[1])
|
elem.position = (current_x, elem.position[1])
|
||||||
changes.append((elem, old_pos))
|
changes.append((elem, old_pos))
|
||||||
current_x += elem.size[0] + spacing
|
current_x += elem.size[0] + spacing
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def space_vertically(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
def space_vertically(elements: List[BaseLayoutElement]) -> List[Tuple[BaseLayoutElement, Tuple[float, float]]]:
|
||||||
"""
|
"""
|
||||||
@ -444,182 +444,3 @@ class AlignmentManager:
|
|||||||
element.size = (new_width, new_height)
|
element.size = (new_width, new_height)
|
||||||
|
|
||||||
return (element, old_pos, old_size)
|
return (element, old_pos, old_size)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def maximize_pattern(
|
|
||||||
elements: List[BaseLayoutElement],
|
|
||||||
page_size: Tuple[float, float],
|
|
||||||
min_gap: float = 2.0,
|
|
||||||
max_iterations: int = 100,
|
|
||||||
growth_rate: float = 0.05
|
|
||||||
) -> List[Tuple[BaseLayoutElement, Tuple[float, float], Tuple[float, float]]]:
|
|
||||||
"""
|
|
||||||
Maximize element sizes using a crystal growth algorithm.
|
|
||||||
Elements grow until they are close to borders or each other.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
elements: List of elements to maximize
|
|
||||||
page_size: (width, height) of the page in mm
|
|
||||||
min_gap: Minimum gap to maintain between elements and borders (in mm)
|
|
||||||
max_iterations: Maximum number of growth iterations
|
|
||||||
growth_rate: Percentage to grow each iteration (0.05 = 5%)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of (element, old_position, old_size) tuples for undo
|
|
||||||
"""
|
|
||||||
if not elements:
|
|
||||||
return []
|
|
||||||
|
|
||||||
page_width, page_height = page_size
|
|
||||||
changes = []
|
|
||||||
|
|
||||||
# Record initial states
|
|
||||||
for elem in elements:
|
|
||||||
changes.append((elem, elem.position, elem.size))
|
|
||||||
|
|
||||||
# Helper function to check if element would collide with boundaries or other elements
|
|
||||||
def check_collision(elem_idx: int, new_size: Tuple[float, float]) -> bool:
|
|
||||||
elem = elements[elem_idx]
|
|
||||||
x, y = elem.position
|
|
||||||
w, h = new_size
|
|
||||||
|
|
||||||
# Check page boundaries
|
|
||||||
if x < min_gap or y < min_gap:
|
|
||||||
return True
|
|
||||||
if x + w > page_width - min_gap:
|
|
||||||
return True
|
|
||||||
if y + h > page_height - min_gap:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Check collision with other elements
|
|
||||||
for i, other in enumerate(elements):
|
|
||||||
if i == elem_idx:
|
|
||||||
continue
|
|
||||||
|
|
||||||
other_x, other_y = other.position
|
|
||||||
other_w, other_h = other.size
|
|
||||||
|
|
||||||
# Calculate distances between rectangles
|
|
||||||
horizontal_gap = max(
|
|
||||||
other_x - (x + w), # Other is to the right
|
|
||||||
x - (other_x + other_w) # Other is to the left
|
|
||||||
)
|
|
||||||
|
|
||||||
vertical_gap = max(
|
|
||||||
other_y - (y + h), # Other is below
|
|
||||||
y - (other_y + other_h) # Other is above
|
|
||||||
)
|
|
||||||
|
|
||||||
# If rectangles overlap or are too close in both dimensions
|
|
||||||
if horizontal_gap < min_gap and vertical_gap < min_gap:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Helper function to get the maximum scale factor for an element
|
|
||||||
def get_max_scale(elem_idx: int, current_scale: float) -> float:
|
|
||||||
elem = elements[elem_idx]
|
|
||||||
old_size = changes[elem_idx][2]
|
|
||||||
|
|
||||||
# Binary search for maximum scale
|
|
||||||
low, high = current_scale, current_scale * 3.0
|
|
||||||
best_scale = current_scale
|
|
||||||
|
|
||||||
for _ in range(20): # Binary search iterations
|
|
||||||
mid = (low + high) / 2.0
|
|
||||||
test_size = (old_size[0] * mid, old_size[1] * mid)
|
|
||||||
|
|
||||||
if check_collision(elem_idx, test_size):
|
|
||||||
high = mid
|
|
||||||
else:
|
|
||||||
best_scale = mid
|
|
||||||
low = mid
|
|
||||||
|
|
||||||
if high - low < 0.001:
|
|
||||||
break
|
|
||||||
|
|
||||||
return best_scale
|
|
||||||
|
|
||||||
# Growth algorithm - iterative expansion
|
|
||||||
scales = [1.0] * len(elements)
|
|
||||||
|
|
||||||
for iteration in range(max_iterations):
|
|
||||||
any_growth = False
|
|
||||||
|
|
||||||
for i, elem in enumerate(elements):
|
|
||||||
old_size = changes[i][2]
|
|
||||||
|
|
||||||
# Try to grow this element
|
|
||||||
new_scale = scales[i] * (1.0 + growth_rate)
|
|
||||||
new_size = (old_size[0] * new_scale, old_size[1] * new_scale)
|
|
||||||
|
|
||||||
if not check_collision(i, new_size):
|
|
||||||
scales[i] = new_scale
|
|
||||||
elem.size = new_size
|
|
||||||
any_growth = True
|
|
||||||
else:
|
|
||||||
# Can't grow uniformly, try to find maximum possible scale
|
|
||||||
max_scale = get_max_scale(i, scales[i])
|
|
||||||
if max_scale > scales[i]:
|
|
||||||
scales[i] = max_scale
|
|
||||||
elem.size = (old_size[0] * max_scale, old_size[1] * max_scale)
|
|
||||||
any_growth = True
|
|
||||||
|
|
||||||
# If no element could grow, we're done
|
|
||||||
if not any_growth:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Optional: Center elements slightly within their constrained space
|
|
||||||
for elem in elements:
|
|
||||||
x, y = elem.position
|
|
||||||
w, h = elem.size
|
|
||||||
|
|
||||||
# Calculate available space on each side
|
|
||||||
space_left = x - min_gap
|
|
||||||
space_right = (page_width - min_gap) - (x + w)
|
|
||||||
space_top = y - min_gap
|
|
||||||
space_bottom = (page_height - min_gap) - (y + h)
|
|
||||||
|
|
||||||
# Micro-adjust position to center in available space
|
|
||||||
if space_left >= 0 and space_right >= 0:
|
|
||||||
adjust_x = (space_right - space_left) / 4.0 # Gentle centering
|
|
||||||
new_x = max(min_gap, min(page_width - w - min_gap, x + adjust_x))
|
|
||||||
|
|
||||||
# Verify this doesn't cause collision
|
|
||||||
old_pos = elem.position
|
|
||||||
elem.position = (new_x, y)
|
|
||||||
collision = False
|
|
||||||
for other in elements:
|
|
||||||
if other is elem:
|
|
||||||
continue
|
|
||||||
ox, oy = other.position
|
|
||||||
ow, oh = other.size
|
|
||||||
if (abs((new_x + w/2) - (ox + ow/2)) < (w + ow)/2 + min_gap and
|
|
||||||
abs((y + h/2) - (oy + oh/2)) < (h + oh)/2 + min_gap):
|
|
||||||
collision = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if collision:
|
|
||||||
elem.position = old_pos
|
|
||||||
|
|
||||||
if space_top >= 0 and space_bottom >= 0:
|
|
||||||
adjust_y = (space_bottom - space_top) / 4.0
|
|
||||||
new_y = max(min_gap, min(page_height - h - min_gap, y + adjust_y))
|
|
||||||
|
|
||||||
old_pos = elem.position
|
|
||||||
elem.position = (elem.position[0], new_y)
|
|
||||||
collision = False
|
|
||||||
for other in elements:
|
|
||||||
if other is elem:
|
|
||||||
continue
|
|
||||||
ox, oy = other.position
|
|
||||||
ow, oh = other.size
|
|
||||||
if (abs((elem.position[0] + w/2) - (ox + ow/2)) < (w + ow)/2 + min_gap and
|
|
||||||
abs((new_y + h/2) - (oy + oh/2)) < (h + oh)/2 + min_gap):
|
|
||||||
collision = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if collision:
|
|
||||||
elem.position = old_pos
|
|
||||||
|
|
||||||
return changes
|
|
||||||
|
|||||||
@ -1,703 +0,0 @@
|
|||||||
"""
|
|
||||||
Async backend for non-blocking image loading and PDF generation.
|
|
||||||
|
|
||||||
This module provides:
|
|
||||||
- AsyncImageLoader: Load and process images in background
|
|
||||||
- AsyncPDFGenerator: Generate PDFs without blocking UI
|
|
||||||
- ImageCache: Intelligent caching with LRU eviction
|
|
||||||
- WorkerPool: Thread pool for CPU-bound operations
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from enum import Enum
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional, Callable, Dict, Any, Tuple
|
|
||||||
from collections import OrderedDict
|
|
||||||
import threading
|
|
||||||
|
|
||||||
from PIL import Image
|
|
||||||
from PyQt6.QtCore import QObject, pyqtSignal
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class LoadPriority(Enum):
|
|
||||||
"""Priority levels for load requests."""
|
|
||||||
LOW = 0 # Offscreen, not visible
|
|
||||||
NORMAL = 1 # Potentially visible soon
|
|
||||||
HIGH = 2 # Visible on screen
|
|
||||||
URGENT = 3 # User is actively interacting with
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(order=True)
|
|
||||||
class LoadRequest:
|
|
||||||
"""Request to load and process an image."""
|
|
||||||
priority: LoadPriority = field(compare=True)
|
|
||||||
request_id: int = field(compare=True) # Tie-breaker for same priority
|
|
||||||
path: Path = field(compare=False)
|
|
||||||
target_size: Optional[Tuple[int, int]] = field(default=None, compare=False)
|
|
||||||
callback: Optional[Callable] = field(default=None, compare=False)
|
|
||||||
user_data: Any = field(default=None, compare=False)
|
|
||||||
|
|
||||||
|
|
||||||
class ImageCache:
|
|
||||||
"""
|
|
||||||
Thread-safe LRU cache for PIL images with memory management.
|
|
||||||
|
|
||||||
Caches both original images and scaled variants to avoid redundant processing.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, max_memory_mb: int = 512):
|
|
||||||
"""
|
|
||||||
Initialize cache.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
max_memory_mb: Maximum memory to use for cached images (default 512MB)
|
|
||||||
"""
|
|
||||||
self.max_memory_bytes = max_memory_mb * 1024 * 1024
|
|
||||||
self.current_memory_bytes = 0
|
|
||||||
self._cache: OrderedDict[str, Tuple[Image.Image, int]] = OrderedDict()
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
logger.info(f"ImageCache initialized with {max_memory_mb}MB limit")
|
|
||||||
|
|
||||||
def _estimate_image_size(self, img: Image.Image) -> int:
|
|
||||||
"""Estimate memory size of PIL image in bytes."""
|
|
||||||
# PIL images are typically width * height * bytes_per_pixel
|
|
||||||
# RGBA = 4 bytes, RGB = 3 bytes, L = 1 byte
|
|
||||||
mode_sizes = {'RGBA': 4, 'RGB': 3, 'L': 1, 'LA': 2}
|
|
||||||
bytes_per_pixel = mode_sizes.get(img.mode, 4)
|
|
||||||
return img.width * img.height * bytes_per_pixel
|
|
||||||
|
|
||||||
def _make_key(self, path: Path, target_size: Optional[Tuple[int, int]] = None) -> str:
|
|
||||||
"""Create cache key from path and optional target size."""
|
|
||||||
if target_size:
|
|
||||||
return f"{path}:{target_size[0]}x{target_size[1]}"
|
|
||||||
return str(path)
|
|
||||||
|
|
||||||
def get(self, path: Path, target_size: Optional[Tuple[int, int]] = None) -> Optional[Image.Image]:
|
|
||||||
"""
|
|
||||||
Get image from cache.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path to image file
|
|
||||||
target_size: Optional target size (width, height)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Cached PIL Image or None if not found
|
|
||||||
"""
|
|
||||||
key = self._make_key(path, target_size)
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
if key in self._cache:
|
|
||||||
# Move to end (most recently used)
|
|
||||||
img, size = self._cache.pop(key)
|
|
||||||
self._cache[key] = (img, size)
|
|
||||||
logger.debug(f"Cache HIT: {key}")
|
|
||||||
return img.copy() # Return copy to avoid external modifications
|
|
||||||
|
|
||||||
logger.debug(f"Cache MISS: {key}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def put(self, path: Path, img: Image.Image, target_size: Optional[Tuple[int, int]] = None):
|
|
||||||
"""
|
|
||||||
Add image to cache with LRU eviction.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path to image file
|
|
||||||
img: PIL Image to cache
|
|
||||||
target_size: Optional target size used for this variant
|
|
||||||
"""
|
|
||||||
key = self._make_key(path, target_size)
|
|
||||||
img_size = self._estimate_image_size(img)
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
# Remove if already exists (update size)
|
|
||||||
if key in self._cache:
|
|
||||||
_, old_size = self._cache.pop(key)
|
|
||||||
self.current_memory_bytes -= old_size
|
|
||||||
|
|
||||||
# Evict LRU items if needed
|
|
||||||
while (self.current_memory_bytes + img_size > self.max_memory_bytes
|
|
||||||
and len(self._cache) > 0):
|
|
||||||
evicted_key, (evicted_img, evicted_size) = self._cache.popitem(last=False)
|
|
||||||
self.current_memory_bytes -= evicted_size
|
|
||||||
logger.debug(f"Cache EVICT: {evicted_key} ({evicted_size / 1024 / 1024:.1f}MB)")
|
|
||||||
|
|
||||||
# Add new image
|
|
||||||
self._cache[key] = (img.copy(), img_size)
|
|
||||||
self.current_memory_bytes += img_size
|
|
||||||
|
|
||||||
logger.debug(f"Cache PUT: {key} ({img_size / 1024 / 1024:.1f}MB) "
|
|
||||||
f"[Total: {self.current_memory_bytes / 1024 / 1024:.1f}MB / "
|
|
||||||
f"{self.max_memory_bytes / 1024 / 1024:.1f}MB, "
|
|
||||||
f"Items: {len(self._cache)}]")
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
"""Clear entire cache."""
|
|
||||||
with self._lock:
|
|
||||||
self._cache.clear()
|
|
||||||
self.current_memory_bytes = 0
|
|
||||||
logger.info("Cache cleared")
|
|
||||||
|
|
||||||
def get_stats(self) -> Dict[str, Any]:
|
|
||||||
"""Get cache statistics."""
|
|
||||||
with self._lock:
|
|
||||||
return {
|
|
||||||
'items': len(self._cache),
|
|
||||||
'memory_mb': self.current_memory_bytes / 1024 / 1024,
|
|
||||||
'max_memory_mb': self.max_memory_bytes / 1024 / 1024,
|
|
||||||
'utilization': (self.current_memory_bytes / self.max_memory_bytes) * 100
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncImageLoader(QObject):
|
|
||||||
"""
|
|
||||||
Asynchronous image loader with priority queue and caching.
|
|
||||||
|
|
||||||
Loads images in background threads and emits signals when complete.
|
|
||||||
Supports concurrent loading, priority-based scheduling, and cancellation.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
loader = AsyncImageLoader()
|
|
||||||
loader.image_loaded.connect(on_image_ready)
|
|
||||||
loader.start()
|
|
||||||
loader.request_load(Path("photo.jpg"), priority=LoadPriority.HIGH)
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Signals for Qt integration
|
|
||||||
image_loaded = pyqtSignal(object, object, object) # (path, image, user_data)
|
|
||||||
load_failed = pyqtSignal(object, str, object) # (path, error_msg, user_data)
|
|
||||||
|
|
||||||
def __init__(self, cache: Optional[ImageCache] = None, max_workers: int = 4):
|
|
||||||
"""
|
|
||||||
Initialize async image loader.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
cache: ImageCache instance (creates new if None)
|
|
||||||
max_workers: Maximum concurrent worker threads (default 4)
|
|
||||||
"""
|
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
self.cache = cache or ImageCache()
|
|
||||||
self.max_workers = max_workers
|
|
||||||
self.executor = ThreadPoolExecutor(max_workers=max_workers,
|
|
||||||
thread_name_prefix="ImageLoader")
|
|
||||||
|
|
||||||
# Priority queue and tracking
|
|
||||||
self._queue: asyncio.PriorityQueue = None # Created when event loop starts
|
|
||||||
self._pending_requests: Dict[Path, LoadRequest] = {}
|
|
||||||
self._active_tasks: Dict[Path, asyncio.Task] = {}
|
|
||||||
self._next_request_id = 0
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
self._shutdown = False
|
|
||||||
|
|
||||||
# Event loop for async operations
|
|
||||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
||||||
self._loop_thread: Optional[threading.Thread] = None
|
|
||||||
|
|
||||||
logger.info(f"AsyncImageLoader initialized with {max_workers} workers")
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""Start the async backend event loop."""
|
|
||||||
if self._loop_thread is not None:
|
|
||||||
logger.warning("AsyncImageLoader already started")
|
|
||||||
return
|
|
||||||
|
|
||||||
self._shutdown = False
|
|
||||||
self._loop_thread = threading.Thread(target=self._run_event_loop,
|
|
||||||
daemon=True,
|
|
||||||
name="AsyncImageLoader-EventLoop")
|
|
||||||
self._loop_thread.start()
|
|
||||||
logger.info("AsyncImageLoader event loop started")
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""Stop the async backend and cleanup resources."""
|
|
||||||
if self._loop is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.info("Stopping AsyncImageLoader...")
|
|
||||||
self._shutdown = True
|
|
||||||
|
|
||||||
# Cancel all active tasks
|
|
||||||
if self._loop and not self._loop.is_closed():
|
|
||||||
asyncio.run_coroutine_threadsafe(self._cancel_all_tasks(), self._loop)
|
|
||||||
|
|
||||||
# Stop the event loop
|
|
||||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
|
||||||
|
|
||||||
# Wait for thread to finish
|
|
||||||
if self._loop_thread:
|
|
||||||
self._loop_thread.join(timeout=5.0)
|
|
||||||
|
|
||||||
# Shutdown executor
|
|
||||||
self.executor.shutdown(wait=True)
|
|
||||||
|
|
||||||
logger.info("AsyncImageLoader stopped")
|
|
||||||
|
|
||||||
def _run_event_loop(self):
|
|
||||||
"""Run asyncio event loop in background thread."""
|
|
||||||
self._loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(self._loop)
|
|
||||||
|
|
||||||
# Create priority queue
|
|
||||||
self._queue = asyncio.PriorityQueue()
|
|
||||||
|
|
||||||
# Start queue processor as background task
|
|
||||||
self._loop.create_task(self._process_queue())
|
|
||||||
|
|
||||||
# Run event loop forever (until stopped)
|
|
||||||
self._loop.run_forever()
|
|
||||||
|
|
||||||
# Cleanup after loop stops
|
|
||||||
self._loop.close()
|
|
||||||
|
|
||||||
async def _process_queue(self):
|
|
||||||
"""Process load requests from priority queue."""
|
|
||||||
logger.info("Queue processor started")
|
|
||||||
|
|
||||||
while not self._shutdown:
|
|
||||||
try:
|
|
||||||
# Wait for request with timeout to check shutdown flag
|
|
||||||
request = await asyncio.wait_for(self._queue.get(), timeout=0.5)
|
|
||||||
|
|
||||||
# Skip if already cancelled
|
|
||||||
if request.path not in self._pending_requests:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Process request
|
|
||||||
task = asyncio.create_task(self._load_image(request))
|
|
||||||
self._active_tasks[request.path] = task
|
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
continue # Check shutdown flag
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Queue processor error: {e}", exc_info=True)
|
|
||||||
|
|
||||||
logger.info("Queue processor stopped")
|
|
||||||
|
|
||||||
async def _cancel_all_tasks(self):
|
|
||||||
"""Cancel all active loading tasks."""
|
|
||||||
tasks = list(self._active_tasks.values())
|
|
||||||
for task in tasks:
|
|
||||||
task.cancel()
|
|
||||||
|
|
||||||
if tasks:
|
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
|
||||||
|
|
||||||
self._active_tasks.clear()
|
|
||||||
self._pending_requests.clear()
|
|
||||||
|
|
||||||
async def _load_image(self, request: LoadRequest):
|
|
||||||
"""
|
|
||||||
Load and process image asynchronously.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
request: LoadRequest containing path, size, and callback info
|
|
||||||
"""
|
|
||||||
path = request.path
|
|
||||||
target_size = request.target_size
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Check cache first
|
|
||||||
cached_img = self.cache.get(path, target_size)
|
|
||||||
if cached_img is not None:
|
|
||||||
logger.debug(f"Loaded from cache: {path}")
|
|
||||||
self._emit_loaded(path, cached_img, request.user_data)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Load in thread pool (I/O bound)
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
img = await loop.run_in_executor(
|
|
||||||
self.executor,
|
|
||||||
self._load_and_process_image,
|
|
||||||
path,
|
|
||||||
target_size
|
|
||||||
)
|
|
||||||
|
|
||||||
# Cache result
|
|
||||||
self.cache.put(path, img, target_size)
|
|
||||||
|
|
||||||
# Emit success signal
|
|
||||||
self._emit_loaded(path, img, request.user_data)
|
|
||||||
|
|
||||||
logger.debug(f"Loaded: {path} (size: {img.size})")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to load {path}: {e}", exc_info=True)
|
|
||||||
self._emit_failed(path, str(e), request.user_data)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# Cleanup tracking
|
|
||||||
with self._lock:
|
|
||||||
self._pending_requests.pop(path, None)
|
|
||||||
self._active_tasks.pop(path, None)
|
|
||||||
|
|
||||||
def _load_and_process_image(self, path: Path, target_size: Optional[Tuple[int, int]]) -> Image.Image:
|
|
||||||
"""
|
|
||||||
Load image from disk and process (runs in thread pool).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path to image file
|
|
||||||
target_size: Optional target size for downsampling
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Processed PIL Image
|
|
||||||
"""
|
|
||||||
# Load image
|
|
||||||
img = Image.open(path)
|
|
||||||
|
|
||||||
# Convert to RGBA for consistency
|
|
||||||
if img.mode != 'RGBA':
|
|
||||||
img = img.convert('RGBA')
|
|
||||||
|
|
||||||
# Downsample if target size specified
|
|
||||||
if target_size:
|
|
||||||
current_size = img.size
|
|
||||||
if current_size[0] > target_size[0] or current_size[1] > target_size[1]:
|
|
||||||
img = img.resize(target_size, Image.Resampling.LANCZOS)
|
|
||||||
logger.debug(f"Downsampled {path}: {current_size} -> {target_size}")
|
|
||||||
|
|
||||||
return img
|
|
||||||
|
|
||||||
def _emit_loaded(self, path: Path, img: Image.Image, user_data: Any):
|
|
||||||
"""Emit image_loaded signal (thread-safe)."""
|
|
||||||
self.image_loaded.emit(path, img, user_data)
|
|
||||||
|
|
||||||
def _emit_failed(self, path: Path, error_msg: str, user_data: Any):
|
|
||||||
"""Emit load_failed signal (thread-safe)."""
|
|
||||||
self.load_failed.emit(path, error_msg, user_data)
|
|
||||||
|
|
||||||
def request_load(self,
|
|
||||||
path: Path,
|
|
||||||
priority: LoadPriority = LoadPriority.NORMAL,
|
|
||||||
target_size: Optional[Tuple[int, int]] = None,
|
|
||||||
user_data: Any = None) -> bool:
|
|
||||||
"""
|
|
||||||
Request image load with specified priority.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path to image file
|
|
||||||
priority: Load priority level
|
|
||||||
target_size: Optional target size (width, height) for downsampling
|
|
||||||
user_data: Optional user data passed to callback
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if request was queued, False if already pending/active
|
|
||||||
"""
|
|
||||||
if not self._loop or self._shutdown:
|
|
||||||
logger.warning("Cannot request load: backend not started")
|
|
||||||
return False
|
|
||||||
|
|
||||||
path = Path(path)
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
# Skip if already pending or active
|
|
||||||
if path in self._pending_requests or path in self._active_tasks:
|
|
||||||
logger.debug(f"Load already pending: {path}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Create request
|
|
||||||
request = LoadRequest(
|
|
||||||
priority=priority,
|
|
||||||
request_id=self._next_request_id,
|
|
||||||
path=path,
|
|
||||||
target_size=target_size,
|
|
||||||
user_data=user_data
|
|
||||||
)
|
|
||||||
self._next_request_id += 1
|
|
||||||
|
|
||||||
# Track as pending
|
|
||||||
self._pending_requests[path] = request
|
|
||||||
|
|
||||||
# Submit to queue (thread-safe)
|
|
||||||
asyncio.run_coroutine_threadsafe(
|
|
||||||
self._queue.put(request),
|
|
||||||
self._loop
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug(f"Queued load: {path} (priority: {priority.name})")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def cancel_load(self, path: Path) -> bool:
|
|
||||||
"""
|
|
||||||
Cancel pending image load.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path to image file
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if load was cancelled, False if not found
|
|
||||||
"""
|
|
||||||
path = Path(path)
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
# Remove from pending
|
|
||||||
if path in self._pending_requests:
|
|
||||||
del self._pending_requests[path]
|
|
||||||
logger.debug(f"Cancelled pending load: {path}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Cancel active task
|
|
||||||
if path in self._active_tasks:
|
|
||||||
task = self._active_tasks[path]
|
|
||||||
task.cancel()
|
|
||||||
logger.debug(f"Cancelled active load: {path}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_stats(self) -> Dict[str, Any]:
|
|
||||||
"""Get loader statistics."""
|
|
||||||
with self._lock:
|
|
||||||
return {
|
|
||||||
'pending': len(self._pending_requests),
|
|
||||||
'active': len(self._active_tasks),
|
|
||||||
'cache': self.cache.get_stats()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncPDFGenerator(QObject):
|
|
||||||
"""
|
|
||||||
Asynchronous PDF generator that doesn't block the UI.
|
|
||||||
|
|
||||||
Generates PDFs in background thread with progress updates.
|
|
||||||
Uses shared ImageCache to avoid redundant image loading.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
generator = AsyncPDFGenerator(image_cache)
|
|
||||||
generator.progress_updated.connect(on_progress)
|
|
||||||
generator.export_complete.connect(on_complete)
|
|
||||||
generator.start()
|
|
||||||
generator.export_pdf(project, "output.pdf")
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Signals for Qt integration
|
|
||||||
progress_updated = pyqtSignal(int, int, str) # (current, total, message)
|
|
||||||
export_complete = pyqtSignal(bool, list) # (success, warnings)
|
|
||||||
export_failed = pyqtSignal(str) # (error_message)
|
|
||||||
|
|
||||||
def __init__(self, image_cache: Optional[ImageCache] = None, max_workers: int = 2):
|
|
||||||
"""
|
|
||||||
Initialize async PDF generator.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
image_cache: Shared ImageCache instance (creates new if None)
|
|
||||||
max_workers: Maximum concurrent workers for PDF generation (default 2)
|
|
||||||
"""
|
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
self.image_cache = image_cache or ImageCache()
|
|
||||||
self.max_workers = max_workers
|
|
||||||
self.executor = ThreadPoolExecutor(max_workers=max_workers,
|
|
||||||
thread_name_prefix="PDFGenerator")
|
|
||||||
|
|
||||||
# Export state
|
|
||||||
self._current_export: Optional[asyncio.Task] = None
|
|
||||||
self._cancel_requested = False
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
self._shutdown = False
|
|
||||||
|
|
||||||
# Event loop for async operations
|
|
||||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
||||||
self._loop_thread: Optional[threading.Thread] = None
|
|
||||||
|
|
||||||
logger.info(f"AsyncPDFGenerator initialized with {max_workers} workers")
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""Start the async PDF generator event loop."""
|
|
||||||
if self._loop_thread is not None:
|
|
||||||
logger.warning("AsyncPDFGenerator already started")
|
|
||||||
return
|
|
||||||
|
|
||||||
self._shutdown = False
|
|
||||||
self._loop_thread = threading.Thread(target=self._run_event_loop,
|
|
||||||
daemon=True,
|
|
||||||
name="AsyncPDFGenerator-EventLoop")
|
|
||||||
self._loop_thread.start()
|
|
||||||
logger.info("AsyncPDFGenerator event loop started")
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""Stop the async PDF generator and cleanup resources."""
|
|
||||||
if self._loop is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.info("Stopping AsyncPDFGenerator...")
|
|
||||||
self._shutdown = True
|
|
||||||
|
|
||||||
# Cancel active export
|
|
||||||
if self._current_export and not self._current_export.done():
|
|
||||||
self._current_export.cancel()
|
|
||||||
|
|
||||||
# Stop the event loop
|
|
||||||
if self._loop and not self._loop.is_closed():
|
|
||||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
|
||||||
|
|
||||||
# Wait for thread to finish
|
|
||||||
if self._loop_thread:
|
|
||||||
self._loop_thread.join(timeout=5.0)
|
|
||||||
|
|
||||||
# Shutdown executor
|
|
||||||
self.executor.shutdown(wait=True)
|
|
||||||
|
|
||||||
logger.info("AsyncPDFGenerator stopped")
|
|
||||||
|
|
||||||
def _run_event_loop(self):
|
|
||||||
"""Run asyncio event loop in background thread."""
|
|
||||||
self._loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(self._loop)
|
|
||||||
|
|
||||||
# Run event loop forever (until stopped)
|
|
||||||
self._loop.run_forever()
|
|
||||||
|
|
||||||
# Cleanup after loop stops
|
|
||||||
self._loop.close()
|
|
||||||
|
|
||||||
def export_pdf(self, project, output_path: str, export_dpi: int = 300) -> bool:
|
|
||||||
"""
|
|
||||||
Request PDF export (non-blocking).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
project: Project instance to export
|
|
||||||
output_path: Path where PDF should be saved
|
|
||||||
export_dpi: Target DPI for images (default 300)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if export started, False if already exporting or backend not started
|
|
||||||
"""
|
|
||||||
if not self._loop or self._shutdown:
|
|
||||||
logger.warning("Cannot export: backend not started")
|
|
||||||
return False
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
if self._current_export and not self._current_export.done():
|
|
||||||
logger.warning("Export already in progress")
|
|
||||||
return False
|
|
||||||
|
|
||||||
self._cancel_requested = False
|
|
||||||
|
|
||||||
# Submit export task
|
|
||||||
self._current_export = asyncio.run_coroutine_threadsafe(
|
|
||||||
self._export_pdf_async(project, output_path, export_dpi),
|
|
||||||
self._loop
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"PDF export started: {output_path}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def cancel_export(self):
|
|
||||||
"""Request cancellation of current export."""
|
|
||||||
with self._lock:
|
|
||||||
self._cancel_requested = True
|
|
||||||
if self._current_export and not self._current_export.done():
|
|
||||||
self._current_export.cancel()
|
|
||||||
logger.info("PDF export cancellation requested")
|
|
||||||
|
|
||||||
async def _export_pdf_async(self, project, output_path: str, export_dpi: int):
|
|
||||||
"""
|
|
||||||
Perform PDF export asynchronously.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
project: Project to export
|
|
||||||
output_path: Output PDF file path
|
|
||||||
export_dpi: Export DPI setting
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Import PDF exporter (lazy import to avoid circular dependencies)
|
|
||||||
from pyPhotoAlbum.pdf_exporter import PDFExporter
|
|
||||||
|
|
||||||
# Create exporter
|
|
||||||
exporter = PDFExporter(project, export_dpi=export_dpi)
|
|
||||||
|
|
||||||
# Progress callback wrapper
|
|
||||||
def progress_callback(current, total, message):
|
|
||||||
if self._cancel_requested:
|
|
||||||
return False # Signal cancellation
|
|
||||||
self.progress_updated.emit(current, total, message)
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Run export in thread pool
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
success, warnings = await loop.run_in_executor(
|
|
||||||
self.executor,
|
|
||||||
self._export_with_cache,
|
|
||||||
exporter,
|
|
||||||
output_path,
|
|
||||||
progress_callback
|
|
||||||
)
|
|
||||||
|
|
||||||
# Emit completion signal
|
|
||||||
if not self._cancel_requested:
|
|
||||||
self.export_complete.emit(success, warnings)
|
|
||||||
logger.info(f"PDF export completed: {output_path} (warnings: {len(warnings)})")
|
|
||||||
else:
|
|
||||||
logger.info("PDF export cancelled")
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logger.info("PDF export cancelled by user")
|
|
||||||
self.export_failed.emit("Export cancelled")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"PDF export failed: {e}", exc_info=True)
|
|
||||||
self.export_failed.emit(str(e))
|
|
||||||
|
|
||||||
finally:
|
|
||||||
with self._lock:
|
|
||||||
self._current_export = None
|
|
||||||
|
|
||||||
def _export_with_cache(self, exporter, output_path: str, progress_callback) -> Tuple[bool, list]:
|
|
||||||
"""
|
|
||||||
Run PDF export with image cache integration.
|
|
||||||
|
|
||||||
This method patches the exporter to use our cached images.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
exporter: PDFExporter instance
|
|
||||||
output_path: Output file path
|
|
||||||
progress_callback: Progress callback function
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of (success, warnings)
|
|
||||||
"""
|
|
||||||
# Store original Image.open
|
|
||||||
original_open = Image.open
|
|
||||||
|
|
||||||
# Patch Image.open to use cache
|
|
||||||
def cached_open(path, *args, **kwargs):
|
|
||||||
# Try cache first
|
|
||||||
cached_img = self.image_cache.get(Path(path))
|
|
||||||
if cached_img:
|
|
||||||
logger.debug(f"PDF using cached image: {path}")
|
|
||||||
return cached_img
|
|
||||||
|
|
||||||
# Load and cache
|
|
||||||
img = original_open(path, *args, **kwargs)
|
|
||||||
if img.mode != 'RGBA':
|
|
||||||
img = img.convert('RGBA')
|
|
||||||
self.image_cache.put(Path(path), img)
|
|
||||||
return img
|
|
||||||
|
|
||||||
# Temporarily patch Image.open
|
|
||||||
try:
|
|
||||||
Image.open = cached_open
|
|
||||||
return exporter.export(output_path, progress_callback)
|
|
||||||
finally:
|
|
||||||
# Restore original
|
|
||||||
Image.open = original_open
|
|
||||||
|
|
||||||
def is_exporting(self) -> bool:
|
|
||||||
"""Check if export is currently in progress."""
|
|
||||||
with self._lock:
|
|
||||||
return (self._current_export is not None
|
|
||||||
and not self._current_export.done())
|
|
||||||
|
|
||||||
def get_stats(self) -> Dict[str, Any]:
|
|
||||||
"""Get generator statistics."""
|
|
||||||
with self._lock:
|
|
||||||
return {
|
|
||||||
'exporting': self.is_exporting(),
|
|
||||||
'cache': self.image_cache.get_stats()
|
|
||||||
}
|
|
||||||
@ -16,11 +16,9 @@ from pyPhotoAlbum.mixins.element_manipulation import ElementManipulationMixin
|
|||||||
from pyPhotoAlbum.mixins.element_selection import ElementSelectionMixin
|
from pyPhotoAlbum.mixins.element_selection import ElementSelectionMixin
|
||||||
from pyPhotoAlbum.mixins.mouse_interaction import MouseInteractionMixin
|
from pyPhotoAlbum.mixins.mouse_interaction import MouseInteractionMixin
|
||||||
from pyPhotoAlbum.mixins.interaction_undo import UndoableInteractionMixin
|
from pyPhotoAlbum.mixins.interaction_undo import UndoableInteractionMixin
|
||||||
from pyPhotoAlbum.mixins.async_loading import AsyncLoadingMixin
|
|
||||||
|
|
||||||
|
|
||||||
class GLWidget(
|
class GLWidget(
|
||||||
AsyncLoadingMixin,
|
|
||||||
ViewportMixin,
|
ViewportMixin,
|
||||||
RenderingMixin,
|
RenderingMixin,
|
||||||
AssetDropMixin,
|
AssetDropMixin,
|
||||||
@ -35,7 +33,6 @@ class GLWidget(
|
|||||||
"""OpenGL widget for pyPhotoAlbum rendering and user interaction
|
"""OpenGL widget for pyPhotoAlbum rendering and user interaction
|
||||||
|
|
||||||
This widget orchestrates multiple mixins to provide:
|
This widget orchestrates multiple mixins to provide:
|
||||||
- Async image loading (non-blocking)
|
|
||||||
- Viewport control (zoom, pan)
|
- Viewport control (zoom, pan)
|
||||||
- Page rendering (OpenGL)
|
- Page rendering (OpenGL)
|
||||||
- Element selection and manipulation
|
- Element selection and manipulation
|
||||||
@ -49,9 +46,6 @@ class GLWidget(
|
|||||||
def __init__(self, parent=None):
|
def __init__(self, parent=None):
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
|
|
||||||
# Initialize async loading system
|
|
||||||
self._init_async_loading()
|
|
||||||
|
|
||||||
# Initialize OpenGL
|
# Initialize OpenGL
|
||||||
self.setFormat(self.format())
|
self.setFormat(self.format())
|
||||||
self.setUpdateBehavior(QOpenGLWidget.UpdateBehavior.NoPartialUpdate)
|
self.setUpdateBehavior(QOpenGLWidget.UpdateBehavior.NoPartialUpdate)
|
||||||
@ -60,12 +54,6 @@ class GLWidget(
|
|||||||
self.setMouseTracking(True)
|
self.setMouseTracking(True)
|
||||||
self.setAcceptDrops(True)
|
self.setAcceptDrops(True)
|
||||||
|
|
||||||
def closeEvent(self, event):
|
|
||||||
"""Handle widget close event."""
|
|
||||||
# Cleanup async loading
|
|
||||||
self._cleanup_async_loading()
|
|
||||||
super().closeEvent(event)
|
|
||||||
|
|
||||||
def keyPressEvent(self, event):
|
def keyPressEvent(self, event):
|
||||||
"""Handle key press events"""
|
"""Handle key press events"""
|
||||||
if event.key() == Qt.Key.Key_Delete or event.key() == Qt.Key.Key_Backspace:
|
if event.key() == Qt.Key.Key_Delete or event.key() == Qt.Key.Key_Backspace:
|
||||||
|
|||||||
@ -1,256 +0,0 @@
|
|||||||
"""
|
|
||||||
Async loading mixin for non-blocking image loading and PDF generation.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from PyQt6.QtCore import QObject
|
|
||||||
|
|
||||||
from pyPhotoAlbum.async_backend import AsyncImageLoader, AsyncPDFGenerator, ImageCache, LoadPriority
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncLoadingMixin:
|
|
||||||
"""
|
|
||||||
Mixin to add async loading capabilities to GLWidget.
|
|
||||||
|
|
||||||
Provides non-blocking image loading and PDF generation with
|
|
||||||
progressive updates and shared caching.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def _init_async_loading(self):
|
|
||||||
"""Initialize async loading components."""
|
|
||||||
logger.info("Initializing async loading system...")
|
|
||||||
|
|
||||||
# Create shared image cache (512MB)
|
|
||||||
self.image_cache = ImageCache(max_memory_mb=512)
|
|
||||||
|
|
||||||
# Create async image loader
|
|
||||||
self.async_image_loader = AsyncImageLoader(cache=self.image_cache, max_workers=4)
|
|
||||||
self.async_image_loader.image_loaded.connect(self._on_image_loaded)
|
|
||||||
self.async_image_loader.load_failed.connect(self._on_image_load_failed)
|
|
||||||
self.async_image_loader.start()
|
|
||||||
|
|
||||||
# Create async PDF generator
|
|
||||||
self.async_pdf_generator = AsyncPDFGenerator(image_cache=self.image_cache, max_workers=2)
|
|
||||||
self.async_pdf_generator.progress_updated.connect(self._on_pdf_progress)
|
|
||||||
self.async_pdf_generator.export_complete.connect(self._on_pdf_complete)
|
|
||||||
self.async_pdf_generator.export_failed.connect(self._on_pdf_failed)
|
|
||||||
self.async_pdf_generator.start()
|
|
||||||
|
|
||||||
logger.info("Async loading system initialized")
|
|
||||||
|
|
||||||
def _cleanup_async_loading(self):
|
|
||||||
"""Cleanup async loading components."""
|
|
||||||
logger.info("Cleaning up async loading system...")
|
|
||||||
|
|
||||||
if hasattr(self, 'async_image_loader'):
|
|
||||||
self.async_image_loader.stop()
|
|
||||||
|
|
||||||
if hasattr(self, 'async_pdf_generator'):
|
|
||||||
self.async_pdf_generator.stop()
|
|
||||||
|
|
||||||
if hasattr(self, 'image_cache'):
|
|
||||||
self.image_cache.clear()
|
|
||||||
|
|
||||||
logger.info("Async loading system cleaned up")
|
|
||||||
|
|
||||||
def _on_image_loaded(self, path: Path, image, user_data):
|
|
||||||
"""
|
|
||||||
Handle image loaded callback.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path to loaded image
|
|
||||||
image: Loaded PIL Image
|
|
||||||
user_data: User data (ImageData element)
|
|
||||||
"""
|
|
||||||
logger.debug(f"Image loaded callback: {path}")
|
|
||||||
|
|
||||||
if user_data and hasattr(user_data, '_on_async_image_loaded'):
|
|
||||||
user_data._on_async_image_loaded(image)
|
|
||||||
|
|
||||||
# Trigger re-render to show newly loaded image
|
|
||||||
self.update()
|
|
||||||
|
|
||||||
def _on_image_load_failed(self, path: Path, error_msg: str, user_data):
|
|
||||||
"""
|
|
||||||
Handle image load failure.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: Path that failed to load
|
|
||||||
error_msg: Error message
|
|
||||||
user_data: User data (ImageData element)
|
|
||||||
"""
|
|
||||||
logger.warning(f"Image load failed: {path} - {error_msg}")
|
|
||||||
|
|
||||||
if user_data and hasattr(user_data, '_on_async_image_load_failed'):
|
|
||||||
user_data._on_async_image_load_failed(error_msg)
|
|
||||||
|
|
||||||
def _on_pdf_progress(self, current: int, total: int, message: str):
|
|
||||||
"""
|
|
||||||
Handle PDF export progress updates.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
current: Current progress (pages completed)
|
|
||||||
total: Total pages
|
|
||||||
message: Progress message
|
|
||||||
"""
|
|
||||||
logger.debug(f"PDF progress: {current}/{total} - {message}")
|
|
||||||
|
|
||||||
# Update progress dialog if it exists
|
|
||||||
if hasattr(self, '_pdf_progress_dialog') and self._pdf_progress_dialog:
|
|
||||||
self._pdf_progress_dialog.setValue(current)
|
|
||||||
self._pdf_progress_dialog.setLabelText(message)
|
|
||||||
|
|
||||||
def _on_pdf_complete(self, success: bool, warnings: list):
|
|
||||||
"""
|
|
||||||
Handle PDF export completion.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
success: Whether export succeeded
|
|
||||||
warnings: List of warning messages
|
|
||||||
"""
|
|
||||||
logger.info(f"PDF export complete: success={success}, warnings={len(warnings)}")
|
|
||||||
|
|
||||||
# Close progress dialog
|
|
||||||
if hasattr(self, '_pdf_progress_dialog') and self._pdf_progress_dialog:
|
|
||||||
self._pdf_progress_dialog.close()
|
|
||||||
self._pdf_progress_dialog = None
|
|
||||||
|
|
||||||
# Show completion message
|
|
||||||
main_window = self.window()
|
|
||||||
if hasattr(main_window, 'show_status'):
|
|
||||||
if success:
|
|
||||||
if warnings:
|
|
||||||
main_window.show_status(
|
|
||||||
f"PDF exported successfully with {len(warnings)} warnings",
|
|
||||||
5000
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
main_window.show_status("PDF exported successfully", 3000)
|
|
||||||
else:
|
|
||||||
main_window.show_status("PDF export failed", 5000)
|
|
||||||
|
|
||||||
def _on_pdf_failed(self, error_msg: str):
|
|
||||||
"""
|
|
||||||
Handle PDF export failure.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
error_msg: Error message
|
|
||||||
"""
|
|
||||||
logger.error(f"PDF export failed: {error_msg}")
|
|
||||||
|
|
||||||
# Close progress dialog
|
|
||||||
if hasattr(self, '_pdf_progress_dialog') and self._pdf_progress_dialog:
|
|
||||||
self._pdf_progress_dialog.close()
|
|
||||||
self._pdf_progress_dialog = None
|
|
||||||
|
|
||||||
# Show error message
|
|
||||||
main_window = self.window()
|
|
||||||
if hasattr(main_window, 'show_status'):
|
|
||||||
main_window.show_status(f"PDF export failed: {error_msg}", 5000)
|
|
||||||
|
|
||||||
def request_image_load(self, image_data, priority: LoadPriority = LoadPriority.NORMAL):
|
|
||||||
"""
|
|
||||||
Request async load for an ImageData element.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
image_data: ImageData element to load
|
|
||||||
priority: Load priority level
|
|
||||||
"""
|
|
||||||
if not hasattr(self, 'async_image_loader'):
|
|
||||||
logger.warning("Async image loader not initialized")
|
|
||||||
return
|
|
||||||
|
|
||||||
if not image_data.image_path:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Resolve path
|
|
||||||
from pyPhotoAlbum.models import get_asset_search_paths
|
|
||||||
import os
|
|
||||||
|
|
||||||
image_full_path = image_data.image_path
|
|
||||||
if not os.path.isabs(image_data.image_path):
|
|
||||||
project_folder, search_paths = get_asset_search_paths()
|
|
||||||
possible_paths = []
|
|
||||||
|
|
||||||
if project_folder:
|
|
||||||
possible_paths.append(os.path.join(project_folder, image_data.image_path))
|
|
||||||
|
|
||||||
for search_path in search_paths:
|
|
||||||
possible_paths.append(os.path.join(search_path, image_data.image_path))
|
|
||||||
|
|
||||||
for path in possible_paths:
|
|
||||||
if os.path.exists(path):
|
|
||||||
image_full_path = path
|
|
||||||
break
|
|
||||||
|
|
||||||
# Calculate target size (max 2048px like original)
|
|
||||||
target_size = (2048, 2048) # Will be downsampled if larger
|
|
||||||
|
|
||||||
# Request load
|
|
||||||
self.async_image_loader.request_load(
|
|
||||||
Path(image_full_path),
|
|
||||||
priority=priority,
|
|
||||||
target_size=target_size,
|
|
||||||
user_data=image_data # Pass element for callback
|
|
||||||
)
|
|
||||||
|
|
||||||
def export_pdf_async(self, project, output_path: str, export_dpi: int = 300):
|
|
||||||
"""
|
|
||||||
Export PDF asynchronously without blocking UI.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
project: Project to export
|
|
||||||
output_path: Output PDF file path
|
|
||||||
export_dpi: Export DPI (default 300)
|
|
||||||
"""
|
|
||||||
if not hasattr(self, 'async_pdf_generator'):
|
|
||||||
logger.warning("Async PDF generator not initialized")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Create progress dialog
|
|
||||||
from PyQt6.QtWidgets import QProgressDialog
|
|
||||||
from PyQt6.QtCore import Qt
|
|
||||||
|
|
||||||
total_pages = sum(
|
|
||||||
1 if page.is_cover else (2 if page.is_double_spread else 1)
|
|
||||||
for page in project.pages
|
|
||||||
)
|
|
||||||
|
|
||||||
self._pdf_progress_dialog = QProgressDialog(
|
|
||||||
"Exporting to PDF...",
|
|
||||||
"Cancel",
|
|
||||||
0,
|
|
||||||
total_pages,
|
|
||||||
self
|
|
||||||
)
|
|
||||||
self._pdf_progress_dialog.setWindowModality(Qt.WindowModality.WindowModal)
|
|
||||||
self._pdf_progress_dialog.setWindowTitle("PDF Export")
|
|
||||||
self._pdf_progress_dialog.canceled.connect(self._on_pdf_cancel)
|
|
||||||
self._pdf_progress_dialog.show()
|
|
||||||
|
|
||||||
# Start async export
|
|
||||||
return self.async_pdf_generator.export_pdf(project, output_path, export_dpi)
|
|
||||||
|
|
||||||
def _on_pdf_cancel(self):
|
|
||||||
"""Handle PDF export cancellation."""
|
|
||||||
logger.info("User requested PDF export cancellation")
|
|
||||||
|
|
||||||
if hasattr(self, 'async_pdf_generator'):
|
|
||||||
self.async_pdf_generator.cancel_export()
|
|
||||||
|
|
||||||
def get_async_stats(self) -> dict:
|
|
||||||
"""Get async loading system statistics."""
|
|
||||||
stats = {}
|
|
||||||
|
|
||||||
if hasattr(self, 'async_image_loader'):
|
|
||||||
stats['image_loader'] = self.async_image_loader.get_stats()
|
|
||||||
|
|
||||||
if hasattr(self, 'async_pdf_generator'):
|
|
||||||
stats['pdf_generator'] = self.async_pdf_generator.get_stats()
|
|
||||||
|
|
||||||
return stats
|
|
||||||
@ -4,7 +4,7 @@ Alignment operations mixin for pyPhotoAlbum
|
|||||||
|
|
||||||
from pyPhotoAlbum.decorators import ribbon_action
|
from pyPhotoAlbum.decorators import ribbon_action
|
||||||
from pyPhotoAlbum.alignment import AlignmentManager
|
from pyPhotoAlbum.alignment import AlignmentManager
|
||||||
from pyPhotoAlbum.commands import AlignElementsCommand, ResizeElementsCommand
|
from pyPhotoAlbum.commands import AlignElementsCommand
|
||||||
|
|
||||||
|
|
||||||
class AlignmentOperationsMixin:
|
class AlignmentOperationsMixin:
|
||||||
@ -139,32 +139,3 @@ class AlignmentOperationsMixin:
|
|||||||
self.project.history.execute(cmd)
|
self.project.history.execute(cmd)
|
||||||
self.update_view()
|
self.update_view()
|
||||||
self.show_status(f"Aligned {len(elements)} elements to vertical center", 2000)
|
self.show_status(f"Aligned {len(elements)} elements to vertical center", 2000)
|
||||||
|
|
||||||
@ribbon_action(
|
|
||||||
label="Maximize Pattern",
|
|
||||||
tooltip="Maximize selected elements using crystal growth algorithm",
|
|
||||||
tab="Arrange",
|
|
||||||
group="Size",
|
|
||||||
requires_selection=True,
|
|
||||||
min_selection=1
|
|
||||||
)
|
|
||||||
def maximize_pattern(self):
|
|
||||||
"""Maximize selected elements until they are close to borders or each other"""
|
|
||||||
elements = self._get_selected_elements_list()
|
|
||||||
if not self.require_selection(min_count=1):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Get page size from current page
|
|
||||||
page = self.get_current_page()
|
|
||||||
if not page:
|
|
||||||
self.show_warning("No Page", "Please create a page first.")
|
|
||||||
return
|
|
||||||
|
|
||||||
page_size = page.layout.size
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern(elements, page_size)
|
|
||||||
if changes:
|
|
||||||
cmd = ResizeElementsCommand(changes)
|
|
||||||
self.project.history.execute(cmd)
|
|
||||||
self.update_view()
|
|
||||||
self.show_status(f"Maximized {len(elements)} element(s) using pattern growth", 2000)
|
|
||||||
|
|||||||
@ -141,11 +141,7 @@ class FileOperationsMixin:
|
|||||||
height_mm = height_spinbox.value()
|
height_mm = height_spinbox.value()
|
||||||
working_dpi = working_dpi_spinbox.value()
|
working_dpi = working_dpi_spinbox.value()
|
||||||
export_dpi = export_dpi_spinbox.value()
|
export_dpi = export_dpi_spinbox.value()
|
||||||
|
|
||||||
# Cleanup old project if it exists
|
|
||||||
if hasattr(self, 'project') and self.project:
|
|
||||||
self.project.cleanup()
|
|
||||||
|
|
||||||
# Create project with custom settings
|
# Create project with custom settings
|
||||||
self.project = Project(project_name)
|
self.project = Project(project_name)
|
||||||
self.project.page_size_mm = (width_mm, height_mm)
|
self.project.page_size_mm = (width_mm, height_mm)
|
||||||
@ -186,13 +182,9 @@ class FileOperationsMixin:
|
|||||||
if file_path:
|
if file_path:
|
||||||
print(f"Opening project: {file_path}")
|
print(f"Opening project: {file_path}")
|
||||||
|
|
||||||
# Cleanup old project if it exists
|
|
||||||
if hasattr(self, 'project') and self.project:
|
|
||||||
self.project.cleanup()
|
|
||||||
|
|
||||||
# Load project from ZIP
|
# Load project from ZIP
|
||||||
project, error = load_from_zip(file_path)
|
project, error = load_from_zip(file_path)
|
||||||
|
|
||||||
if project:
|
if project:
|
||||||
self.project = project
|
self.project = project
|
||||||
self.current_page_index = 0 # Reset to first page
|
self.current_page_index = 0 # Reset to first page
|
||||||
@ -493,12 +485,16 @@ class FileOperationsMixin:
|
|||||||
group="Export"
|
group="Export"
|
||||||
)
|
)
|
||||||
def export_pdf(self):
|
def export_pdf(self):
|
||||||
"""Export project to PDF using async backend (non-blocking)"""
|
"""Export project to PDF"""
|
||||||
|
from PyQt6.QtWidgets import QProgressDialog
|
||||||
|
from PyQt6.QtCore import Qt
|
||||||
|
from pyPhotoAlbum.pdf_exporter import PDFExporter
|
||||||
|
|
||||||
# Check if we have pages to export
|
# Check if we have pages to export
|
||||||
if not self.project or not self.project.pages:
|
if not self.project or not self.project.pages:
|
||||||
self.show_status("No pages to export")
|
self.show_status("No pages to export")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Show file save dialog
|
# Show file save dialog
|
||||||
file_path, _ = QFileDialog.getSaveFileName(
|
file_path, _ = QFileDialog.getSaveFileName(
|
||||||
self,
|
self,
|
||||||
@ -506,20 +502,49 @@ class FileOperationsMixin:
|
|||||||
"",
|
"",
|
||||||
"PDF Files (*.pdf);;All Files (*)"
|
"PDF Files (*.pdf);;All Files (*)"
|
||||||
)
|
)
|
||||||
|
|
||||||
if not file_path:
|
if not file_path:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Ensure .pdf extension
|
# Ensure .pdf extension
|
||||||
if not file_path.lower().endswith('.pdf'):
|
if not file_path.lower().endswith('.pdf'):
|
||||||
file_path += '.pdf'
|
file_path += '.pdf'
|
||||||
|
|
||||||
# Use async PDF export (non-blocking, UI stays responsive)
|
# Calculate total pages for progress
|
||||||
success = self.gl_widget.export_pdf_async(self.project, file_path, export_dpi=300)
|
total_pages = sum(2 if page.is_double_spread else 1 for page in self.project.pages)
|
||||||
|
|
||||||
|
# Create progress dialog
|
||||||
|
progress = QProgressDialog("Exporting to PDF...", "Cancel", 0, total_pages, self)
|
||||||
|
progress.setWindowModality(Qt.WindowModality.WindowModal)
|
||||||
|
progress.setMinimumDuration(0)
|
||||||
|
progress.setValue(0)
|
||||||
|
|
||||||
|
# Progress callback
|
||||||
|
def update_progress(current, total, message):
|
||||||
|
progress.setLabelText(message)
|
||||||
|
progress.setValue(current)
|
||||||
|
if progress.wasCanceled():
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Export to PDF
|
||||||
|
exporter = PDFExporter(self.project)
|
||||||
|
success, warnings = exporter.export(file_path, update_progress)
|
||||||
|
|
||||||
|
progress.close()
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
self.show_status("PDF export started...", 2000)
|
message = f"PDF exported successfully to {file_path}"
|
||||||
|
if warnings:
|
||||||
|
message += f"\n\nWarnings:\n" + "\n".join(warnings)
|
||||||
|
self.show_status(message)
|
||||||
|
print(message)
|
||||||
else:
|
else:
|
||||||
self.show_status("PDF export failed to start", 3000)
|
error_message = f"PDF export failed"
|
||||||
|
if warnings:
|
||||||
|
error_message += f":\n" + "\n".join(warnings)
|
||||||
|
self.show_status(error_message)
|
||||||
|
print(error_message)
|
||||||
|
|
||||||
@ribbon_action(
|
@ribbon_action(
|
||||||
label="About",
|
label="About",
|
||||||
|
|||||||
@ -99,7 +99,7 @@ class SizeOperationsMixin:
|
|||||||
element = next(iter(self.gl_widget.selected_elements))
|
element = next(iter(self.gl_widget.selected_elements))
|
||||||
|
|
||||||
# Fit to page width
|
# Fit to page width
|
||||||
page_width = page.layout.size[0]
|
page_width = page.size[0]
|
||||||
change = AlignmentManager.fit_to_page_width(element, page_width)
|
change = AlignmentManager.fit_to_page_width(element, page_width)
|
||||||
|
|
||||||
if change:
|
if change:
|
||||||
@ -130,7 +130,7 @@ class SizeOperationsMixin:
|
|||||||
element = next(iter(self.gl_widget.selected_elements))
|
element = next(iter(self.gl_widget.selected_elements))
|
||||||
|
|
||||||
# Fit to page height
|
# Fit to page height
|
||||||
page_height = page.layout.size[1]
|
page_height = page.size[1]
|
||||||
change = AlignmentManager.fit_to_page_height(element, page_height)
|
change = AlignmentManager.fit_to_page_height(element, page_height)
|
||||||
|
|
||||||
if change:
|
if change:
|
||||||
@ -161,8 +161,8 @@ class SizeOperationsMixin:
|
|||||||
element = next(iter(self.gl_widget.selected_elements))
|
element = next(iter(self.gl_widget.selected_elements))
|
||||||
|
|
||||||
# Fit to page
|
# Fit to page
|
||||||
page_width = page.layout.size[0]
|
page_width = page.size[0]
|
||||||
page_height = page.layout.size[1]
|
page_height = page.size[1]
|
||||||
change = AlignmentManager.fit_to_page(element, page_width, page_height)
|
change = AlignmentManager.fit_to_page(element, page_width, page_height)
|
||||||
|
|
||||||
if change:
|
if change:
|
||||||
|
|||||||
@ -66,8 +66,6 @@ class RenderingMixin:
|
|||||||
self._page_renderers.append((renderer, page))
|
self._page_renderers.append((renderer, page))
|
||||||
|
|
||||||
renderer.begin_render()
|
renderer.begin_render()
|
||||||
# Pass widget reference for async loading
|
|
||||||
page.layout._parent_widget = self
|
|
||||||
page.layout.render(dpi=dpi)
|
page.layout.render(dpi=dpi)
|
||||||
renderer.end_render()
|
renderer.end_render()
|
||||||
|
|
||||||
|
|||||||
@ -62,10 +62,6 @@ class ImageData(BaseLayoutElement):
|
|||||||
self.image_path = image_path
|
self.image_path = image_path
|
||||||
self.crop_info = crop_info or (0, 0, 1, 1) # Default: no crop
|
self.crop_info = crop_info or (0, 0, 1, 1) # Default: no crop
|
||||||
|
|
||||||
# Async loading state
|
|
||||||
self._async_loading = False
|
|
||||||
self._async_load_requested = False
|
|
||||||
|
|
||||||
def render(self):
|
def render(self):
|
||||||
"""Render the image using OpenGL"""
|
"""Render the image using OpenGL"""
|
||||||
from OpenGL.GL import (glBegin, glEnd, glVertex2f, glColor3f, glColor4f, GL_QUADS, GL_LINE_LOOP,
|
from OpenGL.GL import (glBegin, glEnd, glVertex2f, glColor3f, glColor4f, GL_QUADS, GL_LINE_LOOP,
|
||||||
@ -262,57 +258,6 @@ class ImageData(BaseLayoutElement):
|
|||||||
self.image_path = data.get("image_path", "")
|
self.image_path = data.get("image_path", "")
|
||||||
self.crop_info = tuple(data.get("crop_info", (0, 0, 1, 1)))
|
self.crop_info = tuple(data.get("crop_info", (0, 0, 1, 1)))
|
||||||
|
|
||||||
def _on_async_image_loaded(self, pil_image):
|
|
||||||
"""
|
|
||||||
Callback when async image loading completes.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
pil_image: Loaded PIL Image (already RGBA, already resized)
|
|
||||||
"""
|
|
||||||
from OpenGL.GL import (glGenTextures, glBindTexture, glTexImage2D, GL_TEXTURE_2D,
|
|
||||||
glTexParameteri, GL_TEXTURE_MIN_FILTER, GL_TEXTURE_MAG_FILTER,
|
|
||||||
GL_LINEAR, GL_RGBA, GL_UNSIGNED_BYTE, glDeleteTextures)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Delete old texture if it exists
|
|
||||||
if hasattr(self, '_texture_id') and self._texture_id:
|
|
||||||
glDeleteTextures([self._texture_id])
|
|
||||||
|
|
||||||
# Create GPU texture from pre-processed PIL image
|
|
||||||
img_data = pil_image.tobytes()
|
|
||||||
|
|
||||||
texture_id = glGenTextures(1)
|
|
||||||
glBindTexture(GL_TEXTURE_2D, texture_id)
|
|
||||||
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_LINEAR)
|
|
||||||
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
|
|
||||||
glTexImage2D(GL_TEXTURE_2D, 0, GL_RGBA, pil_image.width, pil_image.height,
|
|
||||||
0, GL_RGBA, GL_UNSIGNED_BYTE, img_data)
|
|
||||||
|
|
||||||
# Cache texture
|
|
||||||
self._texture_id = texture_id
|
|
||||||
self._texture_path = self.image_path
|
|
||||||
self._img_width = pil_image.width
|
|
||||||
self._img_height = pil_image.height
|
|
||||||
self._async_loading = False
|
|
||||||
|
|
||||||
print(f"ImageData: Async loaded texture for {self.image_path}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"ImageData: Error creating texture from async loaded image: {e}")
|
|
||||||
self._texture_id = None
|
|
||||||
self._async_loading = False
|
|
||||||
|
|
||||||
def _on_async_image_load_failed(self, error_msg: str):
|
|
||||||
"""
|
|
||||||
Callback when async image loading fails.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
error_msg: Error message
|
|
||||||
"""
|
|
||||||
print(f"ImageData: Async load failed for {self.image_path}: {error_msg}")
|
|
||||||
self._async_loading = False
|
|
||||||
self._async_load_requested = False
|
|
||||||
|
|
||||||
class PlaceholderData(BaseLayoutElement):
|
class PlaceholderData(BaseLayoutElement):
|
||||||
"""Class to store placeholder data"""
|
"""Class to store placeholder data"""
|
||||||
|
|
||||||
|
|||||||
@ -89,27 +89,7 @@ class PageLayout:
|
|||||||
glEnd()
|
glEnd()
|
||||||
|
|
||||||
# Render elements in list order (list position = z-order)
|
# Render elements in list order (list position = z-order)
|
||||||
# For ImageData elements, request async loading if available
|
|
||||||
for element in self.elements:
|
for element in self.elements:
|
||||||
# Check if this is an ImageData element that needs async loading
|
|
||||||
if isinstance(element, ImageData) and not hasattr(element, '_texture_id'):
|
|
||||||
# Try to get async loader from a parent widget
|
|
||||||
if hasattr(self, '_async_loader'):
|
|
||||||
loader = self._async_loader
|
|
||||||
elif hasattr(self, '_parent_widget') and hasattr(self._parent_widget, 'async_image_loader'):
|
|
||||||
loader = self._parent_widget.async_image_loader
|
|
||||||
else:
|
|
||||||
loader = None
|
|
||||||
|
|
||||||
# Request async load if loader is available and not already requested
|
|
||||||
if loader and not element._async_load_requested:
|
|
||||||
from pyPhotoAlbum.async_backend import LoadPriority
|
|
||||||
# Determine priority based on visibility (HIGH for now, can be refined)
|
|
||||||
if hasattr(self._parent_widget, 'request_image_load'):
|
|
||||||
self._parent_widget.request_image_load(element, priority=LoadPriority.HIGH)
|
|
||||||
element._async_load_requested = True
|
|
||||||
element._async_loading = True
|
|
||||||
|
|
||||||
element.render()
|
element.render()
|
||||||
|
|
||||||
# Draw page border LAST (on top of everything)
|
# Draw page border LAST (on top of everything)
|
||||||
|
|||||||
@ -348,59 +348,13 @@ class PDFExporter:
|
|||||||
(side == 'right' and element_center_mm >= split_line_mm):
|
(side == 'right' and element_center_mm >= split_line_mm):
|
||||||
self._render_element(c, element, x_offset_mm, page_width_pt, page_height_pt, page_number)
|
self._render_element(c, element, x_offset_mm, page_width_pt, page_height_pt, page_number)
|
||||||
|
|
||||||
def _resolve_image_path(self, image_path: str) -> Optional[str]:
|
def _render_image(self, c: canvas.Canvas, image_element: 'ImageData', x_pt: float,
|
||||||
"""
|
|
||||||
Resolve an image path, handling both absolute and relative paths.
|
|
||||||
Uses the same logic as ImageData.render() for consistency.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
image_path: The image path (absolute or relative)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Resolved absolute path if found, None otherwise
|
|
||||||
"""
|
|
||||||
if not image_path:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# If already absolute and exists, return it
|
|
||||||
if os.path.isabs(image_path) and os.path.exists(image_path):
|
|
||||||
return image_path
|
|
||||||
|
|
||||||
# For relative paths, try resolution using the same logic as ImageData
|
|
||||||
from pyPhotoAlbum.models import get_asset_search_paths
|
|
||||||
|
|
||||||
project_folder, search_paths = get_asset_search_paths()
|
|
||||||
possible_paths = []
|
|
||||||
|
|
||||||
# Try project folder first if available
|
|
||||||
if project_folder:
|
|
||||||
possible_paths.append(os.path.join(project_folder, image_path))
|
|
||||||
|
|
||||||
# Try additional search paths
|
|
||||||
for search_path in search_paths:
|
|
||||||
possible_paths.append(os.path.join(search_path, image_path))
|
|
||||||
|
|
||||||
# Fallback paths for compatibility
|
|
||||||
possible_paths.extend([
|
|
||||||
image_path, # Try as-is
|
|
||||||
os.path.join(os.getcwd(), image_path), # Relative to CWD
|
|
||||||
os.path.join(os.path.dirname(os.getcwd()), image_path), # Parent of CWD
|
|
||||||
])
|
|
||||||
|
|
||||||
# Find first existing path
|
|
||||||
for path in possible_paths:
|
|
||||||
if os.path.exists(path):
|
|
||||||
return path
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _render_image(self, c: canvas.Canvas, image_element: 'ImageData', x_pt: float,
|
|
||||||
y_pt: float, width_pt: float, height_pt: float, page_number: int,
|
y_pt: float, width_pt: float, height_pt: float, page_number: int,
|
||||||
crop_left: float = 0.0, crop_right: float = 1.0,
|
crop_left: float = 0.0, crop_right: float = 1.0,
|
||||||
original_width_pt: Optional[float] = None, original_height_pt: Optional[float] = None):
|
original_width_pt: Optional[float] = None, original_height_pt: Optional[float] = None):
|
||||||
"""
|
"""
|
||||||
Render an image element on the PDF canvas.
|
Render an image element on the PDF canvas.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
c: ReportLab canvas
|
c: ReportLab canvas
|
||||||
image_element: ImageData instance
|
image_element: ImageData instance
|
||||||
@ -411,19 +365,16 @@ class PDFExporter:
|
|||||||
original_width_pt: Original element width in points (before splitting, for aspect ratio)
|
original_width_pt: Original element width in points (before splitting, for aspect ratio)
|
||||||
original_height_pt: Original element height in points (before splitting, for aspect ratio)
|
original_height_pt: Original element height in points (before splitting, for aspect ratio)
|
||||||
"""
|
"""
|
||||||
# Resolve image path (handles both absolute and relative paths)
|
|
||||||
image_full_path = self._resolve_image_path(image_element.image_path)
|
|
||||||
|
|
||||||
# Check if image exists
|
# Check if image exists
|
||||||
if not image_full_path:
|
if not image_element.image_path or not os.path.exists(image_element.image_path):
|
||||||
warning = f"Page {page_number}: Image not found: {image_element.image_path}"
|
warning = f"Page {page_number}: Image not found: {image_element.image_path}"
|
||||||
print(f"WARNING: {warning}")
|
print(f"WARNING: {warning}")
|
||||||
self.warnings.append(warning)
|
self.warnings.append(warning)
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Load image using resolved path
|
# Load image
|
||||||
img = Image.open(image_full_path)
|
img = Image.open(image_element.image_path)
|
||||||
img = img.convert('RGBA')
|
img = img.convert('RGBA')
|
||||||
|
|
||||||
# Apply element's crop_info (from the element's own cropping)
|
# Apply element's crop_info (from the element's own cropping)
|
||||||
|
|||||||
@ -109,23 +109,19 @@ class Project:
|
|||||||
self.working_dpi = 300 # Default working DPI
|
self.working_dpi = 300 # Default working DPI
|
||||||
self.export_dpi = 300 # Default export DPI
|
self.export_dpi = 300 # Default export DPI
|
||||||
self.page_spacing_mm = 10.0 # Default spacing between pages (1cm)
|
self.page_spacing_mm = 10.0 # Default spacing between pages (1cm)
|
||||||
|
|
||||||
# Cover configuration
|
# Cover configuration
|
||||||
self.has_cover = False # Whether project has a cover
|
self.has_cover = False # Whether project has a cover
|
||||||
self.paper_thickness_mm = 0.2 # Paper thickness for spine calculation (default 0.2mm)
|
self.paper_thickness_mm = 0.2 # Paper thickness for spine calculation (default 0.2mm)
|
||||||
self.cover_bleed_mm = 0.0 # Bleed margin for cover (default 0mm)
|
self.cover_bleed_mm = 0.0 # Bleed margin for cover (default 0mm)
|
||||||
self.binding_type = "saddle_stitch" # Binding type for spine calculation
|
self.binding_type = "saddle_stitch" # Binding type for spine calculation
|
||||||
|
|
||||||
# Embedded templates - templates that travel with the project
|
# Embedded templates - templates that travel with the project
|
||||||
self.embedded_templates: Dict[str, Dict[str, Any]] = {}
|
self.embedded_templates: Dict[str, Dict[str, Any]] = {}
|
||||||
|
|
||||||
# Temporary directory management (if loaded from .ppz)
|
|
||||||
# Using TemporaryDirectory instance that auto-cleans on deletion
|
|
||||||
self._temp_dir = None
|
|
||||||
|
|
||||||
# Initialize asset manager
|
# Initialize asset manager
|
||||||
self.asset_manager = AssetManager(self.folder_path)
|
self.asset_manager = AssetManager(self.folder_path)
|
||||||
|
|
||||||
# Initialize command history with asset manager
|
# Initialize command history with asset manager
|
||||||
self.history = CommandHistory(max_history=100)
|
self.history = CommandHistory(max_history=100)
|
||||||
self.history.asset_manager = self.asset_manager
|
self.history.asset_manager = self.asset_manager
|
||||||
@ -364,22 +360,3 @@ class Project:
|
|||||||
else:
|
else:
|
||||||
self.history = CommandHistory(max_history=100)
|
self.history = CommandHistory(max_history=100)
|
||||||
self.history.asset_manager = self.asset_manager
|
self.history.asset_manager = self.asset_manager
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
"""
|
|
||||||
Cleanup project resources, including temporary directories.
|
|
||||||
Should be called when the project is closed or no longer needed.
|
|
||||||
"""
|
|
||||||
if self._temp_dir is not None:
|
|
||||||
try:
|
|
||||||
# Let TemporaryDirectory clean itself up
|
|
||||||
temp_path = self._temp_dir.name
|
|
||||||
self._temp_dir.cleanup()
|
|
||||||
self._temp_dir = None
|
|
||||||
print(f"Cleaned up temporary project directory: {temp_path}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Warning: Failed to cleanup temporary directory: {e}")
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
"""Destructor to ensure cleanup happens when project is deleted."""
|
|
||||||
self.cleanup()
|
|
||||||
|
|||||||
@ -121,32 +121,27 @@ def save_to_zip(project: Project, zip_path: str) -> Tuple[bool, Optional[str]]:
|
|||||||
def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Optional[Project], Optional[str]]:
|
def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Optional[Project], Optional[str]]:
|
||||||
"""
|
"""
|
||||||
Load a project from a ZIP file.
|
Load a project from a ZIP file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
zip_path: Path to the ZIP file to load
|
zip_path: Path to the ZIP file to load
|
||||||
extract_to: Optional directory to extract to. If None, uses a temporary
|
extract_to: Optional directory to extract to. If None, uses a directory
|
||||||
directory that will be cleaned up when the project is closed.
|
based on the ZIP filename in ./projects/
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (project: Optional[Project], error_message: Optional[str])
|
Tuple of (project: Optional[Project], error_message: Optional[str])
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
if not os.path.exists(zip_path):
|
if not os.path.exists(zip_path):
|
||||||
return None, f"ZIP file not found: {zip_path}"
|
return None, f"ZIP file not found: {zip_path}"
|
||||||
|
|
||||||
# Track if we created a temp directory
|
|
||||||
temp_dir_obj = None
|
|
||||||
|
|
||||||
# Determine extraction directory
|
# Determine extraction directory
|
||||||
if extract_to is None:
|
if extract_to is None:
|
||||||
# Create a temporary directory using TemporaryDirectory
|
# Extract to ./projects/{zipname}/
|
||||||
# This will be attached to the Project and auto-cleaned on deletion
|
|
||||||
zip_basename = os.path.splitext(os.path.basename(zip_path))[0]
|
zip_basename = os.path.splitext(os.path.basename(zip_path))[0]
|
||||||
temp_dir_obj = tempfile.TemporaryDirectory(prefix=f"pyPhotoAlbum_{zip_basename}_")
|
extract_to = os.path.join("./projects", zip_basename)
|
||||||
extract_to = temp_dir_obj.name
|
|
||||||
else:
|
# Create extraction directory
|
||||||
# Create extraction directory if it doesn't exist
|
os.makedirs(extract_to, exist_ok=True)
|
||||||
os.makedirs(extract_to, exist_ok=True)
|
|
||||||
|
|
||||||
# Extract ZIP contents
|
# Extract ZIP contents
|
||||||
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
||||||
@ -185,7 +180,7 @@ def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Opti
|
|||||||
# Create new project
|
# Create new project
|
||||||
project_name = project_data.get('name', 'Untitled Project')
|
project_name = project_data.get('name', 'Untitled Project')
|
||||||
project = Project(name=project_name, folder_path=extract_to)
|
project = Project(name=project_name, folder_path=extract_to)
|
||||||
|
|
||||||
# Deserialize project data
|
# Deserialize project data
|
||||||
project.deserialize(project_data)
|
project.deserialize(project_data)
|
||||||
|
|
||||||
@ -194,12 +189,6 @@ def load_from_zip(zip_path: str, extract_to: Optional[str] = None) -> Tuple[Opti
|
|||||||
project.asset_manager.project_folder = extract_to
|
project.asset_manager.project_folder = extract_to
|
||||||
project.asset_manager.assets_folder = os.path.join(extract_to, "assets")
|
project.asset_manager.assets_folder = os.path.join(extract_to, "assets")
|
||||||
|
|
||||||
# Attach temporary directory to project (if we created one)
|
|
||||||
# The TemporaryDirectory will auto-cleanup when the project is deleted
|
|
||||||
if temp_dir_obj is not None:
|
|
||||||
project._temp_dir = temp_dir_obj
|
|
||||||
print(f"Project loaded to temporary directory: {extract_to}")
|
|
||||||
|
|
||||||
# Normalize asset paths in all ImageData elements
|
# Normalize asset paths in all ImageData elements
|
||||||
# This fixes old projects that have absolute or wrong relative paths
|
# This fixes old projects that have absolute or wrong relative paths
|
||||||
_normalize_asset_paths(project, extract_to)
|
_normalize_asset_paths(project, extract_to)
|
||||||
|
|||||||
@ -1,134 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
"""
|
|
||||||
Test to verify async loading doesn't block the main thread.
|
|
||||||
|
|
||||||
This test demonstrates that the UI remains responsive during image loading.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import time
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
from PyQt6.QtWidgets import QApplication
|
|
||||||
from PyQt6.QtCore import QTimer
|
|
||||||
|
|
||||||
from pyPhotoAlbum.async_backend import AsyncImageLoader, ImageCache, LoadPriority
|
|
||||||
|
|
||||||
|
|
||||||
def test_nonblocking_load():
|
|
||||||
"""Test that async image loading doesn't block the main thread"""
|
|
||||||
|
|
||||||
print("Testing non-blocking async image loading...")
|
|
||||||
|
|
||||||
# Track if main thread stays responsive
|
|
||||||
main_thread_ticks = []
|
|
||||||
|
|
||||||
def main_thread_tick():
|
|
||||||
"""This should continue running during async loads"""
|
|
||||||
main_thread_ticks.append(time.time())
|
|
||||||
print(f"✓ Main thread tick {len(main_thread_ticks)} (responsive!)")
|
|
||||||
|
|
||||||
# Create Qt application
|
|
||||||
app = QApplication(sys.argv)
|
|
||||||
|
|
||||||
# Create async loader
|
|
||||||
cache = ImageCache(max_memory_mb=128)
|
|
||||||
loader = AsyncImageLoader(cache=cache, max_workers=2)
|
|
||||||
|
|
||||||
# Track loaded images
|
|
||||||
loaded_images = []
|
|
||||||
|
|
||||||
def on_image_loaded(path, image, user_data):
|
|
||||||
loaded_images.append(path)
|
|
||||||
print(f"✓ Loaded: {path} (size: {image.size})")
|
|
||||||
|
|
||||||
def on_load_failed(path, error_msg, user_data):
|
|
||||||
print(f"✗ Failed: {path} - {error_msg}")
|
|
||||||
|
|
||||||
loader.image_loaded.connect(on_image_loaded)
|
|
||||||
loader.load_failed.connect(on_load_failed)
|
|
||||||
|
|
||||||
# Start the async loader
|
|
||||||
loader.start()
|
|
||||||
print("✓ Async loader started")
|
|
||||||
|
|
||||||
# Request some image loads (these would normally block for 50-500ms each)
|
|
||||||
test_images = [
|
|
||||||
Path("assets/sample1.jpg"),
|
|
||||||
Path("assets/sample2.jpg"),
|
|
||||||
Path("assets/sample3.jpg"),
|
|
||||||
]
|
|
||||||
|
|
||||||
print(f"\nRequesting {len(test_images)} image loads...")
|
|
||||||
for img_path in test_images:
|
|
||||||
loader.request_load(img_path, priority=LoadPriority.HIGH)
|
|
||||||
print(f" → Queued: {img_path}")
|
|
||||||
|
|
||||||
print("\nMain thread should remain responsive while images load in background...")
|
|
||||||
|
|
||||||
# Setup main thread ticker (should run continuously)
|
|
||||||
ticker = QTimer()
|
|
||||||
ticker.timeout.connect(main_thread_tick)
|
|
||||||
ticker.start(100) # Tick every 100ms
|
|
||||||
|
|
||||||
# Setup test timeout
|
|
||||||
def check_completion():
|
|
||||||
elapsed = time.time() - start_time
|
|
||||||
|
|
||||||
if len(loaded_images) >= len(test_images):
|
|
||||||
print(f"\n✓ All images loaded in {elapsed:.2f}s")
|
|
||||||
print(f"✓ Main thread ticked {len(main_thread_ticks)} times during loading")
|
|
||||||
|
|
||||||
if len(main_thread_ticks) >= 3:
|
|
||||||
print("✓ SUCCESS: Main thread remained responsive!")
|
|
||||||
else:
|
|
||||||
print("✗ FAIL: Main thread was blocked!")
|
|
||||||
|
|
||||||
# Cleanup
|
|
||||||
ticker.stop()
|
|
||||||
loader.stop()
|
|
||||||
app.quit()
|
|
||||||
|
|
||||||
elif elapsed > 10.0:
|
|
||||||
print(f"\n✗ Timeout: Only loaded {len(loaded_images)}/{len(test_images)} images")
|
|
||||||
ticker.stop()
|
|
||||||
loader.stop()
|
|
||||||
app.quit()
|
|
||||||
|
|
||||||
# Check completion every 200ms
|
|
||||||
completion_timer = QTimer()
|
|
||||||
completion_timer.timeout.connect(check_completion)
|
|
||||||
completion_timer.start(200)
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
# Run Qt event loop (this should NOT block)
|
|
||||||
app.exec()
|
|
||||||
|
|
||||||
print("\nTest completed!")
|
|
||||||
|
|
||||||
# Report results
|
|
||||||
print(f"\nResults:")
|
|
||||||
print(f" Images loaded: {len(loaded_images)}/{len(test_images)}")
|
|
||||||
print(f" Main thread ticks: {len(main_thread_ticks)}")
|
|
||||||
print(f" Cache stats: {cache.get_stats()}")
|
|
||||||
|
|
||||||
return len(main_thread_ticks) >= 3 # Success if main thread ticked at least 3 times
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
print("=" * 60)
|
|
||||||
print("Async Non-Blocking Test")
|
|
||||||
print("=" * 60)
|
|
||||||
print()
|
|
||||||
|
|
||||||
success = test_nonblocking_load()
|
|
||||||
|
|
||||||
print()
|
|
||||||
print("=" * 60)
|
|
||||||
if success:
|
|
||||||
print("✓ TEST PASSED: Async loading is non-blocking")
|
|
||||||
else:
|
|
||||||
print("✗ TEST FAILED: Main thread was blocked")
|
|
||||||
print("=" * 60)
|
|
||||||
|
|
||||||
sys.exit(0 if success else 1)
|
|
||||||
@ -419,195 +419,3 @@ class TestAlignmentManager:
|
|||||||
|
|
||||||
assert abs(gap1 - 50) < 0.01
|
assert abs(gap1 - 50) < 0.01
|
||||||
assert abs(gap2 - 50) < 0.01
|
assert abs(gap2 - 50) < 0.01
|
||||||
|
|
||||||
def test_maximize_pattern_empty_list(self):
|
|
||||||
"""Test maximize_pattern with empty list"""
|
|
||||||
changes = AlignmentManager.maximize_pattern([], (297, 210))
|
|
||||||
assert changes == []
|
|
||||||
|
|
||||||
def test_maximize_pattern_single_element(self):
|
|
||||||
"""Test maximize_pattern with single element"""
|
|
||||||
# Small element in the middle of the page
|
|
||||||
elem = ImageData(x=100, y=80, width=20, height=15)
|
|
||||||
page_size = (297, 210) # A4 landscape in mm
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern([elem], page_size, min_gap=2.0)
|
|
||||||
|
|
||||||
# Element should grow significantly
|
|
||||||
assert elem.size[0] > 20
|
|
||||||
assert elem.size[1] > 15
|
|
||||||
|
|
||||||
# Should maintain aspect ratio
|
|
||||||
original_aspect = 20 / 15
|
|
||||||
new_aspect = elem.size[0] / elem.size[1]
|
|
||||||
assert abs(original_aspect - new_aspect) < 0.01
|
|
||||||
|
|
||||||
# Should not exceed page boundaries (with min_gap)
|
|
||||||
assert elem.position[0] >= 2.0
|
|
||||||
assert elem.position[1] >= 2.0
|
|
||||||
assert elem.position[0] + elem.size[0] <= 297 - 2.0
|
|
||||||
assert elem.position[1] + elem.size[1] <= 210 - 2.0
|
|
||||||
|
|
||||||
# Check undo information
|
|
||||||
assert len(changes) == 1
|
|
||||||
assert changes[0][0] == elem
|
|
||||||
assert changes[0][1] == (100, 80) # old position
|
|
||||||
assert changes[0][2] == (20, 15) # old size
|
|
||||||
|
|
||||||
def test_maximize_pattern_two_elements_horizontal(self):
|
|
||||||
"""Test maximize_pattern with two elements side by side"""
|
|
||||||
elem1 = ImageData(x=50, y=80, width=20, height=20)
|
|
||||||
elem2 = ImageData(x=200, y=80, width=20, height=20)
|
|
||||||
page_size = (297, 210) # A4 landscape in mm
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern([elem1, elem2], page_size, min_gap=2.0)
|
|
||||||
|
|
||||||
# Both elements should grow
|
|
||||||
assert elem1.size[0] > 20 and elem1.size[1] > 20
|
|
||||||
assert elem2.size[0] > 20 and elem2.size[1] > 20
|
|
||||||
|
|
||||||
# Elements should not overlap (min_gap = 2.0)
|
|
||||||
gap_x = max(
|
|
||||||
elem2.position[0] - (elem1.position[0] + elem1.size[0]),
|
|
||||||
elem1.position[0] - (elem2.position[0] + elem2.size[0])
|
|
||||||
)
|
|
||||||
gap_y = max(
|
|
||||||
elem2.position[1] - (elem1.position[1] + elem1.size[1]),
|
|
||||||
elem1.position[1] - (elem2.position[1] + elem2.size[1])
|
|
||||||
)
|
|
||||||
|
|
||||||
# Either horizontal or vertical gap should be >= min_gap
|
|
||||||
assert gap_x >= 2.0 or gap_y >= 2.0
|
|
||||||
|
|
||||||
# Both elements should respect page boundaries
|
|
||||||
for elem in [elem1, elem2]:
|
|
||||||
assert elem.position[0] >= 2.0
|
|
||||||
assert elem.position[1] >= 2.0
|
|
||||||
assert elem.position[0] + elem.size[0] <= 297 - 2.0
|
|
||||||
assert elem.position[1] + elem.size[1] <= 210 - 2.0
|
|
||||||
|
|
||||||
def test_maximize_pattern_three_elements_grid(self):
|
|
||||||
"""Test maximize_pattern with three elements in a grid pattern"""
|
|
||||||
elem1 = ImageData(x=50, y=50, width=15, height=15)
|
|
||||||
elem2 = ImageData(x=150, y=50, width=15, height=15)
|
|
||||||
elem3 = ImageData(x=100, y=120, width=15, height=15)
|
|
||||||
page_size = (297, 210) # A4 landscape in mm
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern([elem1, elem2, elem3], page_size, min_gap=2.0)
|
|
||||||
|
|
||||||
# All elements should grow
|
|
||||||
for elem in [elem1, elem2, elem3]:
|
|
||||||
assert elem.size[0] > 15
|
|
||||||
assert elem.size[1] > 15
|
|
||||||
|
|
||||||
# Check no overlaps with min_gap
|
|
||||||
elements = [elem1, elem2, elem3]
|
|
||||||
for i, elem_a in enumerate(elements):
|
|
||||||
for j, elem_b in enumerate(elements):
|
|
||||||
if i >= j:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Calculate gaps between rectangles
|
|
||||||
gap_x = max(
|
|
||||||
elem_b.position[0] - (elem_a.position[0] + elem_a.size[0]),
|
|
||||||
elem_a.position[0] - (elem_b.position[0] + elem_b.size[0])
|
|
||||||
)
|
|
||||||
gap_y = max(
|
|
||||||
elem_b.position[1] - (elem_a.position[1] + elem_a.size[1]),
|
|
||||||
elem_a.position[1] - (elem_b.position[1] + elem_b.size[1])
|
|
||||||
)
|
|
||||||
|
|
||||||
# At least one gap should be >= min_gap
|
|
||||||
assert gap_x >= 2.0 or gap_y >= 2.0
|
|
||||||
|
|
||||||
# Check undo information
|
|
||||||
assert len(changes) == 3
|
|
||||||
|
|
||||||
def test_maximize_pattern_respects_boundaries(self):
|
|
||||||
"""Test that maximize_pattern respects page boundaries"""
|
|
||||||
elem = ImageData(x=10, y=10, width=10, height=10)
|
|
||||||
page_size = (100, 100)
|
|
||||||
min_gap = 5.0
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern([elem], page_size, min_gap=min_gap)
|
|
||||||
|
|
||||||
# Element should not exceed boundaries
|
|
||||||
assert elem.position[0] >= min_gap
|
|
||||||
assert elem.position[1] >= min_gap
|
|
||||||
assert elem.position[0] + elem.size[0] <= page_size[0] - min_gap
|
|
||||||
assert elem.position[1] + elem.size[1] <= page_size[1] - min_gap
|
|
||||||
|
|
||||||
def test_maximize_pattern_maintains_aspect_ratio(self):
|
|
||||||
"""Test that maximize_pattern maintains element aspect ratios"""
|
|
||||||
elem1 = ImageData(x=50, y=50, width=30, height=20) # 3:2 aspect
|
|
||||||
elem2 = ImageData(x=150, y=50, width=20, height=30) # 2:3 aspect
|
|
||||||
page_size = (297, 210)
|
|
||||||
|
|
||||||
original_aspect1 = elem1.size[0] / elem1.size[1]
|
|
||||||
original_aspect2 = elem2.size[0] / elem2.size[1]
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern([elem1, elem2], page_size, min_gap=2.0)
|
|
||||||
|
|
||||||
# Aspect ratios should be maintained
|
|
||||||
new_aspect1 = elem1.size[0] / elem1.size[1]
|
|
||||||
new_aspect2 = elem2.size[0] / elem2.size[1]
|
|
||||||
|
|
||||||
assert abs(original_aspect1 - new_aspect1) < 0.01
|
|
||||||
assert abs(original_aspect2 - new_aspect2) < 0.01
|
|
||||||
|
|
||||||
def test_maximize_pattern_with_constrained_space(self):
|
|
||||||
"""Test maximize_pattern when elements are tightly packed"""
|
|
||||||
# Create 4 elements in corners with limited space
|
|
||||||
elem1 = ImageData(x=10, y=10, width=10, height=10)
|
|
||||||
elem2 = ImageData(x=140, y=10, width=10, height=10)
|
|
||||||
elem3 = ImageData(x=10, y=90, width=10, height=10)
|
|
||||||
elem4 = ImageData(x=140, y=90, width=10, height=10)
|
|
||||||
page_size = (160, 110)
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern(
|
|
||||||
[elem1, elem2, elem3, elem4],
|
|
||||||
page_size,
|
|
||||||
min_gap=2.0
|
|
||||||
)
|
|
||||||
|
|
||||||
# All elements should grow
|
|
||||||
for elem in [elem1, elem2, elem3, elem4]:
|
|
||||||
assert elem.size[0] > 10
|
|
||||||
assert elem.size[1] > 10
|
|
||||||
|
|
||||||
# Verify no overlaps
|
|
||||||
elements = [elem1, elem2, elem3, elem4]
|
|
||||||
for i, elem_a in enumerate(elements):
|
|
||||||
for j, elem_b in enumerate(elements):
|
|
||||||
if i >= j:
|
|
||||||
continue
|
|
||||||
|
|
||||||
gap_x = max(
|
|
||||||
elem_b.position[0] - (elem_a.position[0] + elem_a.size[0]),
|
|
||||||
elem_a.position[0] - (elem_b.position[0] + elem_b.size[0])
|
|
||||||
)
|
|
||||||
gap_y = max(
|
|
||||||
elem_b.position[1] - (elem_a.position[1] + elem_a.size[1]),
|
|
||||||
elem_a.position[1] - (elem_b.position[1] + elem_b.size[1])
|
|
||||||
)
|
|
||||||
|
|
||||||
assert gap_x >= 2.0 or gap_y >= 2.0
|
|
||||||
|
|
||||||
def test_maximize_pattern_with_different_element_types(self):
|
|
||||||
"""Test maximize_pattern works with different element types"""
|
|
||||||
elem1 = ImageData(x=50, y=50, width=20, height=20)
|
|
||||||
elem2 = PlaceholderData(placeholder_type="image", x=150, y=50, width=20, height=20)
|
|
||||||
elem3 = TextBoxData(text_content="Test", x=100, y=120, width=20, height=20)
|
|
||||||
page_size = (297, 210)
|
|
||||||
|
|
||||||
changes = AlignmentManager.maximize_pattern([elem1, elem2, elem3], page_size, min_gap=2.0)
|
|
||||||
|
|
||||||
# All elements should grow
|
|
||||||
assert elem1.size[0] > 20
|
|
||||||
assert elem2.size[0] > 20
|
|
||||||
assert elem3.size[0] > 20
|
|
||||||
|
|
||||||
# Check undo information has correct element types
|
|
||||||
assert isinstance(changes[0][0], ImageData)
|
|
||||||
assert isinstance(changes[1][0], PlaceholderData)
|
|
||||||
assert isinstance(changes[2][0], TextBoxData)
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user