Home | History | Annotate | Line # | Download | only in scripts
      1 # -*- coding: utf-8 -*-
      2 # $OpenLDAP$
      3 ## This work is part of OpenLDAP Software <http://www.openldap.org/>.
      4 ##
      5 ## Copyright 2016-2021 Ondřej Kuzník, Symas Corp.
      6 ## Copyright 2021-2024 The OpenLDAP Foundation.
      7 ## All rights reserved.
      8 ##
      9 ## Redistribution and use in source and binary forms, with or without
     10 ## modification, are permitted only as authorized by the OpenLDAP
     11 ## Public License.
     12 ##
     13 ## A copy of this license is available in the file LICENSE in the
     14 ## top-level directory of the distribution or, alternatively, at
     15 ## <http://www.OpenLDAP.org/license.html>.
     16 
     17 from __future__ import print_function
     18 
     19 import hashlib
     20 import hmac
     21 import os
     22 import struct
     23 import sys
     24 import time
     25 
     26 import ldap
     27 from ldap.cidict import cidict as CIDict
     28 from ldap.ldapobject import LDAPObject
     29 
# When the first command-line argument is "--check", exit successfully
# right away, before any test logic runs.  Presumably this lets the
# harness probe whether the imports above are available -- TODO confirm.
if sys.argv[1:2] == ["--check"]:
    raise SystemExit(0)
     32 
     33 
def get_digits(h, digits):
    """Return the decimal OTP code extracted from an HMAC digest.

    Applies the "dynamic truncation" step of HOTP: the low four bits of
    the *last* digest byte select an offset, the four bytes starting there
    are read as a big-endian integer with the top bit cleared, and the
    result is reduced to ``digits`` decimal digits.

    Args:
        h: the raw HMAC digest (``bytes`` or ``bytearray``).  Any digest
            of at least 20 bytes works; the offset is always taken from
            the final byte rather than from a fixed SHA-1 position.
        digits: number of decimal digits in the resulting code.

    Returns:
        The zero-padded code as an ASCII ``bytes`` string.
    """
    # Normalise to bytearray so indexing yields integers for any bytes-like
    # input.
    h = bytearray(h)
    offset = h[-1] & 0x0f
    chunk = bytes(h[offset:offset + 4])
    # Clear the most significant bit to avoid signed/unsigned ambiguity.
    number = struct.unpack(">I", chunk)[0] & 0x7fffffff
    number %= (10 ** digits)
    return ("%0*d" % (digits, number)).encode()
     39 
     40 
def get_hotp_token(secret, interval_no):
    """Return the 6-digit HOTP code for ``secret`` at counter ``interval_no``.

    The counter is packed as an unsigned 64-bit big-endian integer, MACed
    with HMAC-SHA1 keyed by ``secret``, and the digest is handed to
    get_digits() to produce the code as ASCII bytes.
    """
    counter = struct.pack(">Q", interval_no)
    mac = hmac.new(secret, counter, hashlib.sha1)
    return get_digits(bytearray(mac.digest()), 6)
     45 
     46 
def get_interval(period=30):
    """Return the number of whole ``period``-second steps since the epoch.

    The value is derived from the current wall-clock time, floor-divided
    by ``period`` and converted to an int.
    """
    now = time.time()
    steps = now // period
    return int(steps)
     49 
     50 
def get_token_for(connection, dn, typ="totp"):
    """Look up the OATH token entry referenced by the entry ``dn``.

    Reads ``dn`` with a base-scope search, takes the first value of its
    ``oath<typ>token`` attribute as the DN of the token entry, and reads
    that entry the same way.

    Returns:
        A ``(token_dn, attrs)`` tuple where ``attrs`` is a case-insensitive
        dictionary of the token entry's attributes.
    """
    def read_entry(entry_dn):
        # Only the first search result is used.
        found_dn, found_attrs = connection.search_s(entry_dn,
                                                    ldap.SCOPE_BASE)[0]
        return found_dn, CIDict(found_attrs)

    _, owner_attrs = read_entry(dn)
    pointer_attr = 'oath' + typ + 'token'
    token_dn = owner_attrs[pointer_attr][0].decode()

    return read_entry(token_dn)
     63 
     64 
def _expect_bind_failure(conn, dn, password, message):
    """Bind as ``dn``; exit with ``message`` unless credentials are rejected.

    Only ldap.INVALID_CREDENTIALS counts as the expected rejection; any
    other exception propagates to the caller.
    """
    try:
        conn.bind_s(dn, password)
    except ldap.INVALID_CREDENTIALS:
        return
    raise SystemExit(message)


def _expect_bind_success(conn, dn, password):
    """Bind as ``dn``; exit with an error if the credentials are rejected."""
    try:
        conn.bind_s(dn, password)
    except ldap.INVALID_CREDENTIALS:
        raise SystemExit("Bind should have succeeded")


def _reset_last_time_step(conn, token_dn):
    """Replace oathTOTPLastTimeStep on ``token_dn`` with an empty value list."""
    conn.modify_s(token_dn,
                  [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])


def main():
    """Run the TOTP bind scenarios against the server named by the environment.

    Reads URI1, MANAGERDN, PASSWD, BABSDN and BJORNSDN from the environment
    (a missing variable raises KeyError).  Bind passwords are the account
    password with the current token appended.

    Raises:
        SystemExit: with a message when a check fails, or with status 0
            when the initial bind is too slow for the timing-sensitive
            checks to be meaningful.
    """
    uri = os.environ["URI1"]

    managerdn = os.environ['MANAGERDN']
    passwd = os.environ['PASSWD']

    babsdn = os.environ['BABSDN']
    babspw = b"bjensen"

    bjornsdn = os.environ['BJORNSDN']
    bjornspw = b"bjorn"

    connection = LDAPObject(uri)

    # Time the administrative bind; on a slow machine the token windows
    # below cannot be tested reliably, so the test is skipped.
    start = time.time()
    connection.bind_s(managerdn, passwd)
    end = time.time()

    if end - start > 1:
        print("It takes more than a second to connect and bind, "
              "skipping potentially unstable test", file=sys.stderr)
        raise SystemExit(0)

    dn, token_entry = get_token_for(connection, babsdn)

    paramsdn = token_entry['oathTOTPParams'][0].decode()
    result = connection.search_s(paramsdn, ldap.SCOPE_BASE)
    _, attrs = result[0]
    params = CIDict(attrs)

    secret = token_entry['oathSecret'][0]
    period = int(params['oathTOTPTimeStepPeriod'][0].decode())

    # Separate connection for the user binds so the manager connection
    # stays available for lookups and resets.
    bind_conn = LDAPObject(uri)

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no-3)

    print("Testing old tokens are not useable")
    # The first bind is not guarded: it is expected to succeed (presumably
    # the token is still inside the server's accepted window -- TODO
    # confirm) and any failure aborts the test.
    bind_conn.bind_s(babsdn, babspw+token)
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with an old token should have failed")

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    print("Testing token can only be used once")
    bind_conn.bind_s(babsdn, babspw+token)
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with a reused token should have failed")

    token = get_hotp_token(secret, interval_no+1)
    _expect_bind_success(bind_conn, babsdn, babspw+token)

    dn, token_entry = get_token_for(connection, babsdn)
    last = int(token_entry['oathTOTPLastTimeStep'][0].decode())
    if last != interval_no+1:
        # The exception must actually be raised; merely constructing it
        # would let a wrong counter value go unnoticed.
        raise SystemExit("Unexpected counter value %d (expected %d)" %
                         (last, interval_no+1))

    print("Resetting counter and testing secret sharing between accounts")
    _reset_last_time_step(connection, dn)

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    _expect_bind_success(bind_conn, bjornsdn, bjornspw+token)
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with a reused token should have failed")

    print("Testing token is retired even with a wrong password")
    _reset_last_time_step(connection, dn)

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    _expect_bind_failure(bind_conn, babsdn, b"not the password"+token,
                         "Bind with an incorrect password should have failed")
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with a reused token should have failed")

    token = get_hotp_token(secret, interval_no+1)
    _expect_bind_success(bind_conn, babsdn, babspw+token)
    179 
    180 
if __name__ == "__main__":
    # Run the scenario when executed as a script; main() has no return
    # statement, so a run that completes exits with status 0.
    raise SystemExit(main())
    183