1
1
"""ModbusProtocol network stub."""
2
2
from __future__ import annotations
3
3
4
- from pymodbus .transport .transport import ModbusProtocol
4
+ from pymodbus .transport .transport import CommParams , ModbusProtocol
5
5
6
6
7
7
class ModbusProtocolStub (ModbusProtocol ):
8
8
"""Protocol layer including transport."""
9
9
10
+ def __init__ (
11
+ self ,
12
+ params : CommParams ,
13
+ is_server : bool ,
14
+ handler : callable | None = None ,
15
+ ) -> None :
16
+ """Initialize a stub instance."""
17
+ self .stub_handle_data = handler if handler else self .dummy_handler
18
+ super ().__init__ (params , is_server )
19
+
20
+
10
21
async def start_run (self ):
11
22
"""Call need functions to start server/client."""
12
23
if self .is_server :
13
24
return await self .transport_listen ()
14
25
return await self .transport_connect ()
15
26
27
+
16
28
def callback_data (self , data : bytes , addr : tuple | None = None ) -> int :
17
29
"""Handle received data."""
18
30
if (response := self .stub_handle_data (data )):
@@ -21,11 +33,13 @@ def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
21
33
22
34
def callback_new_connection (self ) -> ModbusProtocol :
23
35
"""Call when listener receive new connection request."""
24
- return ModbusProtocolStub (self .comm_params , False )
36
+ new_stub = ModbusProtocolStub (self .comm_params , False )
37
+ new_stub .stub_handle_data = self .stub_handle_data
38
+ return new_stub
25
39
26
40
# ---------------- #
27
41
# external methods #
28
42
# ---------------- #
29
- def stub_handle_data (self , data : bytes ) -> bytes | None :
43
+ def dummy_handler (self , data : bytes ) -> bytes | None :
30
44
"""Handle received data."""
31
45
return data
0 commit comments