1 1.1 christos # -*- coding: utf-8 -*- 2 1.1 christos # $OpenLDAP$ 3 1.1 christos ## This work is part of OpenLDAP Software <http://www.openldap.org/>. 4 1.1 christos ## 5 1.1 christos ## Copyright 2016-2021 Ondej Kuznk, Symas Corp. 6 1.1.1.2 christos ## Copyright 2021-2024 The OpenLDAP Foundation. 7 1.1 christos ## All rights reserved. 8 1.1 christos ## 9 1.1 christos ## Redistribution and use in source and binary forms, with or without 10 1.1 christos ## modification, are permitted only as authorized by the OpenLDAP 11 1.1 christos ## Public License. 12 1.1 christos ## 13 1.1 christos ## A copy of this license is available in the file LICENSE in the 14 1.1 christos ## top-level directory of the distribution or, alternatively, at 15 1.1 christos ## <http://www.OpenLDAP.org/license.html>. 16 1.1 christos 17 1.1 christos from __future__ import print_function 18 1.1 christos 19 1.1 christos import hashlib 20 1.1 christos import hmac 21 1.1 christos import os 22 1.1 christos import struct 23 1.1 christos import sys 24 1.1 christos import time 25 1.1 christos 26 1.1 christos import ldap 27 1.1 christos from ldap.cidict import cidict as CIDict 28 1.1 christos from ldap.ldapobject import LDAPObject 29 1.1 christos 30 1.1 christos if len(sys.argv) > 1 and sys.argv[1] == "--check": 31 1.1 christos raise SystemExit(0) 32 1.1 christos 33 1.1 christos 34 1.1 christos def get_digits(h, digits): 35 1.1 christos offset = h[19] & 15 36 1.1 christos number = struct.unpack(">I", h[offset:offset+4])[0] & 0x7fffffff 37 1.1 christos number %= (10 ** digits) 38 1.1 christos return ("%0*d" % (digits, number)).encode() 39 1.1 christos 40 1.1 christos 41 1.1 christos def get_hotp_token(secret, interval_no): 42 1.1 christos msg = struct.pack(">Q", interval_no) 43 1.1 christos h = hmac.new(secret, msg, hashlib.sha1).digest() 44 1.1 christos return get_digits(bytearray(h), 6) 45 1.1 christos 46 1.1 christos 47 1.1 christos def get_interval(period=30): 48 1.1 christos return int(time.time() // period) 49 1.1 christos 50 1.1 christos 51 1.1 christos def get_token_for(connection, dn, typ="totp"): 52 1.1 christos result = connection.search_s(dn, ldap.SCOPE_BASE) 53 1.1 christos dn, attrs = result[0] 54 1.1 christos attrs = CIDict(attrs) 55 1.1 christos 56 1.1 christos tokendn = attrs['oath'+typ+'token'][0].decode() 57 1.1 christos 58 1.1 christos result = connection.search_s(tokendn, ldap.SCOPE_BASE) 59 1.1 christos dn, attrs = result[0] 60 1.1 christos attrs = CIDict(attrs) 61 1.1 christos 62 1.1 christos return dn, attrs 63 1.1 christos 64 1.1 christos 65 1.1 christos def main(): 66 1.1 christos uri = os.environ["URI1"] 67 1.1 christos 68 1.1 christos managerdn = os.environ['MANAGERDN'] 69 1.1 christos passwd = os.environ['PASSWD'] 70 1.1 christos 71 1.1 christos babsdn = os.environ['BABSDN'] 72 1.1 christos babspw = b"bjensen" 73 1.1 christos 74 1.1 christos bjornsdn = os.environ['BJORNSDN'] 75 1.1 christos bjornspw = b"bjorn" 76 1.1 christos 77 1.1 christos connection = LDAPObject(uri) 78 1.1 christos 79 1.1 christos start = time.time() 80 1.1 christos connection.bind_s(managerdn, passwd) 81 1.1 christos end = time.time() 82 1.1 christos 83 1.1 christos if end - start > 1: 84 1.1 christos print("It takes more than a second to connect and bind, " 85 1.1 christos "skipping potentially unstable test", file=sys.stderr) 86 1.1 christos raise SystemExit(0) 87 1.1 christos 88 1.1 christos dn, token_entry = get_token_for(connection, babsdn) 89 1.1 christos 90 1.1 christos paramsdn = token_entry['oathTOTPParams'][0].decode() 91 1.1 christos result = connection.search_s(paramsdn, ldap.SCOPE_BASE) 92 1.1 christos _, attrs = result[0] 93 1.1 christos params = CIDict(attrs) 94 1.1 christos 95 1.1 christos secret = token_entry['oathSecret'][0] 96 1.1 christos period = int(params['oathTOTPTimeStepPeriod'][0].decode()) 97 1.1 christos 98 1.1 christos bind_conn = LDAPObject(uri) 99 1.1 christos 100 1.1 christos interval_no = get_interval(period) 101 1.1 christos token = get_hotp_token(secret, interval_no-3) 102 1.1 christos 103 1.1 christos print("Testing old tokens are not useable") 104 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 105 1.1 christos try: 106 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 107 1.1 christos except ldap.INVALID_CREDENTIALS: 108 1.1 christos pass 109 1.1 christos else: 110 1.1 christos raise SystemExit("Bind with an old token should have failed") 111 1.1 christos 112 1.1 christos interval_no = get_interval(period) 113 1.1 christos token = get_hotp_token(secret, interval_no) 114 1.1 christos 115 1.1 christos print("Testing token can only be used once") 116 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 117 1.1 christos try: 118 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 119 1.1 christos except ldap.INVALID_CREDENTIALS: 120 1.1 christos pass 121 1.1 christos else: 122 1.1 christos raise SystemExit("Bind with a reused token should have failed") 123 1.1 christos 124 1.1 christos token = get_hotp_token(secret, interval_no+1) 125 1.1 christos try: 126 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 127 1.1 christos except ldap.INVALID_CREDENTIALS: 128 1.1 christos raise SystemExit("Bind should have succeeded") 129 1.1 christos 130 1.1 christos dn, token_entry = get_token_for(connection, babsdn) 131 1.1 christos last = int(token_entry['oathTOTPLastTimeStep'][0].decode()) 132 1.1 christos if last != interval_no+1: 133 1.1 christos SystemExit("Unexpected counter value %d (expected %d)" % 134 1.1 christos (last, interval_no+1)) 135 1.1 christos 136 1.1 christos print("Resetting counter and testing secret sharing between accounts") 137 1.1 christos connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])]) 138 1.1 christos 139 1.1 christos interval_no = get_interval(period) 140 1.1 christos token = get_hotp_token(secret, interval_no) 141 1.1 christos 142 1.1 christos try: 143 1.1 christos bind_conn.bind_s(bjornsdn, bjornspw+token) 144 1.1 christos except ldap.INVALID_CREDENTIALS: 145 1.1 christos raise SystemExit("Bind should have succeeded") 146 1.1 christos 147 1.1 christos try: 148 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 149 1.1 christos except ldap.INVALID_CREDENTIALS: 150 1.1 christos pass 151 1.1 christos else: 152 1.1 christos raise SystemExit("Bind with a reused token should have failed") 153 1.1 christos 154 1.1 christos print("Testing token is retired even with a wrong password") 155 1.1 christos connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])]) 156 1.1 christos 157 1.1 christos interval_no = get_interval(period) 158 1.1 christos token = get_hotp_token(secret, interval_no) 159 1.1 christos 160 1.1 christos try: 161 1.1 christos bind_conn.bind_s(babsdn, b"not the password"+token) 162 1.1 christos except ldap.INVALID_CREDENTIALS: 163 1.1 christos pass 164 1.1 christos else: 165 1.1 christos raise SystemExit("Bind with an incorrect password should have failed") 166 1.1 christos 167 1.1 christos try: 168 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 169 1.1 christos except ldap.INVALID_CREDENTIALS: 170 1.1 christos pass 171 1.1 christos else: 172 1.1 christos raise SystemExit("Bind with a reused token should have failed") 173 1.1 christos 174 1.1 christos token = get_hotp_token(secret, interval_no+1) 175 1.1 christos try: 176 1.1 christos bind_conn.bind_s(babsdn, babspw+token) 177 1.1 christos except ldap.INVALID_CREDENTIALS: 178 1.1 christos raise SystemExit("Bind should have succeeded") 179 1.1 christos 180 1.1 christos 181 1.1 christos if __name__ == "__main__": 182 1.1 christos sys.exit(main()) 183