1 1.1 christos # Copyright (C) Internet Systems Consortium, Inc. ("ISC") 2 1.1 christos # 3 1.1 christos # SPDX-License-Identifier: MPL-2.0 4 1.1 christos # 5 1.1 christos # This Source Code Form is subject to the terms of the Mozilla Public 6 1.1 christos # License, v. 2.0. If a copy of the MPL was not distributed with this 7 1.1 christos # file, you can obtain one at https://mozilla.org/MPL/2.0/. 8 1.1 christos # 9 1.1 christos # See the COPYRIGHT file distributed with this work for additional 10 1.1 christos # information regarding copyright ownership. 11 1.1 christos 12 1.1 christos from typing import AsyncGenerator 13 1.1 christos 14 1.1 christos import dns.edns 15 1.1 christos import dns.name 16 1.1 christos import dns.rcode 17 1.1 christos import dns.rdatatype 18 1.1 christos import dns.rrset 19 1.1 christos import dns.tsigkeyring 20 1.1 christos 21 1.1 christos from isctest.asyncserver import ( 22 1.1 christos AsyncDnsServer, 23 1.1 christos ResponseHandler, 24 1.1 christos DnsResponseSend, 25 1.1 christos DnsProtocol, 26 1.1 christos QueryContext, 27 1.1 christos ) 28 1.1 christos 29 1.1 christos from isctest.name import prepend_label 30 1.1 christos from isctest.vars.algorithms import ALG_VARS 31 1.1 christos 32 1.1 christos KEYRING = dns.tsigkeyring.from_text( 33 1.1 christos { 34 1.1 christos "foo": (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa"), 35 1.1 christos "fake": (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa"), 36 1.1 christos } 37 1.1 christos ) 38 1.1 christos 39 1.1 christos 40 1.1 christos def _first_label(qctx: QueryContext) -> str: 41 1.1 christos return qctx.qname.labels[0].decode("ascii") 42 1.1 christos 43 1.1 christos 44 1.1 christos def _add_cookie(qctx: QueryContext) -> None: 45 1.1 christos for o in qctx.query.options: 46 1.1 christos if o.otype == dns.edns.OptionType.COOKIE: 47 1.1 christos cookie = o 48 1.1 christos try: 49 1.1 christos if len(cookie.server) == 0: 50 1.1 christos cookie.server = cookie.client 51 1.1 christos except AttributeError: # dnspython<2.7.0 compat 52 1.1 christos if len(o.data) == 8: 53 1.1 christos cookie.data *= 2 54 1.1 christos 55 1.1 christos qctx.response.use_edns(options=[cookie]) 56 1.1 christos return 57 1.1 christos 58 1.1 christos 59 1.1 christos def _tld(qctx: QueryContext) -> dns.name.Name: 60 1.1 christos return dns.name.Name(qctx.qname.labels[-2:]) 61 1.1 christos 62 1.1 christos 63 1.1 christos def _soa(qctx: QueryContext) -> dns.rrset.RRset: 64 1.1 christos return dns.rrset.from_text( 65 1.1 christos _tld(qctx), 2, qctx.qclass, dns.rdatatype.SOA, ". . 0 0 0 0 2" 66 1.1 christos ) 67 1.1 christos 68 1.1 christos 69 1.1 christos def _ns_name(qctx: QueryContext) -> dns.name.Name: 70 1.1 christos return prepend_label("ns", _tld(qctx)) 71 1.1 christos 72 1.1 christos 73 1.1 christos def _ns(qctx: QueryContext) -> dns.rrset.RRset: 74 1.1 christos return dns.rrset.from_text( 75 1.1 christos qctx.qname, 76 1.1 christos 1, 77 1.1 christos qctx.qclass, 78 1.1 christos dns.rdatatype.NS, 79 1.1 christos _ns_name(qctx).to_text(), 80 1.1 christos ) 81 1.1 christos 82 1.1 christos 83 1.1 christos def _legit_a(qctx: QueryContext) -> dns.rrset.RRset: 84 1.1 christos return dns.rrset.from_text(qctx.qname, 1, qctx.qclass, dns.rdatatype.A, "10.53.0.9") 85 1.1 christos 86 1.1 christos 87 1.1 christos def _spoofed_a(qctx: QueryContext) -> dns.rrset.RRset: 88 1.1 christos return dns.rrset.from_text( 89 1.1 christos qctx.qname, 1, qctx.qclass, dns.rdatatype.A, "10.53.0.10" 90 1.1 christos ) 91 1.1 christos 92 1.1 christos 93 1.1 christos class _SpoofableHandler(ResponseHandler): 94 1.1 christos def __init__(self, evil_server: bool) -> None: 95 1.1 christos self.evil_server = evil_server 96 1.1 christos 97 1.1 christos 98 1.1 christos class NsHandler(_SpoofableHandler): 99 1.1 christos def match(self, qctx: QueryContext) -> bool: 100 1.1 christos return qctx.qtype == dns.rdatatype.NS and qctx.qname == _tld(qctx) 101 1.1 christos 102 1.1 christos async def get_responses( 103 1.1 christos self, qctx: QueryContext 104 1.1 christos ) -> AsyncGenerator[DnsResponseSend, None]: 105 1.1 christos _add_cookie(qctx) 106 1.1 christos qctx.response.answer.append(_ns(qctx)) 107 1.1 christos if self.evil_server: 108 1.1 christos qctx.response.authority.append(_spoofed_a(qctx)) 109 1.1 christos else: 110 1.1 christos qctx.response.authority.append(_legit_a(qctx)) 111 1.1 christos yield DnsResponseSend(qctx.response) 112 1.1 christos 113 1.1 christos 114 1.1 christos class GlueHandler(_SpoofableHandler): 115 1.1 christos def match(self, qctx: QueryContext) -> bool: 116 1.1 christos return qctx.qtype == dns.rdatatype.A and qctx.qname == _ns_name(qctx) 117 1.1 christos 118 1.1 christos async def get_responses( 119 1.1 christos self, qctx: QueryContext 120 1.1 christos ) -> AsyncGenerator[DnsResponseSend, None]: 121 1.1 christos _add_cookie(qctx) 122 1.1 christos if self.evil_server: 123 1.1 christos qctx.response.answer.append(_spoofed_a(qctx)) 124 1.1 christos else: 125 1.1 christos qctx.response.answer.append(_legit_a(qctx)) 126 1.1 christos yield DnsResponseSend(qctx.response) 127 1.1 christos 128 1.1 christos 129 1.1 christos class TcpAHandler(ResponseHandler): 130 1.1 christos def match(self, qctx: QueryContext) -> bool: 131 1.1 christos return qctx.qtype == dns.rdatatype.A and qctx.protocol == DnsProtocol.TCP 132 1.1 christos 133 1.1 christos async def get_responses( 134 1.1 christos self, qctx: QueryContext 135 1.1 christos ) -> AsyncGenerator[DnsResponseSend, None]: 136 1.1 christos if _first_label(qctx) != "nocookie": 137 1.1 christos _add_cookie(qctx) 138 1.1 christos qctx.response.answer.append(_legit_a(qctx)) 139 1.1 christos yield DnsResponseSend(qctx.response) 140 1.1 christos 141 1.1 christos 142 1.1 christos class WithtsigUdpAHandler(ResponseHandler): 143 1.1 christos def match(self, qctx: QueryContext) -> bool: 144 1.1 christos return ( 145 1.1 christos qctx.qtype == dns.rdatatype.A 146 1.1 christos and qctx.protocol == DnsProtocol.UDP 147 1.1 christos and _first_label(qctx) == "withtsig" 148 1.1 christos ) 149 1.1 christos 150 1.1 christos async def get_responses( 151 1.1 christos self, qctx: QueryContext 152 1.1 christos ) -> AsyncGenerator[DnsResponseSend, None]: 153 1.1 christos qctx.response.answer.append(_legit_a(qctx)) 154 1.1 christos qctx.response.answer.append(_spoofed_a(qctx)) 155 1.1 christos qctx.response.use_tsig(keyring=KEYRING, keyname="fake") 156 1.1 christos yield DnsResponseSend(qctx.response) 157 1.1 christos 158 1.1 christos qctx.prepare_new_response() 159 1.1 christos _add_cookie(qctx) 160 1.1 christos qctx.response.answer.append(_legit_a(qctx)) 161 1.1 christos yield DnsResponseSend(qctx.response) 162 1.1 christos 163 1.1 christos 164 1.1 christos class UdpAHandler(ResponseHandler): 165 1.1 christos def match(self, qctx: QueryContext) -> bool: 166 1.1 christos return qctx.qtype == dns.rdatatype.A and qctx.protocol == DnsProtocol.UDP 167 1.1 christos 168 1.1 christos async def get_responses( 169 1.1 christos self, qctx: QueryContext 170 1.1 christos ) -> AsyncGenerator[DnsResponseSend, None]: 171 1.1 christos qctx.response.answer.append(_legit_a(qctx)) 172 1.1 christos if _first_label(qctx) not in ("nocookie", "tcponly"): 173 1.1 christos _add_cookie(qctx) 174 1.1 christos else: 175 1.1 christos qctx.response.answer.append(_spoofed_a(qctx)) 176 1.1 christos 177 1.1 christos yield DnsResponseSend(qctx.response) 178 1.1 christos 179 1.1 christos 180 1.1 christos class FallbackHandler(ResponseHandler): 181 1.1 christos async def get_responses( 182 1.1 christos self, qctx: QueryContext 183 1.1 christos ) -> AsyncGenerator[DnsResponseSend, None]: 184 1.1 christos _add_cookie(qctx) 185 1.1 christos if qctx.qtype == dns.rdatatype.SOA: 186 1.1 christos qctx.response.answer.append(_soa(qctx)) 187 1.1 christos else: 188 1.1 christos qctx.response.authority.append(_soa(qctx)) 189 1.1 christos yield DnsResponseSend(qctx.response) 190 1.1 christos 191 1.1 christos 192 1.1 christos def cookie_server(evil: bool) -> AsyncDnsServer: 193 1.1 christos server = AsyncDnsServer( 194 1.1 christos keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR 195 1.1 christos ) 196 1.1 christos server.install_response_handlers( 197 1.1 christos [ 198 1.1 christos NsHandler(evil), 199 1.1 christos GlueHandler(evil), 200 1.1 christos TcpAHandler(), 201 1.1 christos WithtsigUdpAHandler(), 202 1.1 christos UdpAHandler(), 203 1.1 christos FallbackHandler(), 204 1.1 christos ] 205 1.1 christos ) 206 1.1 christos return server 207