Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions scapy/sendrecv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from scapy.data import ETH_P_ALL
from scapy.config import conf
from scapy.error import warning
from scapy.packet import Gen
from scapy.packet import Gen, Packet
from scapy.utils import get_temp_file, tcpdump, wrpcap, \
ContextManagerSubprocess, PcapReader
from scapy.plist import PacketList, SndRcvList
Expand Down Expand Up @@ -772,7 +772,8 @@ def _run(self,
message = "tcpdump is not available. Cannot use filter!"
raise Scapy_Exception(message)

if isinstance(offline, list):
if isinstance(offline, list) and \
all(isinstance(elt, str) for elt in offline):
sniff_sockets.update((PcapReader(
fname if flt is None else
tcpdump(fname, args=["-w", "-", flt], getfd=True)
Expand All @@ -783,6 +784,18 @@ def _run(self,
tcpdump(fname, args=["-w", "-", flt], getfd=True)
), label) for fname, label in six.iteritems(offline))
else:
# Write Scapy Packet objects to a pcap file
def _write_to_pcap(packets_list):
filename = get_temp_file(autoext=".pcap")
wrpcap(filename, offline)
return filename, filename

if isinstance(offline, Packet):
tempfile_written, offline = _write_to_pcap([offline])
elif isinstance(offline, list) and \
all(isinstance(elt, Packet) for elt in offline):
tempfile_written, offline = _write_to_pcap(offline)

sniff_sockets[PcapReader(
offline if flt is None else
tcpdump(offline, args=["-w", "-", flt], getfd=True)
Expand Down
14 changes: 14 additions & 0 deletions test/regression.uts
Original file line number Diff line number Diff line change
Expand Up @@ -6728,6 +6728,20 @@ fdesc.close()
assert list(pktpcap[TCP]) == list(pktpcap_tcp)
os.unlink(filename)

= Check offline sniff() with Packets and tcpdump
~ tcpdump

l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="udp")
assert len(l) == 2
assert(all(UDP in p for p in l))

l = sniff(offline=[p for p in IP()/UDP(sport=(10000, 10001))], filter="udp")
assert len(l) == 2
assert(all(UDP in p for p in l))

l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp")
assert len(l) == 0

= Check offline sniff() without a tcpdump binary
~ tcpdump
import mock
Expand Down