tests: Add option for running test cases that take a long time
[mech_eap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import subprocess
10 import logging
11 logger = logging.getLogger()
12 import re
13 import socket
14 import httplib
15 import urlparse
16 import urllib
17 import xml.etree.ElementTree as ET
18 import StringIO
19
20 import hwsim_utils
21 import hostapd
22
def test_ap_wps_init(dev, apdev):
    """Initial AP configuration with first WPS Enrollee"""
    # The AP is started with wps_state=1 and no security parameters; the
    # GET_CONFIG checks at the end verify what it reports after WPS.
    ssid = "test-wps"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    if "PBC Status: Active" not in hapd.request("WPS_GET_STATUS"):
        raise Exception("PBC status not shown correctly")

    # Add two unrelated network blocks on the station before running WPS.
    # Both are enabled with "no-connect"; the final network count check
    # expects three blocks in total.
    net_id = dev[0].add_network()
    dev[0].set_network_quoted(net_id, "ssid", "home")
    dev[0].set_network_quoted(net_id, "psk", "12345678")
    dev[0].request("ENABLE_NETWORK %s no-connect" % net_id)

    net_id = dev[0].add_network()
    dev[0].set_network_quoted(net_id, "ssid", "home2")
    dev[0].set_network(net_id, "bssid", "00:11:22:33:44:55")
    dev[0].set_network(net_id, "key_mgmt", "NONE")
    dev[0].request("ENABLE_NETWORK %s no-connect" % net_id)

    # Run PBC on the station and verify the resulting connection.
    dev[0].request("WPS_PBC")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
    status = dev[0].get_status()
    if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if status['pairwise_cipher'] != 'CCMP':
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # AP side: WPS status must show the completed exchange with this peer.
    status = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Disabled" not in status:
        raise Exception("PBC status not shown correctly")
    if "Last WPS result: Success" not in status:
        raise Exception("Last WPS result not shown correctly")
    if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
        raise Exception("Peer address not shown correctly")

    # AP side: configuration reported by GET_CONFIG after provisioning.
    conf = hapd.request("GET_CONFIG")
    if "wps_state=configured" not in conf:
        raise Exception("AP not in WPS configured state")
    if "rsn_pairwise_cipher=CCMP TKIP" not in conf:
        raise Exception("Unexpected rsn_pairwise_cipher")
    if "wpa_pairwise_cipher=CCMP TKIP" not in conf:
        raise Exception("Unexpected wpa_pairwise_cipher")
    if "group_cipher=TKIP" not in conf:
        raise Exception("Unexpected group_cipher")

    if len(dev[0].list_networks()) != 3:
        raise Exception("Unexpected number of network blocks")
78
def test_ap_wps_init_2ap_pbc(dev, apdev):
    """Initial two-radio AP configuration with first WPS PBC Enrollee"""
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    # Both APs are started with the same parameters; PBC is only requested
    # through the control interface of the first one.
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")

    # Labels used in the error messages for apdev[0] and apdev[1].
    ap_labels = ["AP1", "AP2"]

    # Both APs are expected to show the PBC flag in scan results.
    dev[0].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[0].get_bss(apdev[idx]['bssid'])
        if "[WPS-PBC]" not in bss['flags']:
            raise Exception("WPS-PBC flag missing from " + label)

    dev[0].dump_monitor()
    # wps_cred_processing is set to 2 only for the duration of the wait for
    # the credential event and restored to 0 before the result is checked.
    dev[0].request("SET wps_cred_processing 2")
    dev[0].request("WPS_PBC")
    ev = dev[0].wait_event(["WPS-CRED-RECEIVED"], timeout=30)
    dev[0].request("SET wps_cred_processing 0")
    if ev is None:
        raise Exception("WPS cred event not seen")
    if "100e" not in ev:
        raise Exception("WPS attributes not included in the cred event")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # After provisioning, a scan from a second station must no longer show
    # the PBC flag on either AP.
    dev[1].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[1].get_bss(apdev[idx]['bssid'])
        if "[WPS-PBC]" in bss['flags']:
            raise Exception("WPS-PBC flag not cleared from " + label)
115
def test_ap_wps_init_2ap_pin(dev, apdev):
    """Initial two-radio AP configuration with first WPS PIN Enrollee"""
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    # Both APs are started with the same parameters; the PIN is only
    # registered through the control interface of the first one.
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    hapd.request("WPS_PIN any " + pin)

    # Labels used in the error messages for apdev[0] and apdev[1].
    ap_labels = ["AP1", "AP2"]

    # Both APs are expected to show the AUTH flag in scan results.
    dev[0].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[0].get_bss(apdev[idx]['bssid'])
        if "[WPS-AUTH]" not in bss['flags']:
            raise Exception("WPS-AUTH flag missing from " + label)

    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # After provisioning, a scan from a second station must no longer show
    # the AUTH flag on either AP.
    dev[1].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[1].get_bss(apdev[idx]['bssid'])
        if "[WPS-AUTH]" in bss['flags']:
            raise Exception("WPS-AUTH flag not cleared from " + label)
146
def test_ap_wps_init_through_wps_config(dev, apdev):
    """Initial AP configuration using wps_config command"""
    ssid = "test-wps-init-config"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    # WPS_CONFIG takes the SSID and the passphrase as hex strings.
    config_cmd = ("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " +
                  "12345678".encode("hex"))
    if "FAIL" in hapd.request(config_cmd):
        raise Exception("WPS_CONFIG command failed")
    if hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5) is None:
        raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")

    # The AP needs a moment to update its Beacon and Probe Response frames.
    # Waiting before the scan avoids an extra five second delay caused by
    # fetching obsolete scan results.
    hapd.ping()
    time.sleep(0.2)
    dev[0].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2",
                   pairwise="CCMP", group="CCMP")
165
def test_ap_wps_conf(dev, apdev):
    """WPS PBC provisioning with configured AP"""
    ssid = "test-wps-conf"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Station side: verify the connection parameters one by one.
    sta_status = dev[0].get_status()
    if sta_status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if sta_status['bssid'] != apdev[0]['bssid']:
        raise Exception("Unexpected BSSID")
    if sta_status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if not (sta_status['pairwise_cipher'] == 'CCMP' and
            sta_status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if sta_status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # AP side: the STA entry must carry the expected WPS device name.
    sta_entry = hapd.get_sta(dev[0].p2p_interface_addr())
    if not ('wpsDeviceName' in sta_entry and
            sta_entry['wpsDeviceName'] == "Device A"):
        raise Exception("Device name not available in STA command")
196
def test_ap_wps_twice(dev, apdev):
    """WPS provisioning with twice to change passphrase"""
    ssid = "test-wps-twice"
    ifname = apdev[0]['ifname']
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }

    # First round: provision with the original passphrase.
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
    dev[0].request("DISCONNECT")

    # Second round: remove the AP, bring it back with a new passphrase and
    # run PBC again.
    logger.info("Restart AP with different passphrase and re-run WPS")
    global_ctrl = hostapd.HostapdGlobal()
    global_ctrl.remove(ifname)
    params['wpa_passphrase'] = 'another passphrase'
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # The second run must not leave more than one network block behind.
    if len(dev[0].list_networks()) > 1:
        raise Exception("Unexpected duplicated network block present")
230
def test_ap_wps_incorrect_pin(dev, apdev):
    """WPS PIN provisioning with incorrect PIN"""
    ssid = "test-wps-incorrect-pin"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    # Attempt 1: the station PIN differs from the AP PIN from the first
    # digit on; the failure event must contain msg=8.
    logger.info("WPS provisioning attempt 1")
    hapd.request("WPS_PIN any 12345670")
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any 55554444")
    fail_ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
    if fail_ev is None:
        raise Exception("WPS operation timed out")
    if "config_error=18" not in fail_ev:
        raise Exception("Incorrect config_error reported")
    if "msg=8" not in fail_ev:
        raise Exception("PIN error detected on incorrect message")
    if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
        raise Exception("Timeout on disconnection event")
    dev[0].request("WPS_CANCEL")
    # if a scan was in progress, wait for it to complete before trying WPS
    # again; the outcome of this wait is not checked
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)

    wps_status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_status:
        raise Exception("WPS failure result not shown correctly")

    # Attempt 2: the station PIN shares its first four digits with the AP
    # PIN; the failure event must contain msg=10.
    logger.info("WPS provisioning attempt 2")
    hapd.request("WPS_PIN any 12345670")
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any 12344444")
    fail_ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
    if fail_ev is None:
        raise Exception("WPS operation timed out")
    if "config_error=18" not in fail_ev:
        raise Exception("Incorrect config_error reported")
    if "msg=10" not in fail_ev:
        raise Exception("PIN error detected on incorrect message")
    if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
        raise Exception("Timeout on disconnection event")
276
def test_ap_wps_conf_pin(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    ssid = "test-wps-conf-pin"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    pin_cmd = "WPS_PIN any " + pin
    hapd.request(pin_cmd)
    dev[0].dump_monitor()
    dev[0].request(pin_cmd)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Verify the connection of the first station.
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    if sta_status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if not (sta_status['pairwise_cipher'] == 'CCMP' and
            sta_status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if sta_status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # A second station must no longer see the AUTH flag, and reusing the
    # same PIN from it must end in WPS-M2D rather than a connection.
    dev[1].scan(freq="2412")
    bss_entry = dev[1].get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" in bss_entry['flags']:
        raise Exception("WPS-AUTH flag not cleared")
    logger.info("Try to connect from another station using the same PIN")
    dev[1].request(pin_cmd)
    second_ev = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"],
                                  timeout=30)
    if second_ev is None:
        raise Exception("Operation timed out")
    if "WPS-M2D" not in second_ev:
        raise Exception("Unexpected WPS operation started")
314
def test_ap_wps_conf_pin_2sta(dev, apdev):
    """Two stations trying to use WPS PIN at the same time"""
    ssid = "test-wps-conf-pin2"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = "12345670"
    # The same PIN is registered on the AP once per station, each entry
    # bound to that station's UUID instead of "any".
    hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
    hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    # Start both enrollees before waiting on either, so the two WPS
    # operations are requested back to back.
    dev[0].request("WPS_PIN any " + pin)
    dev[1].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
338
def test_ap_wps_conf_pin_timeout(dev, apdev):
    """WPS PIN provisioning with configured AP timing out PIN"""
    ssid = "test-wps-conf-pin"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)
    addr = dev[0].p2p_interface_addr()
    pin = dev[0].wps_read_pin()
    pin_cmd = "WPS_PIN any " + pin

    # A WPS_PIN command without arguments has to be rejected.
    if "FAIL" not in hapd.request("WPS_PIN "):
        raise Exception("Unexpected success on invalid WPS_PIN")

    # Register the PIN with a trailing "1" argument and sleep 1.1 seconds
    # before the station uses it; the AP is then expected to report
    # WPS-PIN-NEEDED and the station WPS-M2D.
    hapd.request(pin_cmd + " 1")
    time.sleep(1.1)
    dev[0].request(pin_cmd)
    if hapd.wait_event(["WPS-PIN-NEEDED"], timeout=20) is None:
        raise Exception("WPS-PIN-NEEDED event timed out")
    if dev[0].wait_event(["WPS-M2D"]) is None:
        raise Exception("M2D not reported")
    dev[0].request("WPS_CANCEL")

    # Register the PIN again with "20" and the station address appended;
    # this time provisioning has to complete.
    hapd.request(pin_cmd + " 20 " + addr)
    dev[0].request(pin_cmd)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
367
def test_ap_wps_reg_connect(dev, apdev):
    """WPS registrar using AP PIN to connect"""
    ssid = "test-wps-reg-ap-pin"
    appin = "12345670"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("WPS provisioning step")
    dev[0].dump_monitor()
    dev[0].wps_reg(apdev[0]['bssid'], appin)

    # Verify the connection that resulted from the registrar operation.
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    if sta_status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if not (sta_status['pairwise_cipher'] == 'CCMP' and
            sta_status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if sta_status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
389
def check_wps_reg_failure(dev, ap, appin):
    """Issue WPS_REG towards ap with appin and require a config_error=15 failure

    Raises Exception if neither WPS-SUCCESS nor WPS-FAIL is seen within the
    timeout=15 wait, if WPS-SUCCESS is reported, or if the reported event
    does not include config_error=15.
    """
    reg_cmd = "WPS_REG " + ap['bssid'] + " " + appin
    dev.request(reg_cmd)
    result = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
    if result is None:
        raise Exception("WPS operation timed out")
    if "WPS-SUCCESS" in result:
        raise Exception("WPS operation succeeded unexpectedly")
    if "config_error=15" not in result:
        raise Exception("WPS setup locked state was not reported correctly")
399
def test_ap_wps_random_ap_pin(dev, apdev):
    """WPS registrar using random AP PIN"""
    ssid = "test-wps-reg-random-ap-pin"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "device_name": "Wireless AP", "manufacturer": "Company",
                  "model_name": "WAP", "model_number": "123",
                  "serial_number": "12345", "device_type": "6-0050F204-1",
                  "os_version": "01020300",
                  "config_methods": "label push_button",
                  "uuid": ap_uuid, "upnp_iface": "lo" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    # Generate a random AP PIN and confirm that "get" returns the same one.
    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    if appin not in hapd.request("WPS_AP_PIN get"):
        raise Exception("Could not fetch current AP PIN")
    logger.info("WPS provisioning step")
    dev[0].wps_reg(apdev[0]['bssid'], appin)

    # With the AP PIN disabled, the previously valid PIN must fail.
    hapd.request("WPS_AP_PIN disable")
    logger.info("WPS provisioning step with AP PIN disabled")
    check_wps_reg_failure(dev[1], apdev[0], appin)

    # Set a fixed AP PIN and use it from the second station.
    logger.info("WPS provisioning step with AP PIN reset")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin)
    dev[1].wps_reg(apdev[0]['bssid'], appin)
    for sta in (dev[0], dev[1]):
        sta.request("REMOVE_NETWORK all")
    for sta in (dev[0], dev[1]):
        sta.wait_event(["CTRL-EVENT-DISCONNECTED"])

    # Random PIN requested with a trailing "1" argument: after sleeping
    # 1.1 seconds "get" has to fail and the PIN must not work.
    logger.info("WPS provisioning step after AP PIN timeout")
    hapd.request("WPS_AP_PIN disable")
    appin = hapd.request("WPS_AP_PIN random 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[0], apdev[0], appin)

    # Same check with an explicitly set PIN and a trailing "1" argument.
    logger.info("WPS provisioning step after AP PIN timeout(2)")
    hapd.request("WPS_AP_PIN disable")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin + " 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[1], apdev[0], appin)
452
def test_ap_wps_reg_config(dev, apdev):
    """WPS registrar configuring an AP using AP PIN"""
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "ap_pin": appin }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    # Step 1: configure the AP to WPA2-PSK/CCMP with a new SSID.
    logger.info("WPS configuration step")
    dev[0].dump_monitor()
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
                   new_passphrase)
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    if sta_status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    if not (sta_status['pairwise_cipher'] == 'CCMP' and
            sta_status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if sta_status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # Step 2: drop the network, flush the BSS table and request a scan with
    # only_new=1 before reconfiguring the AP back to an open network.
    logger.info("Re-configure back to open")
    dev[0].request("REMOVE_NETWORK all")
    dev[0].request("BSS_FLUSH 0")
    dev[0].request("SCAN freq=2412 only_new=1")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
    dev[0].dump_monitor()
    dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-open", "OPEN", "NONE", "")
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    if sta_status['ssid'] != "wps-open":
        raise Exception("Unexpected SSID")
    if sta_status['key_mgmt'] != 'NONE':
        raise Exception("Unexpected key_mgmt")
492
def test_ap_wps_reg_config_ext_processing(dev, apdev):
    """WPS registrar configuring an AP with external config processing"""
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    # The AP starts with wps_cred_processing=1; the test applies the new
    # settings itself through WPS_CONFIG further down.
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wps_cred_processing": "1", "ap_pin": appin }
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
                   new_passphrase, no_wait=True)
    if dev[0].wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS registrar operation timed out")
    settings_ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=15)
    if settings_ev is None:
        raise Exception("WPS configuration timed out")
    if "1026" not in settings_ev:
        raise Exception("AP Settings missing from event")

    # Switch wps_cred_processing back to 0 and apply the settings by hand;
    # the station is then expected to connect.
    hapd.request("SET wps_cred_processing 0")
    config_cmd = ("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " +
                  new_passphrase.encode("hex"))
    if "FAIL" in hapd.request(config_cmd):
        raise Exception("WPS_CONFIG command failed")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
518
def test_ap_wps_reg_config_tkip(dev, apdev):
    """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
    ssid = "test-wps-init-ap"
    appin = "12345670"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1",
                     "ap_pin": appin})
    logger.info("WPS configuration step")
    # The station's WPS version number is set to 0x10 before it acts as the
    # registrar and requests WPAPSK/TKIP settings for the AP.
    dev[0].request("SET wps_version_number 0x10")
    dev[0].dump_monitor()
    new_ssid = "wps-new-ssid-with-tkip"
    new_passphrase = "1234567890"
    dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPAPSK", "TKIP",
                   new_passphrase)
    logger.info("Re-connect to verify WPA2 mixed mode")
    dev[0].request("DISCONNECT")
    # Network id 0 is presumably the block created by the wps_reg() call
    # above - TODO confirm. It is restricted to RSN/CCMP before reconnecting.
    net_id = 0
    dev[0].set_network(net_id, "pairwise", "CCMP")
    dev[0].set_network(net_id, "proto", "RSN")
    dev[0].connect_network(net_id)
    status = dev[0].get_status()
    if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    # Mixed mode expectation: CCMP pairwise cipher with TKIP group cipher.
    if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
548
def test_ap_wps_setup_locked(dev, apdev):
    """WPS registrar locking up AP setup on AP PIN failures"""
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(ifname, ap_params)
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"

    # Try every PIN in the list (no early exit); at least one of the
    # failures has to report config_error=15.
    locked = False
    for wrong_pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
        dev[0].dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + wrong_pin)
        dev[0].wps_reg(apdev[0]['bssid'], wrong_pin, new_ssid, "WPA2PSK",
                       "CCMP", new_passphrase, no_wait=True)
        result = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if result is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in result:
            raise Exception("Unexpected connection")
        if "config_error=15" in result:
            logger.info("AP Setup Locked")
            locked = True
        elif "config_error=18" not in result:
            raise Exception("config_error=18 not reported")
        if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    if not locked:
        raise Exception("AP setup was not locked")

    # The AP status must show the failure and the peer that caused it.
    hapd = hostapd.Hostapd(ifname)
    wps_status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_status:
        raise Exception("WPS failure result not shown correctly")
    if "Peer Address: " + dev[0].p2p_interface_addr() not in wps_status:
        raise Exception("Peer address not shown correctly")

    # Enrollee PIN provisioning is still expected to succeed at this point.
    time.sleep(0.5)
    dev[0].dump_monitor()
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    hapd = hostapd.Hostapd(ifname)
    enrollee_cmd = "WPS_PIN any " + pin
    hapd.request(enrollee_cmd)
    dev[0].request(enrollee_cmd)
    if dev[0].wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("WPS success was not reported")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Requesting a new random AP PIN has to produce the unlocked event.
    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    if hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=10) is None:
        raise Exception("Failed to unlock AP PIN")
611
def test_ap_wps_setup_locked_timeout(dev, apdev):
    """WPS re-enabling AP PIN after timeout"""
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(ifname, ap_params)
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"

    # Feed incorrect AP PINs until a failure reports config_error=15; the
    # loop stops at that point. The else branch only runs when the list is
    # exhausted without hitting that break.
    for wrong_pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
        dev[0].dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + wrong_pin)
        dev[0].wps_reg(apdev[0]['bssid'], wrong_pin, new_ssid, "WPA2PSK",
                       "CCMP", new_passphrase, no_wait=True)
        result = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if result is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in result:
            raise Exception("Unexpected connection")
        if "config_error=15" in result:
            logger.info("AP Setup Locked")
            break
        if "config_error=18" not in result:
            raise Exception("config_error=18 not reported")
        if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    else:
        raise Exception("AP setup was not locked")

    # Wait (timeout=80) for the AP to report that setup got unlocked again.
    hapd = hostapd.Hostapd(ifname)
    if hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=80) is None:
        raise Exception("AP PIN did not get unlocked on 60 second timeout")
651
def test_ap_wps_pbc_overlap_2ap(dev, apdev):
    """WPS PBC session overlap with two active APs"""
    # Two APs with different SSIDs and passphrases, both configured with
    # wps_independent=1.
    first_params = { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "wps_independent": "1" }
    second_params = { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
                      "wpa_passphrase": "123456789", "wpa": "2",
                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                      "wps_independent": "1" }
    hostapd.add_ap(apdev[0]['ifname'], first_params)
    hostapd.add_ap(apdev[1]['ifname'], second_params)

    # Activate PBC on both APs before the station starts its own PBC.
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    hapd2 = hostapd.Hostapd(apdev[1]['ifname'])
    hapd2.request("WPS_PBC")

    logger.info("WPS provisioning step")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15) is None:
        raise Exception("PBC session overlap not detected")
674
def test_ap_wps_pbc_overlap_2sta(dev, apdev):
    """WPS PBC session overlap with two active STAs"""
    ssid = "test-wps-pbc-overlap"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    # Clear pending events on both stations, then start PBC on both before
    # waiting on either of them.
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    dev[0].request("WPS_PBC")
    dev[1].request("WPS_PBC")

    # Each station is expected to get WPS-M2D with config_error=12.
    m2d = dev[0].wait_event(["WPS-M2D"], timeout=15)
    if m2d is None:
        raise Exception("PBC session overlap not detected (dev0)")
    if "config_error=12" not in m2d:
        raise Exception("PBC session overlap not correctly reported (dev0)")
    m2d = dev[1].wait_event(["WPS-M2D"], timeout=15)
    if m2d is None:
        raise Exception("PBC session overlap not detected (dev1)")
    if "config_error=12" not in m2d:
        raise Exception("PBC session overlap not correctly reported (dev1)")

    # Even after WPS_CANCEL, a new WPS_PBC request on the AP has to fail.
    hapd.request("WPS_CANCEL")
    if "FAIL" not in hapd.request("WPS_PBC"):
        raise Exception("PBC mode allowed to be started while PBC overlap still active")
703
def test_ap_wps_cancel(dev, apdev):
    """WPS AP cancelling enabled config method"""
    ssid = "test-wps-ap-cancel"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    bssid = apdev[0]['bssid']
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]

    # (log line, command enabling the method, BSS flag shown while enabled,
    #  error if the flag is missing, error if the flag survives WPS_CANCEL)
    phases = [
        ("Verify PBC enable/cancel", "WPS_PBC", "[WPS-PBC]",
         "WPS-PBC flag missing", "WPS-PBC flag not cleared"),
        ("Verify PIN enable/cancel", "WPS_PIN any 12345670", "[WPS-AUTH]",
         "WPS-AUTH flag missing", "WPS-AUTH flag not cleared"),
    ]
    for log_msg, enable_cmd, flag, missing_msg, stale_msg in phases:
        logger.info(log_msg)
        ap.request(enable_cmd)

        # The flag has to show up in scan results once the method is enabled
        sta.scan(freq="2412")
        entry = sta.get_bss(bssid)
        if flag not in entry['flags']:
            raise Exception(missing_msg)

        if "FAIL" in ap.request("WPS_CANCEL"):
            raise Exception("WPS_CANCEL failed")

        # ... and has to be gone again after the cancel
        sta.scan(freq="2412")
        entry = sta.get_bss(bssid)
        if flag in entry['flags']:
            raise Exception(stale_msg)

739
def test_ap_wps_er_add_enrollee(dev, apdev):
    """WPS ER configuring AP and adding a new enrollee using PIN"""
    ssid = "wps-er-add-enrollee"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "1",
                 "device_name": "Wireless AP", "manufacturer": "Company",
                 "model_name": "WAP", "model_number": "123",
                 "serial_number": "12345", "device_type": "6-0050F204-1",
                 "os_version": "01020300",
                 "config_methods": "label push_button",
                 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    bssid = apdev[0]['bssid']
    # dev[0] acts as the external registrar (ER) throughout this test case
    er = dev[0]

    logger.info("WPS configuration step")
    new_passphrase = "1234567890"
    er.dump_monitor()
    er.wps_reg(bssid, ap_pin, ssid, "WPA2PSK", "CCMP", new_passphrase)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and status['bssid'] == bssid):
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ap_added = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_added is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_added:
        raise Exception("Expected AP UUID not found")

    logger.info("Learn AP configuration through UPnP")
    er.dump_monitor()
    er.request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
    settings = er.wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
    if settings is None:
        raise Exception("AP learn timed out")
    if ap_uuid not in settings:
        raise Exception("Expected AP UUID not in settings")
    if "ssid=" + ssid not in settings:
        raise Exception("Expected SSID not in settings")
    if "key=" + new_passphrase not in settings:
        raise Exception("Expected passphrase not in settings")

    logger.info("Add Enrollee using ER")
    pin = dev[1].wps_read_pin()
    er.dump_monitor()
    er.request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
    dev[1].dump_monitor()
    dev[1].request("WPS_PIN any " + pin)
    if dev[1].wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("Enrollee did not report success")
    if dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(dev[0], dev[1])

    logger.info("Add a specific Enrollee using ER")
    pin = dev[2].wps_read_pin()
    addr2 = dev[2].p2p_interface_addr()
    er.dump_monitor()
    dev[2].dump_monitor()
    # The enrollee starts first here; the ER authorizes it only after it has
    # seen the enrollee.
    dev[2].request("WPS_PIN any " + pin)
    enrollee_seen = er.wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
    if enrollee_seen is None:
        raise Exception("Enrollee not seen")
    if addr2 not in enrollee_seen:
        raise Exception("Unexpected Enrollee MAC address")
    er.request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
    if dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")

    logger.info("Verify registrar selection behavior")
    er.request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
    dev[1].request("DISCONNECT")
    dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
    dev[1].scan(freq="2412")
    scan_entry = dev[1].get_bss(bssid)
    if "[WPS-AUTH]" not in scan_entry['flags']:
        raise Exception("WPS-AUTH flag missing")

    logger.info("Stop ER")
    er.dump_monitor()
    er.request("WPS_ER_STOP")
    if er.wait_event(["WPS-ER-AP-REMOVE"]) is None:
        raise Exception("WPS ER unsubscription timed out")
    # It takes some time for the UPnP UNSUBSCRIBE command to go through, so
    # wait a bit before verifying that the scan results have changed.
    time.sleep(0.2)

    dev[1].scan(freq="2412")
    scan_entry = dev[1].get_bss(bssid)
    if "[WPS-AUTH]" in scan_entry['flags']:
        raise Exception("WPS-AUTH flag not removed")

848
def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
    """WPS ER connected to AP and adding a new enrollee using PBC"""
    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "device_name": "Wireless AP", "manufacturer": "Company",
                 "model_name": "WAP", "model_number": "123",
                 "serial_number": "12345", "device_type": "6-0050F204-1",
                 "os_version": "01020300",
                 "config_methods": "label push_button",
                 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    bssid = apdev[0]['bssid']
    # dev[0] is the external registrar (ER), dev[1] the enrollee
    er = dev[0]
    sta = dev[1]

    logger.info("Learn AP configuration")
    er.dump_monitor()
    er.wps_reg(bssid, ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and status['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ap_added = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_added is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_added:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PBC")
    er.dump_monitor()
    enrollee = sta.p2p_interface_addr()
    sta.dump_monitor()
    sta.request("WPS_PBC")

    # Accept up to two ENROLLEE-ADD events; one of them has to be for the
    # enrollee started above.
    for _ in range(2):
        added = er.wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
        if added is None:
            raise Exception("Enrollee discovery timed out")
        if enrollee in added:
            break
    else:
        raise Exception("Expected Enrollee not found")
    er.request("WPS_ER_PBC " + enrollee)

    if sta.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("Enrollee did not report success")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(dev[0], dev[1])

    # verify BSSID selection of the AP instead of UUID
    select_res = er.request("WPS_ER_SET_CONFIG " + bssid + " 0")
    if "FAIL" in select_res:
        raise Exception("Could not select AP based on BSSID")

912
def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
    """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "device_name": "Wireless AP", "manufacturer": "Company",
                 "model_name": "WAP", "model_number": "123",
                 "serial_number": "12345", "device_type": "6-0050F204-1",
                 "os_version": "01020300",
                 "config_methods": "label push_button",
                 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    bssid = apdev[0]['bssid']
    # dev[0] is the external registrar (ER), dev[1] the enrollee
    er = dev[0]
    sta = dev[1]

    logger.info("Learn AP configuration")
    # Set the version number on the ER to 0x10 before it talks to the AP
    er.request("SET wps_version_number 0x10")
    er.dump_monitor()
    er.wps_reg(bssid, ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and status['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ap_added = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_added is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_added:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PIN")
    enrollee = sta.p2p_interface_addr()
    pin = sta.wps_read_pin()
    er.dump_monitor()
    er.request("WPS_ER_PIN any " + pin + " " + enrollee)
    sta.dump_monitor()
    sta.request("WPS_PIN any " + pin)
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")

960
def test_ap_wps_er_config_ap(dev, apdev):
    """WPS ER configuring AP over UPnP"""
    ssid = "wps-er-ap-config"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "device_name": "Wireless AP", "manufacturer": "Company",
                     "model_name": "WAP", "model_number": "123",
                     "serial_number": "12345", "device_type": "6-0050F204-1",
                     "os_version": "01020300",
                     "config_methods": "label push_button",
                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})

    logger.info("Connect ER to the AP")
    dev[0].connect(ssid, psk="12345678", scan_freq="2412")

    logger.info("WPS configuration step")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")
    new_passphrase = "1234567890"
    # The SSID and the passphrase are passed to WPS_ER_CONFIG hex-encoded.
    dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
                   ssid.encode("hex") + " WPA2PSK CCMP " +
                   new_passphrase.encode("hex"))
    ev = dev[0].wait_event(["WPS-SUCCESS"])
    if ev is None:
        raise Exception("WPS ER configuration operation timed out")
    # dev[0] is the only station that was connected in this test case, so
    # the disconnection caused by the new AP configuration has to be waited
    # for on dev[0]; dev[1] never associates here and cannot report it.
    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
    if ev is None:
        raise Exception("Timeout on disconnection event")
    # Reconnect using the passphrase that was just configured.
    dev[0].connect(ssid, psk=new_passphrase, scan_freq="2412")

996
def test_ap_wps_fragmentation(dev, apdev):
    """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
    ssid = "test-wps-fragmentation"
    # wpa=3 with TKIP for WPA and CCMP for RSN; fragment size set to 50.
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "3",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "wpa_pairwise": "TKIP",
                 "fragment_size": "50"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]

    logger.info("WPS provisioning step")
    ap.request("WPS_PBC")
    sta.dump_monitor()
    # Use the same small fragment size on the station side
    sta.request("SET wps_fragment_size 50")
    sta.request("WPS_PBC")
    connected = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if connected is None:
        raise Exception("Association with the AP timed out")

    # The station is expected to end up on WPA2 with CCMP pairwise and TKIP
    # group cipher.
    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'TKIP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

1022
def test_ap_wps_new_version_sta(dev, apdev):
    """WPS compatibility with new version number on the station"""
    ssid = "test-wps-ver"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]

    logger.info("WPS provisioning step")
    ap.request("WPS_PBC")
    sta.dump_monitor()
    # Station-side test settings: version number 0x43 and an extra vendor
    # extension for M1.
    sta.request("SET wps_version_number 0x43")
    sta.request("SET wps_vendor_ext_m1 000137100100020001")
    sta.request("WPS_PBC")
    connected = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if connected is None:
        raise Exception("Association with the AP timed out")

1040
def test_ap_wps_new_version_ap(dev, apdev):
    """WPS compatibility with new version number on the AP"""
    ssid = "test-wps-ver"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]

    logger.info("WPS provisioning step")
    set_res = ap.request("SET wps_version_number 0x43")
    if "FAIL" in set_res:
        raise Exception("Failed to enable test functionality")
    ap.request("WPS_PBC")
    sta.dump_monitor()
    sta.request("WPS_PBC")
    connected = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    # Set the AP version number to 0x20 before evaluating the result, so
    # this happens on the failure path as well.
    ap.request("SET wps_version_number 0x20")
    if connected is None:
        raise Exception("Association with the AP timed out")

1059
def test_ap_wps_check_pin(dev, apdev):
    """Verify PIN checking through control interface"""
    ap_params = {"ssid": "wps", "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])

    # (PIN as entered, expected WPS_CHECK_PIN response)
    cases = [("12345670", "12345670"),
             ("12345678", "FAIL-CHECKSUM"),
             ("1234-5670", "12345670"),
             ("1234 5670", "12345670"),
             ("1-2.3:4 5670", "12345670")]
    for entered, expected in cases:
        # hostapd and wpa_supplicant have to agree on every response
        ap_res = ap.request("WPS_CHECK_PIN " + entered).rstrip('\n')
        sta_res = dev[0].request("WPS_CHECK_PIN " + entered).rstrip('\n')
        if ap_res != sta_res:
            raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
        if ap_res != expected:
            raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(ap_res, expected))

    # PINs with 5 and 9 digits have to be rejected
    for bad_cmd in ["WPS_CHECK_PIN 12345", "WPS_CHECK_PIN 123456789"]:
        if "FAIL" not in ap.request(bad_cmd):
            raise Exception("Unexpected WPS_CHECK_PIN success")

1083
def test_ap_wps_wep_config(dev, apdev):
    """WPS 2.0 AP rejecting WEP configuration"""
    ssid = "test-wps-config"
    appin = "12345670"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "ap_pin": appin}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])

    # Try to push a WEP configuration to the AP; do not wait for the
    # registration to complete since the AP is expected to reject it.
    dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
                   "hello", no_wait=True)
    failure = ap.wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL timed out")
    if "reason=2" not in failure:
        raise Exception("Unexpected reason code in WPS-FAIL")

    # The failure also has to be visible in the AP status output
    wps_status = ap.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_status:
        raise Exception("WPS failure result not shown correctly")
    if "Failure Reason: WEP Prohibited" not in wps_status:
        raise Exception("Failure reason not reported correctly")
    peer_line = "Peer Address: " + dev[0].p2p_interface_addr()
    if peer_line not in wps_status:
        raise Exception("Peer address not shown correctly")

1106
def test_ap_wps_wep_enroll(dev, apdev):
    """WPS 2.0 STA rejecting WEP configuration"""
    ssid = "test-wps-wep"
    # The AP skips building its own credential and uses the contents of the
    # "wps-wep-cred" file as the credential instead.
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "skip_cred_build": "1", "extra_cred": "wps-wep-cred"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    ap.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    failure = dev[0].wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL event timed out")
    # Both the message number and the failure reason have to match
    if not ("msg=12" in failure and "reason=2 (WEP Prohibited)" in failure):
        raise Exception("Unexpected WPS-FAIL event: " + failure)

1121
def test_ap_wps_ie_fragmentation(dev, apdev):
    """WPS AP using fragmented WPS IE"""
    ssid = "test-wps-ie-fragmentation"
    # Long device description strings; the test expects them to result in
    # two WPS IEs (see the check at the end).
    long_name = "1234567890abcdef1234567890abcdef"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "device_name": long_name,
                 "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
                 "model_name": long_name,
                 "model_number": long_name,
                 "serial_number": long_name}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    ap.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    connected = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if connected is None:
        raise Exception("Association with the AP timed out")

    bss = dev[0].get_bss(apdev[0]['bssid'])
    if not ("wps_device_name" in bss and
            bss['wps_device_name'] == long_name):
        logger.info(bss)
        raise Exception("Device Name not received correctly")
    # Count the IEs in the hexdump that match the WPS IE pattern
    wps_ies = re.findall("dd..0050f204", bss['ie'])
    if len(wps_ies) != 2:
        raise Exception("Unexpected number of WPS IEs")

1146
def get_psk(pskfile):
    """Read a wpa_psk_file style file into a dictionary.

    pskfile: path of the file to parse.

    Returns a dict that maps the first space-separated field of each entry
    (the station address) to the rest of the line (the PSK or passphrase).
    Empty lines and lines starting with '#' are skipped. If an address is
    listed more than once, the last entry is the one returned.

    Raises ValueError if an entry does not contain a space.
    """
    psks = {}
    with open(pskfile, "r") as f:
        for line in f.read().splitlines():
            if not line or line.startswith('#'):
                continue
            # Split on the first space only so that a passphrase containing
            # spaces is kept intact.
            (addr, psk) = line.split(' ', 1)
            psks[addr] = psk
    return psks

1157
def test_ap_wps_per_station_psk(dev, apdev):
    """WPS PBC provisioning with per-station PSK"""
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Best-effort removal of a file left behind by an earlier run. Only
    # file system errors (e.g. the file not existing) are ignored.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        # Start from a PSK file that contains nothing but the header line
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        logger.info("Verifying PSK results")
        psks = get_psk(pskfile)
        # Every station needs its own entry ...
        if addr0 not in psks:
            raise Exception("No PSK recorded for sta0")
        if addr1 not in psks:
            raise Exception("No PSK recorded for sta1")
        if addr2 not in psks:
            raise Exception("No PSK recorded for sta2")
        # ... and no two stations may share a PSK
        if psks[addr0] == psks[addr1]:
            raise Exception("Same PSK recorded for sta0 and sta1")
        if psks[addr0] == psks[addr2]:
            raise Exception("Same PSK recorded for sta0 and sta2")
        if psks[addr1] == psks[addr2]:
            raise Exception("Same PSK recorded for sta1 and sta2")

        # Running dev[0] again as an external registrar has to produce a
        # PSK that differs from the one it received as an enrollee.
        dev[0].request("REMOVE_NETWORK all")
        logger.info("Second external registrar")
        dev[0].wps_reg(apdev[0]['bssid'], appin)
        psks2 = get_psk(pskfile)
        if addr0 not in psks2:
            raise Exception("No PSK recorded for sta0(reg)")
        if psks[addr0] == psks2[addr0]:
            raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
    finally:
        os.remove(pskfile)

1223
def test_ap_wps_per_station_psk_failure(dev, apdev):
    """WPS PBC provisioning with per-station PSK (file not writable)"""
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Best-effort removal of a file left behind by an earlier run. Only
    # file system errors (e.g. the file not existing) are ignored.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        # Start from a PSK file that contains nothing but the header line
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)
        # Point the running AP at a PSK file path in a directory that does
        # not exist, so that new per-station PSKs cannot be stored.
        if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
            raise Exception("Failed to set wpa_psk_file")

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        logger.info("Verifying PSK results")
        # The original file must still contain only its header line
        psks = get_psk(pskfile)
        if len(psks) > 0:
            raise Exception("PSK recorded unexpectedly")
    finally:
        os.remove(pskfile)

1272
def test_ap_wps_pin_request_file(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    ssid = "wps"
    pinfile = "/tmp/ap_wps_pin_request_file.log"
    # The log file is removed through sudo, presumably because hostapd
    # creates it with root privileges.
    remove_cmd = ['sudo', 'rm', pinfile]
    if os.path.exists(pinfile):
        subprocess.call(remove_cmd)
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wps_pin_requests": pinfile,
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    ap = hostapd.Hostapd(apdev[0]['ifname'])
    sta_uuid = dev[0].get_status_field("uuid")
    pin = dev[0].wps_read_pin()
    try:
        # The AP has not been given this PIN, so it is expected to report
        # that a PIN is needed for this station.
        dev[0].request("WPS_PIN any " + pin)
        needed = ap.wait_event(["WPS-PIN-NEEDED"], timeout=15)
        if needed is None:
            raise Exception("PIN needed event not shown")
        if sta_uuid not in needed:
            raise Exception("UUID mismatch")
        dev[0].request("WPS_CANCEL")

        # The request has to be recorded in the PIN request log as well
        with open(pinfile, "r") as f:
            logged = any(sta_uuid in entry for entry in f.readlines())
        if not logged:
            raise Exception("PIN request entry not in the log file")
    finally:
        subprocess.call(remove_cmd)

1306
def test_ap_wps_auto_setup_with_config_file(dev, apdev):
    """WPS auto-setup with configuration file"""
    conffile = "/tmp/ap_wps_auto_setup_with_config_file.conf"
    ifname = apdev[0]['ifname']
    try:
        # Minimal configuration for an AP with wps_state=1
        with open(conffile, "w") as f:
            f.write("driver=nl80211\n")
            f.write("hw_mode=g\n")
            f.write("channel=1\n")
            f.write("ieee80211n=1\n")
            f.write("interface=%s\n" % ifname)
            f.write("ctrl_interface=/var/run/hostapd\n")
            f.write("ssid=wps\n")
            f.write("eap_server=1\n")
            f.write("wps_state=1\n")
        hostapd.add_bss('phy3', ifname, conffile)
        hapd = hostapd.Hostapd(ifname)
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
        if ev is None:
            raise Exception("Association with the AP timed out")

        # Read the configuration file back. Every line has to be either a
        # name=value pair or the "# WPS configuration" marker comment.
        vals = dict()
        with open(conffile, "r") as f:
            for line in f.read().splitlines():
                name, sep, value = line.partition('=')
                if sep:
                    vals[name] = value
                elif "# WPS configuration" not in line:
                    raise Exception("Unexpected configuration line: " + line)
        # Use get() so that a missing parameter is reported through the
        # exception below (with the parsed values) instead of a bare KeyError.
        if vals.get('ieee80211n') != '1' or vals.get('wps_state') != '2' or "WPA-PSK" not in vals.get('wpa_key_mgmt', ''):
            raise Exception("Incorrect configuration: " + str(vals))
    finally:
        subprocess.call(['sudo', 'rm', conffile])

1345
def test_ap_wps_pbc_timeout(dev, apdev, params):
    """wpa_supplicant PBC walk time [long]"""
    # This test case only runs when long test cases were requested.
    if not params['long']:
        logger.info("Skip test case with long duration due to --long not specified")
        return "skip"

    ssid = "test-wps"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("Start WPS_PBC and wait for PBC walk time expiration")
    # PBC is started on the station only; the AP never enables PBC, so the
    # station is expected to give up with a WPS-TIMEOUT event.
    start_res = dev[0].request("WPS_PBC")
    if "OK" not in start_res:
        raise Exception("WPS_PBC failed")
    timed_out = dev[0].wait_event(["WPS-TIMEOUT"], timeout=150)
    if timed_out is None:
        raise Exception("WPS-TIMEOUT not reported")

1361
def add_ssdp_ap(ifname, ap_uuid):
    """Start a configured WPS AP with UPnP enabled on the loopback interface.

    ifname: AP interface name passed to hostapd.add_ap()
    ap_uuid: value used for the hostapd "uuid" parameter
    """
    # WPA2-PSK/CCMP network with WPS device details and the UPnP description
    # fields; upnp_iface is set to "lo".
    params = { "ssid": "wps-ssdp", "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "device_name": "Wireless AP", "manufacturer": "Company",
               "model_name": "WAP", "model_number": "123",
               "serial_number": "12345", "device_type": "6-0050F204-1",
               "os_version": "01020300",
               "config_methods": "label push_button",
               "ap_pin": "12345670", "uuid": ap_uuid, "upnp_iface": "lo",
               "friendly_name": "WPS Access Point",
               "manufacturer_url": "http://www.example.com/",
               "model_description": "Wireless Access Point",
               "model_url": "http://www.example.com/model/",
               "upc": "123456789012" }
    hostapd.add_ap(ifname, params)
1380
def ssdp_send(msg, no_recv=False):
    """Send one SSDP message to the multicast group and return the reply.

    msg: complete message to send to 239.255.255.250:1900
    no_recv: if True, do not wait for a reply and return None

    Returns up to 1000 bytes of the first datagram received, or None when
    no_recv is set. socket.timeout is raised if no reply arrives within the
    one second default timeout.

    Note: this sets the process-wide default socket timeout to one second.
    """
    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # Close the socket explicitly on every exit path (including a receive
    # timeout) instead of relying on garbage collection.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        sock.sendto(msg, ("239.255.255.250", 1900))
        if no_recv:
            return None
        return sock.recv(1000)
    finally:
        sock.close()
1391
def ssdp_send_msearch(st):
    """Send a well-formed SSDP M-SEARCH for search target st.

    Returns whatever ssdp_send() returns for the request.
    """
    # Fixed request line and headers; only the ST value varies per call.
    fixed = ('M-SEARCH * HTTP/1.1\r\n'
             'HOST: 239.255.255.250:1900\r\n'
             'MX: 1\r\n'
             'MAN: "ssdp:discover"\r\n')
    # The message ends with an empty line (CRLF CRLF).
    return ssdp_send(fixed + 'ST: ' + st + '\r\n\r\n')
1401
def test_ap_wps_ssdp_msearch(dev, apdev):
    """WPS AP and SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    eol = '\r\n'

    # Header names in mixed case.
    mixed_case = eol.join([
            'M-SEARCH * HTTP/1.1',
            'Host: 239.255.255.250:1900',
            'Mx: 1',
            'Man: "ssdp:discover"',
            'St: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])
    ssdp_send(mixed_case)

    # Lower-case header names with extra tabs/spaces around the values.
    padded = eol.join([
            'M-SEARCH * HTTP/1.1',
            'host:\t239.255.255.250:1900\t\t\t\t \t\t',
            'mx: \t1\t\t   ',
            'man: \t \t "ssdp:discover"   ',
            'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t',
            '', ''])
    ssdp_send(padded)

    # Each of these search targets is sent with a reply awaited.
    search_targets = [
        "ssdp:all",
        "upnp:rootdevice",
        "uuid:" + ap_uuid,
        "urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
        "urn:schemas-wifialliance-org:device:WFADevice:1",
    ]
    for target in search_targets:
        ssdp_send_msearch(target)

    # MX of 130 seconds; the request is sent without waiting for a reply.
    large_mx = eol.join([
            'M-SEARCH * HTTP/1.1',
            'HOST:\t239.255.255.250:1900',
            'MAN: "ssdp:discover"',
            'MX: 130',
            'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])
    ssdp_send(large_mx, no_recv=True)
1439
def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
    """WPS AP and invalid SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    # Building blocks of a valid M-SEARCH; the invalid variants below omit
    # or replace individual lines.
    dest = ("239.255.255.250", 1900)
    request_line = 'M-SEARCH * HTTP/1.1'
    host = 'HOST: 239.255.255.250:1900'
    man = 'MAN: "ssdp:discover"'
    mx = 'MX: 1'
    st = 'ST: urn:schemas-wifialliance-org:device:WFADevice:1'

    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    def send(lines, eol='\r\n'):
        # Join the header lines and terminate the message with an empty line.
        sock.sendto(eol.join(lines + ['', '']), dest)

    # Make sure the socket is released even if one of the checks fails.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))

        logger.debug("Missing MX")
        send([request_line, host, man, st])

        logger.debug("Negative MX")
        send([request_line, host, 'MX: -1', man, st])

        logger.debug("Invalid MX")
        send([request_line, host, 'MX; 1', man, st])

        logger.debug("Missing MAN")
        send([request_line, host, mx, st])

        logger.debug("Invalid MAN")
        send([request_line, host, mx, 'MAN: foo', st])
        send([request_line, host, mx, 'MAN; "ssdp:discover"', st])

        logger.debug("Missing HOST")
        send([request_line, man, mx, st])

        logger.debug("Missing ST")
        send([request_line, host, man, mx])

        logger.debug("Mismatching ST")
        for bad_st in ['ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d',
                       'ST: foo:bar',
                       'ST: foobar']:
            send([request_line, host, man, mx, bad_st])

        logger.debug("Invalid ST")
        send([request_line, host, man, mx,
              'ST; urn:schemas-wifialliance-org:device:WFADevice:1'])

        logger.debug("Invalid M-SEARCH")
        send(['M+SEARCH * HTTP/1.1', host, man, mx, st])
        send(['M-SEARCH-* HTTP/1.1', host, man, mx, st])

        logger.debug("Invalid message format")
        sock.sendto("NOTIFY * HTTP/1.1", dest)
        # Otherwise valid headers, but separated with bare CR instead of CRLF.
        send([request_line, host, man, mx, st], eol='\r')

        # None of the messages above may have triggered a response.
        try:
            r = sock.recv(1000)
        except socket.timeout:
            pass
        else:
            raise Exception("Unexpected M-SEARCH response: " + r)

        logger.debug("Valid M-SEARCH")
        send([request_line, host, man, mx, st])

        # The AP must still answer a valid request after all the invalid ones.
        try:
            sock.recv(1000)
        except socket.timeout:
            raise Exception("No SSDP response")
    finally:
        sock.close()
1611
def test_ap_wps_ssdp_burst(dev, apdev):
    """WPS AP and SSDP burst"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    msg = '\r\n'.join([
            'M-SEARCH * HTTP/1.1',
            'HOST: 239.255.255.250:1900',
            'MAN: "ssdp:discover"',
            'MX: 1',
            'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])

    def send_burst():
        # Open a fresh UDP socket and send 25 copies of the M-SEARCH from it.
        # The caller owns the returned socket and is responsible for closing
        # it; on a setup/send failure it is closed here before re-raising.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
            s.bind(("127.0.0.1", 0))
            for i in range(0, 25):
                s.sendto(msg, ("239.255.255.250", 1900))
        except Exception:
            s.close()
            raise
        return s

    socket.setdefaulttimeout(1)

    # First burst: count replies until the socket times out; at least 20 of
    # the 25 requests must have been answered with a 200 OK.
    sock = send_burst()
    try:
        resp = 0
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                break
            if not r.startswith("HTTP/1.1 200 OK\r\n"):
                raise Exception("Unexpected message: " + r)
            resp += 1
    finally:
        sock.close()
    if resp < 20:
        raise Exception("Too few SSDP responses")

    # Second burst from a new socket: stop as soon as one reply mentions the
    # AP UUID; a timeout before that is a failure.
    sock = send_burst()
    try:
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                raise Exception("No SSDP response")
            if ap_uuid in r:
                break
    finally:
        sock.close()
1656
def ssdp_get_location(uuid):
    """Return the LOCATION header value from an M-SEARCH reply for uuid.

    Sends an M-SEARCH with ST "uuid:<uuid>" and scans the reply for the first
    line starting with "location:" (case-insensitive). Raises Exception if
    the reply has no such line.
    """
    reply = ssdp_send_msearch("uuid:" + uuid)
    for line in reply.splitlines():
        if line.lower().startswith("location:"):
            # Split only on the first ':' so the URL itself stays intact.
            return line.split(':', 1)[1].strip()
    raise Exception("No UPnP location found")
1667
def upnp_get_urls(location):
    """Fetch the UPnP device description and return its service URLs.

    location: URL of the device description XML

    Returns a dict with the absolute 'scpd_url', 'control_url' and
    'event_sub_url' of the first service listed for the device. Raises
    Exception if the service or one of its URL elements is missing.
    """
    conn = urllib.urlopen(location)
    # Release the HTTP handle as soon as the document has been parsed.
    try:
        tree = ET.parse(conn)
    finally:
        conn.close()
    root = tree.getroot()
    urn = '{urn:schemas-upnp-org:device-1-0}'
    service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
    if service is None:
        raise Exception("No UPnP service found in device description")

    res = {}
    for key, tag in [('scpd_url', 'SCPDURL'),
                     ('control_url', 'controlURL'),
                     ('event_sub_url', 'eventSubURL')]:
        elem = service.find(urn + tag)
        if elem is None:
            raise Exception("No %s found in UPnP service description" % tag)
        # The description may use relative URLs; resolve against location.
        res[key] = urlparse.urljoin(location, elem.text)
    return res
1679
def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
    """POST a SOAP request with an empty WFAWLANConfig action element.

    conn: connection object providing request() and getresponse()
    path: request path for the POST
    action: action name, used for the body element and the SOAPAction header
    include_soap_action: add the standard SOAPAction header for action
    soap_action_override: SOAPAction header value to use instead; only
        applied when include_soap_action is False

    Returns the result of conn.getresponse().
    """
    soap_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
    wfa_ns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
    ET.register_namespace('soapenv', soap_ns)
    ET.register_namespace('wfa', wfa_ns)

    # Envelope > Body > <action/>, with no action arguments.
    envelope = ET.Element(
        "{%s}Envelope" % soap_ns,
        attrib={ '{%s}encodingStyle' % soap_ns:
                 'http://schemas.xmlsoap.org/soap/encoding/' })
    soap_body = ET.SubElement(envelope, "{%s}Body" % soap_ns)
    ET.SubElement(soap_body, "{%s}%s" % (wfa_ns, action))
    payload = StringIO.StringIO()
    ET.ElementTree(envelope).write(payload, xml_declaration=True,
                                   encoding='utf-8')

    headers = { "Content-type": 'text/xml; charset="utf-8"' }
    if include_soap_action:
        headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
    elif soap_action_override:
        headers["SOAPAction"] = soap_action_override
    conn.request("POST", path, payload.getvalue(), headers)
    return conn.getresponse()
1701
def test_ap_wps_upnp(dev, apdev):
    """WPS AP and UPnP operations"""
    def expect_status(resp, status):
        # Fail the test if the HTTP status differs from the expected one.
        if resp.status != status:
            raise Exception("Unexpected HTTP response: %s" % resp.status)

    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    location = ssdp_get_location(ap_uuid)
    urls = upnp_get_urls(location)

    # Fetch the SCPD document; the content itself is not inspected.
    conn = urllib.urlopen(urls['scpd_url'])
    scpd = conn.read()

    conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
    if conn.getcode() != 404:
        raise Exception("Unexpected HTTP response to GET unknown URL")

    content_type = 'text/xml; charset="utf-8"'
    empty_body = "\r\n\r\n"

    conn = httplib.HTTPConnection(urlparse.urlparse(location).netloc)
    #conn.set_debuglevel(1)
    headers = { "Content-type": content_type,
                "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
    # POST to a path that does not exist
    conn.request("POST", "hello", empty_body, headers)
    resp = conn.getresponse()
    expect_status(resp, 404)

    # Unsupported HTTP method
    conn.request("UNKNOWN", "hello", empty_body, headers)
    resp = conn.getresponse()
    expect_status(resp, 501)

    # Valid control URL, but SOAPAction names an unknown service
    headers = { "Content-type": content_type,
                "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
    ctrlurl = urlparse.urlparse(urls['control_url'])
    ctrl_path = ctrlurl.path
    conn.request("POST", ctrl_path, empty_body, headers)
    resp = conn.getresponse()
    expect_status(resp, 401)

    logger.debug("GetDeviceInfo without SOAPAction header")
    resp = upnp_soap_action(conn, ctrl_path, "GetDeviceInfo",
                            include_soap_action=False)
    expect_status(resp, 401)

    logger.debug("GetDeviceInfo with invalid SOAPAction header")
    invalid_actions = [
        "foo",
        "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice',
    ]
    for act in invalid_actions:
        resp = upnp_soap_action(conn, ctrl_path, "GetDeviceInfo",
                                include_soap_action=False,
                                soap_action_override=act)
        expect_status(resp, 401)

    # Properly formed GetDeviceInfo must succeed and carry NewDeviceInfo.
    resp = upnp_soap_action(conn, ctrl_path, "GetDeviceInfo")
    expect_status(resp, 200)
    info = resp.read()
    if "NewDeviceInfo" not in info:
        raise Exception("Unexpected GetDeviceInfo response")

    # Remaining actions are sent with an empty action element; each entry is
    # (log message, action name, expected HTTP status).
    cases = [
        ("PutMessage without required parameters", "PutMessage", 600),
        ("PutWLANResponse without required parameters", "PutWLANResponse", 600),
        ("SetSelectedRegistrar from unregistered ER", "SetSelectedRegistrar", 501),
        ("Unknown action", "Unknown", 401),
    ]
    for description, action, status in cases:
        logger.debug(description)
        resp = upnp_soap_action(conn, ctrl_path, action)
        expect_status(resp, status)
1783
1784 def test_ap_wps_upnp_subscribe(dev, apdev):
1785     """WPS AP and UPnP event subscription"""
1786     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1787     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1788
1789     location = ssdp_get_location(ap_uuid)
1790     urls = upnp_get_urls(location)
1791     eventurl = urlparse.urlparse(urls['event_sub_url'])
1792
1793     url = urlparse.urlparse(location)
1794     conn = httplib.HTTPConnection(url.netloc)
1795     #conn.set_debuglevel(1)
1796     headers = { "callback": '<http://127.0.0.1:12345/event>',
1797                 "timeout": "Second-1234" }
1798     conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1799     resp = conn.getresponse()
1800     if resp.status != 412:
1801         raise Exception("Unexpected HTTP response: %s" % resp.status)
1802
1803     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1804     resp = conn.getresponse()
1805     if resp.status != 412:
1806         raise Exception("Unexpected HTTP response: %s" % resp.status)
1807
1808     headers = { "NT": "upnp:event",
1809                 "timeout": "Second-1234" }
1810     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1811     resp = conn.getresponse()
1812     if resp.status != 412:
1813         raise Exception("Unexpected HTTP response: %s" % resp.status)
1814
1815     headers = { "callback": '<http://127.0.0.1:12345/event>',
1816                 "NT": "upnp:foobar",
1817                 "timeout": "Second-1234" }
1818     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1819     resp = conn.getresponse()
1820     if resp.status != 400:
1821         raise Exception("Unexpected HTTP response: %s" % resp.status)
1822
1823     logger.debug("Valid subscription")
1824     headers = { "callback": '<http://127.0.0.1:12345/event>',
1825                 "NT": "upnp:event",
1826                 "timeout": "Second-1234" }
1827     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1828     resp = conn.getresponse()
1829     if resp.status != 200:
1830         raise Exception("Unexpected HTTP response: %s" % resp.status)
1831     sid = resp.getheader("sid")
1832     logger.debug("Subscription SID " + sid)
1833
1834     logger.debug("Invalid re-subscription")
1835     headers = { "NT": "upnp:event",
1836                 "sid": "123456734567854",
1837                 "timeout": "Second-1234" }
1838     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1839     resp = conn.getresponse()
1840     if resp.status != 400:
1841         raise Exception("Unexpected HTTP response: %s" % resp.status)
1842
1843     logger.debug("Invalid re-subscription")
1844     headers = { "NT": "upnp:event",
1845                 "sid": "uuid:123456734567854",
1846                 "timeout": "Second-1234" }
1847     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1848     resp = conn.getresponse()
1849     if resp.status != 400:
1850         raise Exception("Unexpected HTTP response: %s" % resp.status)
1851
1852     logger.debug("Invalid re-subscription")
1853     headers = { "callback": '<http://127.0.0.1:12345/event>',
1854                 "NT": "upnp:event",
1855                 "sid": sid,
1856                 "timeout": "Second-1234" }
1857     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1858     resp = conn.getresponse()
1859     if resp.status != 400:
1860         raise Exception("Unexpected HTTP response: %s" % resp.status)
1861
1862     logger.debug("SID mismatch in re-subscription")
1863     headers = { "NT": "upnp:event",
1864                 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1865                 "timeout": "Second-1234" }
1866     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1867     resp = conn.getresponse()
1868     if resp.status != 412:
1869         raise Exception("Unexpected HTTP response: %s" % resp.status)
1870
1871     logger.debug("Valid re-subscription")
1872     headers = { "NT": "upnp:event",
1873                 "sid": sid,
1874                 "timeout": "Second-1234" }
1875     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1876     resp = conn.getresponse()
1877     if resp.status != 200:
1878         raise Exception("Unexpected HTTP response: %s" % resp.status)
1879     sid2 = resp.getheader("sid")
1880     logger.debug("Subscription SID " + sid2)
1881
1882     if sid != sid2:
1883         raise Exception("Unexpected SID change")
1884
1885     logger.debug("Valid re-subscription")
1886     headers = { "NT": "upnp:event",
1887                 "sid": "uuid: \t \t" + sid.split(':')[1],
1888                 "timeout": "Second-1234" }
1889     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1890     resp = conn.getresponse()
1891     if resp.status != 200:
1892         raise Exception("Unexpected HTTP response: %s" % resp.status)
1893
1894     logger.debug("Invalid unsubscription")
1895     headers = { "sid": sid }
1896     conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1897     resp = conn.getresponse()
1898     if resp.status != 412:
1899         raise Exception("Unexpected HTTP response: %s" % resp.status)
1900     headers = { "foo": "bar" }
1901     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1902     resp = conn.getresponse()
1903     if resp.status != 412:
1904         raise Exception("Unexpected HTTP response: %s" % resp.status)
1905
1906     logger.debug("Valid unsubscription")
1907     headers = { "sid": sid }
1908     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1909     resp = conn.getresponse()
1910     if resp.status != 200:
1911         raise Exception("Unexpected HTTP response: %s" % resp.status)
1912
1913     logger.debug("Unsubscription for not existing SID")
1914     headers = { "sid": sid }
1915     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1916     resp = conn.getresponse()
1917     if resp.status != 412:
1918         raise Exception("Unexpected HTTP response: %s" % resp.status)
1919
1920     logger.debug("Invalid unsubscription")
1921     headers = { "sid": " \t \tfoo" }
1922     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1923     resp = conn.getresponse()
1924     if resp.status != 400:
1925         raise Exception("Unexpected HTTP response: %s" % resp.status)
1926
1927     logger.debug("Invalid unsubscription")
1928     headers = { "sid": "uuid:\t \tfoo" }
1929     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1930     resp = conn.getresponse()
1931     if resp.status != 400:
1932         raise Exception("Unexpected HTTP response: %s" % resp.status)
1933
1934     logger.debug("Invalid unsubscription")
1935     headers = { "NT": "upnp:event",
1936                 "sid": sid }
1937     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1938     resp = conn.getresponse()
1939     if resp.status != 400:
1940         raise Exception("Unexpected HTTP response: %s" % resp.status)
1941     headers = { "callback": '<http://127.0.0.1:12345/event>',
1942                 "sid": sid }
1943     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1944     resp = conn.getresponse()
1945     if resp.status != 400:
1946         raise Exception("Unexpected HTTP response: %s" % resp.status)
1947
1948     logger.debug("Valid subscription with multiple callbacks")
1949     headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
1950                 "NT": "upnp:event",
1951                 "timeout": "Second-1234" }
1952     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1953     resp = conn.getresponse()
1954     if resp.status != 200:
1955         raise Exception("Unexpected HTTP response: %s" % resp.status)
1956     sid = resp.getheader("sid")
1957     logger.debug("Subscription SID " + sid)