--- /dev/null
+# FST tests related classes
+# Copyright (c) 2015, Qualcomm Atheros, Inc.
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import logging
+import subprocess
+import os
+import signal
+import time
+import re
+
+import hostapd
+import wpaspy
+import utils
+from wpasupplicant import WpaSupplicant
+
+import fst_test_common
+
+logger = logging.getLogger()
+
+def parse_fst_iface_event(ev):
+ """Parses FST iface event that comes as a string, e.g.
+ "<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0"
+ Returns a dictionary with parsed "event_type", "ifname", and "group"; or
+ None if not an FST event or can't be parsed."""
+ event = {}
+ if ev.find("FST-EVENT-IFACE") == -1:
+ return None
+ if ev.find("attached") != -1:
+ event['event_type'] = 'attached'
+ elif ev.find("detached") != -1:
+ event['event_type'] = 'detached'
+ else:
+ return None
+ f = re.search("ifname=(\S+)", ev)
+ if f is not None:
+ event['ifname'] = f.group(1)
+ f = re.search("group=(\S+)", ev)
+ if f is not None:
+ event['group'] = f.group(1)
+ return event
+
+def parse_fst_session_event(ev):
+ """Parses FST session event that comes as a string, e.g.
+ "<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT"
+ Returns a dictionary with parsed "type", "id", and "reason"; or None if not
+ a FST event or can't be parsed"""
+ event = {}
+ if ev.find("FST-EVENT-SESSION") == -1:
+ return None
+ event['new_state'] = '' # The field always exists in the dictionary
+ f = re.search("event_type=(\S+)", ev)
+ if f is None:
+ return None
+ event['type'] = f.group(1)
+ f = re.search("session_id=(\d+)", ev)
+ if f is not None:
+ event['id'] = f.group(1)
+ f = re.search("old_state=(\S+)", ev)
+ if f is not None:
+ event['old_state'] = f.group(1)
+ f = re.search("new_state=(\S+)", ev)
+ if f is not None:
+ event['new_state'] = f.group(1)
+ f = re.search("reason=(\S+)", ev)
+ if f is not None:
+ event['reason'] = f.group(1)
+ return event
+
+def start_two_ap_sta_pairs(apdev):
+ """auxiliary function that creates two pairs of APs and STAs"""
+ ap1 = FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
+ fst_test_common.fst_test_def_chan_a,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low,
+ fst_test_common.fst_test_def_llt)
+ ap1.start()
+ ap2 = FstAP(apdev[1]['ifname'], 'fst_11g', 'g',
+ fst_test_common.fst_test_def_chan_g,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_high,
+ fst_test_common.fst_test_def_llt)
+ ap2.start()
+
+ sta1 = FstSTA('wlan5',
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low,
+ fst_test_common.fst_test_def_llt)
+ sta1.start()
+ sta2 = FstSTA('wlan6',
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_high,
+ fst_test_common.fst_test_def_llt)
+ sta2.start()
+
+ return ap1, ap2, sta1, sta2
+
+def stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2):
+ sta1.stop()
+ sta2.stop()
+ ap1.stop()
+ ap2.stop()
+
+def connect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
+ """Connects a pair of stations, each one to a separate AP"""
+ dev1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ dev2.scan(freq=fst_test_common.fst_test_def_freq_g)
+
+ dev1.connect(ap1, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_a)
+ dev2.connect(ap2, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_g)
+
+def disconnect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
+ dev1.disconnect()
+ dev2.disconnect()
+
+def external_sta_connect(sta, ap, **kwargs):
+ """Connects the external station to the given AP"""
+ if not isinstance(sta, WpaSupplicant):
+ raise Exception("Bad STA object")
+ if not isinstance(ap, FstAP):
+ raise Exception("Bad AP object to connect to")
+ hap = ap.get_instance()
+ sta.connect(ap.get_ssid(), **kwargs)
+
+def disconnect_external_sta(sta, ap, check_disconnect=True):
+ """Disconnects the external station from the AP"""
+ if not isinstance(sta, WpaSupplicant):
+ raise Exception("Bad STA object")
+ if not isinstance(ap, FstAP):
+ raise Exception("Bad AP object to connect to")
+ sta.request("DISCONNECT")
+ if check_disconnect:
+ hap = ap.get_instance()
+ ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
+ if ev is None:
+ raise Exception("No disconnection event received from %s" % ap.get_ssid())
+
+#
+# FstDevice class
+# This is the parent class for the AP (FstAP) and STA (FstSTA) that implements
+# FST functionality.
+#
+class FstDevice:
+ def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
+ self.iface = iface
+ self.fst_group = fst_group
+ self.fst_pri = fst_pri
+ self.fst_llt = fst_llt # None llt means no llt parameter will be set
+ self.instance = None # Hostapd/WpaSupplicant instance
+ self.peer_obj = None # Peer object, must be a FstDevice child object
+ self.new_peer_addr = None # Peer MAC address for new session iface
+ self.old_peer_addr = None # Peer MAC address for old session iface
+ self.role = 'initiator' # Role: initiator/responder
+ s = self.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
+ if not s.startswith('OK'):
+ raise utils.HwsimSkip("FST not supported")
+
+ def ifname(self):
+ return self.iface
+
+ def get_instance(self):
+ """Gets the Hostapd/WpaSupplicant instance"""
+ raise Exception("Virtual get_instance() called!")
+
+ def get_own_mac_address(self):
+ """Gets the device's own MAC address"""
+ raise Exception("Virtual get_own_mac_address() called!")
+
+ def get_new_peer_addr(self):
+ return self.new_peer_addr
+
+ def get_old_peer_addr(self):
+ return self.old_peer_addr
+
+ def get_actual_peer_addr(self):
+ """Gets the peer address. A connected AP/station address is returned."""
+ raise Exception("Virtual get_actual_peer_addr() called!")
+
+ def grequest(self, req):
+ """Send request on the global control interface"""
+ raise Exception, "Virtual grequest() called!"
+
+ def wait_gevent(self, events, timeout=None):
+ """Wait for a list of events on the global interface"""
+ raise Exception("Virtual wait_gevent() called!")
+
+ def request(self, req):
+ """Issue a request to the control interface"""
+ h = self.get_instance()
+ return h.request(req)
+
+ def wait_event(self, events, timeout=None):
+ """Wait for an event from the control interface"""
+ h = self.get_instance()
+ if timeout is not None:
+ return h.wait_event(events, timeout=timeout)
+ else:
+ return h.wait_event(events)
+
+ def set_old_peer_addr(self, peer_addr=None):
+ """Sets the peer address"""
+ if peer_addr is not None:
+ self.old_peer_addr = peer_addr
+ else:
+ self.old_peer_addr = self.get_actual_peer_addr()
+
+ def set_new_peer_addr(self, peer_addr=None):
+ """Sets the peer address"""
+ if peer_addr is not None:
+ self.new_peer_addr = peer_addr
+ else:
+ self.new_peer_addr = self.get_actual_peer_addr()
+
+ def add_peer(self, obj, old_peer_addr=None, new_peer_addr=None):
+ """Add peer for FST session(s). 'obj' is a FstDevice subclass object.
+ The method must be called before add_session().
+ If peer_addr is not specified, the address of the currently connected
+ station is used."""
+ if not isinstance(obj, FstDevice):
+ raise Exception("Peer must be a FstDevice object")
+ self.peer_obj = obj
+ self.set_old_peer_addr(old_peer_addr)
+ self.set_new_peer_addr(new_peer_addr)
+
+ def get_peer(self):
+ """Returns peer object"""
+ return self.peer_obj
+
+ def set_fst_parameters(self, group_id=None, pri=None, llt=None):
+ """Change/set new FST parameters. Can be used to start FST sessions with
+ different FST parameters than defined in the configuration file."""
+ if group_id is not None:
+ self.fst_group = group_id
+ if pri is not None:
+ self.fst_pri = pri
+ if llt is not None:
+ self.fst_llt = llt
+
+ def get_local_mbies(self, ifname=None):
+ if_name = ifname if ifname is not None else self.iface
+ return self.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name)
+
+ def add_session(self):
+ """Adds an FST session. add_peer() must be called calling this
+ function"""
+ if self.peer_obj is None:
+ raise Exception("Peer wasn't added before starting session")
+ grp = ' ' + self.fst_group if self.fst_group != '' else ''
+ sid = self.grequest("FST-MANAGER SESSION_ADD" + grp)
+ sid = sid.strip()
+ if sid.startswith("FAIL"):
+ raise Exception("Cannot add FST session with groupid ==" + grp)
+ return sid
+
+ def set_session_param(self, params):
+ request = "FST-MANAGER SESSION_SET"
+ if params is not None and params != '':
+ request = request + ' ' + params
+ return self.grequest(request)
+
+ def configure_session(self, sid, new_iface, old_iface = None):
+ """Calls session_set for a number of parameters some of which are stored
+ in "self" while others are passed to this function explicitly. If
+ old_iface is None, current iface is used; if old_iface is an empty
+ string."""
+ oldiface = old_iface if old_iface is not None else self.iface
+ s = self.set_session_param(sid + ' old_ifname=' + oldiface)
+ if not s.startswith("OK"):
+ raise Exception("Cannot set FST session old_ifname: " + s)
+ if new_iface is not None:
+ s = self.set_session_param(sid + " new_ifname=" + new_iface)
+ if not s.startswith("OK"):
+ raise Exception("Cannot set FST session new_ifname:" + s)
+ if self.new_peer_addr is not None and self.new_peer_addr != '':
+ s = self.set_session_param(sid + " new_peer_addr=" + self.new_peer_addr)
+ if not s.startswith("OK"):
+ raise Exception("Cannot set FST session peer address:" + s + " (new)")
+ if self.old_peer_addr is not None and self.old_peer_addr != '':
+ s = self.set_session_param(sid + " old_peer_addr=" + self.old_peer_addr)
+ if not s.startswith("OK"):
+ raise Exception("Cannot set FST session peer address:" + s + " (old)")
+ if self.fst_llt is not None and self.fst_llt != '':
+ s = self.set_session_param(sid + " llt=" + self.fst_llt)
+ if not s.startswith("OK"):
+ raise Exception("Cannot set FST session llt:" + s)
+
+ def send_iface_attach_request(self, ifname, group, llt, priority):
+ request = "FST-ATTACH " + ifname + ' ' + group
+ if llt is not None:
+ request += " llt=" + llt
+ if priority is not None:
+ request += " priority=" + priority
+ res = self.grequest(request)
+ if not res.startswith("OK"):
+ raise Exception("Cannot attach FST iface: " + res)
+
+ def send_iface_detach_request(self, ifname):
+ res = self.grequest("FST-DETACH " + ifname)
+ if not res.startswith("OK"):
+ raise Exception("Cannot detach FST iface: " + res)
+
+ def send_session_setup_request(self, sid):
+ s = self.grequest("FST-MANAGER SESSION_INITIATE " + sid)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send setup request: %s" % s)
+ return s
+
+ def send_session_setup_response(self, sid, response):
+ request = "FST-MANAGER SESSION_RESPOND " + sid + " " + response
+ s = self.grequest(request)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send setup response: %s" % s)
+ return s
+
+ def send_test_session_setup_request(self, fsts_id,
+ additional_parameter = None):
+ request = "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
+ if additional_parameter is not None:
+ request += " " + additional_parameter
+ s = self.grequest(request)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send FST setup request: %s" % s)
+ return s
+
+ def send_test_session_setup_response(self, fsts_id,
+ response, additional_parameter = None):
+ request = "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id + " " + response
+ if additional_parameter is not None:
+ request += " " + additional_parameter
+ s = self.grequest(request)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send FST setup response: %s" % s)
+ return s
+
+ def send_test_ack_request(self, fsts_id):
+ s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send FST ack request: %s" % s)
+ return s
+
+ def send_test_ack_response(self, fsts_id):
+ s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send FST ack response: %s" % s)
+ return s
+
+ def send_test_tear_down(self, fsts_id):
+ s = self.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id)
+ if not s.startswith('OK'):
+ raise Exception("Cannot send FST tear down: %s" % s)
+ return s
+
+ def get_fsts_id_by_sid(self, sid):
+ s = self.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid)
+ if s == ' ' or s.startswith('FAIL'):
+ raise Exception("Cannot get fsts_id for sid == " % sid)
+ return int(s)
+
+ def wait_for_iface_event(self, timeout):
+ while True:
+ ev = self.wait_gevent(["FST-EVENT-IFACE"], timeout)
+ if ev is None:
+ raise Exception("No FST-EVENT-IFACE received")
+ event = parse_fst_iface_event(ev)
+ if event is None:
+ # We can't parse so it's not our event, wait for next one
+ continue
+ return event
+
+ def wait_for_session_event(self, timeout, events_to_ignore=[],
+ events_to_count=[]):
+ while True:
+ ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION received")
+ event = parse_fst_session_event(ev)
+ if event is None:
+ # We can't parse so it's not our event, wait for next one
+ continue
+ if len(events_to_ignore) > 0:
+ if event['type'] in events_to_ignore:
+ continue
+ elif len(events_to_count) > 0:
+ if not event['type'] in events_to_count:
+ continue
+ return event
+
+ def initiate_session(self, sid, response="accept"):
+ """Initiates FST session with given session id 'sid'.
+ 'response' is the session respond answer: "accept", "reject", or a
+ special "timeout" value to skip the response in order to test session
+ timeouts.
+ Returns: "OK" - session has been initiated, otherwise the reason for the
+ reset: REASON_REJECT, REASON_STT."""
+ strsid = ' ' + sid if sid != '' else ''
+ s = self.grequest("FST-MANAGER SESSION_INITIATE"+ strsid)
+ if not s.startswith('OK'):
+ raise Exception("Cannot initiate fst session: %s" % s)
+ ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION received")
+ # We got FST event
+ event = parse_fst_session_event(ev)
+ if event == None:
+ raise Exception("Unrecognized FST event: " % ev)
+ if event['type'] != 'EVENT_FST_SETUP':
+ raise Exception("Expected FST_SETUP event, got: " + event['type'])
+ ev = self.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION received")
+ event = parse_fst_session_event(ev)
+ if event == None:
+ raise Exception("Unrecognized FST event: " % ev)
+ if event['type'] != 'EVENT_FST_SESSION_STATE':
+ raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
+ if event['new_state'] != "SETUP_COMPLETION":
+ raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
+ if response == '':
+ return 'OK'
+ if response != "timeout":
+ s = self.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " " + response) # Or reject
+ if not s.startswith('OK'):
+ raise Exception("Error session_respond: %s" % s)
+ # Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
+ # events. The 1st event will be EVENT_FST_SESSION_STATE
+ # old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
+ # either EVENT_FST_ESTABLISHED with the session id or
+ # EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
+ # reset, the reason field will tell why.
+ result = ''
+ while result == '':
+ ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
+ if ev is None:
+ break # No session event received
+ event = parse_fst_session_event(ev)
+ if event == None:
+ # We can't parse so it's not our event, wait for next one
+ continue
+ if event['type'] == 'EVENT_FST_ESTABLISHED':
+ result = "OK"
+ break
+ elif event['type'] == "EVENT_FST_SESSION_STATE":
+ if event['new_state'] == "INITIAL":
+ # Session was reset, the only reason to get back to initial
+ # state.
+ result = event['reason']
+ break
+ if result == '':
+ raise Exception("No event for session respond")
+ return result
+
+ def transfer_session(self, sid):
+ """Transfers the session. 'sid' is the session id. 'hsta' is the
+ station-responder object.
+ Returns: REASON_SWITCH - the session has been transferred successfully
+ or a REASON_... reported by the reset event."""
+ request = "FST-MANAGER SESSION_TRANSFER"
+ if sid != '':
+ request += ' ' + sid
+ s = self.grequest(request)
+ if not s.startswith('OK'):
+ raise Exception("Cannot transfer fst session: %s" % s)
+ result = ''
+ while result == '':
+ ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
+ if ev is None:
+ raise Exception("Missing session transfer event")
+ # We got FST event. We expect TRANSITION_CONFIRMED state and then
+ # INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
+ # Right now we'll be waiting for the reset event and record the
+ # reason.
+ event = parse_fst_session_event(ev)
+ if event == None:
+ raise Exception("Unrecognized FST event: " % ev)
+ if event['new_state'] == 'INITIAL':
+ result = event['reason']
+ return result
+
+ def wait_for_tear_down(self):
+ ev = self.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
+ if ev is None:
+ raise Exception("No FST-EVENT-SESSION received")
+ # We got FST event
+ event = parse_fst_session_event(ev)
+ if event == None:
+ raise Exception("Unrecognized FST event: " % ev)
+ if event['type'] != 'EVENT_FST_SESSION_STATE':
+ raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
+ if event['new_state'] != "INITIAL":
+ raise Exception("Expected new state INITIAL, got: " + event['new_state'])
+ if event['reason'] != 'REASON_TEARDOWN':
+ raise Exception("Expected reason REASON_TEARDOWN, got: " + event['reason'])
+
+ def teardown_session(self, sid):
+ """Tears down FST session with a given session id ('sid')"""
+ strsid = ' ' + sid if sid != '' else ''
+ s = self.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid)
+ if not s.startswith('OK'):
+ raise Exception("Cannot tear down fst session: %s" % s)
+ self.peer_obj.wait_for_tear_down()
+
+
+ def remove_session(self, sid, wait_for_tear_down=True):
+ """Removes FST session with a given session id ('sid')"""
+ strsid = ' ' + sid if sid != '' else ''
+ s = self.grequest("FST-MANAGER SESSION_REMOVE" + strsid)
+ if not s.startswith('OK'):
+ raise Exception("Cannot remove fst session: %s" % s)
+ if wait_for_tear_down == True:
+ self.peer_obj.wait_for_tear_down()
+
+ def remove_all_sessions(self):
+ """Removes FST session with a given session id ('sid')"""
+ grp = ' ' + self.fst_group if self.fst_group != '' else ''
+ s = self.grequest("FST-MANAGER LIST_SESSIONS" + grp)
+ if not s.startswith('FAIL'):
+ for sid in s.splitlines():
+ sid = sid.strip()
+ if len(sid) != 0:
+ self.remove_session(sid, wait_for_tear_down=False)
+
+
+#
+# FstAP class
+#
+class FstAP (FstDevice):
+ def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
+ fst_llt=None):
+ """If fst_group is empty, then FST parameters will not be set
+ If fst_llt is empty, the parameter will not be set and the default value
+ is expected to be configured."""
+ self.ssid = ssid
+ self.mode = mode
+ self.chan = chan
+ self.reg_ctrl = fst_test_common.HapdRegCtrl()
+ self.reg_ctrl.add_ap(iface, self.chan)
+ self.global_instance = hostapd.HostapdGlobal()
+ FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt)
+
+ def start(self):
+ """Starts AP the "standard" way as it was intended by hostapd tests.
+ This will work only when FST supports fully dynamically loading
+ parameters in hostapd."""
+ params = {}
+ params['ssid'] = self.ssid
+ params['hw_mode'] = self.mode
+ params['channel'] = self.chan
+ params['country_code'] = 'US'
+ self.hapd=hostapd.add_ap(self.iface, params)
+ if not self.hapd.ping():
+ raise Exception("Could not ping FST hostapd")
+ self.reg_ctrl.start()
+ self.get_global_instance()
+ if len(self.fst_group) != 0:
+ self.send_iface_attach_request(self.iface, self.fst_group,
+ self.fst_llt, self.fst_pri)
+ return self.hapd
+
+ def stop(self):
+ """Removes the AP, To be used when dynamic fst APs are implemented in
+ hostapd."""
+ if len(self.fst_group) != 0:
+ self.remove_all_sessions()
+ self.send_iface_detach_request(self.iface)
+ self.reg_ctrl.stop()
+ del self.global_instance
+ self.global_instance = None
+
+ def get_instance(self):
+ """Return the Hostapd/WpaSupplicant instance"""
+ if self.instance is None:
+ self.instance = hostapd.Hostapd(self.iface)
+ return self.instance
+
+ def get_global_instance(self):
+ return self.global_instance
+
+ def get_own_mac_address(self):
+ """Gets the device's own MAC address"""
+ h = self.get_instance()
+ status = h.get_status()
+ return status['bssid[0]']
+
+ def get_actual_peer_addr(self):
+ """Gets the peer address. A connected station address is returned."""
+ # Use the device instance, the global control interface doesn't have
+ # station address
+ h = self.get_instance()
+ sta = h.get_sta(None)
+ if sta is None or 'addr' not in sta:
+ # Maybe station is not connected?
+ addr = None
+ else:
+ addr=sta['addr']
+ return addr
+
+ def grequest(self, req):
+ """Send request on the global control interface"""
+ logger.debug("FstAP::grequest: " + req)
+ h = self.get_global_instance()
+ return h.request(req)
+
+ def wait_gevent(self, events, timeout=None):
+ """Wait for a list of events on the global interface"""
+ h = self.get_global_instance()
+ if timeout is not None:
+ return h.wait_event(events, timeout=timeout)
+ else:
+ return h.wait_event(events)
+
+ def get_ssid(self):
+ return self.ssid
+
+#
+# FstSTA class
+#
+class FstSTA (FstDevice):
+ def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
+ """If fst_group is empty, then FST parameters will not be set
+ If fst_llt is empty, the parameter will not be set and the default value
+ is expected to be configured."""
+ FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt)
+ self.connected = None # FstAP object the station is connected to
+
+ def start(self):
+ """Current implementation involves running another instance of
+ wpa_supplicant with fixed FST STAs configurations. When any type of
+ dynamic STA loading is implemented, rewrite the function similarly to
+ FstAP."""
+ h = self.get_instance()
+ h.interface_add(self.iface, drv_params="force_connect_cmd=1")
+ if not h.global_ping():
+ raise Exception("Could not ping FST wpa_supplicant")
+ if len(self.fst_group) != 0:
+ self.send_iface_attach_request(self.iface, self.fst_group,
+ self.fst_llt, self.fst_pri)
+ return None
+
+ def stop(self):
+ """Removes the STA. In a static (temporary) implementation does nothing,
+ the STA will be removed when the fst wpa_supplicant process is killed by
+ fstap.cleanup()."""
+ h = self.get_instance()
+ if len(self.fst_group) != 0:
+ self.remove_all_sessions()
+ self.send_iface_detach_request(self.iface)
+ h.interface_remove(self.iface)
+ h.close_ctrl()
+ del h
+ self.instance = None
+
+ def get_instance(self):
+ """Return the Hostapd/WpaSupplicant instance"""
+ if self.instance is None:
+ self.instance = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ return self.instance
+
+ def get_own_mac_address(self):
+ """Gets the device's own MAC address"""
+ h = self.get_instance()
+ status = h.get_status()
+ return status['address']
+
+ def get_actual_peer_addr(self):
+ """Gets the peer address. A connected station address is returned"""
+ h = self.get_instance()
+ status = h.get_status()
+ return status['bssid']
+
+ def grequest(self, req):
+ """Send request on the global control interface"""
+ logger.debug("FstSTA::grequest: " + req)
+ h = self.get_instance()
+ return h.global_request(req)
+
+ def wait_gevent(self, events, timeout=None):
+ """Wait for a list of events on the global interface"""
+ h = self.get_instance()
+ if timeout is not None:
+ return h.wait_global_event(events, timeout=timeout)
+ else:
+ return h.wait_global_event(events)
+
+ def scan(self, freq=None, no_wait=False, only_new=False):
+ """Issue Scan with given parameters. Returns the BSS dictionary for the
+ AP found (the 1st BSS found. TODO: What if the AP required is not the
+ 1st in list?) or None if no BSS found. None call be also a result of
+ no_wait=True. Note, request("SCAN_RESULTS") can be used to get all the
+ results at once."""
+ h = self.get_instance()
+ h.scan(None, freq, no_wait, only_new)
+ r = h.get_bss('0')
+ return r
+
+ def connect(self, ap, **kwargs):
+ """Connects to the given AP"""
+ if not isinstance(ap, FstAP):
+ raise Exception("Bad AP object to connect to")
+ h = self.get_instance()
+ hap = ap.get_instance()
+ h.connect(ap.get_ssid(), **kwargs)
+ self.connected = ap
+
+ def connect_to_external_ap(self, ap, ssid, check_connection=True, **kwargs):
+ """Connects to the given external AP"""
+ if not isinstance(ap, hostapd.Hostapd):
+ raise Exception("Bad AP object to connect to")
+ h = self.get_instance()
+ h.connect(ssid, **kwargs)
+ self.connected = ap
+ if check_connection:
+ ev = ap.wait_event([ "AP-STA-CONNECTED" ], timeout=10)
+ if ev is None:
+ self.connected = None
+ raise Exception("No connection event received from %s" % ssid)
+
+ def disconnect(self, check_disconnect=True):
+ """Disconnects from the AP the station is currently connected to"""
+ if self.connected is not None:
+ h = self.get_instance()
+ h.request("DISCONNECT")
+ if check_disconnect:
+ hap = self.connected.get_instance()
+ ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
+ if ev is None:
+ raise Exception("No disconnection event received from %s" % self.connected.get_ssid())
+ self.connected = None
+
+
+ def disconnect_from_external_ap(self, check_disconnect=True):
+ """Disconnects from the external AP the station is currently connected
+ to"""
+ if self.connected is not None:
+ h = self.get_instance()
+ h.request("DISCONNECT")
+ if check_disconnect:
+ hap = self.connected
+ ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
+ if ev is None:
+ raise Exception("No disconnection event received from AP")
+ self.connected = None
--- /dev/null
+# FST configuration tests
+# Copyright (c) 2015, Qualcomm Atheros, Inc.
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import logging
+logger = logging.getLogger()
+import subprocess
+import time
+import os
+import signal
+import hostapd
+import wpasupplicant
+import utils
+
+import fst_test_common
+
+class FstLauncherConfig:
+ """FstLauncherConfig class represents configuration to be used for
+ FST config tests related hostapd/wpa_supplicant instances"""
+ def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
+ self.iface = iface
+ self.fst_group = fst_group
+ self.fst_pri = fst_pri
+ self.fst_llt = fst_llt # None llt means no llt parameter will be set
+
+ def ifname(self):
+ return self.iface
+
+ def is_ap(self):
+ """Returns True if the configuration is for AP, otherwise - False"""
+ raise Exception("Virtual is_ap() called!")
+
+ def to_file(self, pathname):
+ """Creates configuration file to be used by FST config tests related
+ hostapd/wpa_supplicant instances"""
+ raise Exception("Virtual to_file() called!")
+
+class FstLauncherConfigAP(FstLauncherConfig):
+ """FstLauncherConfigAP class represents configuration to be used for
+ FST config tests related hostapd instance"""
+ def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
+ fst_llt=None):
+ self.ssid = ssid
+ self.mode = mode
+ self.chan = chan
+ FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
+
+ def is_ap(self):
+ return True
+
+ def get_channel(self):
+ return self.chan
+
+ def to_file(self, pathname):
+ """Creates configuration file to be used by FST config tests related
+ hostapd instance"""
+ with open(pathname, "w") as f:
+ f.write("country_code=US\n"
+ "interface=%s\n"
+ "ctrl_interface=/var/run/hostapd\n"
+ "ssid=%s\n"
+ "channel=%s\n"
+ "hw_mode=%s\n"
+ "ieee80211n=1\n" % (self.iface, self.ssid, self.chan,
+ self.mode))
+ if len(self.fst_group) != 0:
+ f.write("fst_group_id=%s\n"
+ "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
+ if self.fst_llt is not None:
+ f.write("fst_llt=%s\n" % self.fst_llt)
+ with open(pathname, "r") as f:
+ logger.debug("wrote hostapd config file %s:\n%s" % (pathname,
+ f.read()))
+
+class FstLauncherConfigSTA(FstLauncherConfig):
+ """FstLauncherConfig class represents configuration to be used for
+ FST config tests related wpa_supplicant instance"""
+ def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
+ FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
+
+ def is_ap(self):
+ return False
+
+ def to_file(self, pathname):
+ """Creates configuration file to be used by FST config tests related
+ wpa_supplicant instance"""
+ with open(pathname, "w") as f:
+ f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n"
+ "p2p_no_group_iface=1\n")
+ if len(self.fst_group) != 0:
+ f.write("fst_group_id=%s\n"
+ "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
+ if self.fst_llt is not None:
+ f.write("fst_llt=%s\n" % self.fst_llt)
+ with open(pathname, "r") as f:
+ logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname, f.read()))
+
+class FstLauncher:
+ """FstLauncher class is responsible for launching and cleaning up of FST
+ config tests related hostapd/wpa_supplicant instances"""
+ def __init__(self, logpath):
+ self.logger = logging.getLogger()
+ self.fst_logpath = logpath
+ self.cfgs_to_run = []
+ self.hapd_fst_global = '/var/run/hostapd-fst-global'
+ self.wsup_fst_global = '/tmp/fststa'
+ self.nof_aps = 0
+ self.nof_stas = 0
+ self.reg_ctrl = fst_test_common.HapdRegCtrl()
+ self.test_is_supported()
+
+ def __del__(self):
+ self.cleanup()
+
+ @staticmethod
+ def test_is_supported():
+ h = hostapd.HostapdGlobal()
+ resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
+ if not resp.startswith("OK"):
+ raise utils.HwsimSkip("FST not supported")
+ w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
+ if not resp.startswith("OK"):
+ raise utils.HwsimSkip("FST not supported")
+
+ def get_cfg_pathname(self, cfg):
+ """Returns pathname of ifname based configuration file"""
+ return self.fst_logpath +'/'+ cfg.ifname() + '.conf'
+
+ def add_cfg(self, cfg):
+ """Adds configuration to be used for launching hostapd/wpa_supplicant
+ instances"""
+ if cfg not in self.cfgs_to_run:
+ self.cfgs_to_run.append(cfg)
+ if cfg.is_ap() == True:
+ self.nof_aps += 1
+ else:
+ self.nof_stas += 1
+
+ def remove_cfg(self, cfg):
+ """Removes configuration previously added with add_cfg"""
+ if cfg in self.cfgs_to_run:
+ self.cfgs_to_run.remove(cfg)
+ if cfg.is_ap() == True:
+ self.nof_aps -= 1
+ else:
+ self.nof_stas -= 1
+ config_file = self.get_cfg_pathname(cfg);
+ if os.path.exists(config_file):
+ os.remove(config_file)
+
+ def run_hostapd(self):
+ """Lauches hostapd with interfaces configured according to
+ FstLauncherConfigAP configurations added"""
+ if self.nof_aps == 0:
+ raise Exception("No FST APs to start")
+ pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
+ mylogfile = self.fst_logpath + '/' + 'fst-hostapd'
+ cmd = [ '../../hostapd/hostapd', '-B', '-ddd',
+ '-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global]
+ for i in range(0, len(self.cfgs_to_run)):
+ cfg = self.cfgs_to_run[i]
+ if cfg.is_ap() == True:
+ cfgfile = self.get_cfg_pathname(cfg)
+ cfg.to_file(cfgfile)
+ cmd.append(cfgfile)
+ self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel())
+ self.logger.debug("Starting fst hostapd: " + ' '.join(cmd))
+ res = subprocess.call(cmd)
+ self.logger.debug("fst hostapd start result: %d" % res)
+ if res == 0:
+ self.reg_ctrl.start()
+ return res
+
+ def run_wpa_supplicant(self):
+ """Lauches wpa_supplicant with interfaces configured according to
+ FstLauncherConfigSTA configurations added"""
+ if self.nof_stas == 0:
+ raise Exception("No FST STAs to start")
+ pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
+ mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant'
+ cmd = [ '../../wpa_supplicant/wpa_supplicant', '-B', '-ddd',
+ '-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global ]
+ sta_no = 0
+ for i in range(0, len(self.cfgs_to_run)):
+ cfg = self.cfgs_to_run[i]
+ if cfg.is_ap() == False:
+ cfgfile = self.get_cfg_pathname(cfg)
+ cfg.to_file(cfgfile)
+ cmd.append('-c' + cfgfile)
+ cmd.append('-i' + cfg.ifname())
+ cmd.append('-Dnl80211')
+ if sta_no != self.nof_stas -1:
+ cmd.append('-N') # Next station configuration
+ sta_no += 1
+ self.logger.debug("Starting fst supplicant: " + ' '.join(cmd))
+ res = subprocess.call(cmd)
+ self.logger.debug("fst supplicant start result: %d" % res)
+ return res
+
+ def cleanup(self):
+ """Terminates hostapd/wpa_supplicant processes previously launched with
+ run_hostapd/run_wpa_supplicant"""
+ pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
+ self.kill_pid(pidfile)
+ pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
+ self.kill_pid(pidfile)
+ self.reg_ctrl.stop()
+ while len(self.cfgs_to_run) != 0:
+ cfg = self.cfgs_to_run[0]
+ self.remove_cfg(cfg)
+
+ def kill_pid(self, pidfile):
+ """Kills process by PID file"""
+ if not os.path.exists(pidfile):
+ return
+ pid = -1
+ try:
+ pf = file(pidfile, 'r')
+ pid = int(pf.read().strip())
+ pf.close()
+ self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid))
+ os.kill(pid, signal.SIGTERM)
+ for i in range(10):
+ try:
+ # Poll the pid (Is the process still existing?)
+ os.kill(pid, 0)
+ except OSError:
+ # No, already done
+ break
+ # Wait and check again
+ time.sleep(1)
+ except Exception, e:
+ self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e)))
+
+
+def parse_ies(iehex, el=-1):
+ """Parses the information elements hex string 'iehex' in format
+ "0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity.
+ If 'el' is defined returns the list of hex values of the specific IE (or
+ empty list if the element is not in the string."""
+ iel = [iehex[i:i + 2] for i in range(0, len(iehex), 2)]
+ for i in range(0, len(iel)):
+ iel[i] = int(iel[i], 16)
+ # Sanity check
+ i = 0
+ res = []
+ while i < len(iel):
+ logger.debug("IE found: %x" % iel[i])
+ if el != -1 and el == iel[i]:
+ res = iel[i + 2:i + 2 + iel[i + 1]]
+ i += 2 + iel[i + 1]
+ if i != len(iel):
+ logger.error("Bad IE string: " + iehex)
+ res = []
+ return res
+
+def scan_and_get_bss(dev, frq):
+ """Issues a scan on given device on given frequency, returns the bss info
+ dictionary ('ssid','ie','flags', etc.) or None. Note, the function
+ implies there is only one AP on the given channel. If not a case,
+ the function must be changed to call dev.get_bss() till the AP with the
+ [b]ssid that we need is found"""
+ dev.scan(freq=frq)
+ return dev.get_bss('0')
+
+
+# AP configuration tests
+
+def run_test_ap_configuration(apdev, test_params,
+ fst_group = fst_test_common.fst_test_def_group,
+ fst_pri = fst_test_common.fst_test_def_prio_high,
+ fst_llt = fst_test_common.fst_test_def_llt):
+ """Runs FST hostapd where the 1st AP configuration is fixed, the 2nd fst
+ configuration is provided by the parameters. Returns the result of the run:
+ 0 - no errors discovered, an error otherwise. The function is used for
+ simplek "bad configuration" tests."""
+ logdir = test_params['logdir']
+ fst_launcher = FstLauncher(logdir)
+ ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a',
+ fst_test_common.fst_test_def_chan_a,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low,
+ fst_test_common.fst_test_def_llt)
+ ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b',
+ fst_test_common.fst_test_def_chan_g, fst_group,
+ fst_pri, fst_llt)
+ fst_launcher.add_cfg(ap1)
+ fst_launcher.add_cfg(ap2)
+ res = fst_launcher.run_hostapd()
+ return res
+
+def run_test_sta_configuration(test_params,
+ fst_group = fst_test_common.fst_test_def_group,
+ fst_pri = fst_test_common.fst_test_def_prio_high,
+ fst_llt = fst_test_common.fst_test_def_llt):
+ """Runs FST wpa_supplicant where the 1st STA configuration is fixed, the
+ 2nd fst configuration is provided by the parameters. Returns the result of
+ the run: 0 - no errors discovered, an error otherwise. The function is used
+ for simple "bad configuration" tests."""
+ logdir = test_params['logdir']
+ fst_launcher = FstLauncher(logdir)
+ sta1 = FstLauncherConfigSTA('wlan5',
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low,
+ fst_test_common.fst_test_def_llt)
+ sta2 = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt)
+ fst_launcher.add_cfg(sta1)
+ fst_launcher.add_cfg(sta2)
+ res = fst_launcher.run_wpa_supplicant()
+ return res
+
+def test_fst_ap_config_llt_neg(dev, apdev, test_params):
+ """FST AP configuration negative LLT"""
+ res = run_test_ap_configuration(apdev, test_params, fst_llt = '-1')
+ if res == 0:
+ raise Exception("hostapd started with a negative llt")
+
+def test_fst_ap_config_llt_zero(dev, apdev, test_params):
+ """FST AP configuration zero LLT"""
+ res = run_test_ap_configuration(apdev, test_params, fst_llt = '0')
+ if res == 0:
+ raise Exception("hostapd started with a zero llt")
+
+def test_fst_ap_config_llt_too_big(dev, apdev, test_params):
+ """FST AP configuration LLT is too big"""
+ res = run_test_ap_configuration(apdev, test_params,
+ fst_llt = '4294967296') #0x100000000
+ if res == 0:
+ raise Exception("hostapd started with llt that is too big")
+
+def test_fst_ap_config_llt_nan(dev, apdev, test_params):
+ """FST AP configuration LLT is not a number"""
+ res = run_test_ap_configuration(apdev, test_params, fst_llt = 'nan')
+ if res == 0:
+ raise Exception("hostapd started with llt not a number")
+
+def test_fst_ap_config_pri_neg(dev, apdev, test_params):
+ """FST AP configuration Priority negative"""
+ res = run_test_ap_configuration(apdev, test_params, fst_pri = '-1')
+ if res == 0:
+ raise Exception("hostapd started with a negative fst priority")
+
+def test_fst_ap_config_pri_zero(dev, apdev, test_params):
+ """FST AP configuration Priority zero"""
+ res = run_test_ap_configuration(apdev, test_params, fst_pri = '0')
+ if res == 0:
+ raise Exception("hostapd started with a zero fst priority")
+
+def test_fst_ap_config_pri_large(dev, apdev, test_params):
+ """FST AP configuration Priority too large"""
+ res = run_test_ap_configuration(apdev, test_params, fst_pri = '256')
+ if res == 0:
+ raise Exception("hostapd started with too large fst priority")
+
+def test_fst_ap_config_pri_nan(dev, apdev, test_params):
+ """FST AP configuration Priority not a number"""
+ res = run_test_ap_configuration(apdev, test_params, fst_pri = 'nan')
+ if res == 0:
+ raise Exception("hostapd started with fst priority not a number")
+
+def test_fst_ap_config_group_len(dev, apdev, test_params):
+ """FST AP configuration Group max length"""
+ res = run_test_ap_configuration(apdev, test_params,
+ fst_group = 'fstg5678abcd34567')
+ if res == 0:
+ raise Exception("hostapd started with fst_group length too big")
+
+def test_fst_ap_config_good(dev, apdev, test_params):
+ """FST AP configuration good parameters"""
+ res = run_test_ap_configuration(apdev, test_params)
+ if res != 0:
+ raise Exception("hostapd didn't start with valid config parameters")
+
+def test_fst_ap_config_default(dev, apdev, test_params):
+ """FST AP configuration default parameters"""
+ res = run_test_ap_configuration(apdev, test_params, fst_llt = None)
+ if res != 0:
+ raise Exception("hostapd didn't start with valid config parameters")
+
+
+# STA configuration tests
+
+def test_fst_sta_config_llt_neg(dev, apdev, test_params):
+ """FST STA configuration negative LLT"""
+ res = run_test_sta_configuration(test_params, fst_llt = '-1')
+ if res == 0:
+ raise Exception("wpa_supplicant started with a negative llt")
+
+def test_fst_sta_config_llt_zero(dev, apdev, test_params):
+ """FST STA configuration zero LLT"""
+ res = run_test_sta_configuration(test_params, fst_llt = '0')
+ if res == 0:
+ raise Exception("wpa_supplicant started with a zero llt")
+
+def test_fst_sta_config_llt_large(dev, apdev, test_params):
+ """FST STA configuration LLT is too large"""
+ res = run_test_sta_configuration(test_params,
+ fst_llt = '4294967296') #0x100000000
+ if res == 0:
+ raise Exception("wpa_supplicant started with llt that is too large")
+
+def test_fst_sta_config_llt_nan(dev, apdev, test_params):
+ """FST STA configuration LLT is not a number"""
+ res = run_test_sta_configuration(test_params, fst_llt = 'nan')
+ if res == 0:
+ raise Exception("wpa_supplicant started with llt not a number")
+
+def test_fst_sta_config_pri_neg(dev, apdev, test_params):
+ """FST STA configuration Priority negative"""
+ res = run_test_sta_configuration(test_params, fst_pri = '-1')
+ if res == 0:
+ raise Exception("wpa_supplicant started with a negative fst priority")
+
+def test_fst_sta_config_pri_zero(dev, apdev, test_params):
+ """FST STA configuration Priority zero"""
+ res = run_test_sta_configuration(test_params, fst_pri = '0')
+ if res == 0:
+ raise Exception("wpa_supplicant started with a zero fst priority")
+
+def test_fst_sta_config_pri_big(dev, apdev, test_params):
+ """FST STA configuration Priority too large"""
+ res = run_test_sta_configuration(test_params, fst_pri = '256')
+ if res == 0:
+ raise Exception("wpa_supplicant started with too large fst priority")
+
+def test_fst_sta_config_pri_nan(dev, apdev, test_params):
+ """FST STA configuration Priority not a number"""
+ res = run_test_sta_configuration(test_params, fst_pri = 'nan')
+ if res == 0:
+ raise Exception("wpa_supplicant started with fst priority not a number")
+
+def test_fst_sta_config_group_len(dev, apdev, test_params):
+ """FST STA configuration Group max length"""
+ res = run_test_sta_configuration(test_params,
+ fst_group = 'fstg5678abcd34567')
+ if res == 0:
+ raise Exception("wpa_supplicant started with fst_group length too big")
+
+def test_fst_sta_config_good(dev, apdev, test_params):
+ """FST STA configuration good parameters"""
+ res = run_test_sta_configuration(test_params)
+ if res != 0:
+ raise Exception("wpa_supplicant didn't start with valid config parameters")
+
+def test_fst_sta_config_default(dev, apdev, test_params):
+ """FST STA configuration default parameters"""
+ res = run_test_sta_configuration(test_params, fst_llt = None)
+ if res != 0:
+ raise Exception("wpa_supplicant didn't start with valid config parameters")
+
+def test_fst_scan_mb(dev, apdev, test_params):
+ """FST scan valid MB IE presence with normal start"""
+ logdir = test_params['logdir']
+
+ # Test valid MB IE in scan results
+ fst_launcher = FstLauncher(logdir)
+ ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
+ fst_test_common.fst_test_def_chan_a,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_high)
+ ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b',
+ fst_test_common.fst_test_def_chan_g,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low)
+ fst_launcher.add_cfg(ap1)
+ fst_launcher.add_cfg(ap2)
+ res = fst_launcher.run_hostapd()
+ if res != 0:
+ raise Exception("hostapd didn't start properly")
+ try:
+ mbie1=[]
+ flags1 = ''
+ mbie2=[]
+ flags2 = ''
+ # Scan 1st AP
+ vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
+ if vals1 != None:
+ if 'ie' in vals1:
+ mbie1 = parse_ies(vals1['ie'], 0x9e)
+ if 'flags' in vals1:
+ flags1 = vals1['flags']
+ # Scan 2nd AP
+ vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g)
+ if vals2 != None:
+ if 'ie' in vals2:
+ mbie2 = parse_ies(vals2['ie'],0x9e)
+ if 'flags' in vals2:
+ flags2 = vals2['flags']
+ finally:
+ fst_launcher.cleanup()
+
+ if len(mbie1) == 0:
+ raise Exception("No MB IE created by 1st AP")
+ if len(mbie2) == 0:
+ raise Exception("No MB IE created by 2nd AP")
+
+def test_fst_scan_nomb(dev, apdev, test_params):
+ """FST scan no MB IE presence with 1 AP start"""
+ logdir = test_params['logdir']
+
+ # Test valid MB IE in scan results
+ fst_launcher = FstLauncher(logdir)
+ ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
+ fst_test_common.fst_test_def_chan_a,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_high)
+ fst_launcher.add_cfg(ap1)
+ res = fst_launcher.run_hostapd()
+ if res != 0:
+ raise Exception("Hostapd didn't start properly")
+ try:
+ time.sleep(2)
+ mbie1=[]
+ flags1 = ''
+ vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
+ if vals1 != None:
+ if 'ie' in vals1:
+ mbie1 = parse_ies(vals1['ie'], 0x9e)
+ if 'flags' in vals1:
+ flags1 = vals1['flags']
+ finally:
+ fst_launcher.cleanup()
+
+ if len(mbie1) != 0:
+ raise Exception("MB IE exists with 1 AP")
--- /dev/null
+# FST functionality tests
+# Copyright (c) 2015, Qualcomm Atheros, Inc.
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import logging
+logger = logging.getLogger()
+import subprocess
+import time
+import os
+
+import hwsim_utils
+from hwsim import HWSimRadio
+import hostapd
+import fst_test_common
+import fst_module_aux
+
+#enum - bad parameter types
+bad_param_none = 0
+bad_param_session_add_no_params = 1
+bad_param_group_id = 2
+bad_param_session_set_no_params = 3
+bad_param_session_set_unknown_param = 4
+bad_param_session_id = 5
+bad_param_old_iface = 6
+bad_param_new_iface = 7
+bad_param_negative_llt = 8
+bad_param_zero_llt = 9
+bad_param_llt_too_big = 10
+bad_param_llt_nan = 11
+bad_param_peer_addr = 12
+bad_param_session_initiate_no_params = 13
+bad_param_session_initiate_bad_session_id = 14
+bad_param_session_initiate_with_no_new_iface_set = 15
+bad_param_session_initiate_with_bad_peer_addr_set = 16
+bad_param_session_initiate_request_with_bad_stie = 17
+bad_param_session_initiate_response_with_reject = 18
+bad_param_session_initiate_response_with_bad_stie = 19
+bad_param_session_initiate_response_with_zero_llt = 20
+bad_param_session_initiate_stt_no_response = 21
+bad_param_session_initiate_concurrent_setup_request = 22
+bad_param_session_transfer_no_params = 23
+bad_param_session_transfer_bad_session_id = 24
+bad_param_session_transfer_setup_skipped = 25
+bad_param_session_teardown_no_params = 26
+bad_param_session_teardown_bad_session_id = 27
+bad_param_session_teardown_setup_skipped = 28
+bad_param_session_teardown_bad_fsts_id = 29
+
+bad_param_names = ("None",
+ "No params passed to session add",
+ "Group ID",
+ "No params passed to session set",
+ "Unknown param passed to session set",
+ "Session ID",
+ "Old interface name",
+ "New interface name",
+ "Negative LLT",
+ "Zero LLT",
+ "LLT too big",
+ "LLT is not a number",
+ "Peer address",
+ "No params passed to session initiate",
+ "Session ID",
+ "No new_iface was set",
+ "Peer address",
+ "Request with bad st ie",
+ "Response with reject",
+ "Response with bad st ie",
+ "Response with zero llt",
+ "No response, STT",
+ "Concurrent setup request",
+ "No params passed to session transfer",
+ "Session ID",
+ "Session setup skipped",
+ "No params passed to session teardown",
+ "Bad session",
+ "Session setup skipped",
+ "Bad fsts_id")
+
+def fst_start_session(apdev, test_params, bad_param_type, start_on_ap,
+ peer_addr = None):
+ """This function makes the necessary preparations and the adds and sets a
+ session using either correct or incorrect parameters depending on the value
+ of bad_param_type. If the call ends as expected (with session being
+ successfully added and set in case of correct parameters or with the
+ expected exception in case of incorrect parameters), the function silently
+ exits. Otherwise, it throws an exception thus failing the test."""
+
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ bad_parameter_detected = False
+ exception_already_raised = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if start_on_ap:
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ else:
+ initiator = sta1
+ responder = ap1
+ new_iface = sta2.ifname()
+ new_peer_addr = sta2.get_actual_peer_addr()
+ initiator.add_peer(responder, peer_addr, new_peer_addr)
+ group_id = None
+ if bad_param_type == bad_param_group_id:
+ group_id = '-1'
+ elif bad_param_type == bad_param_session_add_no_params:
+ group_id = ''
+ initiator.set_fst_parameters(group_id=group_id)
+ sid = initiator.add_session()
+ if bad_param_type == bad_param_session_set_no_params:
+ res = initiator.set_session_param(None)
+ if not res.startswith("OK"):
+ raise Exception("Session set operation failed")
+ elif bad_param_type == bad_param_session_set_unknown_param:
+ res = initiator.set_session_param("bad_param=1")
+ if not res.startswith("OK"):
+ raise Exception("Session set operation failed")
+ else:
+ if bad_param_type == bad_param_session_initiate_with_no_new_iface_set:
+ new_iface = None
+ elif bad_param_type == bad_param_new_iface:
+ new_iface = 'wlan12'
+ old_iface = None if bad_param_type != bad_param_old_iface else 'wlan12'
+ llt = None
+ if bad_param_type == bad_param_negative_llt:
+ llt = '-1'
+ elif bad_param_type == bad_param_zero_llt:
+ llt = '0'
+ elif bad_param_type == bad_param_llt_too_big:
+ llt = '4294967296' #0x100000000
+ elif bad_param_type == bad_param_llt_nan:
+ llt = 'nan'
+ elif bad_param_type == bad_param_session_id:
+ sid = '-1'
+ initiator.set_fst_parameters(llt=llt)
+ initiator.configure_session(sid, new_iface, old_iface)
+ except Exception, e:
+ if e.args[0].startswith("Cannot add FST session with groupid"):
+ if bad_param_type == bad_param_group_id or bad_param_type == bad_param_session_add_no_params:
+ bad_parameter_detected = True
+ elif e.args[0].startswith("Cannot set FST session new_ifname:"):
+ if bad_param_type == bad_param_new_iface:
+ bad_parameter_detected = True
+ elif e.args[0].startswith("Session set operation failed"):
+ if (bad_param_type == bad_param_session_set_no_params or
+ bad_param_type == bad_param_session_set_unknown_param):
+ bad_parameter_detected = True
+ elif e.args[0].startswith("Cannot set FST session old_ifname:"):
+ if (bad_param_type == bad_param_old_iface or
+ bad_param_type == bad_param_session_id or
+ bad_param_type == bad_param_session_set_no_params):
+ bad_parameter_detected = True
+ elif e.args[0].startswith("Cannot set FST session llt:"):
+ if (bad_param_type == bad_param_negative_llt or
+ bad_param_type == bad_param_llt_too_big or
+ bad_param_type == bad_param_llt_nan):
+ bad_parameter_detected = True
+ elif e.args[0].startswith("Cannot set FST session peer address:"):
+ if bad_param_type == bad_param_peer_addr:
+ bad_parameter_detected = True
+ if not bad_parameter_detected:
+ # The exception was unexpected
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if bad_parameter_detected:
+ logger.info("Success. Bad parameter was detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ if bad_param_type == bad_param_none or bad_param_type == bad_param_zero_llt:
+ logger.info("Success. Session added and set")
+ else:
+ exception_text = ""
+ if bad_param_type == bad_param_peer_addr:
+ exception_text = "Failure. Bad parameter was not detected (Peer address == %s)" % ap1.get_new_peer_addr()
+ else:
+ exception_text = "Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type]
+ raise Exception(exception_text)
+ else:
+ print "Failure. Unexpected exception"
+
+def fst_initiate_session(apdev, test_params, bad_param_type, init_on_ap):
+ """This function makes the necessary preparations and then adds, sets and
+ initiates a session using either correct or incorrect parameters at each
+ stage depending on the value of bad_param_type. If the call ends as expected
+ (with session being successfully added, set and initiated in case of correct
+ parameters or with the expected exception in case of incorrect parameters),
+ the function silently exits. Otherwise it throws an exception thus failing
+ the test."""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ bad_parameter_detected = False
+ exception_already_raised = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ # This call makes sure FstHostapd singleton object is created and, as a
+ # result, the global control interface is registered (this is done from
+ # the constructor).
+ ap1.get_global_instance()
+ if init_on_ap:
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname() if bad_param_type != bad_param_session_initiate_with_no_new_iface_set else None
+ new_peer_addr = ap2.get_actual_peer_addr()
+ resp_newif = sta2.ifname()
+ else:
+ initiator = sta1
+ responder = ap1
+ new_iface = sta2.ifname() if bad_param_type != bad_param_session_initiate_with_no_new_iface_set else None
+ new_peer_addr = sta2.get_actual_peer_addr()
+ resp_newif = ap2.ifname()
+ peeraddr = None if bad_param_type != bad_param_session_initiate_with_bad_peer_addr_set else '10:DE:AD:DE:AD:11'
+ initiator.add_peer(responder, peeraddr, new_peer_addr)
+ if bad_param_type == bad_param_session_initiate_response_with_zero_llt:
+ initiator.set_fst_parameters(llt='0')
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ if bad_param_type == bad_param_session_initiate_no_params:
+ sid = ''
+ elif bad_param_type == bad_param_session_initiate_bad_session_id:
+ sid = '-1'
+ if bad_param_type == bad_param_session_initiate_request_with_bad_stie:
+ actual_fsts_id = initiator.get_fsts_id_by_sid(sid)
+ initiator.send_test_session_setup_request(str(actual_fsts_id), "bad_new_band")
+ responder.wait_for_session_event(5)
+ elif bad_param_type == bad_param_session_initiate_response_with_reject:
+ initiator.send_session_setup_request(sid)
+ initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ setup_event = responder.wait_for_session_event(5, [],
+ ['EVENT_FST_SETUP'])
+ if not 'id' in setup_event:
+ raise Exception("No session id in FST setup event")
+ responder.send_session_setup_response(str(setup_event['id']),
+ "reject")
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ if event['new_state'] != "INITIAL" or event['reason'] != "REASON_REJECT":
+ raise Exception("Response with reject not handled as expected")
+ bad_parameter_detected = True
+ elif bad_param_type == bad_param_session_initiate_response_with_bad_stie:
+ initiator.send_session_setup_request(sid)
+ initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ responder.wait_for_session_event(5, [], ['EVENT_FST_SETUP'])
+ actual_fsts_id = initiator.get_fsts_id_by_sid(sid)
+ responder.send_test_session_setup_response(str(actual_fsts_id),
+ "accept", "bad_new_band")
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ if event['new_state'] != "INITIAL" or event['reason'] != "REASON_ERROR_PARAMS":
+ raise Exception("Response with bad STIE not handled as expected")
+ bad_parameter_detected = True
+ elif bad_param_type == bad_param_session_initiate_response_with_zero_llt:
+ initiator.initiate_session(sid, "accept")
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ if event['new_state'] != "TRANSITION_DONE":
+ raise Exception("Response reception for a session with llt=0 not handled as expected")
+ bad_parameter_detected = True
+ elif bad_param_type == bad_param_session_initiate_stt_no_response:
+ initiator.send_session_setup_request(sid)
+ initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ responder.wait_for_session_event(5, [], ['EVENT_FST_SETUP'])
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ if event['new_state'] != "INITIAL" or event['reason'] != "REASON_STT":
+ raise Exception("No response scenario not handled as expected")
+ bad_parameter_detected = True
+ elif bad_param_type == bad_param_session_initiate_concurrent_setup_request:
+ responder.add_peer(initiator)
+ resp_sid = responder.add_session()
+ responder.configure_session(resp_sid, resp_newif)
+ initiator.send_session_setup_request(sid)
+ actual_fsts_id = initiator.get_fsts_id_by_sid(sid)
+ responder.send_test_session_setup_request(str(actual_fsts_id))
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ initiator_addr = initiator.get_own_mac_address()
+ responder_addr = responder.get_own_mac_address()
+ if initiator_addr < responder_addr:
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ if event['new_state'] != "INITIAL" or event['reason'] != "REASON_SETUP":
+ raise Exception("Concurrent setup scenario not handled as expected")
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SETUP"])
+ # The incoming setup request received by the initiator has
+ # priority over the one sent previously by the initiator itself
+ # because the initiator's MAC address is numerically lower than
+ # the one of the responder. Thus, the initiator should generate
+ # an FST_SETUP event.
+ else:
+ event = initiator.wait_for_session_event(5, [], ["EVENT_FST_SESSION_STATE"])
+ if event['new_state'] != "INITIAL" or event['reason'] != "REASON_STT":
+ raise Exception("Concurrent setup scenario not handled as expected")
+ # The incoming setup request was dropped at the initiator
+ # because its MAC address is numerically bigger than the one of
+ # the responder. Thus, the initiator continue to wait for a
+ # setup response until the STT event fires.
+ bad_parameter_detected = True
+ else:
+ initiator.initiate_session(sid, "accept")
+ except Exception, e:
+ if e.args[0].startswith("Cannot initiate fst session"):
+ if bad_param_type != bad_param_none:
+ bad_parameter_detected = True
+ elif e.args[0].startswith("No FST-EVENT-SESSION received"):
+ if bad_param_type == bad_param_session_initiate_request_with_bad_stie:
+ bad_parameter_detected = True
+ if not bad_parameter_detected:
+ #The exception was unexpected
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if bad_parameter_detected:
+ logger.info("Success. Bad parameter was detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ if bad_param_type == bad_param_none:
+ logger.info("Success. Session initiated")
+ else:
+ raise Exception("Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ print "Failure. Unexpected exception"
+
+def fst_transfer_session(apdev, test_params, bad_param_type, init_on_ap):
+ """This function makes the necessary preparations and then adds, sets,
+ initiates and attempts to transfer a session using either correct or
+ incorrect parameters at each stage depending on the value of bad_param_type.
+ If the call ends as expected the function silently exits. Otherwise, it
+ throws an exception thus failing the test."""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ bad_parameter_detected = False
+ exception_already_raised = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ # This call makes sure FstHostapd singleton object is created and, as a
+ # result, the global control interface is registered (this is done from
+ # the constructor).
+ ap1.get_global_instance()
+ if init_on_ap:
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ else:
+ initiator = sta1
+ responder = ap1
+ new_iface = sta2.ifname()
+ new_peer_addr = sta2.get_actual_peer_addr()
+ initiator.add_peer(responder, new_peer_addr = new_peer_addr)
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ if bad_param_type != bad_param_session_transfer_setup_skipped:
+ initiator.initiate_session(sid, "accept")
+ if bad_param_type == bad_param_session_transfer_no_params:
+ sid = ''
+ elif bad_param_type == bad_param_session_transfer_bad_session_id:
+ sid = '-1'
+ initiator.transfer_session(sid)
+ except Exception, e:
+ if e.args[0].startswith("Cannot transfer fst session"):
+ if bad_param_type != bad_param_none:
+ bad_parameter_detected = True
+ if not bad_parameter_detected:
+ # The exception was unexpected
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if bad_parameter_detected:
+ logger.info("Success. Bad parameter was detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ if bad_param_type == bad_param_none:
+ logger.info("Success. Session transferred")
+ else:
+ raise Exception("Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ print "Failure. Unexpected exception"
+
+
+def fst_tear_down_session(apdev, test_params, bad_param_type, init_on_ap):
+ """This function makes the necessary preparations and then adds, sets, and
+ initiates a session. It then issues a tear down command using either
+ correct or incorrect parameters at each stage. If the call ends as expected,
+ the function silently exits. Otherwise, it throws an exception thus failing
+ the test."""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ bad_parameter_detected = False
+ exception_already_raised = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ # This call makes sure FstHostapd singleton object is created and, as a
+ # result, the global control interface is registered (this is done from
+ # the constructor).
+ ap1.get_global_instance()
+ if init_on_ap:
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ else:
+ initiator = sta1
+ responder = ap1
+ new_iface = sta2.ifname()
+ new_peer_addr = sta2.get_actual_peer_addr()
+ initiator.add_peer(responder, new_peer_addr = new_peer_addr)
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ if bad_param_type != bad_param_session_teardown_setup_skipped:
+ initiator.initiate_session(sid, "accept")
+ if bad_param_type == bad_param_session_teardown_bad_fsts_id:
+ initiator.send_test_tear_down('-1')
+ responder.wait_for_session_event(5)
+ else:
+ if bad_param_type == bad_param_session_teardown_no_params:
+ sid = ''
+ elif bad_param_type == bad_param_session_teardown_bad_session_id:
+ sid = '-1'
+ initiator.teardown_session(sid)
+ except Exception, e:
+ if e.args[0].startswith("Cannot tear down fst session"):
+ if (bad_param_type == bad_param_session_teardown_no_params or
+ bad_param_type == bad_param_session_teardown_bad_session_id or
+ bad_param_type == bad_param_session_teardown_setup_skipped):
+ bad_parameter_detected = True
+ elif e.args[0].startswith("No FST-EVENT-SESSION received"):
+ if bad_param_type == bad_param_session_teardown_bad_fsts_id:
+ bad_parameter_detected = True
+ if not bad_parameter_detected:
+ # The exception was unexpected
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if bad_parameter_detected:
+ logger.info("Success. Bad parameter was detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ if bad_param_type == bad_param_none:
+ logger.info("Success. Session torn down")
+ else:
+ raise Exception("Failure. Bad parameter was not detected (%s)" % bad_param_names[bad_param_type])
+ else:
+ print "Failure. Unexpected exception"
+
+
+#enum - remove session scenarios
+remove_scenario_no_params = 0
+remove_scenario_bad_session_id = 1
+remove_scenario_non_established_session = 2
+remove_scenario_established_session = 3
+
+remove_scenario_names = ("No params",
+ "Bad session id",
+ "Remove non-established session",
+ "Remove established session")
+
+
+def fst_remove_session(apdev, test_params, remove_session_scenario, init_on_ap):
+ """This function attempts to remove a session at various stages of its
+ formation, depending on the value of remove_session_scenario. If the call
+ ends as expected, the function silently exits. Otherwise, it throws an
+ exception thus failing the test."""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ bad_parameter_detected = False
+ exception_already_raised = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ # This call makes sure FstHostapd singleton object is created and, as a
+ # result, the global control interface is registered (this is done from
+ # the constructor).
+ ap1.get_global_instance()
+ if init_on_ap:
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ else:
+ initiator = sta1
+ responder = ap1
+ new_iface = sta2.ifname()
+ new_peer_addr = sta2.get_actual_peer_addr()
+ initiator.add_peer(responder, new_peer_addr = new_peer_addr)
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ if remove_session_scenario != remove_scenario_no_params:
+ if remove_session_scenario != remove_scenario_non_established_session:
+ initiator.initiate_session(sid, "accept")
+ if remove_session_scenario == remove_scenario_no_params:
+ sid = ''
+ elif remove_session_scenario == remove_scenario_bad_session_id:
+ sid = '-1'
+ initiator.remove_session(sid)
+ except Exception, e:
+ if e.args[0].startswith("Cannot remove fst session"):
+ if (remove_session_scenario == remove_scenario_no_params or
+ remove_session_scenario == remove_scenario_bad_session_id):
+ bad_parameter_detected = True
+ elif e.args[0].startswith("No FST-EVENT-SESSION received"):
+ if remove_session_scenario == remove_scenario_non_established_session:
+ bad_parameter_detected = True
+ if not bad_parameter_detected:
+ #The exception was unexpected
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if bad_parameter_detected:
+ logger.info("Success. Remove scenario ended as expected (%s)" % remove_scenario_names[remove_session_scenario])
+ else:
+ if remove_session_scenario == remove_scenario_established_session:
+ logger.info("Success. Session removed")
+ else:
+ raise Exception("Failure. Remove scenario ended in an unexpected way (%s)" % remove_scenario_names[remove_session_scenario])
+ else:
+ print "Failure. Unexpected exception"
+
+
+#enum - frame types
+frame_type_session_request = 0
+frame_type_session_response = 1
+frame_type_ack_request = 2
+frame_type_ack_response = 3
+frame_type_tear_down = 4
+
+frame_type_names = ("Session request",
+ "Session Response",
+ "Ack request",
+ "Ack response",
+ "Tear down")
+
+def fst_send_unexpected_frame(apdev, test_params, frame_type, send_from_ap, additional_param = ''):
+ """This function creates two pairs of APs and stations, makes them connect
+ and then causes one side to send an unexpected FST frame of the specified
+ type to the other. The other side should then identify and ignore the
+ frame."""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ exception_already_raised = False
+ frame_receive_timeout = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ # This call makes sure FstHostapd singleton object is created and, as a
+ # result, the global control interface is registered (this is done from
+ # the constructor).
+ ap1.get_global_instance()
+ if send_from_ap:
+ sender = ap1
+ receiver = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ else:
+ sender = sta1
+ receiver = ap1
+ new_iface = sta2.ifname()
+ new_peer_addr = sta2.get_actual_peer_addr()
+ sender.add_peer(receiver, new_peer_addr = new_peer_addr)
+ sid=sender.add_session()
+ sender.configure_session(sid, new_iface)
+ if frame_type == frame_type_session_request:
+ sender.send_session_setup_request(sid)
+ event = receiver.wait_for_session_event(5)
+ if event['type'] != 'EVENT_FST_SETUP':
+ raise Exception("Unexpected indication: " + event['type'])
+ elif frame_type == frame_type_session_response:
+ #fsts_id doesn't matter, no actual session exists
+ sender.send_test_session_setup_response('0', additional_param)
+ receiver.wait_for_session_event(5)
+ elif frame_type == frame_type_ack_request:
+ #fsts_id doesn't matter, no actual session exists
+ sender.send_test_ack_request('0')
+ receiver.wait_for_session_event(5)
+ elif frame_type == frame_type_ack_response:
+ #fsts_id doesn't matter, no actual session exists
+ sender.send_test_ack_response('0')
+ receiver.wait_for_session_event(5)
+ elif frame_type == frame_type_tear_down:
+ #fsts_id doesn't matter, no actual session exists
+ sender.send_test_tear_down('0')
+ receiver.wait_for_session_event(5)
+ except Exception, e:
+ if e.args[0].startswith("No FST-EVENT-SESSION received"):
+ if frame_type != frame_type_session_request:
+ frame_receive_timeout = True
+ else:
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if frame_receive_timeout:
+ logger.info("Success. Frame was ignored (%s)" % frame_type_names[frame_type])
+ else:
+ if frame_type == frame_type_session_request:
+ logger.info("Success. Frame received, session created")
+ else:
+ raise Exception("Failure. Frame was not ignored (%s)" % frame_type_names[frame_type])
+ else:
+ print "Failure. Unexpected exception"
+
+
+#enum - bad session transfer scenarios
+bad_scenario_none = 0
+bad_scenario_ack_req_session_not_set_up = 1
+bad_scenario_ack_req_session_not_established_init_side = 2
+bad_scenario_ack_req_session_not_established_resp_side = 3
+bad_scenario_ack_req_bad_fsts_id = 4
+bad_scenario_ack_resp_session_not_set_up = 5
+bad_scenario_ack_resp_session_not_established_init_side = 6
+bad_scenario_ack_resp_session_not_established_resp_side = 7
+bad_scenario_ack_resp_no_ack_req = 8
+bad_scenario_ack_resp_bad_fsts_id = 9
+
+bad_scenario_names = ("None",
+ "Ack request received before the session was set up",
+ "Ack request received on the initiator side before session was established",
+ "Ack request received on the responder side before session was established",
+ "Ack request received with bad fsts_id",
+ "Ack response received before the session was set up",
+ "Ack response received on the initiator side before session was established",
+ "Ack response received on the responder side before session was established",
+ "Ack response received before ack request was sent",
+ "Ack response received with bad fsts_id")
+
+def fst_bad_transfer(apdev, test_params, bad_scenario_type, init_on_ap):
+ """This function makes the necessary preparations and then adds and sets a
+ session. It then initiates and it unless instructed otherwise) and attempts
+ to send one of the frames involved in the session transfer protocol,
+ skipping or distorting one of the stages according to the value of
+ bad_scenario_type parameter."""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ bad_parameter_detected = False
+ exception_already_raised = False
+ try:
+ fst_module_aux.connect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ # This call makes sure FstHostapd singleton object is created and, as a
+ # result, the global control interface is registered (this is done from
+ # the constructor).
+ ap1.get_global_instance()
+ if init_on_ap:
+ initiator = ap1
+ responder = sta1
+ new_iface = ap2.ifname()
+ new_peer_addr = ap2.get_actual_peer_addr()
+ else:
+ initiator = sta1
+ responder = ap1
+ new_iface = sta2.ifname()
+ new_peer_addr = sta2.get_actual_peer_addr()
+ initiator.add_peer(responder, new_peer_addr = new_peer_addr)
+ sid = initiator.add_session()
+ initiator.configure_session(sid, new_iface)
+ if (bad_scenario_type != bad_scenario_ack_req_session_not_set_up and
+ bad_scenario_type != bad_scenario_ack_resp_session_not_set_up):
+ if (bad_scenario_type != bad_scenario_ack_req_session_not_established_init_side and
+ bad_scenario_type != bad_scenario_ack_resp_session_not_established_init_side and
+ bad_scenario_type != bad_scenario_ack_req_session_not_established_resp_side and
+ bad_scenario_type != bad_scenario_ack_resp_session_not_established_resp_side):
+ response = "accept"
+ else:
+ response = ''
+ initiator.initiate_session(sid, response)
+ if bad_scenario_type == bad_scenario_ack_req_session_not_set_up:
+ #fsts_id doesn't matter, no actual session exists
+ responder.send_test_ack_request('0')
+ initiator.wait_for_session_event(5)
+ # We want to send the unexpected frame to the side that already has
+ # a session created
+ elif bad_scenario_type == bad_scenario_ack_resp_session_not_set_up:
+ #fsts_id doesn't matter, no actual session exists
+ responder.send_test_ack_response('0')
+ initiator.wait_for_session_event(5)
+ # We want to send the unexpected frame to the side that already has
+ # a session created
+ elif bad_scenario_type == bad_scenario_ack_req_session_not_established_init_side:
+ #fsts_id doesn't matter, no actual session exists
+ initiator.send_test_ack_request('0')
+ responder.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ elif bad_scenario_type == bad_scenario_ack_req_session_not_established_resp_side:
+ #fsts_id doesn't matter, no actual session exists
+ responder.send_test_ack_request('0')
+ initiator.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ elif bad_scenario_type == bad_scenario_ack_resp_session_not_established_init_side:
+ #fsts_id doesn't matter, no actual session exists
+ initiator.send_test_ack_response('0')
+ responder.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ elif bad_scenario_type == bad_scenario_ack_resp_session_not_established_resp_side:
+ #fsts_id doesn't matter, no actual session exists
+ responder.send_test_ack_response('0')
+ initiator.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ elif bad_scenario_type == bad_scenario_ack_req_bad_fsts_id:
+ initiator.send_test_ack_request('-1')
+ responder.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ elif bad_scenario_type == bad_scenario_ack_resp_bad_fsts_id:
+ initiator.send_test_ack_response('-1')
+ responder.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ elif bad_scenario_type == bad_scenario_ack_resp_no_ack_req:
+ actual_fsts_id = initiator.get_fsts_id_by_sid(sid)
+ initiator.send_test_ack_response(str(actual_fsts_id))
+ responder.wait_for_session_event(5, ["EVENT_FST_SESSION_STATE"])
+ else:
+ raise Exception("Unknown bad scenario identifier")
+ except Exception, e:
+ if e.args[0].startswith("No FST-EVENT-SESSION received"):
+ bad_parameter_detected = True
+ if not bad_parameter_detected:
+ # The exception was unexpected
+ logger.info(e)
+ exception_already_raised = True
+ raise
+ finally:
+ fst_module_aux.disconnect_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+ if not exception_already_raised:
+ if bad_parameter_detected:
+ logger.info("Success. Bad scenario was handled correctly (%s)" % bad_scenario_names[bad_scenario_type])
+ else:
+ raise Exception("Failure. Bad scenario was handled incorrectly (%s)" % bad_scenario_names[bad_scenario_type])
+ else:
+ print "Failure. Unexpected exception"
+
+def test_fst_sta_connect_to_non_fst_ap(dev, apdev, test_params):
+ """FST STA connecting to non-FST AP"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g" })
+ try:
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g",
+ key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
+ not res_sta1_mbies.startswith("FAIL") or
+ not res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs have not been removed on the stations")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_sta_connect_to_fst_ap(dev, apdev, test_params):
+ """FST STA connecting to FST AP"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ orig_sta2_mbies = sta2.get_local_mbies()
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(ap1, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_a)
+ time.sleep(2)
+ res_sta2_mbies = sta2.get_local_mbies()
+ if res_sta2_mbies == orig_sta2_mbies:
+ raise Exception("Failure. MB IEs have not been updated")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_ap_connect_to_fst_sta(dev, apdev, test_params):
+ """FST AP connecting to FST STA"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ orig_ap_mbies = ap1.get_local_mbies()
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(ap1, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_a)
+ time.sleep(2)
+ res_ap_mbies = ap1.get_local_mbies()
+ if res_ap_mbies != orig_ap_mbies:
+ raise Exception("Failure. MB IEs have been unexpectedly updated on the AP")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_ap_connect_to_non_fst_sta(dev, apdev, test_params):
+ """FST AP connecting to non-FST STA"""
+ ap1, ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ try:
+ orig_ap_mbies = ap2.get_local_mbies()
+ vals = dev[0].scan(None, fst_test_common.fst_test_def_freq_g)
+ fst_module_aux.external_sta_connect(dev[0], ap2, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_g)
+ time.sleep(2)
+ res_ap_mbies = ap2.get_local_mbies()
+ if res_ap_mbies != orig_ap_mbies:
+ raise Exception("Failure. MB IEs have been unexpectedly updated on the AP")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ fst_module_aux.disconnect_external_sta(dev[0], ap2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2)
+
+def test_fst_second_sta_connect_to_non_fst_ap(dev, apdev, test_params):
+ """FST STA 2nd connecting to non-FST AP"""
+ fst_ap1, fst_ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g" })
+ try:
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(fst_ap1, key_mgmt="NONE", scan_freq=fst_test_common.fst_test_def_freq_a)
+ time.sleep(2)
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g", key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
+ not res_sta1_mbies.startswith("FAIL") or
+ not res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs have not been removed on the stations")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+
+
+def test_fst_second_sta_connect_to_fst_ap(dev, apdev, test_params):
+ """FST STA 2nd connecting to FST AP"""
+ fst_ap1, fst_ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g" })
+ try:
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g", key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(fst_ap1, key_mgmt="NONE", scan_freq=fst_test_common.fst_test_def_freq_a)
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (not orig_sta1_mbies.startswith("FAIL") or
+ not orig_sta2_mbies.startswith("FAIL") or
+ not res_sta1_mbies.startswith("FAIL") or
+ not res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs should have stayed non-present on the stations")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+
+def test_fst_disconnect_1_of_2_stas_from_non_fst_ap(dev, apdev, test_params):
+ """FST disconnect 1 of 2 STAs from non-FST AP"""
+ fst_ap1, fst_ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g" })
+ try:
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(fst_ap1, key_mgmt="NONE", scan_freq=fst_test_common.fst_test_def_freq_a)
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g", key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ sta2.disconnect_from_external_ap()
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (not orig_sta1_mbies.startswith("FAIL") or
+ not orig_sta2_mbies.startswith("FAIL") or
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs haven't reappeared on the stations")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+
+
+def test_fst_disconnect_1_of_2_stas_from_fst_ap(dev, apdev, test_params):
+ """FST disconnect 1 of 2 STAs from FST AP"""
+ fst_ap1, fst_ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g" })
+ try:
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(fst_ap1, key_mgmt="NONE", scan_freq=fst_test_common.fst_test_def_freq_a)
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g", key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ sta1.disconnect()
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (not orig_sta1_mbies.startswith("FAIL") or
+ not orig_sta2_mbies.startswith("FAIL") or
+ not res_sta1_mbies.startswith("FAIL") or
+ not res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs should have stayed non-present on the stations")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+
+def test_fst_disconnect_2_of_2_stas_from_non_fst_ap(dev, apdev, test_params):
+ """FST disconnect 2 of 2 STAs from non-FST AP"""
+ fst_ap1, fst_ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g" })
+ try:
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(fst_ap1, key_mgmt="NONE", scan_freq=fst_test_common.fst_test_def_freq_a)
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g", key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ sta1.disconnect()
+ time.sleep(2)
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ sta2.disconnect_from_external_ap()
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (not orig_sta1_mbies.startswith("FAIL") or
+ not orig_sta2_mbies.startswith("FAIL") or
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs haven't reappeared on the stations")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+
+def test_fst_disconnect_2_of_2_stas_from_fst_ap(dev, apdev, test_params):
+ """FST disconnect 2 of 2 STAs from FST AP"""
+ fst_ap1, fst_ap2, sta1, sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ with HWSimRadio() as (radio, iface):
+ non_fst_ap = hostapd.add_ap(iface, { "ssid": "non_fst_11g"})
+ try:
+ vals = sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ sta1.connect(fst_ap1, key_mgmt="NONE", scan_freq=fst_test_common.fst_test_def_freq_a)
+ vals = sta2.scan()
+ freq = vals['freq']
+ sta2.connect_to_external_ap(non_fst_ap, ssid="non_fst_11g", key_mgmt="NONE", scan_freq=freq)
+ time.sleep(2)
+ sta2.disconnect_from_external_ap()
+ time.sleep(2)
+ orig_sta1_mbies = sta1.get_local_mbies()
+ orig_sta2_mbies = sta2.get_local_mbies()
+ sta1.disconnect()
+ time.sleep(2)
+ res_sta1_mbies = sta1.get_local_mbies()
+ res_sta2_mbies = sta2.get_local_mbies()
+ if (orig_sta1_mbies.startswith("FAIL") or
+ orig_sta2_mbies.startswith("FAIL") or
+ res_sta1_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs should have stayed present on both stations")
+ # Mandatory part of 8.4.2.140 Multi-band element is 24 bytes = 48 hex chars
+ basic_sta1_mbies = res_sta1_mbies[0:48] + res_sta1_mbies[60:108]
+ basic_sta2_mbies = res_sta2_mbies[0:48] + res_sta2_mbies[60:108]
+ if (basic_sta1_mbies != basic_sta2_mbies):
+ raise Exception("Failure. Basic MB IEs should have become identical on both stations")
+ addr_sta1_str = sta1.get_own_mac_address().replace(":", "")
+ addr_sta2_str = sta2.get_own_mac_address().replace(":", "")
+ # Mandatory part of 8.4.2.140 Multi-band element is followed by STA MAC Address field (6 bytes = 12 hex chars)
+ addr_sta1_mbie1 = res_sta1_mbies[48:60]
+ addr_sta1_mbie2 = res_sta1_mbies[108:120]
+ addr_sta2_mbie1 = res_sta2_mbies[48:60]
+ addr_sta2_mbie2 = res_sta2_mbies[108:120]
+ if (addr_sta1_mbie1 != addr_sta1_mbie2 or
+ addr_sta1_mbie1 != addr_sta2_str or
+ addr_sta2_mbie1 != addr_sta2_mbie2 or
+ addr_sta2_mbie1 != addr_sta1_str):
+ raise Exception("Failure. STA Address in MB IEs should have been same as the other STA's")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ sta1.disconnect()
+ sta2.disconnect_from_external_ap()
+ fst_module_aux.stop_two_ap_sta_pairs(fst_ap1, fst_ap2, sta1, sta2)
+
+def test_fst_disconnect_non_fst_sta(dev, apdev, test_params):
+ """FST disconnect non-FST STA"""
+ ap1, ap2, fst_sta1, fst_sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ external_sta_connected = False
+ try:
+ vals = fst_sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ fst_sta1.connect(ap1, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_a)
+ vals = dev[0].scan(None, fst_test_common.fst_test_def_freq_g)
+ fst_module_aux.external_sta_connect(dev[0], ap2, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_g)
+ external_sta_connected = True
+ time.sleep(2)
+ fst_sta1.disconnect()
+ time.sleep(2)
+ orig_ap_mbies = ap2.get_local_mbies()
+ fst_module_aux.disconnect_external_sta(dev[0], ap2)
+ external_sta_connected = False
+ time.sleep(2)
+ res_ap_mbies = ap2.get_local_mbies()
+ if res_ap_mbies != orig_ap_mbies:
+ raise Exception("Failure. MB IEs have been unexpectedly updated on the AP")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ fst_sta1.disconnect()
+ if external_sta_connected:
+ fst_module_aux.disconnect_external_sta(dev[0], ap2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, fst_sta1, fst_sta2)
+
+def test_fst_disconnect_fst_sta(dev, apdev, test_params):
+ """FST disconnect FST STA"""
+ ap1, ap2, fst_sta1, fst_sta2 = fst_module_aux.start_two_ap_sta_pairs(apdev)
+ external_sta_connected = False;
+ try:
+ vals = fst_sta1.scan(freq=fst_test_common.fst_test_def_freq_a)
+ fst_sta1.connect(ap1, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_a)
+ vals = dev[0].scan(None, fst_test_common.fst_test_def_freq_g)
+ fst_module_aux.external_sta_connect(dev[0], ap2, key_mgmt="NONE",
+ scan_freq=fst_test_common.fst_test_def_freq_g)
+ external_sta_connected = True
+ time.sleep(2)
+ fst_module_aux.disconnect_external_sta(dev[0], ap2)
+ external_sta_connected = False
+ time.sleep(2)
+ orig_ap_mbies = ap2.get_local_mbies()
+ fst_sta1.disconnect()
+ time.sleep(2)
+ res_ap_mbies = ap2.get_local_mbies()
+ if res_ap_mbies != orig_ap_mbies:
+ raise Exception("Failure. MB IEs have been unexpectedly updated on the AP")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ fst_sta1.disconnect()
+ if external_sta_connected:
+ fst_module_aux.disconnect_external_sta(dev[0], ap2)
+ fst_module_aux.stop_two_ap_sta_pairs(ap1, ap2, fst_sta1, fst_sta2)
+
+def test_fst_dynamic_iface_attach(dev, apdev, test_params):
+ """FST dynamic interface attach"""
+ ap1 = fst_module_aux.FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
+ fst_test_common.fst_test_def_chan_a,
+ fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low,
+ fst_test_common.fst_test_def_llt)
+ ap1.start()
+ ap2 = fst_module_aux.FstAP(apdev[1]['ifname'], 'fst_11g', 'b',
+ fst_test_common.fst_test_def_chan_g,
+ '', '', '')
+ ap2.start()
+
+ sta1 = fst_module_aux.FstSTA('wlan5',
+ fst_module_aux.fst_test_common.fst_test_def_group,
+ fst_test_common.fst_test_def_prio_low,
+ fst_test_common.fst_test_def_llt)
+ sta1.start()
+ sta2 = fst_module_aux.FstSTA('wlan6', '', '', '')
+ sta2.start()
+
+ try:
+ orig_sta2_mbies = sta2.get_local_mbies()
+ orig_ap2_mbies = ap2.get_local_mbies()
+ sta2.send_iface_attach_request(sta2.ifname(),
+ fst_module_aux.fst_test_common.fst_test_def_group,
+ '52', '27')
+ event = sta2.wait_for_iface_event(5)
+ if event['event_type'] != 'attached':
+ raise Exception("Failure. Iface was not properly attached")
+ ap2.send_iface_attach_request(ap2.ifname(),
+ fst_module_aux.fst_test_common.fst_test_def_group,
+ '102', '77')
+ event = ap2.wait_for_iface_event(5)
+ if event['event_type'] != 'attached':
+ raise Exception("Failure. Iface was not properly attached")
+ time.sleep(2)
+ res_sta2_mbies = sta2.get_local_mbies()
+ res_ap2_mbies = ap2.get_local_mbies()
+ sta2.send_iface_detach_request(sta2.ifname())
+ event = sta2.wait_for_iface_event(5)
+ if event['event_type'] != 'detached':
+ raise Exception("Failure. Iface was not properly detached")
+ ap2.send_iface_detach_request(ap2.ifname())
+ event = ap2.wait_for_iface_event(5)
+ if event['event_type'] != 'detached':
+ raise Exception("Failure. Iface was not properly detached")
+ if (not orig_sta2_mbies.startswith("FAIL") or
+ not orig_ap2_mbies.startswith("FAIL") or
+ res_sta2_mbies.startswith("FAIL") or
+ res_ap2_mbies.startswith("FAIL")):
+ raise Exception("Failure. MB IEs should have appeared on the station and on the AP")
+ except Exception, e:
+ logger.info(e)
+ raise
+ finally:
+ ap1.stop()
+ ap2.stop()
+ sta1.stop()
+ sta2.stop()
+
+# AP side FST module tests
+
+def test_fst_ap_start_session(dev, apdev, test_params):
+ """FST AP start session"""
+ fst_start_session(apdev, test_params, bad_param_none, True)
+
+def test_fst_ap_start_session_no_add_params(dev, apdev, test_params):
+ """FST AP start session - no add params"""
+ fst_start_session(apdev, test_params, bad_param_session_add_no_params, True)
+
+def test_fst_ap_start_session_bad_group_id(dev, apdev, test_params):
+ """FST AP start session - bad group id"""
+ fst_start_session(apdev, test_params, bad_param_group_id, True)
+
+def test_fst_ap_start_session_no_set_params(dev, apdev, test_params):
+ """FST AP start session - no set params"""
+ fst_start_session(apdev, test_params, bad_param_session_set_no_params, True)
+
+def test_fst_ap_start_session_set_unknown_param(dev, apdev, test_params):
+ """FST AP start session - set unknown param"""
+ fst_start_session(apdev, test_params, bad_param_session_set_unknown_param,
+ True)
+
+def test_fst_ap_start_session_bad_session_id(dev, apdev, test_params):
+ """FST AP start session - bad session id"""
+ fst_start_session(apdev, test_params, bad_param_session_id, True)
+
+def test_fst_ap_start_session_bad_new_iface(dev, apdev, test_params):
+ """FST AP start session - bad new iface"""
+ fst_start_session(apdev, test_params, bad_param_new_iface, True)
+
+def test_fst_ap_start_session_bad_old_iface(dev, apdev, test_params):
+ """FST AP start session - bad old iface"""
+ fst_start_session(apdev, test_params, bad_param_old_iface, True)
+
+def test_fst_ap_start_session_negative_llt(dev, apdev, test_params):
+ """FST AP start session - negative llt"""
+ fst_start_session(apdev, test_params, bad_param_negative_llt, True)
+
+def test_fst_ap_start_session_zero_llt(dev, apdev, test_params):
+ """FST AP start session - zero llt"""
+ fst_start_session(apdev, test_params, bad_param_zero_llt, True)
+
+def test_fst_ap_start_session_llt_too_big(dev, apdev, test_params):
+ """FST AP start session - llt too large"""
+ fst_start_session(apdev, test_params, bad_param_llt_too_big, True)
+
+def test_fst_ap_start_session_invalid_peer_addr(dev, apdev, test_params):
+ """FST AP start session - invalid peer address"""
+ fst_start_session(apdev, test_params, bad_param_peer_addr, True,
+ 'GG:GG:GG:GG:GG:GG')
+
+def test_fst_ap_start_session_multicast_peer_addr(dev, apdev, test_params):
+ """FST AP start session - multicast peer address"""
+ fst_start_session(apdev, test_params, bad_param_peer_addr, True,
+ '01:00:11:22:33:44')
+
+def test_fst_ap_start_session_broadcast_peer_addr(dev, apdev, test_params):
+ """FST AP start session - broadcast peer address"""
+ fst_start_session(apdev, test_params, bad_param_peer_addr, True,
+ 'FF:FF:FF:FF:FF:FF')
+
+def test_fst_ap_initiate_session(dev, apdev, test_params):
+ """FST AP initiate session"""
+ fst_initiate_session(apdev, test_params, bad_param_none, True)
+
+def test_fst_ap_initiate_session_no_params(dev, apdev, test_params):
+ """FST AP initiate session - no params"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_no_params, True)
+
+def test_fst_ap_initiate_session_invalid_session_id(dev, apdev, test_params):
+ """FST AP initiate session - invalid session id"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_bad_session_id, True)
+
+def test_fst_ap_initiate_session_no_new_iface(dev, apdev, test_params):
+ """FST AP initiate session - no new iface"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_with_no_new_iface_set, True)
+
+def test_fst_ap_initiate_session_bad_peer_addr(dev, apdev, test_params):
+ """FST AP initiate session - bad peer address"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_with_bad_peer_addr_set,
+ True)
+
+def test_fst_ap_initiate_session_request_with_bad_stie(dev, apdev, test_params):
+ """FST AP initiate session - request with bad stie"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_request_with_bad_stie, True)
+
+def test_fst_ap_initiate_session_response_with_reject(dev, apdev, test_params):
+ """FST AP initiate session - response with reject"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_response_with_reject, True)
+
+def test_fst_ap_initiate_session_response_with_bad_stie(dev, apdev,
+ test_params):
+ """FST AP initiate session - response with bad stie"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_response_with_bad_stie,
+ True)
+
+def test_fst_ap_initiate_session_response_with_zero_llt(dev, apdev,
+ test_params):
+ """FST AP initiate session - zero llt"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_response_with_zero_llt,
+ True)
+
+def test_fst_ap_initiate_session_stt_no_response(dev, apdev, test_params):
+ """FST AP initiate session - stt no response"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_stt_no_response, True)
+
+def test_fst_ap_initiate_session_concurrent_setup_request(dev, apdev,
+ test_params):
+ """FST AP initiate session - concurrent setup request"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_concurrent_setup_request,
+ True)
+
+def test_fst_ap_session_request_with_no_session(dev, apdev, test_params):
+ """FST AP session request with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_session_request,
+ True)
+
+def test_fst_ap_session_response_accept_with_no_session(dev, apdev,
+ test_params):
+ """FST AP session response accept with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_session_response,
+ True, "accept")
+
+def test_fst_ap_session_response_reject_with_no_session(dev, apdev,
+ test_params):
+ """FST AP session response reject with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_session_response,
+ True, "reject")
+
+def test_fst_ap_ack_request_with_no_session(dev, apdev, test_params):
+ """FST AP ack request with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_ack_request, True)
+
+def test_fst_ap_ack_response_with_no_session(dev, apdev, test_params):
+ """FST AP ack response with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_ack_response, True)
+
+def test_fst_ap_tear_down_response_with_no_session(dev, apdev, test_params):
+ """FST AP tear down response with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_tear_down, True)
+
+def test_fst_ap_transfer_session(dev, apdev, test_params):
+ """FST AP transfer session"""
+ fst_transfer_session(apdev, test_params, bad_param_none, True)
+
+def test_fst_ap_transfer_session_no_params(dev, apdev, test_params):
+ """FST AP transfer session - no params"""
+ fst_transfer_session(apdev, test_params,
+ bad_param_session_transfer_no_params, True)
+
+def test_fst_ap_transfer_session_bad_session_id(dev, apdev, test_params):
+ """FST AP transfer session - bad session id"""
+ fst_transfer_session(apdev, test_params,
+ bad_param_session_transfer_bad_session_id, True)
+
+def test_fst_ap_transfer_session_setup_skipped(dev, apdev, test_params):
+ """FST AP transfer session - setup skipped"""
+ fst_transfer_session(apdev, test_params,
+ bad_param_session_transfer_setup_skipped, True)
+
+def test_fst_ap_ack_request_with_session_not_set_up(dev, apdev, test_params):
+ """FST AP ack request with session not set up"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_req_session_not_set_up, True)
+
+def test_fst_ap_ack_request_with_session_not_established_init_side(dev, apdev,
+ test_params):
+ """FST AP ack request with session not established init side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_req_session_not_established_init_side,
+ True)
+
+def test_fst_ap_ack_request_with_session_not_established_resp_side(dev, apdev,
+ test_params):
+ """FST AP ack request with session not established resp side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_req_session_not_established_resp_side,
+ True)
+
+def test_fst_ap_ack_request_with_bad_fsts_id(dev, apdev, test_params):
+ """FST AP ack request with bad fsts id"""
+ fst_bad_transfer(apdev, test_params, bad_scenario_ack_req_bad_fsts_id, True)
+
+def test_fst_ap_ack_response_with_session_not_set_up(dev, apdev, test_params):
+ """FST AP ack response with session not set up"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_resp_session_not_set_up, True)
+
+def test_fst_ap_ack_response_with_session_not_established_init_side(dev, apdev, test_params):
+ """FST AP ack response with session not established init side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_resp_session_not_established_init_side,
+ True)
+
+def test_fst_ap_ack_response_with_session_not_established_resp_side(dev, apdev, test_params):
+ """FST AP ack response with session not established resp side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_resp_session_not_established_resp_side,
+ True)
+
+def test_fst_ap_ack_response_with_no_ack_request(dev, apdev, test_params):
+ """FST AP ack response with no ack request"""
+ fst_bad_transfer(apdev, test_params, bad_scenario_ack_resp_no_ack_req, True)
+
+def test_fst_ap_tear_down_session(dev, apdev, test_params):
+ """FST AP tear down session"""
+ fst_tear_down_session(apdev, test_params, bad_param_none, True)
+
+def test_fst_ap_tear_down_session_no_params(dev, apdev, test_params):
+ """FST AP tear down session - no params"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_no_params, True)
+
+def test_fst_ap_tear_down_session_bad_session_id(dev, apdev, test_params):
+ """FST AP tear down session - bad session id"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_bad_session_id, True)
+
+def test_fst_ap_tear_down_session_setup_skipped(dev, apdev, test_params):
+ """FST AP tear down session - setup skipped"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_setup_skipped, True)
+
+def test_fst_ap_tear_down_session_bad_fsts_id(dev, apdev, test_params):
+ """FST AP tear down session - bad fsts id"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_bad_fsts_id, True)
+
+def test_fst_ap_remove_session_not_established(dev, apdev, test_params):
+ """FST AP remove session - not established"""
+ fst_remove_session(apdev, test_params,
+ remove_scenario_non_established_session, True)
+
+def test_fst_ap_remove_session_established(dev, apdev, test_params):
+ """FST AP remove session - established"""
+ fst_remove_session(apdev, test_params,
+ remove_scenario_established_session, True)
+
+def test_fst_ap_remove_session_no_params(dev, apdev, test_params):
+ """FST AP remove session - no params"""
+ fst_remove_session(apdev, test_params, remove_scenario_no_params, True)
+
+def test_fst_ap_remove_session_bad_session_id(dev, apdev, test_params):
+ """FST AP remove session - bad session id"""
+ fst_remove_session(apdev, test_params, remove_scenario_bad_session_id, True)
+
+# STA side FST module tests
+
+def test_fst_sta_start_session(dev, apdev, test_params):
+ """FST STA start session"""
+ fst_start_session(apdev, test_params, bad_param_none, False)
+
+def test_fst_sta_start_session_no_add_params(dev, apdev, test_params):
+ """FST STA start session - no add params"""
+ fst_start_session(apdev, test_params, bad_param_session_add_no_params,
+ False)
+
+def test_fst_sta_start_session_bad_group_id(dev, apdev, test_params):
+ """FST STA start session - bad group id"""
+ fst_start_session(apdev, test_params, bad_param_group_id, False)
+
+def test_fst_sta_start_session_no_set_params(dev, apdev, test_params):
+ """FST STA start session - no set params"""
+ fst_start_session(apdev, test_params, bad_param_session_set_no_params,
+ False)
+
+def test_fst_sta_start_session_set_unknown_param(dev, apdev, test_params):
+ """FST STA start session - set unknown param"""
+ fst_start_session(apdev, test_params, bad_param_session_set_unknown_param,
+ False)
+
+def test_fst_sta_start_session_bad_session_id(dev, apdev, test_params):
+ """FST STA start session - bad session id"""
+ fst_start_session(apdev, test_params, bad_param_session_id, False)
+
+def test_fst_sta_start_session_bad_new_iface(dev, apdev, test_params):
+ """FST STA start session - bad new iface"""
+ fst_start_session(apdev, test_params, bad_param_new_iface, False)
+
+def test_fst_sta_start_session_bad_old_iface(dev, apdev, test_params):
+ """FST STA start session - bad old iface"""
+ fst_start_session(apdev, test_params, bad_param_old_iface, False)
+
+def test_fst_sta_start_session_negative_llt(dev, apdev, test_params):
+ """FST STA start session - negative llt"""
+ fst_start_session(apdev, test_params, bad_param_negative_llt, False)
+
+def test_fst_sta_start_session_zero_llt(dev, apdev, test_params):
+ """FST STA start session - zero llt"""
+ fst_start_session(apdev, test_params, bad_param_zero_llt, False)
+
+def test_fst_sta_start_session_llt_too_big(dev, apdev, test_params):
+ """FST STA start session - llt too large"""
+ fst_start_session(apdev, test_params, bad_param_llt_too_big, False)
+
+def test_fst_sta_start_session_invalid_peer_addr(dev, apdev, test_params):
+ """FST STA start session - invalid peer address"""
+ fst_start_session(apdev, test_params, bad_param_peer_addr, False,
+ 'GG:GG:GG:GG:GG:GG')
+
+def test_fst_sta_start_session_multicast_peer_addr(dev, apdev, test_params):
+ """FST STA start session - multicast peer address"""
+ fst_start_session(apdev, test_params, bad_param_peer_addr, False,
+ '11:00:11:22:33:44')
+
+def test_fst_sta_start_session_broadcast_peer_addr(dev, apdev, test_params):
+ """FST STA start session - broadcast peer addr"""
+ fst_start_session(apdev, test_params, bad_param_peer_addr, False,
+ 'FF:FF:FF:FF:FF:FF')
+
+def test_fst_sta_initiate_session(dev, apdev, test_params):
+ """FST STA initiate session"""
+ fst_initiate_session(apdev, test_params, bad_param_none, False)
+
+def test_fst_sta_initiate_session_no_params(dev, apdev, test_params):
+ """FST STA initiate session - no params"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_no_params, False)
+
+def test_fst_sta_initiate_session_invalid_session_id(dev, apdev, test_params):
+ """FST STA initiate session - invalid session id"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_bad_session_id, False)
+
+def test_fst_sta_initiate_session_no_new_iface(dev, apdev, test_params):
+ """FST STA initiate session - no new iface"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_with_no_new_iface_set,
+ False)
+
+def test_fst_sta_initiate_session_bad_peer_addr(dev, apdev, test_params):
+ """FST STA initiate session - bad peer address"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_with_bad_peer_addr_set,
+ False)
+
+def test_fst_sta_initiate_session_request_with_bad_stie(dev, apdev,
+ test_params):
+ """FST STA initiate session - request with bad stie"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_request_with_bad_stie,
+ False)
+
+def test_fst_sta_initiate_session_response_with_reject(dev, apdev, test_params):
+ """FST STA initiate session - response with reject"""
+ fst_initiate_session(apdev, test_params, bad_param_session_initiate_response_with_reject, False)
+
+def test_fst_sta_initiate_session_response_with_bad_stie(dev, apdev, test_params):
+ """FST STA initiate session - response with bad stie"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_response_with_bad_stie,
+ False)
+
+def test_fst_sta_initiate_session_response_with_zero_llt(dev, apdev,
+ test_params):
+ """FST STA initiate session - response with zero llt"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_response_with_zero_llt,
+ False)
+
+def test_fst_sta_initiate_session_stt_no_response(dev, apdev, test_params):
+ """FST STA initiate session - stt no response"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_stt_no_response, False)
+
+def test_fst_sta_initiate_session_concurrent_setup_request(dev, apdev,
+ test_params):
+ """FST STA initiate session - concurrent setup request"""
+ fst_initiate_session(apdev, test_params,
+ bad_param_session_initiate_concurrent_setup_request,
+ False)
+
+def test_fst_sta_session_request_with_no_session(dev, apdev, test_params):
+ """FST STA session request with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_session_request,
+ False)
+
+def test_fst_sta_session_response_accept_with_no_session(dev, apdev,
+ test_params):
+ """FST STA session response accept with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_session_response,
+ False, "accept")
+
+def test_fst_sta_session_response_reject_with_no_session(dev, apdev,
+ test_params):
+ """FST STA session response reject with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_session_response,
+ False, "reject")
+
+def test_fst_sta_ack_request_with_no_session(dev, apdev, test_params):
+ """FST STA ack request with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_ack_request, False)
+
+def test_fst_sta_ack_response_with_no_session(dev, apdev, test_params):
+ """FST STA ack response with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_ack_response,
+ False)
+
+def test_fst_sta_tear_down_response_with_no_session(dev, apdev, test_params):
+ """FST STA tear down response with no session"""
+ fst_send_unexpected_frame(apdev, test_params, frame_type_tear_down, False)
+
+def test_fst_sta_transfer_session(dev, apdev, test_params):
+ """FST STA transfer session"""
+ fst_transfer_session(apdev, test_params, bad_param_none, False)
+
+def test_fst_sta_transfer_session_no_params(dev, apdev, test_params):
+ """FST STA transfer session - no params"""
+ fst_transfer_session(apdev, test_params,
+ bad_param_session_transfer_no_params, False)
+
+def test_fst_sta_transfer_session_bad_session_id(dev, apdev, test_params):
+ """FST STA transfer session - bad session id"""
+ fst_transfer_session(apdev, test_params,
+ bad_param_session_transfer_bad_session_id, False)
+
+def test_fst_sta_transfer_session_setup_skipped(dev, apdev, test_params):
+ """FST STA transfer session - setup skipped"""
+ fst_transfer_session(apdev, test_params,
+ bad_param_session_transfer_setup_skipped, False)
+
+def test_fst_sta_ack_request_with_session_not_set_up(dev, apdev, test_params):
+ """FST STA ack request with session not set up"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_req_session_not_set_up, False)
+
+def test_fst_sta_ack_request_with_session_not_established_init_side(dev, apdev, test_params):
+ """FST STA ack request with session not established init side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_req_session_not_established_init_side,
+ False)
+
+def test_fst_sta_ack_request_with_session_not_established_resp_side(dev, apdev, test_params):
+ """FST STA ack request with session not established resp side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_req_session_not_established_resp_side,
+ False)
+
+def test_fst_sta_ack_request_with_bad_fsts_id(dev, apdev, test_params):
+ """FST STA ack request with bad fsts id"""
+ fst_bad_transfer(apdev, test_params, bad_scenario_ack_req_bad_fsts_id,
+ False)
+
+def test_fst_sta_ack_response_with_session_not_set_up(dev, apdev, test_params):
+ """FST STA ack response with session not set up"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_resp_session_not_set_up, False)
+
+def test_fst_sta_ack_response_with_session_not_established_init_side(dev, apdev, test_params):
+ """FST STA ack response with session not established init side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_resp_session_not_established_init_side,
+ False)
+
+def test_fst_sta_ack_response_with_session_not_established_resp_side(dev, apdev, test_params):
+ """FST STA ack response with session not established resp side"""
+ fst_bad_transfer(apdev, test_params,
+ bad_scenario_ack_resp_session_not_established_resp_side,
+ False)
+
+def test_fst_sta_ack_response_with_no_ack_request(dev, apdev, test_params):
+ """FST STA ack response with no ack request"""
+ fst_bad_transfer(apdev, test_params, bad_scenario_ack_resp_no_ack_req,
+ False)
+
+def test_fst_sta_tear_down_session(dev, apdev, test_params):
+ """FST STA tear down session"""
+ fst_tear_down_session(apdev, test_params, bad_param_none, False)
+
+def test_fst_sta_tear_down_session_no_params(dev, apdev, test_params):
+ """FST STA tear down session - no params"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_no_params, False)
+
+def test_fst_sta_tear_down_session_bad_session_id(dev, apdev, test_params):
+ """FST STA tear down session - bad session id"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_bad_session_id, False)
+
+def test_fst_sta_tear_down_session_setup_skipped(dev, apdev, test_params):
+ """FST STA tear down session - setup skipped"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_setup_skipped, False)
+
+def test_fst_sta_tear_down_session_bad_fsts_id(dev, apdev, test_params):
+ """FST STA tear down session - bad fsts id"""
+ fst_tear_down_session(apdev, test_params,
+ bad_param_session_teardown_bad_fsts_id, False)
+
+def test_fst_sta_remove_session_not_established(dev, apdev, test_params):
+ """FST STA tear down session - not established"""
+ fst_remove_session(apdev, test_params,
+ remove_scenario_non_established_session, False)
+
+def test_fst_sta_remove_session_established(dev, apdev, test_params):
+ """FST STA remove session - established"""
+ fst_remove_session(apdev, test_params,
+ remove_scenario_established_session, False)
+
+def test_fst_sta_remove_session_no_params(dev, apdev, test_params):
+ """FST STA remove session - no params"""
+ fst_remove_session(apdev, test_params, remove_scenario_no_params, False)
+
+def test_fst_sta_remove_session_bad_session_id(dev, apdev, test_params):
+ """FST STA remove session - bad session id"""
+ fst_remove_session(apdev, test_params, remove_scenario_bad_session_id,
+ False)