Coverage for pyWebLayout/io/readers/epub_reader.py: 70%
286 statements
« prev ^ index » next coverage.py v7.11.2, created at 2025-11-12 12:02 +0000
« prev ^ index » next coverage.py v7.11.2, created at 2025-11-12 12:02 +0000
1"""
2EPUB reader for pyWebLayout.
4This module provides functionality for reading EPUB documents and converting them
5to pyWebLayout's abstract document model.
6"""
8import os
9import zipfile
10import tempfile
11from typing import Dict, List, Optional, Any, Callable
12import xml.etree.ElementTree as ET
13import urllib.parse
14from PIL import Image as PILImage, ImageOps
16from pyWebLayout.abstract.document import Book, Chapter, MetadataType
17from pyWebLayout.abstract.block import PageBreak
18from pyWebLayout.io.readers.html_extraction import parse_html_string
# Namespace URIs for the XML vocabularies found inside an EPUB package,
# keyed by the short prefix used throughout this module.
NAMESPACES = dict(
    opf='http://www.idpf.org/2007/opf',
    dc='http://purl.org/dc/elements/1.1/',
    dcterms='http://purl.org/dc/terms/',
    xhtml='http://www.w3.org/1999/xhtml',
    ncx='http://www.daisy.org/z3986/2005/ncx/',
)
def default_eink_processor(img: PILImage.Image) -> PILImage.Image:
    """
    Prepare an image for a 4-bit e-ink display using PIL only.

    The image is converted to grayscale, its histogram is equalized, and
    each pixel is then snapped to one of 16 evenly spaced gray levels.

    Args:
        img: PIL Image to process

    Returns:
        Processed PIL Image in L mode (grayscale) with 4-bit quantization
    """
    # Work in 8-bit grayscale; skip the conversion when already in L mode.
    grayscale = img if img.mode == 'L' else img.convert('L')

    # Histogram equalization for contrast enhancement.
    equalized = ImageOps.equalize(grayscale)

    # Lookup table mapping every 8-bit value onto 16 levels
    # (0, 17, 34, ..., 255).
    four_bit_lut = [(level // 16) * 17 for level in range(256)]
    return equalized.point(four_bit_lut)
class EPUBReader:
    """
    Reader for EPUB documents.

    Extracts an EPUB archive into a temporary directory, parses the package
    document (metadata, manifest, spine) and the NCX table of contents, and
    builds a pyWebLayout ``Book`` with one chapter per spine item. Images are
    loaded into memory before the temporary directory is removed.
    """

    def __init__(self, epub_path: str, image_processor: Optional[Callable[[
            PILImage.Image], PILImage.Image]] = default_eink_processor):
        """
        Initialize an EPUB reader.

        Args:
            epub_path: Path to the EPUB file
            image_processor: Optional function to process images for display optimization.
                           Defaults to default_eink_processor for 4-bit e-ink displays.
                           Set to None to disable image processing.
                           Custom processor should accept and return a PIL Image.
        """
        self.epub_path = epub_path
        self.image_processor = image_processor
        self.book = Book()
        self.temp_dir: Optional[str] = None
        self.content_dir: Optional[str] = None
        # Package document path as declared by META-INF/container.xml
        self.opf_path: Optional[str] = None
        # NCX path; set from the spine's ``toc`` attribute or by file search
        self.toc_path: Optional[str] = None
        self.metadata: Dict[str, Any] = {}
        self.toc: List[Dict[str, Any]] = []
        self.spine: List[str] = []
        self.manifest: Dict[str, Dict[str, Any]] = {}
        self.cover_id: Optional[str] = None  # ID of the cover image in manifest

    def read(self) -> Book:
        """
        Read the EPUB file and convert it to a Book.

        Returns:
            Book: The parsed book
        """
        try:
            # Extract the EPUB file
            self.temp_dir = tempfile.mkdtemp()
            self._extract_epub()
            self._parse_package_document()
            self._parse_toc()
            self._create_book()

            # Add chapters to the book
            self._add_chapters()

            # Load images into memory and optimize them for display; this has
            # to happen before the temporary directory is deleted below.
            self._process_content_images()

            return self.book

        finally:
            # Clean up temporary files
            if self.temp_dir:
                import shutil
                shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _extract_epub(self):
        """
        Extract the EPUB file to the temporary directory and locate its content.

        Sets ``content_dir`` and, when META-INF/container.xml declares a
        rootfile, ``opf_path``.
        """
        with zipfile.ZipFile(self.epub_path, 'r') as zip_ref:
            zip_ref.extractall(self.temp_dir)

        # Preferred: the rootfile declared in the container document
        container_path = os.path.join(self.temp_dir, 'META-INF', 'container.xml')
        if os.path.exists(container_path):
            root = ET.parse(container_path).getroot()

            for rootfile in root.findall(
                    './/{urn:oasis:names:tc:opendocument:xmlns:container}rootfile'):
                full_path = rootfile.get('full-path')
                if full_path:
                    # Remember the declared package document so that
                    # _parse_package_document does not have to guess.
                    self.opf_path = os.path.join(self.temp_dir, full_path)
                    self.content_dir = os.path.dirname(self.opf_path)
                    return

        # Fallback: look for common content directories
        for content_dir in ['OEBPS', 'OPS', 'Content']:
            candidate = os.path.join(self.temp_dir, content_dir)
            if os.path.exists(candidate):
                self.content_dir = candidate
                return

        # If no content directory found, use the root
        self.content_dir = self.temp_dir

    def _find_file_with_suffix(self, suffix: str) -> Optional[str]:
        """
        Return the first file under ``content_dir`` whose name ends with suffix.

        Directories and files are visited in sorted order so the result does
        not depend on filesystem ordering.

        Args:
            suffix: File name suffix to look for (e.g. '.opf')

        Returns:
            Path of the first match, or None if nothing matches
        """
        for current_dir, dirs, files in os.walk(self.content_dir):
            dirs.sort()
            for file_name in sorted(files):
                if file_name.endswith(suffix):
                    return os.path.join(current_dir, file_name)
        return None

    def _parse_package_document(self):
        """
        Parse the package document (content.opf).

        Raises:
            ValueError: If no package document can be found
        """
        # Prefer the path declared in container.xml; only search the content
        # directory when that path is missing or does not exist.
        opf_path = self.opf_path
        if not opf_path or not os.path.isfile(opf_path):
            opf_path = self._find_file_with_suffix('.opf')

        if not opf_path:
            raise ValueError("No package document (.opf) found in EPUB")
        self.opf_path = opf_path

        root = ET.parse(opf_path).getroot()

        self._parse_metadata(root)
        self._parse_manifest(root)
        self._parse_spine(root)

    def _parse_metadata(self, root: ET.Element):
        """
        Parse metadata from the package document.

        Dublin Core elements are stored in ``self.metadata`` under their local
        name; repeated ``subject`` elements are collected in a ``subjects``
        list. A ``<meta name="cover">`` element sets ``self.cover_id``.

        Args:
            root: Root element of the package document
        """
        metadata_elem = root.find('.//{{{0}}}metadata'.format(NAMESPACES['opf']))
        if metadata_elem is None:
            return

        # Parse DC metadata
        dc_prefix = '{{{0}}}'.format(NAMESPACES['dc'])
        for elem in metadata_elem:
            if not elem.tag.startswith(dc_prefix):
                continue

            # Local name (tag without the namespace part)
            name = elem.tag[len(dc_prefix):]
            value = elem.text

            if name == 'subject':
                self.metadata.setdefault('subjects', []).append(value)
            else:
                self.metadata[name] = value

        # Parse meta elements for cover reference
        for meta in metadata_elem.findall('.//{{{0}}}meta'.format(NAMESPACES['opf'])):
            content = meta.get('content')
            if meta.get('name') == 'cover' and content:
                # This is a reference to the cover image in the manifest
                self.cover_id = content

    def _parse_manifest(self, root: ET.Element):
        """
        Parse manifest from the package document.

        Each item is stored in ``self.manifest`` by id, with its decoded
        href, its path resolved against ``content_dir``, and its media type.

        Args:
            root: Root element of the package document
        """
        manifest_elem = root.find('.//{{{0}}}manifest'.format(NAMESPACES['opf']))
        if manifest_elem is None:
            return

        for item in manifest_elem.findall('.//{{{0}}}item'.format(NAMESPACES['opf'])):
            item_id = item.get('id')
            href = item.get('href')
            media_type = item.get('media-type')

            if item_id and href:
                # hrefs are percent-encoded; decode before touching the disk
                href = urllib.parse.unquote(href)
                path = os.path.normpath(os.path.join(self.content_dir, href))

                self.manifest[item_id] = {
                    'href': href,
                    'path': path,
                    'media_type': media_type
                }

    def _parse_spine(self, root: ET.Element):
        """
        Parse spine from the package document.

        Records the NCX path referenced by the spine's ``toc`` attribute and
        appends every itemref that resolves to a manifest item to
        ``self.spine``.

        Args:
            root: Root element of the package document
        """
        spine_elem = root.find('.//{{{0}}}spine'.format(NAMESPACES['opf']))
        if spine_elem is None:
            return

        # Get the toc attribute (NCX file ID)
        toc_id = spine_elem.get('toc')
        if toc_id and toc_id in self.manifest:
            self.toc_path = self.manifest[toc_id]['path']

        # Parse itemrefs
        for itemref in spine_elem.findall(
                './/{{{0}}}itemref'.format(NAMESPACES['opf'])):
            idref = itemref.get('idref')
            if idref and idref in self.manifest:
                self.spine.append(idref)

    def _parse_toc(self):
        """
        Parse the table of contents from the NCX file, if one exists.

        Leaves ``self.toc`` empty when no NCX file or no navMap is found.
        """
        # Search for an NCX file whenever the recorded path is unset or
        # points at a file that does not exist.
        if not self.toc_path or not os.path.exists(self.toc_path):
            self.toc_path = self._find_file_with_suffix('.ncx')

        if not self.toc_path:
            # No TOC found
            return

        root = ET.parse(self.toc_path).getroot()

        nav_map = root.find('.//{{{0}}}navMap'.format(NAMESPACES['ncx']))
        if nav_map is None:
            return

        self._parse_nav_points(nav_map, [])

    def _parse_nav_points(self, parent: ET.Element, path: List[Dict[str, Any]]):
        """
        Recursively parse navPoints from the NCX file.

        Only the direct navPoint children of ``parent`` are handled at each
        level; nested navPoints are reached through the recursion, so every
        entry appears exactly once, under its real parent.

        Args:
            parent: Parent element containing navPoints
            path: Current path in the TOC hierarchy
        """
        ncx = NAMESPACES['ncx']
        for nav_point in parent.findall('{{{0}}}navPoint'.format(ncx)):
            nav_id = nav_point.get('id')
            play_order = nav_point.get('playOrder')

            # Label text lives in navLabel/text directly below the navPoint
            text_elem = nav_point.find('{{{0}}}navLabel/{{{0}}}text'.format(ncx))
            label = text_elem.text if text_elem is not None else ""

            # Target of the entry
            content = nav_point.find('{{{0}}}content'.format(ncx))
            src = content.get('src') if content is not None else ""

            entry = {
                'id': nav_id,
                'label': label,
                'src': src,
                'play_order': play_order,
                'children': []
            }

            # Attach to the enclosing entry, or to the top level
            if path:
                path[-1]['children'].append(entry)
            else:
                self.toc.append(entry)

            # Parse child navPoints
            self._parse_nav_points(nav_point, path + [entry])

    def _create_book(self):
        """Copy the parsed metadata onto the Book object."""
        if 'title' in self.metadata:
            self.book.set_title(self.metadata['title'])

        if 'creator' in self.metadata:
            self.book.set_metadata(MetadataType.AUTHOR, self.metadata['creator'])

        if 'language' in self.metadata:
            self.book.set_metadata(MetadataType.LANGUAGE, self.metadata['language'])

        if 'description' in self.metadata:
            self.book.set_metadata(
                MetadataType.DESCRIPTION,
                self.metadata['description'])

        if 'subjects' in self.metadata:
            # Empty <dc:subject/> elements yield None; skip them so that
            # the join cannot fail.
            keywords = ', '.join(
                subject for subject in self.metadata['subjects'] if subject)
            if keywords:
                self.book.set_metadata(MetadataType.KEYWORDS, keywords)

        if 'date' in self.metadata:
            self.book.set_metadata(MetadataType.PUBLICATION_DATE, self.metadata['date'])

        if 'identifier' in self.metadata:
            self.book.set_metadata(MetadataType.IDENTIFIER, self.metadata['identifier'])

        if 'publisher' in self.metadata:
            self.book.set_metadata(MetadataType.PUBLISHER, self.metadata['publisher'])

    def _add_cover_chapter(self):
        """
        Add a cover chapter if a cover image is available.

        The image is only loaded here. Display processing is applied later by
        _process_content_images together with all other images, so the cover
        is not run through ``image_processor`` twice.
        """
        if not self.cover_id or self.cover_id not in self.manifest:
            return

        # Get the cover image path from the manifest
        cover_path = self.manifest[self.cover_id]['path']

        if not os.path.exists(cover_path):
            print(f"Warning: Cover image file not found: {cover_path}")
            return

        cover_chapter = self.book.create_chapter("Cover", 0)

        try:
            from pyWebLayout.abstract.block import Image as AbstractImage
            import io

            # Read the bytes and decode from memory so the image stays
            # usable after the temporary directory is removed.
            with open(cover_path, 'rb') as f:
                image_bytes = f.read()

            pil_image = PILImage.open(io.BytesIO(image_bytes))
            pil_image.load()  # Force loading into memory
            pil_image = pil_image.copy()

            cover_image = AbstractImage(source=cover_path, alt_text="Cover Image")

            # Set dimensions from the loaded image
            cover_image._width = pil_image.width
            cover_image._height = pil_image.height

            # Keep the decoded image on the block so it persists after cleanup
            cover_image._loaded_image = pil_image

            cover_chapter.add_block(cover_image)

        except Exception as e:
            print(f"Error creating cover chapter: {str(e)}")
            import traceback
            traceback.print_exc()
            # If we can't create the cover image, remove the chapter
            if hasattr(self.book, 'chapters') and cover_chapter in self.book.chapters:
                self.book.chapters.remove(cover_chapter)

    def _process_chapter_images(self, chapter: Chapter):
        """
        Load and process images in a single chapter.

        Images are read from disk into memory and then passed through
        ``image_processor`` when one is configured. This must run before the
        temporary EPUB directory is cleaned up.

        Args:
            chapter: The chapter containing images to process
        """
        from pyWebLayout.abstract.block import Image as AbstractImage
        import io

        for block in chapter.blocks:
            if not isinstance(block, AbstractImage):
                continue

            # Load image into memory if not already loaded
            if not getattr(block, '_loaded_image', None):
                try:
                    if os.path.isfile(block.source):
                        with open(block.source, 'rb') as f:
                            image_bytes = f.read()
                        pil_image = PILImage.open(io.BytesIO(image_bytes))
                        pil_image.load()  # Force loading into memory
                        block._loaded_image = pil_image.copy()

                        # Width and height are required for layout calculations
                        block._width = pil_image.width
                        block._height = pil_image.height
                except Exception as e:
                    print(f"Warning: Failed to load image '{block.source}': {str(e)}")
                    # Continue without the image
                    continue

            # Apply image processing if enabled and image is loaded
            if self.image_processor and getattr(block, '_loaded_image', None):
                try:
                    block._loaded_image = self.image_processor(block._loaded_image)
                except Exception as e:
                    print(
                        f"Warning: Image processing failed for image '{block.alt_text}': {str(e)}"
                    )
                    # Continue with unprocessed image

    def _process_content_images(self):
        """
        Load all images into memory and apply image processing.

        This must be called before the temporary EPUB directory is cleaned up,
        to ensure images are loaded from disk into memory.
        """
        for chapter in self.book.chapters:
            self._process_chapter_images(chapter)

    def _add_chapters(self):
        """Add chapters to the book based on the spine and TOC."""
        # Add cover chapter first if available
        self._add_cover_chapter()

        # Map the file part of each TOC src to its entry
        toc_map = {}

        def add_to_toc_map(entries):
            for entry in entries:
                if entry['src']:
                    # Drop the fragment; keep the file path only
                    src_path = entry['src'].split('#', 1)[0]
                    toc_map[src_path] = entry
                    # Manifest hrefs are percent-decoded, so register the
                    # decoded form as well to let escaped names match.
                    toc_map[urllib.parse.unquote(src_path)] = entry

                if entry['children']:
                    add_to_toc_map(entry['children'])

        add_to_toc_map(self.toc)

        # Process spine items
        # NOTE(review): the offset is based on the cover being declared in the
        # manifest, not on whether the cover chapter was actually created, and
        # the counter is incremented before first use — confirm intended
        # numbering against Book.create_chapter.
        chapter_index = 1 if (self.cover_id and self.cover_id in self.manifest) else 0
        for i, idref in enumerate(self.spine):
            if idref not in self.manifest:
                continue

            item = self.manifest[idref]
            path = item['path']
            href = item['href']

            # Skip navigation files
            looks_like_nav = (
                item.get('media_type') == 'application/xhtml+xml' and
                ('nav' in href.lower() or 'toc' in href.lower()))
            if idref == 'nav' or looks_like_nav:
                continue

            # Use the TOC label as title when this item is listed there
            chapter_title = None
            if href in toc_map:
                chapter_title = toc_map[href]['label']

            chapter_index += 1
            chapter = self.book.create_chapter(chapter_title, chapter_index)

            try:
                with open(path, 'r', encoding='utf-8') as f:
                    html = f.read()

                # Directory of the HTML file, used to resolve relative image paths
                html_dir = os.path.dirname(path)

                blocks = parse_html_string(html, document=self.book, base_path=html_dir)

                for block in blocks:
                    chapter.add_block(block)

                # A PageBreak after the chapter makes the next chapter start
                # on a new page during pagination.
                chapter.add_block(PageBreak())

            except Exception as e:
                print(f"Error parsing chapter {i + 1}: {str(e)}")
                # Put a visible error message into the chapter instead
                from pyWebLayout.abstract.block import Paragraph
                from pyWebLayout.abstract.inline import Word
                from pyWebLayout.style import Font
                error_para = Paragraph()
                default_font = Font()
                error_para.add_word(
                    Word(
                        f"Error loading chapter: {str(e)}",
                        default_font
                    )
                )
                chapter.add_block(error_para)
                # Still add PageBreak even after error
                chapter.add_block(PageBreak())
def read_epub(epub_path: str) -> Book:
    """
    Convenience wrapper: parse an EPUB file with a default EPUBReader.

    Args:
        epub_path: Path to the EPUB file

    Returns:
        Book: The parsed book
    """
    return EPUBReader(epub_path).read()