Skip to content

correct handle_local_echo for sync client. #2593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions pymodbus/transaction/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def __init__(
sync_client = None,
) -> None:
"""Initialize an instance of the ModbusTransactionManager."""
super().__init__(params, is_server, is_sync=bool(sync_client))
self.is_sync = bool(sync_client)
super().__init__(params, is_server, is_sync=self.is_sync)
self.framer = framer
self.retries = retries
self.next_tid: int = 0
Expand All @@ -59,10 +60,9 @@ def __init__(
else:
self._lock = asyncio.Lock()
self.low_level_send = self.send
if self.is_server:
self.last_pdu: ModbusPDU | None
self.last_addr: tuple | None
self.response_future: asyncio.Future = asyncio.Future()
self.last_pdu: ModbusPDU | None
self.last_addr: tuple | None

def dummy_trace_packet(self, sending: bool, data: bytes) -> bytes:
"""Do dummy trace."""
Expand All @@ -84,6 +84,28 @@ def sync_get_response(self, dev_id) -> ModbusPDU:
while True:
if not (data := self.sync_client.recv(None)):
raise asyncio.exceptions.TimeoutError()

if self.sent_buffer:
if data.startswith(self.sent_buffer):
Log.debug(
"sync recv skipping (local_echo): {}",
self.sent_buffer,
":hex",
)
data = data[len(self.sent_buffer) :]
self.sent_buffer = b""
elif self.sent_buffer.startswith(data):
Log.debug(
"sync recv skipping (partial local_echo): {}", data, ":hex"
)
self.sent_buffer = self.sent_buffer[len(data) :]
continue
else:
Log.debug("did not sync receive local echo: {}", data, ":hex")
self.sent_buffer = b""
if not data:
continue

databuffer += data
used_len, pdu = self.framer.processIncomingFrame(self.trace_packet(False, databuffer))
databuffer = databuffer[used_len:]
Expand Down Expand Up @@ -170,6 +192,8 @@ def pdu_send(self, pdu: ModbusPDU, addr: tuple | None = None) -> None:
"""Build byte stream and send."""
self.request_dev_id = pdu.dev_id
packet = self.framer.buildFrame(self.trace_pdu(True, pdu))
if self.is_sync and self.comm_params.handle_local_echo:
self.sent_buffer = packet
self.low_level_send(self.trace_packet(True, packet), addr=addr)

def callback_new_connection(self):
Expand Down Expand Up @@ -198,8 +222,10 @@ def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
f"ERROR: request ask for id={self.request_dev_id} but got id={pdu.dev_id}, CLOSING CONNECTION."
)
if self.response_future.done():
raise ModbusIOException(f"received pdu: {pdu:str} without a corresponding request")
self.response_future.set_result(self.last_pdu)
raise ModbusIOException("received pdu without a corresponding request")
self.response_future.set_result(self.last_pdu

)
return used_len

def getNextTID(self) -> int:
Expand Down
10 changes: 10 additions & 0 deletions test/framer/test_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,16 @@ def test_framer_decode(self, test_framer):
assert not res_len
assert not res_data

@pytest.mark.parametrize(("is_server"), [False])
async def xtest_processIncomingFrame_no(self, test_framer):
"""Test processIncomingFrame."""
msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b"
with mock.patch.object(test_framer, "_processIncomingFrame") as mock_process:
mock_process.return_value = (5, None)
used_len, pdu = test_framer.processIncomingFrame(msg)
assert not used_len
assert not pdu

@pytest.mark.parametrize(("is_server"), [True])
async def test_processIncomingFrame1(self, test_framer):
"""Test processIncomingFrame."""
Expand Down
165 changes: 132 additions & 33 deletions test/transaction/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ async def test_transaction_calls(self, use_clc):
transact.callback_new_connection()
transact.callback_connected()

async def test_transaction_sync_pdu_send(self, use_clc):
"""Test dummy calls from transport."""
transact = TransactionManager(
use_clc,
FramerRTU(DecodePDU(False)),
5,
False,
None,
None,
None,
)
transact.is_sync = True
transact.comm_params.handle_local_echo = True
transact.pdu_send(ExceptionResponse(0xff), (0,0))
assert transact.sent_buffer == b'\x01\xff\x00a\xf0'

async def test_transaction_disconnect(self, use_clc):
"""Test tracers in disconnect."""
transact = TransactionManager(
Expand All @@ -89,8 +105,8 @@ async def test_transaction_disconnect(self, use_clc):
transact.trace_packet.assert_not_called()
transact.trace_pdu.assert_not_called()

@pytest.mark.parametrize("test", [True, False])
async def test_transaction_data(self, use_clc, test):
@pytest.mark.parametrize(("test", "is_server"), [(True, False), (False, False), (True, True)])
async def test_transaction_data(self, use_clc, test, is_server):
"""Test tracers in disconnect."""
pdu = ExceptionResponse(0xff)
pdu.dev_id = 0
Expand All @@ -104,6 +120,7 @@ async def test_transaction_data(self, use_clc, test):
None,
None,
)
transact.is_server = is_server
transact.framer.processIncomingFrame = mock.Mock(return_value=(0, None))
transact.callback_data(packet)
assert not transact.response_future.done()
Expand All @@ -122,7 +139,32 @@ async def test_transaction_data(self, use_clc, test):
transact.trace_pdu.assert_called_once_with(False, pdu)
assert transact.response_future.result() == pdu

@pytest.mark.parametrize("scenario", range(6))
@pytest.mark.parametrize("test", [True, False])
async def test_transaction_data_2(self, use_clc, test):
"""Test tracers in disconnect."""
pdu = ExceptionResponse(0xff)
packet = b'\x00\x03\x00\x7c\x00\x02\x04\x02'
transact = TransactionManager(
use_clc,
FramerRTU(DecodePDU(False)),
5,
False,
None,
None,
None,
)
transact.framer.processIncomingFrame = mock.Mock()
transact.trace_packet = mock.Mock(return_value=packet)
transact.framer.processIncomingFrame.return_value = (1, pdu)
if test:
pdu.dev_id = 17
else:
pdu.dev_id = 0
transact.response_future.set_result((1, pdu))
with pytest.raises(ModbusIOException):
transact.callback_data(packet)

@pytest.mark.parametrize("scenario", range(8))
async def test_transaction_execute(self, use_clc, scenario):
"""Test tracers in disconnect."""
transact = TransactionManager(
Expand Down Expand Up @@ -160,12 +202,27 @@ async def test_transaction_execute(self, use_clc, scenario):
transact.connection_lost = mock.Mock()
with pytest.raises(ModbusIOException):
await transact.execute(False, request)
elif scenario == 4: # wait receive,timeout, no_responses pass
elif scenario == 4: # wait receive,timeout, disconnect
transact.comm_params.timeout_connect = 0.1
transact.count_no_responses = 10
transact.count_until_disconnect = -1
transact.connection_lost = mock.Mock()
with pytest.raises(ModbusIOException):
await transact.execute(False, request)
else: # if scenario == 5: # response
elif scenario == 5: # wait receive,timeout, no_responses pass
transact.comm_params.timeout_connect = 0.1
transact.connection_lost = mock.Mock()
with pytest.raises(ModbusIOException):
await transact.execute(False, request)
elif scenario == 6: # wait receive, cancel
transact.comm_params.timeout_connect = 0.2
resp = asyncio.create_task(transact.execute(False, request))
await asyncio.sleep(0.1)
resp.cancel()
await asyncio.sleep(0.1)
with pytest.raises(ModbusIOException):
await resp
else: # if scenario == 7: # response
transact.comm_params.timeout_connect = 0.2
resp = asyncio.create_task(transact.execute(False, request))
await asyncio.sleep(0.1)
Expand Down Expand Up @@ -223,6 +280,41 @@ async def test_client_protocol_execute_outside(self, use_clc, no_resp):
assert not result.isError()
assert isinstance(result, ReadCoilsResponse)

async def test_transaction_id0(self, use_clc):
"""Test tracers in disconnect."""
transact = TransactionManager(
use_clc,
FramerRTU(DecodePDU(False)),
5,
False,
None,
None,
None,
)
transact.send = mock.Mock()
request = ReadCoilsRequest(address=117, count=5, dev_id=1)
response = ReadCoilsResponse(bits=[True, False, True, True, False], dev_id=0)
transact.retries = 0
transact.connection_made(mock.AsyncMock())
transact.transport.write = mock.Mock()
transact.comm_params.timeout_connect = 0.2
resp = asyncio.create_task(transact.execute(False, request))
await asyncio.sleep(0.1)
transact.response_future.set_result(response)
await asyncio.sleep(0.1)
with pytest.raises(ModbusIOException):
await resp
response = ReadCoilsResponse(bits=[True, False, True, True, False], dev_id=1)
transact.retries = 0
transact.connection_made(mock.AsyncMock())
transact.transport.write = mock.Mock()
transact.comm_params.timeout_connect = 0.2
resp = asyncio.create_task(transact.execute(False, request))
await asyncio.sleep(0.1)
transact.response_future.set_result(response)
await asyncio.sleep(0.1)
assert response == await resp


@pytest.mark.parametrize("use_port", [5098])
class TestSyncTransaction:
Expand Down Expand Up @@ -260,7 +352,7 @@ def test_sync_transaction_instance(self, use_clc):
)


@pytest.mark.parametrize("scenario", range(6))
@pytest.mark.parametrize("scenario", range(7))
async def test_sync_transaction_execute(self, use_clc, scenario):
"""Test tracers in disconnect."""
client = ModbusBaseSyncClient(
Expand Down Expand Up @@ -306,11 +398,17 @@ async def test_sync_transaction_execute(self, use_clc, scenario):
transact.count_no_responses = 10
with pytest.raises(ModbusIOException):
transact.sync_execute(False, request)
elif scenario == 4: # wait receive,timeout, no_responses pass
elif scenario == 4: # wait receive,timeout, disconnect
transact.comm_params.timeout_connect = 0.1
transact.count_no_responses = 10
transact.count_until_disconnect = -1
with pytest.raises(ModbusIOException):
transact.sync_execute(False, request)
elif scenario == 5: # wait receive,timeout, no_responses pass
transact.comm_params.timeout_connect = 0.1
with pytest.raises(ModbusIOException):
transact.sync_execute(False, request)
else: # if scenario == 5 # response
else: # if scenario == 6 # response
transact.transport = 1
resp_bytes = transact.framer.buildFrame(response)
transact.sync_client.recv = mock.Mock(return_value=resp_bytes)
Expand Down Expand Up @@ -457,8 +555,16 @@ def test_transaction_sync_id0(self, use_clc):
resp = transact.sync_execute(False, request)
assert not resp.isError()

async def test_transaction_id0(self, use_clc):
"""Test tracers in disconnect."""
def test_transaction_sync_get_response(self, use_clc):
"""Test id 0 in sync."""
client = ModbusBaseSyncClient(
FramerType.SOCKET,
5,
use_clc,
None,
None,
None,
)
transact = TransactionManager(
use_clc,
FramerRTU(DecodePDU(False)),
Expand All @@ -467,27 +573,20 @@ async def test_transaction_id0(self, use_clc):
None,
None,
None,
sync_client=client,
)
transact.send = mock.Mock()
request = ReadCoilsRequest(address=117, count=5, dev_id=1)
response = ReadCoilsResponse(bits=[True, False, True, True, False], dev_id=0)
transact.retries = 0
transact.connection_made(mock.AsyncMock())
transact.transport.write = mock.Mock()
transact.comm_params.timeout_connect = 0.2
resp = asyncio.create_task(transact.execute(False, request))
await asyncio.sleep(0.1)
transact.response_future.set_result(response)
await asyncio.sleep(0.1)
with pytest.raises(ModbusIOException):
await resp
response = ReadCoilsResponse(bits=[True, False, True, True, False], dev_id=1)
transact.retries = 0
transact.connection_made(mock.AsyncMock())
transact.transport.write = mock.Mock()
transact.comm_params.timeout_connect = 0.2
resp = asyncio.create_task(transact.execute(False, request))
await asyncio.sleep(0.1)
transact.response_future.set_result(response)
await asyncio.sleep(0.1)
assert response == await resp
client.recv = mock.Mock()
request = transact.framer.buildFrame(ReadCoilsRequest(address=117, count=5, dev_id=1))
response = transact.framer.buildFrame(ReadCoilsResponse(bits=[True*8], dev_id=1))
transact.sent_buffer = request
client.recv.side_effect = [request, response]
pdu = transact.sync_get_response(1)
assert isinstance(pdu, ReadCoilsResponse)
transact.sent_buffer = request
client.recv.side_effect = [request[:3], request[3:], response]
pdu = transact.sync_get_response(1)
assert isinstance(pdu, ReadCoilsResponse)
transact.sent_buffer = request
client.recv.side_effect = [response]
pdu = transact.sync_get_response(1)
assert isinstance(pdu, ReadCoilsResponse)
5 changes: 5 additions & 0 deletions test/transport/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ async def test_init_source_addr(self, use_clc):
use_clc.source_address = ("localhost", 112)
ModbusProtocol(use_clc, True)

def test_init_sync(self, use_clc):
"""Test callbacks."""
use_clc.source_address = ("localhost", 112)
ModbusProtocol(use_clc, True, is_sync=True)

async def test_init_source_addr_none(self, use_clc):
"""Test callbacks."""
use_clc.source_address = None
Expand Down