"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")

SPDX-License-Identifier: MPL-2.0

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0.  If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.

See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""

from collections.abc import AsyncGenerator, Collection, Iterable

import abc

import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset

from isctest.asyncserver import (
    ControllableAsyncDnsServer,
    DnsResponseSend,
    QueryContext,
    ResponseHandler,
    SwitchControlCommand,
)


def rrset(owner: str, rdtype: dns.rdatatype.RdataType, rdata: str) -> dns.rrset.RRset:
    """Build a single-record IN-class RRset with a TTL of 300 from text."""
    return dns.rrset.from_text(
        owner,
        300,
        dns.rdataclass.IN,
        rdtype,
        rdata,
    )


def soa(serial: int, *, owner: str = "nil.") -> dns.rrset.RRset:
    """
    Build a SOA RRset with the given serial.

    Only the serial and the owner name vary; the remaining SOA fields
    are fixed.
    """
    return rrset(
        owner,
        dns.rdatatype.SOA,
        f"ns.nil. root.nil. {serial} 300 300 604800 300",
    )


def ns() -> dns.rrset.RRset:
    """Build the NS RRset "nil. NS ns.nil."."""
    return rrset(
        "nil.",
        dns.rdatatype.NS,
        "ns.nil.",
    )


def a(address: str, *, owner: str) -> dns.rrset.RRset:
    """Build an A RRset for `owner` pointing at `address`."""
    return rrset(
        owner,
        dns.rdatatype.A,
        address,
    )


def txt(data: str, *, owner: str = "nil.") -> dns.rrset.RRset:
    """
    Build a TXT RRset for `owner`.

    `data` is wrapped in double quotes so that it is parsed as a single
    character-string even when it contains spaces.
    """
    return rrset(
        owner,
        dns.rdatatype.TXT,
        f'"{data}"',
    )


class SoaHandler(ResponseHandler):
    """
    Answer SOA queries with a SOA record carrying a fixed serial.
    """

    def __init__(self, serial: int):
        self._serial = serial

    def match(self, qctx: QueryContext) -> bool:
        return qctx.qtype == dns.rdatatype.SOA

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        qctx.response.answer.append(soa(self._serial))
        yield DnsResponseSend(qctx.response)


class AxfrHandler(ResponseHandler):
    """
    Answer AXFR queries with a sequence of response messages.

    Subclasses provide `answers`; one response message is sent for each
    of its elements.
    """

    @property
    @abc.abstractmethod
    def answers(self) -> Iterable[Collection[dns.rrset.RRset]]:
        """
        Answer sections of response packets sent in response to
        AXFR queries.
        """
        raise NotImplementedError

    def match(self, qctx: QueryContext) -> bool:
        return qctx.qtype == dns.rdatatype.AXFR

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        for answer in self.answers:
            # Every element of `answers` goes out in a message of its own.
            response = qctx.prepare_new_response()
            response.answer.extend(answer)
            yield DnsResponseSend(response)


class IxfrHandler(ResponseHandler):
    """
    Answer IXFR queries with a single response message.

    Subclasses provide `answer`, the full answer section of that message.
    """

    @property
    @abc.abstractmethod
    def answer(self) -> Collection[dns.rrset.RRset]:
        """
        Answer section of a response packet sent in response to
        IXFR queries.
        """
        raise NotImplementedError

    def match(self, qctx: QueryContext) -> bool:
        return qctx.qtype == dns.rdatatype.IXFR

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        qctx.response.answer.extend(self.answer)
        yield DnsResponseSend(qctx.response)


class InitialAfxrHandler(AxfrHandler):
    """
    AXFR of serial 1, sent as three messages: SOA, zone data, SOA.
    """

    answers = (
        (soa(1),),
        (
            ns(),
            txt("initial AXFR"),
            a("10.0.0.61", owner="a.nil."),
            a("10.0.0.62", owner="b.nil."),
        ),
        (soa(1),),
    )


class SuccessfulIfxrHandler(IxfrHandler):
    """
    IXFR answer bracketed by SOA 3 that lists SOA 1, 2 and 3 in between.

    NOTE(review): presumably two difference sequences (1 -> 2, 2 -> 3) in
    RFC 1995 layout, the second one being empty - confirm.
    """

    answer = (
        soa(3),
        soa(1),
        a("10.0.0.61", owner="a.nil."),
        txt("initial AXFR"),
        soa(2),
        txt("successful IXFR"),
        a("10.0.1.61", owner="a.nil."),
        soa(2),
        soa(3),
        soa(3),
    )


class NotExactIxfrHandler(IxfrHandler):
    """
    IXFR answer bracketed by SOA 4 with one TXT record after SOA 3 and
    one after the following SOA 4.

    NOTE(review): judging by the TXT payloads, the first TXT is meant to
    be a deletion of a record the receiver does not have - confirm.
    """

    answer = (
        soa(4),
        soa(3),
        txt("delete-nonexistent-txt-record"),
        soa(4),
        txt("this-txt-record-would-be-added"),
        soa(4),
    )


class FallbackNotExactAxfrHandler(AxfrHandler):
    """
    AXFR of serial 3, sent as three messages: SOA, zone data, SOA.
    """

    answers = (
        (soa(3),),
        (
            ns(),
            txt("fallback AXFR"),
        ),
        (soa(3),),
    )


class TooManyRecordsIxfrHandler(IxfrHandler):
    """
    IXFR answer bracketed by SOA 4 with six TXT records after the
    SOA 3 / SOA 4 pair.

    NOTE(review): presumably the receiver is configured with a record
    limit that the sixth TXT record exceeds - confirm.
    """

    answer = (
        soa(4),
        soa(3),
        soa(4),
        txt("text 1"),
        txt("text 2"),
        txt("text 3"),
        txt("text 4"),
        txt("text 5"),
        txt("text 6: causing too many records"),
        soa(4),
    )


class FallbackTooManyRecordsAxfrHandler(AxfrHandler):
    """
    AXFR of serial 3, sent as two messages: the first one carries the
    opening SOA together with the zone data, the second the closing SOA.
    """

    answers = (
        (
            soa(3),
            ns(),
            txt("fallback AXFR on too many records"),
        ),
        (soa(3),),
    )


class BadSoaOwnerIxfrHandler(IxfrHandler):
    """
    IXFR answer bracketed by SOA 4 in which the middle SOA 4 record is
    owned by "bad-owner." instead of the default "nil.".
    """

    answer = (
        soa(4),
        soa(3),
        soa(4, owner="bad-owner."),
        txt("serial 4, malformed IXFR", owner="test.nil."),
        soa(4),
    )


class FallbackBadSoaOwnerAxfrHandler(AxfrHandler):
    """
    AXFR of serial 4, sent as three messages: SOA, zone data, SOA.
    """

    answers = (
        (soa(4),),
        (
            ns(),
            txt("serial 4, fallback AXFR", owner="test.nil."),
        ),
        (soa(4),),
    )


def main() -> None:
    """
    Start the server with a control command mapping scenario names to
    handler sets.

    NOTE(review): presumably SwitchControlCommand swaps the active set of
    handlers when a control query names one of the keys below - confirm
    against isctest.asyncserver.
    """
    server = ControllableAsyncDnsServer(
        default_aa=True, default_rcode=dns.rcode.NOERROR
    )
    switch_command = SwitchControlCommand(
        {
            "initial_axfr": (
                SoaHandler(1),
                InitialAfxrHandler(),
            ),
            "successful_ixfr": (
                SoaHandler(3),
                SuccessfulIfxrHandler(),
            ),
            "not_exact": (
                SoaHandler(4),
                NotExactIxfrHandler(),
                FallbackNotExactAxfrHandler(),
            ),
            "too_many_records": (
                SoaHandler(4),
                TooManyRecordsIxfrHandler(),
                FallbackTooManyRecordsAxfrHandler(),
            ),
            "bad_soa_owner": (
                SoaHandler(4),
                BadSoaOwnerIxfrHandler(),
                FallbackBadSoaOwnerAxfrHandler(),
            ),
        }
    )
    server.install_control_command(switch_command)
    server.run()


if __name__ == "__main__":
    main()