Home | History | Annotate | Line # | Download | only in cookie
      1 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      2 #
      3 # SPDX-License-Identifier: MPL-2.0
      4 #
      5 # This Source Code Form is subject to the terms of the Mozilla Public
      6 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      7 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      8 #
      9 # See the COPYRIGHT file distributed with this work for additional
     10 # information regarding copyright ownership.
     11 
     12 from typing import AsyncGenerator
     13 
     14 import dns.edns
     15 import dns.name
     16 import dns.rcode
     17 import dns.rdatatype
     18 import dns.rrset
     19 import dns.tsigkeyring
     20 
     21 from isctest.asyncserver import (
     22     AsyncDnsServer,
     23     ResponseHandler,
     24     DnsResponseSend,
     25     DnsProtocol,
     26     QueryContext,
     27 )
     28 
     29 from isctest.name import prepend_label
     30 from isctest.vars.algorithms import ALG_VARS
     31 
     32 KEYRING = dns.tsigkeyring.from_text(
     33     {
     34         "foo": (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa"),
     35         "fake": (ALG_VARS["DEFAULT_HMAC"], "aaaaaaaaaaaa"),
     36     }
     37 )
     38 
     39 
     40 def _first_label(qctx: QueryContext) -> str:
     41     return qctx.qname.labels[0].decode("ascii")
     42 
     43 
     44 def _add_cookie(qctx: QueryContext) -> None:
     45     for o in qctx.query.options:
     46         if o.otype == dns.edns.OptionType.COOKIE:
     47             cookie = o
     48             try:
     49                 if len(cookie.server) == 0:
     50                     cookie.server = cookie.client
     51             except AttributeError:  # dnspython<2.7.0 compat
     52                 if len(o.data) == 8:
     53                     cookie.data *= 2
     54 
     55             qctx.response.use_edns(options=[cookie])
     56             return
     57 
     58 
     59 def _tld(qctx: QueryContext) -> dns.name.Name:
     60     return dns.name.Name(qctx.qname.labels[-2:])
     61 
     62 
     63 def _soa(qctx: QueryContext) -> dns.rrset.RRset:
     64     return dns.rrset.from_text(
     65         _tld(qctx), 2, qctx.qclass, dns.rdatatype.SOA, ". . 0 0 0 0 2"
     66     )
     67 
     68 
     69 def _ns_name(qctx: QueryContext) -> dns.name.Name:
     70     return prepend_label("ns", _tld(qctx))
     71 
     72 
     73 def _ns(qctx: QueryContext) -> dns.rrset.RRset:
     74     return dns.rrset.from_text(
     75         qctx.qname,
     76         1,
     77         qctx.qclass,
     78         dns.rdatatype.NS,
     79         _ns_name(qctx).to_text(),
     80     )
     81 
     82 
     83 def _legit_a(qctx: QueryContext) -> dns.rrset.RRset:
     84     return dns.rrset.from_text(qctx.qname, 1, qctx.qclass, dns.rdatatype.A, "10.53.0.9")
     85 
     86 
     87 def _spoofed_a(qctx: QueryContext) -> dns.rrset.RRset:
     88     return dns.rrset.from_text(
     89         qctx.qname, 1, qctx.qclass, dns.rdatatype.A, "10.53.0.10"
     90     )
     91 
     92 
     93 class _SpoofableHandler(ResponseHandler):
     94     def __init__(self, evil_server: bool) -> None:
     95         self.evil_server = evil_server
     96 
     97 
     98 class NsHandler(_SpoofableHandler):
     99     def match(self, qctx: QueryContext) -> bool:
    100         return qctx.qtype == dns.rdatatype.NS and qctx.qname == _tld(qctx)
    101 
    102     async def get_responses(
    103         self, qctx: QueryContext
    104     ) -> AsyncGenerator[DnsResponseSend, None]:
    105         _add_cookie(qctx)
    106         qctx.response.answer.append(_ns(qctx))
    107         if self.evil_server:
    108             qctx.response.authority.append(_spoofed_a(qctx))
    109         else:
    110             qctx.response.authority.append(_legit_a(qctx))
    111         yield DnsResponseSend(qctx.response)
    112 
    113 
    114 class GlueHandler(_SpoofableHandler):
    115     def match(self, qctx: QueryContext) -> bool:
    116         return qctx.qtype == dns.rdatatype.A and qctx.qname == _ns_name(qctx)
    117 
    118     async def get_responses(
    119         self, qctx: QueryContext
    120     ) -> AsyncGenerator[DnsResponseSend, None]:
    121         _add_cookie(qctx)
    122         if self.evil_server:
    123             qctx.response.answer.append(_spoofed_a(qctx))
    124         else:
    125             qctx.response.answer.append(_legit_a(qctx))
    126         yield DnsResponseSend(qctx.response)
    127 
    128 
    129 class TcpAHandler(ResponseHandler):
    130     def match(self, qctx: QueryContext) -> bool:
    131         return qctx.qtype == dns.rdatatype.A and qctx.protocol == DnsProtocol.TCP
    132 
    133     async def get_responses(
    134         self, qctx: QueryContext
    135     ) -> AsyncGenerator[DnsResponseSend, None]:
    136         if _first_label(qctx) != "nocookie":
    137             _add_cookie(qctx)
    138         qctx.response.answer.append(_legit_a(qctx))
    139         yield DnsResponseSend(qctx.response)
    140 
    141 
    142 class WithtsigUdpAHandler(ResponseHandler):
    143     def match(self, qctx: QueryContext) -> bool:
    144         return (
    145             qctx.qtype == dns.rdatatype.A
    146             and qctx.protocol == DnsProtocol.UDP
    147             and _first_label(qctx) == "withtsig"
    148         )
    149 
    150     async def get_responses(
    151         self, qctx: QueryContext
    152     ) -> AsyncGenerator[DnsResponseSend, None]:
    153         qctx.response.answer.append(_legit_a(qctx))
    154         qctx.response.answer.append(_spoofed_a(qctx))
    155         qctx.response.use_tsig(keyring=KEYRING, keyname="fake")
    156         yield DnsResponseSend(qctx.response)
    157 
    158         qctx.prepare_new_response()
    159         _add_cookie(qctx)
    160         qctx.response.answer.append(_legit_a(qctx))
    161         yield DnsResponseSend(qctx.response)
    162 
    163 
    164 class UdpAHandler(ResponseHandler):
    165     def match(self, qctx: QueryContext) -> bool:
    166         return qctx.qtype == dns.rdatatype.A and qctx.protocol == DnsProtocol.UDP
    167 
    168     async def get_responses(
    169         self, qctx: QueryContext
    170     ) -> AsyncGenerator[DnsResponseSend, None]:
    171         qctx.response.answer.append(_legit_a(qctx))
    172         if _first_label(qctx) not in ("nocookie", "tcponly"):
    173             _add_cookie(qctx)
    174         else:
    175             qctx.response.answer.append(_spoofed_a(qctx))
    176 
    177         yield DnsResponseSend(qctx.response)
    178 
    179 
    180 class FallbackHandler(ResponseHandler):
    181     async def get_responses(
    182         self, qctx: QueryContext
    183     ) -> AsyncGenerator[DnsResponseSend, None]:
    184         _add_cookie(qctx)
    185         if qctx.qtype == dns.rdatatype.SOA:
    186             qctx.response.answer.append(_soa(qctx))
    187         else:
    188             qctx.response.authority.append(_soa(qctx))
    189         yield DnsResponseSend(qctx.response)
    190 
    191 
    192 def cookie_server(evil: bool) -> AsyncDnsServer:
    193     server = AsyncDnsServer(
    194         keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR
    195     )
    196     server.install_response_handlers(
    197         [
    198             NsHandler(evil),
    199             GlueHandler(evil),
    200             TcpAHandler(),
    201             WithtsigUdpAHandler(),
    202             UdpAHandler(),
    203             FallbackHandler(),
    204         ]
    205     )
    206     return server
    207