Skip to content

Correct bit handling internally and API. #2627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pymodbus/client/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import pymodbus.pdu.register_message as pdu_reg
from pymodbus.constants import ModbusStatus
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ModbusPDU
from pymodbus.utilities import pack_bitstring, unpack_bitstring
from pymodbus.pdu.pdu import ModbusPDU, pack_bitstring, unpack_bitstring


T = TypeVar("T", covariant=False)
Expand Down
4 changes: 2 additions & 2 deletions pymodbus/pdu/bit_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from pymodbus.constants import ModbusStatus
from pymodbus.datastore import ModbusDeviceContext
from pymodbus.pdu.pdu import ModbusPDU
from pymodbus.utilities import pack_bitstring, unpack_bitstring

from .pdu import ModbusPDU, pack_bitstring, unpack_bitstring


class ReadCoilsRequest(ModbusPDU):
Expand Down
4 changes: 2 additions & 2 deletions pymodbus/pdu/diag_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from pymodbus.constants import ModbusPlusOperation
from pymodbus.datastore import ModbusDeviceContext
from pymodbus.pdu.device import ModbusControlBlock
from pymodbus.pdu.pdu import ModbusPDU
from pymodbus.utilities import pack_bitstring

from .pdu import ModbusPDU, pack_bitstring


_MCB = ModbusControlBlock()
Expand Down
3 changes: 2 additions & 1 deletion pymodbus/pdu/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from abc import ABC, abstractmethod

from pymodbus.exceptions import ParameterException
from pymodbus.utilities import pack_bitstring, unpack_bitstring

from .pdu import pack_bitstring, unpack_bitstring


class ModbusEvent(ABC):
Expand Down
49 changes: 49 additions & 0 deletions pymodbus/pdu/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,52 @@ def encode(self) -> bytes:
def decode(self, data: bytes) -> None:
"""Decode a modbus exception response."""
self.exception_code = int(data[0])


def pack_bitstring(bits: list[bool], align_byte=True) -> bytes:
"""Create a bytestring out of a list of bits.

example::

bits = [True, False, False, False] +
[False, False, False, True] +
[True, False, True, False] +
[False, False, False, False]
result = pack_bitstring(bits)
bytes 0x05 0x81
"""
ret = b""
i = packed = 0
t_bits = bits
bits_extra = 8 if align_byte else 16
if (extra := len(bits) % bits_extra):
t_bits += [False] * (bits_extra - extra)
for bit in reversed(t_bits):
packed <<= 1
if bit:
packed += 1
i += 1
if i == 8:
ret += struct.pack(">B", packed)
i = packed = 0
return ret


def unpack_bitstring(data: bytes) -> list[bool]:
"""Create bit list out of a bytestring.

example::

bytes 0x05 0x81
result = unpack_bitstring(bytes)

[True, False, False, False] +
[False, False, False, True] +
[True, False, True, False] +
[False, False, False, False]
"""
res = []
for byte_index in range(len(data) -1, -1, -1):
for bit in (1, 2, 4, 8, 16, 32, 64, 128):
res.append(bool(data[byte_index] & bit))
return res
52 changes: 1 addition & 51 deletions pymodbus/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"""
from __future__ import annotations


# pylint: disable=missing-type-doc
import struct


def dict_property(store, index):
Expand Down Expand Up @@ -44,56 +44,6 @@ def dict_property(store, index):
return property(getter, setter)


# --------------------------------------------------------------------------- #
# Bit packing functions
# --------------------------------------------------------------------------- #
def pack_bitstring(bits: list[bool]) -> bytes:
"""Create a bytestring out of a list of bits.

:param bits: A list of bits

example::

bits = [False, True, False, True]
result = pack_bitstring(bits)
"""
ret = b""
i = packed = 0
for bit in bits:
if bit:
packed += 128
i += 1
if i == 8:
ret += struct.pack(">B", packed)
i = packed = 0
else:
packed >>= 1
if 0 < i < 8:
packed >>= 7 - i
ret += struct.pack(">B", packed)
return ret


def unpack_bitstring(data: bytes) -> list[bool]:
"""Create bit list out of a bytestring.

:param data: The modbus data packet to decode

example::

bytes = "bytes to decode"
result = unpack_bitstring(bytes)
"""
byte_count = len(data)
bits = []
for byte in range(byte_count):
value = int(data[byte])
for _ in range(8):
bits.append((value & 1) == 1)
value >>= 1
return bits


# --------------------------------------------------------------------------- #
# Error Detection Functions
# --------------------------------------------------------------------------- #
Expand Down
41 changes: 28 additions & 13 deletions test/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,32 +138,50 @@ def fake_execute(_self, _no_response_expected, request):
),
(
ModbusClientMixin.DATATYPE.BITS,
[True],
[256],
[True] + [False] * 15,
[1], # 0x00 0x01
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[True, False, True],
[1280],
[False] * 8 + [True] + [False] * 7,
[256], # 0x01 0x00
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[True, False, True] + [False] * 5 + [True],
[1281],
[False] * 15 + [True],
[32768], # 0x80 0x00
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[True, False, True] + [False] * 5 + [True] + [False] * 6 + [True],
[1409],
[True] + [False] * 14 + [True],
[32769], # 0x80 0x01
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[True, False, True] + [False] * 5 + [True] + [False] * 6 + [True] * 2,
[1409, 256],
[False] * 8 + [True, False, True] + [False] * 5,
[1280], # 0x05 0x00
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[True] + [False] * 7 + [True, False, True] + [False] * 5,
[1281], # 0x05 0x01
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[True] + [False] * 6 + [True, True, False, True] + [False] * 5,
[1409], # 0x05 0x81
None,
),
(
ModbusClientMixin.DATATYPE.BITS,
[False] * 8 + [True] + [False] * 7 + [True] + [False] * 6 + [True, True, False, True] + [False] * 5,
[1409, 256], # 92340480 = 0x05 0x81 0x01 0x00
None,
),
],
Expand All @@ -181,9 +199,6 @@ def test_client_mixin_convert(self, datatype, word_order, registers, value, stri
result = ModbusClientMixin.convert_from_registers(registers, datatype, **kwargs)
if datatype == ModbusClientMixin.DATATYPE.FLOAT32:
result = round(result, 6)
if datatype == ModbusClientMixin.DATATYPE.BITS:
if (missing := len(value) % 16):
value = value + [False] * (16 - missing)
assert result == value

@pytest.mark.parametrize(
Expand Down
23 changes: 1 addition & 22 deletions test/not_updated/test_utilities.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
"""Test utilities."""
import struct

import pytest

from pymodbus.utilities import (
dict_property,
pack_bitstring,
unpack_bitstring,
)
from pymodbus.utilities import dict_property


_test_master = {4: "d"}
Expand Down Expand Up @@ -64,18 +58,3 @@ def test_dict_property(self):
assert result.s_1 == "x"
assert result.s_2 == "x"
assert result.g_1 == "x"

@pytest.mark.parametrize(
("bytestream", "bitlist"),
[
(b"\x55", [True, False, True, False, True, False, True, False]),
(b"\x80", [False] * 7 + [True]),
(b"\x01", [True] + [False] * 7),
(b"\x80\x00", [False] * 7 + [True] + [False] * 8),
(b"\x01\x00", [True] + [False] * 15),
]
)
def test_bit_packing(self, bytestream, bitlist):
"""Test all string <=> bit packing functions."""
assert pack_bitstring(bitlist) == bytestream
assert unpack_bitstring(bytestream) == bitlist
2 changes: 0 additions & 2 deletions test/pdu/test_bit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ def test_bit_read_base_response_encoding(self):
data = [True] * i
pdu = bit_msg.ReadCoilsResponse(bits=data)
pdu.decode(pdu.encode())
if i % 8:
data.extend([False] * (8 - (i % 8)))
assert pdu.bits == data

def test_bit_read_base_requests(self):
Expand Down
58 changes: 57 additions & 1 deletion test/pdu/test_pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ExceptionResponse,
ModbusPDU,
)
from pymodbus.pdu.pdu import pack_bitstring, unpack_bitstring


class TestPdu:
Expand Down Expand Up @@ -115,7 +116,7 @@ def test_calculate_frame_size(self):
]

responses = [
(bit_msg.ReadCoilsResponse, (), {"bits": [True, True], "address": 17}, b'\x01\x01\x03'),
(bit_msg.ReadCoilsResponse, (), {"bits": [True, True] + [False] * 6, "address": 17}, b'\x01\x01\x03'),
(bit_msg.ReadDiscreteInputsResponse, (), {"bits": [True, True], "address": 17}, b'\x02\x01\x03'),
(bit_msg.WriteSingleCoilResponse, (), {"address": 117, "bits": [True]}, b'\x05\x00\x75\xff\x00'),
(bit_msg.WriteMultipleCoilsResponse, (), {"address": 117, "count": 3}, b'\x0f\x00\x75\x00\x03'),
Expand Down Expand Up @@ -250,3 +251,58 @@ async def test_pdu_default_datastore(self, mock_context):
pdu = ModbusPDU()
context = mock_context()
assert await pdu.update_datastore(context)

@pytest.mark.parametrize(
("bytestream", "bitlist"),
[
(b"\x00\x01", [True] + [False] * 15),
(b"\x01\x00", [False] * 8 + [True] + [False] * 7),
(b"\x80\x00", [False] * 15 + [True]),
(b"\x80\x01", [True] + [False] * 14 + [True]),
(b"\x05\x00", [False] * 8 + [True, False, True] + [False] * 5),
(b"\x05\x01", [True] + [False] * 7 + [True, False, True] + [False] * 5),
(b"\x05\x81", [True] + [False] * 6 + [True, True, False, True] + [False] * 5),
(b"\x05\x81\x01\x00", [False] * 8 + [True] + [False] * 7 + [True] + [False] * 6 + [True, True, False, True] + [False] * 5),

(b"\x00\x01", [True]),
(b"\x01\x00", [False] * 8 + [True]),
(b"\x05\x00", [False] * 8 + [True, False, True]),
(b"\x05\x01", [True] + [False] * 7 + [True, False, True]),
(b"\x05\x81", [True] + [False] * 6 + [True, True, False, True]),
(b"\x05\x81\x01\x00", [False] * 8 + [True] + [False] * 7 + [True] + [False] * 6 + [True, True, False, True]),
],
)
def test_bit_packing(self, bytestream, bitlist):
"""Test all string <=> bit packing functions."""
assert pack_bitstring(bitlist, align_byte=False) == bytestream

@pytest.mark.parametrize(
("bytestream", "bitlist"),
[
(b"\x01", [True]),
(b"\x01\x00", [False] * 8 + [True]),
(b"\x05", [True, False, True]),
(b"\x05\x01", [True] + [False] * 7 + [True, False, True]),
],
)
def test_bit_packing8(self, bytestream, bitlist):
"""Test all string <=> bit packing functions."""
assert pack_bitstring(bitlist) == bytestream

@pytest.mark.parametrize(
("bytestream", "bitlist"),
[
(b"\x01", [True] + [False] * 7),
(b"\x00\x01", [True] + [False] * 15),
(b"\x01\x00", [False] * 8 + [True] + [False] * 7),
(b"\x80\x00", [False] * 15 + [True]),
(b"\x80\x01", [True] + [False] * 14 + [True]),
(b"\x05\x00", [False] * 8 + [True, False, True] + [False] * 5),
(b"\x05\x01", [True] + [False] * 7 + [True, False, True] + [False] * 5),
(b"\x05\x81", [True] + [False] * 6 + [True, True, False, True] + [False] * 5),
(b"\x05\x81\x01\x00", [False] * 8 + [True] + [False] * 7 + [True] + [False] * 6 + [True, True, False, True] + [False] * 5),
],
)
def test_bit_unpacking(self, bytestream, bitlist):
"""Test all string <=> bit packing functions."""
assert unpack_bitstring(bytestream) == bitlist