eapol_test.py revision 1.1.1.1 1 1.1 christos #!/usr/bin/env python2
2 1.1 christos #
3 1.1 christos # eapol_test controller
4 1.1 christos # Copyright (c) 2015, Jouni Malinen <j@w1.fi>
5 1.1 christos #
6 1.1 christos # This software may be distributed under the terms of the BSD license.
7 1.1 christos # See README for more details.
8 1.1 christos
9 1.1 christos import argparse
10 1.1 christos import logging
11 1.1 christos import os
12 1.1 christos import Queue
13 1.1 christos import sys
14 1.1 christos import threading
15 1.1 christos
# Root logger; wait_event() writes every received event to it at debug level.
logger = logging.getLogger()

# Default directory holding the eapol_test control sockets. main() replaces
# this value when --ctrl is given on the command line.
wpas_ctrl = '/tmp/eapol_test'

# The wpaspy module is looked up in ../wpaspy relative to this script, so the
# module search path has to be extended before the import below.
# NOTE: `dir` shadows the builtin; the name is kept because it is module-level.
dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__))
sys.path.append(os.path.join(dir, '..', 'wpaspy'))
import wpaspy
21 1.1 christos
class eapol_test:
    """Client for the control interface of one eapol_test instance.

    Two wpaspy.Ctrl connections to the same socket are kept: ``ctrl`` is used
    for command/response exchanges and ``mon`` is attached and read by
    wait_event() to receive event messages.
    """

    def __init__(self, ifname):
        """Open both connections to the socket ``ifname`` under ``wpas_ctrl``.

        Raises Exception when the PING command is not answered with PONG.
        """
        self.ifname = ifname
        socket_path = os.path.join(wpas_ctrl, ifname)
        self.ctrl = wpaspy.Ctrl(socket_path)
        if "PONG" not in self.ctrl.request("PING"):
            raise Exception("Failed to connect to eapol_test (%s)" % ifname)
        self.mon = wpaspy.Ctrl(socket_path)
        self.mon.attach()

    def _checked_request(self, cmd, error):
        """Send ``cmd``; raise Exception(error) if the reply contains FAIL."""
        reply = self.request(cmd)
        if "FAIL" in reply:
            raise Exception(error)
        return reply

    def add_network(self):
        """Create a network block and return its numeric id.

        Raises Exception when the ADD_NETWORK reply contains FAIL.
        """
        return int(self._checked_request("ADD_NETWORK", "ADD_NETWORK failed"))

    def remove_network(self, id):
        """Remove network block ``id``; raise Exception on a FAIL reply."""
        self._checked_request("REMOVE_NETWORK " + str(id),
                              "REMOVE_NETWORK failed")
        return None

    def set_network(self, id, field, value):
        """Set ``field`` of network ``id`` to ``value`` exactly as given.

        Raises Exception when the SET_NETWORK reply contains FAIL.
        """
        command = " ".join(["SET_NETWORK", str(id), field, value])
        self._checked_request(command, "SET_NETWORK failed")
        return None

    def set_network_quoted(self, id, field, value):
        """Like set_network(), but send ``value`` wrapped in double quotes."""
        return self.set_network(id, field, '"' + value + '"')

    def request(self, cmd, timeout=10):
        """Send ``cmd`` on the command connection and return the reply."""
        return self.ctrl.request(cmd, timeout=timeout)

    def wait_event(self, events, timeout=10):
        """Wait for an event message containing any string in ``events``.

        Every received message is logged at debug level. Returns the first
        matching message, or None when ``timeout`` (measured with the elapsed
        real time reported by os.times()) runs out or no further message
        becomes pending.
        """
        deadline = os.times()[4] + timeout
        while True:
            # Drain everything that is already queued before sleeping again.
            while self.mon.pending():
                message = self.mon.recv()
                logger.debug(self.ifname + ": " + message)
                if any(wanted in message for wanted in events):
                    return message
            time_left = deadline - os.times()[4]
            if time_left <= 0 or not self.mon.pending(timeout=time_left):
                return None
74 1.1 christos
def run(ifname, count, no_fast_reauth, res):
    """Drive one eapol_test instance through ``count`` EAP-TLS attempts.

    A network block is configured with fixed test credentials (identity
    "user", ca.pem, client.pem, client.key), REASSOCIATE is issued up to
    ``count`` times, and the network block is removed again afterwards.

    Parameters:
        ifname: name of the control socket, passed to eapol_test().
        count: maximum number of REASSOCIATE attempts.
        no_fast_reauth: when true, "SET fast_reauth 0" is sent, otherwise
            "SET fast_reauth 1".
        res: queue-like object; exactly one summary string is put() on it:
            "PASS <n>" when every attempt connected, or "FAIL (<n> OK)" where
            <n> is the number of attempts that connected before the failure.

    Exceptions raised by eapol_test (connection or command failures) are not
    caught here; in that case nothing is put on ``res``.
    """
    et = eapol_test(ifname)

    et.request("AP_SCAN 0")
    if no_fast_reauth:
        et.request("SET fast_reauth 0")
    else:
        et.request("SET fast_reauth 1")

    net_id = et.add_network()
    et.set_network(net_id, "key_mgmt", "IEEE8021X")
    et.set_network(net_id, "eapol_flags", "0")
    et.set_network(net_id, "eap", "TLS")
    et.set_network_quoted(net_id, "identity", "user")
    et.set_network_quoted(net_id, "ca_cert", 'ca.pem')
    et.set_network_quoted(net_id, "client_cert", 'client.pem')
    et.set_network_quoted(net_id, "private_key", 'client.key')
    et.set_network_quoted(net_id, "private_key_passwd", 'whatever')
    et.set_network(net_id, "disabled", "0")

    # Count successes explicitly instead of reading the loop variable after
    # the loop: with count == 0 the loop variable would never be bound and
    # building the result string would raise.
    succeeded = 0
    fail = False
    for _ in range(count):
        et.request("REASSOCIATE")
        ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"])
        # A timeout (None) and an EAP failure event both end the run.
        if ev is None or "CTRL-EVENT-CONNECTED" not in ev:
            fail = True
            break
        succeeded += 1

    et.remove_network(net_id)

    if fail:
        res.put("FAIL (%d OK)" % succeeded)
    else:
        res.put("PASS %d" % succeeded)
108 1.1 christos
def main():
    """Parse the command line and run the test threads.

    One thread per instance is started; thread ``i`` calls run() with the
    control socket name str(i). After all threads have been started they are
    joined in order and one line "<index>: <result>" is printed for each,
    with "N/A" when the thread did not report a result.

    --num and --iter are mandatory integers; argparse rejects a missing or
    non-numeric value with a usage error. --ctrl overrides the module-level
    ``wpas_ctrl`` socket directory.
    """
    global wpas_ctrl

    parser = argparse.ArgumentParser(description='eapol_test controller')
    parser.add_argument('--ctrl', help='control interface directory')
    parser.add_argument('--num', type=int, required=True,
                        help='number of processes')
    parser.add_argument('--iter', type=int, required=True,
                        help='number of iterations')
    parser.add_argument('--no-fast-reauth', action='store_true',
                        dest='no_fast_reauth',
                        help='disable TLS session resumption')
    args = parser.parse_args()

    if args.ctrl:
        wpas_ctrl = args.ctrl

    # One result queue per thread; run() puts at most one summary string.
    results = [Queue.Queue() for _ in range(args.num)]
    threads = [threading.Thread(target=run,
                                args=(str(i), args.iter,
                                      args.no_fast_reauth, results[i]))
               for i in range(args.num)]

    for thread in threads:
        thread.start()

    for i, thread in enumerate(threads):
        thread.join()
        try:
            outcome = results[i].get(False)
        except Queue.Empty:
            # The thread ended without reporting, e.g. run() raised.
            outcome = "N/A"
        # Single parenthesised argument: valid under both Python 2 and 3.
        print("%d: %s" % (i, outcome))
140 1.1 christos
# Start the controller only when this file is executed as a script.
if __name__ == "__main__":
    main()
143