# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

from collections.abc import AsyncGenerator

import dns.rcode

from isctest.asyncserver import (
    AsyncDnsServer,
    CloseConnection,
    DnsResponseSend,
    DomainHandler,
    IgnoreAllQueries,
    QueryContext,
    ResponseAction,
    ResponseDrop,
)


class SilentHandler(DomainHandler, IgnoreAllQueries):
    """Handler that doesn't respond."""

    domains = ["silent.example"]


class CloseHandler(DomainHandler):
    """Handler that doesn't respond and closes TCP connection."""

    domains = ["close.example"]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[ResponseAction, None]:
        """Yield a single CloseConnection action for the matched query."""
        yield CloseConnection()


class SilentThenServfailHandler(DomainHandler):
    """Handler that drops one query and responds to the next one with SERVFAIL."""

    domains = ["silent-then-servfail.example"]
    # Number of queries handled so far; its parity selects the action.
    counter = 0

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[ResponseAction, None]:
        """Yield ResponseDrop on even-numbered queries, SERVFAIL on odd ones.

        The counter starts at 0, so the first matching query is dropped,
        the second gets a non-authoritative SERVFAIL, and so on.
        """
        # Decide and advance the counter *before* yielding: code placed
        # after a `yield` only runs if the consumer resumes the generator,
        # so updating the state first keeps the drop/SERVFAIL alternation
        # intact regardless of how the generator is consumed.
        drop_query = self.counter % 2 == 0
        self.counter += 1

        if drop_query:
            yield ResponseDrop()
        else:
            qctx.response.set_rcode(dns.rcode.SERVFAIL)
            yield DnsResponseSend(qctx.response, authoritative=False)


def main() -> None:
    """Create the server, install the three domain handlers and run it."""
    server = AsyncDnsServer()
    server.install_response_handlers(
        CloseHandler(),
        SilentHandler(),
        SilentThenServfailHandler(),
    )
    server.run()


if __name__ == "__main__":
    main()