wps-ap-nfc.py revision 1.1.1.2 1 1.1 christos #!/usr/bin/python
2 1.1 christos #
3 1.1 christos # Example nfcpy to hostapd wrapper for WPS NFC operations
4 1.1 christos # Copyright (c) 2012-2013, Jouni Malinen <j@w1.fi>
5 1.1 christos #
6 1.1 christos # This software may be distributed under the terms of the BSD license.
7 1.1 christos # See README for more details.
8 1.1 christos
9 1.1 christos import os
10 1.1 christos import sys
11 1.1 christos import time
12 1.1 christos import argparse
13 1.1 christos
14 1.1 christos import nfc
15 1.1 christos import nfc.ndef
16 1.1 christos import nfc.llcp
17 1.1 christos import nfc.handover
18 1.1 christos
19 1.1 christos import logging
20 1.1 christos
21 1.1 christos import wpaspy
22 1.1 christos
# Directory that wpas_connect() scans for hostapd control interfaces.
wpas_ctrl = '/var/run/hostapd'
# The polling loop in main() runs while this is true; the tag callbacks clear
# it when a single operation was requested.
continue_loop = True
# Optional file paths (set by main() from --summary / --success) to which
# summary() and success_report() append their messages.
summary_file = None
success_file = None
# Handover server created by llcp_startup().  main() inspects it after every
# clf.connect() attempt, so it needs a value even if the LLCP startup callback
# has never run.
srv = None
27 1.1 christos
def summary(txt):
    """Print a status message and optionally log it.

    The message is always written to stdout.  When the module-level
    summary_file is set, the message plus a newline is also appended to
    that file.
    """
    print(txt)
    if not summary_file:
        return
    with open(summary_file, 'a') as log:
        log.write(txt + "\n")
33 1.1 christos
def success_report(txt):
    """Report a successful operation.

    The message goes through summary() first and is then, when the
    module-level success_file is set, appended to that file as well.
    """
    summary(txt)
    if not success_file:
        return
    with open(success_file, 'a') as out:
        out.write(txt + "\n")
39 1.1 christos
def wpas_connect():
    """Open a control connection to hostapd.

    Every entry of the wpas_ctrl directory is tried in listing order and the
    first one accepted by wpaspy.Ctrl() is used.

    Returns:
        A wpaspy.Ctrl instance, or None when the directory cannot be listed,
        contains no entries, or no entry could be opened.
    """
    ifaces = []
    if os.path.isdir(wpas_ctrl):
        try:
            ifaces = [os.path.join(wpas_ctrl, i) for i in os.listdir(wpas_ctrl)]
        except OSError as error:
            # Build one string so the output is the same whether print is a
            # statement (Python 2) or a function.
            print("Could not find hostapd: " + str(error))
            return None

    if not ifaces:
        print("No hostapd control interface found")
        return None

    for ctrl in ifaces:
        try:
            return wpaspy.Ctrl(ctrl)
        except Exception as e:
            # Still best effort: move on to the next entry, but say why this
            # one was skipped instead of hiding the failure.
            print("Could not connect to " + ctrl + ": " + str(e))
    return None
60 1.1 christos
61 1.1 christos
def wpas_tag_read(message):
    """Pass an NDEF message read from a tag on to hostapd.

    Returns False when no hostapd connection is available or the reply to
    WPS_NFC_TAG_READ contains "FAIL"; True otherwise.
    """
    ctrl = wpas_connect()
    if ctrl is None:
        return False
    # NOTE: the "hex" codec on str is a Python 2 feature.
    hex_message = str(message).encode("hex")
    reply = ctrl.request("WPS_NFC_TAG_READ " + hex_message)
    return "FAIL" not in reply
69 1.1 christos
70 1.1 christos
def _wpas_request_hex(cmd):
    """Send cmd to hostapd and return the hex-decoded reply.

    Returns None when no hostapd connection is available or the reply
    contains "FAIL".
    """
    wpas = wpas_connect()
    if wpas is None:
        return None
    ret = wpas.request(cmd)
    if "FAIL" in ret:
        return None
    # NOTE: the "hex" codec on str is a Python 2 feature.
    return ret.rstrip().decode("hex")


def wpas_get_config_token():
    """Return the WPS configuration token from hostapd, or None on failure."""
    return _wpas_request_hex("WPS_NFC_CONFIG_TOKEN NDEF")


def wpas_get_password_token():
    """Return the WPS password token from hostapd, or None on failure."""
    return _wpas_request_hex("WPS_NFC_TOKEN NDEF")


def wpas_get_handover_sel():
    """Return hostapd's handover select carrier record, or None on failure."""
    return _wpas_request_hex("NFC_GET_HANDOVER_SEL NDEF WPS-CR")
99 1.1 christos
100 1.1 christos
def wpas_report_handover(req, sel):
    """Report a completed handover (request and select messages) to hostapd.

    Returns hostapd's reply to NFC_REPORT_HANDOVER, or None when no hostapd
    connection is available.
    """
    ctrl = wpas_connect()
    if ctrl is None:
        return None
    # NOTE: the "hex" codec on str is a Python 2 feature.
    req_hex = str(req).encode("hex")
    sel_hex = str(sel).encode("hex")
    return ctrl.request("NFC_REPORT_HANDOVER RESP WPS " + req_hex + " " + sel_hex)
108 1.1 christos
109 1.1 christos
class HandoverServer(nfc.handover.HandoverServer):
    """Handover server that answers WPS handover requests using hostapd.

    Attributes:
        ho_server_processing: initialised to False; not otherwise used here.
        success: set to True once process_request() has built a handover
            select message.
    """

    def __init__(self, llc):
        super(HandoverServer, self).__init__(llc)
        self.ho_server_processing = False
        self.success = False

    # override to avoid parser error in request/response.pretty() in nfcpy
    # due to new WSC handover format
    def _process_request(self, request):
        """Decode a raw request and return the handover response message."""
        summary("received handover request {}".format(request.type))
        # Fallback reply, used unless the request is successfully processed
        # below (a single 'Hs' record with payload byte 0x12).
        response = nfc.ndef.Message("\xd1\x02\x01Hs\x12")
        if not request.type == 'urn:nfc:wkt:Hr':
            summary("not a handover request")
        else:
            try:
                request = nfc.ndef.HandoverRequestMessage(request)
            except nfc.ndef.DecodeError as e:
                summary("error decoding 'Hr' message: {}".format(e))
            else:
                response = self.process_request(request)
        summary("send handover response {}".format(response.type))
        return response

    def process_request(self, request):
        """Build the handover select message for a parsed handover request.

        For every WSC carrier in the request, the carrier record obtained
        from hostapd is added to the select message and the handover is
        reported back to hostapd.

        Returns:
            The nfc.ndef.HandoverSelectMessage to send to the peer.
        """
        summary("HandoverServer - request received")
        try:
            print("Parsed handover request: " + request.pretty())
        except Exception as e:
            print(e)
        print(str(request).encode("hex"))

        sel = nfc.ndef.HandoverSelectMessage(version="1.2")

        for carrier in request.carriers:
            print("Remote carrier type: " + carrier.type)
            if carrier.type == "application/vnd.wfa.wsc":
                summary("WPS carrier type match - add WPS carrier record")
                data = wpas_get_handover_sel()
                if data is None:
                    summary("Could not get handover select carrier record from hostapd")
                    continue
                print("Handover select carrier record from hostapd:")
                print(data.encode("hex"))
                # wpas_report_handover() returns None when hostapd cannot be
                # reached; treat that as a rejection instead of letting the
                # membership test raise TypeError.
                res = wpas_report_handover(carrier.record, data)
                if res and "OK" in res:
                    success_report("Handover reported successfully")
                else:
                    summary("Handover report rejected")

                message = nfc.ndef.Message(data)
                sel.add_carrier(message[0], "active", message[1:])

        print("Handover select:")
        try:
            print(sel.pretty())
        except Exception as e:
            print(e)
        print(str(sel).encode("hex"))

        summary("Sending handover select")
        self.success = True
        return sel
171 1.1 christos
172 1.1 christos
def wps_tag_read(tag):
    """Look for a WSC record on the tag and forward the message to hostapd.

    Only the first record of type "application/vnd.wfa.wsc" triggers a
    wpas_tag_read() call.  Returns that call's result, or False when the
    tag's NDEF message is empty or holds no such record.
    """
    if not len(tag.ndef.message):
        summary("Empty tag")
        return False

    ok = False
    for rec in tag.ndef.message:
        print("record type " + rec.type)
        if rec.type != "application/vnd.wfa.wsc":
            continue
        summary("WPS tag - send to hostapd")
        ok = wpas_tag_read(tag.ndef.message)
        break

    if ok:
        success_report("Tag read succeeded")
    return ok
189 1.1 christos
190 1.1 christos
def rdwr_connected_write(tag):
    """Tag-connect callback that writes the pending token to the tag.

    Writes the module-level write_data to the tag and reports success.  It
    clears continue_loop when only_one is set, then polls every 0.1 s
    until the tag is removed if write_wait_remove is set.
    """
    global continue_loop

    summary("Tag found - writing - " + str(tag))
    tag.ndef.message = str(write_data)
    success_report("Tag write succeeded")
    print("Done - remove tag")

    if only_one:
        continue_loop = False

    while write_wait_remove and tag.is_present:
        time.sleep(0.1)
204 1.1 christos
def wps_write_config_tag(clf, wait_remove=True):
    """Fetch the WPS config token from hostapd and write it to a tag.

    The token and the wait_remove flag are stored in module globals for
    rdwr_connected_write(), which clf.connect() invokes once a tag is
    touched.  Nothing is written when hostapd provides no token.
    """
    global write_data, write_wait_remove

    summary("Write WPS config token")
    write_wait_remove = wait_remove
    write_data = wpas_get_config_token()
    if write_data is not None:
        print("Touch an NFC tag")
        clf.connect(rdwr={'on-connect': rdwr_connected_write})
    else:
        summary("Could not get WPS config token from hostapd")
216 1.1 christos
217 1.1 christos
def wps_write_password_tag(clf, wait_remove=True):
    """Fetch the WPS password token from hostapd and write it to a tag.

    The token and the wait_remove flag are stored in module globals for
    rdwr_connected_write(), which clf.connect() invokes once a tag is
    touched.  Nothing is written when hostapd provides no token.
    """
    global write_data, write_wait_remove

    summary("Write WPS password token")
    write_wait_remove = wait_remove
    write_data = wpas_get_password_token()
    if write_data is not None:
        print("Touch an NFC tag")
        clf.connect(rdwr={'on-connect': rdwr_connected_write})
    else:
        summary("Could not get WPS password token from hostapd")
229 1.1 christos
230 1.1 christos
def rdwr_connected(tag):
    """Tag-connect callback for read mode.

    NDEF tags are printed and handed to wps_tag_read(); a successful read
    clears continue_loop when only_one is set.

    Returns True for non-NDEF tags, otherwise the negation of no_wait.
    """
    global continue_loop

    summary("Tag connected: " + str(tag))

    if not tag.ndef:
        summary("Not an NDEF tag - remove tag")
        return True

    print("NDEF tag: " + tag.type)
    try:
        print(tag.ndef.message.pretty())
    except Exception as err:
        print(err)

    read_ok = wps_tag_read(tag)
    if only_one and read_ok:
        continue_loop = False

    return not no_wait
250 1.1 christos
251 1.1 christos
def llcp_startup(clf, llc):
    """LLCP startup callback: create the handover server for this link.

    The new HandoverServer is stored in the module-level srv and llc is
    returned unchanged.
    """
    global srv

    print("Start LLCP server")
    srv = HandoverServer(llc)
    return llc
257 1.1 christos
def llcp_connected(llc):
    """LLCP connect callback: start the handover server created at startup.

    Clears the module-level wait_connection flag and always returns True.
    """
    global wait_connection

    print("P2P LLCP connected")
    wait_connection = False
    srv.start()
    return True
265 1.1 christos
266 1.1 christos
def main():
    """Parse the command line and run the requested NFC operation.

    With the write-config or write-password command, one token is written to
    a tag and the program exits.  Without a command, the program polls for
    tags and LLCP peers until continue_loop is cleared, clf.connect()
    returns a false value, or (with --only-one) a handover succeeded.

    Always leaves by raising SystemExit; the NFC frontend is closed first.
    """
    global only_one, no_wait, summary_file, success_file, continue_loop, srv

    clf = nfc.ContactlessFrontend()

    parser = argparse.ArgumentParser(description='nfcpy to hostapd integration for WPS NFC operations')
    parser.add_argument('-d', const=logging.DEBUG, default=logging.INFO,
                        action='store_const', dest='loglevel',
                        help='verbose debug output')
    parser.add_argument('-q', const=logging.WARNING, action='store_const',
                        dest='loglevel', help='be quiet')
    parser.add_argument('--only-one', '-1', action='store_true',
                        help='run only one operation and exit')
    parser.add_argument('--no-wait', action='store_true',
                        help='do not wait for tag to be removed before exiting')
    parser.add_argument('--summary',
                        help='summary file for writing status updates')
    parser.add_argument('--success',
                        help='success file for writing success update')
    parser.add_argument('command', choices=['write-config',
                                            'write-password'],
                        nargs='?')
    args = parser.parse_args()

    only_one = args.only_one
    no_wait = args.no_wait

    if args.summary:
        summary_file = args.summary

    if args.success:
        success_file = args.success

    # llcp_startup() is the only place that creates the handover server.  Give
    # srv a defined value so that the check after clf.connect() cannot raise
    # NameError when that callback never ran (e.g. connect() failed early).
    srv = None

    logging.basicConfig(level=args.loglevel)

    try:
        if not clf.open("usb"):
            print("Could not open connection with an NFC device")
            raise SystemExit

        if args.command == "write-config":
            wps_write_config_tag(clf, wait_remove=not args.no_wait)
            raise SystemExit

        if args.command == "write-password":
            wps_write_password_tag(clf, wait_remove=not args.no_wait)
            raise SystemExit

        while continue_loop:
            print("Waiting for a tag or peer to be touched")
            try:
                if not clf.connect(rdwr={'on-connect': rdwr_connected},
                                   llcp={'on-startup': llcp_startup,
                                         'on-connect': llcp_connected}):
                    break
            except Exception:
                # Best effort: report the failure and poll again.
                print("clf.connect failed")

            if only_one and srv and srv.success:
                raise SystemExit

    except KeyboardInterrupt:
        raise SystemExit
    finally:
        clf.close()

    raise SystemExit
340 1.1 christos
# Script entry point: run main() only when executed directly, not on import.
if __name__ == '__main__':
    main()
343