"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")

SPDX-License-Identifier: MPL-2.0

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.

See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""

from typing import AsyncGenerator

import dns.rcode
import dns.rdatatype
import dns.rrset

from isctest.asyncserver import (
    AsyncDnsServer,
    DnsResponseSend,
    DomainHandler,
    IgnoreAllQueries,
    QnameHandler,
    QueryContext,
    ResponseHandler,
)


# Address returned by ExampleHandler for each address query type; any
# other query type gets an empty answer section.
_EXAMPLE_ADDRESS_BY_QTYPE = {
    dns.rdatatype.A: "192.0.2.1",
    dns.rdatatype.AAAA: "2001:db8:beef::1",
}


def setup_delegation(qctx: QueryContext, owner: str) -> None:
    """
    Turn the response under construction into a referral for *owner*.

    An NS record "<owner> 300 NS ns.<owner>" is appended to the authority
    section and the matching glue "ns.<owner> 300 A 10.53.0.3" is appended
    to the additional section.  *owner* is expected to be an absolute name
    (every caller in this file passes one with a trailing dot).
    """
    nameserver = f"ns.{owner}"
    referral = dns.rrset.from_text(
        owner, 300, qctx.qclass, dns.rdatatype.NS, nameserver
    )
    glue = dns.rrset.from_text(
        nameserver, 300, qctx.qclass, dns.rdatatype.A, "10.53.0.3"
    )
    qctx.response.authority.append(referral)
    qctx.response.additional.append(glue)


class BadGoodCnameHandler(QnameHandler):
    """Answer the example.net alias names with a referral for example.net."""

    qnames = [
        "badcname.example.net.",
        "goodcname.example.net.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for CNAME/DNAME filtering.  We need to make one-level
        # delegation to avoid automatic acceptance for subdomain aliases
        setup_delegation(qctx, "example.net.")
        yield DnsResponseSend(qctx.response, authoritative=False)


class Cname1Handler(QnameHandler):
    """Return a CNAME followed by an A record at the same owner name."""

    qnames = ["cname1.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for the "cname + other data / 1" test
        answer = qctx.response.answer
        answer.append(
            dns.rrset.from_text(
                qctx.qname,
                300,
                qctx.qclass,
                dns.rdatatype.CNAME,
                "cname1.example.com.",
            )
        )
        answer.append(
            dns.rrset.from_text(
                qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "1.2.3.4"
            )
        )
        yield DnsResponseSend(qctx.response, authoritative=False)


class Cname2Handler(QnameHandler):
    """Return an A record followed by a CNAME at the same owner name."""

    qnames = ["cname2.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for the "cname + other data / 2" test: same RRs in opposite order
        answer = qctx.response.answer
        answer.append(
            dns.rrset.from_text(
                qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "1.2.3.4"
            )
        )
        answer.append(
            dns.rrset.from_text(
                qctx.qname,
                300,
                qctx.qclass,
                dns.rdatatype.CNAME,
                "cname2.example.com.",
            )
        )
        yield DnsResponseSend(qctx.response, authoritative=False)


class ExampleHandler(QnameHandler):
    """
    Serve fixed A/AAAA data for the names used by the filtering tests.

    A queries get 192.0.2.1, AAAA queries get 2001:db8:beef::1, and any
    other query type gets a response with an empty answer section.
    """

    qnames = [
        "www.example.com.",
        "www.example.net.",
        "badcname.example.org.",
        "goodcname.example.org.",
        "foo.badcname.example.org.",
        "foo.goodcname.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for address/alias filtering.
        address = _EXAMPLE_ADDRESS_BY_QTYPE.get(qctx.qtype)
        if address is not None:
            qctx.response.answer.append(
                dns.rrset.from_text(
                    qctx.qname, 300, qctx.qclass, qctx.qtype, address
                )
            )
        yield DnsResponseSend(qctx.response, authoritative=True)


class FooInfoHandler(QnameHandler, IgnoreAllQueries):
    """
    Match queries for foo.info. and hand them to IgnoreAllQueries.

    NOTE(review): the mixin name suggests such queries are left
    unanswered; confirm against isctest.asyncserver.
    """

    qnames = ["foo.info."]


class NoDataHandler(DomainHandler):
    """Send the prepared response, with no records added, as authoritative."""

    domains = ["nodata.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Nothing is added to any section; the response goes out as prepared.
        yield DnsResponseSend(qctx.response, authoritative=True)


class NxdomainHandler(DomainHandler):
    """Send an authoritative NXDOMAIN response."""

    domains = ["nxdomain.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        qctx.response.set_rcode(dns.rcode.NXDOMAIN)
        yield DnsResponseSend(qctx.response, authoritative=True)


class SubHandler(DomainHandler):
    """Answer with a referral for sub.example.org."""

    domains = ["sub.example.org."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for CNAME/DNAME filtering.  The final answers are
        # expected to be accepted regardless of the filter setting.
        setup_delegation(qctx, "sub.example.org.")
        yield DnsResponseSend(qctx.response, authoritative=False)


class FallbackHandler(ResponseHandler):
    """
    Answer with a referral for below.www.example.com.

    NOTE(review): this handler declares no match criteria and is installed
    last, so it presumably serves every query that no earlier handler
    claimed; confirm against isctest.asyncserver.
    """

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        setup_delegation(qctx, "below.www.example.com.")
        yield DnsResponseSend(qctx.response, authoritative=False)


def main() -> None:
    """Create the server, install the handlers in the order listed, and run it."""
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
    # The original list order is kept, with the catch-all handler last.
    handlers = [
        BadGoodCnameHandler(),
        Cname1Handler(),
        Cname2Handler(),
        ExampleHandler(),
        FooInfoHandler(),
        NoDataHandler(),
        NxdomainHandler(),
        SubHandler(),
        FallbackHandler(),
    ]
    server.install_response_handlers(handlers)
    server.run()


if __name__ == "__main__":
    main()