1 # -*- coding: utf-8 -*- 2 # $OpenLDAP$ 3 ## This work is part of OpenLDAP Software <http://www.openldap.org/>. 4 ## 5 ## Copyright 2016-2021 Ondej Kuznk, Symas Corp. 6 ## Copyright 2021-2024 The OpenLDAP Foundation. 7 ## All rights reserved. 8 ## 9 ## Redistribution and use in source and binary forms, with or without 10 ## modification, are permitted only as authorized by the OpenLDAP 11 ## Public License. 12 ## 13 ## A copy of this license is available in the file LICENSE in the 14 ## top-level directory of the distribution or, alternatively, at 15 ## <http://www.OpenLDAP.org/license.html>. 16 17 from __future__ import print_function 18 19 import hashlib 20 import hmac 21 import os 22 import struct 23 import sys 24 import time 25 26 import ldap 27 from ldap.cidict import cidict as CIDict 28 from ldap.ldapobject import LDAPObject 29 30 if len(sys.argv) > 1 and sys.argv[1] == "--check": 31 raise SystemExit(0) 32 33 34 def get_digits(h, digits): 35 offset = h[19] & 15 36 number = struct.unpack(">I", h[offset:offset+4])[0] & 0x7fffffff 37 number %= (10 ** digits) 38 return ("%0*d" % (digits, number)).encode() 39 40 41 def get_hotp_token(secret, interval_no): 42 msg = struct.pack(">Q", interval_no) 43 h = hmac.new(secret, msg, hashlib.sha1).digest() 44 return get_digits(bytearray(h), 6) 45 46 47 def get_interval(period=30): 48 return int(time.time() // period) 49 50 51 def get_token_for(connection, dn, typ="totp"): 52 result = connection.search_s(dn, ldap.SCOPE_BASE) 53 dn, attrs = result[0] 54 attrs = CIDict(attrs) 55 56 tokendn = attrs['oath'+typ+'token'][0].decode() 57 58 result = connection.search_s(tokendn, ldap.SCOPE_BASE) 59 dn, attrs = result[0] 60 attrs = CIDict(attrs) 61 62 return dn, attrs 63 64 65 def main(): 66 uri = os.environ["URI1"] 67 68 managerdn = os.environ['MANAGERDN'] 69 passwd = os.environ['PASSWD'] 70 71 babsdn = os.environ['BABSDN'] 72 babspw = b"bjensen" 73 74 bjornsdn = os.environ['BJORNSDN'] 75 bjornspw = b"bjorn" 76 77 connection = LDAPObject(uri) 78 79 start = time.time() 80 connection.bind_s(managerdn, passwd) 81 end = time.time() 82 83 if end - start > 1: 84 print("It takes more than a second to connect and bind, " 85 "skipping potentially unstable test", file=sys.stderr) 86 raise SystemExit(0) 87 88 dn, token_entry = get_token_for(connection, babsdn) 89 90 paramsdn = token_entry['oathTOTPParams'][0].decode() 91 result = connection.search_s(paramsdn, ldap.SCOPE_BASE) 92 _, attrs = result[0] 93 params = CIDict(attrs) 94 95 secret = token_entry['oathSecret'][0] 96 period = int(params['oathTOTPTimeStepPeriod'][0].decode()) 97 98 bind_conn = LDAPObject(uri) 99 100 interval_no = get_interval(period) 101 token = get_hotp_token(secret, interval_no-3) 102 103 print("Testing old tokens are not useable") 104 bind_conn.bind_s(babsdn, babspw+token) 105 try: 106 bind_conn.bind_s(babsdn, babspw+token) 107 except ldap.INVALID_CREDENTIALS: 108 pass 109 else: 110 raise SystemExit("Bind with an old token should have failed") 111 112 interval_no = get_interval(period) 113 token = get_hotp_token(secret, interval_no) 114 115 print("Testing token can only be used once") 116 bind_conn.bind_s(babsdn, babspw+token) 117 try: 118 bind_conn.bind_s(babsdn, babspw+token) 119 except ldap.INVALID_CREDENTIALS: 120 pass 121 else: 122 raise SystemExit("Bind with a reused token should have failed") 123 124 token = get_hotp_token(secret, interval_no+1) 125 try: 126 bind_conn.bind_s(babsdn, babspw+token) 127 except ldap.INVALID_CREDENTIALS: 128 raise SystemExit("Bind should have succeeded") 129 130 dn, token_entry = get_token_for(connection, babsdn) 131 last = int(token_entry['oathTOTPLastTimeStep'][0].decode()) 132 if last != interval_no+1: 133 SystemExit("Unexpected counter value %d (expected %d)" % 134 (last, interval_no+1)) 135 136 print("Resetting counter and testing secret sharing between accounts") 137 connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])]) 138 139 interval_no = get_interval(period) 140 token = get_hotp_token(secret, interval_no) 141 142 try: 143 bind_conn.bind_s(bjornsdn, bjornspw+token) 144 except ldap.INVALID_CREDENTIALS: 145 raise SystemExit("Bind should have succeeded") 146 147 try: 148 bind_conn.bind_s(babsdn, babspw+token) 149 except ldap.INVALID_CREDENTIALS: 150 pass 151 else: 152 raise SystemExit("Bind with a reused token should have failed") 153 154 print("Testing token is retired even with a wrong password") 155 connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])]) 156 157 interval_no = get_interval(period) 158 token = get_hotp_token(secret, interval_no) 159 160 try: 161 bind_conn.bind_s(babsdn, b"not the password"+token) 162 except ldap.INVALID_CREDENTIALS: 163 pass 164 else: 165 raise SystemExit("Bind with an incorrect password should have failed") 166 167 try: 168 bind_conn.bind_s(babsdn, babspw+token) 169 except ldap.INVALID_CREDENTIALS: 170 pass 171 else: 172 raise SystemExit("Bind with a reused token should have failed") 173 174 token = get_hotp_token(secret, interval_no+1) 175 try: 176 bind_conn.bind_s(babsdn, babspw+token) 177 except ldap.INVALID_CREDENTIALS: 178 raise SystemExit("Bind should have succeeded") 179 180 181 if __name__ == "__main__": 182 sys.exit(main()) 183