#!/usr/bin/env python3
"""
Test script for project merge functionality

This script creates two versions of a project, modifies them differently,
and tests the merge functionality.

It can be run directly (``python <this file>``); the process exit code is 0
when every test passes and 1 otherwise.
"""

import os
import sys
import tempfile
import traceback
from datetime import datetime, timezone, timedelta

# Add pyPhotoAlbum to path.
# abspath() is required because __file__ may be a bare relative name (e.g. when
# the script is started from its own directory on older interpreters); without
# it the double dirname() collapses to "" and the wrong directory is added.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from pyPhotoAlbum.project import Project, Page
from pyPhotoAlbum.models import ImageData, TextBoxData
from pyPhotoAlbum.project_serializer import save_to_zip, load_from_zip
from pyPhotoAlbum.merge_manager import MergeManager, MergeStrategy, concatenate_projects

# Horizontal rule used by every banner printed from this script.
_RULE = "=" * 60


def _print_header(title: str) -> None:
    """Print *title* framed by two horizontal rules."""
    print(_RULE)
    print(title)
    print(_RULE)


def _print_passed(message: str) -> None:
    """Print the closing "PASSED" banner for a single test."""
    print(f"\n{_RULE}")
    print(message)
    print(f"{_RULE}\n")


def _save_project(project, path: str, label: str) -> None:
    """Save *project* to *path* and fail the test if the save is not successful.

    ``save_to_zip`` returns a 2-tuple whose first item is the success flag;
    the second item is ignored here, as in the original base-project save.
    """
    success, _ = save_to_zip(project, path)
    assert success, f"Failed to save {label}"


def create_base_project() -> Project:
    """Create a base project with some content"""
    project = Project("Base Project")

    # Add a page with text
    page = Page(page_number=1)
    text = TextBoxData(text_content="Original Text", x=10, y=10, width=100, height=50)
    page.layout.add_element(text)
    project.add_page(page)

    return project


def test_same_project_merge() -> bool:
    """Test merging two versions of the same project"""
    _print_header("Test 1: Merging Same Project (with conflicts)")

    with tempfile.TemporaryDirectory() as temp_dir:
        # Create base project
        print("\n1. Creating base project...")
        base_project = create_base_project()
        base_file = os.path.join(temp_dir, "base.ppz")
        _save_project(base_project, base_file, "base project")
        print(f" ✓ Base project saved with project_id: {base_project.project_id}")

        # Load base project twice to create two versions
        print("\n2. Creating two divergent versions...")

        # Version A: Modify text content
        project_a = load_from_zip(base_file)
        text_a = project_a.pages[0].layout.elements[0]
        text_a.text_content = "Modified by User A"
        text_a.mark_modified()
        version_a_file = os.path.join(temp_dir, "version_a.ppz")
        # A failed save used to go unnoticed here; check it like the base save.
        _save_project(project_a, version_a_file, "version A")
        print(f" ✓ Version A: Modified text to '{text_a.text_content}'")

        # Version B: Modify text position
        project_b = load_from_zip(base_file)
        text_b = project_b.pages[0].layout.elements[0]
        text_b.position = (50, 50)
        text_b.mark_modified()
        version_b_file = os.path.join(temp_dir, "version_b.ppz")
        _save_project(project_b, version_b_file, "version B")
        print(f" ✓ Version B: Modified position to {text_b.position}")

        # Detect conflicts
        print("\n3. Detecting conflicts...")
        merge_manager = MergeManager()
        data_a = project_a.serialize()
        data_b = project_b.serialize()

        should_merge = merge_manager.should_merge_projects(data_a, data_b)
        assert should_merge, "Projects should be merged (same project_id)"
        print(" ✓ Projects have same project_id, will merge")

        conflicts = merge_manager.detect_conflicts(data_a, data_b)
        print(f" ✓ Found {len(conflicts)} conflict(s)")
        for number, conflict in enumerate(conflicts, start=1):
            print(f" - Conflict {number}: {conflict.description}")

        # Auto-resolve using LATEST_WINS strategy
        print("\n4. Auto-resolving with LATEST_WINS strategy...")
        resolutions = merge_manager.auto_resolve_conflicts(MergeStrategy.LATEST_WINS)
        print(f" ✓ Resolutions: {resolutions}")

        # Apply merge
        merged_data = merge_manager.apply_resolutions(data_a, data_b, resolutions)
        print(" ✓ Merge applied successfully")
        print(f" ✓ Merged project has {len(merged_data['pages'])} page(s)")

    _print_passed("✅ Same project merge test PASSED")
    return True


def test_different_project_concatenation() -> bool:
    """Test concatenating two different projects"""
    _print_header("Test 2: Concatenating Different Projects")

    # No temporary directory is needed: this test never touches the disk.

    # Create two different projects
    print("\n1. Creating two different projects...")
    project_a = Project("Project A")
    page_a = Page(page_number=1)
    text_a = TextBoxData(text_content="From Project A", x=10, y=10, width=100, height=50)
    page_a.layout.add_element(text_a)
    project_a.add_page(page_a)

    project_b = Project("Project B")
    page_b = Page(page_number=1)
    text_b = TextBoxData(text_content="From Project B", x=10, y=10, width=100, height=50)
    page_b.layout.add_element(text_b)
    project_b.add_page(page_b)

    print(f" ✓ Project A: project_id={project_a.project_id}")
    print(f" ✓ Project B: project_id={project_b.project_id}")

    # Check if should merge
    print("\n2. Checking merge vs concatenate...")
    merge_manager = MergeManager()
    data_a = project_a.serialize()
    data_b = project_b.serialize()

    should_merge = merge_manager.should_merge_projects(data_a, data_b)
    assert not should_merge, "Projects should be concatenated (different project_ids)"
    print(" ✓ Projects have different project_ids, will concatenate")

    # Concatenate
    print("\n3. Concatenating projects...")
    merged_data = concatenate_projects(data_a, data_b)
    assert len(merged_data["pages"]) == 2, "Should have 2 pages"
    print(f" ✓ Concatenated project has {len(merged_data['pages'])} pages")
    print(f" ✓ Combined name: {merged_data['name']}")

    _print_passed("✅ Project concatenation test PASSED")
    return True


def test_no_conflicts() -> bool:
    """Test merging when there are no conflicts"""
    _print_header("Test 3: Merging Without Conflicts")

    with tempfile.TemporaryDirectory() as temp_dir:
        # Create base project with 2 pages
        print("\n1. Creating base project with 2 pages...")
        base_project = Project("Multi-Page Project")

        page1 = Page(page_number=1)
        text1 = TextBoxData(text_content="Page 1", x=10, y=10, width=100, height=50)
        page1.layout.add_element(text1)
        base_project.add_page(page1)

        page2 = Page(page_number=2)
        text2 = TextBoxData(text_content="Page 2", x=10, y=10, width=100, height=50)
        page2.layout.add_element(text2)
        base_project.add_page(page2)

        base_file = os.path.join(temp_dir, "base.ppz")
        # Fail here, with a clear message, rather than later inside load_from_zip.
        _save_project(base_project, base_file, "base project")
        print(" ✓ Base project saved with 2 pages")

        # Version A: Modify page 1
        project_a = load_from_zip(base_file)
        element_a = project_a.pages[0].layout.elements[0]
        element_a.text_content = "Page 1 - Modified by A"
        element_a.mark_modified()

        # Version B: Modify page 2 (different page, no conflict)
        project_b = load_from_zip(base_file)
        element_b = project_b.pages[1].layout.elements[0]
        element_b.text_content = "Page 2 - Modified by B"
        element_b.mark_modified()

        print(" ✓ Version A modified page 1")
        print(" ✓ Version B modified page 2")

        # Detect conflicts
        print("\n2. Detecting conflicts...")
        merge_manager = MergeManager()
        data_a = project_a.serialize()
        data_b = project_b.serialize()

        conflicts = merge_manager.detect_conflicts(data_a, data_b)
        print(f" ✓ Found {len(conflicts)} conflict(s)")

        # Should be able to auto-merge
        print("\n3. Auto-merging non-conflicting changes...")
        merged_data = merge_manager.apply_resolutions(data_a, data_b, {})

        # Verify both changes are present
        merged_project = Project()
        merged_project.deserialize(merged_data)

        assert len(merged_project.pages) == 2, "Should have 2 pages"
        page1_text = merged_project.pages[0].layout.elements[0].text_content
        page2_text = merged_project.pages[1].layout.elements[0].text_content

        assert "Modified by A" in page1_text, "Page 1 changes missing"
        assert "Modified by B" in page2_text, "Page 2 changes missing"

        print(f" ✓ Page 1 text: {page1_text}")
        print(f" ✓ Page 2 text: {page2_text}")
        print(" ✓ Both changes preserved in merge")

    _print_passed("✅ No-conflict merge test PASSED")
    return True


def test_merge_helper_add_missing_pages() -> bool:
    """Test _add_missing_pages helper method"""
    _print_header("Test 4: _add_missing_pages Helper Method")

    # Create projects with different pages
    project_a = Project("Project A")
    page_a1 = Page(page_number=1)
    project_a.add_page(page_a1)

    project_b = Project("Project B")
    # Give page_b1 the same UUID as page_a1 so it won't be added
    page_b1 = Page(page_number=1)
    page_b1.uuid = page_a1.uuid
    page_b2 = Page(page_number=2)
    project_b.add_page(page_b1)
    project_b.add_page(page_b2)

    data_a = project_a.serialize()
    data_b = project_b.serialize()

    # Make them same project
    data_b["project_id"] = data_a["project_id"]

    merge_manager = MergeManager()
    merge_manager.detect_conflicts(data_a, data_b)

    # Test _add_missing_pages on a copy whose page list is independent of data_a
    merged_data = data_a.copy()
    merged_data["pages"] = list(data_a["pages"])
    initial_page_count = len(merged_data["pages"])

    merge_manager._add_missing_pages(merged_data, data_b)

    # Should have added only page_b2 since page_b1 has same UUID as page_a1
    assert len(merged_data["pages"]) == initial_page_count + 1
    print(f" ✓ Added missing page: {len(merged_data['pages'])} total pages")

    _print_passed("✅ _add_missing_pages test PASSED")
    return True


def test_merge_helper_is_element_in_conflict() -> bool:
    """Test _is_element_in_conflict helper method"""
    _print_header("Test 5: _is_element_in_conflict Helper Method")

    from pyPhotoAlbum.merge_manager import ConflictInfo, ConflictType

    merge_manager = MergeManager()

    # Create a conflict
    conflict = ConflictInfo(
        conflict_type=ConflictType.ELEMENT_MODIFIED_BOTH,
        page_uuid="page-123",
        element_uuid="elem-456",
        our_version={},
        their_version={},
        description="Test conflict",
    )
    merge_manager.conflicts.append(conflict)

    # Test detection: both the element and the page UUID have to match
    assert merge_manager._is_element_in_conflict("elem-456", "page-123") is True
    assert merge_manager._is_element_in_conflict("elem-999", "page-123") is False
    assert merge_manager._is_element_in_conflict("elem-456", "page-999") is False

    print(" ✓ Correctly identified conflicting element")
    print(" ✓ Correctly identified non-conflicting elements")

    _print_passed("✅ _is_element_in_conflict test PASSED")
    return True


def test_merge_helper_merge_by_timestamp() -> bool:
    """Test _merge_by_timestamp helper method"""
    _print_header("Test 6: _merge_by_timestamp Helper Method")

    merge_manager = MergeManager()

    # Create page with elements; timestamps are ISO-8601 strings in UTC
    now = datetime.now(timezone.utc)
    older = (now - timedelta(hours=1)).isoformat()
    newer = (now + timedelta(hours=1)).isoformat()

    our_page = {
        "layout": {
            "elements": [
                {"uuid": "elem-1", "text_content": "Older version", "last_modified": older},
            ]
        }
    }
    our_elem = our_page["layout"]["elements"][0]
    their_elem = {"uuid": "elem-1", "text_content": "Newer version", "last_modified": newer}

    # Test: their version is newer, should replace
    merge_manager._merge_by_timestamp(our_page, "elem-1", their_elem, our_elem)
    assert our_page["layout"]["elements"][0]["text_content"] == "Newer version"
    print(" ✓ Correctly replaced with newer version")

    # Test: our version is newer, should not replace
    our_page["layout"]["elements"][0] = {
        "uuid": "elem-2",
        "text_content": "Our newer version",
        "last_modified": newer,
    }
    their_elem_older = {
        "uuid": "elem-2",
        "text_content": "Their older version",
        "last_modified": older,
    }

    merge_manager._merge_by_timestamp(
        our_page, "elem-2", their_elem_older, our_page["layout"]["elements"][0]
    )
    assert our_page["layout"]["elements"][0]["text_content"] == "Our newer version"
    print(" ✓ Correctly kept our newer version")

    _print_passed("✅ _merge_by_timestamp test PASSED")
    return True


def test_merge_helper_merge_element() -> bool:
    """Test _merge_element helper method"""
    _print_header("Test 7: _merge_element Helper Method")

    merge_manager = MergeManager()
    now = datetime.now(timezone.utc).isoformat()

    # Setup: page with one element
    our_page = {
        "uuid": "page-1",
        "layout": {
            "elements": [
                {"uuid": "elem-existing", "text_content": "Existing", "last_modified": now},
            ]
        },
    }
    our_elements = {"elem-existing": our_page["layout"]["elements"][0]}

    # Test 1: Adding new element
    their_new_elem = {"uuid": "elem-new", "text_content": "New element", "last_modified": now}
    merge_manager._merge_element(
        our_page=our_page, page_uuid="page-1", their_elem=their_new_elem, our_elements=our_elements
    )

    assert len(our_page["layout"]["elements"]) == 2
    assert our_page["layout"]["elements"][1]["uuid"] == "elem-new"
    print(" ✓ Correctly added new element")

    # Test 2: Element in conflict should be skipped
    from pyPhotoAlbum.merge_manager import ConflictInfo, ConflictType

    conflict_elem = {"uuid": "elem-conflict", "text_content": "Conflict element", "last_modified": now}
    conflict = ConflictInfo(
        conflict_type=ConflictType.ELEMENT_MODIFIED_BOTH,
        page_uuid="page-1",
        element_uuid="elem-conflict",
        our_version={},
        their_version={},
        description="Test",
    )
    merge_manager.conflicts.append(conflict)

    our_elements["elem-conflict"] = {"uuid": "elem-conflict", "text_content": "Ours"}
    our_page["layout"]["elements"].append(our_elements["elem-conflict"])

    initial_count = len(our_page["layout"]["elements"])
    merge_manager._merge_element(
        our_page=our_page, page_uuid="page-1", their_elem=conflict_elem, our_elements=our_elements
    )

    # Should not change anything since it's in conflict
    assert len(our_page["layout"]["elements"]) == initial_count
    print(" ✓ Correctly skipped conflicting element")

    _print_passed("✅ _merge_element test PASSED")
    return True


def run_all_tests() -> bool:
    """Run all merge tests

    Every test is executed even if an earlier one fails; an exception raised
    by a test is printed with its traceback and recorded as a failure.

    Returns:
        True when every test returned a truthy value, False otherwise.
    """
    print("\n" + "=" * 60)
    print("PYPHOTOALBUM MERGE FUNCTIONALITY TESTS")
    print("=" * 60 + "\n")

    tests = [
        ("Same Project Merge", test_same_project_merge),
        ("Different Project Concatenation", test_different_project_concatenation),
        ("No-Conflict Merge", test_no_conflicts),
        ("Helper: _add_missing_pages", test_merge_helper_add_missing_pages),
        ("Helper: _is_element_in_conflict", test_merge_helper_is_element_in_conflict),
        ("Helper: _merge_by_timestamp", test_merge_helper_merge_by_timestamp),
        ("Helper: _merge_element", test_merge_helper_merge_element),
    ]

    results = []
    for name, test_func in tests:
        # Broad catch is deliberate: this is the top-level runner boundary and
        # one failing test must not prevent the remaining tests from running.
        try:
            success = test_func()
        except Exception as e:
            print(f"\n❌ Test '{name}' FAILED with exception: {e}")
            traceback.print_exc()
            results.append((name, False))
        else:
            results.append((name, success))

    # Print summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)

    for name, success in results:
        status = "✅ PASS" if success else "❌ FAIL"
        print(f"{status}: {name}")

    all_passed = all(success for _, success in results)

    print("=" * 60)
    print(f"\nOverall: {'✅ ALL TESTS PASSED' if all_passed else '❌ SOME TESTS FAILED'}\n")

    return all_passed


if __name__ == "__main__":
    success = run_all_tests()
    sys.exit(0 if success else 1)