Home | History | Annotate | Line # | Download | only in ans2
      1 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      2 #
      3 # SPDX-License-Identifier: MPL-2.0
      4 #
      5 # This Source Code Form is subject to the terms of the Mozilla Public
      6 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      7 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      8 #
      9 # See the COPYRIGHT file distributed with this work for additional
     10 # information regarding copyright ownership.
     11 
     12 """
     13 Authoritative server that simulates Kaminsky-style off-path spoofing on UDP:
     14 for every UDP query for trigger.example./A it sends one response with a
     15 deliberately flipped DNS message id.  A resolver that escalates to TCP on
     16 the first id mismatch will still get the correct answer over TCP, which
     17 this server serves normally.
     18 """
     19 
     20 from collections.abc import AsyncGenerator
     21 
     22 import dns.name
     23 import dns.rdatatype
     24 
     25 from isctest.asyncserver import (
     26     AsyncDnsServer,
     27     DnsProtocol,
     28     DnsResponseSend,
     29     QueryContext,
     30     ResponseAction,
     31     ResponseHandler,
     32 )
     33 
     34 
     35 class MismatchOnUdpHandler(ResponseHandler):
     36     """
     37     Spoof UDP queries for trigger.example./A with a properly-formed
     38     response whose DNS message id does not match the request.  Answer
     39     the same query normally on TCP using the zone data prepared by the
     40     framework.
     41     """
     42 
     43     def __init__(self) -> None:
     44         self._trigger = dns.name.from_text("trigger.example.")
     45 
     46     def match(self, qctx: QueryContext) -> bool:
     47         return qctx.qname == self._trigger and qctx.qtype == dns.rdatatype.A
     48 
     49     async def get_responses(
     50         self, qctx: QueryContext
     51     ) -> AsyncGenerator[ResponseAction, None]:
     52         if qctx.protocol == DnsProtocol.UDP:
     53             qctx.response.id = qctx.query.id ^ 0xFFFF
     54             yield DnsResponseSend(qctx.response)
     55         else:
     56             yield DnsResponseSend(qctx.response)
     57 
     58 
     59 def main() -> None:
     60     server = AsyncDnsServer()
     61     server.install_response_handler(MismatchOnUdpHandler())
     62     server.run()
     63 
     64 
     65 if __name__ == "__main__":
     66     main()
     67