# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

"""
Authoritative server that simulates Kaminsky-style off-path spoofing on UDP:
for every UDP query for trigger.example./A it sends one response with a
deliberately flipped DNS message id.  A resolver that escalates to TCP on
the first id mismatch will still get the correct answer over TCP, which
this server serves normally.
"""

from collections.abc import AsyncGenerator

import dns.name
import dns.rdatatype

from isctest.asyncserver import (
    AsyncDnsServer,
    DnsProtocol,
    DnsResponseSend,
    QueryContext,
    ResponseAction,
    ResponseHandler,
)


class MismatchOnUdpHandler(ResponseHandler):
    """
    Response handler for trigger.example./A queries.

    Over UDP the reply is sent with a message id that differs from the
    id of the query; over any other transport the reply is sent with
    the id left as the framework prepared it.
    """

    def __init__(self) -> None:
        # The only owner name this handler reacts to, parsed once up front.
        self._trigger = dns.name.from_text("trigger.example.")

    def match(self, qctx: QueryContext) -> bool:
        """Return True only for A queries whose QNAME is the trigger name."""
        is_trigger_name = qctx.qname == self._trigger
        return is_trigger_name and qctx.qtype == dns.rdatatype.A

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[ResponseAction, None]:
        """
        Yield a single send action for the response held in *qctx*.

        For UDP queries the response id is overwritten before sending so
        that it cannot match the query id.
        """
        arrived_over_udp = qctx.protocol == DnsProtocol.UDP
        if arrived_over_udp:
            # XOR with 0xFFFF inverts every bit of the 16-bit message id,
            # so the result is guaranteed to differ from the query's id.
            qctx.response.id = qctx.query.id ^ 0xFFFF
        yield DnsResponseSend(qctx.response)


def main() -> None:
    """Create the server, register the mismatch handler, and run it."""
    dns_server = AsyncDnsServer()
    dns_server.install_response_handler(MismatchOnUdpHandler())
    dns_server.run()


if __name__ == "__main__":
    main()