517 lines
16 KiB
Python
517 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Python library for control of FT5xx6 Capacitive Touch panels (CTPs).
|
|
Based on Arduino library by Owen Lyke and original work by Helge Langehaug.
|
|
Ported to Python for Raspberry Pi usage.
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
from enum import Enum, IntEnum
|
|
from typing import List, Optional, Callable, Union, Tuple
|
|
try:
    import RPi.GPIO as GPIO
except ImportError:
    # RPi.GPIO is only installable on a Raspberry Pi. Fall back to a stand-in
    # exposing the handful of names this module touches so that it can still
    # be imported for development and testing elsewhere.
    print("Warning: RPi.GPIO not available. Using mock implementation.")

    class GPIO:
        """No-op replacement for the parts of RPi.GPIO used by this module."""

        BCM = 'BCM'
        FALLING = 'FALLING'
        IN = 'IN'
        PUD_UP = 'PUD_UP'

        @staticmethod
        def setmode(mode):
            """Accept a pin-numbering mode and do nothing."""
            return None

        @staticmethod
        def setup(pin, direction, pull_up_down=None):
            """Accept a pin configuration and do nothing."""
            return None

        @staticmethod
        def add_event_detect(pin, edge, callback=None):
            """Accept an edge-detection request; the callback is never invoked."""
            return None

        @staticmethod
        def remove_event_detect(pin):
            """Accept an edge-detection removal request and do nothing."""
            return None
|
|
|
|
try:
    from smbus2 import SMBus
except ImportError:
    # smbus2 is optional at import time. Provide a stand-in with the three bus
    # operations this module uses so it can be imported without I2C support.
    print("Warning: smbus2 not available. Using mock implementation.")

    class SMBus:
        """No-op replacement for smbus2.SMBus; every read yields zeros."""

        def __init__(self, bus):
            """Accept a bus number without opening any device."""
            return None

        def read_byte_data(self, addr, reg):
            """Pretend to read one register; always returns 0."""
            return 0

        def write_byte_data(self, addr, reg, val):
            """Pretend to write one register; the value is discarded."""
            return None

        def read_i2c_block_data(self, addr, reg, length):
            """Pretend to read ``length`` consecutive registers; all zeros."""
            return [0] * length
|
|
|
|
|
|
# Constants
|
|
# Sentinel pin number meaning "no interrupt pin connected". FT5xx6.begin()
# only configures GPIO edge detection when int_pin differs from this value.
FT5XX6_UNUSED_PIN = 0xFF
|
|
|
|
# Enums
|
|
class I2CAddress(IntEnum):
    """Known I2C addresses for the touch controllers"""
    # Default used by FT5xx6.__init__ when the caller supplies no address.
    UNKNOWN = 0x03
    # Address the FT5316 subclass passes to the base class.
    FT5316 = 0x38
|
|
|
|
|
|
class RegisterAddresses(IntEnum):
    """Register addresses for the FT5xx6 controllers

    The values double as indices into the register block that
    FT5xx6._get_touch_record reads starting at register 0.
    """
    # Written with 0 by FT5xx6.begin() to select normal operation.
    DEV_MODE = 0x00
    # Gesture byte; converted to a Gestures member when a record is read.
    GEST_ID = 0x01
    # Low nibble holds the number of active touches.
    TD_STATUS = 0x02

    # Per-touch coordinate registers. For each touch point n, the low nibble
    # of Tn_XH / Tn_YH supplies bits 11..8 and Tn_XL / Tn_YL supplies bits
    # 7..0 of the X / Y coordinate (see FT5xx6._get_touch_record).
    T1_XH = 0x03
    T1_XL = 0x04
    T1_YH = 0x05
    T1_YL = 0x06
    T2_XH = 0x09
    T2_XL = 0x0A
    T2_YH = 0x0B
    T2_YL = 0x0C
    T3_XH = 0x0F
    T3_XL = 0x10
    T3_YH = 0x11
    T3_YL = 0x12
    T4_XH = 0x15
    T4_XL = 0x16
    T4_YH = 0x17
    T4_YL = 0x18
    T5_XH = 0x1B
    T5_XL = 0x1C
    T5_YH = 0x1D
    # Highest register decoded by this module (index 30 of the 31-byte read).
    T5_YL = 0x1E
|
|
|
|
|
|
class Status(Enum):
    """Status codes for function returns"""
    # Operation completed.
    NOMINAL = 0
    # Operation failed (I2C error, invalid arguments, or a full buffer).
    ERROR = 1
    # Returned by FT5xx6.use_buffer when allocating the buffer raises MemoryError.
    NOT_ENOUGH_MEMORY = 2
|
|
|
|
|
|
class Gestures(IntEnum):
    """Gesture IDs from the touch controller

    Each value is compared against the raw byte read from the GEST_ID
    register when a touch record is decoded.
    """
    # Also the initial value of TouchRecord.gesture.
    NO_GESTURE = 0x00
    MOVE_UP = 0x10
    MOVE_LEFT = 0x14
    MOVE_DOWN = 0x18
    MOVE_RIGHT = 0x1C
    ZOOM_IN = 0x48
    ZOOM_OUT = 0x49
|
|
|
|
|
|
class Mode(Enum):
    """Operation modes for the touch controller"""
    # Selected by FT5xx6.begin() when both an interrupt pin and an ISR are given.
    INTERRUPT = 0
    # Initial mode of a freshly constructed FT5xx6.
    POLLING = 1
|
|
|
|
|
|
# Data structures
|
|
class TouchRecord:
    """One snapshot of the touch controller's state.

    Carries the touch count, up to five (x, y) coordinate pairs, the reported
    gesture and the time the snapshot was taken. Two records compare equal
    when everything except the timestamp matches.
    """

    # Attributes that take part in equality; ``timestamp`` is left out on
    # purpose so identical readings taken at different times compare equal.
    _COMPARED_FIELDS = (
        "num_touches",
        "t1x", "t1y",
        "t2x", "t2y",
        "t3x", "t3y",
        "t4x", "t4y",
        "t5x", "t5y",
        "gesture",
    )

    def __init__(self):
        """Create an empty record: no touches, no gesture, timestamp 0."""
        self.num_touches = 0
        self.t1x = self.t1y = 0
        self.t2x = self.t2y = 0
        self.t3x = self.t3y = 0
        self.t4x = self.t4y = 0
        self.t5x = self.t5y = 0
        self.gesture = Gestures.NO_GESTURE
        self.timestamp = 0

    def __eq__(self, other):
        """Return True when ``other`` is a TouchRecord with matching fields."""
        if not isinstance(other, TouchRecord):
            return False
        return all(
            getattr(self, field) == getattr(other, field)
            for field in self._COMPARED_FIELDS
        )

    def __ne__(self, other):
        """Return the negation of ``==``."""
        return not (self == other)
|
|
|
|
|
|
class FT5xx6:
    """Base class for FT5xx6 capacitive touch panel controllers.

    Touch data is fetched over I2C by :meth:`update`. Without a buffer the
    most recent record is kept in ``last_touch``; after :meth:`use_buffer`
    records are queued in a fixed-depth ring buffer and retrieved with
    :meth:`read` / :meth:`peek`.

    Buffer bookkeeping is guarded by a non-reentrant ``threading.Lock``.
    Public methods that hold the lock must therefore only call the lock-free
    ``_..._internal`` helpers, never another locking public method.
    """

    # Bytes fetched per read: registers 0x00 through T5_YL (0x1E) inclusive.
    _REGISTER_BLOCK_LENGTH = 31

    # (TouchRecord attribute prefix, XH, XL, YH, YL) for each touch point, in
    # the order the points are filled as the touch count increases.
    _TOUCH_POINT_REGISTERS = (
        ("t1", RegisterAddresses.T1_XH, RegisterAddresses.T1_XL,
         RegisterAddresses.T1_YH, RegisterAddresses.T1_YL),
        ("t2", RegisterAddresses.T2_XH, RegisterAddresses.T2_XL,
         RegisterAddresses.T2_YH, RegisterAddresses.T2_YL),
        ("t3", RegisterAddresses.T3_XH, RegisterAddresses.T3_XL,
         RegisterAddresses.T3_YH, RegisterAddresses.T3_YL),
        ("t4", RegisterAddresses.T4_XH, RegisterAddresses.T4_XL,
         RegisterAddresses.T4_YH, RegisterAddresses.T4_YL),
        ("t5", RegisterAddresses.T5_XH, RegisterAddresses.T5_XL,
         RegisterAddresses.T5_YH, RegisterAddresses.T5_YL),
    )

    def __init__(self, address: int = I2CAddress.UNKNOWN):
        """
        Initialize the touch controller

        Args:
            address: I2C address of the touch controller
        """
        self._has_interrupts = False
        self._mode = Mode.POLLING
        self._i2c_bus = None
        self._addr = address
        self._int_pin = FT5XX6_UNUSED_PIN

        # Ring-buffer state; the buffer stays disabled until use_buffer().
        self._touch_record_buffer = None
        self._write_offset = 0
        self._read_offset = 0
        self._record_depth = 0
        self._records_available = 0
        self._write_ok = False
        self._read_ok = False
        self._buffer_was_allocated = False

        self.new_touch = False
        self.new_data = False

        self.last_update = 0
        self.last_touch = TouchRecord()

        # Lock for thread safety (non-reentrant: see the class docstring)
        self._lock = threading.Lock()

    def begin(self, i2c_bus: int = 1, int_pin: int = FT5XX6_UNUSED_PIN,
              user_isr: Optional[Callable] = None) -> Status:
        """
        Initialize the touch controller

        Interrupts are configured only when both ``int_pin`` and ``user_isr``
        are supplied. If the I2C initialisation then fails, the edge
        detection registered here is removed again and the interrupt state is
        reset, so that ``begin`` can be retried.

        Args:
            i2c_bus: I2C bus number
            int_pin: Interrupt pin (BCM numbering)
            user_isr: User interrupt service routine

        Returns:
            Status: NOMINAL if successful, ERROR otherwise
        """
        armed = False
        if int_pin != FT5XX6_UNUSED_PIN and user_isr is not None:
            self._has_interrupts = True
            self._int_pin = int_pin
            self._mode = Mode.INTERRUPT

            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self._int_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            GPIO.add_event_detect(self._int_pin, GPIO.FALLING, callback=user_isr)
            armed = True

        try:
            self._i2c_bus = SMBus(i2c_bus)
            # Set device mode to normal operation
            self._i2c_bus.write_byte_data(self._addr, RegisterAddresses.DEV_MODE, 0)
        except Exception as e:
            print(f"Error initializing FT5xx6: {e}")
            if armed:
                # Undo the registration made above; otherwise the ISR stays
                # attached to a controller that never came up and a second
                # add_event_detect on the same pin would fail.
                GPIO.remove_event_detect(int_pin)
                self._has_interrupts = False
                self._int_pin = FT5XX6_UNUSED_PIN
                self._mode = Mode.POLLING
            return Status.ERROR

        return Status.NOMINAL

    @staticmethod
    def _decode_coordinate(registers, high_reg, low_reg) -> int:
        """Combine the low nibble of ``high_reg`` with ``low_reg`` into 12 bits."""
        return ((registers[high_reg] & 0x0F) << 8) | registers[low_reg]

    @staticmethod
    def _decode_gesture(raw_value: int) -> Gestures:
        """Map a raw GEST_ID byte to Gestures, or NO_GESTURE if unrecognised."""
        try:
            return Gestures(raw_value)
        except ValueError:
            # An ID outside the enum must not discard otherwise valid
            # coordinate data, so it is reported as "no gesture".
            return Gestures.NO_GESTURE

    def _get_touch_record(self, record: TouchRecord) -> Status:
        """
        Read touch data from the controller

        Only as many touch points as the controller reports (at most five)
        are written; the remaining coordinates of ``record`` are left as
        they were.

        Args:
            record: TouchRecord to store the data

        Returns:
            Status: NOMINAL if successful, ERROR otherwise
        """
        try:
            # Read all registers in one go for better performance
            registers = self._i2c_bus.read_i2c_block_data(
                self._addr, 0, self._REGISTER_BLOCK_LENGTH)

            record.timestamp = int(time.time() * 1000)  # Milliseconds
            record.num_touches = registers[RegisterAddresses.TD_STATUS] & 0x0F
            record.gesture = self._decode_gesture(registers[RegisterAddresses.GEST_ID])

            # Slicing caps the loop at five points even if the 4-bit touch
            # count reports more.
            active_points = self._TOUCH_POINT_REGISTERS[:record.num_touches]
            for prefix, xh, xl, yh, yl in active_points:
                setattr(record, prefix + "x", self._decode_coordinate(registers, xh, xl))
                setattr(record, prefix + "y", self._decode_coordinate(registers, yh, yl))

            return Status.NOMINAL
        except Exception as e:
            # Includes calling this before begin() (bus is None) and any bus
            # error; callers only see the status code.
            print(f"Error reading touch data: {e}")
            return Status.ERROR

    def write(self, records: Union[TouchRecord, List[TouchRecord]],
              num_records: int = 1) -> Status:
        """
        Write touch records to the buffer

        With a buffer configured, up to ``num_records`` records are appended
        until the buffer is full. Without a buffer only the first record is
        kept, as ``last_touch``. Records are stored by reference, not copied.

        Args:
            records: TouchRecord or list of TouchRecords to write
            num_records: Number of records to write

        Returns:
            Status: NOMINAL if successful; ERROR if there is nothing to
            write or the buffer filled up before all records were stored
        """
        with self._lock:
            if records is None or num_records == 0:
                return Status.ERROR

            # Convert single record to list
            if isinstance(records, TouchRecord):
                records = [records]

            # An empty list used to raise IndexError in the unbuffered path.
            if not records:
                return Status.ERROR

            if self._touch_record_buffer is None or self._record_depth == 0:
                # If no buffer, just update the last_touch
                # NOTE(review): last_touch is not refreshed in buffered mode,
                # so update() compares every reading against a stale record
                # there. Left unchanged; confirm whether that is intended.
                self.last_touch = records[0]
                return Status.NOMINAL

            # max() keeps a negative count a no-op instead of a tail slice.
            for record in records[:max(num_records, 0)]:
                if not self._write_ok:
                    return Status.ERROR

                self._touch_record_buffer[self._write_offset] = record
                self._write_offset += 1
                self._read_ok = True
                self._records_available += 1

                if self._write_offset >= self._record_depth:
                    self._write_offset = 0

                # Catching up with the read pointer means the buffer is full.
                if self._write_offset == self._read_offset:
                    self._write_ok = False

            return Status.NOMINAL

    def use_buffer(self, depth: int = 0,
                   touch_records: Optional[List[TouchRecord]] = None) -> Status:
        """
        Initialize a buffer for touch records

        Any existing buffer is discarded first. A ``depth`` of zero or less
        leaves everything untouched.

        Args:
            depth: Number of records to store
            touch_records: Pre-allocated buffer (optional); used in place,
                must hold at least ``depth`` entries

        Returns:
            Status: NOMINAL if successful, ERROR if ``touch_records`` is too
            short for ``depth``, NOT_ENOUGH_MEMORY if allocation fails
        """
        with self._lock:
            if depth <= 0:
                return Status.NOMINAL

            # Reject an undersized buffer before touching the current state;
            # otherwise write() would later index past its end.
            if touch_records is not None and len(touch_records) < depth:
                return Status.ERROR

            # Drop the old buffer without calling remove_buffer(), which
            # would try to take the lock we already hold.
            self._drop_buffer_internal()

            if touch_records is None:
                try:
                    self._touch_record_buffer = [TouchRecord() for _ in range(depth)]
                except MemoryError:
                    self._touch_record_buffer = None
                    return Status.NOT_ENOUGH_MEMORY
                self._buffer_was_allocated = True
            else:
                self._touch_record_buffer = touch_records
                self._buffer_was_allocated = False

            self._record_depth = depth
            self._write_ok = True
            self._read_ok = False

            return Status.NOMINAL

    def _clear_buffer_internal(self):
        """
        Internal method to clear the buffer without acquiring the lock
        Used by methods that already have the lock
        """
        self._records_available = 0
        self._write_offset = 0
        self._read_offset = 0
        self._write_ok = True
        self._read_ok = False

    def _drop_buffer_internal(self):
        """
        Internal method to detach the buffer without acquiring the lock
        Used by methods that already have the lock
        """
        self._clear_buffer_internal()
        self._record_depth = 0
        self._buffer_was_allocated = False
        self._touch_record_buffer = None
        # With no buffer attached neither direction is permitted.
        self._write_ok = False
        self._read_ok = False

    def remove_buffer(self) -> Status:
        """
        Remove the touch record buffer

        Returns:
            Status: NOMINAL
        """
        with self._lock:
            # Must use the lock-free helper: calling clear_buffer() here
            # would block forever on the lock this method already holds.
            self._drop_buffer_internal()

        return Status.NOMINAL

    def available(self) -> int:
        """
        Get the number of available touch records

        Returns:
            Number of available records
        """
        return self._records_available

    def clear_buffer(self) -> Status:
        """
        Clear the touch record buffer

        Resets the read/write positions and the record count; the buffer
        itself stays attached.

        Returns:
            Status: NOMINAL
        """
        with self._lock:
            self._clear_buffer_internal()

        return Status.NOMINAL

    def read(self) -> TouchRecord:
        """
        Read the oldest touch record from the buffer

        When no buffer is configured, or the buffer holds nothing unread,
        ``last_touch`` is returned instead and ``new_touch`` is cleared.

        Returns:
            TouchRecord: The oldest touch record
        """
        with self._lock:
            buffered = self._touch_record_buffer is not None and self._record_depth != 0
            if not (buffered and self._read_ok):
                self.new_touch = False
                return self.last_touch

            record = self._touch_record_buffer[self._read_offset]
            self._read_offset += 1

            if self._read_offset >= self._record_depth:
                self._read_offset = 0

            # Catching up with the write pointer means the buffer is empty.
            if self._read_offset == self._write_offset:
                self._read_ok = False

            if self._records_available > 0:
                self._records_available -= 1
            if self._records_available == 0:
                self.new_touch = False

            # A slot was just freed, so writing is possible again.
            self._write_ok = True

            return record

    def peek(self, offset_from_read: int = 0) -> TouchRecord:
        """
        Peek at a touch record without removing it from the buffer

        The offset wraps around the buffer depth; no check is made that the
        addressed slot holds an unread record.

        Args:
            offset_from_read: Offset from the read pointer

        Returns:
            TouchRecord: The touch record at the specified offset, or
            ``last_touch`` when no buffer is configured
        """
        with self._lock:
            if self._touch_record_buffer is None or self._record_depth == 0:
                return self.last_touch

            offset = (self._read_offset + offset_from_read) % self._record_depth
            return self._touch_record_buffer[offset]

    def set_mode(self, mode: Mode) -> Status:
        """
        Set the operation mode

        Args:
            mode: INTERRUPT or POLLING

        Returns:
            Status: NOMINAL
        """
        self._mode = mode
        return Status.NOMINAL

    def update(self) -> Status:
        """
        Update touch data (polling mode)

        Reads a fresh record from the controller, clears ``new_data`` and,
        if the record differs from ``last_touch``, stores it via
        :meth:`write` and sets ``new_touch``.

        Returns:
            Status: NOMINAL if successful, ERROR otherwise
        """
        new_record = TouchRecord()
        result = self._get_touch_record(new_record)

        if result != Status.NOMINAL:
            return result

        self.new_data = False
        self.last_update = int(time.time() * 1000)  # Milliseconds

        if new_record != self.last_touch:
            # NOTE(review): the status returned by write() is ignored, so a
            # record dropped because the buffer is full is still reported as
            # NOMINAL. Left unchanged to keep existing polling loops working.
            self.write(new_record)
            self.new_touch = True

        return Status.NOMINAL

    def interrupt(self) -> Status:
        """
        Handle an interrupt

        Flags that new data is available and invokes the module-level
        ``ft5xx6_interrupt_callback`` hook if one is defined. Exceptions
        raised by the hook propagate to the caller.

        Returns:
            Status: NOMINAL
        """
        # Set flag to indicate new data is available
        self.new_data = True

        # Look the hook up explicitly rather than catching NameError around
        # the call, which would also hide a NameError raised inside the hook.
        callback = globals().get("ft5xx6_interrupt_callback")
        if callable(callback):
            callback(self)

        return Status.NOMINAL
|
|
|
|
|
|
class FT5316(FT5xx6):
    """Touch controller driver preset with the FT5316's I2C address."""

    def __init__(self):
        """Set up the base driver using ``I2CAddress.FT5316``."""
        super().__init__(address=I2CAddress.FT5316)
|
|
|
|
|
|
# Weak callback functions (can be overridden by the user)
|
|
def ft5xx6_return_callback(retval: Status, file: str, line: int):
    """Hook for observing status codes returned by driver functions.

    The default implementation ignores its arguments and does nothing.
    It is not invoked by the code visible in this module; rebind the name
    to install a custom handler.
    """
    return None
|
|
|
|
|
|
def ft5xx6_interrupt_callback(controller: FT5xx6):
    """Hook invoked by ``FT5xx6.interrupt`` with the controller instance.

    The default implementation does nothing; rebind the name to install a
    custom handler.
    """
    return None
|