Home | History | Annotate | Line # | Download | only in scripts
      1      1.1  christos # -*- coding: utf-8 -*-
      2      1.1  christos # $OpenLDAP$
      3      1.1  christos ## This work is part of OpenLDAP Software <http://www.openldap.org/>.
      4      1.1  christos ##
      5      1.1  christos ## Copyright 2016-2021 Ondřej Kuzník, Symas Corp.
      6  1.1.1.2  christos ## Copyright 2021-2024 The OpenLDAP Foundation.
      7      1.1  christos ## All rights reserved.
      8      1.1  christos ##
      9      1.1  christos ## Redistribution and use in source and binary forms, with or without
     10      1.1  christos ## modification, are permitted only as authorized by the OpenLDAP
     11      1.1  christos ## Public License.
     12      1.1  christos ##
     13      1.1  christos ## A copy of this license is available in the file LICENSE in the
     14      1.1  christos ## top-level directory of the distribution or, alternatively, at
     15      1.1  christos ## <http://www.OpenLDAP.org/license.html>.
     16      1.1  christos 
     17      1.1  christos from __future__ import print_function
     18      1.1  christos 
     19      1.1  christos import hashlib
     20      1.1  christos import hmac
     21      1.1  christos import os
     22      1.1  christos import struct
     23      1.1  christos import sys
     24      1.1  christos import time
     25      1.1  christos 
     26      1.1  christos import ldap
     27      1.1  christos from ldap.cidict import cidict as CIDict
     28      1.1  christos from ldap.ldapobject import LDAPObject
     29      1.1  christos 
# When invoked as "<script> --check", exit successfully right away.  Getting
# this far means every import above succeeded (presumably how the caller
# probes for the python-ldap module -- confirm against the test harness).
if sys.argv[1:2] == ["--check"]:
    raise SystemExit(0)
     32      1.1  christos 
     33      1.1  christos 
     34      1.1  christos def get_digits(h, digits):
     35      1.1  christos     offset = h[19] & 15
     36      1.1  christos     number = struct.unpack(">I", h[offset:offset+4])[0] & 0x7fffffff
     37      1.1  christos     number %= (10 ** digits)
     38      1.1  christos     return ("%0*d" % (digits, number)).encode()
     39      1.1  christos 
     40      1.1  christos 
def get_hotp_token(secret, interval_no):
    """Return the 6-digit one-time code for `secret` at counter `interval_no`.

    The counter is packed as an unsigned 64-bit big-endian integer and
    authenticated with HMAC-SHA1 keyed by `secret`; the digest is then
    reduced to six decimal digits by get_digits().
    """
    counter = struct.pack(">Q", interval_no)
    mac = hmac.new(secret, counter, hashlib.sha1)
    # A bytearray is handed on so that indexing it yields integers.
    return get_digits(bytearray(mac.digest()), 6)
     45      1.1  christos 
     46      1.1  christos 
def get_interval(period=30):
    """Return how many whole `period`-second steps have elapsed on the clock.

    The current time.time() value is floor-divided by `period` (30 by
    default) and returned as an int.
    """
    now = time.time()
    return int(now // period)
     49      1.1  christos 
     50      1.1  christos 
def get_token_for(connection, dn, typ="totp"):
    """Look up the OATH token entry referenced by the entry at `dn`.

    The entry at `dn` is read with a base-scope search, the first value
    of its 'oath<typ>token' attribute is decoded and used as the DN of
    the token entry, and that entry is read the same way.

    Returns a (token_dn, attrs) pair where attrs is a case-insensitive
    dictionary of the token entry's attributes.
    """
    def read_entry(entry_dn):
        # Base-scope search: only the first result is used.
        found = connection.search_s(entry_dn, ldap.SCOPE_BASE)
        found_dn, found_attrs = found[0]
        return found_dn, CIDict(found_attrs)

    _, owner_attrs = read_entry(dn)
    token_dn = owner_attrs['oath'+typ+'token'][0].decode()
    return read_entry(token_dn)
     63      1.1  christos 
     64      1.1  christos 
def _expect_bind_failure(conn, dn, cred, message):
    """Attempt a simple bind that the server is expected to reject.

    Returns normally when bind_s() raises ldap.INVALID_CREDENTIALS and
    raises SystemExit(message) when the bind unexpectedly succeeds.  Any
    other exception propagates unchanged.
    """
    try:
        conn.bind_s(dn, cred)
    except ldap.INVALID_CREDENTIALS:
        return
    raise SystemExit(message)


def _expect_bind_success(conn, dn, cred):
    """Attempt a simple bind that the server is expected to accept.

    Raises SystemExit("Bind should have succeeded") when bind_s() raises
    ldap.INVALID_CREDENTIALS.  Any other exception propagates unchanged.
    """
    try:
        conn.bind_s(dn, cred)
    except ldap.INVALID_CREDENTIALS:
        raise SystemExit("Bind should have succeeded")


def main():
    """Run the TOTP bind checks against the server named by $URI1.

    Configuration comes from the environment: URI1 (server URI),
    MANAGERDN and PASSWD (used to read and reset token state), and
    BABSDN and BJORNSDN (the two accounts that are bound as).  Bind
    credentials are the account password immediately followed by the
    6-digit token.

    The checks, in order:
      * a token three time steps in the past is rejected;
      * the current token works once and is rejected on reuse;
      * the next step's token is accepted and the token entry's
        oathTOTPLastTimeStep then equals that step;
      * after resetting the counter, a token used by one account is
        rejected for the other (both are expected to share the token);
      * a bind with a wrong password still retires the token.

    Any failed expectation raises SystemExit with a message.  If the
    initial manager bind takes more than a second the run is skipped
    with exit status 0, because the checks depend on time steps.
    """
    uri = os.environ["URI1"]

    managerdn = os.environ['MANAGERDN']
    passwd = os.environ['PASSWD']

    babsdn = os.environ['BABSDN']
    babspw = b"bjensen"

    bjornsdn = os.environ['BJORNSDN']
    bjornspw = b"bjorn"

    connection = LDAPObject(uri)

    start = time.time()
    connection.bind_s(managerdn, passwd)
    end = time.time()

    if end - start > 1:
        print("It takes more than a second to connect and bind, "
              "skipping potentially unstable test", file=sys.stderr)
        raise SystemExit(0)

    dn, token_entry = get_token_for(connection, babsdn)

    paramsdn = token_entry['oathTOTPParams'][0].decode()
    result = connection.search_s(paramsdn, ldap.SCOPE_BASE)
    _, attrs = result[0]
    params = CIDict(attrs)

    secret = token_entry['oathSecret'][0]
    period = int(params['oathTOTPTimeStepPeriod'][0].decode())

    # Separate connection for the binds under test; `connection` stays
    # bound as the manager for reading and resetting token state.
    bind_conn = LDAPObject(uri)

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no-3)

    print("Testing old tokens are not useable")
    # The very first bind with the stale token must already be rejected,
    # so it has to happen inside the guarded helper.
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with an old token should have failed")

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    print("Testing token can only be used once")
    # First use is expected to succeed; a failure here propagates as-is.
    bind_conn.bind_s(babsdn, babspw+token)
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with a reused token should have failed")

    # The token for the following time step is expected to be accepted.
    token = get_hotp_token(secret, interval_no+1)
    _expect_bind_success(bind_conn, babsdn, babspw+token)

    dn, token_entry = get_token_for(connection, babsdn)
    last = int(token_entry['oathTOTPLastTimeStep'][0].decode())
    if last != interval_no+1:
        raise SystemExit("Unexpected counter value %d (expected %d)" %
                         (last, interval_no+1))

    print("Resetting counter and testing secret sharing between accounts")
    # Replacing the attribute with no values clears the recorded step.
    connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    _expect_bind_success(bind_conn, bjornsdn, bjornspw+token)
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with a reused token should have failed")

    print("Testing token is retired even with a wrong password")
    connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    _expect_bind_failure(bind_conn, babsdn, b"not the password"+token,
                         "Bind with an incorrect password should have failed")
    _expect_bind_failure(bind_conn, babsdn, babspw+token,
                         "Bind with a reused token should have failed")

    token = get_hotp_token(secret, interval_no+1)
    _expect_bind_success(bind_conn, babsdn, babspw+token)
    179      1.1  christos 
    180      1.1  christos 
# Run the checks only when executed as a script; whatever main() returns
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
    183