diff --git a/makefile b/makefile index 44a32e2..f708daa 100644 --- a/makefile +++ b/makefile @@ -14,3 +14,12 @@ build: @echo "Building..." python setup.py sdist bdist_wheel --universal @echo "Building... Done" + +install-ptf: clean + @echo "Getting ptf..." + git clone https://github.com/PacketHelper/ptf.git + cd ptf && python setup.py install + python setup.py sdist bdist_wheel --universal + pip install dist/ptf-0.9.1-py2.py3-none-any.whl + rm -rf ptf + @echo "Getting ptf... Done" \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index 03f940c..c25c707 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -3,3 +3,4 @@ black flake8 pytest~=6.2.4 +git+https://github.com/PacketHelper/ptf.git \ No newline at end of file diff --git a/tests/test_packet_data.py b/tests/test_packet_data.py index c463d9a..2aa10d4 100644 --- a/tests/test_packet_data.py +++ b/tests/test_packet_data.py @@ -6,6 +6,12 @@ from packet_helper_core.packet_data import PacketData from packet_helper_core.utils.utils import decode_hex from tests.utils.example_packets import EXAMPLE_ETHER +from ptf.testutils import ( + simple_tcp_packet, + simple_ip_packet, + simple_udp_packet, + simple_eth_packet, +) class TestPacketData: @@ -28,3 +34,65 @@ def test_custom_packet_data(self): raise Exception( f"Missing layer ${expected_packet} in packet. PyShark decode correctly?" ) + + def test_tcp_packet(self): + dport = 81 + pkt = simple_tcp_packet(tcp_dport=dport) + packet = decode_hex(get_hex(pkt)) + list_of_expected_packets = ("ETH", "IP", "TCP") + list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] + for expected_packet in list_of_expected_packets: + if expected_packet not in list_of_layers_from_packet: + raise Exception( + f"Missing layer ${expected_packet} in packet. PyShark decode correctly?" + ) + if packet.tcp.dstport != str(dport): + raise Exception( + f"TCP destination port mismatch. Is {packet.tcp.dstport}, should be {dport}." + ) + + def test_ip_packet(self): + ttl = 32 + pkt = simple_ip_packet(ip_ttl=ttl) + packet = decode_hex(get_hex(pkt)) + list_of_expected_packets = ("ETH", "IP") + list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] + for expected_packet in list_of_expected_packets: + if expected_packet not in list_of_layers_from_packet: + raise Exception( + f"Missing layer ${expected_packet} in packet. PyShark decode correctly?" + ) + if packet.ip.ttl != str(ttl): + raise Exception(f"IP ttl mismatch. Is {packet.ip.ttl}, should be {ttl}.") + + def test_udp_packet(self): + dport = 81 + pkt = simple_udp_packet(udp_dport=dport) + packet = decode_hex(get_hex(pkt)) + list_of_expected_packets = ("ETH", "IP", "UDP") + list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] + for expected_packet in list_of_expected_packets: + if expected_packet not in list_of_layers_from_packet: + raise Exception( + f"Missing layer ${expected_packet} in packet. PyShark decode correctly?" + ) + if packet.udp.dstport != str(dport): + raise Exception( + f"UDP destination port mismatch. Is {packet.udp.dstport}, should be {dport}." + ) + + def test_eth_packet(self): + dst = "01:02:03:04:05:06" + pkt = simple_eth_packet(eth_dst=dst) + packet = decode_hex(get_hex(pkt)) + list_of_expected_packets = ("ETH", "LLDP") + list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] + for expected_packet in list_of_expected_packets: + if expected_packet not in list_of_layers_from_packet: + raise Exception( + f"Missing layer ${expected_packet} in packet. PyShark decode correctly?" + ) + if packet.eth.dst != dst: + raise Exception( + f"ETH destination mismatch. Is {packet.eth.dst}, should be {dst}." + )