eapol_test.py revision 1.1.1.1.14.2 1 1.1.1.1.14.2 snj #!/usr/bin/env python2
2 1.1.1.1.14.2 snj #
3 1.1.1.1.14.2 snj # eapol_test controller
4 1.1.1.1.14.2 snj # Copyright (c) 2015, Jouni Malinen <j (at] w1.fi>
5 1.1.1.1.14.2 snj #
6 1.1.1.1.14.2 snj # This software may be distributed under the terms of the BSD license.
7 1.1.1.1.14.2 snj # See README for more details.
8 1.1.1.1.14.2 snj
9 1.1.1.1.14.2 snj import argparse
10 1.1.1.1.14.2 snj import logging
11 1.1.1.1.14.2 snj import os
12 1.1.1.1.14.2 snj import Queue
13 1.1.1.1.14.2 snj import sys
14 1.1.1.1.14.2 snj import threading
15 1.1.1.1.14.2 snj
16 1.1.1.1.14.2 snj logger = logging.getLogger()
17 1.1.1.1.14.2 snj dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__))
18 1.1.1.1.14.2 snj sys.path.append(os.path.join(dir, '..', 'wpaspy'))
19 1.1.1.1.14.2 snj import wpaspy
20 1.1.1.1.14.2 snj wpas_ctrl = '/tmp/eapol_test'
21 1.1.1.1.14.2 snj
22 1.1.1.1.14.2 snj class eapol_test:
23 1.1.1.1.14.2 snj def __init__(self, ifname):
24 1.1.1.1.14.2 snj self.ifname = ifname
25 1.1.1.1.14.2 snj self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
26 1.1.1.1.14.2 snj if "PONG" not in self.ctrl.request("PING"):
27 1.1.1.1.14.2 snj raise Exception("Failed to connect to eapol_test (%s)" % ifname)
28 1.1.1.1.14.2 snj self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
29 1.1.1.1.14.2 snj self.mon.attach()
30 1.1.1.1.14.2 snj
31 1.1.1.1.14.2 snj def add_network(self):
32 1.1.1.1.14.2 snj id = self.request("ADD_NETWORK")
33 1.1.1.1.14.2 snj if "FAIL" in id:
34 1.1.1.1.14.2 snj raise Exception("ADD_NETWORK failed")
35 1.1.1.1.14.2 snj return int(id)
36 1.1.1.1.14.2 snj
37 1.1.1.1.14.2 snj def remove_network(self, id):
38 1.1.1.1.14.2 snj id = self.request("REMOVE_NETWORK " + str(id))
39 1.1.1.1.14.2 snj if "FAIL" in id:
40 1.1.1.1.14.2 snj raise Exception("REMOVE_NETWORK failed")
41 1.1.1.1.14.2 snj return None
42 1.1.1.1.14.2 snj
43 1.1.1.1.14.2 snj def set_network(self, id, field, value):
44 1.1.1.1.14.2 snj res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
45 1.1.1.1.14.2 snj if "FAIL" in res:
46 1.1.1.1.14.2 snj raise Exception("SET_NETWORK failed")
47 1.1.1.1.14.2 snj return None
48 1.1.1.1.14.2 snj
49 1.1.1.1.14.2 snj def set_network_quoted(self, id, field, value):
50 1.1.1.1.14.2 snj res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
51 1.1.1.1.14.2 snj if "FAIL" in res:
52 1.1.1.1.14.2 snj raise Exception("SET_NETWORK failed")
53 1.1.1.1.14.2 snj return None
54 1.1.1.1.14.2 snj
55 1.1.1.1.14.2 snj def request(self, cmd, timeout=10):
56 1.1.1.1.14.2 snj return self.ctrl.request(cmd, timeout=timeout)
57 1.1.1.1.14.2 snj
58 1.1.1.1.14.2 snj def wait_event(self, events, timeout=10):
59 1.1.1.1.14.2 snj start = os.times()[4]
60 1.1.1.1.14.2 snj while True:
61 1.1.1.1.14.2 snj while self.mon.pending():
62 1.1.1.1.14.2 snj ev = self.mon.recv()
63 1.1.1.1.14.2 snj logger.debug(self.ifname + ": " + ev)
64 1.1.1.1.14.2 snj for event in events:
65 1.1.1.1.14.2 snj if event in ev:
66 1.1.1.1.14.2 snj return ev
67 1.1.1.1.14.2 snj now = os.times()[4]
68 1.1.1.1.14.2 snj remaining = start + timeout - now
69 1.1.1.1.14.2 snj if remaining <= 0:
70 1.1.1.1.14.2 snj break
71 1.1.1.1.14.2 snj if not self.mon.pending(timeout=remaining):
72 1.1.1.1.14.2 snj break
73 1.1.1.1.14.2 snj return None
74 1.1.1.1.14.2 snj
75 1.1.1.1.14.2 snj def run(ifname, count, no_fast_reauth, res):
76 1.1.1.1.14.2 snj et = eapol_test(ifname)
77 1.1.1.1.14.2 snj
78 1.1.1.1.14.2 snj et.request("AP_SCAN 0")
79 1.1.1.1.14.2 snj if no_fast_reauth:
80 1.1.1.1.14.2 snj et.request("SET fast_reauth 0")
81 1.1.1.1.14.2 snj else:
82 1.1.1.1.14.2 snj et.request("SET fast_reauth 1")
83 1.1.1.1.14.2 snj id = et.add_network()
84 1.1.1.1.14.2 snj et.set_network(id, "key_mgmt", "IEEE8021X")
85 1.1.1.1.14.2 snj et.set_network(id, "eapol_flags", "0")
86 1.1.1.1.14.2 snj et.set_network(id, "eap", "TLS")
87 1.1.1.1.14.2 snj et.set_network_quoted(id, "identity", "user")
88 1.1.1.1.14.2 snj et.set_network_quoted(id, "ca_cert", 'ca.pem')
89 1.1.1.1.14.2 snj et.set_network_quoted(id, "client_cert", 'client.pem')
90 1.1.1.1.14.2 snj et.set_network_quoted(id, "private_key", 'client.key')
91 1.1.1.1.14.2 snj et.set_network_quoted(id, "private_key_passwd", 'whatever')
92 1.1.1.1.14.2 snj et.set_network(id, "disabled", "0")
93 1.1.1.1.14.2 snj
94 1.1.1.1.14.2 snj fail = False
95 1.1.1.1.14.2 snj for i in range(count):
96 1.1.1.1.14.2 snj et.request("REASSOCIATE")
97 1.1.1.1.14.2 snj ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"])
98 1.1.1.1.14.2 snj if ev is None or "CTRL-EVENT-CONNECTED" not in ev:
99 1.1.1.1.14.2 snj fail = True
100 1.1.1.1.14.2 snj break
101 1.1.1.1.14.2 snj
102 1.1.1.1.14.2 snj et.remove_network(id)
103 1.1.1.1.14.2 snj
104 1.1.1.1.14.2 snj if fail:
105 1.1.1.1.14.2 snj res.put("FAIL (%d OK)" % i)
106 1.1.1.1.14.2 snj else:
107 1.1.1.1.14.2 snj res.put("PASS %d" % (i + 1))
108 1.1.1.1.14.2 snj
109 1.1.1.1.14.2 snj def main():
110 1.1.1.1.14.2 snj parser = argparse.ArgumentParser(description='eapol_test controller')
111 1.1.1.1.14.2 snj parser.add_argument('--ctrl', help='control interface directory')
112 1.1.1.1.14.2 snj parser.add_argument('--num', help='number of processes')
113 1.1.1.1.14.2 snj parser.add_argument('--iter', help='number of iterations')
114 1.1.1.1.14.2 snj parser.add_argument('--no-fast-reauth', action='store_true',
115 1.1.1.1.14.2 snj dest='no_fast_reauth',
116 1.1.1.1.14.2 snj help='disable TLS session resumption')
117 1.1.1.1.14.2 snj args = parser.parse_args()
118 1.1.1.1.14.2 snj
119 1.1.1.1.14.2 snj num = int(args.num)
120 1.1.1.1.14.2 snj iter = int(args.iter)
121 1.1.1.1.14.2 snj if args.ctrl:
122 1.1.1.1.14.2 snj global wpas_ctrl
123 1.1.1.1.14.2 snj wpas_ctrl = args.ctrl
124 1.1.1.1.14.2 snj
125 1.1.1.1.14.2 snj t = {}
126 1.1.1.1.14.2 snj res = {}
127 1.1.1.1.14.2 snj for i in range(num):
128 1.1.1.1.14.2 snj res[i] = Queue.Queue()
129 1.1.1.1.14.2 snj t[i] = threading.Thread(target=run, args=(str(i), iter,
130 1.1.1.1.14.2 snj args.no_fast_reauth, res[i]))
131 1.1.1.1.14.2 snj for i in range(num):
132 1.1.1.1.14.2 snj t[i].start()
133 1.1.1.1.14.2 snj for i in range(num):
134 1.1.1.1.14.2 snj t[i].join()
135 1.1.1.1.14.2 snj try:
136 1.1.1.1.14.2 snj results = res[i].get(False)
137 1.1.1.1.14.2 snj except:
138 1.1.1.1.14.2 snj results = "N/A"
139 1.1.1.1.14.2 snj print "%d: %s" % (i, results)
140 1.1.1.1.14.2 snj
141 1.1.1.1.14.2 snj if __name__ == "__main__":
142 1.1.1.1.14.2 snj main()
143