diff --git a/periphery/i2c.py b/periphery/i2c.py index c30734e..92e0310 100644 --- a/periphery/i2c.py +++ b/periphery/i2c.py @@ -100,30 +100,30 @@ def transfer(self, address, messages): Raises: I2CError: if an I/O or OS error occurs. - TypeError: if `messages` type is not list. + TypeError: if `messages` type is not a sequence. ValueError: if `messages` length is zero, or if message data is not valid bytes. """ - if not isinstance(messages, list): - raise TypeError("Invalid messages type, should be list of I2C.Message.") - elif len(messages) == 0: - raise ValueError("Invalid messages data, should be non-zero length.") + try: + if len(messages) == 0: + raise ValueError("Invalid messages data, should be non-zero length.") + except TypeError: + raise TypeError("Invalid messages type, should be a sequence of I2C.Message.") # Convert I2C.Message messages to _CI2CMessage messages + convert_reads = [] cmessages = (_CI2CMessage * len(messages))() - for i in range(len(messages)): - # Convert I2C.Message data to bytes - if isinstance(messages[i].data, bytes): - data = messages[i].data - elif isinstance(messages[i].data, bytearray): - data = bytes(messages[i].data) - elif isinstance(messages[i].data, list): - data = bytes(bytearray(messages[i].data)) - - cmessages[i].addr = address - cmessages[i].flags = messages[i].flags | (I2C._I2C_M_RD if messages[i].read else 0) - cmessages[i].len = len(data) - cmessages[i].buf = ctypes.cast(ctypes.create_string_buffer(data, len(data)), ctypes.POINTER(ctypes.c_ubyte)) + for message, cmessage in zip(messages, cmessages): + data = message.data + if not isinstance(data, bytearray): + data = bytearray(data) + if message.read: + convert_reads.append((message, data)) + + cmessage.addr = address + cmessage.flags = message.flags | (I2C._I2C_M_RD if message.read else 0) + cmessage.len = n = len(data) + cmessage.buf = (ctypes.c_ubyte * n).from_buffer(data) # Prepare transfer structure i2c_xfer = _CI2CIocTransfer() @@ -137,16 +137,12 @@ def transfer(self, address, messages): raise I2CError(e.errno, "I2C transfer: " + e.strerror) # Update any read I2C.Message messages - for i in range(len(messages)): - if messages[i].read: - data = [cmessages[i].buf[j] for j in range(cmessages[i].len)] - # Convert read data to type used in I2C.Message messages - if isinstance(messages[i].data, list): - messages[i].data = data - elif isinstance(messages[i].data, bytearray): - messages[i].data = bytearray(data) - elif isinstance(messages[i].data, bytes): - messages[i].data = bytes(bytearray(data)) + for message, data in convert_reads: + # Convert read data to type used in I2C.Message messages + if isinstance(message.data, list): + message.data[:] = data + else: + message.data = bytes(data) def close(self): """Close the i2c-dev I2C device. @@ -193,10 +189,13 @@ def __init__(self, data, read=False, flags=0): """Instantiate an I2C Message object. Args: - data (bytes, bytearray, list): a byte array or list of 8-bit - integers to write. + data (bytes, bytearray, list, positive integer): + a sequence of 8-bit integers to write; + a number of 0x00 to write; + or a number of bytes to read (result type is bytes) read (bool): specify this as a read message, where `data` - serves as placeholder bytes for the read. + serves as placeholder bytes for the read; + if type is mutable, data is modified in-place flags (int): additional i2c-dev flags for this message. Returns: @@ -206,8 +205,8 @@ def __init__(self, data, read=False, flags=0): TypeError: if `data`, `read`, or `flags` types are invalid. """ - if not isinstance(data, (bytes, bytearray, list)): - raise TypeError("Invalid data type, should be bytes, bytearray, or list.") + if not isinstance(data, (bytes, bytearray, list, int)): + raise TypeError("Invalid data type, should be bytes, bytearray, list or integer.") if not isinstance(read, bool): raise TypeError("Invalid read type, should be boolean.") if not isinstance(flags, int): diff --git a/periphery/i2c.pyi b/periphery/i2c.pyi index 05efe5c..716e264 100644 --- a/periphery/i2c.pyi +++ b/periphery/i2c.pyi @@ -1,19 +1,22 @@ +from collections.abc import Sequence from types import TracebackType +type MessageData = bytes | bytearray | list[int] | int + class I2CError(IOError): ... class I2C: class Message: - data: bytes | bytearray | list[int] + data: MessageData read: bool flags: int - def __init__(self, data: bytes | bytearray | list[int], read: bool = ..., flags: int = ...) -> None: ... + def __init__(self, data: MessageData, read: bool = ..., flags: int = ...) -> None: ... def __init__(self, devpath: str) -> None: ... def __del__(self) -> None: ... def __enter__(self) -> I2C: ... # noqa: Y034 def __exit__(self, t: type[BaseException] | None, value: BaseException | None, traceback: TracebackType | None) -> None: ... - def transfer(self, address: int, messages: list[Message]) -> None: ... + def transfer(self, address: int, messages: Sequence[Message]) -> None: ... def close(self) -> None: ... @property def fd(self) -> int: ...