Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions scapy/layers/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
# Copyright (C) Philippe Biondi <[email protected]>

"""
DNS: Domain Name System.
DNS: Domain Name System

This implements:
- RFC1035: Domain Names
- RFC6762: Multicast DNS
- RFC6763: DNS-Based Service Discovery
"""

import abc
Expand Down Expand Up @@ -52,13 +57,16 @@
XStrLenField,
)
from scapy.interfaces import resolve_iface
from scapy.sendrecv import sr1
from scapy.sendrecv import sr1, sr
from scapy.supersocket import StreamSocket
from scapy.plist import SndRcvList, _PacketList, QueryAnswer
from scapy.pton_ntop import inet_ntop, inet_pton
from scapy.utils import pretty_list
from scapy.volatile import RandShort

from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP
from scapy.layers.inet6 import IPv6

from typing import (
Any,
Expand Down Expand Up @@ -1897,3 +1905,88 @@ class mDNS_am(DNS_am):
"""
function_name = "mdnsd"
filter = "udp port 5353"


# DNS-SD (RFC 6763)


class DNSSDResult(SndRcvList):
def __init__(self,
res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501
name="DNS-SD", # type: str
stats=None # type: Optional[List[Type[Packet]]]
):
SndRcvList.__init__(self, res, name, stats)

def show(self, *args, **kwargs):
# type: (*Any, **Any) -> None
"""
Print the list of discovered services.

:param types: types to show. Default ['PTR', 'SRV']
:param alltypes: show all types. Default False
"""
types = kwargs.get("types", ['PTR', 'SRV'])
alltypes = kwargs.get("alltypes", False)
if alltypes:
types = None
data = list() # type: List[Tuple[str | List[str], ...]]

resolve_mac = (
self.res and isinstance(self.res[0][1].underlayer, Ether) and
conf.manufdb
)

header = ("IP", "Service")
if resolve_mac:
header = ("Mac",) + header

for _, r in self.res:
attrs = []
for attr in itertools.chain(r[DNS].an, r[DNS].ar):
if types and dnstypes.get(attr.type) not in types:
continue
if isinstance(attr, DNSRRNSEC):
attrs.append(attr.sprintf("%type%=%nextname%"))
elif isinstance(attr, DNSRRSRV):
attrs.append(attr.sprintf("%type%=(%target%,%port%)"))
else:
attrs.append(attr.sprintf("%type%=%rdata%"))
ans = (r.src, attrs)
if resolve_mac:
mac = conf.manufdb._resolve_MAC(r.underlayer.src)
data.append((mac,) + ans)
else:
data.append(ans)

print(
pretty_list(
data,
[header],
)
)


@conf.commands.register
def dnssd(service="_services._dns-sd._udp.local",
af=socket.AF_INET6,
qtype="PTR",
timeout=3):
"""
Performs a DNS-SD (RFC6763) request

:param service: the service name to query (e.g. _spotify-connect._tcp.local)
:param af: the transport to use. socket.AF_INET or socket.AF_INET6
:param qtype: the type to use in the mDNS. Either TXT, PTR or SRV.
:param ret: return instead of printing
"""
if af == socket.AF_INET:
pkt = IP(dst="224.0.0.251")
elif af == socket.AF_INET6:
pkt = IPv6(dst="ff02::fb")
else:
return
pkt /= UDP(sport=5353, dport=5353)
pkt /= DNS(rd=0, qd=[DNSQR(qname=service, qtype=qtype)])
ans, _ = sr(pkt, multi=True, timeout=timeout)
return DNSSDResult(ans.res)