import subprocess
import time
import os
+import re
import hwsim_utils
from hwsim import HWSimRadio
import hostapd
+from wpasupplicant import WpaSupplicant
import fst_test_common
import fst_module_aux
-from utils import alloc_fail
+from utils import alloc_fail, HwsimSkip
#enum - bad parameter types
bad_param_none = 0
finally:
fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def fst_session_set(dev, sid, param, value):
+ cmd = "FST-MANAGER SESSION_SET %s %s=%s" % (sid, param, value)
+ if "OK" not in dev.global_request(cmd):
+ raise Exception(cmd + " failed")
+
+def fst_attach_ap(dev, ifname, group):
+ cmd = "FST-ATTACH %s %s" % (ifname, group)
+ if "OK" not in dev.request(cmd):
+ raise Exception("FST-ATTACH (AP) failed")
+ ev = dev.wait_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE attached (AP)")
+ for t in [ "attached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (AP): " + ev)
+
+def fst_attach_sta(dev, ifname, group):
+ if "OK" not in dev.global_request("FST-ATTACH %s %s" % (ifname, group)):
+ raise Exception("FST-ATTACH (STA) failed")
+ ev = dev.wait_global_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE attached (STA)")
+ for t in [ "attached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (STA): " + ev)
+
+def fst_detach_ap(dev, ifname, group):
+ if "OK" not in dev.request("FST-DETACH " + ifname):
+ raise Exception("FST-DETACH (AP) failed for " + ifname)
+ ev = dev.wait_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE detached (AP) for " + ifname)
+ for t in [ "detached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (AP): " + ev)
+
+def fst_detach_sta(dev, ifname, group):
+ dev.dump_monitor()
+ if "OK" not in dev.global_request("FST-DETACH " + ifname):
+ raise Exception("FST-DETACH (STA) failed for " + ifname)
+ ev = dev.wait_global_event(['FST-EVENT-IFACE'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE detached (STA) for " + ifname)
+ for t in [ "detached", "ifname=" + ifname, "group=" + group ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-IFACE data (STA): " + ev)
+
+def fst_wait_event_peer_ap(dev, event, ifname, addr):
+ ev = dev.wait_event(['FST-EVENT-PEER'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-PEER connected (AP)")
+ for t in [ " " + event + " ", "ifname=" + ifname, "peer_addr=" + addr ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-PEER data (AP): " + ev)
+
+def fst_wait_event_peer_sta(dev, event, ifname, addr):
+ ev = dev.wait_global_event(['FST-EVENT-PEER'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-PEER connected (STA)")
+ for t in [ " " + event + " ", "ifname=" + ifname, "peer_addr=" + addr ]:
+ if t not in ev:
+ raise Exception("Unexpected FST-EVENT-PEER data (STA): " + ev)
+
+def fst_setup_req(dev, hglobal, freq, dst, req, stie, mbie=""):
+ act = req + stie + mbie
+ dev.request("MGMT_TX %s %s freq=%d action=%s" % (dst, dst, freq, act))
+ ev = dev.wait_event(['MGMT-TX-STATUS'], timeout=5)
+ if ev is None or "result=SUCCESS" not in ev:
+ raise Exception("FST Action frame not ACKed")
+
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ break
+
+def test_fst_test_setup(dev, apdev, test_params):
+ """FST setup using separate commands"""
+ try:
+ _test_fst_test_setup(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_test_setup(dev, apdev, test_params):
+ hglobal = hostapd.HostapdGlobal()
+ if "OK" not in hglobal.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED"):
+ raise HwsimSkip("No FST testing support")
+
+ params = { "ssid": "fst_11a", "hw_mode": "a", "channel": "36",
+ "country_code": "US" }
+ hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+
+ group = "fstg0"
+ fst_attach_ap(hglobal, apdev[0]['ifname'], group)
+
+ cmd = "FST-ATTACH %s %s" % (apdev[0]['ifname'], group)
+ if "FAIL" not in hglobal.request(cmd):
+ raise Exception("Duplicated FST-ATTACH (AP) accepted")
+
+ params = { "ssid": "fst_11g", "hw_mode": "g", "channel": "1",
+ "country_code": "US" }
+ hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
+ fst_attach_ap(hglobal, apdev[1]['ifname'], group)
+
+ sgroup = "fstg1"
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("wlan5")
+ fst_attach_sta(wpas, wpas.ifname, sgroup)
+
+ wpas.interface_add("wlan6", set_ifname=False)
+ wpas2 = WpaSupplicant(ifname="wlan6")
+ fst_attach_sta(wpas, wpas2.ifname, sgroup)
+
+ wpas.connect("fst_11a", key_mgmt="NONE", scan_freq="5180",
+ wait_connect=False)
+ wpas.wait_connected()
+
+ fst_wait_event_peer_sta(wpas, "connected", wpas.ifname, apdev[0]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "connected", apdev[0]['ifname'],
+ wpas.own_addr())
+
+ wpas2.connect("fst_11g", key_mgmt="NONE", scan_freq="2412",
+ wait_connect=False)
+ wpas2.wait_connected()
+
+ fst_wait_event_peer_sta(wpas, "connected", wpas2.ifname, apdev[1]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "connected", apdev[1]['ifname'],
+ wpas2.own_addr())
+
+ sid = wpas.global_request("FST-MANAGER SESSION_ADD " + sgroup).strip()
+ if "FAIL" in sid:
+ raise Exception("FST-MANAGER SESSION_ADD (STA) failed")
+
+ fst_session_set(wpas, sid, "old_ifname", wpas.ifname)
+ fst_session_set(wpas, sid, "old_peer_addr", apdev[0]['bssid'])
+ fst_session_set(wpas, sid, "new_ifname", wpas2.ifname)
+ fst_session_set(wpas, sid, "new_peer_addr", apdev[1]['bssid'])
+
+ if "OK" not in wpas.global_request("FST-MANAGER SESSION_INITIATE " + sid):
+ raise Exception("FST-MANAGER SESSION_INITIATE failed")
+
+ while True:
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=SETUP_COMPLETION" in ev:
+ f = re.search("session_id=(\d+)", ev)
+ if f is None:
+ raise Exception("No session_id in FST-EVENT-SESSION")
+ sid_ap = f.group(1)
+ cmd = "FST-MANAGER SESSION_RESPOND %s accept" % sid_ap
+ if "OK" not in hglobal.request(cmd):
+ raise Exception("FST-MANAGER SESSION_RESPOND failed on AP")
+ break
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION")
+ if "new_state=SETUP_COMPLETION" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data: " + ev)
+
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION")
+ if "event_type=EVENT_FST_ESTABLISHED" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data: " + ev)
+
+ cmd = "FST-MANAGER SESSION_REMOVE " + sid
+ if "OK" not in wpas.global_request(cmd):
+ raise Exception("FST-MANAGER SESSION_REMOVE failed")
+ ev = wpas.wait_global_event(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION")
+ if "new_state=INITIAL" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data (STA): " + ev)
+
+ ev = hglobal.wait_event(['FST-EVENT-SESSION'], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION (AP)")
+ if "new_state=INITIAL" not in ev:
+ raise Exception("Unexpected FST-EVENT-SESSION data (AP): " + ev)
+
+ if "FAIL" not in wpas.global_request(cmd):
+ raise Exception("Duplicated FST-MANAGER SESSION_REMOVE accepted")
+
+ hglobal.request("FST-MANAGER SESSION_REMOVE " + sid_ap)
+
+ wpas.request("DISCONNECT")
+ wpas.wait_disconnected()
+ fst_wait_event_peer_sta(wpas, "disconnected", wpas.ifname,
+ apdev[0]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "disconnected", apdev[0]['ifname'],
+ wpas.own_addr())
+
+ wpas2.request("DISCONNECT")
+ wpas2.wait_disconnected()
+ fst_wait_event_peer_sta(wpas, "disconnected", wpas2.ifname,
+ apdev[1]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "disconnected", apdev[1]['ifname'],
+ wpas2.own_addr())
+
+ fst_detach_ap(hglobal, apdev[0]['ifname'], group)
+ if "FAIL" not in hglobal.request("FST-DETACH " + apdev[0]['ifname']):
+ raise Exception("Duplicated FST-DETACH (AP) accepted")
+ hapd.disable()
+
+ fst_detach_ap(hglobal, apdev[1]['ifname'], group)
+ hapd2.disable()
+
+ fst_detach_sta(wpas, wpas.ifname, sgroup)
+ fst_detach_sta(wpas, wpas2.ifname, sgroup)
+
+def test_fst_setup_mbie_diff(dev, apdev, test_params):
+ """FST setup and different MBIE in FST Setup Request"""
+ try:
+ _test_fst_setup_mbie_diff(dev, apdev, test_params)
+ finally:
+ subprocess.call(['iw', 'reg', 'set', '00'])
+ dev[0].flush_scan_cache()
+ dev[1].flush_scan_cache()
+
+def _test_fst_setup_mbie_diff(dev, apdev, test_params):
+ hglobal = hostapd.HostapdGlobal()
+ if "OK" not in hglobal.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED"):
+ raise HwsimSkip("No FST testing support")
+
+ params = { "ssid": "fst_11a", "hw_mode": "a", "channel": "36",
+ "country_code": "US" }
+ hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+
+ group = "fstg0"
+ fst_attach_ap(hglobal, apdev[0]['ifname'], group)
+
+ cmd = "FST-ATTACH %s %s" % (apdev[0]['ifname'], group)
+ if "FAIL" not in hglobal.request(cmd):
+ raise Exception("Duplicated FST-ATTACH (AP) accepted")
+
+ params = { "ssid": "fst_11g", "hw_mode": "g", "channel": "1",
+ "country_code": "US" }
+ hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
+ fst_attach_ap(hglobal, apdev[1]['ifname'], group)
+
+ sgroup = "fstg1"
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("wlan5")
+ fst_attach_sta(wpas, wpas.ifname, sgroup)
+
+ wpas.interface_add("wlan6", set_ifname=False)
+ wpas2 = WpaSupplicant(ifname="wlan6")
+ fst_attach_sta(wpas, wpas2.ifname, sgroup)
+
+ wpas.connect("fst_11a", key_mgmt="NONE", scan_freq="5180",
+ wait_connect=False)
+ wpas.wait_connected()
+
+ fst_wait_event_peer_sta(wpas, "connected", wpas.ifname, apdev[0]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "connected", apdev[0]['ifname'],
+ wpas.own_addr())
+
+ wpas2.connect("fst_11g", key_mgmt="NONE", scan_freq="2412",
+ wait_connect=False)
+ wpas2.wait_connected()
+
+ fst_wait_event_peer_sta(wpas, "connected", wpas2.ifname, apdev[1]['bssid'])
+ fst_wait_event_peer_ap(hglobal, "connected", apdev[1]['ifname'],
+ wpas2.own_addr())
+
+ # FST Setup Request: Category, FST Action, Dialog Token (non-zero),
+ # LLT (32 bits, see 10.32), Session Transition (see 8.4.2.147),
+ # Multi-band element (optional, see 8.4.2.140)
+
+ # Session Transition: EID, Len, FSTS ID(4), Session Control,
+ # New Band (Band ID, Setup, Operation), Old Band (Band ID, Setup, Operation)
+
+ # Multi-band element: EID, Len, Multi-band Control, Band ID,
+ # Operating Class, Channel Number, BSSID (6), Beacon Interval (2),
+ # TSF Offset (8), Multi-band Connection Capability, FSTSessionTimeOut,
+ # STA MAC Address (6, optional), Pairwise Cipher Suite Count (2, optional),
+ # Pairwise Cipher Suite List (4xm, optional)
+
+ # MBIE with the non-matching STA MAC Address:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e1c0c0200010200000004000000000000000000000000ff0200000006ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # MBIE without the STA MAC Address:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e16040200010200000004000000000000000000000000ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # MBIE with unsupported STA Role:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e16070200010200000004000000000000000000000000ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # MBIE with unsupported Band ID:
+ req = "1200011a060000"
+ stie = "a40b0100000000020001040001"
+ mbie = "9e1604ff00010200000004000000000000000000000000ff"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie, mbie)
+
+ # FST Setup Request without MBIE (different FSTS ID):
+ req = "1200011a060000"
+ stie = "a40b0200000000020001040001"
+ fst_setup_req(wpas, hglobal, 5180, apdev[0]['bssid'], req, stie)