import subprocess
import time
import os
+import re
import hwsim_utils
from hwsim import HWSimRadio
import hostapd
+from wpasupplicant import WpaSupplicant
import fst_test_common
import fst_module_aux
-from utils import alloc_fail
+from utils import alloc_fail, HwsimSkip
#enum - bad parameter types
bad_param_none = 0
exception_text = "Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type]
raise Exception(exception_text)
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
def fst_initiate_session(apdev, test_params, bad_param_type, init_on_ap):
"""This function makes the necessary preparations and then adds, sets and
else:
raise Exception("Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type])
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
def fst_transfer_session(apdev, test_params, bad_param_type, init_on_ap,
rsn=False):
else:
raise Exception("Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type])
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
def fst_tear_down_session(apdev, test_params, bad_param_type, init_on_ap):
else:
raise Exception("Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type])
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
#enum - remove session scenarios
else:
raise Exception("Failure. Remove scenario ended in an unexpected way (%s)" % remove_scenario_names[remove_session_scenario])
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
#enum - frame types
else:
raise Exception("Failure. Frame was not ignored (%s)" % frame_type_names[frame_type])
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
#enum - bad session transfer scenarios
else:
raise Exception("Failure. Bad scenario was handled incorrectly (%s)" % bad_scenario_names[bad_scenario_type])
else:
- print "Failure. Unexpected exception"
+ logger.info("Failure. Unexpected exception")
def test_fst_sta_connect_to_non_fst_ap(dev, apdev, test_params):
"""FST STA connecting to non-FST AP"""
res_sta2_mbies = sta2.get_local_mbies()
if (orig_sta1_mbies.startswith("FAIL") or
orig_sta2_mbies.startswith("FAIL") or
- not res_sta1_mbies.startswith("FAIL") or
- not res_sta2_mbies.startswith("FAIL")):
- raise Exception("Failure. MB IEs have not been removed on the stations")
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs must be present on the stations")
except Exception, e:
logger.info(e)
raise
finally:
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_sta_connect_to_fst_ap(dev, apdev, test_params):
"""FST STA connecting to FST AP"""
res_sta2_mbies = sta2.get_local_mbies()
if (orig_sta1_mbies.startswith("FAIL") or
orig_sta2_mbies.startswith("FAIL") or
- not res_sta1_mbies.startswith("FAIL") or
- not res_sta2_mbies.startswith("FAIL")):
- raise Exception("Failure. MB IEs have not been removed on the stations")
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs must be present on the stations")
except Exception, e:
logger.info(e)
raise
sta1.disconnect()
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
-
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_second_sta_connect_to_fst_ap(dev, apdev, test_params):
"""FST STA 2nd connecting to FST AP"""
time.sleep(2)
res_sta1_mbies = sta1.get_local_mbies()
res_sta2_mbies = sta2.get_local_mbies()
- if (not orig_sta1_mbies.startswith("FAIL") or
- not orig_sta2_mbies.startswith("FAIL") or
- not res_sta1_mbies.startswith("FAIL") or
- not res_sta2_mbies.startswith("FAIL")):
- raise Exception("Failure. MB IEs should have stayed non-present on the stations")
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs must be present on the stations")
except Exception, e:
logger.info(e)
raise
sta1.disconnect()
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_disconnect_1_of_2_stas_from_non_fst_ap(dev, apdev, test_params):
"""FST disconnect 1 of 2 STAs from non-FST AP"""
time.sleep(2)
res_sta1_mbies = sta1.get_local_mbies()
res_sta2_mbies = sta2.get_local_mbies()
- if (not orig_sta1_mbies.startswith("FAIL") or
- not orig_sta2_mbies.startswith("FAIL") or
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
res_sta1_mbies.startswith("FAIL") or
res_sta2_mbies.startswith("FAIL")):
- raise Exception("Failure. MB IEs haven't reappeared on the stations")
+ raise Exception("Failure. MB IEs must be present on the stations")
except Exception, e:
logger.info(e)
raise
sta1.disconnect()
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
-
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_disconnect_1_of_2_stas_from_fst_ap(dev, apdev, test_params):
"""FST disconnect 1 of 2 STAs from FST AP"""
time.sleep(2)
res_sta1_mbies = sta1.get_local_mbies()
res_sta2_mbies = sta2.get_local_mbies()
- if (not orig_sta1_mbies.startswith("FAIL") or
- not orig_sta2_mbies.startswith("FAIL") or
- not res_sta1_mbies.startswith("FAIL") or
- not res_sta2_mbies.startswith("FAIL")):
- raise Exception("Failure. MB IEs should have stayed non-present on the stations")
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs must be present on the stations")
except Exception, e:
logger.info(e)
raise
sta1.disconnect()
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_disconnect_2_of_2_stas_from_non_fst_ap(dev, apdev, test_params):
"""FST disconnect 2 of 2 STAs from non-FST AP"""
time.sleep(2)
res_sta1_mbies = sta1.get_local_mbies()
res_sta2_mbies = sta2.get_local_mbies()
- if (not orig_sta1_mbies.startswith("FAIL") or
- not orig_sta2_mbies.startswith("FAIL") or
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
res_sta1_mbies.startswith("FAIL") or
res_sta2_mbies.startswith("FAIL")):
- raise Exception("Failure. MB IEs haven't reappeared on the stations")
+ raise Exception("Failure. MB IEs must be present on the stations")
except Exception, e:
logger.info(e)
raise
sta1.disconnect()
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_disconnect_2_of_2_stas_from_fst_ap(dev, apdev, test_params):
"""FST disconnect 2 of 2 STAs from FST AP"""
sta1.disconnect()
sta2.disconnect_from_external_ap()
fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+ hostapd.HostapdGlobal().remove(iface)
def test_fst_disconnect_non_fst_sta(dev, apdev, test_params):
"""FST disconnect non-FST STA"""
def test_fst_disconnect_fst_sta(dev, apdev, test_params):
"""FST disconnect FST STA"""
ap1, ap2, fst_sta1, fst_sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
- external_sta_connected = False;
+ external_sta_connected = False
try:
vals = fst_sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
fst_sta1.connect(ap1, key_mgmt="NONE",
ap2.start()
sta1 = fst_module_aux.FstSTA('wlan5',
- fst_module_aux.fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_low,
fst_test_common.fst_test_def_llt)
sta1.start()
orig_sta2_mbies = sta2.get_local_mbies()
orig_ap2_mbies = ap2.get_local_mbies()
sta2.send_iface_attach_request(sta2.ifname(),
- fst_module_aux.fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_group,
'52', '27')
event = sta2.wait_for_iface_event(5)
if event['event_type'] != 'attached':
raise Exception("Failure. Iface was not properly attached")
ap2.send_iface_attach_request(ap2.ifname(),
- fst_module_aux.fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_group,
'102', '77')
event = ap2.wait_for_iface_event(5)
if event['event_type'] != 'attached':
def test_fst_ap_ctrl_iface(dev, apdev, test_params):
"""FST control interface behavior"""
+ hglobal = hostapd.HostapdGlobal()
+ start_num_groups = 0
+ res = hglobal.request("FST-MANAGER LIST_GROUPS")
+ del hglobal
+ if "FAIL" not in res:
+ start_num_groups = len(res.splitlines())
+
ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
try:
fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
raise Exception("Unexpected number of interfaces")
res = initiator.list_groups()
logger.info("Groups: " + str(res))
- if len(res) != 1:
+ if len(res) != 1 + start_num_groups:
raise Exception("Unexpected number of groups")
tests = [ "LIST_IFACES unknown",
"SESSION_RESPOND " + sid,
"SESSION_RESPOND " + sid + " foo",
"TEST_REQUEST foo",
+ "TEST_REQUEST SEND_SETUP_REQUEST",
+ "TEST_REQUEST SEND_SETUP_REQUEST foo",
+ "TEST_REQUEST SEND_SETUP_RESPONSE",
+ "TEST_REQUEST SEND_SETUP_RESPONSE foo",
+ "TEST_REQUEST SEND_ACK_REQUEST",
+ "TEST_REQUEST SEND_ACK_REQUEST foo",
+ "TEST_REQUEST SEND_ACK_RESPONSE",
+ "TEST_REQUEST SEND_ACK_RESPONSE foo",
+ "TEST_REQUEST SEND_TEAR_DOWN",
+ "TEST_REQUEST SEND_TEAR_DOWN foo",
+ "TEST_REQUEST GET_FSTS_ID",
+ "TEST_REQUEST GET_FSTS_ID foo",
+ "TEST_REQUEST GET_LOCAL_MBIES",
+ "TEST_REQUEST GET_LOCAL_MBIES foo",
"GET_PEER_MBIES",
"GET_PEER_MBIES ",
"GET_PEER_MBIES unknown",
"GET_PEER_MBIES unknown unknown",
"GET_PEER_MBIES unknown " + initiator.get_new_peer_addr(),
"GET_PEER_MBIES " + initiator.ifname() + " 01:ff:ff:ff:ff:ff",
+ "GET_PEER_MBIES " + initiator.ifname() + " 00:ff:ff:ff:ff:ff",
+ "GET_PEER_MBIES " + initiator.ifname() + " 00:00:00:00:00:00",
"IFACE_PEERS",
"IFACE_PEERS ",
"IFACE_PEERS unknown",
except Exception, e:
if not str(e).startswith("Cannot attach"):
raise
+
+ try:
+ ap1.get_fsts_id_by_sid("123")
+ except Exception, e:
+ if not str(e).startswith("Cannot get fsts_id for sid"):
+ raise
finally:
fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
fst_test_common.fst_test_def_prio_low,
fst_test_common.fst_test_def_llt)
ap1.start()
+ try:
+ run_fst_ap_start_session_oom(apdev, ap1)
+ finally:
+ ap1.stop()
+
+def run_fst_ap_start_session_oom(apdev, ap1):
with alloc_fail(ap1, 1, "fst_iface_create"):
ap2_started = False
try:
except:
pass
finally:
- ap1.stop()
try:
ap2.stop()
except:
FST_ACTION_ACK_RESPONSE = 4
FST_ACTION_ON_CHANNEL_TUNNEL = 5
+def hostapd_tx_and_status(hapd, msg):
+ hapd.set("ext_mgmt_frame_handling", "1")
+ hapd.mgmt_tx(msg)
+ ev = hapd.wait_event([ "MGMT-TX-STATUS" ], timeout=1)
+ if ev is None or "ok=1" not in ev:
+ raise Exception("No ACK")
+ hapd.set("ext_mgmt_frame_handling", "0")
+
def test_fst_proto(dev, apdev, test_params):
"""FST protocol testing"""
ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
# unknown FST Action (255) received!
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST, 255)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# FST Request dropped: too short
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_SETUP_REQUEST)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Request dropped: invalid STIE (EID)
+ msg['payload'] = struct.pack("<BBBLBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_REQUEST, 0, 0,
+ 163, 11, 0, 0, 0, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Request dropped: invalid STIE (Len)
+ msg['payload'] = struct.pack("<BBBLBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_REQUEST, 0, 0,
+ 164, 10, 0, 0, 0, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
# FST Request dropped: new and old band IDs are the same
msg['payload'] = struct.pack("<BBBLBBLBBBBBBB", ACTION_CATEG_FST,
FST_ACTION_SETUP_REQUEST, 0, 0,
164, 11, 0, 0, 0, 0, 0, 0, 0, 0)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
ifaces = sta1.list_ifaces()
id = int(ifaces[0]['name'].split('|')[1])
msg['payload'] = struct.pack("<BBBLBBLBBBBBBB", ACTION_CATEG_FST,
FST_ACTION_SETUP_REQUEST, 0, 0,
164, 11, 0, 0, id + 1, 0, 0, 0, 0, 0)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# FST Action 'Setup Response' dropped: no session in progress found
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_SETUP_RESPONSE)
- hapd.mgmt_tx(msg)
+ hostapd_tx_and_status(hapd, msg)
# Create session
initiator = ap1
# FST Response dropped due to wrong state: SETUP_COMPLETION
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_SETUP_RESPONSE)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# Too short FST Tear Down dropped
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_TEAR_DOWN)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# tear down for wrong FST Setup ID (0)
msg['payload'] = struct.pack("<BBL", ACTION_CATEG_FST,
FST_ACTION_TEAR_DOWN, 0)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# Ack received on wrong interface
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_ACK_REQUEST)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# Ack Response in inappropriate session state (SETUP_COMPLETION)
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_ACK_RESPONSE)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# Unsupported FST Action frame (On channel tunnel)
msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_ON_CHANNEL_TUNNEL)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
# FST Request dropped: new iface not found (new_band_id match)
# FST Request dropped due to MAC comparison
msg['payload'] = struct.pack("<BBBLBBLBBBBBBB", ACTION_CATEG_FST,
FST_ACTION_SETUP_REQUEST, 0, 0,
164, 11, 0, 0, id, 0, 0, 0, 0, 0)
- hapd.mgmt_tx(msg)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd, msg)
hapd2 = ap2.get_instance()
dst2 = sta2.get_instance().own_addr()
# FST Response dropped: wlan6 is not the old iface
msg2['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
FST_ACTION_SETUP_RESPONSE)
- hapd2.mgmt_tx(msg2)
- time.sleep(0.1)
+ hostapd_tx_and_status(hapd2, msg2)
sta.dump_monitor()
msg['payload'] = struct.pack("<BBBLBBLBBBBBBB", ACTION_CATEG_FST,
FST_ACTION_SETUP_REQUEST, 0, 0,
164, 11, 0, 0, id, 0, 0, 0, 0, 0)
- hapd.mgmt_tx(msg)
+ hostapd_tx_and_status(hapd, msg)
finally:
fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
try:
fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
except:
pass
+
+def test_fst_setup_response_proto(dev, apdev, test_params):
+ """FST protocol testing for Setup Response"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ hapd = ap1.get_instance()
+ sta = sta1.get_instance()
+ dst = sta.own_addr()
+ src = apdev[0]['bssid']
+
+ sta1.add_peer(ap1, None, sta2.get_actual_peer_addr())
+ sta1.set_fst_parameters(llt='0')
+ sid = sta1.add_session()
+ sta1.configure_session(sid, sta2.ifname())
+ sta1.initiate_session(sid, "")
+
+ msg = {}
+ msg['fc'] = MGMT_SUBTYPE_ACTION << 4
+ msg['da'] = dst
+ msg['sa'] = src
+ msg['bssid'] = src
+
+ # Too short FST Response dropped
+ msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_RESPONSE)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Response dropped: invalid STIE (EID)
+ dialog_token = 1
+ status_code = 0
+ id = 0
+ msg['payload'] = struct.pack("<BBBBBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_RESPONSE, dialog_token,
+ status_code,
+ 163, 11, 0, 0, id, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Response dropped: invalid STIE (Len)
+ dialog_token = 1
+ status_code = 0
+ id = 0
+ msg['payload'] = struct.pack("<BBBBBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_RESPONSE, dialog_token,
+ status_code,
+ 164, 10, 0, 0, id, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Response dropped due to wrong dialog token
+ dialog_token = 123
+ status_code = 0
+ id = 0
+ msg['payload'] = struct.pack("<BBBBBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_RESPONSE, dialog_token,
+ status_code,
+ 164, 11, 0, 0, id, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Response dropped due to wrong FST Session ID
+ dialog_token = 1
+ status_code = 0
+ id = 1
+ msg['payload'] = struct.pack("<BBBBBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_RESPONSE, dialog_token,
+ status_code,
+ 164, 11, int(sid) + 123456,
+ 0, id, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
+
+ # FST Response with non-zero status code
+ dialog_token = 1
+ status_code = 1
+ id = 1
+ msg['payload'] = struct.pack("<BBBBBBLBBBBBBB", ACTION_CATEG_FST,
+ FST_ACTION_SETUP_RESPONSE, dialog_token,
+ status_code,
+ 164, 11, int(sid), 0, id, 0, 0, 0, 0, 0)
+ hostapd_tx_and_status(hapd, msg)
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_ack_response_proto(dev, apdev, test_params):
+ """FST protocol testing for Ack Response"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ hapd = ap2.get_instance()
+ sta = sta2.get_instance()
+ dst = sta.own_addr()
+ src = apdev[1]['bssid']
+
+ sta1.add_peer(ap1, None, sta2.get_actual_peer_addr())
+ sta1.set_fst_parameters(llt='0')
+ sid = sta1.add_session()
+ sta1.configure_session(sid, sta2.ifname())
+
+ s = sta1.grequest("FST-MANAGER SESSION_INITIATE "+ sid)
+ if not s.startswith('OK'):
+ raise Exception("Cannot initiate fst session: %s" % s)
+ ev = sta1.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION received")
+ event = fst_module_aux.parse_fst_session_event(ev)
+ if event == None:
+ raise Exception("Unrecognized FST event: " % ev)
+ if event['type'] != 'EVENT_FST_SETUP':
+ raise Exception("Expected FST_SETUP event, got: " + event['type'])
+ ev = sta1.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION received")
+ event = fst_module_aux.parse_fst_session_event(ev)
+ if event == None:
+ raise Exception("Unrecognized FST event: " % ev)
+ if event['type'] != 'EVENT_FST_SESSION_STATE':
+ raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
+ if event['new_state'] != "SETUP_COMPLETION":
+ raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
+
+ hapd.set("ext_mgmt_frame_handling", "1")
+ s = sta1.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " accept")
+ if not s.startswith('OK'):
+ raise Exception("Error session_respond: %s" % s)
+ req = hapd.mgmt_rx()
+ if req is None:
+ raise Exception("No Ack Request seen")
+ msg = {}
+ msg['fc'] = MGMT_SUBTYPE_ACTION << 4
+ msg['da'] = dst
+ msg['sa'] = src
+ msg['bssid'] = src
+
+ # Too short FST Ack Response dropped
+ msg['payload'] = struct.pack("<BB", ACTION_CATEG_FST,
+ FST_ACTION_ACK_RESPONSE)
+ hapd.mgmt_tx(msg)
+ ev = hapd.wait_event([ "MGMT-TX-STATUS" ], timeout=1)
+ if ev is None or "ok=1" not in ev:
+ raise Exception("No ACK")
+
+ # Ack Response for wrong FSt Setup ID
+ msg['payload'] = struct.pack("<BBBL", ACTION_CATEG_FST,
+ FST_ACTION_ACK_RESPONSE,
+ 0, int(sid) + 123456)
+ hostapd_tx_and_status(hapd, msg)
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_ap_config_oom(dev, apdev, test_params):
+ """FST AP configuration and OOM"""
+ ap1 = fst_module_aux.FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
+ fst_test_common.fst_test_def_chan_a,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low)
+ hapd = ap1.start(return_early=True)
+ with alloc_fail(hapd, 1, "fst_group_create"):
+ res = ap1.grequest("FST-ATTACH %s %s" % (ap1.iface, ap1.fst_group))
+ if not res.startswith("FAIL"):
+ raise Exception("FST-ATTACH succeeded unexpectedly")
+
+ with alloc_fail(hapd, 1, "fst_iface_create"):
+ res = ap1.grequest("FST-ATTACH %s %s" % (ap1.iface, ap1.fst_group))
+ if not res.startswith("FAIL"):
+ raise Exception("FST-ATTACH succeeded unexpectedly")
+
+ with alloc_fail(hapd, 1, "fst_group_create_mb_ie"):
+ res = ap1.grequest("FST-ATTACH %s %s" % (ap1.iface, ap1.fst_group))
+ # This is allowed to complete currently
+
+ ap1.stop()
+
+def test_fst_send_oom(dev, apdev, test_params):
+ """FST send action OOM"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ hapd = ap1.get_instance()
+ sta = sta1.get_instance()
+ dst = sta.own_addr()
+ src = apdev[0]['bssid']
+
+ # Create session
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ resp_newif = sta2.ifname()
+ peeraddr = None
+ initiator.add_peer(responder, peeraddr, new_peer_addr)
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ with alloc_fail(hapd, 1, "fst_session_send_action"):
+ res = initiator.grequest("FST-MANAGER SESSION_INITIATE " + sid)
+ if not res.startswith("FAIL"):
+ raise Exception("Unexpected SESSION_INITIATE result")
+
+ res = initiator.grequest("FST-MANAGER SESSION_INITIATE " + sid)
+ if not res.startswith("OK"):
+ raise Exception("SESSION_INITIATE failed")
+
+ tests = [ "", "foo", sid, sid + " foo", sid + " foo=bar" ]
+ for t in tests:
+ res = initiator.grequest("FST-MANAGER SESSION_SET " + t)
+ if not res.startswith("FAIL"):
+ raise Exception("Invalid SESSION_SET accepted")
+
+ with alloc_fail(hapd, 1, "fst_session_send_action"):
+ res = initiator.grequest("FST-MANAGER SESSION_TEARDOWN " + sid)
+ if not res.startswith("FAIL"):
+ raise Exception("Unexpected SESSION_TEARDOWN result")
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_session_oom(dev, apdev, test_params):
+ """FST session create OOM"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ hapd = ap1.get_instance()
+ sta = sta1.get_instance()
+ dst = sta.own_addr()
+ src = apdev[0]['bssid']
+
+ # Create session
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ resp_newif = sta2.ifname()
+ peeraddr = None
+ initiator.add_peer(responder, peeraddr, new_peer_addr)
+ with alloc_fail(hapd, 1, "fst_session_create"):
+ sid = initiator.grequest("FST-MANAGER SESSION_ADD " + initiator.fst_group)
+ if not sid.startswith("FAIL"):
+ raise Exception("Unexpected SESSION_ADD success")
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ with alloc_fail(sta, 1, "fst_session_create"):
+ res = initiator.grequest("FST-MANAGER SESSION_INITIATE " + sid)
+ if not res.startswith("OK"):
+ raise Exception("Unexpected SESSION_INITIATE result")
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_attach_zero_llt(dev, apdev):
+ """FST attach with llt=0"""
+ sta1 = fst_module_aux.FstSTA('wlan5', fst_test_common.fst_test_def_group,
+ "100", "0")
+ sta1.start()
+ sta1.stop()
+
+def test_fst_session_respond_fail(dev, apdev, test_params):
+ """FST-MANAGER SESSION_RESPOND failure"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ sta1.add_peer(ap1, None, sta2.get_actual_peer_addr())
+ sid = sta1.add_session()
+ sta1.configure_session(sid, sta2.ifname())
+ sta1.send_session_setup_request(sid)
+ sta1.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ ev = ap1.wait_for_session_event(5, [], ['EVENT_FST_SETUP'])
+ if not 'id' in ev:
+ raise Exception("No session id in FST setup event")
+ # Disconnect STA to make SESSION_RESPOND fail due to no peer found
+ sta = sta1.get_instance()
+ sta.request("DISCONNECT")
+ sta.wait_disconnected()
+ req = "FST-MANAGER SESSION_RESPOND %s reject" % ev['id']
+ s = ap1.grequest(req)
+ if not s.startswith("FAIL"):
+ raise Exception("SESSION_RESPOND succeeded unexpectedly")
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def fst_session_set(dev, sid, param, value):
+ cmd = "FST-MANAGER SESSION_SET %s %s=%s" % (sid, param, value)
+ if "OK" not in dev.global_request(cmd):
+ raise Exception(cmd + " failed")
+
+def fst_session_set_ap(dev, sid, param, value):
+ cmd = "FST-MANAGER SESSION_SET %s %s=%s" % (sid, param, value)
+ if "OK" not in dev.request(cmd):
+ raise Exception(cmd + " failed")
+
+def fst_attach_ap(dev, ifname, group):
+ cmd = "FST-ATTACH %s %s" % (ifname, group)
+ if "OK" not in dev.request(cmd):
+ raise Exception("FST-ATTACH (AP) failed")
+ ev = dev.wait_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE attached (AP)")
+ for t in [ "attached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (AP): " + ev)
+
+def fst_attach_sta(dev, ifname, group):
+ if "OK" not in dev.global_request("FST-ATTACH %s %s" % (ifname, group)):
+ raise Exception("FST-ATTACH (STA) failed")
+ ev = dev.wait_global_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE attached (STA)")
+ for t in [ "attached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (STA): " + ev)
+
+def fst_detach_ap(dev, ifname, group):
+ if "OK" not in dev.request("FST-DETACH " + ifname):
+ raise Exception("FST-DETACH (AP) failed for " + ifname)
+ ev = dev.wait_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE detached (AP) for " + ifname)
+ for t in [ "detached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (AP): " + ev)
+
+def fst_detach_sta(dev, ifname, group):
+ dev.dump_monitor()
+ if "OK" not in dev.global_request("FST-DETACH " + ifname):
+ raise Exception("FST-DETACH (STA) failed for " + ifname)
+ ev = dev.wait_global_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE detached (STA) for " + ifname)
+ for t in [ "detached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (STA): " + ev)
+
+def fst_wait_event_peer_ap(dev, event, ifname, addr):
+ ev = dev.wait_event(['FST-EVENT-PEER'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-PEER connected (AP)")
+ for t in [ " " + event + " ", "ifname=" + ifname, "peer_addr=" + addr ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-PEER data (AP): " + ev)
+
+def fst_wait_event_peer_sta(dev, event, ifname, addr):
+ ev = dev.wait_global_event(['FST-EVENT-PEER'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-PEER connected (STA)")
+ for t in [ " " + event + " ", "ifname=" + ifname, "peer_addr=" + addr ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-PEER data (STA): " + ev)
+
+def fst_setup_req(dev, hglobal, freq, dst, req, stie, mbie="", no_wait=False):
+ act = req + stie + mbie
+ dev.request("MGMT_TX %s %s freq=%d action=%s" % (dst, dst, freq, act))
+ ev = dev.wait_event(['MGMT-TX-STATUS'], timeout=5)
+ if ev is None or "result=SUCCESS" not in ev:
+ raise Exception("FST Action frame not ACKed")
+
+ if no_wait:
+ return
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ break
+
+def fst_start_and_connect(apdev, group, sgroup):
+ hglobal = hostapd.HostapdGlobal()
+ if "OK" not in hglobal.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED"):
+ raise HwsimSkip("No FST testing support")
+
+ params = { "ssid": "fst_11a", "hw_mode": "a", "channel": "36",
+ "country_code": "US" }
+ hapd = hostapd.add_ap(apdev[0], params)
+
+ fst_attach_ap(hglobal, apdev[0]['ifname'], group)
+
+ cmd = "FST-ATTACH %s %s" % (apdev[0]['ifname'], group)
+ if "FAIL" not in hglobal.request(cmd):
+ raise Exception("Duplicated FST-ATTACH (AP) accepted")
+
+ params = { "ssid": "fst_11g", "hw_mode": "g", "channel": "1",
+ "country_code": "US" }
+ hapd2 = hostapd.add_ap(apdev[1], params)
+ fst_attach_ap(hglobal, apdev[1]['ifname'], group)
+
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("wlan5")
+ fst_attach_sta(wpas, wpas.ifname, sgroup)
+
+ wpas.interface_add("wlan6", set_ifname=False)
+ wpas2 = WpaSupplicant(ifname="wlan6")
+ fst_attach_sta(wpas, wpas2.ifname, sgroup)
+
+ wpas.connect("fst_11a", key_mgmt="NONE", scan_freq="5180",
+ wait_connect=False)
+ wpas.wait_connected()
+
+ fst_wait_event_peer_sta(wpas, "connected", wpas.ifname, apdev[0]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "connected", apdev[0]['ifname'],
+ wpas.own_addr())
+
+ wpas2.connect("fst_11g", key_mgmt="NONE", scan_freq="2412",
+ wait_connect=False)
+ wpas2.wait_connected()
+
+ fst_wait_event_peer_sta(wpas, "connected", wpas2.ifname, apdev[1]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "connected", apdev[1]['ifname'],
+ wpas2.own_addr())
+ return hglobal, wpas, wpas2, hapd, hapd2
+
+def test_fst_test_setup(dev, apdev, test_params):
+ """FST setup using separate commands"""
+ try:
+ _test_fst_test_setup(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_test_setup(dev, apdev, test_params):
+ group = "fstg0b"
+ sgroup = "fstg1b"
+ hglobal, wpas, wpas2, hapd, hapd2 = fst_start_and_connect(apdev, group, sgroup)
+
+ sid = wpas.global_request("FST-MANAGER SESSION_ADD " + sgroup).strip()
+ if "FAIL" in sid:
+ raise Exception("FST-MANAGER SESSION_ADD (STA) failed")
+
+ fst_session_set(wpas, sid, "old_ifname", wpas.ifname)
+ fst_session_set(wpas, sid, "old_peer_addr", apdev[0]['bssid'])
+ fst_session_set(wpas, sid, "new_ifname", wpas2.ifname)
+ fst_session_set(wpas, sid, "new_peer_addr", apdev[1]['bssid'])
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("FST-MANAGER SESSION_INITIATE failed")
+
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ f = re.search("session_id=(\d+)", ev)
+ if f is None:
+ raise Exception("No session_id in FST-EVENT-SESSION")
+ sid_ap = f.group(1)
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "OK" not in hglobal.request(cmd):
+ raise Exception("FST-MANAGER SESSION_RESPOND failed on AP")
+ break
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION")
+ if "new_state=SETUP_COMPLETION" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data: " + ev)
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION")
+ if "event_type=EVENT_FST_ESTABLISHED" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data: " + ev)
+
+ cmd = "FST-MANAGER SESSION_REMOVE " + sid
+ if "OK" not in wpas.global_request(cmd):
+ raise Exception("FST-MANAGER SESSION_REMOVE failed")
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION")
+ if "new_state=INITIAL" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data (STA): " + ev)
+
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=INITIAL" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data (AP): " + ev)
+
+ if "FAIL" not in wpas.global_request(cmd):
+ raise Exception("Duplicated FST-MANAGER SESSION_REMOVE accepted")
+
+ hglobal.request("FST-MANAGER SESSION_REMOVE " + sid_ap)
+
+ wpas.request("DISCONNECT")
+ wpas.wait_disconnected()
+ fst_wait_event_peer_sta(wpas, "disconnected", wpas.ifname,
+ apdev[0]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "disconnected", apdev[0]['ifname'],
+ wpas.own_addr())
+
+ wpas2.request("DISCONNECT")
+ wpas2.wait_disconnected()
+ fst_wait_event_peer_sta(wpas, "disconnected", wpas2.ifname,
+ apdev[1]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "disconnected", apdev[1]['ifname'],
+ wpas2.own_addr())
+
+ fst_detach_ap(hglobal, apdev[0]['ifname'], group)
+ if "FAIL" not in hglobal.request("FST-DETACH " + apdev[0]['ifname']):
+ raise Exception("Duplicated FST-DETACH (AP) accepted")
+ hapd.disable()
+
+ fst_detach_ap(hglobal, apdev[1]['ifname'], group)
+ hapd2.disable()
+
+ fst_detach_sta(wpas, wpas.ifname, sgroup)
+ fst_detach_sta(wpas, wpas2.ifname, sgroup)
+
+def test_fst_setup_mbie_diff(dev, apdev, test_params):
+ """FST setup and different MBIE in FST Setup Request"""
+ try:
+ _test_fst_setup_mbie_diff(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_setup_mbie_diff(dev, apdev, test_params):
+ group = "fstg0c"
+ sgroup = "fstg1c"
+ hglobal, wpas, wpas2, hapd, hapd2 = fst_start_and_connect(apdev, group, sgroup)
+
+ # FST Setup Request: Category, FST Action, Dialog Token (non-zero),
+ # LLT (32 bits, see 10.32), Session Transition (see 8.4.2.147),
+ # Multi-band element (optional, see 8.4.2.140)
+
+ # Session Transition: EID, Len, FSTS ID(4), Session Control,
+ # New Band (Band ID, Setup, Operation), Old Band (Band ID, Setup, Operation)
+
+ # Multi-band element: EID, Len, Multi-band Control, Band ID,
+ # Operating Class, Channel Number, BSSID (6), Beacon Interval (2),
+ # TSF Offset (8), Multi-band Connection Capability, FSTSessionTimeOut,
+ # STA MAC Address (6, optional), Pairwise Cipher Suite Count (2, optional),
+ # Pairwise Cipher Suite List (4xm, optional)
+
+ # MBIE with the non-matching STA MAC Address:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e1c0c0200010200000004000000000000000000000000ff0200000006ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # MBIE without the STA MAC Address:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e16040200010200000004000000000000000000000000ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # MBIE with unsupported STA Role:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e16070200010200000004000000000000000000000000ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # MBIE with unsupported Band ID:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e1604ff00010200000004000000000000000000000000ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # FST Setup Request without MBIE (different FSTS ID):
+ req = "1200011a060000"
+ stie = "a40b0200000000020001040001"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie)
+
+ # MBIE update OOM on AP
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e16040200010200000004000000000000000000000000ff"
+ try:
+ with alloc_fail(hapd, 1, "mb_ies_by_info"):
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie,
+ mbie, no_wait=True)
+ except HwsimSkip, e:
+ # Skip exception to allow proper cleanup
+ pass
+
+ # Remove sessions to avoid causing issues to following test ases
+ s = hglobal.request("FST-MANAGER LIST_SESSIONS " + group)
+ if not s.startswith("FAIL"):
+ for sid in s.split(' '):
+ if len(sid):
+ hglobal.request("FST-MANAGER SESSION_REMOVE " + sid)
+
+def test_fst_many_setup(dev, apdev, test_params):
+ """FST setup multiple times"""
+ try:
+ _test_fst_many_setup(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_many_setup(dev, apdev, test_params):
+ group = "fstg0d"
+ sgroup = "fstg1d"
+ hglobal, wpas, wpas2, hapd, hapd2 = fst_start_and_connect(apdev, group, sgroup)
+
+ sid = wpas.global_request("FST-MANAGER SESSION_ADD " + sgroup).strip()
+ if "FAIL" in sid:
+ raise Exception("FST-MANAGER SESSION_ADD (STA) failed")
+
+ fst_session_set(wpas, sid, "old_ifname", wpas.ifname)
+ fst_session_set(wpas, sid, "old_peer_addr", apdev[0]['bssid'])
+ fst_session_set(wpas, sid, "new_ifname", wpas2.ifname)
+ fst_session_set(wpas, sid, "new_peer_addr", apdev[1]['bssid'])
+
+ for i in range(257):
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("FST-MANAGER SESSION_INITIATE failed")
+
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ f = re.search("session_id=(\d+)", ev)
+ if f is None:
+ raise Exception("No session_id in FST-EVENT-SESSION")
+ sid_ap = f.group(1)
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "OK" not in hglobal.request(cmd):
+ raise Exception("FST-MANAGER SESSION_RESPOND failed on AP")
+ break
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (STA)")
+ if "new_state=SETUP_COMPLETION" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data: " + ev)
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (STA)")
+ if "event_type=EVENT_FST_ESTABLISHED" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data: " + ev)
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_TEARDOWN " + sid):
+ raise Exception("FST-MANAGER SESSION_INITIATE failed")
+
+ if i == 0:
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_TEARDOWN " + sid):
+ raise Exception("Duplicate FST-MANAGER SESSION_TEARDOWN accepted")
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (STA teardown -->initial)")
+ if "new_state=INITIAL" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data (STA): " + ev)
+
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP teardown -->initial)")
+ if "new_state=INITIAL" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data (AP): " + ev)
+
+ if "OK" not in hglobal.request("FST-MANAGER SESSION_REMOVE " + sid_ap):
+ raise Exception("FST-MANAGER SESSION_REMOVE (AP) failed")
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_REMOVE " + sid):
+ raise Exception("FST-MANAGER SESSION_REMOVE failed")
+
+ wpas.request("DISCONNECT")
+ wpas.wait_disconnected()
+ fst_wait_event_peer_sta(wpas, "disconnected", wpas.ifname,
+ apdev[0]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "disconnected", apdev[0]['ifname'],
+ wpas.own_addr())
+
+ wpas2.request("DISCONNECT")
+ wpas2.wait_disconnected()
+ fst_wait_event_peer_sta(wpas, "disconnected", wpas2.ifname,
+ apdev[1]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "disconnected", apdev[1]['ifname'],
+ wpas2.own_addr())
+
+ fst_detach_ap(hglobal, apdev[0]['ifname'], group)
+ fst_detach_ap(hglobal, apdev[1]['ifname'], group)
+ hapd.disable()
+ hapd2.disable()
+
+ fst_detach_sta(wpas, wpas.ifname, sgroup)
+ fst_detach_sta(wpas, wpas2.ifname, sgroup)
+
+def test_fst_attach_wpas_error(dev, apdev, test_params):
+ """FST attach errors in wpa_supplicant"""
+ if "OK" not in dev[0].global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED"):
+ raise HwsimSkip("No FST testing support")
+ group = "fstg0"
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("wlan5")
+ fst_attach_sta(wpas, wpas.ifname, group)
+ if "FAIL" not in wpas.global_request("FST-ATTACH %s %s" % (wpas.ifname,
+ group)):
+ raise Exception("Duplicated FST-ATTACH accepted")
+ if "FAIL" not in wpas.global_request("FST-ATTACH %s %s" % ("foofoo",
+ group)):
+ raise Exception("FST-ATTACH for unknown interface accepted")
+
+def test_fst_session_initiate_errors(dev, apdev, test_params):
+ """FST SESSION_INITIATE error cases"""
+ try:
+ _test_fst_session_initiate_errors(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_session_initiate_errors(dev, apdev, test_params):
+ group = "fstg0"
+ sgroup = "fstg1"
+ hglobal, wpas, wpas2, hapd, hapd2 = fst_start_and_connect(apdev, group, sgroup)
+
+ sid = wpas.global_request("FST-MANAGER SESSION_ADD " + sgroup).strip()
+ if "FAIL" in sid:
+ raise Exception("FST-MANAGER SESSION_ADD (STA) failed")
+
+ # No old peer MAC address
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "old_peer_addr", "00:ff:ff:ff:ff:ff")
+ # No new peer MAC address
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "new_peer_addr", "00:ff:ff:ff:ff:fe")
+ # No old interface defined
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "old_ifname", wpas.ifname)
+ # No new interface defined
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "new_ifname", wpas.ifname)
+ # Same interface set as old and new
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "new_ifname", wpas2.ifname)
+ # The preset old peer address is not connected
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "old_peer_addr", apdev[0]['bssid'])
+ # The preset new peer address is not connected
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Invalid FST-MANAGER SESSION_INITIATE accepted")
+
+ fst_session_set(wpas, sid, "new_peer_addr", apdev[1]['bssid'])
+ # Initiate session setup
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("FST-MANAGER SESSION_INITIATE failed")
+
+ # Session in progress
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("Duplicated FST-MANAGER SESSION_INITIATE accepted")
+
+ sid2 = wpas.global_request("FST-MANAGER SESSION_ADD " + sgroup).strip()
+ if "FAIL" in sid:
+ raise Exception("FST-MANAGER SESSION_ADD (STA) failed")
+ fst_session_set(wpas, sid2, "old_ifname", wpas.ifname)
+ fst_session_set(wpas, sid2, "old_peer_addr", apdev[0]['bssid'])
+ fst_session_set(wpas, sid2, "new_ifname", wpas2.ifname)
+ fst_session_set(wpas, sid2, "new_peer_addr", apdev[1]['bssid'])
+
+ # There is another session in progress (old)
+ if "FAIL" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid2):
+ raise Exception("Duplicated FST-MANAGER SESSION_INITIATE accepted")
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_REMOVE " + sid):
+ raise Exception("FST-MANAGER SESSION_REMOVE failed")
+
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ f = re.search("session_id=(\d+)", ev)
+ if f is None:
+ raise Exception("No session_id in FST-EVENT-SESSION")
+ sid_ap = f.group(1)
+ break
+ if "OK" not in hglobal.request("FST-MANAGER SESSION_REMOVE " + sid_ap):
+ raise Exception("FST-MANAGER SESSION_REMOVE (AP) failed")
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_REMOVE " + sid2):
+ raise Exception("FST-MANAGER SESSION_REMOVE failed")
+
+def test_fst_session_respond_errors(dev, apdev, test_params):
+ """FST SESSION_RESPOND error cases"""
+ try:
+ _test_fst_session_respond_errors(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_session_respond_errors(dev, apdev, test_params):
+ group = "fstg0b"
+ sgroup = "fstg1b"
+ hglobal, wpas, wpas2, hapd, hapd2 = fst_start_and_connect(apdev, group, sgroup)
+
+ sid = wpas.global_request("FST-MANAGER SESSION_ADD " + sgroup).strip()
+ if "FAIL" in sid:
+ raise Exception("FST-MANAGER SESSION_ADD (STA) failed")
+
+ fst_session_set(wpas, sid, "old_ifname", wpas.ifname)
+ fst_session_set(wpas, sid, "old_peer_addr", apdev[0]['bssid'])
+ fst_session_set(wpas, sid, "new_ifname", wpas2.ifname)
+ fst_session_set(wpas, sid, "new_peer_addr", apdev[1]['bssid'])
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("FST-MANAGER SESSION_INITIATE failed")
+
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ f = re.search("session_id=(\d+)", ev)
+ if f is None:
+ raise Exception("No session_id in FST-EVENT-SESSION")
+ sid_ap = f.group(1)
+ break
+
+ # The preset peer address is not in the peer list
+ fst_session_set_ap(hglobal, sid_ap, "old_peer_addr", "00:00:00:00:00:01")
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "FAIL" not in hglobal.request(cmd):
+ raise Exception("Invalid FST-MANAGER SESSION_RESPOND accepted")
+
+ # Same interface set as old and new
+ fst_session_set_ap(hglobal, sid_ap, "old_peer_addr", wpas.own_addr())
+ fst_session_set_ap(hglobal, sid_ap, "old_ifname", apdev[1]['ifname'])
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "FAIL" not in hglobal.request(cmd):
+ raise Exception("Invalid FST-MANAGER SESSION_RESPOND accepted")
+
+ # valid command
+ fst_session_set_ap(hglobal, sid_ap, "old_ifname", apdev[0]['ifname'])
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "OK" not in hglobal.request(cmd):
+ raise Exception("FST-MANAGER SESSION_RESPOND failed")
+
+ # incorrect state
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "FAIL" not in hglobal.request(cmd):
+ raise Exception("Invalid FST-MANAGER SESSION_RESPOND accepted")
+
+ cmd = "FST-MANAGER SESSION_REMOVE " + sid
+ if "OK" not in wpas.global_request(cmd):
+ raise Exception("FST-MANAGER SESSION_REMOVE (STA) failed")
+
+ cmd = "FST-MANAGER SESSION_REMOVE %s" % sid_ap
+ if "OK" not in hglobal.request(cmd):
+ raise Exception("FST-MANAGER SESSION_REMOVE (AP) failed")