"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")

SPDX-License-Identifier: MPL-2.0

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.

See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""

from collections.abc import AsyncGenerator

import dns.name
import dns.rcode
import dns.rdatatype
import dns.rrset

from isctest.asyncserver import (
    AsyncDnsServer,
    DnsResponseSend,
    DomainHandler,
    IgnoreAllQueries,
    QnameHandler,
    QnameQtypeHandler,
    QueryContext,
    ResponseHandler,
    StaticResponseHandler,
)

from ..resolver_ans import (
    DelegationHandler,
    Gl6412AHandler,
    Gl6412Handler,
    Gl6412Ns2Handler,
    Gl6412Ns3Handler,
    rrset,
    setup_delegation,
)


class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
    """Static, authoritative answer for NS queries for the two DNAME test names.

    The answer section is a single NS RRset owned by "example.org." that
    names "a.root-servers.nil.".
    """

    qtypes = [dns.rdatatype.NS]
    qnames = [
        "baddname.example.org.",
        "gooddname.example.org.",
    ]
    authoritative = True
    answer = [rrset("example.org.", dns.rdatatype.NS, "a.root-servers.nil.")]


def _cname_rrsets(
    qname: dns.name.Name | str,
) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
    """Return a (CNAME, A) pair of RRsets that are both owned by *qname*.

    The CNAME target is the text form of *qname* itself; the A record
    carries the address 1.2.3.4.
    """
    alias = rrset(qname, dns.rdatatype.CNAME, f"{qname}")
    address = rrset(qname, dns.rdatatype.A, "1.2.3.4")
    return alias, address


class Cname1Handler(QnameHandler, StaticResponseHandler):
    """Non-authoritative static answer: CNAME first, then A."""

    qnames = ["cname1.example.com."]
    authoritative = False
    # Data for the "cname + other data / 1" test
    answer = _cname_rrsets(qnames[0])


class Cname2Handler(QnameHandler, StaticResponseHandler):
    """Non-authoritative static answer: A first, then CNAME."""

    qnames = ["cname2.example.com."]
    authoritative = False
    # Data for the "cname + other data / 2" test: same RRs in opposite order
    answer = _cname_rrsets(qnames[0])[::-1]


class ExampleOrgHandler(QnameHandler):
    """Authoritative address answers for a fixed set of example.org names."""

    qnames = [
        "www.example.org",
        "badcname.example.org",
        "goodcname.example.org",
        "foo.baddname.example.org",
        "foo.gooddname.example.org",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Answer A and AAAA queries with a fixed address.

        Any other query type is sent back with whatever qctx.response
        already contains; nothing is appended to the answer section.
        """
        # Data for address/alias filtering.
        fixed_addresses = (
            (dns.rdatatype.A, "192.0.2.1"),
            (dns.rdatatype.AAAA, "2001:db8:beef::1"),
        )
        for rdtype, address in fixed_addresses:
            if qctx.qtype == rdtype:
                qctx.response.answer.append(rrset(qctx.qname, rdtype, address))
                break
        yield DnsResponseSend(qctx.response, authoritative=True)


class NoResponseExampleUdpHandler(QnameHandler, IgnoreAllQueries):
    """Pair the name "noresponse.exampleudp.net." with IgnoreAllQueries."""

    # NOTE(review): presumably queries for this name get no reply at all;
    # confirm against IgnoreAllQueries in isctest.asyncserver.
    qnames = ["noresponse.exampleudp.net."]


class RootNsHandler(QnameQtypeHandler):
    """Authoritative NS answer naming "a.root-servers.nil." for listed zones."""

    qtypes = [dns.rdatatype.NS]

    qnames = [
        "example.com.",
        "com.",
        "example.org.",
        "org.",
        "net.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append an NS RRset owned by the query name and send it."""
        qctx.response.answer.append(
            rrset(qctx.qname, dns.rdatatype.NS, "a.root-servers.nil.")
        )
        yield DnsResponseSend(qctx.response, authoritative=True)


# The DelegationHandler subclasses below only supply `domains` and
# `server_number`; how those are turned into a response is defined by
# DelegationHandler in resolver_ans.
# NOTE(review): presumably `server_number` selects the nsN test server the
# listed domains are delegated to; confirm against DelegationHandler.


class Ns2Delegation(DelegationHandler):
    server_number = 2
    domains = ["exampleudp.net."]


class Ns3Delegation(DelegationHandler):
    server_number = 3
    domains = [
        "example.net.",
        "lame.example.org.",
        "sub.example.org.",
    ]


class Ns3GlueInAnswerDelegation(DelegationHandler):
    """DelegationHandler variant that copies ADDITIONAL data into ANSWER."""

    server_number = 3
    domains = ["glue-in-answer.example.org."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Relay each parent response with its additional RRsets also in answer."""
        async for outgoing in super().get_responses(qctx):
            message = outgoing.response
            # The additional section itself is left in place; its RRsets are
            # appended to the end of the answer section.
            message.answer.extend(message.additional)
            yield outgoing


class Ns4Delegation(DelegationHandler):
    server_number = 4
    domains = ["broken."]


class Ns6Delegation(DelegationHandler):
    server_number = 6
    domains = [
        "redirect.com.",
        "tld1.",
    ]


class Ns7Delegation(DelegationHandler):
    server_number = 7
    domains = ["tld2."]


class PartialFormerrHandler(DomainHandler, StaticResponseHandler):
    """Static non-authoritative FORMERR response for "partial-formerr."."""

    domains = ["partial-formerr."]
    rcode = dns.rcode.FORMERR
    authoritative = False


class FallbackHandler(ResponseHandler):
    """Handler installed last in main(), after all name-specific handlers."""

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Run setup_delegation() on the query and send it non-authoritatively."""
        # NOTE(review): presumably this fills in a referral for
        # "below.www.example.com." pointing at server 3; confirm against
        # setup_delegation in resolver_ans.
        setup_delegation(qctx, "below.www.example.com.", 3)
        yield DnsResponseSend(qctx.response, authoritative=False)


def main() -> None:
    """Create the server, install every handler in order, and run it."""
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)

    # Exact-name handlers are installed first ...
    qname_handlers = [
        BadGoodDnameNsHandler(),
        Cname1Handler(),
        Cname2Handler(),
        ExampleOrgHandler(),
        Gl6412AHandler(),
        Gl6412Handler(),
        Gl6412Ns2Handler(),
        Gl6412Ns3Handler(),
        NoResponseExampleUdpHandler(),
        RootNsHandler(),
    ]
    server.install_response_handlers(*qname_handlers)

    # ... then the domain-based handlers ...
    domain_handlers = [
        Ns2Delegation(),
        Ns3Delegation(),
        Ns3GlueInAnswerDelegation(),
        Ns4Delegation(),
        Ns6Delegation(),
        Ns7Delegation(),
        PartialFormerrHandler(),
    ]
    server.install_response_handlers(*domain_handlers)

    # ... and the fallback handler goes in last.
    server.install_response_handler(FallbackHandler())
    server.run()


if __name__ == "__main__":
    main()