Coverage for pyWebLayout/io/readers/epub_reader.py: 70%

286 statements  

« prev     ^ index     » next       coverage.py v7.11.2, created at 2025-11-12 12:02 +0000

1""" 

2EPUB reader for pyWebLayout. 

3 

4This module provides functionality for reading EPUB documents and converting them 

5to pyWebLayout's abstract document model. 

6""" 

7 

8import os 

9import zipfile 

10import tempfile 

11from typing import Dict, List, Optional, Any, Callable 

12import xml.etree.ElementTree as ET 

13import urllib.parse 

14from PIL import Image as PILImage, ImageOps 

15 

16from pyWebLayout.abstract.document import Book, Chapter, MetadataType 

17from pyWebLayout.abstract.block import PageBreak 

18from pyWebLayout.io.readers.html_extraction import parse_html_string 

19 

20 

# Namespace URIs of the XML vocabularies found in EPUB packages, keyed by the
# short prefix used when building qualified tag names.
NAMESPACES = dict(
    opf='http://www.idpf.org/2007/opf',
    dc='http://purl.org/dc/elements/1.1/',
    dcterms='http://purl.org/dc/terms/',
    xhtml='http://www.w3.org/1999/xhtml',
    ncx='http://www.daisy.org/z3986/2005/ncx/',
)

29 

30 

def default_eink_processor(img: PILImage.Image) -> PILImage.Image:
    """
    Prepare an image for a 4-bit e-ink display using PIL only.

    The image is converted to grayscale (when it is not already in 'L' mode),
    histogram-equalized to boost contrast, and finally snapped to 16 evenly
    spaced grey values.

    Args:
        img: PIL Image to process

    Returns:
        PIL Image in L mode whose pixel values are limited to
        0, 17, 34, ..., 255
    """
    def to_sixteen_levels(value):
        # Integer-divide into 16 buckets, then scale bucket 0..15 onto 0..255.
        return (value // 16) * 17

    grayscale = img if img.mode == 'L' else img.convert('L')
    equalized = ImageOps.equalize(grayscale)
    return equalized.point(to_sixteen_levels)

53 

54 

55class EPUBReader: 

56 """ 

57 Reader for EPUB documents. 

58 

59 This class extracts content from EPUB files and converts it to 

60 pyWebLayout's abstract document model. 

61 """ 

62 

def __init__(self, epub_path: str, image_processor: Optional[Callable[[
        PILImage.Image], PILImage.Image]] = default_eink_processor):
    """
    Initialize an EPUB reader.

    Only in-memory state is set up here; the archive is not opened until
    read() is called.

    Args:
        epub_path: Path to the EPUB file
        image_processor: Optional function to process images for display optimization.
            Defaults to default_eink_processor for 4-bit e-ink displays.
            Set to None to disable image processing.
            Custom processor should accept and return a PIL Image.
    """
    self.epub_path = epub_path
    self.image_processor = image_processor
    self.book = Book()
    self.temp_dir = None  # scratch directory the archive is unpacked into
    self.content_dir = None  # directory holding the package document
    # Path of the NCX table of contents. Assigned later by _parse_spine or
    # _parse_toc; initialised here so the attribute always exists.
    self.toc_path = None
    self.metadata = {}
    self.toc = []
    self.spine = []
    self.manifest = {}
    self.cover_id = None  # ID of the cover image in manifest

85 

def read(self) -> Book:
    """
    Read the EPUB file and convert it to a Book.

    The archive is unpacked into a fresh temporary directory, parsed stage by
    stage, and the temporary directory is removed again whether or not
    parsing succeeded.

    Returns:
        Book: The parsed book
    """
    import shutil

    try:
        self.temp_dir = tempfile.mkdtemp()

        # Unpack, then parse the package document and table of contents.
        self._extract_epub()
        self._parse_package_document()
        self._parse_toc()

        # Build the Book: metadata first, then chapters.
        self._create_book()
        self._add_chapters()

        # Images have to be pulled into memory while the unpacked files
        # still exist on disk.
        self._process_content_images()

        return self.book
    finally:
        scratch = self.temp_dir
        if scratch:
            shutil.rmtree(scratch, ignore_errors=True)

114 

115 def _extract_epub(self): 

116 """Extract the EPUB file to a temporary directory.""" 

117 with zipfile.ZipFile(self.epub_path, 'r') as zip_ref: 

118 zip_ref.extractall(self.temp_dir) 

119 

120 # Find the content directory (typically OEBPS or OPS) 

121 container_path = os.path.join(self.temp_dir, 'META-INF', 'container.xml') 

122 if os.path.exists(container_path): 122 ↛ 136line 122 didn't jump to line 136 because the condition on line 122 was always true

123 tree = ET.parse(container_path) 

124 root = tree.getroot() 

125 

126 # Get the path to the package document (content.opf) 

127 for rootfile in root.findall( 127 ↛ 136line 127 didn't jump to line 136 because the loop on line 127 didn't complete

128 './/{urn:oasis:names:tc:opendocument:xmlns:container}rootfile'): 

129 full_path = rootfile.get('full-path') 

130 if full_path: 130 ↛ 127line 130 didn't jump to line 127 because the condition on line 130 was always true

131 self.content_dir = os.path.dirname( 

132 os.path.join(self.temp_dir, full_path)) 

133 return 

134 

135 # Fallback: look for common content directories 

136 for content_dir in ['OEBPS', 'OPS', 'Content']: 

137 if os.path.exists(os.path.join(self.temp_dir, content_dir)): 

138 self.content_dir = os.path.join(self.temp_dir, content_dir) 

139 return 

140 

141 # If no content directory found, use the root 

142 self.content_dir = self.temp_dir 

143 

144 def _parse_package_document(self): 

145 """Parse the package document (content.opf).""" 

146 # Find the package document 

147 opf_path = None 

148 for root, dirs, files in os.walk(self.content_dir): 148 ↛ 156line 148 didn't jump to line 156 because the loop on line 148 didn't complete

149 for file in files: 149 ↛ 153line 149 didn't jump to line 153 because the loop on line 149 didn't complete

150 if file.endswith('.opf'): 

151 opf_path = os.path.join(root, file) 

152 break 

153 if opf_path: 153 ↛ 148line 153 didn't jump to line 148 because the condition on line 153 was always true

154 break 

155 

156 if not opf_path: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true

157 raise ValueError("No package document (.opf) found in EPUB") 

158 

159 # Parse the package document 

160 tree = ET.parse(opf_path) 

161 root = tree.getroot() 

162 

163 # Parse metadata 

164 self._parse_metadata(root) 

165 

166 # Parse manifest 

167 self._parse_manifest(root) 

168 

169 # Parse spine 

170 self._parse_spine(root) 

171 

172 def _parse_metadata(self, root: ET.Element): 

173 """ 

174 Parse metadata from the package document. 

175 

176 Args: 

177 root: Root element of the package document 

178 """ 

179 # Find the metadata element 

180 metadata_elem = root.find('.//{{{0}}}metadata'.format(NAMESPACES['opf'])) 

181 if metadata_elem is None: 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 return 

183 

184 # Parse DC metadata 

185 for elem in metadata_elem: 

186 if elem.tag.startswith('{{{0}}}'.format(NAMESPACES['dc'])): 

187 # Get the local name (without namespace) 

188 name = elem.tag.split('}', 1)[1] 

189 value = elem.text 

190 

191 if name == 'title': 

192 self.metadata['title'] = value 

193 elif name == 'creator': 

194 self.metadata['creator'] = value 

195 elif name == 'language': 

196 self.metadata['language'] = value 

197 elif name == 'description': 

198 self.metadata['description'] = value 

199 elif name == 'subject': 

200 if 'subjects' not in self.metadata: 

201 self.metadata['subjects'] = [] 

202 self.metadata['subjects'].append(value) 

203 elif name == 'date': 

204 self.metadata['date'] = value 

205 elif name == 'identifier': 

206 self.metadata['identifier'] = value 

207 elif name == 'publisher': 

208 self.metadata['publisher'] = value 

209 else: 

210 # Store other metadata 

211 self.metadata[name] = value 

212 

213 # Parse meta elements for cover reference 

214 for meta in metadata_elem.findall('.//{{{0}}}meta'.format(NAMESPACES['opf'])): 

215 name = meta.get('name') 

216 content = meta.get('content') 

217 

218 if name == 'cover' and content: 218 ↛ 220line 218 didn't jump to line 220 because the condition on line 218 was never true

219 # This is a reference to the cover image in the manifest 

220 self.cover_id = content 

221 

222 def _parse_manifest(self, root: ET.Element): 

223 """ 

224 Parse manifest from the package document. 

225 

226 Args: 

227 root: Root element of the package document 

228 """ 

229 # Find the manifest element 

230 manifest_elem = root.find('.//{{{0}}}manifest'.format(NAMESPACES['opf'])) 

231 if manifest_elem is None: 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true

232 return 

233 

234 # Parse items 

235 for item in manifest_elem.findall('.//{{{0}}}item'.format(NAMESPACES['opf'])): 

236 id = item.get('id') 

237 href = item.get('href') 

238 media_type = item.get('media-type') 

239 

240 if id and href: 240 ↛ 235line 240 didn't jump to line 235 because the condition on line 240 was always true

241 # Resolve relative path 

242 href = urllib.parse.unquote(href) 

243 path = os.path.normpath(os.path.join(self.content_dir, href)) 

244 

245 self.manifest[id] = { 

246 'href': href, 

247 'path': path, 

248 'media_type': media_type 

249 } 

250 

251 def _parse_spine(self, root: ET.Element): 

252 """ 

253 Parse spine from the package document. 

254 

255 Args: 

256 root: Root element of the package document 

257 """ 

258 # Find the spine element 

259 spine_elem = root.find('.//{{{0}}}spine'.format(NAMESPACES['opf'])) 

260 if spine_elem is None: 260 ↛ 261line 260 didn't jump to line 261 because the condition on line 260 was never true

261 return 

262 

263 # Get the toc attribute (NCX file ID) 

264 toc_id = spine_elem.get('toc') 

265 if toc_id and toc_id in self.manifest: 265 ↛ 269line 265 didn't jump to line 269 because the condition on line 265 was always true

266 self.toc_path = self.manifest[toc_id]['path'] 

267 

268 # Parse itemrefs 

269 for itemref in spine_elem.findall( 

270 './/{{{0}}}itemref'.format(NAMESPACES['opf'])): 

271 idref = itemref.get('idref') 

272 if idref and idref in self.manifest: 272 ↛ 269line 272 didn't jump to line 269 because the condition on line 272 was always true

273 self.spine.append(idref) 

274 

275 def _parse_toc(self): 

276 """Parse the table of contents.""" 

277 if not hasattr( 277 ↛ 282line 277 didn't jump to line 282 because the condition on line 277 was never true

278 self, 

279 'toc_path') or not self.toc_path or not os.path.exists( 

280 self.toc_path): 

281 # Try to find the toc.ncx file 

282 for root, dirs, files in os.walk(self.content_dir): 

283 for file in files: 

284 if file.endswith('.ncx'): 

285 self.toc_path = os.path.join(root, file) 

286 break 

287 if hasattr(self, 'toc_path') and self.toc_path: 

288 break 

289 

290 if not hasattr( 290 ↛ 295line 290 didn't jump to line 295 because the condition on line 290 was never true

291 self, 

292 'toc_path') or not self.toc_path or not os.path.exists( 

293 self.toc_path): 

294 # No TOC found 

295 return 

296 

297 # Parse the NCX file 

298 tree = ET.parse(self.toc_path) 

299 root = tree.getroot() 

300 

301 # Parse navMap 

302 nav_map = root.find('.//{{{0}}}navMap'.format(NAMESPACES['ncx'])) 

303 if nav_map is None: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true

304 return 

305 

306 # Parse navPoints 

307 self._parse_nav_points(nav_map, []) 

308 

309 def _parse_nav_points(self, parent: ET.Element, path: List[Dict[str, Any]]): 

310 """ 

311 Recursively parse navPoints from the NCX file. 

312 

313 Args: 

314 parent: Parent element containing navPoints 

315 path: Current path in the TOC hierarchy 

316 """ 

317 for nav_point in parent.findall('.//{{{0}}}navPoint'.format(NAMESPACES['ncx'])): 

318 # Get navPoint attributes 

319 id = nav_point.get('id') 

320 play_order = nav_point.get('playOrder') 

321 

322 # Get navLabel 

323 nav_label = nav_point.find('.//{{{0}}}navLabel'.format(NAMESPACES['ncx'])) 

324 text_elem = nav_label.find( 

325 './/{{{0}}}text'.format(NAMESPACES['ncx'])) if nav_label else None 

326 label = text_elem.text if text_elem is not None else "" 

327 

328 # Get content 

329 content = nav_point.find('.//{{{0}}}content'.format(NAMESPACES['ncx'])) 

330 src = content.get('src') if content is not None else "" 

331 

332 # Create a TOC entry 

333 entry = { 

334 'id': id, 

335 'label': label, 

336 'src': src, 

337 'play_order': play_order, 

338 'children': [] 

339 } 

340 

341 # Add to TOC 

342 if path: 342 ↛ 343line 342 didn't jump to line 343 because the condition on line 342 was never true

343 path[-1]['children'].append(entry) 

344 else: 

345 self.toc.append(entry) 

346 

347 # Parse child navPoints 

348 self._parse_nav_points(nav_point, path + [entry]) 

349 

def _create_book(self):
    """
    Copy the parsed metadata onto self.book.

    The title is set through set_title; the other known fields are passed to
    set_metadata under their MetadataType. Subjects are joined into one
    comma-separated keyword string. Fields missing from self.metadata are
    skipped.
    """
    if 'title' in self.metadata:
        self.book.set_title(self.metadata['title'])

    # (key in self.metadata, MetadataType it maps to), in the order applied.
    field_map = (
        ('creator', MetadataType.AUTHOR),
        ('language', MetadataType.LANGUAGE),
        ('description', MetadataType.DESCRIPTION),
        ('subjects', MetadataType.KEYWORDS),
        ('date', MetadataType.PUBLICATION_DATE),
        ('identifier', MetadataType.IDENTIFIER),
        ('publisher', MetadataType.PUBLISHER),
    )

    for key, metadata_type in field_map:
        if key not in self.metadata:
            continue

        value = self.metadata[key]
        if key == 'subjects':
            # An empty <dc:subject/> is stored as None, which str.join
            # rejects with TypeError; drop empty entries before joining.
            value = ', '.join(subject for subject in value if subject)
            if not value:
                continue

        self.book.set_metadata(metadata_type, value)

380 

381 def _add_cover_chapter(self): 

382 """Add a cover chapter if a cover image is available.""" 

383 if not self.cover_id or self.cover_id not in self.manifest: 383 ↛ 387line 383 didn't jump to line 387 because the condition on line 383 was always true

384 return 

385 

386 # Get the cover image path from the manifest 

387 cover_item = self.manifest[self.cover_id] 

388 cover_path = cover_item['path'] 

389 

390 # Check if the file exists 

391 if not os.path.exists(cover_path): 

392 print(f"Warning: Cover image file not found: {cover_path}") 

393 return 

394 

395 # Create a cover chapter 

396 cover_chapter = self.book.create_chapter("Cover", 0) 

397 

398 try: 

399 # Create an Image block for the cover 

400 from pyWebLayout.abstract.block import Image as AbstractImage 

401 from PIL import Image as PILImage 

402 import io 

403 

404 # Load the image into memory before the temp directory is cleaned up 

405 # We need to fully copy the image data to ensure it persists after temp 

406 # cleanup 

407 with open(cover_path, 'rb') as f: 

408 image_bytes = f.read() 

409 

410 # Create PIL image from bytes in memory 

411 pil_image = PILImage.open(io.BytesIO(image_bytes)) 

412 pil_image.load() # Force loading into memory 

413 

414 # Create a copy to ensure all data is in memory 

415 pil_image = pil_image.copy() 

416 

417 # Apply image processing if enabled 

418 if self.image_processor: 

419 try: 

420 pil_image = self.image_processor(pil_image) 

421 except Exception as e: 

422 print(f"Warning: Image processing failed for cover: {str(e)}") 

423 # Continue with unprocessed image 

424 

425 # Create an AbstractImage block with the cover image path 

426 cover_image = AbstractImage(source=cover_path, alt_text="Cover Image") 

427 

428 # Set dimensions from the loaded image 

429 cover_image._width = pil_image.width 

430 cover_image._height = pil_image.height 

431 

432 # Store the loaded PIL image in the abstract image so it persists after 

433 # temp cleanup 

434 cover_image._loaded_image = pil_image 

435 

436 # Add the image to the cover chapter 

437 cover_chapter.add_block(cover_image) 

438 

439 except Exception as e: 

440 print(f"Error creating cover chapter: {str(e)}") 

441 import traceback 

442 traceback.print_exc() 

443 # If we can't create the cover image, remove the chapter 

444 if hasattr(self.book, 'chapters') and cover_chapter in self.book.chapters: 

445 self.book.chapters.remove(cover_chapter) 

446 

def _process_chapter_images(self, chapter: Chapter):
    """
    Load and process the image blocks of a single chapter.

    For each Image block in chapter.blocks that has no loaded image yet, the
    file at block.source is read into memory and the block's width and
    height are set from it. A block whose image fails to load is reported
    and skipped. When an image processor is configured it is then applied
    to the loaded image; a failing processor is reported and the
    unprocessed image is kept.

    Args:
        chapter: The chapter containing images to process
    """
    from pyWebLayout.abstract.block import Image as AbstractImage
    from PIL import Image as PILImage
    import io

    for block in chapter.blocks:
        if not isinstance(block, AbstractImage):
            continue

        if not getattr(block, '_loaded_image', None):
            try:
                if os.path.isfile(block.source):
                    with open(block.source, 'rb') as stream:
                        raw_bytes = stream.read()

                    # Decode from memory so nothing refers to the file later.
                    decoded = PILImage.open(io.BytesIO(raw_bytes))
                    decoded.load()
                    block._loaded_image = decoded.copy()

                    # The block's dimensions come from the decoded image.
                    block._width = decoded.width
                    block._height = decoded.height
            except Exception as e:
                print(f"Warning: Failed to load image '{block.source}': {str(e)}")
                continue

        loaded = getattr(block, '_loaded_image', None)
        if not (self.image_processor and loaded):
            continue

        try:
            block._loaded_image = self.image_processor(block._loaded_image)
        except Exception as e:
            print(
                f"Warning: Image processing failed for image '{block.alt_text}': {str(e)}"
            )

492 # Continue with unprocessed image 

493 

494 def _process_content_images(self): 

495 """ 

496 Load all images into memory and apply image processing. 

497 

498 This must be called before the temporary EPUB directory is cleaned up, 

499 to ensure images are loaded from disk into memory. 

500 """ 

501 for chapter in self.book.chapters: 

502 self._process_chapter_images(chapter) 

503 

504 def _add_chapters(self): 

505 """Add chapters to the book based on the spine and TOC.""" 

506 # Add cover chapter first if available 

507 self._add_cover_chapter() 

508 

509 # Create a mapping from src to TOC entry 

510 toc_map = {} 

511 

512 def add_to_toc_map(entries): 

513 for entry in entries: 

514 if entry['src']: 514 ↛ 521line 514 didn't jump to line 521 because the condition on line 514 was always true

515 # Extract the path part of the src (remove fragment) 

516 src_parts = entry['src'].split('#', 1) 

517 path = src_parts[0] 

518 toc_map[path] = entry 

519 

520 # Process children 

521 if entry['children']: 521 ↛ 522line 521 didn't jump to line 522 because the condition on line 521 was never true

522 add_to_toc_map(entry['children']) 

523 

524 add_to_toc_map(self.toc) 

525 

526 # Process spine items 

527 # Start from chapter_index = 1 if cover was added, otherwise 0 

528 chapter_index = 1 if (self.cover_id and self.cover_id in self.manifest) else 0 

529 for i, idref in enumerate(self.spine): 

530 if idref not in self.manifest: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true

531 continue 

532 

533 item = self.manifest[idref] 

534 path = item['path'] 

535 href = item['href'] 

536 

537 # Skip navigation files 

538 if (idref == 'nav' or 

539 item.get('media_type') == 'application/xhtml+xml' and 

540 ('nav' in href.lower() or 'toc' in href.lower())): 

541 continue 

542 

543 # Check if this item is in the TOC 

544 chapter_title = None 

545 if href in toc_map: 545 ↛ 549line 545 didn't jump to line 549 because the condition on line 545 was always true

546 chapter_title = toc_map[href]['label'] 

547 

548 # Create a chapter 

549 chapter_index += 1 

550 chapter = self.book.create_chapter(chapter_title, chapter_index) 

551 

552 # Parse the HTML content 

553 try: 

554 # Read the HTML file 

555 with open(path, 'r', encoding='utf-8') as f: 

556 html = f.read() 

557 

558 # Get the directory of the HTML file for resolving relative paths 

559 html_dir = os.path.dirname(path) 

560 

561 # Parse HTML and add blocks to chapter, passing base_path for image resolution 

562 blocks = parse_html_string(html, document=self.book, base_path=html_dir) 

563 

564 # Copy blocks to the chapter 

565 for block in blocks: 

566 chapter.add_block(block) 

567 

568 # Add a PageBreak after the chapter to ensure next chapter starts on new page 

569 # This helps maintain chapter boundaries during pagination 

570 chapter.add_block(PageBreak()) 

571 

572 except Exception as e: 

573 print(f"Error parsing chapter {i + 1}: {str(e)}") 

574 # Add an error message block 

575 from pyWebLayout.abstract.block import Paragraph 

576 from pyWebLayout.abstract.inline import Word 

577 from pyWebLayout.style import Font 

578 error_para = Paragraph() 

579 # Create a default font style for the error message 

580 default_font = Font() 

581 error_para.add_word( 

582 Word( 

583 f"Error loading chapter: {str(e)}", 

584 default_font 

585 ) 

586 ) 

587 chapter.add_block(error_para) 

588 # Still add PageBreak even after error 

589 chapter.add_block(PageBreak()) 

590 

591 

def read_epub(epub_path: str) -> Book:
    """
    Convenience wrapper: parse the EPUB at ``epub_path`` into a Book.

    A fresh EPUBReader is created with its default settings and its read()
    result is returned.

    Args:
        epub_path: Path to the EPUB file

    Returns:
        Book: The parsed book
    """
    return EPUBReader(epub_path).read()