All checks were successful
Python CI / test (push) Successful in 1m20s
Lint / lint (push) Successful in 1m4s
Tests / test (3.11) (push) Successful in 1m27s
Tests / test (3.12) (push) Successful in 2m25s
Tests / test (3.13) (push) Successful in 2m52s
Tests / test (3.14) (push) Successful in 1m9s
459 lines
16 KiB
Python
Executable File
459 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Test script for project merge functionality
|
|
|
|
This script creates two versions of a project, modifies them differently,
|
|
and tests the merge functionality.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
# Add pyPhotoAlbum to path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
|
|
|
from pyPhotoAlbum.project import Project, Page
|
|
from pyPhotoAlbum.models import ImageData, TextBoxData
|
|
from pyPhotoAlbum.project_serializer import save_to_zip, load_from_zip
|
|
from pyPhotoAlbum.merge_manager import MergeManager, MergeStrategy, concatenate_projects
|
|
|
|
|
|
def create_base_project():
    """Build the fixture project used as the common ancestor in merge tests.

    The project is named "Base Project" and holds a single page (page
    number 1) whose layout contains one text box reading "Original Text".

    Returns:
        The newly constructed Project.
    """
    base = Project("Base Project")

    # Single page carrying a single text box.
    first_page = Page(page_number=1)
    first_page.layout.add_element(
        TextBoxData(text_content="Original Text", x=10, y=10, width=100, height=50)
    )
    base.add_page(first_page)

    return base
|
|
|
|
|
|
def test_same_project_merge():
    """Test merging two divergent versions of the same project.

    Saves a base project, loads it twice, changes the text content in one
    copy and the position in the other, then drives MergeManager through
    conflict detection, LATEST_WINS auto-resolution and resolution
    application.

    Returns:
        True when every step completed without a failed assertion.
    """
    print("=" * 60)
    print("Test 1: Merging Same Project (with conflicts)")
    print("=" * 60)

    with tempfile.TemporaryDirectory() as temp_dir:
        # Create base project
        print("\n1. Creating base project...")
        base_project = create_base_project()
        base_file = os.path.join(temp_dir, "base.ppz")
        success, _ = save_to_zip(base_project, base_file)
        assert success, "Failed to save base project"
        print(f" ✓ Base project saved with project_id: {base_project.project_id}")

        # Load base project twice to create two versions
        print("\n2. Creating two divergent versions...")

        # Version A: Modify text content
        project_a = load_from_zip(base_file)
        text_a = project_a.pages[0].layout.elements[0]
        text_a.text_content = "Modified by User A"
        text_a.mark_modified()
        version_a_file = os.path.join(temp_dir, "version_a.ppz")
        # Check the save result like the base save above instead of dropping it.
        success, _ = save_to_zip(project_a, version_a_file)
        assert success, "Failed to save version A"
        print(f" ✓ Version A: Modified text to '{text_a.text_content}'")

        # Version B: Modify text position
        project_b = load_from_zip(base_file)
        text_b = project_b.pages[0].layout.elements[0]
        text_b.position = (50, 50)
        text_b.mark_modified()
        version_b_file = os.path.join(temp_dir, "version_b.ppz")
        success, _ = save_to_zip(project_b, version_b_file)
        assert success, "Failed to save version B"
        print(f" ✓ Version B: Modified position to {text_b.position}")

        # Detect conflicts
        print("\n3. Detecting conflicts...")
        merge_manager = MergeManager()

        data_a = project_a.serialize()
        data_b = project_b.serialize()

        should_merge = merge_manager.should_merge_projects(data_a, data_b)
        assert should_merge, "Projects should be merged (same project_id)"
        print(" ✓ Projects have same project_id, will merge")

        conflicts = merge_manager.detect_conflicts(data_a, data_b)
        print(f" ✓ Found {len(conflicts)} conflict(s)")

        for number, conflict in enumerate(conflicts, start=1):
            print(f" - Conflict {number}: {conflict.description}")

        # Auto-resolve using LATEST_WINS strategy
        print("\n4. Auto-resolving with LATEST_WINS strategy...")
        resolutions = merge_manager.auto_resolve_conflicts(MergeStrategy.LATEST_WINS)
        print(f" ✓ Resolutions: {resolutions}")

        # Apply merge
        merged_data = merge_manager.apply_resolutions(data_a, data_b, resolutions)
        print(" ✓ Merge applied successfully")
        print(f" ✓ Merged project has {len(merged_data['pages'])} page(s)")

        print(f"\n{'=' * 60}")
        print("✅ Same project merge test PASSED")
        print(f"{'=' * 60}\n")
        return True
|
|
|
|
|
|
def test_different_project_concatenation():
    """Test concatenating two different projects.

    Builds two independent single-page projects, checks that MergeManager
    reports they should not be merged, then concatenates their serialized
    data and expects two pages in the result.

    Everything happens in memory, so no temporary directory is created.

    Returns:
        True when every assertion holds.
    """
    print("=" * 60)
    print("Test 2: Concatenating Different Projects")
    print("=" * 60)

    # Create two different projects
    print("\n1. Creating two different projects...")

    project_a = Project("Project A")
    page_a = Page(page_number=1)
    text_a = TextBoxData(text_content="From Project A", x=10, y=10, width=100, height=50)
    page_a.layout.add_element(text_a)
    project_a.add_page(page_a)

    project_b = Project("Project B")
    page_b = Page(page_number=1)
    text_b = TextBoxData(text_content="From Project B", x=10, y=10, width=100, height=50)
    page_b.layout.add_element(text_b)
    project_b.add_page(page_b)

    print(f" ✓ Project A: project_id={project_a.project_id}")
    print(f" ✓ Project B: project_id={project_b.project_id}")

    # Check if should merge
    print("\n2. Checking merge vs concatenate...")
    merge_manager = MergeManager()

    data_a = project_a.serialize()
    data_b = project_b.serialize()

    should_merge = merge_manager.should_merge_projects(data_a, data_b)
    assert not should_merge, "Projects should be concatenated (different project_ids)"
    print(" ✓ Projects have different project_ids, will concatenate")

    # Concatenate
    print("\n3. Concatenating projects...")
    merged_data = concatenate_projects(data_a, data_b)

    assert len(merged_data["pages"]) == 2, "Should have 2 pages"
    print(f" ✓ Concatenated project has {len(merged_data['pages'])} pages")
    print(f" ✓ Combined name: {merged_data['name']}")

    print(f"\n{'=' * 60}")
    print("✅ Project concatenation test PASSED")
    print(f"{'=' * 60}\n")
    return True
|
|
|
|
|
|
def test_no_conflicts():
    """Test merging when the two versions touch different pages.

    Saves a two-page base project, loads it twice, edits page 1 in one copy
    and page 2 in the other, then applies the merge with an empty resolution
    map and checks that both edits appear in the deserialized result.

    Returns:
        True when every assertion holds.
    """
    print("=" * 60)
    print("Test 3: Merging Without Conflicts")
    print("=" * 60)

    with tempfile.TemporaryDirectory() as temp_dir:
        # Create base project with 2 pages
        print("\n1. Creating base project with 2 pages...")
        base_project = Project("Multi-Page Project")

        page1 = Page(page_number=1)
        text1 = TextBoxData(text_content="Page 1", x=10, y=10, width=100, height=50)
        page1.layout.add_element(text1)
        base_project.add_page(page1)

        page2 = Page(page_number=2)
        text2 = TextBoxData(text_content="Page 2", x=10, y=10, width=100, height=50)
        page2.layout.add_element(text2)
        base_project.add_page(page2)

        base_file = os.path.join(temp_dir, "base.ppz")
        # The file is loaded twice below, so fail here with a clear message
        # if the save did not succeed.
        success, _ = save_to_zip(base_project, base_file)
        assert success, "Failed to save base project"
        print(" ✓ Base project saved with 2 pages")

        # Version A: Modify page 1
        project_a = load_from_zip(base_file)
        project_a.pages[0].layout.elements[0].text_content = "Page 1 - Modified by A"
        project_a.pages[0].layout.elements[0].mark_modified()

        # Version B: Modify page 2 (different page, no conflict)
        project_b = load_from_zip(base_file)
        project_b.pages[1].layout.elements[0].text_content = "Page 2 - Modified by B"
        project_b.pages[1].layout.elements[0].mark_modified()

        print(" ✓ Version A modified page 1")
        print(" ✓ Version B modified page 2")

        # Detect conflicts
        print("\n2. Detecting conflicts...")
        merge_manager = MergeManager()

        data_a = project_a.serialize()
        data_b = project_b.serialize()

        conflicts = merge_manager.detect_conflicts(data_a, data_b)
        print(f" ✓ Found {len(conflicts)} conflict(s)")

        # Should be able to auto-merge
        print("\n3. Auto-merging non-conflicting changes...")
        merged_data = merge_manager.apply_resolutions(data_a, data_b, {})

        # Verify both changes are present
        merged_project = Project()
        merged_project.deserialize(merged_data)

        assert len(merged_project.pages) == 2, "Should have 2 pages"
        page1_text = merged_project.pages[0].layout.elements[0].text_content
        page2_text = merged_project.pages[1].layout.elements[0].text_content

        assert "Modified by A" in page1_text, "Page 1 changes missing"
        assert "Modified by B" in page2_text, "Page 2 changes missing"

        print(f" ✓ Page 1 text: {page1_text}")
        print(f" ✓ Page 2 text: {page2_text}")
        print(" ✓ Both changes preserved in merge")

        print(f"\n{'=' * 60}")
        print("✅ No-conflict merge test PASSED")
        print(f"{'=' * 60}\n")
        return True
|
|
|
|
|
|
def run_all_tests():
    """Run every merge test in this file and print a pass/fail summary.

    Each test function is called in turn. A test that raises is reported
    together with its traceback and recorded as failed, so the remaining
    tests still run.

    Returns:
        True if every test returned a truthy value, False otherwise.
    """
    # Imported once here rather than inside the exception handler of the loop.
    import traceback

    print("\n" + "=" * 60)
    print("PYPHOTOALBUM MERGE FUNCTIONALITY TESTS")
    print("=" * 60 + "\n")

    tests = [
        ("Same Project Merge", test_same_project_merge),
        ("Different Project Concatenation", test_different_project_concatenation),
        ("No-Conflict Merge", test_no_conflicts),
        ("Helper: _add_missing_pages", test_merge_helper_add_missing_pages),
        ("Helper: _is_element_in_conflict", test_merge_helper_is_element_in_conflict),
        ("Helper: _merge_by_timestamp", test_merge_helper_merge_by_timestamp),
        ("Helper: _merge_element", test_merge_helper_merge_element),
    ]

    results = []
    for name, test_func in tests:
        try:
            success = test_func()
        except Exception as e:
            # Top-level boundary: report the failure and keep going so one
            # broken test does not hide the outcome of the others.
            print(f"\n❌ Test '{name}' FAILED with exception: {e}")
            traceback.print_exc()
            success = False
        results.append((name, success))

    # Print summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)
    for name, success in results:
        status = "✅ PASS" if success else "❌ FAIL"
        print(f"{status}: {name}")

    all_passed = all(success for _, success in results)
    print("=" * 60)
    print(f"\nOverall: {'✅ ALL TESTS PASSED' if all_passed else '❌ SOME TESTS FAILED'}\n")

    return all_passed
|
|
|
|
|
|
def test_merge_helper_add_missing_pages():
    """Exercise MergeManager._add_missing_pages with one shared and one new page.

    Project B holds a page that reuses the UUID of project A's only page plus
    one extra page; the test expects exactly one page to be appended to the
    copy of A's page list.

    Returns:
        True when the assertion holds.
    """
    rule = "=" * 60
    print(rule)
    print("Test 4: _add_missing_pages Helper Method")
    print(rule)

    # Project A owns a single page.
    ours = Project("Project A")
    shared_page = Page(page_number=1)
    ours.add_page(shared_page)

    # Project B owns a page with the same UUID as A's page (so it should not
    # be added again) and a second page that A lacks.
    theirs = Project("Project B")
    duplicate_page = Page(page_number=1)
    duplicate_page.uuid = shared_page.uuid
    extra_page = Page(page_number=2)
    for page in (duplicate_page, extra_page):
        theirs.add_page(page)

    data_a = ours.serialize()
    data_b = theirs.serialize()

    # Pretend both serialized payloads belong to the same project.
    data_b["project_id"] = data_a["project_id"]

    manager = MergeManager()
    manager.detect_conflicts(data_a, data_b)

    # Work on a copy with its own page list so data_a's list is not mutated.
    merged = data_a.copy()
    merged["pages"] = list(data_a["pages"])
    pages_before = len(merged["pages"])

    manager._add_missing_pages(merged, data_b)

    # Only the extra page should have been appended.
    assert len(merged["pages"]) == pages_before + 1
    print(f" ✓ Added missing page: {len(merged['pages'])} total pages")

    print("\n" + rule)
    print("✅ _add_missing_pages test PASSED")
    print(rule + "\n")
    return True
|
|
|
|
|
|
def test_merge_helper_is_element_in_conflict():
    """Exercise MergeManager._is_element_in_conflict against one registered conflict.

    A single conflict for element "elem-456" on page "page-123" is appended
    to the manager; the helper must return True for that exact pair and
    False when either the element or the page UUID differs.

    Returns:
        True when every assertion holds.
    """
    rule = "=" * 60
    print(rule)
    print("Test 5: _is_element_in_conflict Helper Method")
    print(rule)

    from pyPhotoAlbum.merge_manager import ConflictInfo, ConflictType

    manager = MergeManager()

    # Register one conflict directly on the manager.
    manager.conflicts.append(
        ConflictInfo(
            conflict_type=ConflictType.ELEMENT_MODIFIED_BOTH,
            page_uuid="page-123",
            element_uuid="elem-456",
            our_version={},
            their_version={},
            description="Test conflict",
        )
    )

    # (element uuid, page uuid, expected answer)
    expectations = (
        ("elem-456", "page-123", True),
        ("elem-999", "page-123", False),
        ("elem-456", "page-999", False),
    )
    for element_uuid, page_uuid, expected in expectations:
        assert manager._is_element_in_conflict(element_uuid, page_uuid) is expected

    print(" ✓ Correctly identified conflicting element")
    print(" ✓ Correctly identified non-conflicting elements")

    print("\n" + rule)
    print("✅ _is_element_in_conflict test PASSED")
    print(rule + "\n")
    return True
|
|
|
|
|
|
def test_merge_helper_merge_by_timestamp():
    """Test _merge_by_timestamp helper method.

    Covers both directions: when their element carries the later
    "last_modified" timestamp the page must end up with their text, and when
    ours is later the page must keep our text.

    Relies on the module-level ``datetime``/``timezone``/``timedelta`` import
    rather than re-importing the same names locally.

    Returns:
        True when every assertion holds.
    """
    print("=" * 60)
    print("Test 6: _merge_by_timestamp Helper Method")
    print("=" * 60)

    merge_manager = MergeManager()

    # Two ISO-8601 timestamps one hour either side of the current UTC time.
    now = datetime.now(timezone.utc)
    older = (now - timedelta(hours=1)).isoformat()
    newer = (now + timedelta(hours=1)).isoformat()

    # Create page with elements
    our_page = {"layout": {"elements": [{"uuid": "elem-1", "text_content": "Older version", "last_modified": older}]}}

    our_elem = our_page["layout"]["elements"][0]
    their_elem = {"uuid": "elem-1", "text_content": "Newer version", "last_modified": newer}

    # Test: their version is newer, should replace
    merge_manager._merge_by_timestamp(our_page, "elem-1", their_elem, our_elem)

    assert our_page["layout"]["elements"][0]["text_content"] == "Newer version"
    print(" ✓ Correctly replaced with newer version")

    # Test: our version is newer, should not replace
    our_page["layout"]["elements"][0] = {"uuid": "elem-2", "text_content": "Our newer version", "last_modified": newer}
    their_elem_older = {"uuid": "elem-2", "text_content": "Their older version", "last_modified": older}

    merge_manager._merge_by_timestamp(our_page, "elem-2", their_elem_older, our_page["layout"]["elements"][0])

    assert our_page["layout"]["elements"][0]["text_content"] == "Our newer version"
    print(" ✓ Correctly kept our newer version")

    print(f"\n{'=' * 60}")
    print("✅ _merge_by_timestamp test PASSED")
    print(f"{'=' * 60}\n")
    return True
|
|
|
|
|
|
def test_merge_helper_merge_element():
    """Exercise MergeManager._merge_element for a new and a conflicting element.

    First an element whose UUID is absent from our page is merged and must
    be appended as the second element. Then an element that has a registered
    conflict is merged and the element count must stay unchanged.

    Returns:
        True when every assertion holds.
    """
    rule = "=" * 60
    print(rule)
    print("Test 7: _merge_element Helper Method")
    print(rule)

    from datetime import datetime, timezone

    manager = MergeManager()
    stamp = datetime.now(timezone.utc).isoformat()

    # Our side: page "page-1" with a single existing element, plus the
    # uuid -> element lookup that _merge_element receives.
    existing = {"uuid": "elem-existing", "text_content": "Existing", "last_modified": stamp}
    our_page = {"uuid": "page-1", "layout": {"elements": [existing]}}
    page_elements = our_page["layout"]["elements"]
    our_elements = {"elem-existing": existing}

    # Scenario 1: an element we do not have yet gets added.
    incoming_new = {"uuid": "elem-new", "text_content": "New element", "last_modified": stamp}
    manager._merge_element(
        our_page=our_page,
        page_uuid="page-1",
        their_elem=incoming_new,
        our_elements=our_elements,
    )

    assert len(page_elements) == 2
    assert page_elements[1]["uuid"] == "elem-new"
    print(" ✓ Correctly added new element")

    # Scenario 2: an element with a registered conflict is left alone.
    from pyPhotoAlbum.merge_manager import ConflictInfo, ConflictType

    incoming_conflict = {"uuid": "elem-conflict", "text_content": "Conflict element", "last_modified": stamp}

    manager.conflicts.append(
        ConflictInfo(
            conflict_type=ConflictType.ELEMENT_MODIFIED_BOTH,
            page_uuid="page-1",
            element_uuid="elem-conflict",
            our_version={},
            their_version={},
            description="Test",
        )
    )

    # Our copy of the conflicting element lives in both the lookup and the page.
    ours_conflicting = {"uuid": "elem-conflict", "text_content": "Ours"}
    our_elements["elem-conflict"] = ours_conflicting
    page_elements.append(ours_conflicting)

    count_before = len(page_elements)

    manager._merge_element(
        our_page=our_page,
        page_uuid="page-1",
        their_elem=incoming_conflict,
        our_elements=our_elements,
    )

    # The element count must be unchanged because the element is in conflict.
    assert len(our_page["layout"]["elements"]) == count_before
    print(" ✓ Correctly skipped conflicting element")

    print("\n" + rule)
    print("✅ _merge_element test PASSED")
    print(rule + "\n")
    return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: the exit status is 0 when all tests passed, 1 otherwise.
    success = run_all_tests()
    sys.exit(int(not success))
|