tests: WPS UPnP operations
[mech_eap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import time
8 import subprocess
9 import logging
10 logger = logging.getLogger()
11 import re
12 import socket
13 import httplib
14 import urlparse
15 import urllib
16 import xml.etree.ElementTree as ET
17 import StringIO
18
19 import hwsim_utils
20 import hostapd
21
def test_ap_wps_init(dev, apdev):
    """Initial AP configuration with first WPS Enrollee"""
    ssid = "test-wps"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    # PBC must be reported as active right after it has been started
    pbc_status = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Active" not in pbc_status:
        raise Exception("PBC status not shown correctly")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Station view of the resulting connection
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    for field, expected, error in [
            ('ssid', ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP',
             "Unexpected encryption configuration"),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt") ]:
        if sta_status[field] != expected:
            raise Exception(error)

    # AP view of the completed WPS exchange
    wps_status = hapd.request("WPS_GET_STATUS")
    for needle, error in [
            ("PBC Status: Disabled", "PBC status not shown correctly"),
            ("Last WPS result: Success",
             "Last WPS result not shown correctly") ]:
        if needle not in wps_status:
            raise Exception(error)
    peer_line = "Peer Address: " + dev[0].p2p_interface_addr()
    if peer_line not in wps_status:
        raise Exception("Peer address not shown correctly")

    # Configuration reported by the AP after provisioning
    ap_conf = hapd.request("GET_CONFIG")
    for needle, error in [
            ("wps_state=configured", "AP not in WPS configured state"),
            ("rsn_pairwise_cipher=CCMP TKIP",
             "Unexpected rsn_pairwise_cipher"),
            ("wpa_pairwise_cipher=CCMP TKIP",
             "Unexpected wpa_pairwise_cipher"),
            ("group_cipher=TKIP", "Unexpected group_cipher") ]:
        if needle not in ap_conf:
            raise Exception(error)
63
def test_ap_wps_init_2ap_pbc(dev, apdev):
    """Initial two-radio AP configuration with first WPS PBC Enrollee

    dev -- station-side control objects (dev[0] enrolls, dev[1] only scans)
    apdev -- AP descriptors; the 'ifname' and 'bssid' keys of the first two
             entries are used

    Raises Exception when the WPS-PBC flag is not advertised by both APs
    before provisioning, when the station does not connect, or when either
    AP still advertises the flag afterwards.
    """
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    # Both APs are started with the same parameter set
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    ap_labels = ["AP1", "AP2"]

    logger.info("WPS provisioning step")
    # PBC is started on the first AP only; the flag is expected on both
    hapd.request("WPS_PBC")
    dev[0].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[0].get_bss(apdev[idx]['bssid'])
        if "[WPS-PBC]" not in bss['flags']:
            raise Exception("WPS-PBC flag missing from " + label)
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # A second station must no longer see PBC advertised by either AP
    dev[1].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[1].get_bss(apdev[idx]['bssid'])
        if "[WPS-PBC]" in bss['flags']:
            raise Exception("WPS-PBC flag not cleared from " + label)
93
def test_ap_wps_init_2ap_pin(dev, apdev):
    """Initial two-radio AP configuration with first WPS PIN Enrollee

    dev -- station-side control objects (dev[0] enrolls, dev[1] only scans)
    apdev -- AP descriptors; the 'ifname' and 'bssid' keys of the first two
             entries are used

    Raises Exception when the WPS-AUTH flag is not advertised by both APs
    before provisioning, when the station does not connect, or when either
    AP still advertises the flag afterwards.
    """
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    # Both APs are started with the same parameter set
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    ap_labels = ["AP1", "AP2"]

    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    # The PIN is enabled on the first AP only; the flag is expected on both
    hapd.request("WPS_PIN any " + pin)
    dev[0].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[0].get_bss(apdev[idx]['bssid'])
        if "[WPS-AUTH]" not in bss['flags']:
            raise Exception("WPS-AUTH flag missing from " + label)
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # A second station must no longer see the PIN advertised by either AP
    dev[1].scan(freq="2412")
    for idx, label in enumerate(ap_labels):
        bss = dev[1].get_bss(apdev[idx]['bssid'])
        if "[WPS-AUTH]" in bss['flags']:
            raise Exception("WPS-AUTH flag not cleared from " + label)
124
def test_ap_wps_init_through_wps_config(dev, apdev):
    """Initial AP configuration using wps_config command"""
    ssid = "test-wps-init-config"
    passphrase = "12345678"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(ifname)

    # SSID and passphrase are handed to WPS_CONFIG hex-encoded
    command = ("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " +
               passphrase.encode("hex"))
    if "FAIL" in hapd.request(command):
        raise Exception("WPS_CONFIG command failed")

    # Connect with the credentials that were just configured
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", proto="WPA2",
                   pairwise="CCMP", group="CCMP")
135
def test_ap_wps_conf(dev, apdev):
    """WPS PBC provisioning with configured AP"""
    ssid = "test-wps-conf"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Each entry: status field, required value, failure message
    sta_status = dev[0].get_status()
    cipher_error = "Unexpected encryption configuration"
    for field, expected, error in [
            ('wpa_state', 'COMPLETED', "Not fully connected"),
            ('bssid', apdev[0]['bssid'], "Unexpected BSSID"),
            ('ssid', ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP', cipher_error),
            ('group_cipher', 'CCMP', cipher_error),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt") ]:
        if sta_status[field] != expected:
            raise Exception(error)

    # The AP's STA entry must carry the device name of the enrollee
    sta_entry = hapd.get_sta(dev[0].p2p_interface_addr())
    if not ('wpsDeviceName' in sta_entry and
            sta_entry['wpsDeviceName'] == "Device A"):
        raise Exception("Device name not available in STA command")
166
def test_ap_wps_twice(dev, apdev):
    """WPS provisioning run twice to change the passphrase"""
    ssid = "test-wps-twice"
    ifname = apdev[0]['ifname']
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    timeout_error = "Association with the AP timed out"

    # First round with the original passphrase
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception(timeout_error)
    dev[0].request("DISCONNECT")

    # Second round: same AP parameters apart from the passphrase
    logger.info("Restart AP with different passphrase and re-run WPS")
    hapd_global = hostapd.HostapdGlobal()
    hapd_global.remove(ifname)
    params['wpa_passphrase'] = 'another passphrase'
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception(timeout_error)

    # Re-provisioning must not leave more than one network block behind
    if len(dev[0].list_networks()) > 1:
        raise Exception("Unexpected duplicated network block present")
200
def test_ap_wps_incorrect_pin(dev, apdev):
    """WPS PIN provisioning with incorrect PIN"""
    ssid = "test-wps-incorrect-pin"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    # Attempt 1: station PIN differs from the AP-side PIN from the first digit
    logger.info("WPS provisioning attempt 1")
    hapd.request("WPS_PIN any 12345670")
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any 55554444")
    failure = dev[0].wait_event(["WPS-FAIL"], timeout=30)
    if failure is None:
        raise Exception("WPS operation timed out")
    for needle, error in [
            ("config_error=18", "Incorrect config_error reported"),
            ("msg=8", "PIN error detected on incorrect message") ]:
        if needle not in failure:
            raise Exception(error)
    if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
        raise Exception("Timeout on disconnection event")
    dev[0].request("WPS_CANCEL")
    # if a scan was in progress, wait for it to complete before trying WPS
    # again (the outcome of this wait is deliberately not checked)
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)

    wps_status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_status:
        raise Exception("WPS failure result not shown correctly")

    # Attempt 2: station PIN shares its first four digits with the AP-side
    # PIN; the failure is expected to be reported with msg=10 this time
    logger.info("WPS provisioning attempt 2")
    hapd.request("WPS_PIN any 12345670")
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any 12344444")
    failure = dev[0].wait_event(["WPS-FAIL"], timeout=30)
    if failure is None:
        raise Exception("WPS operation timed out")
    for needle, error in [
            ("config_error=18", "Incorrect config_error reported"),
            ("msg=10", "PIN error detected on incorrect message") ]:
        if needle not in failure:
            raise Exception(error)
    if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
        raise Exception("Timeout on disconnection event")
246
def test_ap_wps_conf_pin(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    ssid = "test-wps-conf-pin"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    pin_command = "WPS_PIN any " + pin
    hapd.request(pin_command)
    dev[0].dump_monitor()
    dev[0].request(pin_command)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    cipher_error = "Unexpected encryption configuration"
    for field, expected, error in [
            ('ssid', ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP', cipher_error),
            ('group_cipher', 'CCMP', cipher_error),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt") ]:
        if sta_status[field] != expected:
            raise Exception(error)

    # After the first enrollee is done the AP must stop advertising the PIN
    dev[1].scan(freq="2412")
    scan_entry = dev[1].get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" in scan_entry['flags']:
        raise Exception("WPS-AUTH flag not cleared")

    # A second station using the same PIN must get WPS-M2D, not a connection
    logger.info("Try to connect from another station using the same PIN")
    dev[1].request(pin_command)
    outcome = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30)
    if outcome is None:
        raise Exception("Operation timed out")
    if "WPS-M2D" not in outcome:
        raise Exception("Unexpected WPS operation started")
284
def test_ap_wps_conf_pin_2sta(dev, apdev):
    """Two stations trying to use WPS PIN at the same time

    dev -- station-side control objects; dev[0] and dev[1] both enroll
    apdev -- AP descriptors; the 'ifname' key of the first entry is used

    Raises Exception when either station fails to connect within 30 seconds.
    """
    ssid = "test-wps-conf-pin2"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    # The same PIN is registered on the AP once per station UUID
    pin = "12345670"
    hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
    hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    # Start both enrollees before waiting on either of them
    dev[0].request("WPS_PIN any " + pin)
    dev[1].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
308
def test_ap_wps_reg_connect(dev, apdev):
    """WPS registrar using AP PIN to connect"""
    ssid = "test-wps-reg-ap-pin"
    appin = "12345670"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("WPS provisioning step")
    dev[0].dump_monitor()
    dev[0].wps_reg(apdev[0]['bssid'], appin)

    # Verify the connection that the registrar exchange resulted in
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    cipher_error = "Unexpected encryption configuration"
    for field, expected, error in [
            ('ssid', ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP', cipher_error),
            ('group_cipher', 'CCMP', cipher_error),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt") ]:
        if sta_status[field] != expected:
            raise Exception(error)
330
def check_wps_reg_failure(dev, ap, appin):
    """Run WPS_REG against ap and require a failure with config_error=15"""
    command = " ".join(["WPS_REG", ap['bssid'], appin])
    dev.request(command)
    outcome = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
    # No event at all is a different failure than the wrong kind of event
    if outcome is None:
        raise Exception("WPS operation timed out")
    if "WPS-SUCCESS" in outcome:
        raise Exception("WPS operation succeeded unexpectedly")
    if "config_error=15" not in outcome:
        raise Exception("WPS setup locked state was not reported correctly")
340
def test_ap_wps_random_ap_pin(dev, apdev):
    """WPS registrar using random AP PIN"""
    ssid = "test-wps-reg-random-ap-pin"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ap = apdev[0]
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "device_name": "Wireless AP", "manufacturer": "Company",
                  "model_name": "WAP", "model_number": "123",
                  "serial_number": "12345", "device_type": "6-0050F204-1",
                  "os_version": "01020300",
                  "config_methods": "label push_button",
                  "uuid": ap_uuid, "upnp_iface": "lo" }
    hostapd.add_ap(ap['ifname'], ap_params)
    hapd = hostapd.Hostapd(ap['ifname'])

    # Generate a random AP PIN and make sure it can be read back
    random_pin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in random_pin:
        raise Exception("Could not generate random AP PIN")
    if random_pin not in hapd.request("WPS_AP_PIN get"):
        raise Exception("Could not fetch current AP PIN")
    logger.info("WPS provisioning step")
    dev[0].wps_reg(ap['bssid'], random_pin)

    # With the AP PIN disabled the previously valid PIN must be rejected
    hapd.request("WPS_AP_PIN disable")
    logger.info("WPS provisioning step with AP PIN disabled")
    check_wps_reg_failure(dev[1], ap, random_pin)

    # Setting a fixed AP PIN makes registration work again
    logger.info("WPS provisioning step with AP PIN reset")
    fixed_pin = "12345670"
    hapd.request("WPS_AP_PIN set " + fixed_pin)
    dev[1].wps_reg(ap['bssid'], fixed_pin)
    for sta in (dev[0], dev[1]):
        sta.request("REMOVE_NETWORK all")
    for sta in (dev[0], dev[1]):
        sta.wait_event(["CTRL-EVENT-DISCONNECTED"])

    # Random PIN requested with a trailing "1" argument; after sleeping
    # slightly longer than one second it must no longer be readable
    logger.info("WPS provisioning step after AP PIN timeout")
    hapd.request("WPS_AP_PIN disable")
    expired_pin = hapd.request("WPS_AP_PIN random 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[0], ap, expired_pin)

    # Same check for an explicitly set PIN with the trailing "1" argument
    logger.info("WPS provisioning step after AP PIN timeout(2)")
    hapd.request("WPS_AP_PIN disable")
    expired_pin = "12345670"
    hapd.request("WPS_AP_PIN set " + expired_pin + " 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[1], ap, expired_pin)
393
def test_ap_wps_reg_config(dev, apdev):
    """WPS registrar configuring an AP using AP PIN"""
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "ap_pin": appin }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("WPS configuration step")
    dev[0].dump_monitor()
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    # The registrar pushes a new SSID and WPA2-PSK/CCMP credentials to the AP
    dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
                   new_passphrase)

    # The station must end up connected with the new settings
    sta_status = dev[0].get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    cipher_error = "Unexpected encryption configuration"
    for field, expected, error in [
            ('ssid', new_ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP', cipher_error),
            ('group_cipher', 'CCMP', cipher_error),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt") ]:
        if sta_status[field] != expected:
            raise Exception(error)
416
def test_ap_wps_reg_config_tkip(dev, apdev):
    """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP

    dev -- station-side control objects; only dev[0] is used
    apdev -- AP descriptors; the 'ifname' and 'bssid' keys of the first
             entry are used

    Raises Exception when the re-connection does not end up with the new
    SSID, CCMP pairwise / TKIP group ciphers and WPA2-PSK key management.
    """
    ssid = "test-wps-init-ap"
    appin = "12345670"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1",
                     "ap_pin": appin})
    logger.info("WPS configuration step")
    dev[0].request("SET wps_version_number 0x10")
    dev[0].dump_monitor()
    new_ssid = "wps-new-ssid-with-tkip"
    new_passphrase = "1234567890"
    # The registrar asks for WPA-PSK with TKIP only
    dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPAPSK", "TKIP",
                   new_passphrase)
    logger.info("Re-connect to verify WPA2 mixed mode")
    dev[0].request("DISCONNECT")
    # Network id 0 is modified to require RSN with CCMP before re-connecting
    # (presumably the network block created by wps_reg above - confirm)
    net_id = 0
    dev[0].set_network(net_id, "pairwise", "CCMP")
    dev[0].set_network(net_id, "proto", "RSN")
    dev[0].connect_network(net_id)
    status = dev[0].get_status()
    if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    # CCMP pairwise with TKIP group is what the mixed-mode check expects
    if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
446
def test_ap_wps_setup_locked(dev, apdev):
    """WPS registrar locking up AP setup on AP PIN failures

    dev -- station-side control objects; only dev[0] is used
    apdev -- AP descriptors; the 'ifname' and 'bssid' keys of the first
             entry are used

    Raises Exception when an incorrect AP PIN attempt is not reported as a
    failure, when the AP never reports config_error=15 during the attempts,
    when the AP status does not reflect the failure, or when the final
    enrollee PIN provisioning does not succeed.
    """
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "ap_pin": appin})
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"

    ap_setup_locked = False
    # Every PIN in this list differs from the configured AP PIN. All of them
    # are tried even after the locked state has been seen.
    for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
        dev[0].dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + pin)
        dev[0].wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
                       "CCMP", new_passphrase, no_wait=True)
        ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in ev:
            raise Exception("Unexpected connection")
        # config_error=15 is treated as "AP Setup Locked"; any other failure
        # has to be config_error=18
        if "config_error=15" in ev:
            logger.info("AP Setup Locked")
            ap_setup_locked = True
        elif "config_error=18" not in ev:
            raise Exception("config_error=18 not reported")
        ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
        if ev is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    if not ap_setup_locked:
        raise Exception("AP setup was not locked")

    # One control handle serves both the status check and the PIN step below
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in status:
        raise Exception("WPS failure result not shown correctly")
    if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
        raise Exception("Peer address not shown correctly")

    time.sleep(0.5)
    dev[0].dump_monitor()
    # Enrollee PIN provisioning is still expected to succeed at this point
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    hapd.request("WPS_PIN any " + pin)
    dev[0].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=30)
    if ev is None:
        raise Exception("WPS success was not reported")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
502
def test_ap_wps_pbc_overlap_2ap(dev, apdev):
    """WPS PBC session overlap with two active APs"""
    # Two APs that differ in SSID and passphrase only
    first_params = { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "wps_independent": "1" }
    second_params = { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
                      "wpa_passphrase": "123456789", "wpa": "2",
                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                      "wps_independent": "1" }
    hostapd.add_ap(apdev[0]['ifname'], first_params)
    hostapd.add_ap(apdev[1]['ifname'], second_params)

    # Activate PBC on both APs so that the station sees two candidates
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    hapd2 = hostapd.Hostapd(apdev[1]['ifname'])
    hapd2.request("WPS_PBC")

    logger.info("WPS provisioning step")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    overlap = dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15)
    if overlap is None:
        raise Exception("PBC session overlap not detected")
525
def test_ap_wps_pbc_overlap_2sta(dev, apdev):
    """WPS PBC session overlap with two active STAs"""
    ssid = "test-wps-pbc-overlap"
    ifname = apdev[0]['ifname']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    # Both stations push the button before either result is collected
    dev[0].request("WPS_PBC")
    dev[1].request("WPS_PBC")
    for idx in (0, 1):
        label = "(dev%d)" % idx
        m2d = dev[idx].wait_event(["WPS-M2D"], timeout=15)
        if m2d is None:
            raise Exception("PBC session overlap not detected " + label)
        if "config_error=12" not in m2d:
            raise Exception("PBC session overlap not correctly reported " +
                            label)

    # Even after a cancel, a new PBC request has to be refused by the AP
    hapd.request("WPS_CANCEL")
    if "FAIL" not in hapd.request("WPS_PBC"):
        raise Exception("PBC mode allowed to be started while PBC overlap still active")
554
def test_ap_wps_cancel(dev, apdev):
    """WPS AP cancelling enabled config method

    dev -- station-side control objects; dev[0] is used for scanning only
    apdev -- AP descriptors; the 'ifname' and 'bssid' keys of the first
             entry are used

    Raises Exception when a WPS flag is missing from the scan results after
    the method has been enabled, when WPS_CANCEL fails, or when the flag is
    still present after the cancel.
    """
    ssid = "test-wps-ap-cancel"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
    bssid = apdev[0]['bssid']
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("Verify PBC enable/cancel")
    hapd.request("WPS_PBC")
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(bssid)
    if "[WPS-PBC]" not in bss['flags']:
        raise Exception("WPS-PBC flag missing")
    if "FAIL" in hapd.request("WPS_CANCEL"):
        raise Exception("WPS_CANCEL failed")
    # A fresh scan is needed to observe the state after the cancel
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(bssid)
    if "[WPS-PBC]" in bss['flags']:
        raise Exception("WPS-PBC flag not cleared")

    logger.info("Verify PIN enable/cancel")
    hapd.request("WPS_PIN any 12345670")
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(bssid)
    if "[WPS-AUTH]" not in bss['flags']:
        raise Exception("WPS-AUTH flag missing")
    if "FAIL" in hapd.request("WPS_CANCEL"):
        raise Exception("WPS_CANCEL failed")
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(bssid)
    if "[WPS-AUTH]" in bss['flags']:
        raise Exception("WPS-AUTH flag not cleared")
590
591 def test_ap_wps_er_add_enrollee(dev, apdev):
592     """WPS ER configuring AP and adding a new enrollee using PIN"""
593     ssid = "wps-er-add-enrollee"
594     ap_pin = "12345670"
595     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
596     hostapd.add_ap(apdev[0]['ifname'],
597                    { "ssid": ssid, "eap_server": "1", "wps_state": "1",
598                      "device_name": "Wireless AP", "manufacturer": "Company",
599                      "model_name": "WAP", "model_number": "123",
600                      "serial_number": "12345", "device_type": "6-0050F204-1",
601                      "os_version": "01020300",
602                      "config_methods": "label push_button",
603                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
604     logger.info("WPS configuration step")
605     new_passphrase = "1234567890"
606     dev[0].dump_monitor()
607     dev[0].wps_reg(apdev[0]['bssid'], ap_pin, ssid, "WPA2PSK", "CCMP",
608                    new_passphrase)
609     status = dev[0].get_status()
610     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
611         raise Exception("Not fully connected")
612     if status['ssid'] != ssid:
613         raise Exception("Unexpected SSID")
614     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
615         raise Exception("Unexpected encryption configuration")
616     if status['key_mgmt'] != 'WPA2-PSK':
617         raise Exception("Unexpected key_mgmt")
618
619     logger.info("Start ER")
620     dev[0].request("WPS_ER_START ifname=lo")
621     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
622     if ev is None:
623         raise Exception("AP discovery timed out")
624     if ap_uuid not in ev:
625         raise Exception("Expected AP UUID not found")
626
627     logger.info("Learn AP configuration through UPnP")
628     dev[0].dump_monitor()
629     dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
630     ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
631     if ev is None:
632         raise Exception("AP learn timed out")
633     if ap_uuid not in ev:
634         raise Exception("Expected AP UUID not in settings")
635     if "ssid=" + ssid not in ev:
636         raise Exception("Expected SSID not in settings")
637     if "key=" + new_passphrase not in ev:
638         raise Exception("Expected passphrase not in settings")
639
640     logger.info("Add Enrollee using ER")
641     pin = dev[1].wps_read_pin()
642     dev[0].dump_monitor()
643     dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
644     dev[1].dump_monitor()
645     dev[1].request("WPS_PIN any " + pin)
646     ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=30)
647     if ev is None:
648         raise Exception("Enrollee did not report success")
649     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
650     if ev is None:
651         raise Exception("Association with the AP timed out")
652     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
653     if ev is None:
654         raise Exception("WPS ER did not report success")
655     hwsim_utils.test_connectivity_sta(dev[0], dev[1])
656
657     logger.info("Add a specific Enrollee using ER")
658     pin = dev[2].wps_read_pin()
659     addr2 = dev[2].p2p_interface_addr()
660     dev[0].dump_monitor()
661     dev[2].dump_monitor()
662     dev[2].request("WPS_PIN any " + pin)
663     ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
664     if ev is None:
665         raise Exception("Enrollee not seen")
666     if addr2 not in ev:
667         raise Exception("Unexpected Enrollee MAC address")
668     dev[0].request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
669     ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
670     if ev is None:
671         raise Exception("Association with the AP timed out")
672     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
673     if ev is None:
674         raise Exception("WPS ER did not report success")
675
676     logger.info("Verify registrar selection behavior")
677     dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
678     dev[1].request("DISCONNECT")
679     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
680     dev[1].scan(freq="2412")
681     bss = dev[1].get_bss(apdev[0]['bssid'])
682     if "[WPS-AUTH]" not in bss['flags']:
683         raise Exception("WPS-AUTH flag missing")
684
685     logger.info("Stop ER")
686     dev[0].dump_monitor()
687     dev[0].request("WPS_ER_STOP")
688     ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"])
689     if ev is None:
690         raise Exception("WPS ER unsubscription timed out")
691     # It takes some time for the UPnP UNSUBSCRIBE command to go through, so wait
692     # a bit before verifying that the scan results have changed.
693     time.sleep(0.2)
694
695     dev[1].scan(freq="2412")
696     bss = dev[1].get_bss(apdev[0]['bssid'])
697     if "[WPS-AUTH]" in bss['flags']:
698         raise Exception("WPS-AUTH flag not removed")
699
def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
    """WPS ER connected to AP and adding a new enrollee using PBC"""
    def wait_or_fail(wpas, event, failure):
        # Wait up to 15 seconds for one event; a timeout fails the test with
        # the given message.
        ev = wpas.wait_event([event], timeout=15)
        if ev is None:
            raise Exception(failure)
        return ev

    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "device_name": "Wireless AP", "manufacturer": "Company",
               "model_name": "WAP", "model_number": "123",
               "serial_number": "12345", "device_type": "6-0050F204-1",
               "os_version": "01020300",
               "config_methods": "label push_button",
               "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" }
    hostapd.add_ap(apdev[0]['ifname'], params)

    # dev[0] takes the External Registrar role for the whole test.
    er = dev[0]
    logger.info("Learn AP configuration")
    er.dump_monitor()
    bssid = apdev[0]['bssid']
    er.wps_reg(bssid, ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and status['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ev = wait_or_fail(er, "WPS-ER-AP-ADD", "AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PBC")
    er.dump_monitor()
    sta = dev[1]
    enrollee = sta.p2p_interface_addr()
    sta.dump_monitor()
    sta.request("WPS_PBC")

    # Accept the expected Enrollee in either of the first two
    # WPS-ER-ENROLLEE-ADD events reported by the ER.
    for _ in range(2):
        ev = wait_or_fail(er, "WPS-ER-ENROLLEE-ADD",
                          "Enrollee discovery timed out")
        if enrollee in ev:
            break
    else:
        raise Exception("Expected Enrollee not found")
    er.request("WPS_ER_PBC " + enrollee)

    wait_or_fail(sta, "WPS-SUCCESS", "Enrollee did not report success")
    wait_or_fail(sta, "CTRL-EVENT-CONNECTED",
                 "Association with the AP timed out")
    wait_or_fail(er, "WPS-SUCCESS", "WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(er, sta)

    # verify BSSID selection of the AP instead of UUID
    res = er.request("WPS_ER_SET_CONFIG " + apdev[0]['bssid'] + " 0")
    if "FAIL" in res:
        raise Exception("Could not select AP based on BSSID")
763
def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
    """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
    def wait_or_fail(wpas, event, failure):
        # Wait up to 15 seconds for one event; a timeout fails the test with
        # the given message.
        ev = wpas.wait_event([event], timeout=15)
        if ev is None:
            raise Exception(failure)
        return ev

    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "device_name": "Wireless AP", "manufacturer": "Company",
               "model_name": "WAP", "model_number": "123",
               "serial_number": "12345", "device_type": "6-0050F204-1",
               "os_version": "01020300",
               "config_methods": "label push_button",
               "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" }
    hostapd.add_ap(apdev[0]['ifname'], params)

    er = dev[0]
    logger.info("Learn AP configuration")
    # Switch the station that will act as ER to WPS version 0x10 before it
    # runs the registrar exchange with the AP.
    er.request("SET wps_version_number 0x10")
    er.dump_monitor()
    bssid = apdev[0]['bssid']
    er.wps_reg(bssid, ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and status['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ev = wait_or_fail(er, "WPS-ER-AP-ADD", "AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PIN")
    sta = dev[1]
    enrollee = sta.p2p_interface_addr()
    pin = sta.wps_read_pin()
    er.dump_monitor()
    er.request("WPS_ER_PIN any " + pin + " " + enrollee)
    sta.dump_monitor()
    sta.request("WPS_PIN any " + pin)
    wait_or_fail(sta, "CTRL-EVENT-CONNECTED",
                 "Association with the AP timed out")
    wait_or_fail(er, "WPS-SUCCESS", "WPS ER did not report success")
811
def test_ap_wps_er_config_ap(dev, apdev):
    """WPS ER configuring AP over UPnP"""
    ssid = "wps-er-ap-config"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "device_name": "Wireless AP", "manufacturer": "Company",
                     "model_name": "WAP", "model_number": "123",
                     "serial_number": "12345", "device_type": "6-0050F204-1",
                     "os_version": "01020300",
                     "config_methods": "label push_button",
                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})

    logger.info("Connect ER to the AP")
    dev[0].connect(ssid, psk="12345678", scan_freq="2412")

    logger.info("WPS configuration step")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")

    # The SSID and the new passphrase are passed hex-encoded in the
    # WPS_ER_CONFIG command.
    new_passphrase = "1234567890"
    dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
                   ssid.encode("hex") + " WPA2PSK CCMP " +
                   new_passphrase.encode("hex"))
    ev = dev[0].wait_event(["WPS-SUCCESS"])
    if ev is None:
        raise Exception("WPS ER configuration operation timed out")

    # dev[0] is the only station associated with the AP in this test, so it is
    # the device whose disconnection is awaited before reconnecting. The wait
    # is best effort: its result is deliberately not checked.
    dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])

    # Reconnect with the passphrase that was just provisioned.
    dev[0].connect(ssid, psk=new_passphrase, scan_freq="2412")
847
def test_ap_wps_fragmentation(dev, apdev):
    """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
    ssid = "test-wps-fragmentation"
    ifname = apdev[0]['ifname']
    # wpa=3 with separate rsn_pairwise/wpa_pairwise ciphers and a 50 octet
    # fragment_size on the AP side.
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "3",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "wpa_pairwise": "TKIP",
               "fragment_size": "50" }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    # Use the same 50 octet fragment size on the station side.
    sta.request("SET wps_fragment_size 50")
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'TKIP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
873
def test_ap_wps_new_version_sta(dev, apdev):
    """WPS compatibility with new version number on the station"""
    ssid = "test-wps-ver"
    ifname = apdev[0]['ifname']
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    # Station-side test settings, applied in this order before starting PBC:
    # a non-default version number and an extra vendor extension for M1.
    for cmd in ("SET wps_version_number 0x43",
                "SET wps_vendor_ext_m1 000137100100020001",
                "WPS_PBC"):
        sta.request(cmd)
    connected = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if connected is None:
        raise Exception("Association with the AP timed out")
891
def test_ap_wps_new_version_ap(dev, apdev):
    """WPS compatibility with new version number on the AP"""
    ssid = "test-wps-ver"
    ifname = apdev[0]['ifname']
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    result = hapd.request("SET wps_version_number 0x43")
    if "FAIL" in result:
        raise Exception("Failed to enable test functionality")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    sta.request("WPS_PBC")
    connected = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    # Set the AP version number back to 0x20 before looking at the result so
    # that this also happens when the connection wait timed out.
    hapd.request("SET wps_version_number 0x20")
    if connected is None:
        raise Exception("Association with the AP timed out")
910
def test_ap_wps_check_pin(dev, apdev):
    """Verify PIN checking through control interface"""
    ifname = apdev[0]['ifname']
    params = { "ssid": "wps", "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)

    # (PIN as entered, expected WPS_CHECK_PIN reply)
    cases = [ ("12345670", "12345670"),
              ("12345678", "FAIL-CHECKSUM"),
              ("1234-5670", "12345670"),
              ("1234 5670", "12345670"),
              ("1-2.3:4 5670", "12345670") ]
    for pin, expected in cases:
        cmd = "WPS_CHECK_PIN " + pin
        ap_res = hapd.request(cmd).rstrip('\n')
        sta_res = dev[0].request(cmd).rstrip('\n')
        # The AP and the station control interfaces must agree with each
        # other and with the expected value.
        if ap_res != sta_res:
            raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
        if ap_res != expected:
            raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(ap_res, expected))
929
def test_ap_wps_wep_config(dev, apdev):
    """WPS 2.0 AP rejecting WEP configuration"""
    ssid = "test-wps-config"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "ap_pin": appin }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)

    # Start a registrar exchange that tries to configure WEP; no_wait=True is
    # passed because the outcome is observed from the AP side below.
    dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
                   "hello", no_wait=True)
    failure = hapd.wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL timed out")
    if "reason=2" not in failure:
        raise Exception("Unexpected reason code in WPS-FAIL")

    # The failure must also be visible in the AP's WPS status report.
    report = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in report:
        raise Exception("WPS failure result not shown correctly")
    if "Failure Reason: WEP Prohibited" not in report:
        raise Exception("Failure reason not reported correctly")
    peer = "Peer Address: " + dev[0].p2p_interface_addr()
    if peer not in report:
        raise Exception("Peer address not shown correctly")
952
def test_ap_wps_ie_fragmentation(dev, apdev):
    """WPS AP using fragmented WPS IE"""
    ssid = "test-wps-ie-fragmentation"
    ifname = apdev[0]['ifname']
    # Long device description strings are configured on the AP; the check at
    # the end expects the scan result to carry two WPS IEs.
    device_name = "1234567890abcdef1234567890abcdef"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "device_name": device_name,
               "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
               "model_name": "1234567890abcdef1234567890abcdef",
               "model_number": "1234567890abcdef1234567890abcdef",
               "serial_number": "1234567890abcdef1234567890abcdef" }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    bss = dev[0].get_bss(apdev[0]['bssid'])
    if not ("wps_device_name" in bss and
            bss['wps_device_name'] == device_name):
        raise Exception("Device Name not received correctly")
    # Count vendor specific IE headers (dd, one length octet) followed by the
    # 0050f204 prefix in the hex dump of the IEs.
    wps_ies = re.findall("dd..0050f204", bss['ie'])
    if len(wps_ies) != 2:
        raise Exception("Unexpected number of WPS IEs")
976
def add_ssdp_ap(ifname, ap_uuid):
    """Add a WPS AP with UPnP enabled on the loopback interface.

    ifname: interface name passed to hostapd.add_ap()
    ap_uuid: value used for the AP's "uuid" parameter
    """
    # Base WPA2-PSK/WPS configuration with UPnP bound to "lo".
    params = { "ssid": "wps-ssdp", "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "device_name": "Wireless AP", "manufacturer": "Company",
               "model_name": "WAP", "model_number": "123",
               "serial_number": "12345", "device_type": "6-0050F204-1",
               "os_version": "01020300",
               "config_methods": "label push_button",
               "ap_pin": "12345670", "uuid": ap_uuid, "upnp_iface": "lo" }
    # Additional descriptive fields for the UPnP device.
    params.update({ "friendly_name": "WPS Access Point",
                    "manufacturer_url": "http://www.example.com/",
                    "model_description": "Wireless Access Point",
                    "model_url": "http://www.example.com/model/",
                    "upc": "123456789012" })
    hostapd.add_ap(ifname, params)
995
def ssdp_send(msg, no_recv=False):
    """Send one datagram to the SSDP multicast group and optionally read a reply.

    msg: payload sent to 239.255.255.250:1900
    no_recv: when true, return right after sending

    Returns None when no_recv is set, otherwise up to 1000 bytes of the first
    datagram received on the sending socket. socket.timeout is raised if
    nothing is received within the one second default timeout.

    Note: socket.setdefaulttimeout(1) is a process-wide setting and stays in
    effect after this function returns.
    """
    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # Close the socket on every exit path (normal return, no_recv, timeout)
    # instead of leaving that to garbage collection.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        # Bind to loopback with an ephemeral port; replies are read from this
        # same socket.
        sock.bind(("127.0.0.1", 0))
        sock.sendto(msg, ("239.255.255.250", 1900))
        if no_recv:
            return None
        return sock.recv(1000)
    finally:
        sock.close()
1006
def ssdp_send_msearch(st):
    """Send an SSDP M-SEARCH with the given ST value and return ssdp_send()'s result."""
    header_lines = ('M-SEARCH * HTTP/1.1',
                    'HOST: 239.255.255.250:1900',
                    'MX: 1',
                    'MAN: "ssdp:discover"',
                    'ST: ' + st)
    # Header lines are CRLF separated and followed by an empty line.
    request = '\r\n'.join(header_lines) + '\r\n\r\n'
    return ssdp_send(request)
1016
def test_ap_wps_ssdp_msearch(dev, apdev):
    """WPS AP and SSDP M-SEARCH messages"""
    def build(lines):
        # CRLF-join the header lines and terminate with an empty line.
        return '\r\n'.join(lines + ['', ''])

    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    # Requests for which a reply is read: mixed-case header names, then
    # lower-case header names with extra tabs and spaces around the values.
    variants = [
        [ 'M-SEARCH * HTTP/1.1',
          'Host: 239.255.255.250:1900',
          'Mx: 1',
          'Man: "ssdp:discover"',
          'St: urn:schemas-wifialliance-org:device:WFADevice:1' ],
        [ 'M-SEARCH * HTTP/1.1',
          'host:\t239.255.255.250:1900\t\t\t\t \t\t',
          'mx: \t1\t\t   ',
          'man: \t \t "ssdp:discover"   ',
          'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t' ],
    ]
    for lines in variants:
        ssdp_send(build(lines))

    # Search targets sent through the regular M-SEARCH helper.
    for st in [ "ssdp:all",
                "upnp:rootdevice",
                "uuid:" + ap_uuid,
                "urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
                "urn:schemas-wifialliance-org:device:WFADevice:1" ]:
        ssdp_send_msearch(st)

    # MX: 130 request; it is sent without waiting for any reply.
    lines = [ 'M-SEARCH * HTTP/1.1',
              'HOST:\t239.255.255.250:1900',
              'MAN: "ssdp:discover"',
              'MX: 130',
              'ST: urn:schemas-wifialliance-org:device:WFADevice:1' ]
    ssdp_send(build(lines), no_recv=True)
1054
def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
    """WPS AP and invalid SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    group = ("239.255.255.250", 1900)

    # Header lines of a well-formed request; the invalid variants below drop
    # or replace individual lines.
    req = 'M-SEARCH * HTTP/1.1'
    host = 'HOST: 239.255.255.250:1900'
    man = 'MAN: "ssdp:discover"'
    mx = 'MX: 1'
    st = 'ST: urn:schemas-wifialliance-org:device:WFADevice:1'
    valid = [req, host, man, mx, st]

    # (debug label, requests sent under that label), in the order sent.
    invalid = [
        ("Missing MX", [[req, host, man, st]]),
        ("Negative MX", [[req, host, 'MX: -1', man, st]]),
        ("Invalid MX", [[req, host, 'MX; 1', man, st]]),
        ("Missing MAN", [[req, host, mx, st]]),
        ("Invalid MAN", [[req, host, mx, 'MAN: foo', st],
                         [req, host, mx, 'MAN; "ssdp:discover"', st]]),
        ("Missing HOST", [[req, man, mx, st]]),
        ("Missing ST", [[req, host, man, mx]]),
        ("Mismatching ST",
         [[req, host, man, mx,
           'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d'],
          [req, host, man, mx, 'ST: foo:bar'],
          [req, host, man, mx, 'ST: foobar']]),
        ("Invalid ST",
         [[req, host, man, mx,
           'ST; urn:schemas-wifialliance-org:device:WFADevice:1']]),
        ("Invalid M-SEARCH",
         [['M+SEARCH * HTTP/1.1', host, man, mx, st],
          ['M-SEARCH-* HTTP/1.1', host, man, mx, st]]),
    ]

    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # The socket is closed on every exit path, including test failures.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))

        for label, requests in invalid:
            logger.debug(label)
            for lines in requests:
                sock.sendto('\r\n'.join(lines + ['', '']), group)

        logger.debug("Invalid message format")
        # A bare request line without any line terminator, followed by an
        # otherwise valid request that uses CR instead of CRLF.
        sock.sendto("NOTIFY * HTTP/1.1", group)
        sock.sendto('\r'.join(valid + ['', '']), group)

        # None of the datagrams sent so far may be answered: the only
        # acceptable outcome of this read is a timeout.
        try:
            r = sock.recv(1000)
        except socket.timeout:
            r = None
        if r is not None:
            raise Exception("Unexpected M-SEARCH response: " + r)

        logger.debug("Valid M-SEARCH")
        sock.sendto('\r\n'.join(valid + ['', '']), group)

        # A well-formed request on the same socket must still be answered.
        try:
            sock.recv(1000)
        except socket.timeout:
            raise Exception("No SSDP response")
    finally:
        sock.close()
1226
def test_ap_wps_ssdp_burst(dev, apdev):
    """WPS AP and SSDP burst"""
    def open_sock():
        # UDP socket bound to loopback with an ephemeral port; replies are
        # read from this same socket.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                          socket.IPPROTO_UDP)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        s.bind(("127.0.0.1", 0))
        return s

    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    group = ("239.255.255.250", 1900)
    msg = '\r\n'.join([
            'M-SEARCH * HTTP/1.1',
            'HOST: 239.255.255.250:1900',
            'MAN: "ssdp:discover"',
            'MX: 1',
            'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])
    socket.setdefaulttimeout(1)

    # First burst: send 25 requests and count replies until a read times out.
    # The socket is closed explicitly before the second one is opened.
    sock = open_sock()
    try:
        for i in range(0, 25):
            sock.sendto(msg, group)
        resp = 0
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                break
            if not r.startswith("HTTP/1.1 200 OK\r\n"):
                raise Exception("Unexpected message: " + r)
            resp += 1
    finally:
        sock.close()
    if resp < 20:
        raise Exception("Too few SSDP responses")

    # Second burst from a fresh socket: stop at the first reply that mentions
    # the AP UUID; a timeout before that fails the test.
    sock = open_sock()
    try:
        for i in range(0, 25):
            sock.sendto(msg, group)
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                raise Exception("No SSDP response")
            if ap_uuid in r:
                break
    finally:
        sock.close()
1271
def ssdp_get_location(uuid):
    """Return the LOCATION header value from the M-SEARCH reply for uuid.

    An M-SEARCH with ST "uuid:<uuid>" is sent and the first reply line whose
    name is "location" (case-insensitive) provides the result, with
    surrounding whitespace stripped. An Exception is raised when the reply
    has no such line.
    """
    response = ssdp_send_msearch("uuid:" + uuid)
    for line in response.splitlines():
        if line.lower().startswith("location:"):
            # Split on the first colon only; the value is a URL that contains
            # colons itself.
            return line.split(':', 1)[1].strip()
    raise Exception("No UPnP location found")
1282
def upnp_get_urls(location):
    """Fetch the UPnP device description and return its service URLs.

    location: URL of the device description document

    Returns a dict with the keys 'scpd_url', 'control_url' and
    'event_sub_url'. Each value is the corresponding element text of the
    first service entry found, resolved against location.

    Raises an Exception if the description has no service entry.
    """
    conn = urllib.urlopen(location)
    # Release the HTTP handle as soon as the document has been parsed.
    try:
        tree = ET.parse(conn)
    finally:
        conn.close()
    root = tree.getroot()
    urn = '{urn:schemas-upnp-org:device-1-0}'
    service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
    if service is None:
        # Report the missing element directly instead of failing later with
        # an AttributeError on None.
        raise Exception("No UPnP service found in device description")
    res = {}
    for key, tag in [ ('scpd_url', 'SCPDURL'),
                      ('control_url', 'controlURL'),
                      ('event_sub_url', 'eventSubURL') ]:
        res[key] = urlparse.urljoin(location, service.find(urn + tag).text)
    return res
1294
def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
    """POST a SOAP envelope for a WFAWLANConfig action and return the response.

    conn: connection object providing request() and getresponse()
    path: request path for the POST
    action: action name; used for the body element and the default
        SOAPAction header
    include_soap_action: when true, send the standard SOAPAction header
    soap_action_override: SOAPAction header value used only when
        include_soap_action is false and this value is non-empty

    The action element is sent without any child elements.
    """
    soapns = 'http://schemas.xmlsoap.org/soap/envelope/'
    wpsns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
    ET.register_namespace('soapenv', soapns)
    ET.register_namespace('wfa', wpsns)

    # Envelope -> Body -> <action>
    envelope_attrs = {
        '{%s}encodingStyle' % soapns: 'http://schemas.xmlsoap.org/soap/encoding/'
    }
    envelope = ET.Element("{%s}Envelope" % soapns, attrib=envelope_attrs)
    body = ET.SubElement(envelope, "{%s}Body" % soapns)
    ET.SubElement(body, "{%s}%s" % (wpsns, action))

    payload = StringIO.StringIO()
    ET.ElementTree(envelope).write(payload, xml_declaration=True,
                                   encoding='utf-8')

    headers = { "Content-type": 'text/xml; charset="utf-8"' }
    if include_soap_action:
        headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
    elif soap_action_override:
        headers["SOAPAction"] = soap_action_override

    conn.request("POST", path, payload.getvalue(), headers)
    return conn.getresponse()
1316
def test_ap_wps_upnp(dev, apdev):
    """WPS AP and UPnP operations"""
    def expect_status(resp, expected):
        # Fail the test when the HTTP status differs from the expected code.
        if resp.status != expected:
            raise Exception("Unexpected HTTP response: %s" % resp.status)

    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    location = ssdp_get_location(ap_uuid)
    urls = upnp_get_urls(location)

    # Download the SCPD document; its contents are read but not inspected.
    urllib.urlopen(urls['scpd_url']).read()

    missing = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
    if missing.getcode() != 404:
        raise Exception("Unexpected HTTP response to GET unknown URL")

    conn = httplib.HTTPConnection(urlparse.urlparse(location).netloc)
    content_type = 'text/xml; charset="utf-8"'

    # Requests to a path that is not the control URL, first with POST and
    # then with an unknown method.
    headers = { "Content-type": content_type,
                "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
    for method, expected in [ ("POST", 404), ("UNKNOWN", 501) ]:
        conn.request(method, "hello", "\r\n\r\n", headers)
        expect_status(conn.getresponse(), expected)

    # POST to the control URL with a SOAPAction naming an unknown service.
    headers = { "Content-type": content_type,
                "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
    ctrlurl = urlparse.urlparse(urls['control_url'])
    conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
    expect_status(conn.getresponse(), 401)

    logger.debug("GetDeviceInfo without SOAPAction header")
    resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
                            include_soap_action=False)
    expect_status(resp, 401)

    logger.debug("GetDeviceInfo with invalid SOAPAction header")
    bad_actions = [
        "foo",
        "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice',
    ]
    for act in bad_actions:
        resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
                                include_soap_action=False,
                                soap_action_override=act)
        expect_status(resp, 401)

    # A well-formed GetDeviceInfo must succeed and return NewDeviceInfo.
    resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
    expect_status(resp, 200)
    device_info = resp.read()
    if "NewDeviceInfo" not in device_info:
        raise Exception("Unexpected GetDeviceInfo response")

    # (debug label, action sent without arguments, expected status)
    failing_actions = [
        ("PutMessage without required parameters", "PutMessage", 600),
        ("PutWLANResponse without required parameters", "PutWLANResponse",
         600),
        ("SetSelectedRegistrar from unregistered ER", "SetSelectedRegistrar",
         501),
        ("Unknown action", "Unknown", 401),
    ]
    for label, action, expected in failing_actions:
        logger.debug(label)
        resp = upnp_soap_action(conn, ctrlurl.path, action)
        expect_status(resp, expected)
1398
def test_ap_wps_upnp_subscribe(dev, apdev):
    """WPS AP and UPnP event subscription"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    # Discover the AP over SSDP and look up its event subscription URL.
    location = ssdp_get_location(ap_uuid)
    urls = upnp_get_urls(location)
    eventurl = urlparse.urlparse(urls['event_sub_url'])

    url = urlparse.urlparse(location)
    conn = httplib.HTTPConnection(url.netloc)
    #conn.set_debuglevel(1)

    # Header values shared by most of the requests below.
    callback = '<http://127.0.0.1:12345/event>'
    nt_event = "upnp:event"
    timeout = "Second-1234"
    event_path = eventurl.path

    def exchange(method, path, hdrs, expected):
        # Send one request with the fixed dummy body and fail the test
        # unless the response carries the expected HTTP status code.
        conn.request(method, path, "\r\n\r\n", hdrs)
        reply = conn.getresponse()
        if reply.status != expected:
            raise Exception("Unexpected HTTP response: %s" % reply.status)
        return reply

    # Malformed subscription attempts: wrong path, missing NT, missing
    # callback, and an unknown NT value.
    no_nt = { "callback": callback,
              "timeout": timeout }
    exchange("SUBSCRIBE", "hello", no_nt, 412)
    exchange("SUBSCRIBE", event_path, no_nt, 412)
    exchange("SUBSCRIBE", event_path,
             { "NT": nt_event,
               "timeout": timeout }, 412)
    exchange("SUBSCRIBE", event_path,
             { "callback": callback,
               "NT": "upnp:foobar",
               "timeout": timeout }, 400)

    logger.debug("Valid subscription")
    resp = exchange("SUBSCRIBE", event_path,
                    { "callback": callback,
                      "NT": nt_event,
                      "timeout": timeout }, 200)
    sid = resp.getheader("sid")
    logger.debug("Subscription SID " + sid)

    # Re-subscription attempts that must be rejected: SID without the
    # "uuid:" prefix, SID that is not a valid UUID, and a renewal that also
    # carries a callback header.
    bad_renewals = [ { "NT": nt_event,
                       "sid": "123456734567854",
                       "timeout": timeout },
                     { "NT": nt_event,
                       "sid": "uuid:123456734567854",
                       "timeout": timeout },
                     { "callback": callback,
                       "NT": nt_event,
                       "sid": sid,
                       "timeout": timeout } ]
    for hdrs in bad_renewals:
        logger.debug("Invalid re-subscription")
        exchange("SUBSCRIBE", event_path, hdrs, 400)

    logger.debug("SID mismatch in re-subscription")
    exchange("SUBSCRIBE", event_path,
             { "NT": nt_event,
               "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
               "timeout": timeout }, 412)

    logger.debug("Valid re-subscription")
    resp = exchange("SUBSCRIBE", event_path,
                    { "NT": nt_event,
                      "sid": sid,
                      "timeout": timeout }, 200)
    sid2 = resp.getheader("sid")
    logger.debug("Subscription SID " + sid2)

    if sid2 != sid:
        raise Exception("Unexpected SID change")

    # Same renewal again, this time with whitespace between the "uuid:"
    # prefix and the UUID value.
    logger.debug("Valid re-subscription")
    resp = exchange("SUBSCRIBE", event_path,
                    { "NT": nt_event,
                      "sid": "uuid: \t \t" + sid.split(':')[1],
                      "timeout": timeout }, 200)

    logger.debug("Invalid unsubscription")
    exchange("UNSUBSCRIBE", "/hello", { "sid": sid }, 412)
    exchange("UNSUBSCRIBE", event_path, { "foo": "bar" }, 412)

    logger.debug("Valid unsubscription")
    exchange("UNSUBSCRIBE", event_path, { "sid": sid }, 200)

    # The SID was removed by the previous request, so a second attempt
    # with the same value is expected to fail.
    logger.debug("Unsubscription for not existing SID")
    exchange("UNSUBSCRIBE", event_path, { "sid": sid }, 412)

    for bad_sid in [ " \t \tfoo", "uuid:\t \tfoo" ]:
        logger.debug("Invalid unsubscription")
        exchange("UNSUBSCRIBE", event_path, { "sid": bad_sid }, 400)

    # UNSUBSCRIBE requests carrying an NT or callback header get 400.
    logger.debug("Invalid unsubscription")
    exchange("UNSUBSCRIBE", event_path,
             { "NT": nt_event,
               "sid": sid }, 400)
    exchange("UNSUBSCRIBE", event_path,
             { "callback": callback,
               "sid": sid }, 400)

    logger.debug("Valid subscription with multiple callbacks")
    many_callbacks = '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>'
    resp = exchange("SUBSCRIBE", event_path,
                    { "callback": many_callbacks,
                      "NT": nt_event,
                      "timeout": timeout }, 200)
    sid = resp.getheader("sid")
    logger.debug("Subscription SID " + sid)