from wpasupplicant import WpaSupplicant
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
- ifname1=None, ifname2=None):
+ ifname1=None, ifname2=None, config=True):
addr1 = dev1.own_addr()
if not dev1group and isinstance(dev1, WpaSupplicant):
addr1 = dev1.get_driver_status_field('addr')
dev2.dump_monitor()
try:
- cmd = "DATA_TEST_CONFIG 1"
- if ifname1:
- cmd = cmd + " ifname=" + ifname1
- if dev1group:
- res = dev1.group_request(cmd)
- else:
- res = dev1.request(cmd)
- if "OK" not in res:
- raise Exception("Failed to enable data test functionality")
-
- cmd = "DATA_TEST_CONFIG 1"
- if ifname2:
- cmd = cmd + " ifname=" + ifname2
- if dev2group:
- res = dev2.group_request(cmd)
- else:
- res = dev2.request(cmd)
- if "OK" not in res:
- raise Exception("Failed to enable data test functionality")
+ if config:
+ cmd = "DATA_TEST_CONFIG 1"
+ if ifname1:
+ cmd = cmd + " ifname=" + ifname1
+ if dev1group:
+ res = dev1.group_request(cmd)
+ else:
+ res = dev1.request(cmd)
+ if "OK" not in res:
+ raise Exception("Failed to enable data test functionality")
+
+ cmd = "DATA_TEST_CONFIG 1"
+ if ifname2:
+ cmd = cmd + " ifname=" + ifname2
+ if dev2group:
+ res = dev2.group_request(cmd)
+ else:
+ res = dev2.request(cmd)
+ if "OK" not in res:
+ raise Exception("Failed to enable data test functionality")
cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
if dev1group:
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
raise Exception("Unexpected dev2->dev1 broadcast data result")
finally:
- if dev1group:
- dev1.group_request("DATA_TEST_CONFIG 0")
- else:
- dev1.request("DATA_TEST_CONFIG 0")
- if dev2group:
- dev2.group_request("DATA_TEST_CONFIG 0")
- else:
- dev2.request("DATA_TEST_CONFIG 0")
+ if config:
+ if dev1group:
+ dev1.group_request("DATA_TEST_CONFIG 0")
+ else:
+ dev1.request("DATA_TEST_CONFIG 0")
+ if dev2group:
+ dev2.group_request("DATA_TEST_CONFIG 0")
+ else:
+ dev2.request("DATA_TEST_CONFIG 0")
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
dev1group=False, dev2group=False,
- ifname1=None, ifname2=None):
+ ifname1=None, ifname2=None, config=True):
if dscp:
tos = dscp << 2
if not tos:
for i in range(0, max_tries):
try:
run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
- ifname1, ifname2)
+ ifname1, ifname2, config=config)
success = True
break
except Exception, e:
import hostapd
from wlantest import Wlantest
-def check_qos_map(ap, hapd, dev, dscp, tid, ap_tid=None):
+def check_qos_map(ap, hapd, dev, sta, dscp, tid, ap_tid=None):
if not ap_tid:
ap_tid = tid
bssid = ap['bssid']
- sta = dev.p2p_interface_addr()
wt = Wlantest()
wt.clear_sta_counters(bssid, sta)
- hwsim_utils.test_connectivity(dev, hapd, dscp=dscp)
+ hwsim_utils.test_connectivity(dev, hapd, dscp=dscp, config=False)
time.sleep(0.02)
- [ tx, rx ] = wt.get_tid_counters(bssid, sta)
- if tx[tid] == 0:
+ tx = wt.get_tx_tid(bssid, sta, tid)
+ if tx == 0:
+ [ tx, rx ] = wt.get_tid_counters(bssid, sta)
logger.info("Expected TX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(tx))
raise Exception("No STA->AP data frame using the expected TID")
- if rx[ap_tid] == 0:
+ rx = wt.get_rx_tid(bssid, sta, ap_tid)
+ if rx == 0:
+ [ tx, rx ] = wt.get_tid_counters(bssid, sta)
logger.info("Expected RX DSCP " + str(dscp) + " with TID " + str(ap_tid) + " but counters: " + str(rx))
raise Exception("No AP->STA data frame using the expected TID")
hapd = hostapd.add_ap(apdev[0]['ifname'], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
time.sleep(0.1)
- check_qos_map(apdev[0], hapd, dev[0], 53, 2)
- check_qos_map(apdev[0], hapd, dev[0], 22, 6)
- check_qos_map(apdev[0], hapd, dev[0], 8, 0)
- check_qos_map(apdev[0], hapd, dev[0], 15, 0)
- check_qos_map(apdev[0], hapd, dev[0], 0, 1)
- check_qos_map(apdev[0], hapd, dev[0], 7, 1)
- check_qos_map(apdev[0], hapd, dev[0], 16, 3)
- check_qos_map(apdev[0], hapd, dev[0], 31, 3)
- check_qos_map(apdev[0], hapd, dev[0], 32, 4)
- check_qos_map(apdev[0], hapd, dev[0], 39, 4)
- check_qos_map(apdev[0], hapd, dev[0], 40, 6)
- check_qos_map(apdev[0], hapd, dev[0], 47, 6)
- check_qos_map(apdev[0], hapd, dev[0], 48, 7)
- check_qos_map(apdev[0], hapd, dev[0], 55, 7)
+ addr = dev[0].p2p_interface_addr()
+ dev[0].request("DATA_TEST_CONFIG 1")
+ hapd.request("DATA_TEST_CONFIG 1")
+ check_qos_map(apdev[0], hapd, dev[0], addr, 53, 2)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 22, 6)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 8, 0)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 15, 0)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 0, 1)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 7, 1)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 16, 3)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 31, 3)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 32, 4)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 39, 4)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 40, 6)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 47, 6)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 48, 7)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 55, 7)
hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55")
hapd.request("SEND_QOS_MAP_CONF " + dev[0].get_status_field("address"))
- check_qos_map(apdev[0], hapd, dev[0], 53, 7)
- check_qos_map(apdev[0], hapd, dev[0], 22, 6)
- check_qos_map(apdev[0], hapd, dev[0], 48, 7)
- check_qos_map(apdev[0], hapd, dev[0], 55, 7)
- check_qos_map(apdev[0], hapd, dev[0], 56, 56 >> 3)
- check_qos_map(apdev[0], hapd, dev[0], 63, 63 >> 3)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 53, 7)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 22, 6)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 48, 7)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 55, 7)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 56, 56 >> 3)
+ check_qos_map(apdev[0], hapd, dev[0], addr, 63, 63 >> 3)
+ dev[0].request("DATA_TEST_CONFIG 0")
+ hapd.request("DATA_TEST_CONFIG 0")
def test_ap_qosmap_default(dev, apdev):
"""QoS mapping with default values"""
params = { "ssid": ssid }
hapd = hostapd.add_ap(apdev[0]['ifname'], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
+ addr = dev[0].p2p_interface_addr()
+ dev[0].request("DATA_TEST_CONFIG 1")
+ hapd.request("DATA_TEST_CONFIG 1")
for dscp in [ 0, 7, 8, 15, 16, 23, 24, 31, 32, 39, 40, 47, 48, 55, 56, 63]:
- check_qos_map(apdev[0], hapd, dev[0], dscp, dscp >> 3)
+ check_qos_map(apdev[0], hapd, dev[0], addr, dscp, dscp >> 3)
+ dev[0].request("DATA_TEST_CONFIG 0")
+ hapd.request("DATA_TEST_CONFIG 0")
def test_ap_qosmap_default_acm(dev, apdev):
"""QoS mapping with default values and ACM=1 for VO/VI"""
"wmm_ac_vo_acm": "1" }
hapd = hostapd.add_ap(apdev[0]['ifname'], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
+ addr = dev[0].p2p_interface_addr()
+ dev[0].request("DATA_TEST_CONFIG 1")
+ hapd.request("DATA_TEST_CONFIG 1")
for dscp in [ 0, 7, 8, 15, 16, 23, 24, 31, 32, 39, 40, 47, 48, 55, 56, 63]:
ap_tid = dscp >> 3
tid = ap_tid
# downgrade VI/VO to BE
if tid in [ 4, 5, 6, 7 ]:
tid = 3
- check_qos_map(apdev[0], hapd, dev[0], dscp, tid, ap_tid)
+ check_qos_map(apdev[0], hapd, dev[0], addr, dscp, tid, ap_tid)
+ dev[0].request("DATA_TEST_CONFIG 0")
+ hapd.request("DATA_TEST_CONFIG 0")
def test_ap_qosmap_invalid(dev, apdev):
"""QoS mapping ctrl_iface error handling"""