Home | History | Annotate | Line # | Download | only in cookie
      1  1.1  christos # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      2  1.1  christos #
      3  1.1  christos # SPDX-License-Identifier: MPL-2.0
      4  1.1  christos #
      5  1.1  christos # This Source Code Form is subject to the terms of the Mozilla Public
      6  1.1  christos # License, v. 2.0.  If a copy of the MPL was not distributed with this
      7  1.1  christos # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      8  1.1  christos #
      9  1.1  christos # See the COPYRIGHT file distributed with this work for additional
     10  1.1  christos # information regarding copyright ownership.
     11  1.1  christos 
     12  1.1  christos from typing import AsyncGenerator
     13  1.1  christos 
     14  1.1  christos import dns.edns
     15  1.1  christos import dns.name
     16  1.1  christos import dns.rcode
     17  1.1  christos import dns.rdatatype
     18  1.1  christos import dns.rrset
     19  1.1  christos import dns.tsigkeyring
     20  1.1  christos 
     21  1.1  christos from isctest.asyncserver import (
     22  1.1  christos     AsyncDnsServer,
     23  1.1  christos     ResponseHandler,
     24  1.1  christos     DnsResponseSend,
     25  1.1  christos     DnsProtocol,
     26  1.1  christos     QueryContext,
     27  1.1  christos )
     28  1.1  christos 
     29  1.1  christos from isctest.name import prepend_label
     30  1.1  christos from isctest.vars.algorithms import ALG_VARS
     31  1.1  christos 
# TSIG keyring used by the server and for signing responses.  Both key names
# ("foo" and "fake") map to the same HMAC algorithm and the same secret.
KEYRING = dns.tsigkeyring.from_text(
    {
        key_name: (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa")
        for key_name in ("foo", "fake")
    }
)
     38  1.1  christos 
     39  1.1  christos 
def _first_label(qctx: QueryContext) -> str:
    """Return the leftmost label of the query name, decoded as ASCII."""
    leftmost = qctx.qname.labels[0]
    return leftmost.decode("ascii")
     42  1.1  christos 
     43  1.1  christos 
def _add_cookie(qctx: QueryContext) -> None:
    """Echo the query's first COOKIE option back in the response.

    If the query carries no COOKIE option, the response is left untouched.
    When the option has an empty server part, the client part is reused as
    the server part before the option is attached to the response via EDNS.
    """
    cookie = next(
        (
            opt
            for opt in qctx.query.options
            if opt.otype == dns.edns.OptionType.COOKIE
        ),
        None,
    )
    if cookie is None:
        return

    try:
        if len(cookie.server) == 0:
            cookie.server = cookie.client
    except AttributeError:  # dnspython<2.7.0 compat
        # No "server"/"client" attributes here, only raw option data:
        # 8 bytes of data get doubled to 16.
        if len(cookie.data) == 8:
            cookie.data *= 2

    qctx.response.use_edns(options=[cookie])
     57  1.1  christos 
     58  1.1  christos 
def _tld(qctx: QueryContext) -> dns.name.Name:
    """Return a name built from the last two labels of the query name."""
    trailing_labels = qctx.qname.labels[-2:]
    return dns.name.Name(trailing_labels)
     61  1.1  christos 
     62  1.1  christos 
def _soa(qctx: QueryContext) -> dns.rrset.RRset:
    """Return a SOA RRset (TTL 2) owned by the name returned by _tld()."""
    owner = _tld(qctx)
    ttl = 2
    rdata_text = ". . 0 0 0 0 2"
    return dns.rrset.from_text(owner, ttl, qctx.qclass, dns.rdatatype.SOA, rdata_text)
     67  1.1  christos 
     68  1.1  christos 
def _ns_name(qctx: QueryContext) -> dns.name.Name:
    """Return the name from _tld() with an "ns" label prepended."""
    parent = _tld(qctx)
    return prepend_label("ns", parent)
     71  1.1  christos 
     72  1.1  christos 
def _ns(qctx: QueryContext) -> dns.rrset.RRset:
    """Return an NS RRset (TTL 1) for the query name pointing at _ns_name()."""
    target_text = _ns_name(qctx).to_text()
    return dns.rrset.from_text(
        qctx.qname, 1, qctx.qclass, dns.rdatatype.NS, target_text
    )
     81  1.1  christos 
     82  1.1  christos 
def _legit_a(qctx: QueryContext) -> dns.rrset.RRset:
    """Return an A RRset (TTL 1) for the query name with address 10.53.0.9."""
    address = "10.53.0.9"
    return dns.rrset.from_text(
        qctx.qname, 1, qctx.qclass, dns.rdatatype.A, address
    )
     85  1.1  christos 
     86  1.1  christos 
def _spoofed_a(qctx: QueryContext) -> dns.rrset.RRset:
    """Return an A RRset (TTL 1) for the query name with address 10.53.0.10."""
    address = "10.53.0.10"
    return dns.rrset.from_text(
        qctx.qname, 1, qctx.qclass, dns.rdatatype.A, address
    )
     91  1.1  christos 
     92  1.1  christos 
class _SpoofableHandler(ResponseHandler):
    """Base for handlers that answer with either the legit or spoofed A RRset."""

    def __init__(self, evil_server: bool) -> None:
        # True selects _spoofed_a() in subclasses, False selects _legit_a().
        self.evil_server: bool = evil_server
     96  1.1  christos 
     97  1.1  christos 
class NsHandler(_SpoofableHandler):
    """Answer NS queries whose name equals the name returned by _tld()."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.NS:
            return False
        return qctx.qname == _tld(qctx)

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response: NS in ANSWER plus an A RRset in AUTHORITY."""
        _add_cookie(qctx)
        response = qctx.response
        response.answer.append(_ns(qctx))
        # The A RRset depends on whether this instance plays the evil server.
        a_rrset = _spoofed_a(qctx) if self.evil_server else _legit_a(qctx)
        response.authority.append(a_rrset)
        yield DnsResponseSend(response)
    112  1.1  christos 
    113  1.1  christos 
class GlueHandler(_SpoofableHandler):
    """Answer A queries for the name returned by _ns_name()."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        return qctx.qname == _ns_name(qctx)

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response carrying the cookie and a single A RRset."""
        _add_cookie(qctx)
        # The A RRset depends on whether this instance plays the evil server.
        a_rrset = _spoofed_a(qctx) if self.evil_server else _legit_a(qctx)
        qctx.response.answer.append(a_rrset)
        yield DnsResponseSend(qctx.response)
    127  1.1  christos 
    128  1.1  christos 
class TcpAHandler(ResponseHandler):
    """Answer A queries received over TCP with the legit A RRset."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        return qctx.protocol == DnsProtocol.TCP

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response; the cookie is omitted for "nocookie" names."""
        wants_cookie = _first_label(qctx) != "nocookie"
        if wants_cookie:
            _add_cookie(qctx)
        response = qctx.response
        response.answer.append(_legit_a(qctx))
        yield DnsResponseSend(response)
    140  1.1  christos 
    141  1.1  christos 
class WithtsigUdpAHandler(ResponseHandler):
    """Answer UDP A queries whose first label is "withtsig" with two responses."""

    def match(self, qctx: QueryContext) -> bool:
        # Checks run in the same order as a short-circuiting "and" chain.
        if qctx.qtype != dns.rdatatype.A:
            return False
        if qctx.protocol != DnsProtocol.UDP:
            return False
        return _first_label(qctx) == "withtsig"

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield a "fake"-key TSIG response, then a cookie-bearing one."""
        # First response: legit and spoofed A RRsets, signed with the "fake"
        # key; no cookie is added.
        first = qctx.response
        for build_rrset in (_legit_a, _spoofed_a):
            first.answer.append(build_rrset(qctx))
        first.use_tsig(keyring=KEYRING, keyname="fake")
        yield DnsResponseSend(first)

        # Second response: start from a fresh message, echo the cookie and
        # include only the legit A RRset.
        qctx.prepare_new_response()
        _add_cookie(qctx)
        second = qctx.response
        second.answer.append(_legit_a(qctx))
        yield DnsResponseSend(second)
    162  1.1  christos 
    163  1.1  christos 
class UdpAHandler(ResponseHandler):
    """Answer A queries received over UDP."""

    def match(self, qctx: QueryContext) -> bool:
        if qctx.qtype != dns.rdatatype.A:
            return False
        return qctx.protocol == DnsProtocol.UDP

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response with the legit A RRset.

        For names starting with "nocookie" or "tcponly" the spoofed A RRset
        is appended as well and no cookie is added; every other name gets
        the cookie instead.
        """
        answer = qctx.response.answer
        answer.append(_legit_a(qctx))
        if _first_label(qctx) in ("nocookie", "tcponly"):
            answer.append(_spoofed_a(qctx))
        else:
            _add_cookie(qctx)

        yield DnsResponseSend(qctx.response)
    178  1.1  christos 
    179  1.1  christos 
class FallbackHandler(ResponseHandler):
    """Respond with the SOA RRset from _soa() and the echoed cookie."""

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response: SOA in ANSWER for SOA queries, else in AUTHORITY."""
        _add_cookie(qctx)
        response = qctx.response
        is_soa_query = qctx.qtype == dns.rdatatype.SOA
        section = response.answer if is_soa_query else response.authority
        section.append(_soa(qctx))
        yield DnsResponseSend(response)
    190  1.1  christos 
    191  1.1  christos 
def cookie_server(evil: bool) -> AsyncDnsServer:
    """Build an AsyncDnsServer with all cookie-test handlers installed.

    `evil` is passed to NsHandler and GlueHandler, where it selects the
    spoofed A RRset instead of the legit one.
    """
    dns_server = AsyncDnsServer(
        keyring=KEYRING,
        default_aa=True,
        default_rcode=dns.rcode.NOERROR,
    )
    # The handlers are installed in this order, with FallbackHandler last.
    handlers = [
        NsHandler(evil),
        GlueHandler(evil),
        TcpAHandler(),
        WithtsigUdpAHandler(),
        UdpAHandler(),
        FallbackHandler(),
    ]
    dns_server.install_response_handlers(handlers)
    return dns_server
    207