tests: WPS cred event for external use
[mech_eap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import subprocess
10 import logging
11 logger = logging.getLogger()
12 import re
13 import socket
14 import httplib
15 import urlparse
16 import urllib
17 import xml.etree.ElementTree as ET
18 import StringIO
19
20 import hwsim_utils
21 import hostapd
22
23 def test_ap_wps_init(dev, apdev):
24     """Initial AP configuration with first WPS Enrollee"""
25     ssid = "test-wps"
26     hostapd.add_ap(apdev[0]['ifname'],
27                    { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
28     hapd = hostapd.Hostapd(apdev[0]['ifname'])
29     logger.info("WPS provisioning step")
30     hapd.request("WPS_PBC")
31     if "PBC Status: Active" not in hapd.request("WPS_GET_STATUS"):
32         raise Exception("PBC status not shown correctly")
33
34     id = dev[0].add_network()
35     dev[0].set_network_quoted(id, "ssid", "home")
36     dev[0].set_network_quoted(id, "psk", "12345678")
37     dev[0].request("ENABLE_NETWORK %s no-connect" % id)
38
39     id = dev[0].add_network()
40     dev[0].set_network_quoted(id, "ssid", "home2")
41     dev[0].set_network(id, "bssid", "00:11:22:33:44:55")
42     dev[0].set_network(id, "key_mgmt", "NONE")
43     dev[0].request("ENABLE_NETWORK %s no-connect" % id)
44
45     dev[0].request("WPS_PBC")
46     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
47     if ev is None:
48         raise Exception("Association with the AP timed out")
49     status = dev[0].get_status()
50     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
51         raise Exception("Not fully connected")
52     if status['ssid'] != ssid:
53         raise Exception("Unexpected SSID")
54     if status['pairwise_cipher'] != 'CCMP':
55         raise Exception("Unexpected encryption configuration")
56     if status['key_mgmt'] != 'WPA2-PSK':
57         raise Exception("Unexpected key_mgmt")
58
59     status = hapd.request("WPS_GET_STATUS")
60     if "PBC Status: Disabled" not in status:
61         raise Exception("PBC status not shown correctly")
62     if "Last WPS result: Success" not in status:
63         raise Exception("Last WPS result not shown correctly")
64     if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
65         raise Exception("Peer address not shown correctly")
66     conf = hapd.request("GET_CONFIG")
67     if "wps_state=configured" not in conf:
68         raise Exception("AP not in WPS configured state")
69     if "rsn_pairwise_cipher=CCMP TKIP" not in conf:
70         raise Exception("Unexpected rsn_pairwise_cipher")
71     if "wpa_pairwise_cipher=CCMP TKIP" not in conf:
72         raise Exception("Unexpected wpa_pairwise_cipher")
73     if "group_cipher=TKIP" not in conf:
74         raise Exception("Unexpected group_cipher")
75
76     if len(dev[0].list_networks()) != 3:
77         raise Exception("Unexpected number of network blocks")
78
79 def test_ap_wps_init_2ap_pbc(dev, apdev):
80     """Initial two-radio AP configuration with first WPS PBC Enrollee"""
81     ssid = "test-wps"
82     params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
83     hostapd.add_ap(apdev[0]['ifname'], params)
84     hostapd.add_ap(apdev[1]['ifname'], params)
85     hapd = hostapd.Hostapd(apdev[0]['ifname'])
86     logger.info("WPS provisioning step")
87     hapd.request("WPS_PBC")
88     dev[0].scan(freq="2412")
89     bss = dev[0].get_bss(apdev[0]['bssid'])
90     if "[WPS-PBC]" not in bss['flags']:
91         raise Exception("WPS-PBC flag missing from AP1")
92     bss = dev[0].get_bss(apdev[1]['bssid'])
93     if "[WPS-PBC]" not in bss['flags']:
94         raise Exception("WPS-PBC flag missing from AP2")
95     dev[0].dump_monitor()
96     dev[0].request("SET wps_cred_processing 2")
97     dev[0].request("WPS_PBC")
98     ev = dev[0].wait_event(["WPS-CRED-RECEIVED"], timeout=30)
99     dev[0].request("SET wps_cred_processing 0")
100     if ev is None:
101         raise Exception("WPS cred event not seen")
102     if "100e" not in ev:
103         raise Exception("WPS attributes not included in the cred event")
104     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
105     if ev is None:
106         raise Exception("Association with the AP timed out")
107
108     dev[1].scan(freq="2412")
109     bss = dev[1].get_bss(apdev[0]['bssid'])
110     if "[WPS-PBC]" in bss['flags']:
111         raise Exception("WPS-PBC flag not cleared from AP1")
112     bss = dev[1].get_bss(apdev[1]['bssid'])
113     if "[WPS-PBC]" in bss['flags']:
114         raise Exception("WPS-PBC flag bit ckeared from AP2")
115
116 def test_ap_wps_init_2ap_pin(dev, apdev):
117     """Initial two-radio AP configuration with first WPS PIN Enrollee"""
118     ssid = "test-wps"
119     params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
120     hostapd.add_ap(apdev[0]['ifname'], params)
121     hostapd.add_ap(apdev[1]['ifname'], params)
122     hapd = hostapd.Hostapd(apdev[0]['ifname'])
123     logger.info("WPS provisioning step")
124     pin = dev[0].wps_read_pin()
125     hapd.request("WPS_PIN any " + pin)
126     dev[0].scan(freq="2412")
127     bss = dev[0].get_bss(apdev[0]['bssid'])
128     if "[WPS-AUTH]" not in bss['flags']:
129         raise Exception("WPS-AUTH flag missing from AP1")
130     bss = dev[0].get_bss(apdev[1]['bssid'])
131     if "[WPS-AUTH]" not in bss['flags']:
132         raise Exception("WPS-AUTH flag missing from AP2")
133     dev[0].dump_monitor()
134     dev[0].request("WPS_PIN any " + pin)
135     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
136     if ev is None:
137         raise Exception("Association with the AP timed out")
138
139     dev[1].scan(freq="2412")
140     bss = dev[1].get_bss(apdev[0]['bssid'])
141     if "[WPS-AUTH]" in bss['flags']:
142         raise Exception("WPS-AUTH flag not cleared from AP1")
143     bss = dev[1].get_bss(apdev[1]['bssid'])
144     if "[WPS-AUTH]" in bss['flags']:
145         raise Exception("WPS-AUTH flag bit ckeared from AP2")
146
147 def test_ap_wps_init_through_wps_config(dev, apdev):
148     """Initial AP configuration using wps_config command"""
149     ssid = "test-wps-init-config"
150     hostapd.add_ap(apdev[0]['ifname'],
151                    { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
152     hapd = hostapd.Hostapd(apdev[0]['ifname'])
153     if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
154         raise Exception("WPS_CONFIG command failed")
155     ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
156     if ev is None:
157         raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
158     # It takes some time for the AP to update Beacon and Probe Response frames,
159     # so wait here before requesting the scan to be started to avoid adding
160     # extra five second wait to the test due to fetching obsolete scan results.
161     hapd.ping()
162     time.sleep(0.2)
163     dev[0].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2",
164                    pairwise="CCMP", group="CCMP")
165
166 def test_ap_wps_conf(dev, apdev):
167     """WPS PBC provisioning with configured AP"""
168     ssid = "test-wps-conf"
169     hostapd.add_ap(apdev[0]['ifname'],
170                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
171                      "wpa_passphrase": "12345678", "wpa": "2",
172                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
173     hapd = hostapd.Hostapd(apdev[0]['ifname'])
174     logger.info("WPS provisioning step")
175     hapd.request("WPS_PBC")
176     dev[0].dump_monitor()
177     dev[0].request("WPS_PBC")
178     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
179     if ev is None:
180         raise Exception("Association with the AP timed out")
181     status = dev[0].get_status()
182     if status['wpa_state'] != 'COMPLETED':
183         raise Exception("Not fully connected")
184     if status['bssid'] != apdev[0]['bssid']:
185         raise Exception("Unexpected BSSID")
186     if status['ssid'] != ssid:
187         raise Exception("Unexpected SSID")
188     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
189         raise Exception("Unexpected encryption configuration")
190     if status['key_mgmt'] != 'WPA2-PSK':
191         raise Exception("Unexpected key_mgmt")
192
193     sta = hapd.get_sta(dev[0].p2p_interface_addr())
194     if 'wpsDeviceName' not in sta or sta['wpsDeviceName'] != "Device A":
195         raise Exception("Device name not available in STA command")
196
197 def test_ap_wps_twice(dev, apdev):
198     """WPS provisioning with twice to change passphrase"""
199     ssid = "test-wps-twice"
200     params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
201                "wpa_passphrase": "12345678", "wpa": "2",
202                "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
203     hostapd.add_ap(apdev[0]['ifname'], params)
204     hapd = hostapd.Hostapd(apdev[0]['ifname'])
205     logger.info("WPS provisioning step")
206     hapd.request("WPS_PBC")
207     dev[0].dump_monitor()
208     dev[0].request("WPS_PBC")
209     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
210     if ev is None:
211         raise Exception("Association with the AP timed out")
212     dev[0].request("DISCONNECT")
213
214     logger.info("Restart AP with different passphrase and re-run WPS")
215     hapd_global = hostapd.HostapdGlobal()
216     hapd_global.remove(apdev[0]['ifname'])
217     params['wpa_passphrase'] = 'another passphrase'
218     hostapd.add_ap(apdev[0]['ifname'], params)
219     hapd = hostapd.Hostapd(apdev[0]['ifname'])
220     logger.info("WPS provisioning step")
221     hapd.request("WPS_PBC")
222     dev[0].dump_monitor()
223     dev[0].request("WPS_PBC")
224     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
225     if ev is None:
226         raise Exception("Association with the AP timed out")
227     networks = dev[0].list_networks()
228     if len(networks) > 1:
229         raise Exception("Unexpected duplicated network block present")
230
231 def test_ap_wps_incorrect_pin(dev, apdev):
232     """WPS PIN provisioning with incorrect PIN"""
233     ssid = "test-wps-incorrect-pin"
234     hostapd.add_ap(apdev[0]['ifname'],
235                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
236                      "wpa_passphrase": "12345678", "wpa": "2",
237                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
238     hapd = hostapd.Hostapd(apdev[0]['ifname'])
239
240     logger.info("WPS provisioning attempt 1")
241     hapd.request("WPS_PIN any 12345670")
242     dev[0].dump_monitor()
243     dev[0].request("WPS_PIN any 55554444")
244     ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
245     if ev is None:
246         raise Exception("WPS operation timed out")
247     if "config_error=18" not in ev:
248         raise Exception("Incorrect config_error reported")
249     if "msg=8" not in ev:
250         raise Exception("PIN error detected on incorrect message")
251     ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
252     if ev is None:
253         raise Exception("Timeout on disconnection event")
254     dev[0].request("WPS_CANCEL")
255     # if a scan was in progress, wait for it to complete before trying WPS again
256     ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
257
258     status = hapd.request("WPS_GET_STATUS")
259     if "Last WPS result: Failed" not in status:
260         raise Exception("WPS failure result not shown correctly")
261
262     logger.info("WPS provisioning attempt 2")
263     hapd.request("WPS_PIN any 12345670")
264     dev[0].dump_monitor()
265     dev[0].request("WPS_PIN any 12344444")
266     ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
267     if ev is None:
268         raise Exception("WPS operation timed out")
269     if "config_error=18" not in ev:
270         raise Exception("Incorrect config_error reported")
271     if "msg=10" not in ev:
272         raise Exception("PIN error detected on incorrect message")
273     ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
274     if ev is None:
275         raise Exception("Timeout on disconnection event")
276
277 def test_ap_wps_conf_pin(dev, apdev):
278     """WPS PIN provisioning with configured AP"""
279     ssid = "test-wps-conf-pin"
280     hostapd.add_ap(apdev[0]['ifname'],
281                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
282                      "wpa_passphrase": "12345678", "wpa": "2",
283                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
284     hapd = hostapd.Hostapd(apdev[0]['ifname'])
285     logger.info("WPS provisioning step")
286     pin = dev[0].wps_read_pin()
287     hapd.request("WPS_PIN any " + pin)
288     dev[0].dump_monitor()
289     dev[0].request("WPS_PIN any " + pin)
290     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
291     if ev is None:
292         raise Exception("Association with the AP timed out")
293     status = dev[0].get_status()
294     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
295         raise Exception("Not fully connected")
296     if status['ssid'] != ssid:
297         raise Exception("Unexpected SSID")
298     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
299         raise Exception("Unexpected encryption configuration")
300     if status['key_mgmt'] != 'WPA2-PSK':
301         raise Exception("Unexpected key_mgmt")
302
303     dev[1].scan(freq="2412")
304     bss = dev[1].get_bss(apdev[0]['bssid'])
305     if "[WPS-AUTH]" in bss['flags']:
306         raise Exception("WPS-AUTH flag not cleared")
307     logger.info("Try to connect from another station using the same PIN")
308     dev[1].request("WPS_PIN any " + pin)
309     ev = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30)
310     if ev is None:
311         raise Exception("Operation timed out")
312     if "WPS-M2D" not in ev:
313         raise Exception("Unexpected WPS operation started")
314
315 def test_ap_wps_conf_pin_2sta(dev, apdev):
316     """Two stations trying to use WPS PIN at the same time"""
317     ssid = "test-wps-conf-pin2"
318     hostapd.add_ap(apdev[0]['ifname'],
319                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
320                      "wpa_passphrase": "12345678", "wpa": "2",
321                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
322     hapd = hostapd.Hostapd(apdev[0]['ifname'])
323     logger.info("WPS provisioning step")
324     pin = "12345670"
325     pin2 = "55554444"
326     hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
327     hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
328     dev[0].dump_monitor()
329     dev[1].dump_monitor()
330     dev[0].request("WPS_PIN any " + pin)
331     dev[1].request("WPS_PIN any " + pin)
332     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
333     if ev is None:
334         raise Exception("Association with the AP timed out")
335     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
336     if ev is None:
337         raise Exception("Association with the AP timed out")
338
339 def test_ap_wps_conf_pin_timeout(dev, apdev):
340     """WPS PIN provisioning with configured AP timing out PIN"""
341     ssid = "test-wps-conf-pin"
342     hostapd.add_ap(apdev[0]['ifname'],
343                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
344                      "wpa_passphrase": "12345678", "wpa": "2",
345                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
346     hapd = hostapd.Hostapd(apdev[0]['ifname'])
347     addr = dev[0].p2p_interface_addr()
348     pin = dev[0].wps_read_pin()
349     if "FAIL" not in hapd.request("WPS_PIN "):
350         raise Exception("Unexpected success on invalid WPS_PIN")
351     hapd.request("WPS_PIN any " + pin + " 1")
352     time.sleep(1.1)
353     dev[0].request("WPS_PIN any " + pin)
354     ev = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=20)
355     if ev is None:
356         raise Exception("WPS-PIN-NEEDED event timed out")
357     ev = dev[0].wait_event(["WPS-M2D"])
358     if ev is None:
359         raise Exception("M2D not reported")
360     dev[0].request("WPS_CANCEL")
361
362     hapd.request("WPS_PIN any " + pin + " 20 " + addr)
363     dev[0].request("WPS_PIN any " + pin)
364     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
365     if ev is None:
366         raise Exception("Association with the AP timed out")
367
368 def test_ap_wps_reg_connect(dev, apdev):
369     """WPS registrar using AP PIN to connect"""
370     ssid = "test-wps-reg-ap-pin"
371     appin = "12345670"
372     hostapd.add_ap(apdev[0]['ifname'],
373                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
374                      "wpa_passphrase": "12345678", "wpa": "2",
375                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
376                      "ap_pin": appin})
377     logger.info("WPS provisioning step")
378     dev[0].dump_monitor()
379     dev[0].wps_reg(apdev[0]['bssid'], appin)
380     status = dev[0].get_status()
381     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
382         raise Exception("Not fully connected")
383     if status['ssid'] != ssid:
384         raise Exception("Unexpected SSID")
385     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
386         raise Exception("Unexpected encryption configuration")
387     if status['key_mgmt'] != 'WPA2-PSK':
388         raise Exception("Unexpected key_mgmt")
389
390 def check_wps_reg_failure(dev, ap, appin):
391     dev.request("WPS_REG " + ap['bssid'] + " " + appin)
392     ev = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
393     if ev is None:
394         raise Exception("WPS operation timed out")
395     if "WPS-SUCCESS" in ev:
396         raise Exception("WPS operation succeeded unexpectedly")
397     if "config_error=15" not in ev:
398         raise Exception("WPS setup locked state was not reported correctly")
399
400 def test_ap_wps_random_ap_pin(dev, apdev):
401     """WPS registrar using random AP PIN"""
402     ssid = "test-wps-reg-random-ap-pin"
403     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
404     hostapd.add_ap(apdev[0]['ifname'],
405                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
406                      "wpa_passphrase": "12345678", "wpa": "2",
407                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
408                      "device_name": "Wireless AP", "manufacturer": "Company",
409                      "model_name": "WAP", "model_number": "123",
410                      "serial_number": "12345", "device_type": "6-0050F204-1",
411                      "os_version": "01020300",
412                      "config_methods": "label push_button",
413                      "uuid": ap_uuid, "upnp_iface": "lo" })
414     hapd = hostapd.Hostapd(apdev[0]['ifname'])
415     appin = hapd.request("WPS_AP_PIN random")
416     if "FAIL" in appin:
417         raise Exception("Could not generate random AP PIN")
418     if appin not in hapd.request("WPS_AP_PIN get"):
419         raise Exception("Could not fetch current AP PIN")
420     logger.info("WPS provisioning step")
421     dev[0].wps_reg(apdev[0]['bssid'], appin)
422
423     hapd.request("WPS_AP_PIN disable")
424     logger.info("WPS provisioning step with AP PIN disabled")
425     check_wps_reg_failure(dev[1], apdev[0], appin)
426
427     logger.info("WPS provisioning step with AP PIN reset")
428     appin = "12345670"
429     hapd.request("WPS_AP_PIN set " + appin)
430     dev[1].wps_reg(apdev[0]['bssid'], appin)
431     dev[0].request("REMOVE_NETWORK all")
432     dev[1].request("REMOVE_NETWORK all")
433     dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
434     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
435
436     logger.info("WPS provisioning step after AP PIN timeout")
437     hapd.request("WPS_AP_PIN disable")
438     appin = hapd.request("WPS_AP_PIN random 1")
439     time.sleep(1.1)
440     if "FAIL" not in hapd.request("WPS_AP_PIN get"):
441         raise Exception("AP PIN unexpectedly still enabled")
442     check_wps_reg_failure(dev[0], apdev[0], appin)
443
444     logger.info("WPS provisioning step after AP PIN timeout(2)")
445     hapd.request("WPS_AP_PIN disable")
446     appin = "12345670"
447     hapd.request("WPS_AP_PIN set " + appin + " 1")
448     time.sleep(1.1)
449     if "FAIL" not in hapd.request("WPS_AP_PIN get"):
450         raise Exception("AP PIN unexpectedly still enabled")
451     check_wps_reg_failure(dev[1], apdev[0], appin)
452
453 def test_ap_wps_reg_config(dev, apdev):
454     """WPS registrar configuring an AP using AP PIN"""
455     ssid = "test-wps-init-ap-pin"
456     appin = "12345670"
457     hostapd.add_ap(apdev[0]['ifname'],
458                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
459                      "ap_pin": appin})
460     logger.info("WPS configuration step")
461     dev[0].dump_monitor()
462     new_ssid = "wps-new-ssid"
463     new_passphrase = "1234567890"
464     dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
465                    new_passphrase)
466     status = dev[0].get_status()
467     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
468         raise Exception("Not fully connected")
469     if status['ssid'] != new_ssid:
470         raise Exception("Unexpected SSID")
471     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
472         raise Exception("Unexpected encryption configuration")
473     if status['key_mgmt'] != 'WPA2-PSK':
474         raise Exception("Unexpected key_mgmt")
475
476     logger.info("Re-configure back to open")
477     dev[0].request("REMOVE_NETWORK all")
478     dev[0].request("BSS_FLUSH 0")
479     dev[0].request("SCAN freq=2412 only_new=1")
480     ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
481     if ev is None:
482         raise Exception("Scan timed out")
483     dev[0].dump_monitor()
484     dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-open", "OPEN", "NONE", "")
485     status = dev[0].get_status()
486     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
487         raise Exception("Not fully connected")
488     if status['ssid'] != "wps-open":
489         raise Exception("Unexpected SSID")
490     if status['key_mgmt'] != 'NONE':
491         raise Exception("Unexpected key_mgmt")
492
493 def test_ap_wps_reg_config_ext_processing(dev, apdev):
494     """WPS registrar configuring an AP with external config processing"""
495     ssid = "test-wps-init-ap-pin"
496     appin = "12345670"
497     params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
498                "wps_cred_processing": "1", "ap_pin": appin}
499     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
500     new_ssid = "wps-new-ssid"
501     new_passphrase = "1234567890"
502     dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
503                    new_passphrase, no_wait=True)
504     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
505     if ev is None:
506         raise Exception("WPS registrar operation timed out")
507     ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=15)
508     if ev is None:
509         raise Exception("WPS configuration timed out")
510     if "1026" not in ev:
511         raise Exception("AP Settings missing from event")
512     hapd.request("SET wps_cred_processing 0")
513     if "FAIL" in hapd.request("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " + new_passphrase.encode("hex")):
514         raise Exception("WPS_CONFIG command failed")
515     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
516     if ev is None:
517         raise Exception("Association with the AP timed out")
518
519 def test_ap_wps_reg_config_tkip(dev, apdev):
520     """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
521     ssid = "test-wps-init-ap"
522     appin = "12345670"
523     hostapd.add_ap(apdev[0]['ifname'],
524                    { "ssid": ssid, "eap_server": "1", "wps_state": "1",
525                      "ap_pin": appin})
526     logger.info("WPS configuration step")
527     dev[0].request("SET wps_version_number 0x10")
528     dev[0].dump_monitor()
529     new_ssid = "wps-new-ssid-with-tkip"
530     new_passphrase = "1234567890"
531     dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPAPSK", "TKIP",
532                    new_passphrase)
533     logger.info("Re-connect to verify WPA2 mixed mode")
534     dev[0].request("DISCONNECT")
535     id = 0
536     dev[0].set_network(id, "pairwise", "CCMP")
537     dev[0].set_network(id, "proto", "RSN")
538     dev[0].connect_network(id)
539     status = dev[0].get_status()
540     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
541         raise Exception("Not fully connected")
542     if status['ssid'] != new_ssid:
543         raise Exception("Unexpected SSID")
544     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
545         raise Exception("Unexpected encryption configuration")
546     if status['key_mgmt'] != 'WPA2-PSK':
547         raise Exception("Unexpected key_mgmt")
548
549 def test_ap_wps_setup_locked(dev, apdev):
550     """WPS registrar locking up AP setup on AP PIN failures"""
551     ssid = "test-wps-incorrect-ap-pin"
552     appin = "12345670"
553     hostapd.add_ap(apdev[0]['ifname'],
554                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
555                      "wpa_passphrase": "12345678", "wpa": "2",
556                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
557                      "ap_pin": appin})
558     new_ssid = "wps-new-ssid-test"
559     new_passphrase = "1234567890"
560
561     ap_setup_locked=False
562     for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
563         dev[0].dump_monitor()
564         logger.info("Try incorrect AP PIN - attempt " + pin)
565         dev[0].wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
566                        "CCMP", new_passphrase, no_wait=True)
567         ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
568         if ev is None:
569             raise Exception("Timeout on receiving WPS operation failure event")
570         if "CTRL-EVENT-CONNECTED" in ev:
571             raise Exception("Unexpected connection")
572         if "config_error=15" in ev:
573             logger.info("AP Setup Locked")
574             ap_setup_locked=True
575         elif "config_error=18" not in ev:
576             raise Exception("config_error=18 not reported")
577         ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
578         if ev is None:
579             raise Exception("Timeout on disconnection event")
580         time.sleep(0.1)
581     if not ap_setup_locked:
582         raise Exception("AP setup was not locked")
583
584     hapd = hostapd.Hostapd(apdev[0]['ifname'])
585     status = hapd.request("WPS_GET_STATUS")
586     if "Last WPS result: Failed" not in status:
587         raise Exception("WPS failure result not shown correctly")
588     if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
589         raise Exception("Peer address not shown correctly")
590
591     time.sleep(0.5)
592     dev[0].dump_monitor()
593     logger.info("WPS provisioning step")
594     pin = dev[0].wps_read_pin()
595     hapd = hostapd.Hostapd(apdev[0]['ifname'])
596     hapd.request("WPS_PIN any " + pin)
597     dev[0].request("WPS_PIN any " + pin)
598     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=30)
599     if ev is None:
600         raise Exception("WPS success was not reported")
601     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
602     if ev is None:
603         raise Exception("Association with the AP timed out")
604
605     appin = hapd.request("WPS_AP_PIN random")
606     if "FAIL" in appin:
607         raise Exception("Could not generate random AP PIN")
608     ev = hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=10)
609     if ev is None:
610         raise Exception("Failed to unlock AP PIN")
611
612 def test_ap_wps_setup_locked_timeout(dev, apdev):
613     """WPS re-enabling AP PIN after timeout"""
614     ssid = "test-wps-incorrect-ap-pin"
615     appin = "12345670"
616     hostapd.add_ap(apdev[0]['ifname'],
617                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
618                      "wpa_passphrase": "12345678", "wpa": "2",
619                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
620                      "ap_pin": appin})
621     new_ssid = "wps-new-ssid-test"
622     new_passphrase = "1234567890"
623
624     ap_setup_locked=False
625     for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
626         dev[0].dump_monitor()
627         logger.info("Try incorrect AP PIN - attempt " + pin)
628         dev[0].wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
629                        "CCMP", new_passphrase, no_wait=True)
630         ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
631         if ev is None:
632             raise Exception("Timeout on receiving WPS operation failure event")
633         if "CTRL-EVENT-CONNECTED" in ev:
634             raise Exception("Unexpected connection")
635         if "config_error=15" in ev:
636             logger.info("AP Setup Locked")
637             ap_setup_locked=True
638             break
639         elif "config_error=18" not in ev:
640             raise Exception("config_error=18 not reported")
641         ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
642         if ev is None:
643             raise Exception("Timeout on disconnection event")
644         time.sleep(0.1)
645     if not ap_setup_locked:
646         raise Exception("AP setup was not locked")
647     hapd = hostapd.Hostapd(apdev[0]['ifname'])
648     ev = hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=80)
649     if ev is None:
650         raise Exception("AP PIN did not get unlocked on 60 second timeout")
651
652 def test_ap_wps_pbc_overlap_2ap(dev, apdev):
653     """WPS PBC session overlap with two active APs"""
654     hostapd.add_ap(apdev[0]['ifname'],
655                    { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
656                      "wpa_passphrase": "12345678", "wpa": "2",
657                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
658                      "wps_independent": "1"})
659     hostapd.add_ap(apdev[1]['ifname'],
660                    { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
661                      "wpa_passphrase": "123456789", "wpa": "2",
662                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
663                      "wps_independent": "1"})
664     hapd = hostapd.Hostapd(apdev[0]['ifname'])
665     hapd.request("WPS_PBC")
666     hapd2 = hostapd.Hostapd(apdev[1]['ifname'])
667     hapd2.request("WPS_PBC")
668     logger.info("WPS provisioning step")
669     dev[0].dump_monitor()
670     dev[0].request("WPS_PBC")
671     ev = dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15)
672     if ev is None:
673         raise Exception("PBC session overlap not detected")
674
675 def test_ap_wps_pbc_overlap_2sta(dev, apdev):
676     """WPS PBC session overlap with two active STAs"""
677     ssid = "test-wps-pbc-overlap"
678     hostapd.add_ap(apdev[0]['ifname'],
679                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
680                      "wpa_passphrase": "12345678", "wpa": "2",
681                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
682     hapd = hostapd.Hostapd(apdev[0]['ifname'])
683     logger.info("WPS provisioning step")
684     hapd.request("WPS_PBC")
685     dev[0].dump_monitor()
686     dev[1].dump_monitor()
687     dev[0].request("WPS_PBC")
688     dev[1].request("WPS_PBC")
689     ev = dev[0].wait_event(["WPS-M2D"], timeout=15)
690     if ev is None:
691         raise Exception("PBC session overlap not detected (dev0)")
692     if "config_error=12" not in ev:
693         raise Exception("PBC session overlap not correctly reported (dev0)")
694     ev = dev[1].wait_event(["WPS-M2D"], timeout=15)
695     if ev is None:
696         raise Exception("PBC session overlap not detected (dev1)")
697     if "config_error=12" not in ev:
698         raise Exception("PBC session overlap not correctly reported (dev1)")
699     hapd.request("WPS_CANCEL")
700     ret = hapd.request("WPS_PBC")
701     if "FAIL" not in ret:
702         raise Exception("PBC mode allowed to be started while PBC overlap still active")
703
704 def test_ap_wps_cancel(dev, apdev):
705     """WPS AP cancelling enabled config method"""
706     ssid = "test-wps-ap-cancel"
707     hostapd.add_ap(apdev[0]['ifname'],
708                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
709                      "wpa_passphrase": "12345678", "wpa": "2",
710                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
711     bssid = apdev[0]['bssid']
712     hapd = hostapd.Hostapd(apdev[0]['ifname'])
713
714     logger.info("Verify PBC enable/cancel")
715     hapd.request("WPS_PBC")
716     dev[0].scan(freq="2412")
717     bss = dev[0].get_bss(apdev[0]['bssid'])
718     if "[WPS-PBC]" not in bss['flags']:
719         raise Exception("WPS-PBC flag missing")
720     if "FAIL" in hapd.request("WPS_CANCEL"):
721         raise Exception("WPS_CANCEL failed")
722     dev[0].scan(freq="2412")
723     bss = dev[0].get_bss(apdev[0]['bssid'])
724     if "[WPS-PBC]" in bss['flags']:
725         raise Exception("WPS-PBC flag not cleared")
726
727     logger.info("Verify PIN enable/cancel")
728     hapd.request("WPS_PIN any 12345670")
729     dev[0].scan(freq="2412")
730     bss = dev[0].get_bss(apdev[0]['bssid'])
731     if "[WPS-AUTH]" not in bss['flags']:
732         raise Exception("WPS-AUTH flag missing")
733     if "FAIL" in hapd.request("WPS_CANCEL"):
734         raise Exception("WPS_CANCEL failed")
735     dev[0].scan(freq="2412")
736     bss = dev[0].get_bss(apdev[0]['bssid'])
737     if "[WPS-AUTH]" in bss['flags']:
738         raise Exception("WPS-AUTH flag not cleared")
739
740 def test_ap_wps_er_add_enrollee(dev, apdev):
741     """WPS ER configuring AP and adding a new enrollee using PIN"""
742     ssid = "wps-er-add-enrollee"
743     ap_pin = "12345670"
744     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
745     hostapd.add_ap(apdev[0]['ifname'],
746                    { "ssid": ssid, "eap_server": "1", "wps_state": "1",
747                      "device_name": "Wireless AP", "manufacturer": "Company",
748                      "model_name": "WAP", "model_number": "123",
749                      "serial_number": "12345", "device_type": "6-0050F204-1",
750                      "os_version": "01020300",
751                      "config_methods": "label push_button",
752                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
753     logger.info("WPS configuration step")
754     new_passphrase = "1234567890"
755     dev[0].dump_monitor()
756     dev[0].wps_reg(apdev[0]['bssid'], ap_pin, ssid, "WPA2PSK", "CCMP",
757                    new_passphrase)
758     status = dev[0].get_status()
759     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
760         raise Exception("Not fully connected")
761     if status['ssid'] != ssid:
762         raise Exception("Unexpected SSID")
763     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
764         raise Exception("Unexpected encryption configuration")
765     if status['key_mgmt'] != 'WPA2-PSK':
766         raise Exception("Unexpected key_mgmt")
767
768     logger.info("Start ER")
769     dev[0].request("WPS_ER_START ifname=lo")
770     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
771     if ev is None:
772         raise Exception("AP discovery timed out")
773     if ap_uuid not in ev:
774         raise Exception("Expected AP UUID not found")
775
776     logger.info("Learn AP configuration through UPnP")
777     dev[0].dump_monitor()
778     dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
779     ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
780     if ev is None:
781         raise Exception("AP learn timed out")
782     if ap_uuid not in ev:
783         raise Exception("Expected AP UUID not in settings")
784     if "ssid=" + ssid not in ev:
785         raise Exception("Expected SSID not in settings")
786     if "key=" + new_passphrase not in ev:
787         raise Exception("Expected passphrase not in settings")
788
789     logger.info("Add Enrollee using ER")
790     pin = dev[1].wps_read_pin()
791     dev[0].dump_monitor()
792     dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
793     dev[1].dump_monitor()
794     dev[1].request("WPS_PIN any " + pin)
795     ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=30)
796     if ev is None:
797         raise Exception("Enrollee did not report success")
798     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
799     if ev is None:
800         raise Exception("Association with the AP timed out")
801     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
802     if ev is None:
803         raise Exception("WPS ER did not report success")
804     hwsim_utils.test_connectivity_sta(dev[0], dev[1])
805
806     logger.info("Add a specific Enrollee using ER")
807     pin = dev[2].wps_read_pin()
808     addr2 = dev[2].p2p_interface_addr()
809     dev[0].dump_monitor()
810     dev[2].dump_monitor()
811     dev[2].request("WPS_PIN any " + pin)
812     ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
813     if ev is None:
814         raise Exception("Enrollee not seen")
815     if addr2 not in ev:
816         raise Exception("Unexpected Enrollee MAC address")
817     dev[0].request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
818     ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
819     if ev is None:
820         raise Exception("Association with the AP timed out")
821     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
822     if ev is None:
823         raise Exception("WPS ER did not report success")
824
825     logger.info("Verify registrar selection behavior")
826     dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
827     dev[1].request("DISCONNECT")
828     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
829     dev[1].scan(freq="2412")
830     bss = dev[1].get_bss(apdev[0]['bssid'])
831     if "[WPS-AUTH]" not in bss['flags']:
832         raise Exception("WPS-AUTH flag missing")
833
834     logger.info("Stop ER")
835     dev[0].dump_monitor()
836     dev[0].request("WPS_ER_STOP")
837     ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"])
838     if ev is None:
839         raise Exception("WPS ER unsubscription timed out")
840     # It takes some time for the UPnP UNSUBSCRIBE command to go through, so wait
841     # a bit before verifying that the scan results have change.
842     time.sleep(0.2)
843
844     dev[1].scan(freq="2412")
845     bss = dev[1].get_bss(apdev[0]['bssid'])
846     if "[WPS-AUTH]" in bss['flags']:
847         raise Exception("WPS-AUTH flag not removed")
848
849 def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
850     """WPS ER connected to AP and adding a new enrollee using PBC"""
851     ssid = "wps-er-add-enrollee-pbc"
852     ap_pin = "12345670"
853     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
854     hostapd.add_ap(apdev[0]['ifname'],
855                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
856                      "wpa_passphrase": "12345678", "wpa": "2",
857                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
858                      "device_name": "Wireless AP", "manufacturer": "Company",
859                      "model_name": "WAP", "model_number": "123",
860                      "serial_number": "12345", "device_type": "6-0050F204-1",
861                      "os_version": "01020300",
862                      "config_methods": "label push_button",
863                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
864     logger.info("Learn AP configuration")
865     dev[0].dump_monitor()
866     dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
867     status = dev[0].get_status()
868     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
869         raise Exception("Not fully connected")
870
871     logger.info("Start ER")
872     dev[0].request("WPS_ER_START ifname=lo")
873     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
874     if ev is None:
875         raise Exception("AP discovery timed out")
876     if ap_uuid not in ev:
877         raise Exception("Expected AP UUID not found")
878
879     logger.info("Use learned network configuration on ER")
880     dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")
881
882     logger.info("Add Enrollee using ER and PBC")
883     dev[0].dump_monitor()
884     enrollee = dev[1].p2p_interface_addr()
885     dev[1].dump_monitor()
886     dev[1].request("WPS_PBC")
887
888     for i in range(0, 2):
889         ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
890         if ev is None:
891             raise Exception("Enrollee discovery timed out")
892         if enrollee in ev:
893             break
894         if i == 1:
895             raise Exception("Expected Enrollee not found")
896     dev[0].request("WPS_ER_PBC " + enrollee)
897
898     ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=15)
899     if ev is None:
900         raise Exception("Enrollee did not report success")
901     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
902     if ev is None:
903         raise Exception("Association with the AP timed out")
904     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
905     if ev is None:
906         raise Exception("WPS ER did not report success")
907     hwsim_utils.test_connectivity_sta(dev[0], dev[1])
908
909     # verify BSSID selection of the AP instead of UUID
910     if "FAIL" in dev[0].request("WPS_ER_SET_CONFIG " + apdev[0]['bssid'] + " 0"):
911         raise Exception("Could not select AP based on BSSID")
912
913 def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
914     """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
915     ssid = "wps-er-add-enrollee-pbc"
916     ap_pin = "12345670"
917     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
918     hostapd.add_ap(apdev[0]['ifname'],
919                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
920                      "wpa_passphrase": "12345678", "wpa": "2",
921                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
922                      "device_name": "Wireless AP", "manufacturer": "Company",
923                      "model_name": "WAP", "model_number": "123",
924                      "serial_number": "12345", "device_type": "6-0050F204-1",
925                      "os_version": "01020300",
926                      "config_methods": "label push_button",
927                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
928     logger.info("Learn AP configuration")
929     dev[0].request("SET wps_version_number 0x10")
930     dev[0].dump_monitor()
931     dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
932     status = dev[0].get_status()
933     if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
934         raise Exception("Not fully connected")
935
936     logger.info("Start ER")
937     dev[0].request("WPS_ER_START ifname=lo")
938     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
939     if ev is None:
940         raise Exception("AP discovery timed out")
941     if ap_uuid not in ev:
942         raise Exception("Expected AP UUID not found")
943
944     logger.info("Use learned network configuration on ER")
945     dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")
946
947     logger.info("Add Enrollee using ER and PIN")
948     enrollee = dev[1].p2p_interface_addr()
949     pin = dev[1].wps_read_pin()
950     dev[0].dump_monitor()
951     dev[0].request("WPS_ER_PIN any " + pin + " " + enrollee)
952     dev[1].dump_monitor()
953     dev[1].request("WPS_PIN any " + pin)
954     ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
955     if ev is None:
956         raise Exception("Association with the AP timed out")
957     ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
958     if ev is None:
959         raise Exception("WPS ER did not report success")
960
961 def test_ap_wps_er_config_ap(dev, apdev):
962     """WPS ER configuring AP over UPnP"""
963     ssid = "wps-er-ap-config"
964     ap_pin = "12345670"
965     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
966     hostapd.add_ap(apdev[0]['ifname'],
967                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
968                      "wpa_passphrase": "12345678", "wpa": "2",
969                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
970                      "device_name": "Wireless AP", "manufacturer": "Company",
971                      "model_name": "WAP", "model_number": "123",
972                      "serial_number": "12345", "device_type": "6-0050F204-1",
973                      "os_version": "01020300",
974                      "config_methods": "label push_button",
975                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
976
977     logger.info("Connect ER to the AP")
978     dev[0].connect(ssid, psk="12345678", scan_freq="2412")
979
980     logger.info("WPS configuration step")
981     dev[0].request("WPS_ER_START ifname=lo")
982     ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
983     if ev is None:
984         raise Exception("AP discovery timed out")
985     if ap_uuid not in ev:
986         raise Exception("Expected AP UUID not found")
987     new_passphrase = "1234567890"
988     dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
989                    ssid.encode("hex") + " WPA2PSK CCMP " +
990                    new_passphrase.encode("hex"))
991     ev = dev[0].wait_event(["WPS-SUCCESS"])
992     if ev is None:
993         raise Exception("WPS ER configuration operation timed out")
994     dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
995     dev[0].connect(ssid, psk="1234567890", scan_freq="2412")
996
997 def test_ap_wps_fragmentation(dev, apdev):
998     """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
999     ssid = "test-wps-fragmentation"
1000     hostapd.add_ap(apdev[0]['ifname'],
1001                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1002                      "wpa_passphrase": "12345678", "wpa": "3",
1003                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1004                      "wpa_pairwise": "TKIP",
1005                      "fragment_size": "50" })
1006     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1007     logger.info("WPS provisioning step")
1008     hapd.request("WPS_PBC")
1009     dev[0].dump_monitor()
1010     dev[0].request("SET wps_fragment_size 50")
1011     dev[0].request("WPS_PBC")
1012     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1013     if ev is None:
1014         raise Exception("Association with the AP timed out")
1015     status = dev[0].get_status()
1016     if status['wpa_state'] != 'COMPLETED':
1017         raise Exception("Not fully connected")
1018     if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
1019         raise Exception("Unexpected encryption configuration")
1020     if status['key_mgmt'] != 'WPA2-PSK':
1021         raise Exception("Unexpected key_mgmt")
1022
1023 def test_ap_wps_new_version_sta(dev, apdev):
1024     """WPS compatibility with new version number on the station"""
1025     ssid = "test-wps-ver"
1026     hostapd.add_ap(apdev[0]['ifname'],
1027                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1028                      "wpa_passphrase": "12345678", "wpa": "2",
1029                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1030     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1031     logger.info("WPS provisioning step")
1032     hapd.request("WPS_PBC")
1033     dev[0].dump_monitor()
1034     dev[0].request("SET wps_version_number 0x43")
1035     dev[0].request("SET wps_vendor_ext_m1 000137100100020001")
1036     dev[0].request("WPS_PBC")
1037     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1038     if ev is None:
1039         raise Exception("Association with the AP timed out")
1040
1041 def test_ap_wps_new_version_ap(dev, apdev):
1042     """WPS compatibility with new version number on the AP"""
1043     ssid = "test-wps-ver"
1044     hostapd.add_ap(apdev[0]['ifname'],
1045                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1046                      "wpa_passphrase": "12345678", "wpa": "2",
1047                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1048     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1049     logger.info("WPS provisioning step")
1050     if "FAIL" in hapd.request("SET wps_version_number 0x43"):
1051         raise Exception("Failed to enable test functionality")
1052     hapd.request("WPS_PBC")
1053     dev[0].dump_monitor()
1054     dev[0].request("WPS_PBC")
1055     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1056     hapd.request("SET wps_version_number 0x20")
1057     if ev is None:
1058         raise Exception("Association with the AP timed out")
1059
1060 def test_ap_wps_check_pin(dev, apdev):
1061     """Verify PIN checking through control interface"""
1062     hostapd.add_ap(apdev[0]['ifname'],
1063                    { "ssid": "wps", "eap_server": "1", "wps_state": "2",
1064                      "wpa_passphrase": "12345678", "wpa": "2",
1065                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1066     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1067     for t in [ ("12345670", "12345670"),
1068                ("12345678", "FAIL-CHECKSUM"),
1069                ("1234-5670", "12345670"),
1070                ("1234 5670", "12345670"),
1071                ("1-2.3:4 5670", "12345670") ]:
1072         res = hapd.request("WPS_CHECK_PIN " + t[0]).rstrip('\n')
1073         res2 = dev[0].request("WPS_CHECK_PIN " + t[0]).rstrip('\n')
1074         if res != res2:
1075             raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
1076         if res != t[1]:
1077             raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(res, t[1]))
1078
1079     if "FAIL" not in hapd.request("WPS_CHECK_PIN 12345"):
1080         raise Exception("Unexpected WPS_CHECK_PIN success")
1081     if "FAIL" not in hapd.request("WPS_CHECK_PIN 123456789"):
1082         raise Exception("Unexpected WPS_CHECK_PIN success")
1083
1084 def test_ap_wps_wep_config(dev, apdev):
1085     """WPS 2.0 AP rejecting WEP configuration"""
1086     ssid = "test-wps-config"
1087     appin = "12345670"
1088     hostapd.add_ap(apdev[0]['ifname'],
1089                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1090                      "ap_pin": appin})
1091     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1092     dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
1093                    "hello", no_wait=True)
1094     ev = hapd.wait_event(["WPS-FAIL"], timeout=15)
1095     if ev is None:
1096         raise Exception("WPS-FAIL timed out")
1097     if "reason=2" not in ev:
1098         raise Exception("Unexpected reason code in WPS-FAIL")
1099     status = hapd.request("WPS_GET_STATUS")
1100     if "Last WPS result: Failed" not in status:
1101         raise Exception("WPS failure result not shown correctly")
1102     if "Failure Reason: WEP Prohibited" not in status:
1103         raise Exception("Failure reason not reported correctly")
1104     if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
1105         raise Exception("Peer address not shown correctly")
1106
1107 def test_ap_wps_wep_enroll(dev, apdev):
1108     """WPS 2.0 STA rejecting WEP configuration"""
1109     ssid = "test-wps-wep"
1110     hostapd.add_ap(apdev[0]['ifname'],
1111                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1112                      "skip_cred_build": "1", "extra_cred": "wps-wep-cred" })
1113     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1114     hapd.request("WPS_PBC")
1115     dev[0].request("WPS_PBC")
1116     ev = dev[0].wait_event(["WPS-FAIL"], timeout=15)
1117     if ev is None:
1118         raise Exception("WPS-FAIL event timed out")
1119     if "msg=12" not in ev or "reason=2 (WEP Prohibited)" not in ev:
1120         raise Exception("Unexpected WPS-FAIL event: " + ev)
1121
1122 def test_ap_wps_ie_fragmentation(dev, apdev):
1123     """WPS AP using fragmented WPS IE"""
1124     ssid = "test-wps-ie-fragmentation"
1125     params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1126                "wpa_passphrase": "12345678", "wpa": "2",
1127                "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1128                "device_name": "1234567890abcdef1234567890abcdef",
1129                "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
1130                "model_name": "1234567890abcdef1234567890abcdef",
1131                "model_number": "1234567890abcdef1234567890abcdef",
1132                "serial_number": "1234567890abcdef1234567890abcdef" }
1133     hostapd.add_ap(apdev[0]['ifname'], params)
1134     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1135     hapd.request("WPS_PBC")
1136     dev[0].request("WPS_PBC")
1137     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1138     if ev is None:
1139         raise Exception("Association with the AP timed out")
1140     bss = dev[0].get_bss(apdev[0]['bssid'])
1141     if "wps_device_name" not in bss or bss['wps_device_name'] != "1234567890abcdef1234567890abcdef":
1142         logger.info(bss)
1143         raise Exception("Device Name not received correctly")
1144     if len(re.findall("dd..0050f204", bss['ie'])) != 2:
1145         raise Exception("Unexpected number of WPS IEs")
1146
1147 def get_psk(pskfile):
1148     psks = {}
1149     with open(pskfile, "r") as f:
1150         lines = f.read().splitlines()
1151         for l in lines:
1152             if l == "# WPA PSKs":
1153                 continue
1154             (addr,psk) = l.split(' ')
1155             psks[addr] = psk
1156     return psks
1157
1158 def test_ap_wps_per_station_psk(dev, apdev):
1159     """WPS PBC provisioning with per-station PSK"""
1160     addr0 = dev[0].p2p_dev_addr()
1161     addr1 = dev[1].p2p_dev_addr()
1162     addr2 = dev[2].p2p_dev_addr()
1163     ssid = "wps"
1164     appin = "12345670"
1165     pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
1166     try:
1167         os.remove(pskfile)
1168     except:
1169         pass
1170
1171     try:
1172         with open(pskfile, "w") as f:
1173             f.write("# WPA PSKs\n")
1174
1175         params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1176                    "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1177                    "rsn_pairwise": "CCMP", "ap_pin": appin,
1178                    "wpa_psk_file": pskfile }
1179         hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1180
1181         logger.info("First enrollee")
1182         hapd.request("WPS_PBC")
1183         dev[0].request("WPS_PBC")
1184         ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
1185         if ev is None:
1186             raise Exception("Association with the AP timed out (1)")
1187
1188         logger.info("Second enrollee")
1189         hapd.request("WPS_PBC")
1190         dev[1].request("WPS_PBC")
1191         ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
1192         if ev is None:
1193             raise Exception("Association with the AP timed out (2)")
1194
1195         logger.info("External registrar")
1196         dev[2].wps_reg(apdev[0]['bssid'], appin)
1197
1198         logger.info("Verifying PSK results")
1199         psks = get_psk(pskfile)
1200         if addr0 not in psks:
1201             raise Exception("No PSK recorded for sta0")
1202         if addr1 not in psks:
1203             raise Exception("No PSK recorded for sta1")
1204         if addr2 not in psks:
1205             raise Exception("No PSK recorded for sta2")
1206         if psks[addr0] == psks[addr1]:
1207             raise Exception("Same PSK recorded for sta0 and sta1")
1208         if psks[addr0] == psks[addr2]:
1209             raise Exception("Same PSK recorded for sta0 and sta2")
1210         if psks[addr1] == psks[addr2]:
1211             raise Exception("Same PSK recorded for sta1 and sta2")
1212
1213         dev[0].request("REMOVE_NETWORK all")
1214         logger.info("Second external registrar")
1215         dev[0].wps_reg(apdev[0]['bssid'], appin)
1216         psks2 = get_psk(pskfile)
1217         if addr0 not in psks2:
1218             raise Exception("No PSK recorded for sta0(reg)")
1219         if psks[addr0] == psks2[addr0]:
1220             raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
1221     finally:
1222         os.remove(pskfile)
1223
1224 def test_ap_wps_per_station_psk_failure(dev, apdev):
1225     """WPS PBC provisioning with per-station PSK (file not writable)"""
1226     addr0 = dev[0].p2p_dev_addr()
1227     addr1 = dev[1].p2p_dev_addr()
1228     addr2 = dev[2].p2p_dev_addr()
1229     ssid = "wps"
1230     appin = "12345670"
1231     pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
1232     try:
1233         os.remove(pskfile)
1234     except:
1235         pass
1236
1237     try:
1238         with open(pskfile, "w") as f:
1239             f.write("# WPA PSKs\n")
1240
1241         params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1242                    "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1243                    "rsn_pairwise": "CCMP", "ap_pin": appin,
1244                    "wpa_psk_file": pskfile }
1245         hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1246         if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
1247             raise Exception("Failed to set wpa_psk_file")
1248
1249         logger.info("First enrollee")
1250         hapd.request("WPS_PBC")
1251         dev[0].request("WPS_PBC")
1252         ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
1253         if ev is None:
1254             raise Exception("Association with the AP timed out (1)")
1255
1256         logger.info("Second enrollee")
1257         hapd.request("WPS_PBC")
1258         dev[1].request("WPS_PBC")
1259         ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
1260         if ev is None:
1261             raise Exception("Association with the AP timed out (2)")
1262
1263         logger.info("External registrar")
1264         dev[2].wps_reg(apdev[0]['bssid'], appin)
1265
1266         logger.info("Verifying PSK results")
1267         psks = get_psk(pskfile)
1268         if len(psks) > 0:
1269             raise Exception("PSK recorded unexpectedly")
1270     finally:
1271         os.remove(pskfile)
1272
1273 def test_ap_wps_pin_request_file(dev, apdev):
1274     """WPS PIN provisioning with configured AP"""
1275     ssid = "wps"
1276     pinfile = "/tmp/ap_wps_pin_request_file.log"
1277     if os.path.exists(pinfile):
1278         subprocess.call(['sudo', 'rm', pinfile])
1279     hostapd.add_ap(apdev[0]['ifname'],
1280                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1281                      "wps_pin_requests": pinfile,
1282                      "wpa_passphrase": "12345678", "wpa": "2",
1283                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
1284     hapd = hostapd.Hostapd(apdev[0]['ifname'])
1285     uuid = dev[0].get_status_field("uuid")
1286     pin = dev[0].wps_read_pin()
1287     try:
1288         dev[0].request("WPS_PIN any " + pin)
1289         ev = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=15)
1290         if ev is None:
1291             raise Exception("PIN needed event not shown")
1292         if uuid not in ev:
1293             raise Exception("UUID mismatch")
1294         dev[0].request("WPS_CANCEL")
1295         success = False
1296         with open(pinfile, "r") as f:
1297             lines = f.readlines()
1298             for l in lines:
1299                 if uuid in l:
1300                     success = True
1301                     break
1302         if not success:
1303             raise Exception("PIN request entry not in the log file")
1304     finally:
1305         subprocess.call(['sudo', 'rm', pinfile])
1306
1307 def test_ap_wps_auto_setup_with_config_file(dev, apdev):
1308     """WPS auto-setup with configuration file"""
1309     conffile = "/tmp/ap_wps_auto_setup_with_config_file.conf"
1310     ifname = apdev[0]['ifname']
1311     try:
1312         with open(conffile, "w") as f:
1313             f.write("driver=nl80211\n")
1314             f.write("hw_mode=g\n")
1315             f.write("channel=1\n")
1316             f.write("ieee80211n=1\n")
1317             f.write("interface=%s\n" % ifname)
1318             f.write("ctrl_interface=/var/run/hostapd\n")
1319             f.write("ssid=wps\n")
1320             f.write("eap_server=1\n")
1321             f.write("wps_state=1\n")
1322         hostapd.add_bss('phy3', ifname, conffile)
1323         hapd = hostapd.Hostapd(ifname)
1324         hapd.request("WPS_PBC")
1325         dev[0].request("WPS_PBC")
1326         ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1327         if ev is None:
1328             raise Exception("Association with the AP timed out")
1329         with open(conffile, "r") as f:
1330             lines = f.read().splitlines()
1331             vals = dict()
1332             for l in lines:
1333                 try:
1334                     [name,value] = l.split('=', 1)
1335                     vals[name] = value
1336                 except ValueError, e:
1337                     if "# WPS configuration" in l:
1338                         pass
1339                     else:
1340                         raise Exception("Unexpected configuration line: " + l)
1341         if vals['ieee80211n'] != '1' or vals['wps_state'] != '2' or "WPA-PSK" not in vals['wpa_key_mgmt']:
1342             raise Exception("Incorrect configuration: " + str(vals))
1343     finally:
1344         subprocess.call(['sudo', 'rm', conffile])
1345
1346 def add_ssdp_ap(ifname, ap_uuid):
1347     ssid = "wps-ssdp"
1348     ap_pin = "12345670"
1349     hostapd.add_ap(ifname,
1350                    { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1351                      "wpa_passphrase": "12345678", "wpa": "2",
1352                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1353                      "device_name": "Wireless AP", "manufacturer": "Company",
1354                      "model_name": "WAP", "model_number": "123",
1355                      "serial_number": "12345", "device_type": "6-0050F204-1",
1356                      "os_version": "01020300",
1357                      "config_methods": "label push_button",
1358                      "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo",
1359                      "friendly_name": "WPS Access Point",
1360                      "manufacturer_url": "http://www.example.com/",
1361                      "model_description": "Wireless Access Point",
1362                      "model_url": "http://www.example.com/model/",
1363                      "upc": "123456789012" })
1364
1365 def ssdp_send(msg, no_recv=False):
1366     socket.setdefaulttimeout(1)
1367     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1368     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1369     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1370     sock.bind(("127.0.0.1", 0))
1371     sock.sendto(msg, ("239.255.255.250", 1900))
1372     if no_recv:
1373         return None
1374     return sock.recv(1000)
1375
1376 def ssdp_send_msearch(st):
1377     msg = '\r\n'.join([
1378             'M-SEARCH * HTTP/1.1',
1379             'HOST: 239.255.255.250:1900',
1380             'MX: 1',
1381             'MAN: "ssdp:discover"',
1382             'ST: ' + st,
1383             '', ''])
1384     return ssdp_send(msg)
1385
1386 def test_ap_wps_ssdp_msearch(dev, apdev):
1387     """WPS AP and SSDP M-SEARCH messages"""
1388     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1389     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1390
1391     msg = '\r\n'.join([
1392             'M-SEARCH * HTTP/1.1',
1393             'Host: 239.255.255.250:1900',
1394             'Mx: 1',
1395             'Man: "ssdp:discover"',
1396             'St: urn:schemas-wifialliance-org:device:WFADevice:1',
1397             '', ''])
1398     ssdp_send(msg)
1399
1400     msg = '\r\n'.join([
1401             'M-SEARCH * HTTP/1.1',
1402             'host:\t239.255.255.250:1900\t\t\t\t \t\t',
1403             'mx: \t1\t\t   ',
1404             'man: \t \t "ssdp:discover"   ',
1405             'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t',
1406             '', ''])
1407     ssdp_send(msg)
1408
1409     ssdp_send_msearch("ssdp:all")
1410     ssdp_send_msearch("upnp:rootdevice")
1411     ssdp_send_msearch("uuid:" + ap_uuid)
1412     ssdp_send_msearch("urn:schemas-wifialliance-org:service:WFAWLANConfig:1")
1413     ssdp_send_msearch("urn:schemas-wifialliance-org:device:WFADevice:1");
1414
1415     msg = '\r\n'.join([
1416             'M-SEARCH * HTTP/1.1',
1417             'HOST:\t239.255.255.250:1900',
1418             'MAN: "ssdp:discover"',
1419             'MX: 130',
1420             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1421             '', ''])
1422     ssdp_send(msg, no_recv=True)
1423
1424 def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
1425     """WPS AP and invalid SSDP M-SEARCH messages"""
1426     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1427     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1428
1429     socket.setdefaulttimeout(1)
1430     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1431     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1432     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1433     sock.bind(("127.0.0.1", 0))
1434
1435     logger.debug("Missing MX")
1436     msg = '\r\n'.join([
1437             'M-SEARCH * HTTP/1.1',
1438             'HOST: 239.255.255.250:1900',
1439             'MAN: "ssdp:discover"',
1440             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1441             '', ''])
1442     sock.sendto(msg, ("239.255.255.250", 1900))
1443
1444     logger.debug("Negative MX")
1445     msg = '\r\n'.join([
1446             'M-SEARCH * HTTP/1.1',
1447             'HOST: 239.255.255.250:1900',
1448             'MX: -1',
1449             'MAN: "ssdp:discover"',
1450             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1451             '', ''])
1452     sock.sendto(msg, ("239.255.255.250", 1900))
1453
1454     logger.debug("Invalid MX")
1455     msg = '\r\n'.join([
1456             'M-SEARCH * HTTP/1.1',
1457             'HOST: 239.255.255.250:1900',
1458             'MX; 1',
1459             'MAN: "ssdp:discover"',
1460             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1461             '', ''])
1462     sock.sendto(msg, ("239.255.255.250", 1900))
1463
1464     logger.debug("Missing MAN")
1465     msg = '\r\n'.join([
1466             'M-SEARCH * HTTP/1.1',
1467             'HOST: 239.255.255.250:1900',
1468             'MX: 1',
1469             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1470             '', ''])
1471     sock.sendto(msg, ("239.255.255.250", 1900))
1472
1473     logger.debug("Invalid MAN")
1474     msg = '\r\n'.join([
1475             'M-SEARCH * HTTP/1.1',
1476             'HOST: 239.255.255.250:1900',
1477             'MX: 1',
1478             'MAN: foo',
1479             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1480             '', ''])
1481     sock.sendto(msg, ("239.255.255.250", 1900))
1482     msg = '\r\n'.join([
1483             'M-SEARCH * HTTP/1.1',
1484             'HOST: 239.255.255.250:1900',
1485             'MX: 1',
1486             'MAN; "ssdp:discover"',
1487             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1488             '', ''])
1489     sock.sendto(msg, ("239.255.255.250", 1900))
1490
1491     logger.debug("Missing HOST")
1492     msg = '\r\n'.join([
1493             'M-SEARCH * HTTP/1.1',
1494             'MAN: "ssdp:discover"',
1495             'MX: 1',
1496             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1497             '', ''])
1498     sock.sendto(msg, ("239.255.255.250", 1900))
1499
1500     logger.debug("Missing ST")
1501     msg = '\r\n'.join([
1502             'M-SEARCH * HTTP/1.1',
1503             'HOST: 239.255.255.250:1900',
1504             'MAN: "ssdp:discover"',
1505             'MX: 1',
1506             '', ''])
1507     sock.sendto(msg, ("239.255.255.250", 1900))
1508
1509     logger.debug("Mismatching ST")
1510     msg = '\r\n'.join([
1511             'M-SEARCH * HTTP/1.1',
1512             'HOST: 239.255.255.250:1900',
1513             'MAN: "ssdp:discover"',
1514             'MX: 1',
1515             'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d',
1516             '', ''])
1517     sock.sendto(msg, ("239.255.255.250", 1900))
1518     msg = '\r\n'.join([
1519             'M-SEARCH * HTTP/1.1',
1520             'HOST: 239.255.255.250:1900',
1521             'MAN: "ssdp:discover"',
1522             'MX: 1',
1523             'ST: foo:bar',
1524             '', ''])
1525     sock.sendto(msg, ("239.255.255.250", 1900))
1526     msg = '\r\n'.join([
1527             'M-SEARCH * HTTP/1.1',
1528             'HOST: 239.255.255.250:1900',
1529             'MAN: "ssdp:discover"',
1530             'MX: 1',
1531             'ST: foobar',
1532             '', ''])
1533     sock.sendto(msg, ("239.255.255.250", 1900))
1534
1535     logger.debug("Invalid ST")
1536     msg = '\r\n'.join([
1537             'M-SEARCH * HTTP/1.1',
1538             'HOST: 239.255.255.250:1900',
1539             'MAN: "ssdp:discover"',
1540             'MX: 1',
1541             'ST; urn:schemas-wifialliance-org:device:WFADevice:1',
1542             '', ''])
1543     sock.sendto(msg, ("239.255.255.250", 1900))
1544
1545     logger.debug("Invalid M-SEARCH")
1546     msg = '\r\n'.join([
1547             'M+SEARCH * HTTP/1.1',
1548             'HOST: 239.255.255.250:1900',
1549             'MAN: "ssdp:discover"',
1550             'MX: 1',
1551             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1552             '', ''])
1553     sock.sendto(msg, ("239.255.255.250", 1900))
1554     msg = '\r\n'.join([
1555             'M-SEARCH-* HTTP/1.1',
1556             'HOST: 239.255.255.250:1900',
1557             'MAN: "ssdp:discover"',
1558             'MX: 1',
1559             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1560             '', ''])
1561     sock.sendto(msg, ("239.255.255.250", 1900))
1562
1563     logger.debug("Invalid message format")
1564     sock.sendto("NOTIFY * HTTP/1.1", ("239.255.255.250", 1900))
1565     msg = '\r'.join([
1566             'M-SEARCH * HTTP/1.1',
1567             'HOST: 239.255.255.250:1900',
1568             'MAN: "ssdp:discover"',
1569             'MX: 1',
1570             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1571             '', ''])
1572     sock.sendto(msg, ("239.255.255.250", 1900))
1573
1574     try:
1575         r = sock.recv(1000)
1576         raise Exception("Unexpected M-SEARCH response: " + r)
1577     except socket.timeout:
1578         pass
1579
1580     logger.debug("Valid M-SEARCH")
1581     msg = '\r\n'.join([
1582             'M-SEARCH * HTTP/1.1',
1583             'HOST: 239.255.255.250:1900',
1584             'MAN: "ssdp:discover"',
1585             'MX: 1',
1586             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1587             '', ''])
1588     sock.sendto(msg, ("239.255.255.250", 1900))
1589
1590     try:
1591         r = sock.recv(1000)
1592         pass
1593     except socket.timeout:
1594         raise Exception("No SSDP response")
1595
1596 def test_ap_wps_ssdp_burst(dev, apdev):
1597     """WPS AP and SSDP burst"""
1598     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1599     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1600
1601     msg = '\r\n'.join([
1602             'M-SEARCH * HTTP/1.1',
1603             'HOST: 239.255.255.250:1900',
1604             'MAN: "ssdp:discover"',
1605             'MX: 1',
1606             'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1607             '', ''])
1608     socket.setdefaulttimeout(1)
1609     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1610     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1611     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1612     sock.bind(("127.0.0.1", 0))
1613     for i in range(0, 25):
1614         sock.sendto(msg, ("239.255.255.250", 1900))
1615     resp = 0
1616     while True:
1617         try:
1618             r = sock.recv(1000)
1619             if not r.startswith("HTTP/1.1 200 OK\r\n"):
1620                 raise Exception("Unexpected message: " + r)
1621             resp += 1
1622         except socket.timeout:
1623             break
1624     if resp < 20:
1625         raise Exception("Too few SSDP responses")
1626
1627     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1628     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1629     sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1630     sock.bind(("127.0.0.1", 0))
1631     for i in range(0, 25):
1632         sock.sendto(msg, ("239.255.255.250", 1900))
1633     while True:
1634         try:
1635             r = sock.recv(1000)
1636             if ap_uuid in r:
1637                 break
1638         except socket.timeout:
1639             raise Exception("No SSDP response")
1640
1641 def ssdp_get_location(uuid):
1642     res = ssdp_send_msearch("uuid:" + uuid)
1643     location = None
1644     for l in res.splitlines():
1645         if l.lower().startswith("location:"):
1646             location = l.split(':', 1)[1].strip()
1647             break
1648     if location is None:
1649         raise Exception("No UPnP location found")
1650     return location
1651
1652 def upnp_get_urls(location):
1653     conn = urllib.urlopen(location)
1654     tree = ET.parse(conn)
1655     root = tree.getroot()
1656     urn = '{urn:schemas-upnp-org:device-1-0}'
1657     service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
1658     res = {}
1659     res['scpd_url'] = urlparse.urljoin(location, service.find(urn + 'SCPDURL').text)
1660     res['control_url'] = urlparse.urljoin(location, service.find(urn + 'controlURL').text)
1661     res['event_sub_url'] = urlparse.urljoin(location, service.find(urn + 'eventSubURL').text)
1662     return res
1663
1664 def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
1665     soapns = 'http://schemas.xmlsoap.org/soap/envelope/'
1666     wpsns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
1667     ET.register_namespace('soapenv', soapns)
1668     ET.register_namespace('wfa', wpsns)
1669     attrib = {}
1670     attrib['{%s}encodingStyle' % soapns] = 'http://schemas.xmlsoap.org/soap/encoding/'
1671     root = ET.Element("{%s}Envelope" % soapns, attrib=attrib)
1672     body = ET.SubElement(root, "{%s}Body" % soapns)
1673     act = ET.SubElement(body, "{%s}%s" % (wpsns, action))
1674     tree = ET.ElementTree(root)
1675     soap = StringIO.StringIO()
1676     tree.write(soap, xml_declaration=True, encoding='utf-8')
1677
1678     headers = { "Content-type": 'text/xml; charset="utf-8"' }
1679     if include_soap_action:
1680         headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
1681     elif soap_action_override:
1682         headers["SOAPAction"] = soap_action_override
1683     conn.request("POST", path, soap.getvalue(), headers)
1684     return conn.getresponse()
1685
1686 def test_ap_wps_upnp(dev, apdev):
1687     """WPS AP and UPnP operations"""
1688     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1689     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1690
1691     location = ssdp_get_location(ap_uuid)
1692     urls = upnp_get_urls(location)
1693
1694     conn = urllib.urlopen(urls['scpd_url'])
1695     scpd = conn.read()
1696
1697     conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
1698     if conn.getcode() != 404:
1699         raise Exception("Unexpected HTTP response to GET unknown URL")
1700
1701     url = urlparse.urlparse(location)
1702     conn = httplib.HTTPConnection(url.netloc)
1703     #conn.set_debuglevel(1)
1704     headers = { "Content-type": 'text/xml; charset="utf-8"',
1705                 "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
1706     conn.request("POST", "hello", "\r\n\r\n", headers)
1707     resp = conn.getresponse()
1708     if resp.status != 404:
1709         raise Exception("Unexpected HTTP response: %s" % resp.status)
1710
1711     conn.request("UNKNOWN", "hello", "\r\n\r\n", headers)
1712     resp = conn.getresponse()
1713     if resp.status != 501:
1714         raise Exception("Unexpected HTTP response: %s" % resp.status)
1715
1716     headers = { "Content-type": 'text/xml; charset="utf-8"',
1717                 "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
1718     ctrlurl = urlparse.urlparse(urls['control_url'])
1719     conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
1720     resp = conn.getresponse()
1721     if resp.status != 401:
1722         raise Exception("Unexpected HTTP response: %s" % resp.status)
1723
1724     logger.debug("GetDeviceInfo without SOAPAction header")
1725     resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
1726                             include_soap_action=False)
1727     if resp.status != 401:
1728         raise Exception("Unexpected HTTP response: %s" % resp.status)
1729
1730     logger.debug("GetDeviceInfo with invalid SOAPAction header")
1731     for act in [ "foo",
1732                  "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
1733                  '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
1734                  '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']:
1735         resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
1736                                 include_soap_action=False,
1737                                 soap_action_override=act)
1738         if resp.status != 401:
1739             raise Exception("Unexpected HTTP response: %s" % resp.status)
1740
1741     resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
1742     if resp.status != 200:
1743         raise Exception("Unexpected HTTP response: %s" % resp.status)
1744     dev = resp.read()
1745     if "NewDeviceInfo" not in dev:
1746         raise Exception("Unexpected GetDeviceInfo response")
1747
1748     logger.debug("PutMessage without required parameters")
1749     resp = upnp_soap_action(conn, ctrlurl.path, "PutMessage")
1750     if resp.status != 600:
1751         raise Exception("Unexpected HTTP response: %s" % resp.status)
1752
1753     logger.debug("PutWLANResponse without required parameters")
1754     resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse")
1755     if resp.status != 600:
1756         raise Exception("Unexpected HTTP response: %s" % resp.status)
1757
1758     logger.debug("SetSelectedRegistrar from unregistered ER")
1759     resp = upnp_soap_action(conn, ctrlurl.path, "SetSelectedRegistrar")
1760     if resp.status != 501:
1761         raise Exception("Unexpected HTTP response: %s" % resp.status)
1762
1763     logger.debug("Unknown action")
1764     resp = upnp_soap_action(conn, ctrlurl.path, "Unknown")
1765     if resp.status != 401:
1766         raise Exception("Unexpected HTTP response: %s" % resp.status)
1767
1768 def test_ap_wps_upnp_subscribe(dev, apdev):
1769     """WPS AP and UPnP event subscription"""
1770     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1771     add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1772
1773     location = ssdp_get_location(ap_uuid)
1774     urls = upnp_get_urls(location)
1775     eventurl = urlparse.urlparse(urls['event_sub_url'])
1776
1777     url = urlparse.urlparse(location)
1778     conn = httplib.HTTPConnection(url.netloc)
1779     #conn.set_debuglevel(1)
1780     headers = { "callback": '<http://127.0.0.1:12345/event>',
1781                 "timeout": "Second-1234" }
1782     conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1783     resp = conn.getresponse()
1784     if resp.status != 412:
1785         raise Exception("Unexpected HTTP response: %s" % resp.status)
1786
1787     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1788     resp = conn.getresponse()
1789     if resp.status != 412:
1790         raise Exception("Unexpected HTTP response: %s" % resp.status)
1791
1792     headers = { "NT": "upnp:event",
1793                 "timeout": "Second-1234" }
1794     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1795     resp = conn.getresponse()
1796     if resp.status != 412:
1797         raise Exception("Unexpected HTTP response: %s" % resp.status)
1798
1799     headers = { "callback": '<http://127.0.0.1:12345/event>',
1800                 "NT": "upnp:foobar",
1801                 "timeout": "Second-1234" }
1802     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1803     resp = conn.getresponse()
1804     if resp.status != 400:
1805         raise Exception("Unexpected HTTP response: %s" % resp.status)
1806
1807     logger.debug("Valid subscription")
1808     headers = { "callback": '<http://127.0.0.1:12345/event>',
1809                 "NT": "upnp:event",
1810                 "timeout": "Second-1234" }
1811     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1812     resp = conn.getresponse()
1813     if resp.status != 200:
1814         raise Exception("Unexpected HTTP response: %s" % resp.status)
1815     sid = resp.getheader("sid")
1816     logger.debug("Subscription SID " + sid)
1817
1818     logger.debug("Invalid re-subscription")
1819     headers = { "NT": "upnp:event",
1820                 "sid": "123456734567854",
1821                 "timeout": "Second-1234" }
1822     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1823     resp = conn.getresponse()
1824     if resp.status != 400:
1825         raise Exception("Unexpected HTTP response: %s" % resp.status)
1826
1827     logger.debug("Invalid re-subscription")
1828     headers = { "NT": "upnp:event",
1829                 "sid": "uuid:123456734567854",
1830                 "timeout": "Second-1234" }
1831     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1832     resp = conn.getresponse()
1833     if resp.status != 400:
1834         raise Exception("Unexpected HTTP response: %s" % resp.status)
1835
1836     logger.debug("Invalid re-subscription")
1837     headers = { "callback": '<http://127.0.0.1:12345/event>',
1838                 "NT": "upnp:event",
1839                 "sid": sid,
1840                 "timeout": "Second-1234" }
1841     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1842     resp = conn.getresponse()
1843     if resp.status != 400:
1844         raise Exception("Unexpected HTTP response: %s" % resp.status)
1845
1846     logger.debug("SID mismatch in re-subscription")
1847     headers = { "NT": "upnp:event",
1848                 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1849                 "timeout": "Second-1234" }
1850     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1851     resp = conn.getresponse()
1852     if resp.status != 412:
1853         raise Exception("Unexpected HTTP response: %s" % resp.status)
1854
1855     logger.debug("Valid re-subscription")
1856     headers = { "NT": "upnp:event",
1857                 "sid": sid,
1858                 "timeout": "Second-1234" }
1859     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1860     resp = conn.getresponse()
1861     if resp.status != 200:
1862         raise Exception("Unexpected HTTP response: %s" % resp.status)
1863     sid2 = resp.getheader("sid")
1864     logger.debug("Subscription SID " + sid2)
1865
1866     if sid != sid2:
1867         raise Exception("Unexpected SID change")
1868
1869     logger.debug("Valid re-subscription")
1870     headers = { "NT": "upnp:event",
1871                 "sid": "uuid: \t \t" + sid.split(':')[1],
1872                 "timeout": "Second-1234" }
1873     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1874     resp = conn.getresponse()
1875     if resp.status != 200:
1876         raise Exception("Unexpected HTTP response: %s" % resp.status)
1877
1878     logger.debug("Invalid unsubscription")
1879     headers = { "sid": sid }
1880     conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1881     resp = conn.getresponse()
1882     if resp.status != 412:
1883         raise Exception("Unexpected HTTP response: %s" % resp.status)
1884     headers = { "foo": "bar" }
1885     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1886     resp = conn.getresponse()
1887     if resp.status != 412:
1888         raise Exception("Unexpected HTTP response: %s" % resp.status)
1889
1890     logger.debug("Valid unsubscription")
1891     headers = { "sid": sid }
1892     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1893     resp = conn.getresponse()
1894     if resp.status != 200:
1895         raise Exception("Unexpected HTTP response: %s" % resp.status)
1896
1897     logger.debug("Unsubscription for not existing SID")
1898     headers = { "sid": sid }
1899     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1900     resp = conn.getresponse()
1901     if resp.status != 412:
1902         raise Exception("Unexpected HTTP response: %s" % resp.status)
1903
1904     logger.debug("Invalid unsubscription")
1905     headers = { "sid": " \t \tfoo" }
1906     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1907     resp = conn.getresponse()
1908     if resp.status != 400:
1909         raise Exception("Unexpected HTTP response: %s" % resp.status)
1910
1911     logger.debug("Invalid unsubscription")
1912     headers = { "sid": "uuid:\t \tfoo" }
1913     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1914     resp = conn.getresponse()
1915     if resp.status != 400:
1916         raise Exception("Unexpected HTTP response: %s" % resp.status)
1917
1918     logger.debug("Invalid unsubscription")
1919     headers = { "NT": "upnp:event",
1920                 "sid": sid }
1921     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1922     resp = conn.getresponse()
1923     if resp.status != 400:
1924         raise Exception("Unexpected HTTP response: %s" % resp.status)
1925     headers = { "callback": '<http://127.0.0.1:12345/event>',
1926                 "sid": sid }
1927     conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1928     resp = conn.getresponse()
1929     if resp.status != 400:
1930         raise Exception("Unexpected HTTP response: %s" % resp.status)
1931
1932     logger.debug("Valid subscription with multiple callbacks")
1933     headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
1934                 "NT": "upnp:event",
1935                 "timeout": "Second-1234" }
1936     conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1937     resp = conn.getresponse()
1938     if resp.status != 200:
1939         raise Exception("Unexpected HTTP response: %s" % resp.status)
1940     sid = resp.getheader("sid")
1941     logger.debug("Subscription SID " + sid)