1 """ 2 Copyright (C) Internet Systems Consortium, Inc. ("ISC") 3 4 SPDX-License-Identifier: MPL-2.0 5 6 This Source Code Form is subject to the terms of the Mozilla Public 7 License, v. 2.0. If a copy of the MPL was not distributed with this 8 file, you can obtain one at https://mozilla.org/MPL/2.0/. 9 10 See the COPYRIGHT file distributed with this work for additional 11 information regarding copyright ownership. 12 """ 13 14 from collections.abc import AsyncGenerator 15 16 import dns.name 17 import dns.rcode 18 import dns.rdatatype 19 import dns.rrset 20 21 from isctest.asyncserver import ( 22 AsyncDnsServer, 23 DnsResponseSend, 24 DomainHandler, 25 IgnoreAllQueries, 26 QnameHandler, 27 QnameQtypeHandler, 28 QueryContext, 29 ResponseHandler, 30 StaticResponseHandler, 31 ) 32 33 from ..resolver_ans import ( 34 DelegationHandler, 35 Gl6412AHandler, 36 Gl6412Handler, 37 Gl6412Ns2Handler, 38 Gl6412Ns3Handler, 39 rrset, 40 setup_delegation, 41 ) 42 43 44 class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler): 45 qnames = [ 46 "baddname.example.org.", 47 "gooddname.example.org.", 48 ] 49 qtypes = [dns.rdatatype.NS] 50 answer = [rrset("example.org.", dns.rdatatype.NS, "a.root-servers.nil.")] 51 authoritative = True 52 53 54 def _cname_rrsets( 55 qname: dns.name.Name | str, 56 ) -> tuple[dns.rrset.RRset, dns.rrset.RRset]: 57 return ( 58 rrset(qname, dns.rdatatype.CNAME, f"{qname}"), 59 rrset(qname, dns.rdatatype.A, "1.2.3.4"), 60 ) 61 62 63 class Cname1Handler(QnameHandler, StaticResponseHandler): 64 qnames = ["cname1.example.com."] 65 # Data for the "cname + other data / 1" test 66 answer = _cname_rrsets(qnames[0]) 67 authoritative = False 68 69 70 class Cname2Handler(QnameHandler, StaticResponseHandler): 71 qnames = ["cname2.example.com."] 72 # Data for the "cname + other data / 2" test: same RRs in opposite order 73 answer = tuple(reversed(_cname_rrsets(qnames[0]))) 74 authoritative = False 75 76 77 class ExampleOrgHandler(QnameHandler): 78 qnames = [ 79 "www.example.org", 80 "badcname.example.org", 81 "goodcname.example.org", 82 "foo.baddname.example.org", 83 "foo.gooddname.example.org", 84 ] 85 86 async def get_responses( 87 self, qctx: QueryContext 88 ) -> AsyncGenerator[DnsResponseSend, None]: 89 # Data for address/alias filtering. 90 if qctx.qtype == dns.rdatatype.A: 91 a_rrset = rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1") 92 qctx.response.answer.append(a_rrset) 93 elif qctx.qtype == dns.rdatatype.AAAA: 94 aaaa_rrset = rrset(qctx.qname, dns.rdatatype.AAAA, "2001:db8:beef::1") 95 qctx.response.answer.append(aaaa_rrset) 96 yield DnsResponseSend(qctx.response, authoritative=True) 97 98 99 class NoResponseExampleUdpHandler(QnameHandler, IgnoreAllQueries): 100 qnames = ["noresponse.exampleudp.net."] 101 102 103 class RootNsHandler(QnameQtypeHandler): 104 qnames = [ 105 "example.com.", 106 "com.", 107 "example.org.", 108 "org.", 109 "net.", 110 ] 111 112 qtypes = [dns.rdatatype.NS] 113 114 async def get_responses( 115 self, qctx: QueryContext 116 ) -> AsyncGenerator[DnsResponseSend, None]: 117 root_ns = rrset(qctx.qname, dns.rdatatype.NS, "a.root-servers.nil.") 118 qctx.response.answer.append(root_ns) 119 yield DnsResponseSend(qctx.response, authoritative=True) 120 121 122 class Ns2Delegation(DelegationHandler): 123 domains = ["exampleudp.net."] 124 server_number = 2 125 126 127 class Ns3Delegation(DelegationHandler): 128 domains = [ 129 "example.net.", 130 "lame.example.org.", 131 "sub.example.org.", 132 ] 133 server_number = 3 134 135 136 class Ns3GlueInAnswerDelegation(DelegationHandler): 137 domains = ["glue-in-answer.example.org."] 138 server_number = 3 139 140 async def get_responses( 141 self, qctx: QueryContext 142 ) -> AsyncGenerator[DnsResponseSend, None]: 143 async for dns_response in super().get_responses(qctx): 144 dns_response.response.answer += dns_response.response.additional 145 yield dns_response 146 147 148 class Ns4Delegation(DelegationHandler): 149 domains = ["broken."] 150 server_number = 4 151 152 153 class Ns6Delegation(DelegationHandler): 154 domains = [ 155 "redirect.com.", 156 "tld1.", 157 ] 158 server_number = 6 159 160 161 class Ns7Delegation(DelegationHandler): 162 domains = ["tld2."] 163 server_number = 7 164 165 166 class PartialFormerrHandler(DomainHandler, StaticResponseHandler): 167 domains = ["partial-formerr."] 168 authoritative = False 169 rcode = dns.rcode.FORMERR 170 171 172 class FallbackHandler(ResponseHandler): 173 async def get_responses( 174 self, qctx: QueryContext 175 ) -> AsyncGenerator[DnsResponseSend, None]: 176 setup_delegation(qctx, "below.www.example.com.", 3) 177 yield DnsResponseSend(qctx.response, authoritative=False) 178 179 180 def main() -> None: 181 server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR) 182 183 # Install QnameHandlers first 184 server.install_response_handlers( 185 BadGoodDnameNsHandler(), 186 Cname1Handler(), 187 Cname2Handler(), 188 ExampleOrgHandler(), 189 Gl6412AHandler(), 190 Gl6412Handler(), 191 Gl6412Ns2Handler(), 192 Gl6412Ns3Handler(), 193 NoResponseExampleUdpHandler(), 194 RootNsHandler(), 195 ) 196 197 # Then install DomainHandlers 198 server.install_response_handlers( 199 Ns2Delegation(), 200 Ns3Delegation(), 201 Ns3GlueInAnswerDelegation(), 202 Ns4Delegation(), 203 Ns6Delegation(), 204 Ns7Delegation(), 205 PartialFormerrHandler(), 206 ) 207 208 # Finally, install the fallback handler 209 server.install_response_handler(FallbackHandler()) 210 server.run() 211 212 213 if __name__ == "__main__": 214 main() 215