"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")

SPDX-License-Identifier: MPL-2.0

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.

See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""

from collections.abc import AsyncGenerator

import dns.name
import dns.rcode
import dns.rdatatype

from isctest.asyncserver import (
    AsyncDnsServer,
    DnsResponseSend,
    DomainHandler,
    IgnoreAllQueries,
    QnameHandler,
    QnameQtypeHandler,
    QueryContext,
    ResponseHandler,
    StaticResponseHandler,
)

from ..resolver_ans import (
    DelegationHandler,
    Gl6412AHandler,
    Gl6412Handler,
    Gl6412Ns2Handler,
    Gl6412Ns3Handler,
    rrset,
    rrset_from_list,
    soa_rrset,
)


class ApexNSHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "example.net.": `answer` holds an NS record naming
    ns.example.net. and `additional` holds an A record (10.53.0.3) for it.
    """

    # NOTE(review): `qtypes` is declared here, but the base class is
    # QnameHandler rather than QnameQtypeHandler (which
    # BadGoodDnameNsHandler uses) -- confirm QnameHandler honors `qtypes`.
    qnames = ["example.net."]
    qtypes = [dns.rdatatype.NS]
    answer = [rrset(qnames[0], dns.rdatatype.NS, f"ns.{qnames[0]}")]
    additional = [rrset(f"ns.{qnames[0]}", dns.rdatatype.A, "10.53.0.3")]


class BadCnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "badcname.example.net.": a CNAME record pointing at
    badcname.example.org.
    """

    qnames = ["badcname.example.net."]
    answer = [rrset(qnames[0], dns.rdatatype.CNAME, "badcname.example.org.")]


class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
    """
    Static reply for NS queries at "baddname.example.net." and
    "gooddname.example.net.": no answer records, only the example.net. SOA
    (built by soa_rrset()) in `authority`.
    """

    qnames = ["baddname.example.net.", "gooddname.example.net."]
    qtypes = [dns.rdatatype.NS]
    authority = [soa_rrset("example.net.")]


class CnameSubHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "cname.sub.example.org.": a CNAME record pointing at
    ok.sub.example.org.
    """

    qnames = ["cname.sub.example.org."]
    answer = [rrset(qnames[0], dns.rdatatype.CNAME, "ok.sub.example.org.")]


class FooBadDnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "foo.baddname.example.net.": a DNAME record owned by
    baddname.example.net. that redirects to baddname.example.org.
    """

    qnames = ["foo.baddname.example.net."]
    answer = [
        rrset("baddname.example.net.", dns.rdatatype.DNAME, "baddname.example.org.")
    ]


class FooBarSubTld1Handler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "foo.bar.sub.tld1.": a TXT record with the text "baz".
    """

    qnames = ["foo.bar.sub.tld1."]
    answer = [rrset(qnames[0], dns.rdatatype.TXT, "baz")]


class FooGlueInAnswerHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "foo.glue-in-answer.example.org.": an A record with
    address 192.0.2.1.
    """

    qnames = ["foo.glue-in-answer.example.org."]
    answer = [rrset(qnames[0], dns.rdatatype.A, "192.0.2.1")]


class FooGoodDnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "foo.gooddname.example.net.": a DNAME record owned by
    gooddname.example.net. that redirects to gooddname.example.org.
    """

    qnames = ["foo.gooddname.example.net."]
    answer = [
        rrset("gooddname.example.net.", dns.rdatatype.DNAME, "gooddname.example.org.")
    ]


class GoodCnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "goodcname.example.net.": a CNAME record pointing at
    goodcname.example.org.
    """

    qnames = ["goodcname.example.net."]
    answer = [rrset(qnames[0], dns.rdatatype.CNAME, "goodcname.example.org.")]


class LameExampleOrgDelegation(DelegationHandler):
    """
    DelegationHandler configured for the "lame.example.org." domain with
    `server_number` 3.
    """

    # NOTE(review): the meaning of `server_number` is defined by
    # DelegationHandler in resolver_ans, which is not visible here.
    domains = ["lame.example.org."]
    server_number = 3


class LargeReferralHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "large-referral.example.net.": `authority` holds a
    single NS RRset listing ns1.fake.redirect.com. through
    ns999.fake.redirect.com. (999 names).
    """

    # NOTE(review): `qtypes` is declared here, but the base class is
    # QnameHandler rather than QnameQtypeHandler -- confirm QnameHandler
    # honors `qtypes`.
    qnames = ["large-referral.example.net."]
    qtypes = [dns.rdatatype.NS]
    authority = [
        rrset_from_list(
            qnames[0],
            dns.rdatatype.NS,
            [f"ns{i}.fake.redirect.com." for i in range(1, 1000)],
        )
    ]


class LongCnameHandler(ResponseHandler):
    """
    Answer any name whose first label starts with "longcname" with a CNAME
    to the same name with "longcname" replaced by "longcnamex" in that label.

    The CNAME target's first label still starts with "longcname", so a
    follow-up query for the target matches this handler again.
    """

    def match(self, qctx: QueryContext) -> bool:
        """Return True when the first QNAME label starts with b"longcname"."""
        return qctx.qname.labels[0].startswith(b"longcname")

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the CNAME record to the prepared response and send it."""
        # Only the first label is rewritten; the remaining labels are reused.
        first_label = qctx.qname.labels[0].replace(b"longcname", b"longcnamex")
        cname_target = f"{dns.name.Name((first_label,) + qctx.qname.labels[1:])}"
        qctx.response.answer.append(
            rrset(qctx.qname, dns.rdatatype.CNAME, cname_target)
        )
        yield DnsResponseSend(qctx.response)


class NodataHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "nodata.example.net." that defines no records and no
    rcode of its own.
    """

    qnames = ["nodata.example.net."]


class NoresponseHandler(QnameHandler, IgnoreAllQueries):
    """
    Handler for "noresponse.example.net." built on IgnoreAllQueries
    (presumably the query is left unanswered -- confirm in isctest).
    """

    qnames = ["noresponse.example.net."]


class NsHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "ns.example.net.": an A record with address 10.53.0.3.
    """

    qnames = ["ns.example.net."]
    answer = [rrset(qnames[0], dns.rdatatype.A, "10.53.0.3")]


class NxdomainHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "nxdomain.example.net." with rcode NXDOMAIN and no
    records.
    """

    qnames = ["nxdomain.example.net."]
    rcode = dns.rcode.NXDOMAIN


class OkSubHandler(QnameHandler):
    """
    Answer "ok.sub.example.org." and "www.ok.sub.example.org." with an A
    record (192.0.2.1) owned by the queried name.
    """

    qnames = ["ok.sub.example.org.", "www.ok.sub.example.org."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the A record to the prepared response and send it."""
        # The record is added regardless of the query type.
        qctx.response.answer.append(rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1"))
        yield DnsResponseSend(qctx.response)


class PartialFormerrHandler(DomainHandler):
    """
    Answer queries in the "partial-formerr." domain with an A record
    (10.53.0.3, TTL 1) owned by the queried name.
    """

    domains = ["partial-formerr."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the short-TTL A record to the response and send it."""
        qctx.response.answer.append(
            rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3", ttl=1)
        )
        yield DnsResponseSend(qctx.response)


class WwwDnameSubHandler(QnameHandler, StaticResponseHandler):
    """
    Static reply for "www.dname.sub.example.org.": a DNAME record owned by
    dname.sub.example.org. that redirects to ok.sub.example.org.
    """

    qnames = ["www.dname.sub.example.org."]
    answer = [
        rrset("dname.sub.example.org.", dns.rdatatype.DNAME, "ok.sub.example.org.")
    ]


class WwwHandler(QnameHandler):
    """
    Answer "www.example.net." with 192.0.2.1 for A queries and
    2001:db8:beef::1 for AAAA queries; other query types get the prepared
    response with no record added.
    """

    qnames = ["www.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Add the record matching the query type, then send the response."""
        if qctx.qtype == dns.rdatatype.A:
            qctx.response.answer.append(rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1"))
        elif qctx.qtype == dns.rdatatype.AAAA:
            qctx.response.answer.append(
                rrset(qctx.qname, dns.rdatatype.AAAA, "2001:db8:beef::1")
            )
        # The response is sent even when neither branch added a record.
        yield DnsResponseSend(qctx.response)


class FallbackHandler(StaticResponseHandler):
    """
    Static reply that declares no qname/domain restriction of its own:
    an A record "www.example.com." -> 1.2.3.4.
    """

    answer = [rrset("www.example.com.", dns.rdatatype.A, "1.2.3.4")]


def main() -> None:
    """
    Build the test DNS server, register every response handler and run it.

    The server is created with `default_aa=True` and a default rcode of
    NOERROR.
    """
    server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
    server.install_response_handlers(
        ApexNSHandler(),
        BadCnameHandler(),
        BadGoodDnameNsHandler(),
        CnameSubHandler(),
        FooBadDnameHandler(),
        FooBarSubTld1Handler(),
        FooGoodDnameHandler(),
        FooGlueInAnswerHandler(),
        Gl6412AHandler(),
        Gl6412Handler(),
        Gl6412Ns2Handler(),
        Gl6412Ns3Handler(),
        GoodCnameHandler(),
        LameExampleOrgDelegation(),
        LargeReferralHandler(),
        LongCnameHandler(),
        NodataHandler(),
        NoresponseHandler(),
        NsHandler(),
        NxdomainHandler(),
        OkSubHandler(),
        PartialFormerrHandler(),
        WwwDnameSubHandler(),
        WwwHandler(),
    )

    # Registered separately, after all specific handlers.
    # NOTE(review): presumably this makes it the last handler consulted --
    # confirm the ordering semantics of install_response_handler().
    server.install_response_handler(FallbackHandler())
    server.run()


if __name__ == "__main__":
    main()