Home | History | Annotate | Line # | Download | only in ans7
      1  1.1  christos # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      2  1.1  christos #
      3  1.1  christos # SPDX-License-Identifier: MPL-2.0
      4  1.1  christos #
      5  1.1  christos # This Source Code Form is subject to the terms of the Mozilla Public
      6  1.1  christos # License, v. 2.0.  If a copy of the MPL was not distributed with this
      7  1.1  christos # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      8  1.1  christos #
      9  1.1  christos # See the COPYRIGHT file distributed with this work for additional
     10  1.1  christos # information regarding copyright ownership.
     11  1.1  christos 
     12  1.1  christos from collections.abc import AsyncGenerator
     13  1.1  christos 
     14  1.1  christos import dns.rcode
     15  1.1  christos 
     16  1.1  christos from isctest.asyncserver import (
     17  1.1  christos     AsyncDnsServer,
     18  1.1  christos     CloseConnection,
     19  1.1  christos     DnsResponseSend,
     20  1.1  christos     DomainHandler,
     21  1.1  christos     IgnoreAllQueries,
     22  1.1  christos     QueryContext,
     23  1.1  christos     ResponseAction,
     24  1.1  christos     ResponseDrop,
     25  1.1  christos )
     26  1.1  christos 
     27  1.1  christos 
class SilentHandler(DomainHandler, IgnoreAllQueries):
    """Never send any reply to queries for ``silent.example``.

    The class defines no logic of its own: it only combines the
    ``DomainHandler`` and ``IgnoreAllQueries`` bases and names the domain
    it is responsible for.
    """

    # Domain this handler is registered for.
    domains = ["silent.example"]
     32  1.1  christos 
     33  1.1  christos 
class CloseHandler(DomainHandler):
    """Send no DNS response; close the TCP connection instead."""

    # Domain this handler is registered for.
    domains = ["close.example"]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[ResponseAction, None]:
        """Yield a single ``CloseConnection`` action for every query."""
        # The query context is never inspected: the outcome is the same
        # regardless of what was asked.
        close_action = CloseConnection()
        yield close_action
     43  1.1  christos 
     44  1.1  christos 
class SilentThenServfailHandler(DomainHandler):
    """Drop one query, respond to the next one with SERVFAIL, and repeat.

    Queries are counted in ``counter``.  While the count is even (which
    includes the very first query) the query is dropped; while it is odd,
    the prepared response is turned into a SERVFAIL and sent with
    ``authoritative=False``.
    """

    domains = ["silent-then-servfail.example"]
    # Number of queries seen so far.  The class-level 0 is only the starting
    # value: the first ``self.counter += 1`` rebinds the attribute on the
    # instance.
    counter = 0

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[ResponseAction, None]:
        """Yield the single action for this query: a drop or a SERVFAIL.

        ``qctx.response`` has its rcode set to SERVFAIL in place when a
        response is sent.
        """
        # Decide and advance the counter *before* yielding.  An async
        # generator is suspended at ``yield``; code placed after it only
        # runs if the consumer resumes the generator.  Updating the state
        # up front keeps the drop/SERVFAIL alternation correct even if
        # iteration stops after the first action, or if another query is
        # handled while this generator is suspended.
        drop_this_query = self.counter % 2 == 0
        self.counter += 1

        if drop_this_query:
            yield ResponseDrop()
        else:
            qctx.response.set_rcode(dns.rcode.SERVFAIL)
            yield DnsResponseSend(qctx.response, authoritative=False)
     60  1.1  christos 
     61  1.1  christos 
def main() -> None:
    """Create the DNS server, register all handlers, and run it."""
    # Handlers are instantiated and passed in the same order as before:
    # close, silent, silent-then-servfail.
    handlers = [
        CloseHandler(),
        SilentHandler(),
        SilentThenServfailHandler(),
    ]

    dns_server = AsyncDnsServer()
    dns_server.install_response_handlers(*handlers)
    dns_server.run()
     70  1.1  christos 
     71  1.1  christos 
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
     74