# See README for more details.
import base64
+import binascii
import os
import time
import stat
dev[0].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2",
pairwise="CCMP", group="CCMP")
+def test_ap_wps_init_through_wps_config_2(dev, apdev):
+ """AP configuration using wps_config and wps_cred_processing=2"""
+ ssid = "test-wps-init-config"
+ hostapd.add_ap(apdev[0]['ifname'],
+ { "ssid": ssid, "eap_server": "1", "wps_state": "1",
+ "wps_cred_processing": "2" })
+ hapd = hostapd.Hostapd(apdev[0]['ifname'])
+ if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
+ raise Exception("WPS_CONFIG command failed")
+ ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
+ if ev is None:
+ raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
+ if "100e" not in ev:
+ raise Exception("WPS-NEW-AP-SETTINGS did not include Credential")
+
def test_ap_wps_invalid_wps_config_passphrase(dev, apdev):
"""AP configuration using wps_config command with invalid passphrase"""
ssid = "test-wps-init-config"
if ev is None:
raise Exception("WPS ER did not report success")
+ ev = dev[0].wait_event(["WPS-ER-ENROLLEE-REMOVE"], timeout=15)
+ if ev is None:
+ raise Exception("No Enrollee STA entry timeout seen")
+
logger.info("Stop ER")
dev[0].dump_monitor()
dev[0].request("WPS_ER_STOP")
pass
def test_ap_wps_pbc_timeout(dev, apdev, params):
- """wpa_supplicant PBC walk time [long]"""
+ """wpa_supplicant PBC walk time and WPS ER SelReg timeout [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
- ssid = "test-wps"
- hostapd.add_ap(apdev[0]['ifname'],
- { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
- hapd = hostapd.Hostapd(apdev[0]['ifname'])
+ ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+ hapd = add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
+
+ location = ssdp_get_location(ap_uuid)
+ urls = upnp_get_urls(location)
+ eventurl = urlparse.urlparse(urls['event_sub_url'])
+ ctrlurl = urlparse.urlparse(urls['control_url'])
+
+ url = urlparse.urlparse(location)
+ conn = httplib.HTTPConnection(url.netloc)
+
+ class WPSERHTTPServer(SocketServer.StreamRequestHandler):
+ def handle(self):
+ data = self.rfile.readline().strip()
+ logger.debug(data)
+ self.wfile.write(gen_wps_event())
+
+ server = MyTCPServer(("127.0.0.1", 12345), WPSERHTTPServer)
+ server.timeout = 1
+
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "NT": "upnp:event",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %d" % resp.status)
+ sid = resp.getheader("sid")
+ logger.debug("Subscription SID " + sid)
+
+ msg = '''<?xml version="1.0"?>
+<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+<s:Body>
+<u:SetSelectedRegistrar xmlns:u="urn:schemas-wifialliance-org:service:WFAWLANConfig:1">
+<NewMessage>EEoAARAQQQABARASAAIAABBTAAIxSBBJAA4ANyoAASABBv///////xBIABA2LbR7pTpRkYj7
+VFi5hrLk
+</NewMessage>
+</u:SetSelectedRegistrar>
+</s:Body>
+</s:Envelope>'''
+ headers = { "Content-type": 'text/xml; charset="utf-8"' }
+ headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % "SetSelectedRegistrar"
+ conn.request("POST", ctrlurl.path, msg, headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %d" % resp.status)
+
+ server.handle_request()
+
logger.info("Start WPS_PBC and wait for PBC walk time expiration")
if "OK" not in dev[0].request("WPS_PBC"):
raise Exception("WPS_PBC failed")
- ev = dev[0].wait_event(["WPS-TIMEOUT"], timeout=150)
+
+ start = os.times()[4]
+
+ server.handle_request()
+ dev[1].request("BSS_FLUSH 0")
+ dev[1].scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True,
+ only_new=True)
+ bss = dev[1].get_bss(apdev[0]['bssid'])
+ logger.debug("BSS: " + str(bss))
+ if '[WPS-AUTH]' not in bss['flags']:
+ raise Exception("WPS not indicated authorized")
+
+ server.handle_request()
+
+ wps_timeout_seen = False
+
+ while True:
+ hapd.dump_monitor()
+ dev[1].dump_monitor()
+ if not wps_timeout_seen:
+ ev = dev[0].wait_event(["WPS-TIMEOUT"], timeout=0)
+ if ev is not None:
+ logger.info("PBC timeout seen")
+ wps_timeout_seen = True
+ else:
+ dev[0].dump_monitor()
+ now = os.times()[4]
+ if now - start > 130:
+ raise Exception("Selected registration information not removed")
+ dev[1].request("BSS_FLUSH 0")
+ dev[1].scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True,
+ only_new=True)
+ bss = dev[1].get_bss(apdev[0]['bssid'])
+ logger.debug("BSS: " + str(bss))
+ if '[WPS-AUTH]' not in bss['flags']:
+ break
+ server.handle_request()
+
+ server.server_close()
+
+ if wps_timeout_seen:
+ return
+
+ now = os.times()[4]
+ if now < start + 150:
+ dur = start + 150 - now
+ else:
+ dur = 1
+ logger.info("Continue waiting for PBC timeout (%d sec)" % dur)
+ ev = dev[0].wait_event(["WPS-TIMEOUT"], timeout=dur)
if ev is None:
raise Exception("WPS-TIMEOUT not reported")
return None
return sock.recv(1000)
-def ssdp_send_msearch(st):
+def ssdp_send_msearch(st, no_recv=False):
msg = '\r\n'.join([
'M-SEARCH * HTTP/1.1',
'HOST: 239.255.255.250:1900',
'MAN: "ssdp:discover"',
'ST: ' + st,
'', ''])
- return ssdp_send(msg)
+ return ssdp_send(msg, no_recv=no_recv)
def test_ap_wps_ssdp_msearch(dev, apdev):
"""WPS AP and SSDP M-SEARCH messages"""
"wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
"config_methods": "display push_button" }
hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
+
+def test_ap_wps_set_selected_registrar_proto(dev, apdev):
+ """WPS UPnP SetSelectedRegistrar protocol testing"""
+ ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+ hapd = add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
+
+ location = ssdp_get_location(ap_uuid)
+ urls = upnp_get_urls(location)
+ eventurl = urlparse.urlparse(urls['event_sub_url'])
+ ctrlurl = urlparse.urlparse(urls['control_url'])
+ url = urlparse.urlparse(location)
+ conn = httplib.HTTPConnection(url.netloc)
+
+ class WPSERHTTPServer(SocketServer.StreamRequestHandler):
+ def handle(self):
+ data = self.rfile.readline().strip()
+ logger.debug(data)
+ self.wfile.write(gen_wps_event())
+
+ server = MyTCPServer(("127.0.0.1", 12345), WPSERHTTPServer)
+ server.timeout = 1
+
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "NT": "upnp:event",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %d" % resp.status)
+ sid = resp.getheader("sid")
+ logger.debug("Subscription SID " + sid)
+ server.handle_request()
+
+ tests = [ (500, "10"),
+ (200, "104a000110" + "1041000101" + "101200020000" +
+ "105300023148" +
+ "1049002c00372a0001200124111111111111222222222222333333333333444444444444555555555555666666666666" +
+ "10480010362db47ba53a519188fb5458b986b2e4"),
+ (200, "104a000110" + "1041000100" + "101200020000" +
+ "105300020000"),
+ (200, "104a000110" + "1041000100"),
+ (200, "104a000110") ]
+ for status,test in tests:
+ tlvs = binascii.unhexlify(test)
+ newmsg = base64.b64encode(tlvs)
+ msg = '<?xml version="1.0"?>\n'
+ msg += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
+ msg += '<s:Body>'
+ msg += '<u:SetSelectedRegistrar xmlns:u="urn:schemas-wifialliance-org:service:WFAWLANConfig:1">'
+ msg += '<NewMessage>'
+ msg += newmsg
+ msg += "</NewMessage></u:SetSelectedRegistrar></s:Body></s:Envelope>"
+ headers = { "Content-type": 'text/xml; charset="utf-8"' }
+ headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % "SetSelectedRegistrar"
+ conn.request("POST", ctrlurl.path, msg, headers)
+ resp = conn.getresponse()
+ if resp.status != status:
+ raise Exception("Unexpected HTTP response: %d (expected %d)" % (resp.status, status))
+
+def test_ap_wps_adv_oom(dev, apdev):
+ """WPS AP and advertisement OOM"""
+ ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+ hapd = add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
+
+ with alloc_fail(hapd, 1, "=msearchreply_state_machine_start"):
+ ssdp_send_msearch("urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
+ no_recv=True)
+ time.sleep(0.2)
+
+ with alloc_fail(hapd, 1, "eloop_register_timeout;msearchreply_state_machine_start"):
+ ssdp_send_msearch("urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
+ no_recv=True)
+ time.sleep(0.2)
+
+ with alloc_fail(hapd, 1,
+ "next_advertisement;advertisement_state_machine_stop"):
+ hapd.disable()
+
+ with alloc_fail(hapd, 1, "ssdp_listener_start"):
+ if "FAIL" not in hapd.request("ENABLE"):
+ raise Exception("ENABLE succeeded during OOM")