11import logging
22from enum import Enum
3- from ipaddress import IPv6Address
3+ from ipaddress import IPv4Address , IPv6Address
44from typing import Dict , Iterable , List , NewType , Optional , Union
55from urllib .parse import urlparse
66
@@ -22,6 +22,8 @@ class TargetType(str, Enum):
2222
2323
2424def hostname_from_url (url : Union [HttpUrl , str ]) -> Hostname :
25+ """Extract FQDN from url"""
26+
2527 parsed = urlparse (url )
2628 if all ([parsed .scheme , parsed .netloc ]) is True :
2729 url = parsed .netloc
@@ -39,6 +41,60 @@ class DomainValidator:
3941 def __init__ (self ):
4042 self .resolver = aiodns .DNSResolver (servers = settings .DNS_RESOLVERS )
4143
44+ async def get_ns_servers (self , hostname : Hostname ):
45+ """Get ns servers of a domain"""
46+ dns_servers = settings .DNS_RESOLVERS
47+ fqdn = hostname
48+
49+ stop = False
50+ while stop == False :
51+ """**Detect and get authoritative NS server of subdomains if delegated**"""
52+ try :
53+ entries = await self .resolver .query (fqdn , "NS" )
54+ servers = []
55+ for entry in entries :
56+ servers += await self .get_ipv6_addresses (entry .host )
57+ servers += await self .get_ipv4_addresses (entry .host )
58+
59+ dns_servers = servers
60+ stop = True
61+ except aiodns .error .DNSError :
62+ sub_domains = fqdn .split ("." )
63+ if len (sub_domains ) > 2 :
64+ fqdn = "." .join (sub_domains [1 :])
65+ continue
66+
67+ if len (sub_domains ) == 2 :
68+ stop = True
69+
70+ return dns_servers
71+
72+ async def get_resolver_for (self , hostname : Hostname ):
73+ dns_servers = await self .get_ns_servers (hostname )
74+ return aiodns .DNSResolver (servers = dns_servers )
75+
76+ async def get_target_type (self , fqdn : Hostname ) -> Optional [TargetType ]:
77+ domain_validator = DomainValidator ()
78+ resolver = await domain_validator .get_resolver_for (fqdn )
79+ try :
80+ entry = await resolver .query (fqdn , "CNAME" )
81+ cname = getattr (entry , "cname" )
82+ if cname == settings .DNS_IPFS_DOMAIN :
83+ return TargetType .IPFS
84+ elif cname == settings .DNS_PROGRAM_DOMAIN :
85+ return TargetType .PROGRAM
86+ elif cname == settings .DNS_INSTANCE_DOMAIN :
87+ return TargetType .INSTANCE
88+
89+ return None
90+ except aiodns .error .DNSError :
91+ return None
92+
93+ async def get_ipv4_addresses (self , hostname : Hostname ) -> List [IPv4Address ]:
94+ """Returns all IPv4 addresses for a domain"""
95+ entries : Iterable = await self .resolver .query (hostname , "A" ) or []
96+ return [entry .host for entry in entries ]
97+
4298 async def get_ipv6_addresses (self , hostname : Hostname ) -> List [IPv6Address ]:
4399 """Returns all IPv6 addresses for a domain"""
44100 entries : Iterable = await self .resolver .query (hostname , "AAAA" ) or []
@@ -109,7 +165,8 @@ async def check_domain(
109165 record_value = dns_rule ["dns" ]["value" ]
110166
111167 try :
112- entries = await self .resolver .query (record_name , record_type .upper ())
168+ resolver = await self .get_resolver_for (hostname )
169+ entries = await resolver .query (record_name , record_type .upper ())
113170 except aiodns .error .DNSError :
114171 """Continue checks"""
115172 entries = None
0 commit comments