# This software may be distributed under the terms of the BSD license.
# See README for more details.
+from remotehost import remote_compatible
import time
import binascii
import logging
logger = logging.getLogger()
+import os
import re
import struct
import hostapd
from wpasupplicant import WpaSupplicant
-from utils import alloc_fail, skip_with_fips
+from tshark import run_tshark
+from utils import alloc_fail, wait_fail_trigger, skip_with_fips
+from hwsim import HWSimRadio
def hs20_ap_params():
params = hostapd.wpa2_params(ssid="test-gas")
def start_ap(ap):
params = hs20_ap_params()
params['hessid'] = ap['bssid']
- hostapd.add_ap(ap['ifname'], params)
- return hostapd.Hostapd(ap['ifname'])
+ return hostapd.add_ap(ap, params)
def get_gas_response(dev, bssid, info, allow_fetch_failure=False,
extra_test=False):
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0]['ifname'], params)
+ hostapd.add_ap(apdev[0], params)
cmds = [ "foo",
"00:11:22:33:44:55",
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0]['ifname'], params)
+ hostapd.add_ap(apdev[0], params)
# get BSS entry available to allow GAS query
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0]['ifname'], params)
+ hostapd.add_ap(apdev[0], params)
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
if "CTRL-EVENT-CONNECTED" not in ev:
raise Exception("Unexpected operation order")
-def test_gas_fragment(dev, apdev):
- """GAS fragmentation"""
- hapd = start_ap(apdev[0])
- hapd.set("gas_frag_limit", "50")
+def gas_fragment_and_comeback(dev, apdev, frag_limit=0, comeback_delay=0):
+ hapd = start_ap(apdev)
+ if frag_limit:
+ hapd.set("gas_frag_limit", str(frag_limit))
+ if comeback_delay:
+ hapd.set("gas_comeback_delay", str(comeback_delay))
- dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True)
- dev[0].request("FETCH_ANQP")
- ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=1)
+ dev.scan_for_bss(apdev['bssid'], freq="2412", force_scan=True)
+ dev.request("FETCH_ANQP")
+ ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=5)
if ev is None:
raise Exception("No GAS-QUERY-DONE event")
if "result=SUCCESS" not in ev:
raise Exception("Unexpected GAS result: " + ev)
for i in range(0, 13):
- ev = dev[0].wait_event(["RX-ANQP", "RX-HS20-ANQP"], timeout=5)
+ ev = dev.wait_event(["RX-ANQP", "RX-HS20-ANQP"], timeout=5)
if ev is None:
raise Exception("Operation timed out")
- ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=1)
+ ev = dev.wait_event(["ANQP-QUERY-DONE"], timeout=1)
if ev is None:
raise Exception("No ANQP-QUERY-DONE event")
if "result=SUCCESS" not in ev:
raise Exception("Unexpected ANQP result: " + ev)
+def test_gas_fragment(dev, apdev):
+ """GAS fragmentation"""
+ gas_fragment_and_comeback(dev[0], apdev[0], frag_limit=50)
+
+def test_gas_fragment_mcc(dev, apdev):
+ """GAS fragmentation with mac80211_hwsim MCC enabled"""
+ with HWSimRadio(n_channels=2) as (radio, iface):
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add(iface)
+ gas_fragment_and_comeback(wpas, apdev[0], frag_limit=50)
+
+def test_gas_fragment_with_comeback_delay(dev, apdev):
+ """GAS fragmentation and comeback delay"""
+ gas_fragment_and_comeback(dev[0], apdev[0], frag_limit=50,
+ comeback_delay=500)
+
+def test_gas_fragment_with_comeback_delay_mcc(dev, apdev):
+ """GAS fragmentation and comeback delay with mac80211_hwsim MCC enabled"""
+ with HWSimRadio(n_channels=2) as (radio, iface):
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add(iface)
+ gas_fragment_and_comeback(wpas, apdev[0], frag_limit=50,
+ comeback_delay=500)
+
def test_gas_comeback_delay(dev, apdev):
"""GAS comeback delay"""
hapd = start_ap(apdev[0])
if ev is None:
raise Exception("Operation timed out")
+@remote_compatible
def test_gas_stop_fetch_anqp(dev, apdev):
"""Stop FETCH_ANQP operation"""
hapd = start_ap(apdev[0])
if ev is None or "WAN Metrics" not in ev:
raise Exception("Did not receive WAN Metrics")
+ logger.info("Attempt an MBO request with an AP that does not support MBO")
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 272,mbo:1"):
+ raise Exception("ANQP_GET command failed (2)")
+
+ ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
+ if ev is None:
+ raise Exception("GAS query start timed out (2)")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out (2)")
+
cmds = [ "",
"foo",
"00:11:22:33:44:55 258,hs20:-1",
"00:11:22:33:44:55 hs20:-1",
"00:11:22:33:44:55 hs20:0",
"00:11:22:33:44:55 hs20:32",
+ "00:11:22:33:44:55 mbo:-1",
+ "00:11:22:33:44:55 mbo:0",
+ "00:11:22:33:44:55 mbo:999",
"00:11:22:33:44:55",
"00:11:22:33:44:55 ",
- "00:11:22:33:44:55 0" ]
+ "00:11:22:33:44:55 0",
+ "00:11:22:33:44:55 1" ]
for cmd in cmds:
if "FAIL" not in dev[0].request("ANQP_GET " + cmd):
raise Exception("Invalid ANQP_GET accepted")
"00:11:22:33:44:55 32",
"00:11:22:33:44:55",
"00:11:22:33:44:55 ",
- "00:11:22:33:44:55 0" ]
+ "00:11:22:33:44:55 0",
+ "00:11:22:33:44:55 1" ]
for cmd in cmds:
if "FAIL" not in dev[0].request("HS20_ANQP_GET " + cmd):
raise Exception("Invalid HS20_ANQP_GET accepted")
+def test_gas_anqp_get_oom(dev, apdev):
+ """GAS/ANQP query OOM"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ with alloc_fail(dev[0], 1, "wpabuf_alloc;anqp_send_req"):
+ if "FAIL" not in dev[0].request("ANQP_GET " + bssid + " 258,268,hs20:3,hs20:4"):
+ raise Exception("ANQP_GET command accepted during OOM")
+ with alloc_fail(dev[0], 1, "hs20_build_anqp_req;hs20_anqp_send_req"):
+ if "FAIL" not in dev[0].request("HS20_ANQP_GET " + bssid + " 1"):
+ raise Exception("HS20_ANQP_GET command accepted during OOM")
+ with alloc_fail(dev[0], 1, "gas_query_req;hs20_anqp_send_req"):
+ if "FAIL" not in dev[0].request("HS20_ANQP_GET " + bssid + " 1"):
+ raise Exception("HS20_ANQP_GET command accepted during OOM")
+ with alloc_fail(dev[0], 1, "=hs20_anqp_send_req"):
+ if "FAIL" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
+ raise Exception("REQ_HS20_ICON command accepted during OOM")
+ with alloc_fail(dev[0], 2, "=hs20_anqp_send_req"):
+ if "FAIL" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
+ raise Exception("REQ_HS20_ICON command accepted during OOM")
+
+def test_gas_anqp_icon_binary_proto(dev, apdev):
+ """GAS/ANQP and icon binary protocol testing"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ hapd.set("ext_mgmt_frame_handling", "1")
+
+ tests = [ '010000', '01000000', '00000000', '00030000', '00020000',
+ '00000100', '0001ff0100ee', '0001ff0200ee' ]
+ for test in tests:
+ dev[0].request("HS20_ICON_REQUEST " + bssid + " w1fi_logo")
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ resp = action_response(query)
+ data = binascii.unhexlify(test)
+ data = binascii.unhexlify('506f9a110b00') + data
+ data = struct.pack('<HHH', len(data) + 4, 0xdddd, len(data)) + data
+ resp['payload'] = anqp_initial_resp(gas['dialog_token'], 0) + data
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "SUCCESS")
+
+def test_gas_anqp_hs20_proto(dev, apdev):
+ """GAS/ANQP and Hotspot 2.0 element protocol testing"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ hapd.set("ext_mgmt_frame_handling", "1")
+
+ tests = [ '00', '0100', '0201', '0300', '0400', '0500', '0600', '0700',
+ '0800', '0900', '0a00', '0b0000000000' ]
+ for test in tests:
+ dev[0].request("HS20_ANQP_GET " + bssid + " 3,4")
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ resp = action_response(query)
+ data = binascii.unhexlify(test)
+ data = binascii.unhexlify('506f9a11') + data
+ data = struct.pack('<HHH', len(data) + 4, 0xdddd, len(data)) + data
+ resp['payload'] = anqp_initial_resp(gas['dialog_token'], 0) + data
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "SUCCESS")
+
def expect_gas_result(dev, result, status=None):
ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("Unexpected dialog token change")
return query, dialog_token
+def allow_gas_initial_req(hapd, dialog_token):
+ msg = hapd.mgmt_rx(timeout=1)
+ if msg is not None:
+ gas = parse_gas(msg['payload'])
+ if gas['action'] != GAS_INITIAL_REQUEST or dialog_token == gas['dialog_token']:
+ raise Exception("Unexpected management frame")
+
def test_gas_malformed_comeback_resp(dev, apdev):
"""GAS malformed comeback response frames"""
hapd = start_ap(apdev[0])
resp = action_response(query)
resp['payload'] = anqp_initial_resp(dialog_token, 0) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
- ev = hapd.wait_event(["MGMT-RX"], timeout=1)
- if ev is not None:
- raise Exception("Unexpected management frame")
+ allow_gas_initial_req(hapd, dialog_token)
expect_gas_result(dev[0], "TIMEOUT")
logger.debug("Too short comeback response")
resp['payload'] = struct.pack('<BBBH', ACTION_CATEG_PUBLIC,
GAS_COMEBACK_RESPONSE, dialog_token, 0)
send_gas_resp(hapd, resp)
- ev = hapd.wait_event(["MGMT-RX"], timeout=1)
- if ev is not None:
- raise Exception("Unexpected management frame")
+ allow_gas_initial_req(hapd, dialog_token)
expect_gas_result(dev[0], "TIMEOUT")
logger.debug("Too short comeback response(2)")
GAS_COMEBACK_RESPONSE, dialog_token, 0, 0x80,
0)
send_gas_resp(hapd, resp)
- ev = hapd.wait_event(["MGMT-RX"], timeout=1)
- if ev is not None:
- raise Exception("Unexpected management frame")
+ allow_gas_initial_req(hapd, dialog_token)
expect_gas_result(dev[0], "TIMEOUT")
logger.debug("Maximum comeback response fragment claiming more fragments")
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0]['ifname'], params)
+ hostapd.add_ap(apdev[0], params)
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
req = dev[0].request("GAS_REQUEST " + bssid + " 42 000102000101")
if status != "59":
raise Exception("Unexpected GAS-RESPONSE-INFO status")
+def test_gas_request_oom(dev, apdev):
+ """GAS_REQUEST OOM"""
+ bssid = apdev[0]['bssid']
+ params = hs20_ap_params()
+ params['hessid'] = bssid
+ hostapd.add_ap(apdev[0], params)
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+
+ with alloc_fail(dev[0], 1, "gas_build_req;gas_send_request"):
+ if "FAIL" not in dev[0].request("GAS_REQUEST " + bssid + " 42"):
+ raise Exception("GAS query request rejected")
+
+ with alloc_fail(dev[0], 1, "gas_query_req;gas_send_request"):
+ if "FAIL" not in dev[0].request("GAS_REQUEST " + bssid + " 42"):
+ raise Exception("GAS query request rejected")
+
+ with alloc_fail(dev[0], 1, "wpabuf_dup;gas_resp_cb"):
+ if "OK" not in dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101"):
+ raise Exception("GAS query request rejected")
+ ev = dev[0].wait_event(["GAS-RESPONSE-INFO"], timeout=10)
+ if ev is None:
+ raise Exception("No GAS response")
+ if "status_code=0" not in ev:
+ raise Exception("GAS response indicated a failure")
+
def test_gas_max_pending(dev, apdev):
"""GAS and maximum pending query limit"""
hapd = start_ap(apdev[0])
if status_code != 60:
raise Exception("Unexpected status code {} (expected 60)".format(status_code))
+def test_gas_delete_at_deinit(dev, apdev):
+ """GAS query deleted at deinit"""
+ hapd = start_ap(apdev[0])
+ hapd.set("gas_comeback_delay", "1000")
+ bssid = apdev[0]['bssid']
+
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("wlan5")
+ wpas.scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True)
+ wpas.request("ANQP_GET " + bssid + " 258")
+
+ wpas.global_request("INTERFACE_REMOVE " + wpas.ifname)
+ ev = wpas.wait_event(["GAS-QUERY-DONE"], timeout=2)
+ del wpas
+ if ev is None:
+ raise Exception("GAS-QUERY-DONE not seen")
+ if "result=DELETED_AT_DEINIT" not in ev:
+ raise Exception("Unexpected result code: " + ev)
+
def test_gas_missing_payload(dev, apdev):
"""No action code in the query frame"""
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0]['ifname'], params)
+ hostapd.add_ap(apdev[0], params)
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
# GAS query has not yet been started.
wpas.interface_remove("wlan5")
+@remote_compatible
def test_gas_anqp_oom_wpas(dev, apdev):
"""GAS/ANQP query and OOM in wpa_supplicant"""
hapd = start_ap(apdev[0])
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ with alloc_fail(dev[0], 1, "wpa_bss_anqp_alloc"):
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET command failed")
+ ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=5)
+ if ev is None:
+ raise Exception("ANQP query did not complete")
+
with alloc_fail(dev[0], 1, "gas_build_req"):
if "FAIL" not in dev[0].request("ANQP_GET " + bssid + " 258"):
raise Exception("Unexpected ANQP_GET command success (OOM)")
with alloc_fail(hapd, 1, "gas_anqp_build_comeback_resp"):
hapd.set("gas_frag_limit", "50")
- # This query will time out due to the AP not sending a response (OOM).
- print dev[0].request("FETCH_ANQP")
+ # The first attempt of this query will time out due to the AP not
+ # sending a response (OOM), but the retry succeeds.
+ dev[0].request("FETCH_ANQP")
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
if ev is None:
raise Exception("GAS query start timed out")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("GAS query timed out")
- if "result=TIMEOUT" not in ev:
+ if "result=SUCCESS" not in ev:
raise Exception("Unexpected result: " + ev)
ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("ANQP-QUERY-DONE event not seen")
- if "result=FAILURE" not in ev:
+ if "result=SUCCESS" not in ev:
raise Exception("Unexpected result: " + ev)
+
+def test_gas_anqp_extra_elements(dev, apdev):
+ """GAS/ANQP and extra ANQP elements"""
+ geo_loc = "001052834d12efd2b08b9b4bf1cc2c00004104050000000000060100"
+ civic_loc = "0000f9555302f50102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f404142434445464748494a4b4c4d4e4f505152535455565758595a5b5c5d5e5f606162636465666768696a6b6c6d6e6f707172737475767778797a7b7c7d7e7f808182838485868788898a8b8c8d8e8f909192939495969798999a9b9c9d9e9fa0a1a2a3a4a5a6a7a8a9aaabacadaeafb0b1b2b3b4b5b6b7b8b9babbbcbdbebfc0c1c2c3c4c5c6c7c8c9cacbcccdcecfd0d1d2d3d4d5d6d7d8d9dadbdcdddedfe0e1e2e3e4e5e6e7e8e9eaebecedeeeff0f1f2f3f4f5"
+ held_uri = "https://held.example.com/location"
+ held = struct.pack('BBB', 0, 1 + len(held_uri), 1) + held_uri
+ supl_fqdn = "supl.example.com"
+ supl = struct.pack('BBB', 0, 1 + len(supl_fqdn), 1) + supl_fqdn
+ public_id = binascii.hexlify(held + supl)
+ params = { "ssid": "gas/anqp",
+ "interworking": "1",
+ "anqp_elem": [ "265:" + geo_loc,
+ "266:" + civic_loc,
+ "262:1122334455",
+ "267:" + public_id,
+ "275:01020304",
+ "60000:01",
+ "299:0102" ] }
+ hapd = hostapd.add_ap(apdev[0], params)
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 265,266"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ bss = dev[0].get_bss(bssid)
+
+ if 'anqp[265]' not in bss:
+ raise Exception("AP Geospatial Location ANQP-element not seen")
+ if bss['anqp[265]'] != geo_loc:
+ raise Exception("Unexpected AP Geospatial Location ANQP-element value: " + bss['anqp[265]'])
+
+ if 'anqp[266]' not in bss:
+ raise Exception("AP Civic Location ANQP-element not seen")
+ if bss['anqp[266]'] != civic_loc:
+ raise Exception("Unexpected AP Civic Location ANQP-element value: " + bss['anqp[266]'])
+
+ dev[1].scan_for_bss(bssid, freq="2412", force_scan=True)
+ if "OK" not in dev[1].request("ANQP_GET " + bssid + " 257,258,259,260,261,262,263,264,265,267,268,269,270,271,272,273,274,275,276,277,278,279,280,281,282,283,284,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[1].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ bss = dev[1].get_bss(bssid)
+
+ if 'anqp[265]' not in bss:
+ raise Exception("AP Geospatial Location ANQP-element not seen")
+ if bss['anqp[265]'] != geo_loc:
+ raise Exception("Unexpected AP Geospatial Location ANQP-element value: " + bss['anqp[265]'])
+
+ if 'anqp[266]' in bss:
+ raise Exception("AP Civic Location ANQP-element unexpectedly seen")
+
+ if 'anqp[267]' not in bss:
+ raise Exception("AP Location Public Identifier ANQP-element not seen")
+ if bss['anqp[267]'] != public_id:
+ raise Exception("Unexpected AP Location Public Identifier ANQP-element value: " + bss['anqp[267]'])
+
+ if 'anqp[275]' not in bss:
+ raise Exception("ANQP-element Info ID 275 not seen")
+ if bss['anqp[275]'] != "01020304":
+ raise Exception("Unexpected AP ANQP-element Info ID 299 value: " + bss['anqp[299]'])
+
+ if 'anqp[299]' not in bss:
+ raise Exception("ANQP-element Info ID 299 not seen")
+ if bss['anqp[299]'] != "0102":
+ raise Exception("Unexpected AP ANQP-element Info ID 299 value: " + bss['anqp[299]'])
+
+ if 'anqp_ip_addr_type_availability' not in bss:
+ raise Exception("ANQP-element Info ID 292 not seen")
+ if bss['anqp_ip_addr_type_availability'] != "1122334455":
+ raise Exception("Unexpected AP ANQP-element Info ID 262 value: " + bss['anqp_ip_addr_type_availability'])
+
+def test_gas_anqp_address3_not_assoc(dev, apdev, params):
+ """GAS/ANQP query using IEEE 802.11 compliant Address 3 value when not associated"""
+ try:
+ _test_gas_anqp_address3_not_assoc(dev, apdev, params)
+ finally:
+ dev[0].request("SET gas_address3 0")
+
+def _test_gas_anqp_address3_not_assoc(dev, apdev, params):
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ if "OK" not in dev[0].request("SET gas_address3 1"):
+ raise Exception("Failed to set gas_address3")
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
+ if ev is None:
+ raise Exception("GAS query start timed out")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
+ if ev is None or "Venue Name" not in ev:
+ raise Exception("Did not receive Venue Name")
+
+ ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("ANQP-QUERY-DONE event not seen")
+ if "result=SUCCESS" not in ev:
+ raise Exception("Unexpected result: " + ev)
+
+ out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
+ "wlan_mgt.fixed.category_code == 4 && (wlan_mgt.fixed.publicact == 0x0a || wlan_mgt.fixed.publicact == 0x0b)",
+ display=["wlan.bssid"])
+ res = out.splitlines()
+ if len(res) != 2:
+ raise Exception("Unexpected number of GAS frames")
+ if res[0] != 'ff:ff:ff:ff:ff:ff':
+ raise Exception("GAS request used unexpected Address3 field value: " + res[0])
+ if res[1] != 'ff:ff:ff:ff:ff:ff':
+ raise Exception("GAS response used unexpected Address3 field value: " + res[1])
+
+def test_gas_anqp_address3_assoc(dev, apdev, params):
+ """GAS/ANQP query using IEEE 802.11 compliant Address 3 value when associated"""
+ try:
+ _test_gas_anqp_address3_assoc(dev, apdev, params)
+ finally:
+ dev[0].request("SET gas_address3 0")
+
+def _test_gas_anqp_address3_assoc(dev, apdev, params):
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ if "OK" not in dev[0].request("SET gas_address3 1"):
+ raise Exception("Failed to set gas_address3")
+
+ dev[0].scan_for_bss(bssid, freq="2412")
+ dev[0].connect("test-gas", key_mgmt="WPA-EAP", eap="TTLS",
+ identity="DOMAIN\mschapv2 user", anonymous_identity="ttls",
+ password="password", phase2="auth=MSCHAPV2",
+ ca_cert="auth_serv/ca.pem", scan_freq="2412")
+
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
+ if ev is None:
+ raise Exception("GAS query start timed out")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
+ if ev is None or "Venue Name" not in ev:
+ raise Exception("Did not receive Venue Name")
+
+ ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("ANQP-QUERY-DONE event not seen")
+ if "result=SUCCESS" not in ev:
+ raise Exception("Unexpected result: " + ev)
+
+ out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
+ "wlan_mgt.fixed.category_code == 4 && (wlan_mgt.fixed.publicact == 0x0a || wlan_mgt.fixed.publicact == 0x0b)",
+ display=["wlan.bssid"])
+ res = out.splitlines()
+ if len(res) != 2:
+ raise Exception("Unexpected number of GAS frames")
+ if res[0] != bssid:
+ raise Exception("GAS request used unexpected Address3 field value: " + res[0])
+ if res[1] != bssid:
+ raise Exception("GAS response used unexpected Address3 field value: " + res[1])
+
+def test_gas_anqp_address3_ap_forced(dev, apdev, params):
+ """GAS/ANQP query using IEEE 802.11 compliant Address 3 value on AP"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+ hapd.set("gas_address3", "1")
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
+ if ev is None:
+ raise Exception("GAS query start timed out")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
+ if ev is None or "Venue Name" not in ev:
+ raise Exception("Did not receive Venue Name")
+
+ ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("ANQP-QUERY-DONE event not seen")
+ if "result=SUCCESS" not in ev:
+ raise Exception("Unexpected result: " + ev)
+
+ out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
+ "wlan_mgt.fixed.category_code == 4 && (wlan_mgt.fixed.publicact == 0x0a || wlan_mgt.fixed.publicact == 0x0b)",
+ display=["wlan.bssid"])
+ res = out.splitlines()
+ if len(res) != 2:
+ raise Exception("Unexpected number of GAS frames")
+ if res[0] != bssid:
+ raise Exception("GAS request used unexpected Address3 field value: " + res[0])
+ if res[1] != 'ff:ff:ff:ff:ff:ff':
+ raise Exception("GAS response used unexpected Address3 field value: " + res[1])
+
+def test_gas_anqp_address3_ap_non_compliant(dev, apdev, params):
+ """GAS/ANQP query using IEEE 802.11 non-compliant Address 3 (AP)"""
+ try:
+ _test_gas_anqp_address3_ap_non_compliant(dev, apdev, params)
+ finally:
+ dev[0].request("SET gas_address3 0")
+
+def _test_gas_anqp_address3_ap_non_compliant(dev, apdev, params):
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+ hapd.set("gas_address3", "2")
+
+ if "OK" not in dev[0].request("SET gas_address3 1"):
+ raise Exception("Failed to set gas_address3")
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
+ if ev is None:
+ raise Exception("GAS query start timed out")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
+ if ev is None or "Venue Name" not in ev:
+ raise Exception("Did not receive Venue Name")
+
+ ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("ANQP-QUERY-DONE event not seen")
+ if "result=SUCCESS" not in ev:
+ raise Exception("Unexpected result: " + ev)
+
+ out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
+ "wlan_mgt.fixed.category_code == 4 && (wlan_mgt.fixed.publicact == 0x0a || wlan_mgt.fixed.publicact == 0x0b)",
+ display=["wlan.bssid"])
+ res = out.splitlines()
+ if len(res) != 2:
+ raise Exception("Unexpected number of GAS frames")
+ if res[0] != 'ff:ff:ff:ff:ff:ff':
+ raise Exception("GAS request used unexpected Address3 field value: " + res[0])
+ if res[1] != bssid:
+ raise Exception("GAS response used unexpected Address3 field value: " + res[1])
+
+def test_gas_prot_vs_not_prot(dev, apdev, params):
+ """GAS/ANQP query protected vs. not protected"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan_for_bss(bssid, freq="2412")
+ dev[0].connect("test-gas", key_mgmt="WPA-EAP", eap="TTLS",
+ identity="DOMAIN\mschapv2 user", anonymous_identity="ttls",
+ password="password", phase2="auth=MSCHAPV2",
+ ca_cert="auth_serv/ca.pem", scan_freq="2412",
+ ieee80211w="2")
+
+ if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET command failed")
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
+ if ev is None:
+ raise Exception("No GAS-QUERY-DONE event")
+ if "result=SUCCESS" not in ev:
+ raise Exception("Unexpected GAS result: " + ev)
+
+ # GAS: Drop unexpected unprotected GAS frame when PMF is enabled
+ dev[0].request("SET ext_mgmt_frame_handling 1")
+ res = dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=d0003a010200000000000200000003000200000003001000040b00000005006c027f000000")
+ dev[0].request("SET ext_mgmt_frame_handling 0")
+ if "OK" not in res:
+ raise Exception("MGMT_RX_PROCESS failed")
+
+ dev[0].request("DISCONNECT")
+ dev[0].wait_disconnected()
+
+ # GAS: No pending query found for 02:00:00:00:03:00 dialog token 0
+ dev[0].request("SET ext_mgmt_frame_handling 1")
+ res = dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=d0003a010200000000000200000003000200000003001000040b00000005006c027f000000")
+ dev[0].request("SET ext_mgmt_frame_handling 0")
+ if "OK" not in res:
+ raise Exception("MGMT_RX_PROCESS failed")
+
+ # GAS: Drop unexpected protected GAS frame when PMF is disabled
+ dev[0].request("SET ext_mgmt_frame_handling 1")
+ res = dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=d0003a010200000000000200000003000200000003001000090b00000005006c027f000000")
+ dev[0].request("SET ext_mgmt_frame_handling 0")
+ if "OK" not in res:
+ raise Exception("MGMT_RX_PROCESS failed")
+
+def test_gas_failures(dev, apdev):
+ """GAS failure cases"""
+ hapd = start_ap(apdev[0])
+ hapd.set("gas_comeback_delay", "5")
+ bssid = apdev[0]['bssid']
+
+ hapd2 = start_ap(apdev[1])
+ bssid2 = apdev[1]['bssid']
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ dev[0].scan_for_bss(bssid2, freq="2412")
+
+ tests = [ (bssid, "gas_build_req;gas_query_tx_comeback_req"),
+ (bssid, "gas_query_tx;gas_query_tx_comeback_req"),
+ (bssid, "gas_query_append;gas_query_rx_comeback"),
+ (bssid2, "gas_query_append;gas_query_rx_initial"),
+ (bssid2, "wpabuf_alloc_copy;gas_query_rx_initial"),
+ (bssid, "gas_query_tx;gas_query_tx_initial_req") ]
+ for addr,func in tests:
+ with alloc_fail(dev[0], 1, func):
+ dev[0].request("ANQP_GET " + addr + " 258")
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
+ if ev is None:
+ raise Exception("No GAS-QUERY-DONE seen")
+ if "result=INTERNAL_ERROR" not in ev:
+ raise Exception("Unexpected result code: " + ev)
+ dev[0].dump_monitor()
+
+ tests = [ "=gas_query_req", "radio_add_work;gas_query_req" ]
+ for func in tests:
+ with alloc_fail(dev[0], 1, func):
+ if "FAIL" not in dev[0].request("ANQP_GET " + bssid + " 258"):
+ raise Exception("ANQP_GET succeeded unexpectedly during OOM")
+ dev[0].dump_monitor()
+
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("wlan5")
+ wpas.scan_for_bss(bssid2, freq="2412")
+ wpas.request("SET preassoc_mac_addr 1111")
+ wpas.request("ANQP_GET " + bssid2 + " 258")
+ ev = wpas.wait_event(["Failed to assign random MAC address for GAS"],
+ timeout=5)
+ wpas.request("SET preassoc_mac_addr 0")
+ if ev is None:
+ raise Exception("No random MAC address error seen")