From 59f2fb158505837e6ebe362f1a74fc8ea81dcbcc Mon Sep 17 00:00:00 2001 From: Guillaume Valadon Date: Mon, 24 Jun 2019 17:33:55 +0200 Subject: [PATCH] Accept Scapy Packets as offline inputs --- scapy/sendrecv.py | 17 +++++++++++++++-- test/regression.uts | 14 ++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index c4a66501f8b..388b8f5bb79 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -20,7 +20,7 @@ from scapy.data import ETH_P_ALL from scapy.config import conf from scapy.error import warning -from scapy.packet import Gen +from scapy.packet import Gen, Packet from scapy.utils import get_temp_file, tcpdump, wrpcap, \ ContextManagerSubprocess, PcapReader from scapy.plist import PacketList, SndRcvList @@ -772,7 +772,8 @@ def _run(self, message = "tcpdump is not available. Cannot use filter!" raise Scapy_Exception(message) - if isinstance(offline, list): + if isinstance(offline, list) and \ + all(isinstance(elt, str) for elt in offline): sniff_sockets.update((PcapReader( fname if flt is None else tcpdump(fname, args=["-w", "-", flt], getfd=True) @@ -783,6 +784,18 @@ def _run(self, tcpdump(fname, args=["-w", "-", flt], getfd=True) ), label) for fname, label in six.iteritems(offline)) else: + # Write Scapy Packet objects to a pcap file + def _write_to_pcap(packets_list): + filename = get_temp_file(autoext=".pcap") + wrpcap(filename, offline) + return filename, filename + + if isinstance(offline, Packet): + tempfile_written, offline = _write_to_pcap([offline]) + elif isinstance(offline, list) and \ + all(isinstance(elt, Packet) for elt in offline): + tempfile_written, offline = _write_to_pcap(offline) + sniff_sockets[PcapReader( offline if flt is None else tcpdump(offline, args=["-w", "-", flt], getfd=True) diff --git a/test/regression.uts b/test/regression.uts index 07c08b5efc4..5a5feb6eb33 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -6728,6 +6728,20 @@ fdesc.close() assert list(pktpcap[TCP]) == list(pktpcap_tcp) os.unlink(filename) += Check offline sniff() with Packets and tcpdump +~ tcpdump + +l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="udp") +assert len(l) == 2 +assert(all(UDP in p for p in l)) + +l = sniff(offline=[p for p in IP()/UDP(sport=(10000, 10001))], filter="udp") +assert len(l) == 2 +assert(all(UDP in p for p in l)) + +l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp") +assert len(l) == 0 + = Check offline sniff() without a tcpdump binary ~ tcpdump import mock