tests: WPS configuration file update
[mech_eap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import subprocess
10 import logging
11 logger = logging.getLogger()
12 import re
13 import socket
14 import httplib
15 import urlparse
16 import urllib
17 import xml.etree.ElementTree as ET
18 import StringIO
19
20 import hwsim_utils
21 import hostapd
22
23 def test_ap_wps_init(dev, apdev):
24     """Initial AP configuration with first WPS Enrollee"""
25     ssid = "test-wps"
26     hostapd.add_ap(apdev[0]['ifname'],
27                    { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
28     hapd = hostapd.Hostapd(apdev[0]['ifname'])
29     logger.info("WPS provisioning step")
30     hapd.request("WPS_PBC")
31     if "PBC Status: Active" not in hapd.request("WPS_GET_STATUS"):
32         raise Exception("PBC status not shown correctly")
33     dev[0].dump_monitor()
34     dev[0].request("WPS_PBC")
35     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
36     if ev is None:
37         raise Exception("Association with the AP timed out")
38     status = dev[0].get_status()
39     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
40         raise Exception("Not fully connected")
41     if status['ssid'] != ssid:
42         raise Exception("Unexpected SSID")
43     if status['pairwise_cipher'] != 'CCMP':
44         raise Exception("Unexpected encryption configuration")
45     if status['key_mgmt'] != 'WPA2-PSK':
46         raise Exception("Unexpected key_mgmt")
47
48     status = hapd.request("WPS_GET_STATUS")
49     if "PBC Status: Disabled" not in status:
50         raise Exception("PBC status not shown correctly")
51     if "Last WPS result: Success" not in status:
52         raise Exception("Last WPS result not shown correctly")
53     if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
54         raise Exception("Peer address not shown correctly")
55     conf = hapd.request("GET_CONFIG")
56     if "wps_state=configured" not in conf:
57         raise Exception("AP not in WPS configured state")
58     if "rsn_pairwise_cipher=CCMP TKIP" not in conf:
59         raise Exception("Unexpected rsn_pairwise_cipher")
60     if "wpa_pairwise_cipher=CCMP TKIP" not in conf:
61         raise Exception("Unexpected wpa_pairwise_cipher")
62     if "group_cipher=TKIP" not in conf:
63         raise Exception("Unexpected group_cipher")
64
65 def test_ap_wps_init_2ap_pbc(dev, apdev):
66     """Initial two-radio AP configuration with first WPS PBC Enrollee"""
67     ssid = "test-wps"
68     params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
69     hostapd.add_ap(apdev[0]['ifname'], params)
70     hostapd.add_ap(apdev[1]['ifname'], params)
71     hapd = hostapd.Hostapd(apdev[0]['ifname'])
72     logger.info("WPS provisioning step")
73     hapd.request("WPS_PBC")
74     dev[0].scan(freq="2412")
75     bss = dev[0].get_bss(apdev[0]['bssid'])
76     if "[WPS-PBC]" not in bss['flags']:
77         raise Exception("WPS-PBC flag missing from AP1")
78     bss = dev[0].get_bss(apdev[1]['bssid'])
79     if "[WPS-PBC]" not in bss['flags']:
80         raise Exception("WPS-PBC flag missing from AP2")
81     dev[0].dump_monitor()
82     dev[0].request("WPS_PBC")
83     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
84     if ev is None:
85         raise Exception("Association with the AP timed out")
86
87     dev[1].scan(freq="2412")
88     bss = dev[1].get_bss(apdev[0]['bssid'])
89     if "[WPS-PBC]" in bss['flags']:
90         raise Exception("WPS-PBC flag not cleared from AP1")
91     bss = dev[1].get_bss(apdev[1]['bssid'])
92     if "[WPS-PBC]" in bss['flags']:
93         raise Exception("WPS-PBC flag bit ckeared from AP2")
94
95 def test_ap_wps_init_2ap_pin(dev, apdev):
96     """Initial two-radio AP configuration with first WPS PIN Enrollee"""
97     ssid = "test-wps"
98     params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
99     hostapd.add_ap(apdev[0]['ifname'], params)
100     hostapd.add_ap(apdev[1]['ifname'], params)
101     hapd = hostapd.Hostapd(apdev[0]['ifname'])
102     logger.info("WPS provisioning step")
103     pin = dev[0].wps_read_pin()
104     hapd.request("WPS_PIN any " + pin)
105     dev[0].scan(freq="2412")
106     bss = dev[0].get_bss(apdev[0]['bssid'])
107     if "[WPS-AUTH]" not in bss['flags']:
108         raise Exception("WPS-AUTH flag missing from AP1")
109     bss = dev[0].get_bss(apdev[1]['bssid'])
110     if "[WPS-AUTH]" not in bss['flags']:
111         raise Exception("WPS-AUTH flag missing from AP2")
112     dev[0].dump_monitor()
113     dev[0].request("WPS_PIN any " + pin)
114     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
115     if ev is None:
116         raise Exception("Association with the AP timed out")
117
118     dev[1].scan(freq="2412")
119     bss = dev[1].get_bss(apdev[0]['bssid'])
120     if "[WPS-AUTH]" in bss['flags']:
121         raise Exception("WPS-AUTH flag not cleared from AP1")
122     bss = dev[1].get_bss(apdev[1]['bssid'])
123     if "[WPS-AUTH]" in bss['flags']:
124         raise Exception("WPS-AUTH flag bit ckeared from AP2")
125
126 def test_ap_wps_init_through_wps_config(dev, apdev):
127     """Initial AP configuration using wps_config command"""
128     ssid = "test-wps-init-config"
129     hostapd.add_ap(apdev[0]['ifname'],
130                    { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
131     hapd = hostapd.Hostapd(apdev[0]['ifname'])
132     if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
133         raise Exception("WPS_CONFIG command failed")
134     ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
135     if ev is None:
136         raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
137     # It takes some time for the AP to update Beacon and Probe Response frames,
138     # so wait here before requesting the scan to be started to avoid adding
139     # extra five second wait to the test due to fetching obsolete scan results.
140     hapd.ping()
141     time.sleep(0.2)
142     dev[0].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2",
143                    pairwise="CCMP", group="CCMP")
144
145 def test_ap_wps_conf(dev, apdev):
146     """WPS PBC provisioning with configured AP"""
147     ssid = "test-wps-conf"
148     hostapd.add_ap(apdev[0]['ifname'],
149                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
150                      "wpa_passphrase": "12345678", "wpa": "2",
151                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
152     hapd = hostapd.Hostapd(apdev[0]['ifname'])
153     logger.info("WPS provisioning step")
154     hapd.request("WPS_PBC")
155     dev[0].dump_monitor()
156     dev[0].request("WPS_PBC")
157     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
158     if ev is None:
159         raise Exception("Association with the AP timed out")
160     status = dev[0].get_status()
161     if status['wpa_state'] != 'COMPLETED':
162         raise Exception("Not fully connected")
163     if status['bssid'] != apdev[0]['bssid']:
164         raise Exception("Unexpected BSSID")
165     if status['ssid'] != ssid:
166         raise Exception("Unexpected SSID")
167     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
168         raise Exception("Unexpected encryption configuration")
169     if status['key_mgmt'] != 'WPA2-PSK':
170         raise Exception("Unexpected key_mgmt")
171
172     sta = hapd.get_sta(dev[0].p2p_interface_addr())
173     if 'wpsDeviceName' not in sta or sta['wpsDeviceName'] != "Device A":
174         raise Exception("Device name not available in STA command")
175
176 def test_ap_wps_twice(dev, apdev):
177     """WPS provisioning with twice to change passphrase"""
178     ssid = "test-wps-twice"
179     params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
180                "wpa_passphrase": "12345678", "wpa": "2",
181                "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
182     hostapd.add_ap(apdev[0]['ifname'], params)
183     hapd = hostapd.Hostapd(apdev[0]['ifname'])
184     logger.info("WPS provisioning step")
185     hapd.request("WPS_PBC")
186     dev[0].dump_monitor()
187     dev[0].request("WPS_PBC")
188     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
189     if ev is None:
190         raise Exception("Association with the AP timed out")
191     dev[0].request("DISCONNECT")
192
193     logger.info("Restart AP with different passphrase and re-run WPS")
194     hapd_global = hostapd.HostapdGlobal()
195     hapd_global.remove(apdev[0]['ifname'])
196     params['wpa_passphrase'] = 'another passphrase'
197     hostapd.add_ap(apdev[0]['ifname'], params)
198     hapd = hostapd.Hostapd(apdev[0]['ifname'])
199     logger.info("WPS provisioning step")
200     hapd.request("WPS_PBC")
201     dev[0].dump_monitor()
202     dev[0].request("WPS_PBC")
203     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
204     if ev is None:
205         raise Exception("Association with the AP timed out")
206     networks = dev[0].list_networks()
207     if len(networks) > 1:
208         raise Exception("Unexpected duplicated network block present")
209
210 def test_ap_wps_incorrect_pin(dev, apdev):
211     """WPS PIN provisioning with incorrect PIN"""
212     ssid = "test-wps-incorrect-pin"
213     hostapd.add_ap(apdev[0]['ifname'],
214                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
215                      "wpa_passphrase": "12345678", "wpa": "2",
216                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
217     hapd = hostapd.Hostapd(apdev[0]['ifname'])
218
219     logger.info("WPS provisioning attempt 1")
220     hapd.request("WPS_PIN any 12345670")
221     dev[0].dump_monitor()
222     dev[0].request("WPS_PIN any 55554444")
223     ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
224     if ev is None:
225         raise Exception("WPS operation timed out")
226     if "config_error=18" not in ev:
227         raise Exception("Incorrect config_error reported")
228     if "msg=8" not in ev:
229         raise Exception("PIN error detected on incorrect message")
230     ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
231     if ev is None:
232         raise Exception("Timeout on disconnection event")
233     dev[0].request("WPS_CANCEL")
234     # if a scan was in progress, wait for it to complete before trying WPS again
235     ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
236
237     status = hapd.request("WPS_GET_STATUS")
238     if "Last WPS result: Failed" not in status:
239         raise Exception("WPS failure result not shown correctly")
240
241     logger.info("WPS provisioning attempt 2")
242     hapd.request("WPS_PIN any 12345670")
243     dev[0].dump_monitor()
244     dev[0].request("WPS_PIN any 12344444")
245     ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
246     if ev is None:
247         raise Exception("WPS operation timed out")
248     if "config_error=18" not in ev:
249         raise Exception("Incorrect config_error reported")
250     if "msg=10" not in ev:
251         raise Exception("PIN error detected on incorrect message")
252     ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
253     if ev is None:
254         raise Exception("Timeout on disconnection event")
255
256 def test_ap_wps_conf_pin(dev, apdev):
257     """WPS PIN provisioning with configured AP"""
258     ssid = "test-wps-conf-pin"
259     hostapd.add_ap(apdev[0]['ifname'],
260                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
261                      "wpa_passphrase": "12345678", "wpa": "2",
262                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
263     hapd = hostapd.Hostapd(apdev[0]['ifname'])
264     logger.info("WPS provisioning step")
265     pin = dev[0].wps_read_pin()
266     hapd.request("WPS_PIN any " + pin)
267     dev[0].dump_monitor()
268     dev[0].request("WPS_PIN any " + pin)
269     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
270     if ev is None:
271         raise Exception("Association with the AP timed out")
272     status = dev[0].get_status()
273     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
274         raise Exception("Not fully connected")
275     if status['ssid'] != ssid:
276         raise Exception("Unexpected SSID")
277     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
278         raise Exception("Unexpected encryption configuration")
279     if status['key_mgmt'] != 'WPA2-PSK':
280         raise Exception("Unexpected key_mgmt")
281
282     dev[1].scan(freq="2412")
283     bss = dev[1].get_bss(apdev[0]['bssid'])
284     if "[WPS-AUTH]" in bss['flags']:
285         raise Exception("WPS-AUTH flag not cleared")
286     logger.info("Try to connect from another station using the same PIN")
287     dev[1].request("WPS_PIN any " + pin)
288     ev = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30)
289     if ev is None:
290         raise Exception("Operation timed out")
291     if "WPS-M2D" not in ev:
292         raise Exception("Unexpected WPS operation started")
293
294 def test_ap_wps_conf_pin_2sta(dev, apdev):
295     """Two stations trying to use WPS PIN at the same time"""
296     ssid = "test-wps-conf-pin2"
297     hostapd.add_ap(apdev[0]['ifname'],
298                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
299                      "wpa_passphrase": "12345678", "wpa": "2",
300                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
301     hapd = hostapd.Hostapd(apdev[0]['ifname'])
302     logger.info("WPS provisioning step")
303     pin = "12345670"
304     pin2 = "55554444"
305     hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
306     hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
307     dev[0].dump_monitor()
308     dev[1].dump_monitor()
309     dev[0].request("WPS_PIN any " + pin)
310     dev[1].request("WPS_PIN any " + pin)
311     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
312     if ev is None:
313         raise Exception("Association with the AP timed out")
314     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
315     if ev is None:
316         raise Exception("Association with the AP timed out")
317
318 def test_ap_wps_conf_pin_timeout(dev, apdev):
319     """WPS PIN provisioning with configured AP timing out PIN"""
320     ssid = "test-wps-conf-pin"
321     hostapd.add_ap(apdev[0]['ifname'],
322                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
323                      "wpa_passphrase": "12345678", "wpa": "2",
324                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
325     hapd = hostapd.Hostapd(apdev[0]['ifname'])
326     addr = dev[0].p2p_interface_addr()
327     pin = dev[0].wps_read_pin()
328     if "FAIL" not in hapd.request("WPS_PIN "):
329         raise Exception("Unexpected success on invalid WPS_PIN")
330     hapd.request("WPS_PIN any " + pin + " 1")
331     time.sleep(1.1)
332     dev[0].request("WPS_PIN any " + pin)
333     ev = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=20)
334     if ev is None:
335         raise Exception("WPS-PIN-NEEDED event timed out")
336     ev = dev[0].wait_event(["WPS-M2D"])
337     if ev is None:
338         raise Exception("M2D not reported")
339     dev[0].request("WPS_CANCEL")
340
341     hapd.request("WPS_PIN any " + pin + " 20 " + addr)
342     dev[0].request("WPS_PIN any " + pin)
343     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
344     if ev is None:
345         raise Exception("Association with the AP timed out")
346
347 def test_ap_wps_reg_connect(dev, apdev):
348     """WPS registrar using AP PIN to connect"""
349     ssid = "test-wps-reg-ap-pin"
350     appin = "12345670"
351     hostapd.add_ap(apdev[0]['ifname'],
352                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
353                      "wpa_passphrase": "12345678", "wpa": "2",
354                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
355                      "ap_pin": appin})
356     logger.info("WPS provisioning step")
357     dev[0].dump_monitor()
358     dev[0].wps_reg(apdev[0]['bssid'], appin)
359     status = dev[0].get_status()
360     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
361         raise Exception("Not fully connected")
362     if status['ssid'] != ssid:
363         raise Exception("Unexpected SSID")
364     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
365         raise Exception("Unexpected encryption configuration")
366     if status['key_mgmt'] != 'WPA2-PSK':
367         raise Exception("Unexpected key_mgmt")
368
369 def check_wps_reg_failure(dev, ap, appin):
370     dev.request("WPS_REG " + ap['bssid'] + " " + appin)
371     ev = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
372     if ev is None:
373         raise Exception("WPS operation timed out")
374     if "WPS-SUCCESS" in ev:
375         raise Exception("WPS operation succeeded unexpectedly")
376     if "config_error=15" not in ev:
377         raise Exception("WPS setup locked state was not reported correctly")
378
379 def test_ap_wps_random_ap_pin(dev, apdev):
380     """WPS registrar using random AP PIN"""
381     ssid = "test-wps-reg-random-ap-pin"
382     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
383     hostapd.add_ap(apdev[0]['ifname'],
384                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
385                      "wpa_passphrase": "12345678", "wpa": "2",
386                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
387                      "device_name": "Wireless AP", "manufacturer": "Company",
388                      "model_name": "WAP", "model_number": "123",
389                      "serial_number": "12345", "device_type": "6-0050F204-1",
390                      "os_version": "01020300",
391                      "config_methods": "label push_button",
392                      "uuid": ap_uuid, "upnp_iface": "lo" })
393     hapd = hostapd.Hostapd(apdev[0]['ifname'])
394     appin = hapd.request("WPS_AP_PIN random")
395     if "FAIL" in appin:
396         raise Exception("Could not generate random AP PIN")
397     if appin not in hapd.request("WPS_AP_PIN get"):
398         raise Exception("Could not fetch current AP PIN")
399     logger.info("WPS provisioning step")
400     dev[0].wps_reg(apdev[0]['bssid'], appin)
401
402     hapd.request("WPS_AP_PIN disable")
403     logger.info("WPS provisioning step with AP PIN disabled")
404     check_wps_reg_failure(dev[1], apdev[0], appin)
405
406     logger.info("WPS provisioning step with AP PIN reset")
407     appin = "12345670"
408     hapd.request("WPS_AP_PIN set " + appin)
409     dev[1].wps_reg(apdev[0]['bssid'], appin)
410     dev[0].request("REMOVE_NETWORK all")
411     dev[1].request("REMOVE_NETWORK all")
412     dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
413     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
414
415     logger.info("WPS provisioning step after AP PIN timeout")
416     hapd.request("WPS_AP_PIN disable")
417     appin = hapd.request("WPS_AP_PIN random 1")
418     time.sleep(1.1)
419     if "FAIL" not in hapd.request("WPS_AP_PIN get"):
420         raise Exception("AP PIN unexpectedly still enabled")
421     check_wps_reg_failure(dev[0], apdev[0], appin)
422
423     logger.info("WPS provisioning step after AP PIN timeout(2)")
424     hapd.request("WPS_AP_PIN disable")
425     appin = "12345670"
426     hapd.request("WPS_AP_PIN set " + appin + " 1")
427     time.sleep(1.1)
428     if "FAIL" not in hapd.request("WPS_AP_PIN get"):
429         raise Exception("AP PIN unexpectedly still enabled")
430     check_wps_reg_failure(dev[1], apdev[0], appin)
431
432 def test_ap_wps_reg_config(dev, apdev):
433     """WPS registrar configuring an AP using AP PIN"""
434     ssid = "test-wps-init-ap-pin"
435     appin = "12345670"
436     hostapd.add_ap(apdev[0]['ifname'],
437                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
438                      "ap_pin": appin})
439     logger.info("WPS configuration step")
440     dev[0].dump_monitor()
441     new_ssid = "wps-new-ssid"
442     new_passphrase = "1234567890"
443     dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
444                    new_passphrase)
445     status = dev[0].get_status()
446     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
447         raise Exception("Not fully connected")
448     if status['ssid'] != new_ssid:
449         raise Exception("Unexpected SSID")
450     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
451         raise Exception("Unexpected encryption configuration")
452     if status['key_mgmt'] != 'WPA2-PSK':
453         raise Exception("Unexpected key_mgmt")
454
455     logger.info("Re-configure back to open")
456     dev[0].request("REMOVE_NETWORK all")
457     dev[0].request("BSS_FLUSH 0")
458     dev[0].request("SCAN freq=2412 only_new=1")
459     ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
460     if ev is None:
461         raise Exception("Scan timed out")
462     dev[0].dump_monitor()
463     dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-open", "OPEN", "NONE", "")
464     status = dev[0].get_status()
465     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
466         raise Exception("Not fully connected")
467     if status['ssid'] != "wps-open":
468         raise Exception("Unexpected SSID")
469     if status['key_mgmt'] != 'NONE':
470         raise Exception("Unexpected key_mgmt")
471
472 def test_ap_wps_reg_config_ext_processing(dev, apdev):
473     """WPS registrar configuring an AP with external config processing"""
474     ssid = "test-wps-init-ap-pin"
475     appin = "12345670"
476     params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
477                "wps_cred_processing": "1", "ap_pin": appin}
478     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
479     new_ssid = "wps-new-ssid"
480     new_passphrase = "1234567890"
481     dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
482                    new_passphrase, no_wait=True)
483     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
484     if ev is None:
485         raise Exception("WPS registrar operation timed out")
486     ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=15)
487     if ev is None:
488         raise Exception("WPS configuration timed out")
489     if "1026" not in ev:
490         raise Exception("AP Settings missing from event")
491     hapd.request("SET wps_cred_processing 0")
492     if "FAIL" in hapd.request("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " + new_passphrase.encode("hex")):
493         raise Exception("WPS_CONFIG command failed")
494     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
495     if ev is None:
496         raise Exception("Association with the AP timed out")
497
498 def test_ap_wps_reg_config_tkip(dev, apdev):
499     """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
500     ssid = "test-wps-init-ap"
501     appin = "12345670"
502     hostapd.add_ap(apdev[0]['ifname'],
503                    { "ssid": ssid, "eap_server": "1", "wps_state": "1",
504                      "ap_pin": appin})
505     logger.info("WPS configuration step")
506     dev[0].request("SET wps_version_number 0x10")
507     dev[0].dump_monitor()
508     new_ssid = "wps-new-ssid-with-tkip"
509     new_passphrase = "1234567890"
510     dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPAPSK", "TKIP",
511                    new_passphrase)
512     logger.info("Re-connect to verify WPA2 mixed mode")
513     dev[0].request("DISCONNECT")
514     id = 0
515     dev[0].set_network(id, "pairwise", "CCMP")
516     dev[0].set_network(id, "proto", "RSN")
517     dev[0].connect_network(id)
518     status = dev[0].get_status()
519     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
520         raise Exception("Not fully connected")
521     if status['ssid'] != new_ssid:
522         raise Exception("Unexpected SSID")
523     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
524         raise Exception("Unexpected encryption configuration")
525     if status['key_mgmt'] != 'WPA2-PSK':
526         raise Exception("Unexpected key_mgmt")
527
528 def test_ap_wps_setup_locked(dev, apdev):
529     """WPS registrar locking up AP setup on AP PIN failures"""
530     ssid = "test-wps-incorrect-ap-pin"
531     appin = "12345670"
532     hostapd.add_ap(apdev[0]['ifname'],
533                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
534                      "wpa_passphrase": "12345678", "wpa": "2",
535                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
536                      "ap_pin": appin})
537     new_ssid = "wps-new-ssid-test"
538     new_passphrase = "1234567890"
539
540     ap_setup_locked=False
541     for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
542         dev[0].dump_monitor()
543         logger.info("Try incorrect AP PIN - attempt " + pin)
544         dev[0].wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
545                        "CCMP", new_passphrase, no_wait=True)
546         ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
547         if ev is None:
548             raise Exception("Timeout on receiving WPS operation failure event")
549         if "CTRL-EVENT-CONNECTED" in ev:
550             raise Exception("Unexpected connection")
551         if "config_error=15" in ev:
552             logger.info("AP Setup Locked")
553             ap_setup_locked=True
554         elif "config_error=18" not in ev:
555             raise Exception("config_error=18 not reported")
556         ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
557         if ev is None:
558             raise Exception("Timeout on disconnection event")
559         time.sleep(0.1)
560     if not ap_setup_locked:
561         raise Exception("AP setup was not locked")
562
563     hapd = hostapd.Hostapd(apdev[0]['ifname'])
564     status = hapd.request("WPS_GET_STATUS")
565     if "Last WPS result: Failed" not in status:
566         raise Exception("WPS failure result not shown correctly")
567     if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
568         raise Exception("Peer address not shown correctly")
569
570     time.sleep(0.5)
571     dev[0].dump_monitor()
572     logger.info("WPS provisioning step")
573     pin = dev[0].wps_read_pin()
574     hapd = hostapd.Hostapd(apdev[0]['ifname'])
575     hapd.request("WPS_PIN any " + pin)
576     dev[0].request("WPS_PIN any " + pin)
577     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=30)
578     if ev is None:
579         raise Exception("WPS success was not reported")
580     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
581     if ev is None:
582         raise Exception("Association with the AP timed out")
583
584     appin = hapd.request("WPS_AP_PIN random")
585     if "FAIL" in appin:
586         raise Exception("Could not generate random AP PIN")
587     ev = hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=10)
588     if ev is None:
589         raise Exception("Failed to unlock AP PIN")
590
591 def test_ap_wps_pbc_overlap_2ap(dev, apdev):
592     """WPS PBC session overlap with two active APs"""
593     hostapd.add_ap(apdev[0]['ifname'],
594                    { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
595                      "wpa_passphrase": "12345678", "wpa": "2",
596                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
597                      "wps_independent": "1"})
598     hostapd.add_ap(apdev[1]['ifname'],
599                    { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
600                      "wpa_passphrase": "123456789", "wpa": "2",
601                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
602                      "wps_independent": "1"})
603     hapd = hostapd.Hostapd(apdev[0]['ifname'])
604     hapd.request("WPS_PBC")
605     hapd2 = hostapd.Hostapd(apdev[1]['ifname'])
606     hapd2.request("WPS_PBC")
607     logger.info("WPS provisioning step")
608     dev[0].dump_monitor()
609     dev[0].request("WPS_PBC")
610     ev = dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15)
611     if ev is None:
612         raise Exception("PBC session overlap not detected")
613
614 def test_ap_wps_pbc_overlap_2sta(dev, apdev):
615     """WPS PBC session overlap with two active STAs"""
616     ssid = "test-wps-pbc-overlap"
617     hostapd.add_ap(apdev[0]['ifname'],
618                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
619                      "wpa_passphrase": "12345678", "wpa": "2",
620                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
621     hapd = hostapd.Hostapd(apdev[0]['ifname'])
622     logger.info("WPS provisioning step")
623     hapd.request("WPS_PBC")
624     dev[0].dump_monitor()
625     dev[1].dump_monitor()
626     dev[0].request("WPS_PBC")
627     dev[1].request("WPS_PBC")
628     ev = dev[0].wait_event(["WPS-M2D"], timeout=15)
629     if ev is None:
630         raise Exception("PBC session overlap not detected (dev0)")
631     if "config_error=12" not in ev:
632         raise Exception("PBC session overlap not correctly reported (dev0)")
633     ev = dev[1].wait_event(["WPS-M2D"], timeout=15)
634     if ev is None:
635         raise Exception("PBC session overlap not detected (dev1)")
636     if "config_error=12" not in ev:
637         raise Exception("PBC session overlap not correctly reported (dev1)")
638     hapd.request("WPS_CANCEL")
639     ret = hapd.request("WPS_PBC")
640     if "FAIL" not in ret:
641         raise Exception("PBC mode allowed to be started while PBC overlap still active")
642
643 def test_ap_wps_cancel(dev, apdev):
644     """WPS AP cancelling enabled config method"""
645     ssid = "test-wps-ap-cancel"
646     hostapd.add_ap(apdev[0]['ifname'],
647                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
648                      "wpa_passphrase": "12345678", "wpa": "2",
649                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
650     bssid = apdev[0]['bssid']
651     hapd = hostapd.Hostapd(apdev[0]['ifname'])
652
653     logger.info("Verify PBC enable/cancel")
654     hapd.request("WPS_PBC")
655     dev[0].scan(freq="2412")
656     bss = dev[0].get_bss(apdev[0]['bssid'])
657     if "[WPS-PBC]" not in bss['flags']:
658         raise Exception("WPS-PBC flag missing")
659     if "FAIL" in hapd.request("WPS_CANCEL"):
660         raise Exception("WPS_CANCEL failed")
661     dev[0].scan(freq="2412")
662     bss = dev[0].get_bss(apdev[0]['bssid'])
663     if "[WPS-PBC]" in bss['flags']:
664         raise Exception("WPS-PBC flag not cleared")
665
666     logger.info("Verify PIN enable/cancel")
667     hapd.request("WPS_PIN any 12345670")
668     dev[0].scan(freq="2412")
669     bss = dev[0].get_bss(apdev[0]['bssid'])
670     if "[WPS-AUTH]" not in bss['flags']:
671         raise Exception("WPS-AUTH flag missing")
672     if "FAIL" in hapd.request("WPS_CANCEL"):
673         raise Exception("WPS_CANCEL failed")
674     dev[0].scan(freq="2412")
675     bss = dev[0].get_bss(apdev[0]['bssid'])
676     if "[WPS-AUTH]" in bss['flags']:
677         raise Exception("WPS-AUTH flag not cleared")
678
679 def test_ap_wps_er_add_enrollee(dev, apdev):
680     """WPS ER configuring AP and adding a new enrollee using PIN"""
681     ssid = "wps-er-add-enrollee"
682     ap_pin = "12345670"
683     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
684     hostapd.add_ap(apdev[0]['ifname'],
685                    { "ssid": ssid, "eap_server": "1", "wps_state": "1",
686                      "device_name": "Wireless AP", "manufacturer": "Company",
687                      "model_name": "WAP", "model_number": "123",
688                      "serial_number": "12345", "device_type": "6-0050F204-1",
689                      "os_version": "01020300",
690                      "config_methods": "label push_button",
691                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
692     logger.info("WPS configuration step")
693     new_passphrase = "1234567890"
694     dev[0].dump_monitor()
695     dev[0].wps_reg(apdev[0]['bssid'], ap_pin, ssid, "WPA2PSK", "CCMP",
696                    new_passphrase)
697     status = dev[0].get_status()
698     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
699         raise Exception("Not fully connected")
700     if status['ssid'] != ssid:
701         raise Exception("Unexpected SSID")
702     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
703         raise Exception("Unexpected encryption configuration")
704     if status['key_mgmt'] != 'WPA2-PSK':
705         raise Exception("Unexpected key_mgmt")
706
707     logger.info("Start ER")
708     dev[0].request("WPS_ER_START ifname=lo")
709     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
710     if ev is None:
711         raise Exception("AP discovery timed out")
712     if ap_uuid not in ev:
713         raise Exception("Expected AP UUID not found")
714
715     logger.info("Learn AP configuration through UPnP")
716     dev[0].dump_monitor()
717     dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
718     ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
719     if ev is None:
720         raise Exception("AP learn timed out")
721     if ap_uuid not in ev:
722         raise Exception("Expected AP UUID not in settings")
723     if "ssid=" + ssid not in ev:
724         raise Exception("Expected SSID not in settings")
725     if "key=" + new_passphrase not in ev:
726         raise Exception("Expected passphrase not in settings")
727
728     logger.info("Add Enrollee using ER")
729     pin = dev[1].wps_read_pin()
730     dev[0].dump_monitor()
731     dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
732     dev[1].dump_monitor()
733     dev[1].request("WPS_PIN any " + pin)
734     ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=30)
735     if ev is None:
736         raise Exception("Enrollee did not report success")
737     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
738     if ev is None:
739         raise Exception("Association with the AP timed out")
740     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
741     if ev is None:
742         raise Exception("WPS ER did not report success")
743     hwsim_utils.test_connectivity_sta(dev[0], dev[1])
744
745     logger.info("Add a specific Enrollee using ER")
746     pin = dev[2].wps_read_pin()
747     addr2 = dev[2].p2p_interface_addr()
748     dev[0].dump_monitor()
749     dev[2].dump_monitor()
750     dev[2].request("WPS_PIN any " + pin)
751     ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
752     if ev is None:
753         raise Exception("Enrollee not seen")
754     if addr2 not in ev:
755         raise Exception("Unexpected Enrollee MAC address")
756     dev[0].request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
757     ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
758     if ev is None:
759         raise Exception("Association with the AP timed out")
760     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
761     if ev is None:
762         raise Exception("WPS ER did not report success")
763
764     logger.info("Verify registrar selection behavior")
765     dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
766     dev[1].request("DISCONNECT")
767     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
768     dev[1].scan(freq="2412")
769     bss = dev[1].get_bss(apdev[0]['bssid'])
770     if "[WPS-AUTH]" not in bss['flags']:
771         raise Exception("WPS-AUTH flag missing")
772
773     logger.info("Stop ER")
774     dev[0].dump_monitor()
775     dev[0].request("WPS_ER_STOP")
776     ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"])
777     if ev is None:
778         raise Exception("WPS ER unsubscription timed out")
779     # It takes some time for the UPnP UNSUBSCRIBE command to go through, so wait
780     # a bit before verifying that the scan results have change.
781     time.sleep(0.2)
782
783     dev[1].scan(freq="2412")
784     bss = dev[1].get_bss(apdev[0]['bssid'])
785     if "[WPS-AUTH]" in bss['flags']:
786         raise Exception("WPS-AUTH flag not removed")
787
788 def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
789     """WPS ER connected to AP and adding a new enrollee using PBC"""
790     ssid = "wps-er-add-enrollee-pbc"
791     ap_pin = "12345670"
792     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
793     hostapd.add_ap(apdev[0]['ifname'],
794                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
795                      "wpa_passphrase": "12345678", "wpa": "2",
796                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
797                      "device_name": "Wireless AP", "manufacturer": "Company",
798                      "model_name": "WAP", "model_number": "123",
799                      "serial_number": "12345", "device_type": "6-0050F204-1",
800                      "os_version": "01020300",
801                      "config_methods": "label push_button",
802                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
803     logger.info("Learn AP configuration")
804     dev[0].dump_monitor()
805     dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
806     status = dev[0].get_status()
807     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
808         raise Exception("Not fully connected")
809
810     logger.info("Start ER")
811     dev[0].request("WPS_ER_START ifname=lo")
812     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
813     if ev is None:
814         raise Exception("AP discovery timed out")
815     if ap_uuid not in ev:
816         raise Exception("Expected AP UUID not found")
817
818     logger.info("Use learned network configuration on ER")
819     dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")
820
821     logger.info("Add Enrollee using ER and PBC")
822     dev[0].dump_monitor()
823     enrollee = dev[1].p2p_interface_addr()
824     dev[1].dump_monitor()
825     dev[1].request("WPS_PBC")
826
827     for i in range(0, 2):
828         ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
829         if ev is None:
830             raise Exception("Enrollee discovery timed out")
831         if enrollee in ev:
832             break
833         if i == 1:
834             raise Exception("Expected Enrollee not found")
835     dev[0].request("WPS_ER_PBC " + enrollee)
836
837     ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=15)
838     if ev is None:
839         raise Exception("Enrollee did not report success")
840     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
841     if ev is None:
842         raise Exception("Association with the AP timed out")
843     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
844     if ev is None:
845         raise Exception("WPS ER did not report success")
846     hwsim_utils.test_connectivity_sta(dev[0], dev[1])
847
848     # verify BSSID selection of the AP instead of UUID
849     if "FAIL" in dev[0].request("WPS_ER_SET_CONFIG " + apdev[0]['bssid'] + " 0"):
850         raise Exception("Could not select AP based on BSSID")
851
852 def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
853     """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
854     ssid = "wps-er-add-enrollee-pbc"
855     ap_pin = "12345670"
856     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
857     hostapd.add_ap(apdev[0]['ifname'],
858                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
859                      "wpa_passphrase": "12345678", "wpa": "2",
860                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
861                      "device_name": "Wireless AP", "manufacturer": "Company",
862                      "model_name": "WAP", "model_number": "123",
863                      "serial_number": "12345", "device_type": "6-0050F204-1",
864                      "os_version": "01020300",
865                      "config_methods": "label push_button",
866                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
867     logger.info("Learn AP configuration")
868     dev[0].request("SET wps_version_number 0x10")
869     dev[0].dump_monitor()
870     dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
871     status = dev[0].get_status()
872     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
873         raise Exception("Not fully connected")
874
875     logger.info("Start ER")
876     dev[0].request("WPS_ER_START ifname=lo")
877     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
878     if ev is None:
879         raise Exception("AP discovery timed out")
880     if ap_uuid not in ev:
881         raise Exception("Expected AP UUID not found")
882
883     logger.info("Use learned network configuration on ER")
884     dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")
885
886     logger.info("Add Enrollee using ER and PIN")
887     enrollee = dev[1].p2p_interface_addr()
888     pin = dev[1].wps_read_pin()
889     dev[0].dump_monitor()
890     dev[0].request("WPS_ER_PIN any " + pin + " " + enrollee)
891     dev[1].dump_monitor()
892     dev[1].request("WPS_PIN any " + pin)
893     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
894     if ev is None:
895         raise Exception("Association with the AP timed out")
896     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
897     if ev is None:
898         raise Exception("WPS ER did not report success")
899
900 def test_ap_wps_er_config_ap(dev, apdev):
901     """WPS ER configuring AP over UPnP"""
902     ssid = "wps-er-ap-config"
903     ap_pin = "12345670"
904     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
905     hostapd.add_ap(apdev[0]['ifname'],
906                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
907                      "wpa_passphrase": "12345678", "wpa": "2",
908                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
909                      "device_name": "Wireless AP", "manufacturer": "Company",
910                      "model_name": "WAP", "model_number": "123",
911                      "serial_number": "12345", "device_type": "6-0050F204-1",
912                      "os_version": "01020300",
913                      "config_methods": "label push_button",
914                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
915
916     logger.info("Connect ER to the AP")
917     dev[0].connect(ssid, psk="12345678", scan_freq="2412")
918
919     logger.info("WPS configuration step")
920     dev[0].request("WPS_ER_START ifname=lo")
921     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
922     if ev is None:
923         raise Exception("AP discovery timed out")
924     if ap_uuid not in ev:
925         raise Exception("Expected AP UUID not found")
926     new_passphrase = "1234567890"
927     dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
928                    ssid.encode("hex") + " WPA2PSK CCMP " +
929                    new_passphrase.encode("hex"))
930     ev = dev[0].wait_event(["WPS-SUCCESS"])
931     if ev is None:
932         raise Exception("WPS ER configuration operation timed out")
933     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
934     dev[0].connect(ssid, psk="1234567890", scan_freq="2412")
935
936 def test_ap_wps_fragmentation(dev, apdev):
937     """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
938     ssid = "test-wps-fragmentation"
939     hostapd.add_ap(apdev[0]['ifname'],
940                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
941                      "wpa_passphrase": "12345678", "wpa": "3",
942                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
943                      "wpa_pairwise": "TKIP",
944                      "fragment_size": "50" })
945     hapd = hostapd.Hostapd(apdev[0]['ifname'])
946     logger.info("WPS provisioning step")
947     hapd.request("WPS_PBC")
948     dev[0].dump_monitor()
949     dev[0].request("SET wps_fragment_size 50")
950     dev[0].request("WPS_PBC")
951     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
952     if ev is None:
953         raise Exception("Association with the AP timed out")
954     status = dev[0].get_status()
955     if status['wpa_state'] != 'COMPLETED':
956         raise Exception("Not fully connected")
957     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
958         raise Exception("Unexpected encryption configuration")
959     if status['key_mgmt'] != 'WPA2-PSK':
960         raise Exception("Unexpected key_mgmt")
961
962 def test_ap_wps_new_version_sta(dev, apdev):
963     """WPS compatibility with new version number on the station"""
964     ssid = "test-wps-ver"
965     hostapd.add_ap(apdev[0]['ifname'],
966                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
967                      "wpa_passphrase": "12345678", "wpa": "2",
968                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
969     hapd = hostapd.Hostapd(apdev[0]['ifname'])
970     logger.info("WPS provisioning step")
971     hapd.request("WPS_PBC")
972     dev[0].dump_monitor()
973     dev[0].request("SET wps_version_number 0x43")
974     dev[0].request("SET wps_vendor_ext_m1 000137100100020001")
975     dev[0].request("WPS_PBC")
976     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
977     if ev is None:
978         raise Exception("Association with the AP timed out")
979
980 def test_ap_wps_new_version_ap(dev, apdev):
981     """WPS compatibility with new version number on the AP"""
982     ssid = "test-wps-ver"
983     hostapd.add_ap(apdev[0]['ifname'],
984                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
985                      "wpa_passphrase": "12345678", "wpa": "2",
986                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
987     hapd = hostapd.Hostapd(apdev[0]['ifname'])
988     logger.info("WPS provisioning step")
989     if "FAIL" in hapd.request("SET wps_version_number 0x43"):
990         raise Exception("Failed to enable test functionality")
991     hapd.request("WPS_PBC")
992     dev[0].dump_monitor()
993     dev[0].request("WPS_PBC")
994     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
995     hapd.request("SET wps_version_number 0x20")
996     if ev is None:
997         raise Exception("Association with the AP timed out")
998
999 def test_ap_wps_check_pin(dev, apdev):
1000     """Verify PIN checking through control interface"""
1001     hostapd.add_ap(apdev[0]['ifname'],
1002                    { "ssid": "wps", "eap_server": "1", "wps_state": "2",
1003                      "wpa_passphrase": "12345678", "wpa": "2",
1004                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1005     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1006     for t in [ ("12345670", "12345670"),
1007                ("12345678", "FAIL-CHECKSUM"),
1008                ("1234-5670", "12345670"),
1009                ("1234 5670", "12345670"),
1010                ("1-2.3:4 5670", "12345670") ]:
1011         res = hapd.request("WPS_CHECK_PIN " + t[0]).rstrip('\n')
1012         res2 = dev[0].request("WPS_CHECK_PIN " + t[0]).rstrip('\n')
1013         if res != res2:
1014             raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
1015         if res != t[1]:
1016             raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(res, t[1]))
1017
1018     if "FAIL" not in hapd.request("WPS_CHECK_PIN 12345"):
1019         raise Exception("Unexpected WPS_CHECK_PIN success")
1020     if "FAIL" not in hapd.request("WPS_CHECK_PIN 123456789"):
1021         raise Exception("Unexpected WPS_CHECK_PIN success")
1022
1023 def test_ap_wps_wep_config(dev, apdev):
1024     """WPS 2.0 AP rejecting WEP configuration"""
1025     ssid = "test-wps-config"
1026     appin = "12345670"
1027     hostapd.add_ap(apdev[0]['ifname'],
1028                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1029                      "ap_pin": appin})
1030     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1031     dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
1032                    "hello", no_wait=True)
1033     ev = hapd.wait_event(["WPS-FAIL"], timeout=15)
1034     if ev is None:
1035         raise Exception("WPS-FAIL timed out")
1036     if "reason=2" not in ev:
1037         raise Exception("Unexpected reason code in WPS-FAIL")
1038     status = hapd.request("WPS_GET_STATUS")
1039     if "Last WPS result: Failed" not in status:
1040         raise Exception("WPS failure result not shown correctly")
1041     if "Failure Reason: WEP Prohibited" not in status:
1042         raise Exception("Failure reason not reported correctly")
1043     if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
1044         raise Exception("Peer address not shown correctly")
1045
1046 def test_ap_wps_wep_enroll(dev, apdev):
1047     """WPS 2.0 STA rejecting WEP configuration"""
1048     ssid = "test-wps-wep"
1049     hostapd.add_ap(apdev[0]['ifname'],
1050                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1051                      "skip_cred_build": "1", "extra_cred": "wps-wep-cred" })
1052     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1053     hapd.request("WPS_PBC")
1054     dev[0].request("WPS_PBC")
1055     ev = dev[0].wait_event(["WPS-FAIL"], timeout=15)
1056     if ev is None:
1057         raise Exception("WPS-FAIL event timed out")
1058     if "msg=12" not in ev or "reason=2 (WEP Prohibited)" not in ev:
1059         raise Exception("Unexpected WPS-FAIL event: " + ev)
1060
1061 def test_ap_wps_ie_fragmentation(dev, apdev):
1062     """WPS AP using fragmented WPS IE"""
1063     ssid = "test-wps-ie-fragmentation"
1064     params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1065                "wpa_passphrase": "12345678", "wpa": "2",
1066                "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1067                "device_name": "1234567890abcdef1234567890abcdef",
1068                "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
1069                "model_name": "1234567890abcdef1234567890abcdef",
1070                "model_number": "1234567890abcdef1234567890abcdef",
1071                "serial_number": "1234567890abcdef1234567890abcdef" }
1072     hostapd.add_ap(apdev[0]['ifname'], params)
1073     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1074     hapd.request("WPS_PBC")
1075     dev[0].request("WPS_PBC")
1076     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1077     if ev is None:
1078         raise Exception("Association with the AP timed out")
1079     bss = dev[0].get_bss(apdev[0]['bssid'])
1080     if "wps_device_name" not in bss or bss['wps_device_name'] != "1234567890abcdef1234567890abcdef":
1081         logger.info(bss)
1082         raise Exception("Device Name not received correctly")
1083     if len(re.findall("dd..0050f204", bss['ie'])) != 2:
1084         raise Exception("Unexpected number of WPS IEs")
1085
1086 def get_psk(pskfile):
1087     psks = {}
1088     with open(pskfile, "r") as f:
1089         lines = f.read().splitlines()
1090         for l in lines:
1091             if l == "# WPA PSKs":
1092                 continue
1093             (addr,psk) = l.split(' ')
1094             psks[addr] = psk
1095     return psks
1096
1097 def test_ap_wps_per_station_psk(dev, apdev):
1098     """WPS PBC provisioning with per-station PSK"""
1099     addr0 = dev[0].p2p_dev_addr()
1100     addr1 = dev[1].p2p_dev_addr()
1101     addr2 = dev[2].p2p_dev_addr()
1102     ssid = "wps"
1103     appin = "12345670"
1104     pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
1105     try:
1106         os.remove(pskfile)
1107     except:
1108         pass
1109
1110     try:
1111         with open(pskfile, "w") as f:
1112             f.write("# WPA PSKs\n")
1113
1114         params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1115                    "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1116                    "rsn_pairwise": "CCMP", "ap_pin": appin,
1117                    "wpa_psk_file": pskfile }
1118         hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1119
1120         logger.info("First enrollee")
1121         hapd.request("WPS_PBC")
1122         dev[0].request("WPS_PBC")
1123         ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
1124         if ev is None:
1125             raise Exception("Association with the AP timed out (1)")
1126
1127         logger.info("Second enrollee")
1128         hapd.request("WPS_PBC")
1129         dev[1].request("WPS_PBC")
1130         ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
1131         if ev is None:
1132             raise Exception("Association with the AP timed out (2)")
1133
1134         logger.info("External registrar")
1135         dev[2].wps_reg(apdev[0]['bssid'], appin)
1136
1137         logger.info("Verifying PSK results")
1138         psks = get_psk(pskfile)
1139         if addr0 not in psks:
1140             raise Exception("No PSK recorded for sta0")
1141         if addr1 not in psks:
1142             raise Exception("No PSK recorded for sta1")
1143         if addr2 not in psks:
1144             raise Exception("No PSK recorded for sta2")
1145         if psks[addr0] == psks[addr1]:
1146             raise Exception("Same PSK recorded for sta0 and sta1")
1147         if psks[addr0] == psks[addr2]:
1148             raise Exception("Same PSK recorded for sta0 and sta2")
1149         if psks[addr1] == psks[addr2]:
1150             raise Exception("Same PSK recorded for sta1 and sta2")
1151
1152         dev[0].request("REMOVE_NETWORK all")
1153         logger.info("Second external registrar")
1154         dev[0].wps_reg(apdev[0]['bssid'], appin)
1155         psks2 = get_psk(pskfile)
1156         if addr0 not in psks2:
1157             raise Exception("No PSK recorded for sta0(reg)")
1158         if psks[addr0] == psks2[addr0]:
1159             raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
1160     finally:
1161         os.remove(pskfile)
1162
1163 def test_ap_wps_per_station_psk_failure(dev, apdev):
1164     """WPS PBC provisioning with per-station PSK (file not writable)"""
1165     addr0 = dev[0].p2p_dev_addr()
1166     addr1 = dev[1].p2p_dev_addr()
1167     addr2 = dev[2].p2p_dev_addr()
1168     ssid = "wps"
1169     appin = "12345670"
1170     pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
1171     try:
1172         os.remove(pskfile)
1173     except:
1174         pass
1175
1176     try:
1177         with open(pskfile, "w") as f:
1178             f.write("# WPA PSKs\n")
1179
1180         params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1181                    "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1182                    "rsn_pairwise": "CCMP", "ap_pin": appin,
1183                    "wpa_psk_file": pskfile }
1184         hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1185         if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
1186             raise Exception("Failed to set wpa_psk_file")
1187
1188         logger.info("First enrollee")
1189         hapd.request("WPS_PBC")
1190         dev[0].request("WPS_PBC")
1191         ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
1192         if ev is None:
1193             raise Exception("Association with the AP timed out (1)")
1194
1195         logger.info("Second enrollee")
1196         hapd.request("WPS_PBC")
1197         dev[1].request("WPS_PBC")
1198         ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
1199         if ev is None:
1200             raise Exception("Association with the AP timed out (2)")
1201
1202         logger.info("External registrar")
1203         dev[2].wps_reg(apdev[0]['bssid'], appin)
1204
1205         logger.info("Verifying PSK results")
1206         psks = get_psk(pskfile)
1207         if len(psks) > 0:
1208             raise Exception("PSK recorded unexpectedly")
1209     finally:
1210         os.remove(pskfile)
1211
1212 def test_ap_wps_pin_request_file(dev, apdev):
1213     """WPS PIN provisioning with configured AP"""
1214     ssid = "wps"
1215     pinfile = "/tmp/ap_wps_pin_request_file.log"
1216     if os.path.exists(pinfile):
1217         subprocess.call(['sudo', 'rm', pinfile])
1218     hostapd.add_ap(apdev[0]['ifname'],
1219                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1220                      "wps_pin_requests": pinfile,
1221                      "wpa_passphrase": "12345678", "wpa": "2",
1222                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
1223     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1224     uuid = dev[0].get_status_field("uuid")
1225     pin = dev[0].wps_read_pin()
1226     try:
1227         dev[0].request("WPS_PIN any " + pin)
1228         ev = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=15)
1229         if ev is None:
1230             raise Exception("PIN needed event not shown")
1231         if uuid not in ev:
1232             raise Exception("UUID mismatch")
1233         dev[0].request("WPS_CANCEL")
1234         success = False
1235         with open(pinfile, "r") as f:
1236             lines = f.readlines()
1237             for l in lines:
1238                 if uuid in l:
1239                     success = True
1240                     break
1241         if not success:
1242             raise Exception("PIN request entry not in the log file")
1243     finally:
1244         subprocess.call(['sudo', 'rm', pinfile])
1245
1246 def test_ap_wps_auto_setup_with_config_file(dev, apdev):
1247     """WPS auto-setup with configuration file"""
1248     conffile = "/tmp/ap_wps_auto_setup_with_config_file.conf"
1249     ifname = apdev[0]['ifname']
1250     try:
1251         with open(conffile, "w") as f:
1252             f.write("driver=nl80211\n")
1253             f.write("hw_mode=g\n")
1254             f.write("channel=1\n")
1255             f.write("ieee80211n=1\n")
1256             f.write("interface=%s\n" % ifname)
1257             f.write("ctrl_interface=/var/run/hostapd\n")
1258             f.write("ssid=wps\n")
1259             f.write("eap_server=1\n")
1260             f.write("wps_state=1\n")
1261         hostapd.add_bss('phy3', ifname, conffile)
1262         hapd = hostapd.Hostapd(ifname)
1263         hapd.request("WPS_PBC")
1264         dev[0].request("WPS_PBC")
1265         ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1266         if ev is None:
1267             raise Exception("Association with the AP timed out")
1268         with open(conffile, "r") as f:
1269             lines = f.read().splitlines()
1270             vals = dict()
1271             for l in lines:
1272                 try:
1273                     [name,value] = l.split('=', 1)
1274                     vals[name] = value
1275                 except ValueError, e:
1276                     if "# WPS configuration" in l:
1277                         pass
1278                     else:
1279                         raise Exception("Unexpected configuration line: " + l)
1280         if vals['ieee80211n'] != '1' or vals['wps_state'] != '2' or "WPA-PSK" not in vals['wpa_key_mgmt']:
1281             raise Exception("Incorrect configuration: " + str(vals))
1282     finally:
1283         subprocess.call(['sudo', 'rm', conffile])
1284
1285 def add_ssdp_ap(ifname, ap_uuid):
1286     ssid = "wps-ssdp"
1287     ap_pin = "12345670"
1288     hostapd.add_ap(ifname,
1289                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1290                      "wpa_passphrase": "12345678", "wpa": "2",
1291                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1292                      "device_name": "Wireless AP", "manufacturer": "Company",
1293                      "model_name": "WAP", "model_number": "123",
1294                      "serial_number": "12345", "device_type": "6-0050F204-1",
1295                      "os_version": "01020300",
1296                      "config_methods": "label push_button",
1297                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo",
1298                      "friendly_name": "WPS Access Point",
1299                      "manufacturer_url": "http://www.example.com/",
1300                      "model_description": "Wireless Access Point",
1301                      "model_url": "http://www.example.com/model/",
1302                      "upc": "123456789012" })
1303
1304 def ssdp_send(msg, no_recv=False):
1305     socket.setdefaulttimeout(1)
1306     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1307     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1308     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1309     sock.bind(("127.0.0.1", 0))
1310     sock.sendto(msg, ("239.255.255.250", 1900))
1311     if no_recv:
1312         return None
1313     return sock.recv(1000)
1314
1315 def ssdp_send_msearch(st):
1316     msg = '\r\n'.join([
1317             'M-SEARCH * HTTP/1.1',
1318             'HOST: 239.255.255.250:1900',
1319             'MX: 1',
1320             'MAN: "ssdp:discover"',
1321             'ST: ' + st,
1322             '', ''])
1323     return ssdp_send(msg)
1324
1325 def test_ap_wps_ssdp_msearch(dev, apdev):
1326     """WPS AP and SSDP M-SEARCH messages"""
1327     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1328     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1329
1330     msg = '\r\n'.join([
1331             'M-SEARCH * HTTP/1.1',
1332             'Host: 239.255.255.250:1900',
1333             'Mx: 1',
1334             'Man: "ssdp:discover"',
1335             'St: urn:schemas-wifialliance-org:device:WFADevice:1',
1336             '', ''])
1337     ssdp_send(msg)
1338
1339     msg = '\r\n'.join([
1340             'M-SEARCH * HTTP/1.1',
1341             'host:\t239.255.255.250:1900\t\t\t\t \t\t',
1342             'mx: \t1\t\t   ',
1343             'man: \t \t "ssdp:discover"   ',
1344             'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t',
1345             '', ''])
1346     ssdp_send(msg)
1347
1348     ssdp_send_msearch("ssdp:all")
1349     ssdp_send_msearch("upnp:rootdevice")
1350     ssdp_send_msearch("uuid:" + ap_uuid)
1351     ssdp_send_msearch("urn:schemas-wifialliance-org:service:WFAWLANConfig:1")
1352     ssdp_send_msearch("urn:schemas-wifialliance-org:device:WFADevice:1");
1353
1354     msg = '\r\n'.join([
1355             'M-SEARCH * HTTP/1.1',
1356             'HOST:\t239.255.255.250:1900',
1357             'MAN: "ssdp:discover"',
1358             'MX: 130',
1359             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1360             '', ''])
1361     ssdp_send(msg, no_recv=True)
1362
1363 def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
1364     """WPS AP and invalid SSDP M-SEARCH messages"""
1365     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1366     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1367
1368     socket.setdefaulttimeout(1)
1369     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1370     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1371     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1372     sock.bind(("127.0.0.1", 0))
1373
1374     logger.debug("Missing MX")
1375     msg = '\r\n'.join([
1376             'M-SEARCH * HTTP/1.1',
1377             'HOST: 239.255.255.250:1900',
1378             'MAN: "ssdp:discover"',
1379             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1380             '', ''])
1381     sock.sendto(msg, ("239.255.255.250", 1900))
1382
1383     logger.debug("Negative MX")
1384     msg = '\r\n'.join([
1385             'M-SEARCH * HTTP/1.1',
1386             'HOST: 239.255.255.250:1900',
1387             'MX: -1',
1388             'MAN: "ssdp:discover"',
1389             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1390             '', ''])
1391     sock.sendto(msg, ("239.255.255.250", 1900))
1392
1393     logger.debug("Invalid MX")
1394     msg = '\r\n'.join([
1395             'M-SEARCH * HTTP/1.1',
1396             'HOST: 239.255.255.250:1900',
1397             'MX; 1',
1398             'MAN: "ssdp:discover"',
1399             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1400             '', ''])
1401     sock.sendto(msg, ("239.255.255.250", 1900))
1402
1403     logger.debug("Missing MAN")
1404     msg = '\r\n'.join([
1405             'M-SEARCH * HTTP/1.1',
1406             'HOST: 239.255.255.250:1900',
1407             'MX: 1',
1408             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1409             '', ''])
1410     sock.sendto(msg, ("239.255.255.250", 1900))
1411
1412     logger.debug("Invalid MAN")
1413     msg = '\r\n'.join([
1414             'M-SEARCH * HTTP/1.1',
1415             'HOST: 239.255.255.250:1900',
1416             'MX: 1',
1417             'MAN: foo',
1418             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1419             '', ''])
1420     sock.sendto(msg, ("239.255.255.250", 1900))
1421     msg = '\r\n'.join([
1422             'M-SEARCH * HTTP/1.1',
1423             'HOST: 239.255.255.250:1900',
1424             'MX: 1',
1425             'MAN; "ssdp:discover"',
1426             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1427             '', ''])
1428     sock.sendto(msg, ("239.255.255.250", 1900))
1429
1430     logger.debug("Missing HOST")
1431     msg = '\r\n'.join([
1432             'M-SEARCH * HTTP/1.1',
1433             'MAN: "ssdp:discover"',
1434             'MX: 1',
1435             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1436             '', ''])
1437     sock.sendto(msg, ("239.255.255.250", 1900))
1438
1439     logger.debug("Missing ST")
1440     msg = '\r\n'.join([
1441             'M-SEARCH * HTTP/1.1',
1442             'HOST: 239.255.255.250:1900',
1443             'MAN: "ssdp:discover"',
1444             'MX: 1',
1445             '', ''])
1446     sock.sendto(msg, ("239.255.255.250", 1900))
1447
1448     logger.debug("Mismatching ST")
1449     msg = '\r\n'.join([
1450             'M-SEARCH * HTTP/1.1',
1451             'HOST: 239.255.255.250:1900',
1452             'MAN: "ssdp:discover"',
1453             'MX: 1',
1454             'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d',
1455             '', ''])
1456     sock.sendto(msg, ("239.255.255.250", 1900))
1457     msg = '\r\n'.join([
1458             'M-SEARCH * HTTP/1.1',
1459             'HOST: 239.255.255.250:1900',
1460             'MAN: "ssdp:discover"',
1461             'MX: 1',
1462             'ST: foo:bar',
1463             '', ''])
1464     sock.sendto(msg, ("239.255.255.250", 1900))
1465     msg = '\r\n'.join([
1466             'M-SEARCH * HTTP/1.1',
1467             'HOST: 239.255.255.250:1900',
1468             'MAN: "ssdp:discover"',
1469             'MX: 1',
1470             'ST: foobar',
1471             '', ''])
1472     sock.sendto(msg, ("239.255.255.250", 1900))
1473
1474     logger.debug("Invalid ST")
1475     msg = '\r\n'.join([
1476             'M-SEARCH * HTTP/1.1',
1477             'HOST: 239.255.255.250:1900',
1478             'MAN: "ssdp:discover"',
1479             'MX: 1',
1480             'ST; urn:schemas-wifialliance-org:device:WFADevice:1',
1481             '', ''])
1482     sock.sendto(msg, ("239.255.255.250", 1900))
1483
1484     logger.debug("Invalid M-SEARCH")
1485     msg = '\r\n'.join([
1486             'M+SEARCH * HTTP/1.1',
1487             'HOST: 239.255.255.250:1900',
1488             'MAN: "ssdp:discover"',
1489             'MX: 1',
1490             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1491             '', ''])
1492     sock.sendto(msg, ("239.255.255.250", 1900))
1493     msg = '\r\n'.join([
1494             'M-SEARCH-* HTTP/1.1',
1495             'HOST: 239.255.255.250:1900',
1496             'MAN: "ssdp:discover"',
1497             'MX: 1',
1498             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1499             '', ''])
1500     sock.sendto(msg, ("239.255.255.250", 1900))
1501
1502     logger.debug("Invalid message format")
1503     sock.sendto("NOTIFY * HTTP/1.1", ("239.255.255.250", 1900))
1504     msg = '\r'.join([
1505             'M-SEARCH * HTTP/1.1',
1506             'HOST: 239.255.255.250:1900',
1507             'MAN: "ssdp:discover"',
1508             'MX: 1',
1509             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1510             '', ''])
1511     sock.sendto(msg, ("239.255.255.250", 1900))
1512
1513     try:
1514         r = sock.recv(1000)
1515         raise Exception("Unexpected M-SEARCH response: " + r)
1516     except socket.timeout:
1517         pass
1518
1519     logger.debug("Valid M-SEARCH")
1520     msg = '\r\n'.join([
1521             'M-SEARCH * HTTP/1.1',
1522             'HOST: 239.255.255.250:1900',
1523             'MAN: "ssdp:discover"',
1524             'MX: 1',
1525             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1526             '', ''])
1527     sock.sendto(msg, ("239.255.255.250", 1900))
1528
1529     try:
1530         r = sock.recv(1000)
1531         pass
1532     except socket.timeout:
1533         raise Exception("No SSDP response")
1534
1535 def test_ap_wps_ssdp_burst(dev, apdev):
1536     """WPS AP and SSDP burst"""
1537     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1538     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1539
1540     msg = '\r\n'.join([
1541             'M-SEARCH * HTTP/1.1',
1542             'HOST: 239.255.255.250:1900',
1543             'MAN: "ssdp:discover"',
1544             'MX: 1',
1545             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1546             '', ''])
1547     socket.setdefaulttimeout(1)
1548     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1549     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1550     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1551     sock.bind(("127.0.0.1", 0))
1552     for i in range(0, 25):
1553         sock.sendto(msg, ("239.255.255.250", 1900))
1554     resp = 0
1555     while True:
1556         try:
1557             r = sock.recv(1000)
1558             if not r.startswith("HTTP/1.1 200 OK\r\n"):
1559                 raise Exception("Unexpected message: " + r)
1560             resp += 1
1561         except socket.timeout:
1562             break
1563     if resp < 20:
1564         raise Exception("Too few SSDP responses")
1565
1566     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1567     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1568     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1569     sock.bind(("127.0.0.1", 0))
1570     for i in range(0, 25):
1571         sock.sendto(msg, ("239.255.255.250", 1900))
1572     while True:
1573         try:
1574             r = sock.recv(1000)
1575             if ap_uuid in r:
1576                 break
1577         except socket.timeout:
1578             raise Exception("No SSDP response")
1579
1580 def ssdp_get_location(uuid):
1581     res = ssdp_send_msearch("uuid:" + uuid)
1582     location = None
1583     for l in res.splitlines():
1584         if l.lower().startswith("location:"):
1585             location = l.split(':', 1)[1].strip()
1586             break
1587     if location is None:
1588         raise Exception("No UPnP location found")
1589     return location
1590
1591 def upnp_get_urls(location):
1592     conn = urllib.urlopen(location)
1593     tree = ET.parse(conn)
1594     root = tree.getroot()
1595     urn = '{urn:schemas-upnp-org:device-1-0}'
1596     service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
1597     res = {}
1598     res['scpd_url'] = urlparse.urljoin(location, service.find(urn + 'SCPDURL').text)
1599     res['control_url'] = urlparse.urljoin(location, service.find(urn + 'controlURL').text)
1600     res['event_sub_url'] = urlparse.urljoin(location, service.find(urn + 'eventSubURL').text)
1601     return res
1602
1603 def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
1604     soapns = 'http://schemas.xmlsoap.org/soap/envelope/'
1605     wpsns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
1606     ET.register_namespace('soapenv', soapns)
1607     ET.register_namespace('wfa', wpsns)
1608     attrib = {}
1609     attrib['{%s}encodingStyle' % soapns] = 'http://schemas.xmlsoap.org/soap/encoding/'
1610     root = ET.Element("{%s}Envelope" % soapns, attrib=attrib)
1611     body = ET.SubElement(root, "{%s}Body" % soapns)
1612     act = ET.SubElement(body, "{%s}%s" % (wpsns, action))
1613     tree = ET.ElementTree(root)
1614     soap = StringIO.StringIO()
1615     tree.write(soap, xml_declaration=True, encoding='utf-8')
1616
1617     headers = { "Content-type": 'text/xml; charset="utf-8"' }
1618     if include_soap_action:
1619         headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
1620     elif soap_action_override:
1621         headers["SOAPAction"] = soap_action_override
1622     conn.request("POST", path, soap.getvalue(), headers)
1623     return conn.getresponse()
1624
1625 def test_ap_wps_upnp(dev, apdev):
1626     """WPS AP and UPnP operations"""
1627     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1628     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1629
1630     location = ssdp_get_location(ap_uuid)
1631     urls = upnp_get_urls(location)
1632
1633     conn = urllib.urlopen(urls['scpd_url'])
1634     scpd = conn.read()
1635
1636     conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
1637     if conn.getcode() != 404:
1638         raise Exception("Unexpected HTTP response to GET unknown URL")
1639
1640     url = urlparse.urlparse(location)
1641     conn = httplib.HTTPConnection(url.netloc)
1642     #conn.set_debuglevel(1)
1643     headers = { "Content-type": 'text/xml; charset="utf-8"',
1644                 "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
1645     conn.request("POST", "hello", "\r\n\r\n", headers)
1646     resp = conn.getresponse()
1647     if resp.status != 404:
1648         raise Exception("Unexpected HTTP response: %s" % resp.status)
1649
1650     conn.request("UNKNOWN", "hello", "\r\n\r\n", headers)
1651     resp = conn.getresponse()
1652     if resp.status != 501:
1653         raise Exception("Unexpected HTTP response: %s" % resp.status)
1654
1655     headers = { "Content-type": 'text/xml; charset="utf-8"',
1656                 "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
1657     ctrlurl = urlparse.urlparse(urls['control_url'])
1658     conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
1659     resp = conn.getresponse()
1660     if resp.status != 401:
1661         raise Exception("Unexpected HTTP response: %s" % resp.status)
1662
1663     logger.debug("GetDeviceInfo without SOAPAction header")
1664     resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
1665                             include_soap_action=False)
1666     if resp.status != 401:
1667         raise Exception("Unexpected HTTP response: %s" % resp.status)
1668
1669     logger.debug("GetDeviceInfo with invalid SOAPAction header")
1670     for act in [ "foo",
1671                  "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
1672                  '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
1673                  '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']:
1674         resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
1675                                 include_soap_action=False,
1676                                 soap_action_override=act)
1677         if resp.status != 401:
1678             raise Exception("Unexpected HTTP response: %s" % resp.status)
1679
1680     resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
1681     if resp.status != 200:
1682         raise Exception("Unexpected HTTP response: %s" % resp.status)
1683     dev = resp.read()
1684     if "NewDeviceInfo" not in dev:
1685         raise Exception("Unexpected GetDeviceInfo response")
1686
1687     logger.debug("PutMessage without required parameters")
1688     resp = upnp_soap_action(conn, ctrlurl.path, "PutMessage")
1689     if resp.status != 600:
1690         raise Exception("Unexpected HTTP response: %s" % resp.status)
1691
1692     logger.debug("PutWLANResponse without required parameters")
1693     resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse")
1694     if resp.status != 600:
1695         raise Exception("Unexpected HTTP response: %s" % resp.status)
1696
1697     logger.debug("SetSelectedRegistrar from unregistered ER")
1698     resp = upnp_soap_action(conn, ctrlurl.path, "SetSelectedRegistrar")
1699     if resp.status != 501:
1700         raise Exception("Unexpected HTTP response: %s" % resp.status)
1701
1702     logger.debug("Unknown action")
1703     resp = upnp_soap_action(conn, ctrlurl.path, "Unknown")
1704     if resp.status != 401:
1705         raise Exception("Unexpected HTTP response: %s" % resp.status)
1706
1707 def test_ap_wps_upnp_subscribe(dev, apdev):
1708     """WPS AP and UPnP event subscription"""
1709     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1710     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1711
1712     location = ssdp_get_location(ap_uuid)
1713     urls = upnp_get_urls(location)
1714     eventurl = urlparse.urlparse(urls['event_sub_url'])
1715
1716     url = urlparse.urlparse(location)
1717     conn = httplib.HTTPConnection(url.netloc)
1718     #conn.set_debuglevel(1)
1719     headers = { "callback": '<http://127.0.0.1:12345/event>',
1720                 "timeout": "Second-1234" }
1721     conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1722     resp = conn.getresponse()
1723     if resp.status != 412:
1724         raise Exception("Unexpected HTTP response: %s" % resp.status)
1725
1726     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1727     resp = conn.getresponse()
1728     if resp.status != 412:
1729         raise Exception("Unexpected HTTP response: %s" % resp.status)
1730
1731     headers = { "NT": "upnp:event",
1732                 "timeout": "Second-1234" }
1733     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1734     resp = conn.getresponse()
1735     if resp.status != 412:
1736         raise Exception("Unexpected HTTP response: %s" % resp.status)
1737
1738     headers = { "callback": '<http://127.0.0.1:12345/event>',
1739                 "NT": "upnp:foobar",
1740                 "timeout": "Second-1234" }
1741     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1742     resp = conn.getresponse()
1743     if resp.status != 400:
1744         raise Exception("Unexpected HTTP response: %s" % resp.status)
1745
1746     logger.debug("Valid subscription")
1747     headers = { "callback": '<http://127.0.0.1:12345/event>',
1748                 "NT": "upnp:event",
1749                 "timeout": "Second-1234" }
1750     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1751     resp = conn.getresponse()
1752     if resp.status != 200:
1753         raise Exception("Unexpected HTTP response: %s" % resp.status)
1754     sid = resp.getheader("sid")
1755     logger.debug("Subscription SID " + sid)
1756
1757     logger.debug("Invalid re-subscription")
1758     headers = { "NT": "upnp:event",
1759                 "sid": "123456734567854",
1760                 "timeout": "Second-1234" }
1761     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1762     resp = conn.getresponse()
1763     if resp.status != 400:
1764         raise Exception("Unexpected HTTP response: %s" % resp.status)
1765
1766     logger.debug("Invalid re-subscription")
1767     headers = { "NT": "upnp:event",
1768                 "sid": "uuid:123456734567854",
1769                 "timeout": "Second-1234" }
1770     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1771     resp = conn.getresponse()
1772     if resp.status != 400:
1773         raise Exception("Unexpected HTTP response: %s" % resp.status)
1774
1775     logger.debug("Invalid re-subscription")
1776     headers = { "callback": '<http://127.0.0.1:12345/event>',
1777                 "NT": "upnp:event",
1778                 "sid": sid,
1779                 "timeout": "Second-1234" }
1780     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1781     resp = conn.getresponse()
1782     if resp.status != 400:
1783         raise Exception("Unexpected HTTP response: %s" % resp.status)
1784
1785     logger.debug("SID mismatch in re-subscription")
1786     headers = { "NT": "upnp:event",
1787                 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1788                 "timeout": "Second-1234" }
1789     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1790     resp = conn.getresponse()
1791     if resp.status != 412:
1792         raise Exception("Unexpected HTTP response: %s" % resp.status)
1793
1794     logger.debug("Valid re-subscription")
1795     headers = { "NT": "upnp:event",
1796                 "sid": sid,
1797                 "timeout": "Second-1234" }
1798     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1799     resp = conn.getresponse()
1800     if resp.status != 200:
1801         raise Exception("Unexpected HTTP response: %s" % resp.status)
1802     sid2 = resp.getheader("sid")
1803     logger.debug("Subscription SID " + sid2)
1804
1805     if sid != sid2:
1806         raise Exception("Unexpected SID change")
1807
1808     logger.debug("Valid re-subscription")
1809     headers = { "NT": "upnp:event",
1810                 "sid": "uuid: \t \t" + sid.split(':')[1],
1811                 "timeout": "Second-1234" }
1812     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1813     resp = conn.getresponse()
1814     if resp.status != 200:
1815         raise Exception("Unexpected HTTP response: %s" % resp.status)
1816
1817     logger.debug("Invalid unsubscription")
1818     headers = { "sid": sid }
1819     conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1820     resp = conn.getresponse()
1821     if resp.status != 412:
1822         raise Exception("Unexpected HTTP response: %s" % resp.status)
1823     headers = { "foo": "bar" }
1824     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1825     resp = conn.getresponse()
1826     if resp.status != 412:
1827         raise Exception("Unexpected HTTP response: %s" % resp.status)
1828
1829     logger.debug("Valid unsubscription")
1830     headers = { "sid": sid }
1831     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1832     resp = conn.getresponse()
1833     if resp.status != 200:
1834         raise Exception("Unexpected HTTP response: %s" % resp.status)
1835
1836     logger.debug("Unsubscription for not existing SID")
1837     headers = { "sid": sid }
1838     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1839     resp = conn.getresponse()
1840     if resp.status != 412:
1841         raise Exception("Unexpected HTTP response: %s" % resp.status)
1842
1843     logger.debug("Invalid unsubscription")
1844     headers = { "sid": " \t \tfoo" }
1845     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1846     resp = conn.getresponse()
1847     if resp.status != 400:
1848         raise Exception("Unexpected HTTP response: %s" % resp.status)
1849
1850     logger.debug("Invalid unsubscription")
1851     headers = { "sid": "uuid:\t \tfoo" }
1852     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1853     resp = conn.getresponse()
1854     if resp.status != 400:
1855         raise Exception("Unexpected HTTP response: %s" % resp.status)
1856
1857     logger.debug("Invalid unsubscription")
1858     headers = { "NT": "upnp:event",
1859                 "sid": sid }
1860     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1861     resp = conn.getresponse()
1862     if resp.status != 400:
1863         raise Exception("Unexpected HTTP response: %s" % resp.status)
1864     headers = { "callback": '<http://127.0.0.1:12345/event>',
1865                 "sid": sid }
1866     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1867     resp = conn.getresponse()
1868     if resp.status != 400:
1869         raise Exception("Unexpected HTTP response: %s" % resp.status)
1870
1871     logger.debug("Valid subscription with multiple callbacks")
1872     headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
1873                 "NT": "upnp:event",
1874                 "timeout": "Second-1234" }
1875     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1876     resp = conn.getresponse()
1877     if resp.status != 200:
1878         raise Exception("Unexpected HTTP response: %s" % resp.status)
1879     sid = resp.getheader("sid")
1880     logger.debug("Subscription SID " + sid)