From 8efc83d4e7b88c6496989f8e58e44fc94134d57d Mon Sep 17 00:00:00 2001 From: Jonathan Afek Date: Thu, 19 May 2016 16:06:49 +0300 Subject: [PATCH] tests: Add support for wlantest for remote hwsim tests Use a monitor interface given in the command line that is not also a station or an AP as a monitor running wlantest on the channel used by the test. This makes all the tests that use wlantest available for execution on real hardware on remote hosts. Signed-off-by: Jonathan Afek --- tests/hwsim/run-tests.py | 3 + tests/hwsim/test_ap_ciphers.py | 10 ++-- tests/hwsim/test_ap_hs20.py | 3 +- tests/hwsim/test_ap_pmf.py | 70 +++++++++++++--------- tests/hwsim/test_ap_qosmap.py | 1 + tests/hwsim/test_ap_tdls.py | 49 +++++++-------- tests/hwsim/test_p2p_autogo.py | 3 +- tests/hwsim/test_peerkey.py | 10 ++-- tests/hwsim/test_wnm.py | 8 ++- tests/hwsim/wlantest.py | 133 +++++++++++++++++++++++++++++++++-------- tests/remote/config.py | 2 + tests/remote/hwsim_wrapper.py | 16 +++++ 12 files changed, 217 insertions(+), 91 deletions(-) diff --git a/tests/hwsim/run-tests.py b/tests/hwsim/run-tests.py index fd81b02..9361e3a 100755 --- a/tests/hwsim/run-tests.py +++ b/tests/hwsim/run-tests.py @@ -503,6 +503,9 @@ def main(): del hapd hapd = None + # Use None here since this instance of Wlantest() will never be + # used for remote host hwsim tests on real hardware. + Wlantest.setup(None) wt = Wlantest() rename_log(args.logdir, 'hwsim0.pcapng', name, wt) rename_log(args.logdir, 'hwsim0', name, wt) diff --git a/tests/hwsim/test_ap_ciphers.py b/tests/hwsim/test_ap_ciphers.py index 6cde14a..bff8906 100644 --- a/tests/hwsim/test_ap_ciphers.py +++ b/tests/hwsim/test_ap_ciphers.py @@ -28,10 +28,6 @@ def check_cipher(dev, ap, cipher): hwsim_utils.test_connectivity(dev, hapd) def check_group_mgmt_cipher(dev, ap, cipher): - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") - if cipher not in dev.get_capability("group_mgmt"): raise HwsimSkip("Cipher %s not supported" % cipher) params = { "ssid": "test-wpa2-psk-pmf", @@ -42,6 +38,12 @@ def check_group_mgmt_cipher(dev, ap, cipher): "rsn_pairwise": "CCMP", "group_mgmt_cipher": cipher } hapd = hostapd.add_ap(ap, params) + + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") + dev.connect("test-wpa2-psk-pmf", psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK-SHA256", pairwise="CCMP", group="CCMP", scan_freq="2412") diff --git a/tests/hwsim/test_ap_hs20.py b/tests/hwsim/test_ap_hs20.py index a3b61b2..3a93544 100644 --- a/tests/hwsim/test_ap_hs20.py +++ b/tests/hwsim/test_ap_hs20.py @@ -298,7 +298,7 @@ def _test_ap_interworking_scan_filtering(dev, apdev): ssid = "test-hs20-ap1" params['ssid'] = ssid params['hessid'] = bssid - hostapd.add_ap(apdev[0], params) + hapd0 = hostapd.add_ap(apdev[0], params) bssid2 = apdev[1]['bssid'] params = hs20_ap_params() @@ -312,6 +312,7 @@ def _test_ap_interworking_scan_filtering(dev, apdev): dev[0].hs20_enable() + Wlantest.setup(hapd0) wt = Wlantest() wt.flush() diff --git a/tests/hwsim/test_ap_pmf.py b/tests/hwsim/test_ap_pmf.py index f7c5f29..c9d7403 100644 --- a/tests/hwsim/test_ap_pmf.py +++ b/tests/hwsim/test_ap_pmf.py @@ -17,13 +17,14 @@ from wpasupplicant import WpaSupplicant def test_ap_pmf_required(dev, apdev): """WPA2-PSK AP with PMF required""" ssid = "test-pmf-required" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK-SHA256"; params["ieee80211w"] = "2"; hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") key_mgmt = hapd.get_config()['key_mgmt'] if key_mgmt.split(' ')[0] != "WPA-PSK-SHA256": raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt) @@ -53,13 +54,14 @@ def test_ap_pmf_required(dev, apdev): def test_ap_pmf_optional(dev, apdev): """WPA2-PSK AP with PMF optional""" ssid = "test-pmf-optional" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK"; params["ieee80211w"] = "1"; hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") @@ -75,13 +77,14 @@ def test_ap_pmf_optional(dev, apdev): def test_ap_pmf_optional_2akm(dev, apdev): """WPA2-PSK AP with PMF optional (2 AKMs)""" ssid = "test-pmf-optional-2akm" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256"; params["ieee80211w"] = "1"; hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") @@ -101,11 +104,12 @@ def test_ap_pmf_optional_2akm(dev, apdev): def test_ap_pmf_negative(dev, apdev): """WPA2-PSK AP without PMF (negative test)""" ssid = "test-pmf-negative" + params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") + hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) wt = Wlantest() wt.flush() wt.add_passphrase("12345678") - params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") - hapd = hostapd.add_ap(apdev[0], params) dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") @@ -123,13 +127,14 @@ def test_ap_pmf_negative(dev, apdev): def test_ap_pmf_assoc_comeback(dev, apdev): """WPA2-PSK AP with PMF association comeback""" ssid = "assoc-comeback" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK-SHA256"; params["ieee80211w"] = "2"; hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") @@ -146,13 +151,14 @@ def test_ap_pmf_assoc_comeback(dev, apdev): def test_ap_pmf_assoc_comeback2(dev, apdev): """WPA2-PSK AP with PMF association comeback (using DROP_SA)""" ssid = "assoc-comeback" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK"; params["ieee80211w"] = "1"; hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") dev[0].connect(ssid, psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412") if "OK" not in dev[0].request("DROP_SA"): @@ -167,9 +173,6 @@ def test_ap_pmf_sta_sa_query(dev, apdev): """WPA2-PSK AP with station using SA Query""" ssid = "assoc-comeback" addr = dev[0].own_addr() - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add("wlan5", drv_params="use_monitor=1") @@ -187,6 +190,11 @@ def test_ap_pmf_sta_sa_query(dev, apdev): bssid = wpas.own_addr() wpas.dump_monitor() + Wlantest.setup(wpas) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") + dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") @@ -260,9 +268,6 @@ def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev): """WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames""" ssid = "deauth-attack" addr = dev[0].own_addr() - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add("wlan5", drv_params="use_monitor=1") @@ -279,6 +284,11 @@ def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev): wpas.connect_network(id) bssid = wpas.own_addr() + Wlantest.setup(wpas) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") + dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") @@ -348,13 +358,14 @@ def test_ap_pmf_optional_eap(dev, apdev): def test_ap_pmf_required_sha1(dev, apdev): """WPA2-PSK AP with PMF required with SHA1 AKM""" ssid = "test-pmf-required-sha1" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK"; params["ieee80211w"] = "2"; hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") key_mgmt = hapd.get_config()['key_mgmt'] if key_mgmt.split(' ')[0] != "WPA-PSK": raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt) @@ -373,15 +384,16 @@ def test_ap_pmf_toggle(dev, apdev): def _test_ap_pmf_toggle(dev, apdev): ssid = "test-pmf-optional" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK"; params["ieee80211w"] = "1"; params["assoc_sa_query_max_timeout"] = "1" params["assoc_sa_query_retry_timeout"] = "1" hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") bssid = apdev[0]['bssid'] addr = dev[0].own_addr() dev[0].request("SET reassoc_same_bss_optim 1") diff --git a/tests/hwsim/test_ap_qosmap.py b/tests/hwsim/test_ap_qosmap.py index 0dd0cba..6084521 100644 --- a/tests/hwsim/test_ap_qosmap.py +++ b/tests/hwsim/test_ap_qosmap.py @@ -18,6 +18,7 @@ def check_qos_map(ap, hapd, dev, sta, dscp, tid, ap_tid=None): if not ap_tid: ap_tid = tid bssid = ap['bssid'] + Wlantest.setup(hapd) wt = Wlantest() wt.clear_sta_counters(bssid, sta) hwsim_utils.test_connectivity(dev, hapd, dscp=dscp, config=False) diff --git a/tests/hwsim/test_ap_tdls.py b/tests/hwsim/test_ap_tdls.py index 297a928..138f21c 100644 --- a/tests/hwsim/test_ap_tdls.py +++ b/tests/hwsim/test_ap_tdls.py @@ -56,7 +56,8 @@ def connect_2sta_open(dev, hapd, scan_freq="2412"): dev[1].connect("test-open", key_mgmt="NONE", scan_freq=scan_freq) connectivity(dev, hapd) -def wlantest_setup(): +def wlantest_setup(hapd): + Wlantest.setup(hapd) wt = Wlantest() wt.flush() wt.add_passphrase("12345678") @@ -165,7 +166,7 @@ def check_tdls_link(sta0, sta1, connected=True): def test_ap_tdls_discovery(dev, apdev): """WPA2-PSK AP and two stations using TDLS discovery""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[0].request("TDLS_DISCOVER " + dev[1].p2p_interface_addr()) time.sleep(0.2) @@ -173,7 +174,7 @@ def test_ap_tdls_discovery(dev, apdev): def test_ap_wpa2_tdls(dev, apdev): """WPA2-PSK AP and two stations using TDLS""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd) @@ -183,7 +184,7 @@ def test_ap_wpa2_tdls(dev, apdev): def test_ap_wpa2_tdls_concurrent_init(dev, apdev): """Concurrent TDLS setup initiation""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[0].request("SET tdls_testing 0x80") setup_tdls(dev[1], dev[0], hapd, reverse=True) @@ -191,7 +192,7 @@ def test_ap_wpa2_tdls_concurrent_init(dev, apdev): def test_ap_wpa2_tdls_concurrent_init2(dev, apdev): """Concurrent TDLS setup initiation (reverse)""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[1].request("SET tdls_testing 0x80") setup_tdls(dev[0], dev[1], hapd) @@ -199,7 +200,7 @@ def test_ap_wpa2_tdls_concurrent_init2(dev, apdev): def test_ap_wpa2_tdls_decline_resp(dev, apdev): """Decline TDLS Setup Response""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[1].request("SET tdls_testing 0x200") setup_tdls(dev[1], dev[0], hapd, expect_fail=True) @@ -207,7 +208,7 @@ def test_ap_wpa2_tdls_decline_resp(dev, apdev): def test_ap_wpa2_tdls_long_lifetime(dev, apdev): """TDLS with long TPK lifetime""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[1].request("SET tdls_testing 0x40") setup_tdls(dev[1], dev[0], hapd) @@ -215,7 +216,7 @@ def test_ap_wpa2_tdls_long_lifetime(dev, apdev): def test_ap_wpa2_tdls_long_frame(dev, apdev): """TDLS with long setup/teardown frames""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[0].request("SET tdls_testing 0x1") dev[1].request("SET tdls_testing 0x1") @@ -226,7 +227,7 @@ def test_ap_wpa2_tdls_long_frame(dev, apdev): def test_ap_wpa2_tdls_reneg(dev, apdev): """Renegotiate TDLS link""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) setup_tdls(dev[1], dev[0], hapd) setup_tdls(dev[0], dev[1], hapd) @@ -234,7 +235,7 @@ def test_ap_wpa2_tdls_reneg(dev, apdev): def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev): """Incorrect TPK lifetime in TDLS Setup Response""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[1].request("SET tdls_testing 0x10") setup_tdls(dev[0], dev[1], hapd, expect_fail=True) @@ -242,7 +243,7 @@ def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev): def test_ap_wpa2_tdls_diff_rsnie(dev, apdev): """TDLS with different RSN IEs""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[1].request("SET tdls_testing 0x2") setup_tdls(dev[1], dev[0], hapd) @@ -251,7 +252,7 @@ def test_ap_wpa2_tdls_diff_rsnie(dev, apdev): def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev): """Incorrect MIC in TDLS Setup Response""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[0].request("SET tdls_testing 0x800") addr0 = dev[0].p2p_interface_addr() @@ -261,7 +262,7 @@ def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev): def test_ap_wpa2_tdls_wrong_tpk_m3_mic(dev, apdev): """Incorrect MIC in TDLS Setup Confirm""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) dev[1].request("SET tdls_testing 0x800") addr0 = dev[0].p2p_interface_addr() @@ -274,7 +275,7 @@ def test_ap_wpa_tdls(dev, apdev): hapd = hostapd.add_ap(apdev[0], hostapd.wpa_params(ssid="test-wpa-psk", passphrase="12345678")) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa_psk(dev, hapd) setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd) @@ -286,7 +287,7 @@ def test_ap_wpa_mixed_tdls(dev, apdev): hapd = hostapd.add_ap(apdev[0], hostapd.wpa_mixed_params(ssid="test-wpa-mixed-psk", passphrase="12345678")) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa_psk_mixed(dev, hapd) setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd) @@ -296,7 +297,7 @@ def test_ap_wep_tdls(dev, apdev): """WEP AP and two stations using TDLS""" hapd = hostapd.add_ap(apdev[0], { "ssid": "test-wep", "wep_key0": '"hello"' }) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wep(dev, hapd) setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd) @@ -305,7 +306,7 @@ def test_ap_wep_tdls(dev, apdev): def test_ap_open_tdls(dev, apdev): """Open AP and two stations using TDLS""" hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" }) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_open(dev, hapd) setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd) @@ -321,7 +322,7 @@ def test_ap_wpa2_tdls_bssid_mismatch(dev, apdev): params['bridge'] = 'ap-br0' hapd = hostapd.add_ap(apdev[0], params) hostapd.add_ap(apdev[1], params) - wlantest_setup() + wlantest_setup(hapd) subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) dev[0].connect(ssid, psk=passphrase, scan_freq="2412", @@ -343,7 +344,7 @@ def test_ap_wpa2_tdls_bssid_mismatch(dev, apdev): def test_ap_wpa2_tdls_responder_teardown(dev, apdev): """TDLS teardown from responder with WPA2-PSK AP""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd, responder=True) @@ -362,7 +363,7 @@ def test_ap_open_tdls_vht(dev, apdev): "vht_oper_centr_freq_seg0_idx": "0" } try: hapd = hostapd.add_ap(apdev[0], params) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_open(dev, hapd, scan_freq="5180") setup_tdls(dev[0], dev[1], hapd) teardown_tdls(dev[0], dev[1], hapd) @@ -392,7 +393,7 @@ def test_ap_open_tdls_vht80(dev, apdev): try: hapd = None hapd = hostapd.add_ap(apdev[0], params) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_open(dev, hapd, scan_freq="5180") sig = dev[0].request("SIGNAL_POLL").splitlines() if "WIDTH=80 MHz" not in sig: @@ -436,7 +437,7 @@ def test_ap_open_tdls_vht80plus80(dev, apdev): try: hapd = None hapd = hostapd.add_ap(apdev[0], params) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_open(dev, hapd, scan_freq="5180") sig = dev[0].request("SIGNAL_POLL").splitlines() if "FREQUENCY=5180" not in sig: @@ -492,7 +493,7 @@ def test_ap_open_tdls_vht160(dev, apdev): if "5490" in r and "DFS" in r: raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed") raise Exception("AP setup timed out") - wlantest_setup() + wlantest_setup(hapd) connect_2sta_open(dev, hapd, scan_freq="5520") sig = dev[0].request("SIGNAL_POLL").splitlines() if "WIDTH=160 MHz" not in sig: @@ -539,7 +540,7 @@ def test_tdls_chan_switch(dev, apdev): def test_ap_tdls_link_status(dev, apdev): """Check TDLS link status between two stations""" hapd = start_ap_wpa2_psk(apdev[0]) - wlantest_setup() + wlantest_setup(hapd) connect_2sta_wpa2_psk(dev, hapd) check_tdls_link(dev[0], dev[1], connected=False) setup_tdls(dev[0], dev[1], hapd) diff --git a/tests/hwsim/test_p2p_autogo.py b/tests/hwsim/test_p2p_autogo.py index 3875343..55c4c0f 100644 --- a/tests/hwsim/test_p2p_autogo.py +++ b/tests/hwsim/test_p2p_autogo.py @@ -231,7 +231,6 @@ def test_autogo_pbc(dev): def test_autogo_tdls(dev): """P2P autonomous GO and two clients using TDLS""" - wt = Wlantest() go = dev[0] logger.info("Start autonomous GO with fixed parameters " + go.ifname) id = go.add_network() @@ -241,6 +240,8 @@ def test_autogo_tdls(dev): go.set_network(id, "disabled", "2") res = go.p2p_start_go(persistent=id, freq="2462") logger.debug("res: " + str(res)) + Wlantest.setup(go, True) + wt = Wlantest() wt.flush() wt.add_passphrase("12345678") connect_cli(go, dev[1], social=True, freq=2462) diff --git a/tests/hwsim/test_peerkey.py b/tests/hwsim/test_peerkey.py index b1e7b1f..30d9b60 100644 --- a/tests/hwsim/test_peerkey.py +++ b/tests/hwsim/test_peerkey.py @@ -50,15 +50,17 @@ def test_peerkey_unknown_peer(dev, apdev): def test_peerkey_pairwise_mismatch(dev, apdev): """RSN TKIP+CCMP AP and PeerKey between two STAs using different ciphers""" skip_with_fips(dev[0]) - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") ssid = "test-peerkey" passphrase = "12345678" params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) params['peerkey'] = "1" params['rsn_pairwise'] = "TKIP CCMP" - hostapd.add_ap(apdev[0], params) + hapd = hostapd.add_ap(apdev[0], params) + + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") dev[0].connect(ssid, psk=passphrase, scan_freq="2412", peerkey=True, pairwise="CCMP") diff --git a/tests/hwsim/test_wnm.py b/tests/hwsim/test_wnm.py index 8048113..3788ed2 100644 --- a/tests/hwsim/test_wnm.py +++ b/tests/hwsim/test_wnm.py @@ -185,9 +185,6 @@ def test_wnm_sleep_mode_ap_oom(dev, apdev): def test_wnm_sleep_mode_rsn_pmf(dev, apdev): """WNM Sleep Mode - RSN with PMF""" - wt = Wlantest() - wt.flush() - wt.add_passphrase("12345678") params = hostapd.wpa2_params("test-wnm-rsn", "12345678") params["wpa_key_mgmt"] = "WPA-PSK-SHA256"; params["ieee80211w"] = "2"; @@ -197,6 +194,11 @@ def test_wnm_sleep_mode_rsn_pmf(dev, apdev): params["bss_transition"] = "1" hapd = hostapd.add_ap(apdev[0], params) + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") + dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412") ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5) diff --git a/tests/hwsim/wlantest.py b/tests/hwsim/wlantest.py index 5f6b4ac..10bb45f 100644 --- a/tests/hwsim/wlantest.py +++ b/tests/hwsim/wlantest.py @@ -4,7 +4,9 @@ # This software may be distributed under the terms of the BSD license. # See README for more details. +import re import os +import posixpath import time import subprocess import logging @@ -13,44 +15,133 @@ import wpaspy logger = logging.getLogger() class Wlantest: + remote_host = None + setup_params = None + exe_thread = None + exe_res = [] + monitor_mod = None + setup_done = False + + @classmethod + def stop_remote_wlantest(cls): + if cls.exe_thread is None: + # Local flow - no need for remote operations + return + + cls.remote_host.execute(["killall", "-9", "wlantest"]) + cls.remote_host.wait_execute_complete(cls.exe_thread, 5) + cls.exe_thread = None + cls.exe_res = [] + + @classmethod + def reset_remote_wlantest(cls): + cls.stop_remote_wlantest() + cls.remote_host = None + cls.setup_params = None + cls.exe_thread = None + cls.exe_res = [] + cls.monitor_mod = None + cls.setup_done = False + + @classmethod + def start_remote_wlantest(cls): + if cls.remote_host is None: + # Local flow - no need for remote operations + return + if cls.exe_thread is not None: + raise Exception("Cannot start wlantest twice") + + log_dir = cls.setup_params['log_dir'] + ifaces = re.split('; | |, ', cls.remote_host.ifname) + ifname = ifaces[0] + exe = cls.setup_params["wlantest"] + tc_name = cls.setup_params["tc_name"] + base_log_name = tc_name + "_wlantest_" + \ + cls.remote_host.name + "_" + ifname + log_file = posixpath.join(log_dir, base_log_name + ".log") + pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng") + cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname, + pcap_file, log_file) + cls.remote_host.add_log(log_file) + cls.remote_host.add_log(pcap_file) + cls.exe_thread = cls.remote_host.execute_run(cmd.split(), cls.exe_res) + # Give wlantest a chance to start working + time.sleep(1) + + @classmethod + def register_remote_wlantest(cls, host, setup_params, monitor_mod): + if cls.remote_host is not None: + raise Exception("Cannot register remote wlantest twice") + cls.remote_host = host + cls.setup_params = setup_params + cls.monitor_mod = monitor_mod + status, buf = host.execute(["which", setup_params['wlantest']]) + if status != 0: + raise Exception(host.name + " - wlantest: " + buf) + status, buf = host.execute(["which", setup_params['wlantest_cli']]) + if status != 0: + raise Exception(host.name + " - wlantest_cli: " + buf) + + @classmethod + def chan_from_wpa(cls, wpa, is_p2p=False): + if cls.monitor_mod is None: + return + m = cls.monitor_mod + return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)]) + + @classmethod + def setup(cls, wpa, is_p2p=False): + cls.chan_from_wpa(wpa, is_p2p) + cls.start_remote_wlantest() + cls.setup_done = True + def __init__(self): + if not self.setup_done: + raise Exception("Cannot create Wlantest instance before setup()") if os.path.isfile('../../wlantest/wlantest_cli'): self.wlantest_cli = '../../wlantest/wlantest_cli' else: self.wlantest_cli = 'wlantest_cli' + def cli_cmd(self, params): + if self.remote_host is not None: + exe = self.setup_params["wlantest_cli"] + ret = self.remote_host.execute([exe] + params) + if ret[0] != 0: + raise Exception("wlantest_cli failed") + return ret[1] + else: + return subprocess.check_output([self.wlantest_cli] + params) + def flush(self): - res = subprocess.check_output([self.wlantest_cli, "flush"]) + res = self.cli_cmd(["flush"]) if "FAIL" in res: raise Exception("wlantest_cli flush failed") def relog(self): - res = subprocess.check_output([self.wlantest_cli, "relog"]) + res = self.cli_cmd(["relog"]) if "FAIL" in res: raise Exception("wlantest_cli relog failed") def add_passphrase(self, passphrase): - res = subprocess.check_output([self.wlantest_cli, "add_passphrase", - passphrase]) + res = self.cli_cmd(["add_passphrase", passphrase]) if "FAIL" in res: raise Exception("wlantest_cli add_passphrase failed") def add_wepkey(self, key): - res = subprocess.check_output([self.wlantest_cli, "add_wepkey", key]) + res = self.cli_cmd(["add_wepkey", key]) if "FAIL" in res: raise Exception("wlantest_cli add_key failed") def info_bss(self, field, bssid): - res = subprocess.check_output([self.wlantest_cli, "info_bss", - field, bssid]) + res = self.cli_cmd(["info_bss", field, bssid]) if "FAIL" in res: raise Exception("Could not get BSS info from wlantest for " + bssid) return res def get_bss_counter(self, field, bssid): try: - res = subprocess.check_output([self.wlantest_cli, "get_bss_counter", - field, bssid]); + res = self.cli_cmd(["get_bss_counter", field, bssid]) except Exception, e: return 0 if "FAIL" in res: @@ -58,36 +149,30 @@ class Wlantest: return int(res) def clear_bss_counters(self, bssid): - subprocess.call([self.wlantest_cli, "clear_bss_counters", bssid], - stdout=open('/dev/null', 'w')); + self.cli_cmd(["clear_bss_counters", bssid]) def info_sta(self, field, bssid, addr): - res = subprocess.check_output([self.wlantest_cli, "info_sta", - field, bssid, addr]) + res = self.cli_cmd(["info_sta", field, bssid, addr]) if "FAIL" in res: raise Exception("Could not get STA info from wlantest for " + addr) return res def get_sta_counter(self, field, bssid, addr): - res = subprocess.check_output([self.wlantest_cli, "get_sta_counter", - field, bssid, addr]); + res = self.cli_cmd(["get_sta_counter", field, bssid, addr]) if "FAIL" in res: raise Exception("wlantest_cli command failed") return int(res) def clear_sta_counters(self, bssid, addr): - res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters", - bssid, addr]); + res = self.cli_cmd(["clear_sta_counters", bssid, addr]) if "FAIL" in res: raise Exception("wlantest_cli command failed") def tdls_clear(self, bssid, addr1, addr2): - res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters", - bssid, addr1, addr2]); + self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2]) def get_tdls_counter(self, field, bssid, addr1, addr2): - res = subprocess.check_output([self.wlantest_cli, "get_tdls_counter", - field, bssid, addr1, addr2]); + res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2]) if "FAIL" in res: raise Exception("wlantest_cli command failed") return int(res) @@ -139,15 +224,13 @@ class Wlantest: raise Exception("Unexpected STA key_mgmt") def get_tx_tid(self, bssid, addr, tid): - res = subprocess.check_output([self.wlantest_cli, "get_tx_tid", - bssid, addr, str(tid)]); + res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)]) if "FAIL" in res: raise Exception("wlantest_cli command failed") return int(res) def get_rx_tid(self, bssid, addr, tid): - res = subprocess.check_output([self.wlantest_cli, "get_rx_tid", - bssid, addr, str(tid)]); + res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)]) if "FAIL" in res: raise Exception("wlantest_cli command failed") return int(res) diff --git a/tests/remote/config.py b/tests/remote/config.py index 0826ea0..cf3b77f 100644 --- a/tests/remote/config.py +++ b/tests/remote/config.py @@ -20,6 +20,8 @@ setup_params = { "setup_hw" : "./tests/setup_hw.sh", "hostapd" : "./tests/hostapd", "wpa_supplicant" : "./tests/wpa_supplicant", "iperf" : "iperf", + "wlantest" : "./tests/wlantest", + "wlantest_cli" : "./tests/wlantest_cli", "country" : "US", "log_dir" : "/tmp/", "ipv4_test_net" : "192.168.12.0", diff --git a/tests/remote/hwsim_wrapper.py b/tests/remote/hwsim_wrapper.py index d70fe85..d2598ab 100644 --- a/tests/remote/hwsim_wrapper.py +++ b/tests/remote/hwsim_wrapper.py @@ -11,6 +11,7 @@ import config import rutils import monitor import traceback +import wlantest import logging logger = logging.getLogger() @@ -45,6 +46,15 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test): monitor.add(dut_host, monitors) monitor.run(dut_host, setup_params) + monitor_hosts = monitor.create(devices, setup_params, refs, duts, + monitors) + mon = None + if len(monitor_hosts) > 0: + mon = monitor_hosts[0] + wlantest.Wlantest.reset_remote_wlantest() + wlantest.Wlantest.register_remote_wlantest(mon, setup_params, + monitor) + # run hostapd/wpa_supplicant for ref_host in ref_hosts: rutils.run_wpasupplicant(ref_host, setup_params) @@ -83,6 +93,9 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test): for dut_host in dut_hosts: dut_host.execute(["killall", "hostapd"]) dut_host.get_logs(local_log_dir) + if mon is not None: + wlantest.Wlantest.reset_remote_wlantest() + mon.get_logs(local_log_dir) return "" except: @@ -105,4 +118,7 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test): for dut_host in dut_hosts: dut_host.execute(["killall", "hostapd"]) dut_host.get_logs(local_log_dir) + if mon is not None: + wlantest.Wlantest.reset_remote_wlantest() + mon.get_logs(local_log_dir) raise -- 2.1.4