eapol_test.py revision 1.1.1.2.8.1 1 1.1 christos #!/usr/bin/env python2
2 1.1 christos #
3 1.1 christos # eapol_test controller
4 1.1 christos # Copyright (c) 2015, Jouni Malinen <j (at] w1.fi>
5 1.1 christos #
6 1.1 christos # This software may be distributed under the terms of the BSD license.
7 1.1 christos # See README for more details.
8 1.1 christos
9 1.1 christos import argparse
10 1.1 christos import logging
11 1.1 christos import os
12 1.1 christos import Queue
13 1.1 christos import sys
14 1.1 christos import threading
15 1.1 christos
16 1.1 christos logger = logging.getLogger()
17 1.1 christos dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__))
18 1.1 christos sys.path.append(os.path.join(dir, '..', 'wpaspy'))
19 1.1 christos import wpaspy
20 1.1 christos wpas_ctrl = '/tmp/eapol_test'
21 1.1 christos
22 1.1 christos class eapol_test:
23 1.1 christos def __init__(self, ifname):
24 1.1 christos self.ifname = ifname
25 1.1 christos self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
26 1.1 christos if "PONG" not in self.ctrl.request("PING"):
27 1.1 christos raise Exception("Failed to connect to eapol_test (%s)" % ifname)
28 1.1 christos self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
29 1.1 christos self.mon.attach()
30 1.1 christos
31 1.1 christos def add_network(self):
32 1.1 christos id = self.request("ADD_NETWORK")
33 1.1 christos if "FAIL" in id:
34 1.1 christos raise Exception("ADD_NETWORK failed")
35 1.1 christos return int(id)
36 1.1 christos
37 1.1 christos def remove_network(self, id):
38 1.1 christos id = self.request("REMOVE_NETWORK " + str(id))
39 1.1 christos if "FAIL" in id:
40 1.1 christos raise Exception("REMOVE_NETWORK failed")
41 1.1 christos return None
42 1.1 christos
43 1.1 christos def set_network(self, id, field, value):
44 1.1 christos res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
45 1.1 christos if "FAIL" in res:
46 1.1 christos raise Exception("SET_NETWORK failed")
47 1.1 christos return None
48 1.1 christos
49 1.1 christos def set_network_quoted(self, id, field, value):
50 1.1 christos res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
51 1.1 christos if "FAIL" in res:
52 1.1 christos raise Exception("SET_NETWORK failed")
53 1.1 christos return None
54 1.1 christos
55 1.1 christos def request(self, cmd, timeout=10):
56 1.1 christos return self.ctrl.request(cmd, timeout=timeout)
57 1.1 christos
58 1.1 christos def wait_event(self, events, timeout=10):
59 1.1 christos start = os.times()[4]
60 1.1 christos while True:
61 1.1 christos while self.mon.pending():
62 1.1 christos ev = self.mon.recv()
63 1.1 christos logger.debug(self.ifname + ": " + ev)
64 1.1 christos for event in events:
65 1.1 christos if event in ev:
66 1.1 christos return ev
67 1.1 christos now = os.times()[4]
68 1.1 christos remaining = start + timeout - now
69 1.1 christos if remaining <= 0:
70 1.1 christos break
71 1.1 christos if not self.mon.pending(timeout=remaining):
72 1.1 christos break
73 1.1 christos return None
74 1.1 christos
75 1.1.1.2.8.1 perseant def run(ifname, count, no_fast_reauth, res, conf):
76 1.1 christos et = eapol_test(ifname)
77 1.1 christos
78 1.1 christos et.request("AP_SCAN 0")
79 1.1 christos if no_fast_reauth:
80 1.1 christos et.request("SET fast_reauth 0")
81 1.1 christos else:
82 1.1 christos et.request("SET fast_reauth 1")
83 1.1 christos id = et.add_network()
84 1.1.1.2.8.1 perseant
85 1.1.1.2.8.1 perseant if len(conf):
86 1.1.1.2.8.1 perseant for item in conf:
87 1.1.1.2.8.1 perseant et.set_network(id, item, conf[item])
88 1.1.1.2.8.1 perseant else:
89 1.1.1.2.8.1 perseant et.set_network(id, "key_mgmt", "IEEE8021X")
90 1.1.1.2.8.1 perseant et.set_network(id, "eapol_flags", "0")
91 1.1.1.2.8.1 perseant et.set_network(id, "eap", "TLS")
92 1.1.1.2.8.1 perseant et.set_network_quoted(id, "identity", "user")
93 1.1.1.2.8.1 perseant et.set_network_quoted(id, "ca_cert", 'ca.pem')
94 1.1.1.2.8.1 perseant et.set_network_quoted(id, "client_cert", 'client.pem')
95 1.1.1.2.8.1 perseant et.set_network_quoted(id, "private_key", 'client.key')
96 1.1.1.2.8.1 perseant et.set_network_quoted(id, "private_key_passwd", 'whatever')
97 1.1.1.2.8.1 perseant
98 1.1 christos et.set_network(id, "disabled", "0")
99 1.1 christos
100 1.1 christos fail = False
101 1.1 christos for i in range(count):
102 1.1 christos et.request("REASSOCIATE")
103 1.1 christos ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"])
104 1.1 christos if ev is None or "CTRL-EVENT-CONNECTED" not in ev:
105 1.1 christos fail = True
106 1.1 christos break
107 1.1 christos
108 1.1 christos et.remove_network(id)
109 1.1 christos
110 1.1 christos if fail:
111 1.1 christos res.put("FAIL (%d OK)" % i)
112 1.1 christos else:
113 1.1 christos res.put("PASS %d" % (i + 1))
114 1.1 christos
115 1.1 christos def main():
116 1.1 christos parser = argparse.ArgumentParser(description='eapol_test controller')
117 1.1 christos parser.add_argument('--ctrl', help='control interface directory')
118 1.1 christos parser.add_argument('--num', help='number of processes')
119 1.1 christos parser.add_argument('--iter', help='number of iterations')
120 1.1 christos parser.add_argument('--no-fast-reauth', action='store_true',
121 1.1 christos dest='no_fast_reauth',
122 1.1 christos help='disable TLS session resumption')
123 1.1.1.2.8.1 perseant parser.add_argument('--conf', help='file of network conf items')
124 1.1 christos args = parser.parse_args()
125 1.1 christos
126 1.1 christos num = int(args.num)
127 1.1 christos iter = int(args.iter)
128 1.1 christos if args.ctrl:
129 1.1 christos global wpas_ctrl
130 1.1 christos wpas_ctrl = args.ctrl
131 1.1 christos
132 1.1.1.2.8.1 perseant conf = {}
133 1.1.1.2.8.1 perseant if args.conf:
134 1.1.1.2.8.1 perseant f = open(args.conf, "r")
135 1.1.1.2.8.1 perseant for line in f:
136 1.1.1.2.8.1 perseant confitem = line.split("=")
137 1.1.1.2.8.1 perseant if len(confitem) == 2:
138 1.1.1.2.8.1 perseant conf[confitem[0].strip()] = confitem[1].strip()
139 1.1.1.2.8.1 perseant f.close()
140 1.1.1.2.8.1 perseant
141 1.1 christos t = {}
142 1.1 christos res = {}
143 1.1 christos for i in range(num):
144 1.1 christos res[i] = Queue.Queue()
145 1.1 christos t[i] = threading.Thread(target=run, args=(str(i), iter,
146 1.1.1.2.8.1 perseant args.no_fast_reauth, res[i],
147 1.1.1.2.8.1 perseant conf))
148 1.1 christos for i in range(num):
149 1.1 christos t[i].start()
150 1.1 christos for i in range(num):
151 1.1 christos t[i].join()
152 1.1 christos try:
153 1.1 christos results = res[i].get(False)
154 1.1 christos except:
155 1.1 christos results = "N/A"
156 1.1.1.2 christos print("%d: %s" % (i, results))
157 1.1 christos
158 1.1 christos if __name__ == "__main__":
159 1.1 christos main()
160