From 44e8de6656f555726a30f160473595554a0be0c4 Mon Sep 17 00:00:00 2001 From: Melissa LeBlanc-Williams Date: Wed, 6 May 2020 08:07:40 -0700 Subject: [PATCH 1/4] Initial working version of I2C --- adafruit_bitbangio.py | 194 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 193 insertions(+), 1 deletion(-) diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index c793fa8..1c9de0f 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -41,7 +41,7 @@ """ # imports -from time import monotonic +from time import monotonic, sleep from digitalio import DigitalInOut __version__ = "0.0.0-auto.0" @@ -84,6 +84,198 @@ def deinit(self): # pylint: enable=no-self-use +class I2C(_BitBangIO): + """Software-based implementation of the I2C protocol over GPIO pins.""" + def __init__(self, scl, sda, *, frequency=400000, timeout=1): + """Initialize bitbang (or software) based I2C. Must provide the I2C + clock, and data pin numbers. + """ + super().__init__() + + # Set pins as outputs/inputs. + self._scl = DigitalInOut(scl) + self._scl.switch_to_output() + self._scl.value = 1 + + # SDA flips between being input and output + self._sda = DigitalInOut(sda) + self._sda.switch_to_output() + self._sda.value = 1 + + self._delay = 1 / frequency / 2 + self._frequency = frequency + self._timeout = timeout + + def scan(self): + """Perform an I2C Device Scan""" + found = [] + for address in range(0, 0x80): + if self._probe(address): + found.append(address) + return found + + def writeto(self, address, buffer, *, start=0, end=None, stop=True): + """Write data from the buffer to an address""" + if end is None: + end = len(buffer) + self._write(address, buffer[start:end], stop) + + def readfrom_into(self, address, buffer, *, start=0, end=None): + """Read data from an address and into the buffer""" + if end is None: + end = len(buffer) + + readin = self._read(address, end - start) + for i in range(end - start): + buffer[i + start] = readin[i] + + def writeto_then_readfrom( + self, + address, + out_buffer, + in_buffer, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None + ): + """Write data from buffer_out to an address and then + read data from an address and into buffer_in + """ + if out_end is None: + out_end = len(buffer_out) + if in_end is None: + in_end = len(buffer_in) + self.writeto(address, buffer_out, start=out_start, end=out_end, stop=stop) + self.readfrom_into(address, buffer_in, start=in_start, end=in_end) + + def _scl_low(self): + self._scl.value = 0 + + def _sda_low(self): + self._sda.value = 0 + + def _scl_release(self): + """Release and let the pullups lift""" + self._scl.value = 1 + + def _sda_release(self): + """Release and let the pullups lift""" + self._sda.value = 1 + + def _set_values(self, *, scl, sda, delay=None): + if delay is None: + delay = self._delay + self._scl.value = scl + self._scl.value = sda + sleep(delay) + + def _start(self): + self._sda_release() + self._scl_release() + sleep(self._delay) + self._sda_low() + sleep(self._delay) + + def _stop(self): + self._scl_low() + sleep(self._delay) + self._sda_low() + sleep(self._delay) + self._scl_release() + sleep(self._delay) + self._sda_release() + sleep(self._delay) + + def _repeated_start(self): + self._scl_low() + sleep(self._delay) + self._sda_release() + sleep(self._delay) + self._scl_release() + sleep(self._delay) + self._sda_low() + sleep(self._delay) + + def _write_byte(self, byte): + for bit_position in range(8): + self._scl_low() + sleep(self._delay) + if byte & (0x80 >> bit_position): + self._sda_release() + else: + self._sda_low() + sleep(self._delay) + self._scl_release() + sleep(self._delay) + self._scl_low() + sleep(self._delay * 2) + + self._scl_release() + sleep(self._delay) + + self._sda.switch_to_input() + ack = self._sda.value + self._sda.switch_to_output() + sleep(self._delay) + + self._scl_low() + + return not ack + + def _read_byte(self, ack=False): + self._scl_low() + sleep(self._delay) + + data = 0 + self._sda.switch_to_input() + for _ in range(8): + self._scl_release() + sleep(self._delay) + data = (data << 1) | int(self._sda.value) + sleep(self._delay) + self._scl_low() + sleep(self._delay) + self._sda.switch_to_output() + + if ack: + self._sda_low() + else: + self._sda_release() + sleep(self._delay) + self._scl_release() + sleep(self._delay) + return data & 0xFF + + def _probe(self, address): + self._start() + ok = self._write_byte(address << 1) + self._stop() + return ok > 0 + + def _write(self, address, buffer, transmit_stop): + self._start() + if not self._write_byte(address << 1): + raise RuntimeError("Device not responding at 0x{:02X}".format(address)) + for byte in buffer: + self._write_byte(byte) + if transmit_stop: + self._stop() + + def _read(self, address, length): + self._start() + if not self._write_byte(address << 1 | 1): + raise RuntimeError("Device not responding at 0x{:02X}".format(address)) + buffer = bytearray(length) + for byte_position in range(length): + buffer[byte_position] = self._read_byte(ack=(byte_position != length - 1)) + self._stop() + return buffer + + def _read_i2c_block_data(self, address, write_bytes, length): + """Untested""" + pass class SPI(_BitBangIO): """Software-based implementation of the SPI protocol over GPIO pins.""" From c67ee4a0b0adae4734dd7b268a73050a42a978d7 Mon Sep 17 00:00:00 2001 From: Melissa LeBlanc-Williams Date: Wed, 6 May 2020 08:19:30 -0700 Subject: [PATCH 2/4] Blacked and Pylinted --- adafruit_bitbangio.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index 1c9de0f..6b2a268 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -84,8 +84,10 @@ def deinit(self): # pylint: enable=no-self-use + class I2C(_BitBangIO): """Software-based implementation of the I2C protocol over GPIO pins.""" + def __init__(self, scl, sda, *, frequency=400000, timeout=1): """Initialize bitbang (or software) based I2C. Must provide the I2C clock, and data pin numbers. @@ -132,13 +134,14 @@ def readfrom_into(self, address, buffer, *, start=0, end=None): def writeto_then_readfrom( self, address, - out_buffer, - in_buffer, + buffer_out, + buffer_in, *, out_start=0, out_end=None, in_start=0, - in_end=None + in_end=None, + stop=True ): """Write data from buffer_out to an address and then read data from an address and into buffer_in @@ -159,7 +162,7 @@ def _sda_low(self): def _scl_release(self): """Release and let the pullups lift""" self._scl.value = 1 - + def _sda_release(self): """Release and let the pullups lift""" self._sda.value = 1 @@ -238,7 +241,7 @@ def _read_byte(self, ack=False): self._scl_low() sleep(self._delay) self._sda.switch_to_output() - + if ack: self._sda_low() else: @@ -247,13 +250,13 @@ def _read_byte(self, ack=False): self._scl_release() sleep(self._delay) return data & 0xFF - + def _probe(self, address): self._start() ok = self._write_byte(address << 1) self._stop() return ok > 0 - + def _write(self, address, buffer, transmit_stop): self._start() if not self._write_byte(address << 1): @@ -262,7 +265,7 @@ def _write(self, address, buffer, transmit_stop): self._write_byte(byte) if transmit_stop: self._stop() - + def _read(self, address, length): self._start() if not self._write_byte(address << 1 | 1): @@ -272,10 +275,7 @@ def _read(self, address, length): buffer[byte_position] = self._read_byte(ack=(byte_position != length - 1)) self._stop() return buffer - - def _read_i2c_block_data(self, address, write_bytes, length): - """Untested""" - pass + class SPI(_BitBangIO): """Software-based implementation of the SPI protocol over GPIO pins.""" From a5288a1455c179feb96c00ada32d1104863738e8 Mon Sep 17 00:00:00 2001 From: Melissa LeBlanc-Williams Date: Wed, 6 May 2020 08:39:12 -0700 Subject: [PATCH 3/4] Added lock checking --- adafruit_bitbangio.py | 214 ++++++++++++++++++++++-------------------- 1 file changed, 113 insertions(+), 101 deletions(-) diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index 6b2a268..916fa4c 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -71,6 +71,11 @@ def unlock(self): else: raise ValueError("Not locked") + def _check_lock(self): + if not self._locked: + raise RuntimeError("First call try_lock()") + return True + def __enter__(self): return self @@ -111,25 +116,28 @@ def __init__(self, scl, sda, *, frequency=400000, timeout=1): def scan(self): """Perform an I2C Device Scan""" found = [] - for address in range(0, 0x80): - if self._probe(address): - found.append(address) + if self._check_lock(): + for address in range(0, 0x80): + if self._probe(address): + found.append(address) return found def writeto(self, address, buffer, *, start=0, end=None, stop=True): """Write data from the buffer to an address""" if end is None: end = len(buffer) - self._write(address, buffer[start:end], stop) + if self._check_lock(): + self._write(address, buffer[start:end], stop) def readfrom_into(self, address, buffer, *, start=0, end=None): """Read data from an address and into the buffer""" if end is None: end = len(buffer) - readin = self._read(address, end - start) - for i in range(end - start): - buffer[i + start] = readin[i] + if self._check_lock(): + readin = self._read(address, end - start) + for i in range(end - start): + buffer[i + start] = readin[i] def writeto_then_readfrom( self, @@ -150,8 +158,9 @@ def writeto_then_readfrom( out_end = len(buffer_out) if in_end is None: in_end = len(buffer_in) - self.writeto(address, buffer_out, start=out_start, end=out_end, stop=stop) - self.readfrom_into(address, buffer_in, start=in_start, end=in_end) + if self._check_lock(): + self.writeto(address, buffer_out, start=out_start, end=out_end, stop=stop) + self.readfrom_into(address, buffer_in, start=in_start, end=in_end) def _scl_low(self): self._scl.value = 0 @@ -308,7 +317,7 @@ def __init__(self, clock, MOSI=None, MISO=None): def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8): """Configures the SPI bus. Only valid when locked.""" - if self._locked: + if self._check_lock(): if not isinstance(baudrate, int): raise ValueError("baudrate must be an integer") if not isinstance(bits, int): @@ -325,9 +334,6 @@ def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8): self._bits = bits self._half_period = (1 / self._baudrate) / 2 # 50% Duty Cyle delay - else: - raise RuntimeError("First call try_lock()") - def _wait(self, start=None): """Wait for up to one half cycle""" while (start + self._half_period) > monotonic(): @@ -344,26 +350,28 @@ def write(self, buffer, start=0, end=None): if end is None: end = len(buffer) - start_time = monotonic() - for byte in buffer[start:end]: - for bit_position in range(self._bits): - bit_value = byte & 0x80 >> bit_position - # Set clock to base - if not self._phase: # Mode 0, 2 - self._mosi.value = bit_value - self._sclk.value = self._polarity - start_time = self._wait(start_time) - - # Flip clock off base - if self._phase: # Mode 1, 3 - self._mosi.value = bit_value - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + if self._check_lock(): + start_time = monotonic() + for byte in buffer[start:end]: + for bit_position in range(self._bits): + bit_value = byte & 0x80 >> bit_position + # Set clock to base + if not self._phase: # Mode 0, 2 + self._mosi.value = bit_value + self._sclk.value = self._polarity + start_time = self._wait(start_time) + # Flip clock off base + if self._phase: # Mode 1, 3 + self._mosi.value = bit_value + self._sclk.value = not self._polarity + start_time = self._wait(start_time) + + # Return pins to base positions + self._mosi.value = 0 + self._sclk.value = self._polarity + + # pylint: disable=too-many-branches def readinto(self, buffer, start=0, end=None, write_value=0): """Read into the buffer specified by buf while writing zeroes. Requires the SPI being locked. If the number of bytes to read is 0, nothing happens. @@ -372,43 +380,44 @@ def readinto(self, buffer, start=0, end=None, write_value=0): raise RuntimeError("Read attempted with no MISO pin specified.") if end is None: end = len(buffer) - start_time = monotonic() - for byte_position, _ in enumerate(buffer[start:end]): - for bit_position in range(self._bits): - bit_mask = 0x80 >> bit_position - bit_value = write_value & 0x80 >> bit_position - # Return clock to base - self._sclk.value = self._polarity - start_time = self._wait(start_time) - # Handle read on leading edge of clock. - if not self._phase: # Mode 0, 2 - if self._mosi is not None: - self._mosi.value = bit_value - if self._miso.value: - # Set bit to 1 at appropriate location. - buffer[byte_position] |= bit_mask - else: - # Set bit to 0 at appropriate location. - buffer[byte_position] &= ~bit_mask - # Flip clock off base - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - # Handle read on trailing edge of clock. - if self._phase: # Mode 1, 3 - if self._mosi is not None: - self._mosi.value = bit_value - if self._miso.value: - # Set bit to 1 at appropriate location. - buffer[byte_position] |= bit_mask - else: - # Set bit to 0 at appropriate location. - buffer[byte_position] &= ~bit_mask - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + if self._check_lock(): + start_time = monotonic() + for byte_position, _ in enumerate(buffer[start:end]): + for bit_position in range(self._bits): + bit_mask = 0x80 >> bit_position + bit_value = write_value & 0x80 >> bit_position + # Return clock to base + self._sclk.value = self._polarity + start_time = self._wait(start_time) + # Handle read on leading edge of clock. + if not self._phase: # Mode 0, 2 + if self._mosi is not None: + self._mosi.value = bit_value + if self._miso.value: + # Set bit to 1 at appropriate location. + buffer[byte_position] |= bit_mask + else: + # Set bit to 0 at appropriate location. + buffer[byte_position] &= ~bit_mask + # Flip clock off base + self._sclk.value = not self._polarity + start_time = self._wait(start_time) + # Handle read on trailing edge of clock. + if self._phase: # Mode 1, 3 + if self._mosi is not None: + self._mosi.value = bit_value + if self._miso.value: + # Set bit to 1 at appropriate location. + buffer[byte_position] |= bit_mask + else: + # Set bit to 0 at appropriate location. + buffer[byte_position] &= ~bit_mask + + # Return pins to base positions + self._mosi.value = 0 + self._sclk.value = self._polarity - # pylint: disable=too-many-branches def write_readinto( self, buffer_out, @@ -435,40 +444,43 @@ def write_readinto( if len(buffer_out[out_start:out_end]) != len(buffer_in[in_start:in_end]): raise RuntimeError("Buffer slices must be equal length") - start_time = monotonic() - for byte_position, _ in enumerate(buffer_out[out_start:out_end]): - for bit_position in range(self._bits): - bit_mask = 0x80 >> bit_position - bit_value = buffer_out[byte_position + out_start] & 0x80 >> bit_position - in_byte_position = byte_position + in_start - # Return clock to 0 - self._sclk.value = self._polarity - start_time = self._wait(start_time) - # Handle read on leading edge of clock. - if not self._phase: # Mode 0, 2 - self._mosi.value = bit_value - if self._miso.value: - # Set bit to 1 at appropriate location. - buffer_in[in_byte_position] |= bit_mask - else: - # Set bit to 0 at appropriate location. - buffer_in[in_byte_position] &= ~bit_mask - # Flip clock off base - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - # Handle read on trailing edge of clock. - if self._phase: # Mode 1, 3 - self._mosi.value = bit_value - if self._miso.value: - # Set bit to 1 at appropriate location. - buffer_in[in_byte_position] |= bit_mask - else: - # Set bit to 0 at appropriate location. - buffer_in[in_byte_position] &= ~bit_mask - - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + if self._check_lock(): + start_time = monotonic() + for byte_position, _ in enumerate(buffer_out[out_start:out_end]): + for bit_position in range(self._bits): + bit_mask = 0x80 >> bit_position + bit_value = ( + buffer_out[byte_position + out_start] & 0x80 >> bit_position + ) + in_byte_position = byte_position + in_start + # Return clock to 0 + self._sclk.value = self._polarity + start_time = self._wait(start_time) + # Handle read on leading edge of clock. + if not self._phase: # Mode 0, 2 + self._mosi.value = bit_value + if self._miso.value: + # Set bit to 1 at appropriate location. + buffer_in[in_byte_position] |= bit_mask + else: + # Set bit to 0 at appropriate location. + buffer_in[in_byte_position] &= ~bit_mask + # Flip clock off base + self._sclk.value = not self._polarity + start_time = self._wait(start_time) + # Handle read on trailing edge of clock. + if self._phase: # Mode 1, 3 + self._mosi.value = bit_value + if self._miso.value: + # Set bit to 1 at appropriate location. + buffer_in[in_byte_position] |= bit_mask + else: + # Set bit to 0 at appropriate location. + buffer_in[in_byte_position] &= ~bit_mask + + # Return pins to base positions + self._mosi.value = 0 + self._sclk.value = self._polarity # pylint: enable=too-many-branches From ddccab737e15d6c1cdf5a0c2942e9a64c5548fa9 Mon Sep 17 00:00:00 2001 From: Melissa LeBlanc-Williams Date: Wed, 6 May 2020 08:47:23 -0700 Subject: [PATCH 4/4] Last second cleanup of unused variable --- adafruit_bitbangio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index 916fa4c..e2741e7 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -110,7 +110,6 @@ def __init__(self, scl, sda, *, frequency=400000, timeout=1): self._sda.value = 1 self._delay = 1 / frequency / 2 - self._frequency = frequency self._timeout = timeout def scan(self): @@ -170,10 +169,12 @@ def _sda_low(self): def _scl_release(self): """Release and let the pullups lift""" + # Use self._timeout to add clock stretching self._scl.value = 1 def _sda_release(self): """Release and let the pullups lift""" + # Use self._timeout to add clock stretching self._sda.value = 1 def _set_values(self, *, scl, sda, delay=None):