Skip to content

Commit 6410e50

Browse files
committed
Some improvements
1 parent 2142695 commit 6410e50

File tree

7 files changed

+72
-56
lines changed

7 files changed

+72
-56
lines changed

packet_helper_core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from packet_helper_core.core import Core
1+
from packet_helper_core.decoder import Decoder
22
from packet_helper_core.packet_data import PacketData
33
from packet_helper_core.packet_data_scapy import PacketDataScapy

packet_helper_core/core.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

packet_helper_core/decoder.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from packet_helper_core.packet_data import PacketData
2+
from packet_helper_core.packet_data_scapy import PacketDataScapy
3+
from packet_helper_core.utils.utils import decode_hex
4+
5+
6+
class Decoder:
7+
"""
8+
Class Decoder is just a wrapper to create a handy-shortcut
9+
for preparing a data from hex string
10+
"""
11+
12+
def __init__(self, hex_string: str) -> None:
13+
self.hex_string = hex_string.replace(" ", "")
14+
15+
self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 2
16+
17+
def run(self, extend_with_scapy: bool = True) -> None:
18+
self.__decoded_by_tshark = PacketData(raw=str(decode_hex(self.hex_string)))
19+
if extend_with_scapy:
20+
self.__decoded_by_scapy = PacketDataScapy(
21+
raw=self.hex_string, packet_data=self.tshark_data
22+
)
23+
24+
@property
25+
def tshark_data(self) -> PacketData | None:
26+
return self.__decoded_by_tshark
27+
28+
@property
29+
def scapy_data(self) -> PacketDataScapy | None:
30+
return self.__decoded_by_scapy

packet_helper_core/packet_data.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,15 @@ class PacketData:
1111
_data_layer: list[str] = field(default_factory=list)
1212

1313
def __post_init__(self):
14-
self.raw_array = self.raw.split("\n")
15-
self.length = self.raw_array[0].replace(")", "").split()[2]
16-
self.array = self.raw_array[1:]
14+
self.array = self.raw.split("\n")[1:]
1715

18-
self.header = self.compose_header()
19-
self.body = self.compose_body()
20-
self.body2 = self.compose_body_list()
16+
self.header = self.__compose_header()
17+
self.body = self.__compose_body()
18+
self.body2 = self.__compose_body_list()
2119

22-
self.update_header()
20+
self.__update_header()
2321

24-
def compose_header(self):
22+
def __compose_header(self):
2523
return [
2624
a.replace("Layer", "").replace(":", "").replace(" ", "")
2725
for a in self.array
@@ -35,7 +33,7 @@ def __is_data_element(self, layer_fragment: str) -> bool:
3533
return True
3634
return False
3735

38-
def compose_body(self) -> dict[str, list[str]]:
36+
def __compose_body(self) -> dict[str, list[str]]:
3937
temp_body_dict: dict[str, list[str]] = {}
4038
actual_layer: str = ""
4139
for x in self.array:
@@ -51,7 +49,7 @@ def compose_body(self) -> dict[str, list[str]]:
5149
temp_body_dict[actual_layer].append(x)
5250
return temp_body_dict
5351

54-
def compose_body_list(self) -> list[list[str]]:
52+
def __compose_body_list(self) -> list[list[str]]:
5553
temp_body_dict = []
5654
line = []
5755
ckhsum_flag = False
@@ -82,12 +80,12 @@ def compose_body_list(self) -> list[list[str]]:
8280

8381
if ckhsum_flag:
8482
for y in temp_body_dict:
85-
self.chksum_verification(y)
83+
self.__chksum_verification(y)
8684

8785
temp_body_dict.append(data_found)
8886
return temp_body_dict
8987

90-
def chksum_verification(self, element) -> None:
88+
def __chksum_verification(self, element) -> None:
9189
chksum_status: ChecksumStatus = ChecksumStatus()
9290
for x in element:
9391
x = x.lower()
@@ -104,7 +102,7 @@ def chksum_verification(self, element) -> None:
104102
chksum_status.verify()
105103
self.chksum_list.append(chksum_status)
106104

107-
def update_header(self):
105+
def __update_header(self) -> None:
108106
"""Update header with data layer which is 'hidden' in the tshark output"""
109107
if self._data_layer:
110108
self.header.append("RAW")

packet_helper_core/utils/conversion.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ def from_sh_list(packet_list) -> BasePacket:
77
imported_all = importlib.import_module("scapy.all")
88

99
def remove_none():
10-
return {k: v for k, v in _value.items() if v is not None}
10+
return {k: v for k, v in layer.get(_key, {}).items() if v is not None}
1111

1212
new_packet = None
1313
for layer in packet_list:
1414
if isinstance(layer, dict):
1515
_key = [x for x in layer.keys()][0]
16-
_value = layer.get(_key)
1716
_value = remove_none()
1817
if _key == "Ethernet":
1918
_key = "Ether"

tests/test_core.py renamed to tests/test_decoder.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from packet_helper_core import PacketData, PacketDataScapy
44
from packet_helper_core.checksum_status import ChecksumStatus
5-
from packet_helper_core.core import Core
5+
from packet_helper_core.decoder import Decoder
66
from scapy.all import IP, TCP, Ether # noqa
77
from scapy_helper import get_hex
88

@@ -13,21 +13,21 @@ def test_core_post_init():
1313
"IP",
1414
"TCP",
1515
]
16-
core_results = Core(get_hex(Ether() / IP() / TCP()))
16+
decoder = Decoder(get_hex(Ether() / IP() / TCP()))
17+
decoder.run()
1718

18-
assert isinstance(core_results.hex_string, str), "Should be String"
19-
assert isinstance(
20-
core_results.scapy_data, PacketDataScapy
21-
), "Should be PacketDataScapy"
22-
assert isinstance(core_results.tshark_data, PacketData), "Should be PacketData"
23-
assert core_results.scapy_data.header == expected_headers, "Should be properly decoded"
24-
assert core_results.tshark_data.header == expected_headers, "Should be properly decoded"
19+
assert isinstance(decoder.hex_string, str), "Should be String"
20+
assert isinstance(decoder.scapy_data, PacketDataScapy), "Should be PacketDataScapy"
21+
assert isinstance(decoder.tshark_data, PacketData), "Should be PacketData"
22+
assert decoder.scapy_data.header == expected_headers, "Should be properly decoded"
23+
assert decoder.tshark_data.header == expected_headers, "Should be properly decoded"
2524

2625

2726
def test_core_chksum_verification():
28-
core_results = Core(get_hex(Ether() / IP() / IP() / TCP()))
29-
assert core_results.tshark_data.chksum_list
30-
assert len(core_results.tshark_data.chksum_list) == 4
27+
decoder = Decoder(get_hex(Ether() / IP() / IP() / TCP()))
28+
decoder.run()
29+
assert decoder.tshark_data.chksum_list
30+
assert len(decoder.tshark_data.chksum_list) == 4
3131

3232

3333
@pytest.mark.parametrize(
@@ -48,22 +48,24 @@ def test_core_chksum_verification():
4848
def test_negative_core_chksum_verification_with_wrong_chksum(
4949
packet: str, position_to_check: int, expected_chksum_value: str
5050
):
51-
core_results2 = Core(packet)
51+
decoder = Decoder(packet)
52+
decoder.run()
5253
assert (
53-
core_results2.tshark_data.chksum_list[position_to_check].chksum
54+
decoder.tshark_data.chksum_list[position_to_check].chksum
5455
== expected_chksum_value
5556
)
5657

5758

5859
def test_ethernet_ip_udp_dns():
59-
core_result = Core(
60+
decoder = Decoder(
6061
"00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010"
6162
"A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870"
6263
"617472696F747302696E0000010001C00C0005000100002A4B0002C011C01"
6364
"10001000100002A4C00044A358C99C011000200010001438C0006036E7332"
6465
"C011C011000200010001438C0006036E7331C011"
6566
)
66-
chksum_obj: ChecksumStatus = core_result.tshark_data.chksum_list[2]
67+
decoder.run()
68+
chksum_obj: ChecksumStatus = decoder.tshark_data.chksum_list[2]
6769

6870
assert chksum_obj.chksum == "0x2d2e"
6971
assert chksum_obj.chksum_calculated == "0x2d2d"

tests/test_from_sh_list.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
from typing import Any
2+
13
import pytest
4+
from scapy.base_classes import BasePacket
5+
26
from packet_helper_core.utils.conversion import from_sh_list
37
from scapy.packet import Packet
48
from scapy_helper import get_hex, to_list
@@ -10,8 +14,11 @@
1014
"packet", (SIMPLE_IP_IN_IP_PACKET, SIMPLE_IP_IN_IP_PACKET / SIMPLE_IP_IN_IP_PACKET)
1115
)
1216
def test_from_sh_list(packet: Packet) -> None:
13-
packet_list = to_list(packet)
14-
new_packet = from_sh_list(packet_list)
17+
packet_list: list[dict[str, Any]] = to_list(packet)
18+
packet_generated_from_scapy_helper = from_sh_list(packet_list)
1519

16-
assert get_hex(packet) == get_hex(new_packet)
17-
assert packet_list == to_list(new_packet)
20+
assert isinstance(packet_generated_from_scapy_helper, BasePacket)
21+
assert get_hex(packet) == get_hex(
22+
packet_generated_from_scapy_helper
23+
), "Packets should return same hex results"
24+
assert packet_list == to_list(packet_generated_from_scapy_helper)

0 commit comments

Comments
 (0)