# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

"""Response handlers and factory for a scripted DNS server.

The handlers below decide, per query, whether to attach a DNS COOKIE option
to the response and whether to return the "legit" address (10.53.0.9), the
"spoofed" address (10.53.0.10), or both.
"""

from typing import AsyncGenerator

import dns.edns
import dns.name
import dns.rcode
import dns.rdatatype
import dns.rrset
import dns.tsigkeyring

from isctest.asyncserver import (
    AsyncDnsServer,
    ResponseHandler,
    DnsResponseSend,
    DnsProtocol,
    QueryContext,
)

from isctest.name import prepend_label
from isctest.vars.algorithms import ALG_VARS

# Two TSIG keys sharing the same algorithm and secret; only the names differ.
KEYRING = dns.tsigkeyring.from_text(
    {
        key_name: (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa")
        for key_name in ("foo", "fake")
    }
)


def _first_label(qctx: QueryContext) -> str:
    """Return the leftmost label of the query name, decoded as ASCII."""
    leftmost = qctx.qname.labels[0]
    return leftmost.decode("ascii")


def _add_cookie(qctx: QueryContext) -> None:
    """Copy the query's COOKIE option into the response, completing it first.

    Only the first COOKIE option found in the query is considered.  If its
    server part is empty, the client part is copied into it.  On dnspython
    older than 2.7.0 the option exposes raw ``data`` instead, so an 8-byte
    payload is doubled.  When the query has no COOKIE option, the response
    is left untouched.
    """
    cookie = next(
        (
            option
            for option in qctx.query.options
            if option.otype == dns.edns.OptionType.COOKIE
        ),
        None,
    )
    if cookie is None:
        return

    try:
        if len(cookie.server) == 0:
            cookie.server = cookie.client
    except AttributeError:  # dnspython<2.7.0 compat
        if len(cookie.data) == 8:
            cookie.data *= 2

    qctx.response.use_edns(options=[cookie])


def _tld(qctx: QueryContext) -> dns.name.Name:
    """Return a name built from the last two labels of the query name."""
    trailing_labels = qctx.qname.labels[-2:]
    return dns.name.Name(trailing_labels)


def _soa(qctx: QueryContext) -> dns.rrset.RRset:
    """Return a fixed SOA rrset (TTL 2) owned by ``_tld(qctx)``."""
    owner = _tld(qctx)
    return dns.rrset.from_text(
        owner, 2, qctx.qclass, dns.rdatatype.SOA, ". . 0 0 0 0 2"
    )


def _ns_name(qctx: QueryContext) -> dns.name.Name:
    """Return ``_tld(qctx)`` with an ``ns`` label prepended."""
    parent = _tld(qctx)
    return prepend_label("ns", parent)


def _ns(qctx: QueryContext) -> dns.rrset.RRset:
    """Return an NS rrset (TTL 1) at the query name pointing to ``_ns_name``."""
    target = _ns_name(qctx).to_text()
    return dns.rrset.from_text(qctx.qname, 1, qctx.qclass, dns.rdatatype.NS, target)


def _a_rrset(qctx: QueryContext, address: str) -> dns.rrset.RRset:
    """Return an A rrset (TTL 1) at the query name holding ``address``."""
    return dns.rrset.from_text(qctx.qname, 1, qctx.qclass, dns.rdatatype.A, address)


def _legit_a(qctx: QueryContext) -> dns.rrset.RRset:
    """Return the A rrset for the query name carrying 10.53.0.9."""
    return _a_rrset(qctx, "10.53.0.9")


def _spoofed_a(qctx: QueryContext) -> dns.rrset.RRset:
    """Return the A rrset for the query name carrying 10.53.0.10."""
    return _a_rrset(qctx, "10.53.0.10")


class _SpoofableHandler(ResponseHandler):
    """Base for handlers whose address data depends on an ``evil_server`` flag."""

    def __init__(self, evil_server: bool) -> None:
        # True selects the spoofed address in subclasses, False the legit one.
        self.evil_server = evil_server


class NsHandler(_SpoofableHandler):
    """Handle NS queries whose name equals its own last two labels."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.NS:
            return False
        return qctx.qname == _tld(qctx)

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Answer with the NS rrset plus an A rrset in the authority section."""
        _add_cookie(qctx)
        response = qctx.response
        response.answer.append(_ns(qctx))
        address = _spoofed_a(qctx) if self.evil_server else _legit_a(qctx)
        response.authority.append(address)
        yield DnsResponseSend(response)


class GlueHandler(_SpoofableHandler):
    """Handle A queries for the name produced by ``_ns_name``."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        return qctx.qname == _ns_name(qctx)

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Answer with the spoofed or legit A rrset, depending on the flag."""
        _add_cookie(qctx)
        response = qctx.response
        address = _spoofed_a(qctx) if self.evil_server else _legit_a(qctx)
        response.answer.append(address)
        yield DnsResponseSend(response)


class TcpAHandler(ResponseHandler):
    """Handle A queries received over TCP."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        return qctx.protocol == DnsProtocol.TCP

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Answer with the legit A rrset; skip the cookie for ``nocookie``."""
        cookie_wanted = _first_label(qctx) != "nocookie"
        if cookie_wanted:
            _add_cookie(qctx)
        response = qctx.response
        response.answer.append(_legit_a(qctx))
        yield DnsResponseSend(response)


class WithtsigUdpAHandler(ResponseHandler):
    """Handle UDP A queries whose first label is ``withtsig``."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        if qctx.protocol != DnsProtocol.UDP:
            return False
        return _first_label(qctx) == "withtsig"

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Send two responses for a single query.

        The first carries both the legit and the spoofed A rrsets, has no
        cookie added, and is signed with the ``fake`` TSIG key.  The second
        is a freshly prepared response with the cookie and only the legit
        A rrset.
        """
        first = qctx.response
        first.answer.append(_legit_a(qctx))
        first.answer.append(_spoofed_a(qctx))
        first.use_tsig(keyring=KEYRING, keyname="fake")
        yield DnsResponseSend(first)

        # Start over with a new response object; re-read it from the context.
        qctx.prepare_new_response()
        _add_cookie(qctx)
        second = qctx.response
        second.answer.append(_legit_a(qctx))
        yield DnsResponseSend(second)


class UdpAHandler(ResponseHandler):
    """Handle A queries received over UDP."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        return qctx.protocol == DnsProtocol.UDP

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Answer with the legit A rrset, then either a cookie or a spoofed A.

        For first labels ``nocookie`` and ``tcponly`` no cookie is added and
        the spoofed A rrset is appended as well; every other name gets the
        cookie and only the legit A rrset.
        """
        answer = qctx.response.answer
        answer.append(_legit_a(qctx))
        if _first_label(qctx) in ("nocookie", "tcponly"):
            answer.append(_spoofed_a(qctx))
        else:
            _add_cookie(qctx)

        yield DnsResponseSend(qctx.response)


class FallbackHandler(ResponseHandler):
    """Handle any query; no ``match`` override is defined here."""

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Return the SOA rrset: in the answer for SOA queries, else authority."""
        _add_cookie(qctx)
        response = qctx.response
        if qctx.qtype == dns.rdatatype.SOA:
            section = response.answer
        else:
            section = response.authority
        section.append(_soa(qctx))
        yield DnsResponseSend(response)


def cookie_server(evil: bool) -> AsyncDnsServer:
    """Build the DNS server with all handlers installed.

    ``evil`` is passed to ``NsHandler`` and ``GlueHandler`` and selects the
    spoofed address instead of the legit one in their responses.  The
    handlers are passed to the server in the order listed below.
    """
    server = AsyncDnsServer(
        keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR
    )
    handlers = [
        NsHandler(evil),
        GlueHandler(evil),
        TcpAHandler(),
        WithtsigUdpAHandler(),
        UdpAHandler(),
        FallbackHandler(),
    ]
    server.install_response_handlers(handlers)
    return server