"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")

SPDX-License-Identifier: MPL-2.0

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.

See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""

from typing import AsyncGenerator

import dns.rcode
import dns.rdatatype
import dns.rrset

from isctest.asyncserver import (
    AsyncDnsServer,
    DnsResponseSend,
    DomainHandler,
    IgnoreAllQueries,
    QnameHandler,
    QueryContext,
    ResponseHandler,
)


def setup_delegation(qctx: QueryContext, owner: str) -> None:
    """Turn the response held by *qctx* into a referral for *owner*.

    An NS RRset ``owner -> ns.<owner>`` is appended to the AUTHORITY
    section and an A RRset ``ns.<owner> -> 10.53.0.3`` is appended to the
    ADDITIONAL section.  Both RRsets use a TTL of 300 and the class of
    the incoming query.
    """
    nameserver = f"ns.{owner}"
    # Build both RRsets before touching the response so that the message
    # is only modified once both records have been parsed successfully.
    referral = dns.rrset.from_text(
        owner, 300, qctx.qclass, dns.rdatatype.NS, nameserver
    )
    glue = dns.rrset.from_text(
        nameserver, 300, qctx.qclass, dns.rdatatype.A, "10.53.0.3"
    )
    response = qctx.response
    response.authority.append(referral)
    response.additional.append(glue)


class BadGoodCnameHandler(QnameHandler):
    """Answer the bad/good CNAME names under example.net with a referral."""

    qnames = [
        "badcname.example.net.",
        "goodcname.example.net.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for CNAME/DNAME filtering.  A one-level delegation is
        # needed here to avoid automatic acceptance of subdomain aliases.
        setup_delegation(qctx, "example.net.")
        yield DnsResponseSend(qctx.response, authoritative=False)


class Cname1Handler(QnameHandler):
    """Serve the data for the "cname + other data / 1" test."""

    qnames = ["cname1.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # The answer section carries a CNAME followed by an A record for
        # the same owner name, in that order.
        alias = dns.rrset.from_text(
            qctx.qname, 300, qctx.qclass, dns.rdatatype.CNAME, "cname1.example.com."
        )
        address = dns.rrset.from_text(
            qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "1.2.3.4"
        )
        qctx.response.answer.extend((alias, address))
        yield DnsResponseSend(qctx.response, authoritative=False)


class Cname2Handler(QnameHandler):
    """Serve the data for the "cname + other data / 2" test."""

    qnames = ["cname2.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Same kind of RRs as the "/ 1" test, but in the opposite order:
        # the A record comes first, the CNAME second.
        address = dns.rrset.from_text(
            qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "1.2.3.4"
        )
        alias = dns.rrset.from_text(
            qctx.qname, 300, qctx.qclass, dns.rdatatype.CNAME, "cname2.example.com."
        )
        qctx.response.answer.extend((address, alias))
        yield DnsResponseSend(qctx.response, authoritative=False)


class ExampleHandler(QnameHandler):
    """Serve fixed A/AAAA data for the address/alias filtering names."""

    qnames = [
        "www.example.com.",
        "www.example.net.",
        "badcname.example.org.",
        "goodcname.example.org.",
        "foo.badcname.example.org.",
        "foo.goodcname.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for address/alias filtering.  Only A and AAAA queries get
        # an answer record; for any other query type the response is sent
        # without adding anything to the answer section.
        address_by_type = {
            dns.rdatatype.A: "192.0.2.1",
            dns.rdatatype.AAAA: "2001:db8:beef::1",
        }
        address = address_by_type.get(qctx.qtype)
        if address is not None:
            qctx.response.answer.append(
                dns.rrset.from_text(
                    qctx.qname, 300, qctx.qclass, qctx.qtype, address
                )
            )
        yield DnsResponseSend(qctx.response, authoritative=True)


class FooInfoHandler(QnameHandler, IgnoreAllQueries):
    """Match foo.info. and handle it as defined by IgnoreAllQueries."""

    qnames = ["foo.info."]


class NoDataHandler(DomainHandler):
    """Send the response for nodata.example.net. without adding records."""

    domains = ["nodata.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        yield DnsResponseSend(qctx.response, authoritative=True)


class NxdomainHandler(DomainHandler):
    """Answer anything in nxdomain.example.net. with rcode NXDOMAIN."""

    domains = ["nxdomain.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        qctx.response.set_rcode(dns.rcode.NXDOMAIN)
        yield DnsResponseSend(qctx.response, authoritative=True)


class SubHandler(DomainHandler):
    """Answer anything in sub.example.org. with a referral to that zone."""

    domains = ["sub.example.org."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for CNAME/DNAME filtering.  The final answers are expected
        # to be accepted regardless of the filter setting.
        setup_delegation(qctx, "sub.example.org.")
        yield DnsResponseSend(qctx.response, authoritative=False)


class FallbackHandler(ResponseHandler):
    """Answer with a referral to below.www.example.com.

    This handler declares no qnames or domains of its own; main()
    registers it after all the other handlers.
    """

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        setup_delegation(qctx, "below.www.example.com.")
        yield DnsResponseSend(qctx.response, authoritative=False)


def main() -> None:
    """Create the server, register every handler and run it."""
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
    # FallbackHandler is deliberately kept last in the list, after all
    # the name-specific handlers.
    handlers = [
        BadGoodCnameHandler(),
        Cname1Handler(),
        Cname2Handler(),
        ExampleHandler(),
        FooInfoHandler(),
        NoDataHandler(),
        NxdomainHandler(),
        SubHandler(),
        FallbackHandler(),
    ]
    server.install_response_handlers(handlers)
    server.run()


if __name__ == "__main__":
    main()