del hapd
hapd = None
+ # Use None here since this instance of Wlantest() will never be
+ # used for remote host hwsim tests on real hardware.
+ Wlantest.setup(None)
wt = Wlantest()
rename_log(args.logdir, 'hwsim0.pcapng', name, wt)
rename_log(args.logdir, 'hwsim0', name, wt)
hwsim_utils.test_connectivity(dev, hapd)
def check_group_mgmt_cipher(dev, ap, cipher):
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
-
if cipher not in dev.get_capability("group_mgmt"):
raise HwsimSkip("Cipher %s not supported" % cipher)
params = { "ssid": "test-wpa2-psk-pmf",
"rsn_pairwise": "CCMP",
"group_mgmt_cipher": cipher }
hapd = hostapd.add_ap(ap, params)
+
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev.connect("test-wpa2-psk-pmf", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256",
pairwise="CCMP", group="CCMP", scan_freq="2412")
ssid = "test-hs20-ap1"
params['ssid'] = ssid
params['hessid'] = bssid
- hostapd.add_ap(apdev[0], params)
+ hapd0 = hostapd.add_ap(apdev[0], params)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params()
dev[0].hs20_enable()
+ Wlantest.setup(hapd0)
wt = Wlantest()
wt.flush()
def test_ap_pmf_required(dev, apdev):
"""WPA2-PSK AP with PMF required"""
ssid = "test-pmf-required"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
key_mgmt = hapd.get_config()['key_mgmt']
if key_mgmt.split(' ')[0] != "WPA-PSK-SHA256":
raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
def test_ap_pmf_optional(dev, apdev):
"""WPA2-PSK AP with PMF optional"""
ssid = "test-pmf-optional"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
def test_ap_pmf_optional_2akm(dev, apdev):
"""WPA2-PSK AP with PMF optional (2 AKMs)"""
ssid = "test-pmf-optional-2akm"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
def test_ap_pmf_negative(dev, apdev):
"""WPA2-PSK AP without PMF (negative test)"""
ssid = "test-pmf-negative"
+ params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
+ hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
- params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
- hapd = hostapd.add_ap(apdev[0], params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
def test_ap_pmf_assoc_comeback(dev, apdev):
"""WPA2-PSK AP with PMF association comeback"""
ssid = "assoc-comeback"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
def test_ap_pmf_assoc_comeback2(dev, apdev):
"""WPA2-PSK AP with PMF association comeback (using DROP_SA)"""
ssid = "assoc-comeback"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")
if "OK" not in dev[0].request("DROP_SA"):
"""WPA2-PSK AP with station using SA Query"""
ssid = "assoc-comeback"
addr = dev[0].own_addr()
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5", drv_params="use_monitor=1")
bssid = wpas.own_addr()
wpas.dump_monitor()
+ Wlantest.setup(wpas)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
"""WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames"""
ssid = "deauth-attack"
addr = dev[0].own_addr()
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5", drv_params="use_monitor=1")
wpas.connect_network(id)
bssid = wpas.own_addr()
+ Wlantest.setup(wpas)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
def test_ap_pmf_required_sha1(dev, apdev):
"""WPA2-PSK AP with PMF required with SHA1 AKM"""
ssid = "test-pmf-required-sha1"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
key_mgmt = hapd.get_config()['key_mgmt']
if key_mgmt.split(' ')[0] != "WPA-PSK":
raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
def _test_ap_pmf_toggle(dev, apdev):
ssid = "test-pmf-optional"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
params["assoc_sa_query_max_timeout"] = "1"
params["assoc_sa_query_retry_timeout"] = "1"
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
bssid = apdev[0]['bssid']
addr = dev[0].own_addr()
dev[0].request("SET reassoc_same_bss_optim 1")
if not ap_tid:
ap_tid = tid
bssid = ap['bssid']
+ Wlantest.setup(hapd)
wt = Wlantest()
wt.clear_sta_counters(bssid, sta)
hwsim_utils.test_connectivity(dev, hapd, dscp=dscp, config=False)
dev[1].connect("test-open", key_mgmt="NONE", scan_freq=scan_freq)
connectivity(dev, hapd)
-def wlantest_setup():
+def wlantest_setup(hapd):
+ Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
def test_ap_tdls_discovery(dev, apdev):
"""WPA2-PSK AP and two stations using TDLS discovery"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("TDLS_DISCOVER " + dev[1].p2p_interface_addr())
time.sleep(0.2)
def test_ap_wpa2_tdls(dev, apdev):
"""WPA2-PSK AP and two stations using TDLS"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
def test_ap_wpa2_tdls_concurrent_init(dev, apdev):
"""Concurrent TDLS setup initiation"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x80")
setup_tdls(dev[1], dev[0], hapd, reverse=True)
def test_ap_wpa2_tdls_concurrent_init2(dev, apdev):
"""Concurrent TDLS setup initiation (reverse)"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x80")
setup_tdls(dev[0], dev[1], hapd)
def test_ap_wpa2_tdls_decline_resp(dev, apdev):
"""Decline TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x200")
setup_tdls(dev[1], dev[0], hapd, expect_fail=True)
def test_ap_wpa2_tdls_long_lifetime(dev, apdev):
"""TDLS with long TPK lifetime"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x40")
setup_tdls(dev[1], dev[0], hapd)
def test_ap_wpa2_tdls_long_frame(dev, apdev):
"""TDLS with long setup/teardown frames"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x1")
dev[1].request("SET tdls_testing 0x1")
def test_ap_wpa2_tdls_reneg(dev, apdev):
"""Renegotiate TDLS link"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[1], dev[0], hapd)
setup_tdls(dev[0], dev[1], hapd)
def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev):
"""Incorrect TPK lifetime in TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x10")
setup_tdls(dev[0], dev[1], hapd, expect_fail=True)
def test_ap_wpa2_tdls_diff_rsnie(dev, apdev):
"""TDLS with different RSN IEs"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x2")
setup_tdls(dev[1], dev[0], hapd)
def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev):
"""Incorrect MIC in TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x800")
addr0 = dev[0].p2p_interface_addr()
def test_ap_wpa2_tdls_wrong_tpk_m3_mic(dev, apdev):
"""Incorrect MIC in TDLS Setup Confirm"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x800")
addr0 = dev[0].p2p_interface_addr()
hapd = hostapd.add_ap(apdev[0],
hostapd.wpa_params(ssid="test-wpa-psk",
passphrase="12345678"))
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
hapd = hostapd.add_ap(apdev[0],
hostapd.wpa_mixed_params(ssid="test-wpa-mixed-psk",
passphrase="12345678"))
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa_psk_mixed(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
"""WEP AP and two stations using TDLS"""
hapd = hostapd.add_ap(apdev[0],
{ "ssid": "test-wep", "wep_key0": '"hello"' })
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wep(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
def test_ap_open_tdls(dev, apdev):
"""Open AP and two stations using TDLS"""
hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
params['bridge'] = 'ap-br0'
hapd = hostapd.add_ap(apdev[0], params)
hostapd.add_ap(apdev[1], params)
- wlantest_setup()
+ wlantest_setup(hapd)
subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
dev[0].connect(ssid, psk=passphrase, scan_freq="2412",
def test_ap_wpa2_tdls_responder_teardown(dev, apdev):
"""TDLS teardown from responder with WPA2-PSK AP"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd, responder=True)
"vht_oper_centr_freq_seg0_idx": "0" }
try:
hapd = hostapd.add_ap(apdev[0], params)
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
try:
hapd = None
hapd = hostapd.add_ap(apdev[0], params)
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "WIDTH=80 MHz" not in sig:
try:
hapd = None
hapd = hostapd.add_ap(apdev[0], params)
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "FREQUENCY=5180" not in sig:
if "5490" in r and "DFS" in r:
raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
raise Exception("AP setup timed out")
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5520")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "WIDTH=160 MHz" not in sig:
def test_ap_tdls_link_status(dev, apdev):
"""Check TDLS link status between two stations"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
check_tdls_link(dev[0], dev[1], connected=False)
setup_tdls(dev[0], dev[1], hapd)
def test_autogo_tdls(dev):
"""P2P autonomous GO and two clients using TDLS"""
- wt = Wlantest()
go = dev[0]
logger.info("Start autonomous GO with fixed parameters " + go.ifname)
id = go.add_network()
go.set_network(id, "disabled", "2")
res = go.p2p_start_go(persistent=id, freq="2462")
logger.debug("res: " + str(res))
+ Wlantest.setup(go, True)
+ wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
connect_cli(go, dev[1], social=True, freq=2462)
def test_peerkey_pairwise_mismatch(dev, apdev):
"""RSN TKIP+CCMP AP and PeerKey between two STAs using different ciphers"""
skip_with_fips(dev[0])
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
ssid = "test-peerkey"
passphrase = "12345678"
params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
params['peerkey'] = "1"
params['rsn_pairwise'] = "TKIP CCMP"
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
+
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk=passphrase, scan_freq="2412", peerkey=True,
pairwise="CCMP")
def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
"""WNM Sleep Mode - RSN with PMF"""
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
params["bss_transition"] = "1"
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
# This software may be distributed under the terms of the BSD license.
# See README for more details.
+import re
import os
+import posixpath
import time
import subprocess
import logging
logger = logging.getLogger()
class Wlantest:
+ remote_host = None
+ setup_params = None
+ exe_thread = None
+ exe_res = []
+ monitor_mod = None
+ setup_done = False
+
+ @classmethod
+ def stop_remote_wlantest(cls):
+ if cls.exe_thread is None:
+ # Local flow - no need for remote operations
+ return
+
+ cls.remote_host.execute(["killall", "-9", "wlantest"])
+ cls.remote_host.wait_execute_complete(cls.exe_thread, 5)
+ cls.exe_thread = None
+ cls.exe_res = []
+
+ @classmethod
+ def reset_remote_wlantest(cls):
+ cls.stop_remote_wlantest()
+ cls.remote_host = None
+ cls.setup_params = None
+ cls.exe_thread = None
+ cls.exe_res = []
+ cls.monitor_mod = None
+ cls.setup_done = False
+
+ @classmethod
+ def start_remote_wlantest(cls):
+ if cls.remote_host is None:
+ # Local flow - no need for remote operations
+ return
+ if cls.exe_thread is not None:
+ raise Exception("Cannot start wlantest twice")
+
+ log_dir = cls.setup_params['log_dir']
+ ifaces = re.split('; | |, ', cls.remote_host.ifname)
+ ifname = ifaces[0]
+ exe = cls.setup_params["wlantest"]
+ tc_name = cls.setup_params["tc_name"]
+ base_log_name = tc_name + "_wlantest_" + \
+ cls.remote_host.name + "_" + ifname
+ log_file = posixpath.join(log_dir, base_log_name + ".log")
+ pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
+ cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname,
+ pcap_file, log_file)
+ cls.remote_host.add_log(log_file)
+ cls.remote_host.add_log(pcap_file)
+ cls.exe_thread = cls.remote_host.execute_run(cmd.split(), cls.exe_res)
+ # Give wlantest a chance to start working
+ time.sleep(1)
+
+ @classmethod
+ def register_remote_wlantest(cls, host, setup_params, monitor_mod):
+ if cls.remote_host is not None:
+ raise Exception("Cannot register remote wlantest twice")
+ cls.remote_host = host
+ cls.setup_params = setup_params
+ cls.monitor_mod = monitor_mod
+ status, buf = host.execute(["which", setup_params['wlantest']])
+ if status != 0:
+ raise Exception(host.name + " - wlantest: " + buf)
+ status, buf = host.execute(["which", setup_params['wlantest_cli']])
+ if status != 0:
+ raise Exception(host.name + " - wlantest_cli: " + buf)
+
+ @classmethod
+ def chan_from_wpa(cls, wpa, is_p2p=False):
+ if cls.monitor_mod is None:
+ return
+ m = cls.monitor_mod
+ return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])
+
+ @classmethod
+ def setup(cls, wpa, is_p2p=False):
+ cls.chan_from_wpa(wpa, is_p2p)
+ cls.start_remote_wlantest()
+ cls.setup_done = True
+
def __init__(self):
+ if not self.setup_done:
+ raise Exception("Cannot create Wlantest instance before setup()")
if os.path.isfile('../../wlantest/wlantest_cli'):
self.wlantest_cli = '../../wlantest/wlantest_cli'
else:
self.wlantest_cli = 'wlantest_cli'
+ def cli_cmd(self, params):
+ if self.remote_host is not None:
+ exe = self.setup_params["wlantest_cli"]
+ ret = self.remote_host.execute([exe] + params)
+ if ret[0] != 0:
+ raise Exception("wlantest_cli failed")
+ return ret[1]
+ else:
+ return subprocess.check_output([self.wlantest_cli] + params)
+
def flush(self):
- res = subprocess.check_output([self.wlantest_cli, "flush"])
+ res = self.cli_cmd(["flush"])
if "FAIL" in res:
raise Exception("wlantest_cli flush failed")
def relog(self):
- res = subprocess.check_output([self.wlantest_cli, "relog"])
+ res = self.cli_cmd(["relog"])
if "FAIL" in res:
raise Exception("wlantest_cli relog failed")
def add_passphrase(self, passphrase):
- res = subprocess.check_output([self.wlantest_cli, "add_passphrase",
- passphrase])
+ res = self.cli_cmd(["add_passphrase", passphrase])
if "FAIL" in res:
raise Exception("wlantest_cli add_passphrase failed")
def add_wepkey(self, key):
- res = subprocess.check_output([self.wlantest_cli, "add_wepkey", key])
+ res = self.cli_cmd(["add_wepkey", key])
if "FAIL" in res:
raise Exception("wlantest_cli add_key failed")
def info_bss(self, field, bssid):
- res = subprocess.check_output([self.wlantest_cli, "info_bss",
- field, bssid])
+ res = self.cli_cmd(["info_bss", field, bssid])
if "FAIL" in res:
raise Exception("Could not get BSS info from wlantest for " + bssid)
return res
def get_bss_counter(self, field, bssid):
try:
- res = subprocess.check_output([self.wlantest_cli, "get_bss_counter",
- field, bssid]);
+ res = self.cli_cmd(["get_bss_counter", field, bssid])
except Exception, e:
return 0
if "FAIL" in res:
return int(res)
def clear_bss_counters(self, bssid):
- subprocess.call([self.wlantest_cli, "clear_bss_counters", bssid],
- stdout=open('/dev/null', 'w'));
+ self.cli_cmd(["clear_bss_counters", bssid])
def info_sta(self, field, bssid, addr):
- res = subprocess.check_output([self.wlantest_cli, "info_sta",
- field, bssid, addr])
+ res = self.cli_cmd(["info_sta", field, bssid, addr])
if "FAIL" in res:
raise Exception("Could not get STA info from wlantest for " + addr)
return res
def get_sta_counter(self, field, bssid, addr):
- res = subprocess.check_output([self.wlantest_cli, "get_sta_counter",
- field, bssid, addr]);
+ res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def clear_sta_counters(self, bssid, addr):
- res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters",
- bssid, addr]);
+ res = self.cli_cmd(["clear_sta_counters", bssid, addr])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
def tdls_clear(self, bssid, addr1, addr2):
- res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters",
- bssid, addr1, addr2]);
+ self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])
def get_tdls_counter(self, field, bssid, addr1, addr2):
- res = subprocess.check_output([self.wlantest_cli, "get_tdls_counter",
- field, bssid, addr1, addr2]);
+ res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
raise Exception("Unexpected STA key_mgmt")
def get_tx_tid(self, bssid, addr, tid):
- res = subprocess.check_output([self.wlantest_cli, "get_tx_tid",
- bssid, addr, str(tid)]);
+ res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def get_rx_tid(self, bssid, addr, tid):
- res = subprocess.check_output([self.wlantest_cli, "get_rx_tid",
- bssid, addr, str(tid)]);
+ res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
"hostapd" : "./tests/hostapd",
"wpa_supplicant" : "./tests/wpa_supplicant",
"iperf" : "iperf",
+ "wlantest" : "./tests/wlantest",
+ "wlantest_cli" : "./tests/wlantest_cli",
"country" : "US",
"log_dir" : "/tmp/",
"ipv4_test_net" : "192.168.12.0",
import rutils
import monitor
import traceback
+import wlantest
import logging
logger = logging.getLogger()
monitor.add(dut_host, monitors)
monitor.run(dut_host, setup_params)
+ monitor_hosts = monitor.create(devices, setup_params, refs, duts,
+ monitors)
+ mon = None
+ if len(monitor_hosts) > 0:
+ mon = monitor_hosts[0]
+ wlantest.Wlantest.reset_remote_wlantest()
+ wlantest.Wlantest.register_remote_wlantest(mon, setup_params,
+ monitor)
+
# run hostapd/wpa_supplicant
for ref_host in ref_hosts:
rutils.run_wpasupplicant(ref_host, setup_params)
for dut_host in dut_hosts:
dut_host.execute(["killall", "hostapd"])
dut_host.get_logs(local_log_dir)
+ if mon is not None:
+ wlantest.Wlantest.reset_remote_wlantest()
+ mon.get_logs(local_log_dir)
return ""
except:
for dut_host in dut_hosts:
dut_host.execute(["killall", "hostapd"])
dut_host.get_logs(local_log_dir)
+ if mon is not None:
+ wlantest.Wlantest.reset_remote_wlantest()
+ mon.get_logs(local_log_dir)
raise