Home | History | Annotate | Line # | Download | only in ans7
      1 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      2 #
      3 # SPDX-License-Identifier: MPL-2.0
      4 #
      5 # This Source Code Form is subject to the terms of the Mozilla Public
      6 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      7 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      8 #
      9 # See the COPYRIGHT file distributed with this work for additional
     10 # information regarding copyright ownership.
     11 
     12 from collections.abc import AsyncGenerator
     13 
     14 import dns.rcode
     15 
     16 from isctest.asyncserver import (
     17     AsyncDnsServer,
     18     CloseConnection,
     19     DnsResponseSend,
     20     DomainHandler,
     21     IgnoreAllQueries,
     22     QueryContext,
     23     ResponseAction,
     24     ResponseDrop,
     25 )
     26 
     27 
     28 class SilentHandler(DomainHandler, IgnoreAllQueries):
     29     """Handler that doesn't respond."""
     30 
     31     domains = ["silent.example"]
     32 
     33 
     34 class CloseHandler(DomainHandler):
     35     """Handler that doesn't respond and closes TCP connection."""
     36 
     37     domains = ["close.example"]
     38 
     39     async def get_responses(
     40         self, qctx: QueryContext
     41     ) -> AsyncGenerator[ResponseAction, None]:
     42         yield CloseConnection()
     43 
     44 
     45 class SilentThenServfailHandler(DomainHandler):
     46     """Handler that drops one query and response to the next one with SERVFAIL."""
     47 
     48     domains = ["silent-then-servfail.example"]
     49     counter = 0
     50 
     51     async def get_responses(
     52         self, qctx: QueryContext
     53     ) -> AsyncGenerator[ResponseAction, None]:
     54         if self.counter % 2 == 0:
     55             yield ResponseDrop()
     56         else:
     57             qctx.response.set_rcode(dns.rcode.SERVFAIL)
     58             yield DnsResponseSend(qctx.response, authoritative=False)
     59         self.counter += 1
     60 
     61 
     62 def main() -> None:
     63     server = AsyncDnsServer()
     64     server.install_response_handlers(
     65         CloseHandler(),
     66         SilentHandler(),
     67         SilentThenServfailHandler(),
     68     )
     69     server.run()
     70 
     71 
     72 if __name__ == "__main__":
     73     main()
     74