1 1.1 christos #!/usr/bin/env python2 2 1.1 christos # 3 1.1 christos # eapol_test controller 4 1.1 christos # Copyright (c) 2015, Jouni Malinen <j (at] w1.fi> 5 1.1 christos # 6 1.1 christos # This software may be distributed under the terms of the BSD license. 7 1.1 christos # See README for more details. 8 1.1 christos 9 1.1 christos import argparse 10 1.1 christos import logging 11 1.1 christos import os 12 1.1 christos import Queue 13 1.1 christos import sys 14 1.1 christos import threading 15 1.1 christos 16 1.1 christos logger = logging.getLogger() 17 1.1 christos dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__)) 18 1.1 christos sys.path.append(os.path.join(dir, '..', 'wpaspy')) 19 1.1 christos import wpaspy 20 1.1 christos wpas_ctrl = '/tmp/eapol_test' 21 1.1 christos 22 1.1 christos class eapol_test: 23 1.1 christos def __init__(self, ifname): 24 1.1 christos self.ifname = ifname 25 1.1 christos self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) 26 1.1 christos if "PONG" not in self.ctrl.request("PING"): 27 1.1 christos raise Exception("Failed to connect to eapol_test (%s)" % ifname) 28 1.1 christos self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) 29 1.1 christos self.mon.attach() 30 1.1 christos 31 1.1 christos def add_network(self): 32 1.1 christos id = self.request("ADD_NETWORK") 33 1.1 christos if "FAIL" in id: 34 1.1 christos raise Exception("ADD_NETWORK failed") 35 1.1 christos return int(id) 36 1.1 christos 37 1.1 christos def remove_network(self, id): 38 1.1 christos id = self.request("REMOVE_NETWORK " + str(id)) 39 1.1 christos if "FAIL" in id: 40 1.1 christos raise Exception("REMOVE_NETWORK failed") 41 1.1 christos return None 42 1.1 christos 43 1.1 christos def set_network(self, id, field, value): 44 1.1 christos res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value) 45 1.1 christos if "FAIL" in res: 46 1.1 christos raise Exception("SET_NETWORK failed") 47 1.1 christos return None 48 1.1 christos 49 1.1 christos def set_network_quoted(self, id, field, value): 50 1.1 christos res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') 51 1.1 christos if "FAIL" in res: 52 1.1 christos raise Exception("SET_NETWORK failed") 53 1.1 christos return None 54 1.1 christos 55 1.1 christos def request(self, cmd, timeout=10): 56 1.1 christos return self.ctrl.request(cmd, timeout=timeout) 57 1.1 christos 58 1.1 christos def wait_event(self, events, timeout=10): 59 1.1 christos start = os.times()[4] 60 1.1 christos while True: 61 1.1 christos while self.mon.pending(): 62 1.1 christos ev = self.mon.recv() 63 1.1 christos logger.debug(self.ifname + ": " + ev) 64 1.1 christos for event in events: 65 1.1 christos if event in ev: 66 1.1 christos return ev 67 1.1 christos now = os.times()[4] 68 1.1 christos remaining = start + timeout - now 69 1.1 christos if remaining <= 0: 70 1.1 christos break 71 1.1 christos if not self.mon.pending(timeout=remaining): 72 1.1 christos break 73 1.1 christos return None 74 1.1 christos 75 1.1.1.3 christos def run(ifname, count, no_fast_reauth, res, conf): 76 1.1 christos et = eapol_test(ifname) 77 1.1 christos 78 1.1 christos et.request("AP_SCAN 0") 79 1.1 christos if no_fast_reauth: 80 1.1 christos et.request("SET fast_reauth 0") 81 1.1 christos else: 82 1.1 christos et.request("SET fast_reauth 1") 83 1.1 christos id = et.add_network() 84 1.1.1.3 christos 85 1.1.1.3 christos if len(conf): 86 1.1.1.3 christos for item in conf: 87 1.1.1.3 christos et.set_network(id, item, conf[item]) 88 1.1.1.3 christos else: 89 1.1.1.3 christos et.set_network(id, "key_mgmt", "IEEE8021X") 90 1.1.1.3 christos et.set_network(id, "eapol_flags", "0") 91 1.1.1.3 christos et.set_network(id, "eap", "TLS") 92 1.1.1.3 christos et.set_network_quoted(id, "identity", "user") 93 1.1.1.3 christos et.set_network_quoted(id, "ca_cert", 'ca.pem') 94 1.1.1.3 christos et.set_network_quoted(id, "client_cert", 'client.pem') 95 1.1.1.3 christos et.set_network_quoted(id, "private_key", 'client.key') 96 1.1.1.3 christos et.set_network_quoted(id, "private_key_passwd", 'whatever') 97 1.1.1.3 christos 98 1.1 christos et.set_network(id, "disabled", "0") 99 1.1 christos 100 1.1 christos fail = False 101 1.1 christos for i in range(count): 102 1.1 christos et.request("REASSOCIATE") 103 1.1 christos ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"]) 104 1.1 christos if ev is None or "CTRL-EVENT-CONNECTED" not in ev: 105 1.1 christos fail = True 106 1.1 christos break 107 1.1 christos 108 1.1 christos et.remove_network(id) 109 1.1 christos 110 1.1 christos if fail: 111 1.1 christos res.put("FAIL (%d OK)" % i) 112 1.1 christos else: 113 1.1 christos res.put("PASS %d" % (i + 1)) 114 1.1 christos 115 1.1 christos def main(): 116 1.1 christos parser = argparse.ArgumentParser(description='eapol_test controller') 117 1.1 christos parser.add_argument('--ctrl', help='control interface directory') 118 1.1 christos parser.add_argument('--num', help='number of processes') 119 1.1 christos parser.add_argument('--iter', help='number of iterations') 120 1.1 christos parser.add_argument('--no-fast-reauth', action='store_true', 121 1.1 christos dest='no_fast_reauth', 122 1.1 christos help='disable TLS session resumption') 123 1.1.1.3 christos parser.add_argument('--conf', help='file of network conf items') 124 1.1 christos args = parser.parse_args() 125 1.1 christos 126 1.1 christos num = int(args.num) 127 1.1 christos iter = int(args.iter) 128 1.1 christos if args.ctrl: 129 1.1 christos global wpas_ctrl 130 1.1 christos wpas_ctrl = args.ctrl 131 1.1 christos 132 1.1.1.3 christos conf = {} 133 1.1.1.3 christos if args.conf: 134 1.1.1.3 christos f = open(args.conf, "r") 135 1.1.1.3 christos for line in f: 136 1.1.1.3 christos confitem = line.split("=") 137 1.1.1.3 christos if len(confitem) == 2: 138 1.1.1.3 christos conf[confitem[0].strip()] = confitem[1].strip() 139 1.1.1.3 christos f.close() 140 1.1.1.3 christos 141 1.1 christos t = {} 142 1.1 christos res = {} 143 1.1 christos for i in range(num): 144 1.1 christos res[i] = Queue.Queue() 145 1.1 christos t[i] = threading.Thread(target=run, args=(str(i), iter, 146 1.1.1.3 christos args.no_fast_reauth, res[i], 147 1.1.1.3 christos conf)) 148 1.1 christos for i in range(num): 149 1.1 christos t[i].start() 150 1.1 christos for i in range(num): 151 1.1 christos t[i].join() 152 1.1 christos try: 153 1.1 christos results = res[i].get(False) 154 1.1 christos except: 155 1.1 christos results = "N/A" 156 1.1.1.2 christos print("%d: %s" % (i, results)) 157 1.1 christos 158 1.1 christos if __name__ == "__main__": 159 1.1 christos main() 160