"""
IT8951 E-ink Display Driver Wrapper.

This module wraps the IT8951 e-paper controller driver for use with the DReader HAL.
Implements optimizations for e-ink displays including:
- Grayscale conversion and 4-bit (16-level) posterization
- Multiple refresh modes (fast DU, quality GC16)
- Partial update support
- Ghosting prevention via periodic full refresh

Hardware: IT8951 controller (used in Waveshare 6" e-Paper HAT and similar)
"""

import asyncio
import logging
import sys
import os
from typing import Optional
from PIL import Image, ImageOps

# The IT8951 package is loaded from the external/ source tree next to this
# project, so that directory has to be first on sys.path before the
# `from IT8951 ...` imports below are executed.
_IT8951_SRC = os.path.join(
    os.path.dirname(__file__),
    '../../../external/IT8951/src',
)
sys.path.insert(0, _IT8951_SRC)

from IT8951.display import AutoEPDDisplay, VirtualEPDDisplay
from IT8951.constants import DisplayModes

from ..types import RefreshMode

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)


class IT8951DisplayDriver:
    """
    Wrapper for IT8951 e-ink controller.

    Provides async interface and e-ink optimizations for the DReader HAL.
    Every blocking driver call is handed to the running event loop's default
    executor so the loop itself is never blocked.

    Args:
        width: Display width in pixels
        height: Display height in pixels
        vcom: VCOM voltage (device-specific, check cable label)
        bus: SPI bus number (default 0)
        device: SPI device number (default 0)
        spi_hz: SPI clock frequency in Hz (default 24MHz)
        virtual: Use virtual display (Tkinter window) for testing
        rotate: Rotation mode ('CW', 'CCW', 'flip', or None)
        mirror: Mirror display horizontally
        auto_sleep: Automatically put display to sleep after updates (default True)
    """

    # In AUTO mode a full refresh is issued whenever the refresh counter is a
    # multiple of this value. A value <= 0 disables the periodic full refresh.
    FULL_REFRESH_INTERVAL = 10

    def __init__(
        self,
        width: int = 800,
        height: int = 1200,
        vcom: float = -2.0,
        bus: int = 0,
        device: int = 0,
        spi_hz: int = 24_000_000,
        virtual: bool = False,
        rotate: Optional[str] = None,
        mirror: bool = False,
        auto_sleep: bool = True,
    ):
        self.width = width
        self.height = height
        self.vcom = vcom
        self.bus = bus
        self.device = device
        self.spi_hz = spi_hz
        self.virtual = virtual
        self.rotate = rotate
        self.mirror = mirror
        self.auto_sleep = auto_sleep

        # Created lazily by initialize(); None until then.
        self.display: Optional[AutoEPDDisplay | VirtualEPDDisplay] = None
        self._refresh_count = 0
        self._initialized = False
        # True between a successful sleep() and the next wake-up, so that a
        # manual sleep() is undone before drawing even when auto_sleep is off.
        self._asleep = False

    async def initialize(self) -> None:
        """
        Initialize the IT8951 display controller.

        Creates either a real EPD display or virtual display for testing and
        clears the screen to white. Calling it again once initialized is a
        no-op.
        """
        if self._initialized:
            return

        # Run initialization in thread pool since IT8951 is blocking
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._init_display)

        self._asleep = False
        self._initialized = True

    def _init_display(self) -> None:
        """Blocking initialization of display (runs in thread pool)."""
        if self.virtual:
            # Virtual display for testing
            logger.info(f"Initializing IT8951 virtual display: {self.width}x{self.height}")
            self.display = VirtualEPDDisplay(
                dims=(self.width, self.height),
                rotate=self.rotate,
                mirror=self.mirror,
            )
        else:
            # Real e-ink display
            logger.info(f"Initializing IT8951 e-ink display: {self.width}x{self.height}, VCOM={self.vcom}V")
            self.display = AutoEPDDisplay(
                vcom=self.vcom,
                bus=self.bus,
                device=self.device,
                spi_hz=self.spi_hz,
                rotate=self.rotate,
                mirror=self.mirror,
            )

        # Clear screen
        logger.info("Clearing IT8951 display to white")
        self.display.clear()
        logger.info("IT8951 display initialized successfully")

    async def cleanup(self) -> None:
        """
        Clean up after use.

        Wakes the panel if it may be asleep, clears it to white and marks the
        driver as uninitialized. The underlying display object is kept; a
        later initialize() replaces it. Does nothing if the driver was never
        initialized.
        """
        if not self._initialized or not self.display:
            return

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._cleanup_display)

        self._initialized = False

    def _cleanup_display(self) -> None:
        """Blocking cleanup (runs in thread pool)."""
        if self.display:
            # With auto_sleep the controller is left asleep after every
            # update, so it must be woken before the final clear, exactly as
            # _update_display does before drawing.
            self._wake_if_needed()
            # Clear to white before shutting down
            self.display.clear()

    async def show_image(
        self,
        image: Image.Image,
        mode: RefreshMode = RefreshMode.AUTO,
    ) -> None:
        """
        Display an image on the e-ink screen.

        Args:
            image: PIL Image to display (RGB or L mode)
            mode: Refresh mode (AUTO, FAST, QUALITY, FULL)

        Raises:
            RuntimeError: If initialize() has not completed successfully.

        Refresh modes:
            - AUTO: Choose based on the refresh counter
            - FAST: DU mode (~200ms) for text/UI
            - QUALITY: GC16 mode (~1000ms) for images
            - FULL: Full INIT refresh to clear ghosting

        E-ink optimizations applied:
            - Convert to 8-bit grayscale
            - Resize to match display dimensions
            - Posterize to 16 gray levels
            - Automatic ghosting prevention (full refresh every
              FULL_REFRESH_INTERVAL pages in AUTO mode)
            - Auto-sleep after update (if enabled) to save power
        """
        if not self._initialized or not self.display:
            raise RuntimeError("Display not initialized. Call initialize() first.")

        # Determine refresh mode (uses the counter value *before* this refresh)
        display_mode = self._determine_refresh_mode(mode)
        mode_name = self._get_mode_name(display_mode)

        logger.info(f"[IT8951] Displaying image {image.size} {image.mode} | Refresh #{self._refresh_count + 1} | Mode: {mode_name}")

        # Prepare image for e-ink
        prepared_image = self._prepare_image(image)

        # Update frame buffer
        self.display.frame_buf = prepared_image

        # Increment refresh counter
        self._refresh_count += 1

        # Run display update in thread pool
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._update_display, display_mode)

        logger.info(f"[IT8951] Display update completed (refresh #{self._refresh_count})")

        # Automatically put display to sleep after update to save power
        # E-ink displays only need power during refresh, not for static display
        if self.auto_sleep:
            await self.sleep()

    def _prepare_image(self, image: Image.Image) -> Image.Image:
        """
        Prepare image for e-ink display.

        Steps:
            1. Convert to grayscale ('L') if needed
            2. Resize to the display's dimensions if they differ
            3. Reduce to 16 gray levels (see _apply_dithering)

        Args:
            image: Input PIL Image

        Returns:
            Prepared grayscale image ready for e-ink
        """
        # Convert to grayscale if needed
        if image.mode != 'L':
            image = image.convert('L')

        # Resize if dimensions don't match
        target_size = (self.display.width, self.display.height)
        if image.size != target_size:
            # Use LANCZOS for high-quality resampling
            image = image.resize(target_size, Image.Resampling.LANCZOS)

        # Note: IT8951 supports 4-bit grayscale (16 levels)
        return self._apply_dithering(image)

    def _apply_dithering(self, image: Image.Image) -> Image.Image:
        """
        Reduce a grayscale image to 16 gray levels.

        Despite the method name, no error-diffusion dithering is performed:
        the image is posterized to 4 bits per pixel, which matches the 16
        gray levels noted for the IT8951 and avoids the harsh look of 1-bit
        dithering.

        Args:
            image: Grayscale PIL Image

        Returns:
            Posterized grayscale image
        """
        # Posterize to 4-bit (16 levels) which matches IT8951 capability
        return ImageOps.posterize(image, 4)

    def _determine_refresh_mode(self, mode: RefreshMode) -> int:
        """
        Determine IT8951 display mode constant.

        From HAL spec section 8.1 - E-Ink optimization:
            - Use DU mode for fast text updates (~200ms)
            - Use GC16 mode for quality image rendering (~1000ms)
            - Full refresh every FULL_REFRESH_INTERVAL pages to clear ghosting

        In AUTO mode the full refresh is chosen when the current refresh
        counter is a multiple of FULL_REFRESH_INTERVAL, which includes the
        very first refresh (counter 0). Unrecognized modes fall back to DU.

        Args:
            mode: High-level refresh mode

        Returns:
            IT8951 DisplayModes constant
        """
        # NOTE(review): vendor documentation describes the INIT waveform as
        # erasing the panel to white. Confirm on hardware that FULL / periodic
        # AUTO refreshes actually render the image; if not, they should use
        # GC16 instead.
        if mode == RefreshMode.FAST:
            return DisplayModes.DU
        if mode == RefreshMode.QUALITY:
            return DisplayModes.GC16
        if mode == RefreshMode.FULL:
            return DisplayModes.INIT

        if mode == RefreshMode.AUTO:
            interval = self.FULL_REFRESH_INTERVAL
            # interval <= 0 disables the periodic full refresh
            if interval > 0 and self._refresh_count % interval == 0:
                return DisplayModes.INIT

        # Normal AUTO page turns and unknown modes use DU (fast)
        return DisplayModes.DU

    def _get_mode_name(self, display_mode: int) -> str:
        """Get human-readable name for display mode."""
        if display_mode == DisplayModes.INIT:
            return "INIT (full refresh)"
        if display_mode == DisplayModes.DU:
            return "DU (fast)"
        if display_mode == DisplayModes.GC16:
            return "GC16 (quality)"
        return f"Mode {display_mode}"

    def _wake_if_needed(self) -> None:
        """
        Issue the controller's run command if the panel may be asleep (blocking).

        The panel is assumed to be possibly asleep when auto_sleep is enabled
        or when sleep() was called and no wake-up has happened since. Display
        objects without an `epd` attribute are left untouched.
        """
        if hasattr(self.display, 'epd') and (self.auto_sleep or self._asleep):
            self.display.epd.run()
            self._asleep = False

    def _update_display(self, display_mode: int) -> None:
        """
        Update the physical display (blocking).

        This runs in a thread pool to avoid blocking the async event loop.
        Wakes display before update if needed.

        Args:
            display_mode: IT8951 DisplayModes constant
        """
        # Wake display before update (in case it was sleeping)
        self._wake_if_needed()

        # Use partial update for efficiency
        # IT8951 library tracks what changed and only updates that region
        self.display.draw_partial(display_mode)

    async def set_brightness(self, level: int) -> None:
        """
        Set display brightness.

        Note: IT8951 e-ink displays typically don't have adjustable brightness.
        This is a no-op unless you have a frontlight controller.

        For devices with frontlight, you would control it here via GPIO/PWM.

        Args:
            level: Brightness 0-10 (ignored for basic IT8951)
        """
        # IT8951 e-ink displays don't have built-in brightness control
        # You would need to control an external frontlight here
        pass

    async def sleep(self) -> None:
        """Put display in low-power sleep mode (no-op if not initialized)."""
        if not self._initialized or not self.display:
            return

        # IT8951 sleep mode; skipped for display objects without an `epd`
        if hasattr(self.display, 'epd'):
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.display.epd.sleep)
            self._asleep = True

    async def wake(self) -> None:
        """Wake display from sleep mode (no-op if not initialized)."""
        if not self._initialized or not self.display:
            return

        # IT8951 wake (run mode); skipped for display objects without an `epd`
        if hasattr(self.display, 'epd'):
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.display.epd.run)
            self._asleep = False

    @property
    def refresh_count(self) -> int:
        """Get number of refreshes since construction or the last reset."""
        return self._refresh_count

    def reset_refresh_count(self) -> None:
        """Reset refresh counter (useful for testing)."""
        self._refresh_count = 0