"""
EPUB reader for pyWebLayout.

This module provides functionality for reading EPUB documents and converting them
to pyWebLayout's abstract document model.
"""

import io
import os
import re
import shutil
import tempfile
import traceback
import urllib.parse
import xml.etree.ElementTree as ET
import zipfile
from typing import Dict, List, Optional, Any, Tuple

from pyWebLayout.abstract.document import Document, Book, Chapter, MetadataType
from pyWebLayout.io.readers.html_extraction import parse_html_string


# XML namespaces used in EPUB files, keyed by a short prefix. The reader looks
# these up to build fully qualified ``{namespace}tag`` names for ElementTree.
NAMESPACES = {
    'opf': 'http://www.idpf.org/2007/opf',
    'dc': 'http://purl.org/dc/elements/1.1/',
    'dcterms': 'http://purl.org/dc/terms/',
    'xhtml': 'http://www.w3.org/1999/xhtml',
    'ncx': 'http://www.daisy.org/z3986/2005/ncx/',
}


class EPUBReader:
    """
    Reader for EPUB documents.

    The reader unpacks an EPUB archive into a temporary directory, parses the
    package document (metadata, manifest and spine) and the NCX table of
    contents, and converts the result to pyWebLayout's abstract document
    model. The temporary directory is removed again when ``read`` returns.
    """

    # Namespace of META-INF/container.xml, which names the package document.
    _CONTAINER_NS = 'urn:oasis:names:tc:opendocument:xmlns:container'

    def __init__(self, epub_path: str):
        """
        Initialize an EPUB reader.

        Args:
            epub_path: Path to the EPUB file
        """
        self.epub_path = epub_path
        self.book = Book()
        self.temp_dir = None      # extraction directory, set by read()
        self.content_dir = None   # directory that manifest hrefs are relative to
        self.opf_path = None      # package document named by container.xml
        self.toc_path = None      # NCX file, from the spine or found by search
        self.metadata = {}
        self.toc = []
        self.spine = []
        self.manifest = {}
        self.cover_id = None  # ID of the cover image in manifest

    def read(self) -> "Book":
        """
        Read the EPUB file and convert it to a Book.

        Returns:
            Book: The parsed book

        Raises:
            ValueError: If the archive contains no package document (.opf).
        """
        try:
            # Extract the EPUB file
            self.temp_dir = tempfile.mkdtemp()
            self._extract_epub()
            self._parse_package_document()
            self._parse_toc()
            self._create_book()

            # Add chapters to the book
            self._add_chapters()

            return self.book
        finally:
            # Clean up temporary files, also when parsing failed part-way
            if self.temp_dir:
                shutil.rmtree(self.temp_dir, ignore_errors=True)

    @staticmethod
    def _qname(prefix: str, tag: str) -> str:
        """Return the ElementTree name ``{namespace}tag`` for a NAMESPACES prefix."""
        return '{{{0}}}{1}'.format(NAMESPACES[prefix], tag)

    def _is_inside_temp_dir(self, path: str) -> bool:
        """
        Tell whether *path* stays inside the extraction directory.

        Paths taken from the archive's own XML are untrusted and may contain
        ``..`` segments. When no extraction directory is set there is nothing
        to compare against and the path is accepted.
        """
        temp_dir = getattr(self, 'temp_dir', None)
        if not temp_dir:
            return True
        base = os.path.abspath(temp_dir)
        target = os.path.abspath(path)
        try:
            return os.path.commonpath([base, target]) == base
        except ValueError:
            # Raised e.g. for paths on different drives: certainly not inside.
            return False

    def _find_first_file(self, suffix: str) -> Optional[str]:
        """Return the first file below ``content_dir`` whose name ends with *suffix*, or None."""
        for folder, dirs, files in os.walk(self.content_dir):
            dirs.sort()  # os.walk order is arbitrary; sort for a stable result
            for file_name in sorted(files):
                if file_name.endswith(suffix):
                    return os.path.join(folder, file_name)
        return None

    def _extract_epub(self):
        """
        Extract the EPUB file to the temporary directory and locate its content.

        Sets ``content_dir`` and, when META-INF/container.xml names a package
        document, ``opf_path``.
        """
        with zipfile.ZipFile(self.epub_path, 'r') as zip_ref:
            zip_ref.extractall(self.temp_dir)

        self.opf_path = None

        # Preferred: container.xml points at the package document (content.opf)
        container_path = os.path.join(self.temp_dir, 'META-INF', 'container.xml')
        if os.path.exists(container_path):
            container_root = ET.parse(container_path).getroot()
            rootfile_tag = './/{{{0}}}rootfile'.format(self._CONTAINER_NS)
            for rootfile in container_root.findall(rootfile_tag):
                full_path = rootfile.get('full-path')
                if not full_path:
                    continue
                package_path = os.path.normpath(os.path.join(self.temp_dir, full_path))
                if not self._is_inside_temp_dir(package_path):
                    # A rootfile pointing outside the archive is not trusted.
                    continue
                self.opf_path = package_path
                self.content_dir = os.path.dirname(package_path)
                return

        # Fallback: look for common content directories
        for candidate in ['OEBPS', 'OPS', 'Content']:
            candidate_path = os.path.join(self.temp_dir, candidate)
            if os.path.exists(candidate_path):
                self.content_dir = candidate_path
                return

        # If no content directory found, use the root
        self.content_dir = self.temp_dir

    def _parse_package_document(self):
        """
        Parse the package document (content.opf).

        Uses the document named by container.xml when it exists on disk and
        otherwise searches ``content_dir`` for the first ``.opf`` file.

        Raises:
            ValueError: If no package document can be found.
        """
        opf_path = getattr(self, 'opf_path', None)
        if not opf_path or not os.path.isfile(opf_path):
            opf_path = self._find_first_file('.opf')
            if opf_path:
                # Manifest hrefs are relative to the package document itself.
                self.content_dir = os.path.dirname(opf_path)

        if not opf_path:
            raise ValueError("No package document (.opf) found in EPUB")

        root = ET.parse(opf_path).getroot()

        # Order matters: the spine is validated against the manifest.
        self._parse_metadata(root)
        self._parse_manifest(root)
        self._parse_spine(root)

    def _parse_metadata(self, root: ET.Element):
        """
        Parse metadata from the package document.

        Dublin Core elements are stored in ``self.metadata`` under their local
        name; ``subject`` elements are collected into the ``subjects`` list.
        A ``<meta name="cover">`` element sets ``self.cover_id``.

        Args:
            root: Root element of the package document
        """
        metadata_elem = root.find('.//' + self._qname('opf', 'metadata'))
        if metadata_elem is None:
            return

        # Parse DC metadata
        dc_prefix = self._qname('dc', '')
        for elem in metadata_elem:
            if not isinstance(elem.tag, str) or not elem.tag.startswith(dc_prefix):
                continue

            # Local name (without namespace)
            name = elem.tag[len(dc_prefix):]
            value = elem.text

            if name == 'subject':
                # Empty subjects would break the keyword join in _create_book.
                if value:
                    self.metadata.setdefault('subjects', []).append(value)
            else:
                # A repeated element overwrites the earlier value.
                self.metadata[name] = value

        # Parse meta elements for cover reference
        for meta in metadata_elem.findall('.//' + self._qname('opf', 'meta')):
            content = meta.get('content')
            if meta.get('name') == 'cover' and content:
                # This is a reference to the cover image in the manifest
                self.cover_id = content

    def _parse_manifest(self, root: ET.Element):
        """
        Parse manifest from the package document.

        Each item with an id and href is stored in ``self.manifest`` as a dict
        with ``href`` (URL-decoded), ``path`` (resolved against
        ``content_dir``) and ``media_type``. Items that resolve outside the
        extraction directory are skipped.

        Args:
            root: Root element of the package document
        """
        manifest_elem = root.find('.//' + self._qname('opf', 'manifest'))
        if manifest_elem is None:
            return

        for item in manifest_elem.findall('.//' + self._qname('opf', 'item')):
            item_id = item.get('id')
            href = item.get('href')
            if not (item_id and href):
                continue

            # Resolve relative path
            href = urllib.parse.unquote(href)
            path = os.path.normpath(os.path.join(self.content_dir, href))
            if not self._is_inside_temp_dir(path):
                # Never let an archive make us read files outside of it.
                continue

            self.manifest[item_id] = {
                'href': href,
                'path': path,
                'media_type': item.get('media-type'),
            }

    def _parse_spine(self, root: ET.Element):
        """
        Parse spine from the package document.

        Records the NCX path named by the spine's ``toc`` attribute and appends
        every itemref that refers to a known manifest item to ``self.spine``.

        Args:
            root: Root element of the package document
        """
        spine_elem = root.find('.//' + self._qname('opf', 'spine'))
        if spine_elem is None:
            return

        # Get the toc attribute (NCX file ID)
        toc_id = spine_elem.get('toc')
        if toc_id and toc_id in self.manifest:
            self.toc_path = self.manifest[toc_id]['path']

        # Parse itemrefs
        for itemref in spine_elem.findall('.//' + self._qname('opf', 'itemref')):
            idref = itemref.get('idref')
            if idref and idref in self.manifest:
                self.spine.append(idref)

    def _parse_toc(self):
        """
        Parse the table of contents from the NCX file into ``self.toc``.

        Falls back to the first ``.ncx`` file below ``content_dir`` when the
        spine named no NCX file or the named file does not exist. Does nothing
        when no NCX file or no navMap is found.
        """
        toc_path = getattr(self, 'toc_path', None)
        if not toc_path or not os.path.exists(toc_path):
            # Try to find the toc.ncx file
            toc_path = self._find_first_file('.ncx')
            self.toc_path = toc_path

        if not toc_path:
            # No TOC found
            return

        # Parse the NCX file
        root = ET.parse(toc_path).getroot()

        nav_map = root.find('.//' + self._qname('ncx', 'navMap'))
        if nav_map is None:
            return

        self._parse_nav_points(nav_map, [])

    def _parse_nav_points(self, parent: ET.Element, path: List[Dict[str, Any]]):
        """
        Recursively parse navPoints from the NCX file.

        Only the direct navPoint children of *parent* are handled at each
        level, so every entry ends up exactly once in the tree, under its real
        parent.

        Args:
            parent: Parent element containing navPoints
            path: Current path in the TOC hierarchy
        """
        for nav_point in parent.findall(self._qname('ncx', 'navPoint')):
            # Label and target belong to this navPoint itself, so look at its
            # direct children rather than at nested navPoints.
            nav_label = nav_point.find(self._qname('ncx', 'navLabel'))
            text_elem = (nav_label.find(self._qname('ncx', 'text'))
                         if nav_label is not None else None)
            label = text_elem.text if text_elem is not None else ""

            content = nav_point.find(self._qname('ncx', 'content'))
            src = content.get('src') if content is not None else ""

            # Create a TOC entry
            entry = {
                'id': nav_point.get('id'),
                'label': label,
                'src': src,
                'play_order': nav_point.get('playOrder'),
                'children': [],
            }

            # Add to TOC
            if path:
                path[-1]['children'].append(entry)
            else:
                self.toc.append(entry)

            # Parse child navPoints
            self._parse_nav_points(nav_point, path + [entry])

    def _create_book(self):
        """Copy the parsed metadata onto ``self.book``."""
        if 'title' in self.metadata:
            self.book.set_title(self.metadata['title'])

        if 'creator' in self.metadata:
            self.book.set_metadata(MetadataType.AUTHOR, self.metadata['creator'])

        if 'language' in self.metadata:
            self.book.set_metadata(MetadataType.LANGUAGE, self.metadata['language'])

        if 'description' in self.metadata:
            self.book.set_metadata(MetadataType.DESCRIPTION, self.metadata['description'])

        if 'subjects' in self.metadata:
            # Guard against empty entries so the join cannot fail.
            keywords = ', '.join(s for s in self.metadata['subjects'] if s)
            self.book.set_metadata(MetadataType.KEYWORDS, keywords)

        if 'date' in self.metadata:
            self.book.set_metadata(MetadataType.PUBLICATION_DATE, self.metadata['date'])

        if 'identifier' in self.metadata:
            self.book.set_metadata(MetadataType.IDENTIFIER, self.metadata['identifier'])

        if 'publisher' in self.metadata:
            self.book.set_metadata(MetadataType.PUBLISHER, self.metadata['publisher'])

    def _add_cover_chapter(self):
        """
        Add a cover chapter if a cover image is available.

        The image is loaded fully into memory because the extracted file is
        deleted when ``read`` finishes. If loading fails the error is printed
        and the cover chapter is removed again.
        """
        if not self.cover_id or self.cover_id not in self.manifest:
            return

        # Get the cover image path from the manifest
        cover_path = self.manifest[self.cover_id]['path']

        if not os.path.exists(cover_path):
            print(f"Warning: Cover image file not found: {cover_path}")
            return

        # Create a cover chapter
        cover_chapter = self.book.create_chapter("Cover", 0)

        try:
            # Imported here so a missing imaging dependency only costs the cover.
            from pyWebLayout.abstract.block import Image as AbstractImage
            from PIL import Image as PILImage

            with open(cover_path, 'rb') as f:
                image_bytes = f.read()

            # Decode from memory and force the pixel data to load, then copy so
            # nothing refers to the temporary file any more.
            pil_image = PILImage.open(io.BytesIO(image_bytes))
            pil_image.load()
            pil_image = pil_image.copy()

            # Create an AbstractImage block with the cover image path
            cover_image = AbstractImage(source=cover_path, alt_text="Cover Image")

            # Set dimensions from the loaded image
            cover_image._width = pil_image.width
            cover_image._height = pil_image.height

            # Keep the decoded image on the block so it survives temp cleanup
            cover_image._loaded_image = pil_image

            cover_chapter.add_block(cover_image)

        except Exception as e:
            print(f"Error creating cover chapter: {str(e)}")
            traceback.print_exc()
            # If we can't create the cover image, remove the chapter
            if hasattr(self.book, 'chapters') and cover_chapter in self.book.chapters:
                self.book.chapters.remove(cover_chapter)

    def _add_chapters(self):
        """
        Add chapters to the book based on the spine and TOC.

        Every spine item except navigation documents becomes one chapter. The
        chapter title is the label of the first TOC entry that points at the
        item's file. A chapter whose file cannot be read or parsed gets a
        paragraph with the error message instead of its content.
        """
        # Add cover chapter first if available
        self._add_cover_chapter()

        # Map resolved file path -> TOC entry. NCX targets are relative to the
        # NCX file and may be percent-encoded, so resolve them the same way
        # manifest paths are resolved before comparing.
        toc_path = getattr(self, 'toc_path', None)
        toc_dir = os.path.dirname(toc_path) if toc_path else (self.content_dir or '')
        toc_map = {}

        def add_to_toc_map(entries):
            for entry in entries:
                if entry['src']:
                    # Drop the fragment first, then decode the file part
                    target = urllib.parse.unquote(entry['src'].split('#', 1)[0])
                    key = os.path.normpath(os.path.join(toc_dir, target))
                    # First entry wins: later ones are usually sections
                    # (fragments) inside the same file.
                    toc_map.setdefault(key, entry)

                if entry['children']:
                    add_to_toc_map(entry['children'])

        add_to_toc_map(self.toc)

        # Process spine items
        # NOTE(review): the index is incremented before first use, so chapters
        # start at 2 when a cover reference exists and at 1 otherwise. Kept
        # as-is; confirm the intended numbering against Book.create_chapter.
        chapter_index = 1 if (self.cover_id and self.cover_id in self.manifest) else 0
        for i, idref in enumerate(self.spine):
            if idref not in self.manifest:
                continue

            item = self.manifest[idref]
            path = item['path']
            href = item['href'].lower()

            # Skip navigation files
            looks_like_nav = (item.get('media_type') == 'application/xhtml+xml'
                              and ('nav' in href or 'toc' in href))
            if idref == 'nav' or looks_like_nav:
                continue

            # Check if this item is in the TOC
            toc_entry = toc_map.get(path)
            chapter_title = toc_entry['label'] if toc_entry is not None else None

            # Create a chapter
            chapter_index += 1
            chapter = self.book.create_chapter(chapter_title, chapter_index)

            # Parse the HTML content
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    html = f.read()

                # Parse HTML and add blocks to chapter
                for block in parse_html_string(html, document=self.book):
                    chapter.add_block(block)

            except Exception as e:
                # Best effort: keep the chapter and show what went wrong.
                print(f"Error parsing chapter {i+1}: {str(e)}")
                from pyWebLayout.abstract.block import Paragraph
                from pyWebLayout.abstract.inline import Word
                from pyWebLayout.style import Font

                error_para = Paragraph()
                # Create a default font style for the error message
                default_font = Font()
                error_para.add_word(Word(f"Error loading chapter: {str(e)}", default_font))
                chapter.add_block(error_para)


def read_epub(epub_path: str) -> Book:
    """
    Convenience wrapper: parse the EPUB at *epub_path* in one call.

    Args:
        epub_path: Path to the EPUB file

    Returns:
        Book: The book produced by ``EPUBReader(epub_path).read()``
    """
    return EPUBReader(epub_path).read()