tests: WPS ER init failure
[mech_eap.git] / tests / hwsim / test_ap_wps.py
index 7dfc622..9684a36 100644 (file)
@@ -787,6 +787,44 @@ def test_ap_wps_setup_locked_timeout(dev, apdev):
     if ev is None:
         raise Exception("AP PIN did not get unlocked on 60 second timeout")
 
+def test_ap_wps_setup_locked_2(dev, apdev):
+    """WPS AP configured for special ap_setup_locked=2 mode"""
+    ssid = "test-wps-ap-pin"
+    appin = "12345670"
+    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
+               "wpa_passphrase": "12345678", "wpa": "2",
+               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
+               "ap_pin": appin, "ap_setup_locked": "2" }
+    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    new_ssid = "wps-new-ssid-test"
+    new_passphrase = "1234567890"
+
+    dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412)
+    dev[0].wps_reg(apdev[0]['bssid'], appin)
+    dev[0].request("REMOVE_NETWORK all")
+    dev[0].wait_disconnected()
+
+    hapd.dump_monitor()
+    dev[0].dump_monitor()
+    dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK",
+                   "CCMP", new_passphrase, no_wait=True)
+
+    ev = hapd.wait_event(["WPS-FAIL"], timeout=5)
+    if ev is None:
+        raise Exception("hostapd did not report WPS failure")
+    if "msg=12 config_error=15" not in ev:
+        raise Exception("Unexpected failure reason (AP): " + ev)
+
+    ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
+    if ev is None:
+        raise Exception("Timeout on receiving WPS operation failure event")
+    if "CTRL-EVENT-CONNECTED" in ev:
+        raise Exception("Unexpected connection")
+    if "config_error=15" not in ev:
+        raise Exception("Unexpected failure reason (STA): " + ev)
+    dev[0].request("WPS_CANCEL")
+    dev[0].wait_disconnected()
+
 def test_ap_wps_pbc_overlap_2ap(dev, apdev):
     """WPS PBC session overlap with two active APs"""
     hostapd.add_ap(apdev[0]['ifname'],
@@ -1116,6 +1154,63 @@ def _test_ap_wps_er_add_enrollee_uuid(dev, apdev):
     dev[0].dump_monitor()
     dev[0].request("WPS_ER_STOP")
 
+def test_ap_wps_er_multi_add_enrollee(dev, apdev):
+    """Multiple WPS ERs adding a new enrollee using PIN"""
+    try:
+        _test_ap_wps_er_multi_add_enrollee(dev, apdev)
+    finally:
+        dev[0].request("WPS_ER_STOP")
+
+def _test_ap_wps_er_multi_add_enrollee(dev, apdev):
+    ssid = "wps-er-add-enrollee"
+    ap_pin = "12345670"
+    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+    hostapd.add_ap(apdev[0]['ifname'],
+                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
+                     "wpa_passphrase": "12345678", "wpa": "2",
+                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
+                     "device_name": "Wireless AP", "manufacturer": "Company",
+                     "model_name": "WAP", "model_number": "123",
+                     "serial_number": "12345", "device_type": "6-0050F204-1",
+                     "os_version": "01020300",
+                     'friendly_name': "WPS AP",
+                     "config_methods": "label push_button",
+                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
+
+    for i in range(2):
+        dev[i].scan_for_bss(apdev[0]['bssid'], freq=2412)
+        dev[i].wps_reg(apdev[0]['bssid'], ap_pin)
+        dev[i].request("WPS_ER_START ifname=lo")
+    for i in range(2):
+        ev = dev[i].wait_event(["WPS-ER-AP-ADD"], timeout=15)
+        if ev is None:
+            raise Exception("AP discovery timed out")
+        dev[i].dump_monitor()
+        dev[i].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
+        ev = dev[i].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
+        if ev is None:
+            raise Exception("AP learn timed out")
+        ev = dev[i].wait_event(["WPS-FAIL"], timeout=15)
+        if ev is None:
+            raise Exception("WPS-FAIL after AP learn timed out")
+
+    time.sleep(0.1)
+
+    pin = dev[2].wps_read_pin()
+    addr = dev[2].own_addr()
+    dev[0].dump_monitor()
+    dev[0].request("WPS_ER_PIN any " + pin + " " + addr)
+    dev[1].dump_monitor()
+    dev[1].request("WPS_ER_PIN any " + pin + " " + addr)
+
+    dev[2].scan_for_bss(apdev[0]['bssid'], freq=2412)
+    dev[2].dump_monitor()
+    dev[2].request("WPS_PIN %s %s" % (apdev[0]['bssid'], pin))
+    ev = dev[2].wait_event(["WPS-SUCCESS"], timeout=30)
+    if ev is None:
+        raise Exception("Enrollee did not report success")
+    dev[2].wait_connected(timeout=15)
+
 def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
     """WPS ER connected to AP and adding a new enrollee using PBC"""
     try:
@@ -2725,6 +2820,73 @@ def test_ap_wps_upnp_subscribe(dev, apdev):
         if "FAIL" not in hapd.request("ENABLE"):
             raise Exception("ENABLE succeeded during OOM")
 
+def test_ap_wps_upnp_subscribe_events(dev, apdev):
+    """WPS AP and UPnP event subscription and many events"""
+    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+    hapd = add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
+
+    location = ssdp_get_location(ap_uuid)
+    urls = upnp_get_urls(location)
+    eventurl = urlparse.urlparse(urls['event_sub_url'])
+
+    class WPSERHTTPServer(SocketServer.StreamRequestHandler):
+        def handle(self):
+            data = self.rfile.readline().strip()
+            logger.debug(data)
+            self.wfile.write(gen_wps_event())
+
+    server = MyTCPServer(("127.0.0.1", 12345), WPSERHTTPServer)
+    server.timeout = 1
+
+    url = urlparse.urlparse(location)
+    conn = httplib.HTTPConnection(url.netloc)
+
+    headers = { "callback": '<http://127.0.0.1:12345/event>',
+                "NT": "upnp:event",
+                "timeout": "Second-1234" }
+    conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+    resp = conn.getresponse()
+    if resp.status != 200:
+        raise Exception("Unexpected HTTP response: %d" % resp.status)
+    sid = resp.getheader("sid")
+    logger.debug("Subscription SID " + sid)
+
+    # Fetch the first event message
+    server.handle_request()
+
+    # Force subscription event queue to reach the maximum length by generating
+    # new proxied events without the ER fetching any of the pending events.
+    dev[1].scan_for_bss(apdev[0]['bssid'], freq=2412)
+    dev[2].scan_for_bss(apdev[0]['bssid'], freq=2412)
+    for i in range(16):
+        dev[1].dump_monitor()
+        dev[2].dump_monitor()
+        dev[1].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670")
+        dev[2].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670")
+        dev[1].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
+        dev[1].request("WPS_CANCEL")
+        dev[2].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
+        dev[2].request("WPS_CANCEL")
+        if i % 4 == 1:
+            time.sleep(1)
+        else:
+            time.sleep(0.1)
+
+    hapd.request("WPS_PIN any 12345670")
+    dev[1].dump_monitor()
+    dev[1].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670")
+    ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=10)
+    if ev is None:
+        raise Exception("WPS success not reported")
+
+    # Close the WPS ER HTTP server without fetching all the pending events.
+    # This tests hostapd code path that clears subscription and the remaining
+    # event queue when the interface is deinitialized.
+    server.handle_request()
+    server.server_close()
+
+    dev[1].wait_connected()
+
 def test_ap_wps_upnp_http_proto(dev, apdev):
     """WPS AP and UPnP/HTTP protocol testing"""
     ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
@@ -3257,6 +3419,12 @@ def _test_ap_wps_er_init_oom(dev, apdev):
         if "FAIL" not in dev[0].request("WPS_ER_START ifname=lo"):
             raise Exception("WPS_ER_START succeeded during os_get_random failure")
 
+def test_ap_wps_er_init_fail(dev, apdev):
+    """WPS ER init failure"""
+    if "FAIL" not in dev[0].request("WPS_ER_START ifname=does-not-exist"):
+        dev[0].request("WPS_ER_STOP")
+        raise Exception("WPS_ER_START with non-existing ifname succeeded")
+
 def test_ap_wps_wpa_cli_action(dev, apdev, test_params):
     """WPS events and wpa_cli action script"""
     logdir = os.path.abspath(test_params['logdir'])
@@ -5309,3 +5477,16 @@ def test_ap_wps_adv_oom(dev, apdev):
     with alloc_fail(hapd, 1, "ssdp_listener_start"):
         if "FAIL" not in hapd.request("ENABLE"):
             raise Exception("ENABLE succeeded during OOM")
+
+def test_wps_config_methods(dev):
+    """WPS config method update"""
+    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+    wpas.interface_add("wlan5")
+    if "OK" not in wpas.request("SET config_methods display label"):
+        raise Exception("Failed to set config_methods")
+    if wpas.request("GET config_methods").strip() != "display label":
+        raise Exception("config_methods were not updated")
+    if "OK" not in wpas.request("SET config_methods "):
+        raise Exception("Failed to clear config_methods")
+    if wpas.request("GET config_methods").strip() != "":
+        raise Exception("config_methods were not cleared")