1 # hwsim testing utilities
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
10 logger = logging.getLogger()
12 from wpasupplicant import WpaSupplicant
14 def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
15 ifname1=None, ifname2=None, config=True, timeout=5):
16 addr1 = dev1.own_addr()
17 if not dev1group and isinstance(dev1, WpaSupplicant):
18 addr1 = dev1.get_driver_status_field('addr')
20 addr2 = dev2.own_addr()
21 if not dev2group and isinstance(dev2, WpaSupplicant):
22 addr2 = dev2.get_driver_status_field('addr')
27 if dev1.hostname is None and dev2.hostname is None:
30 broadcast_retry_c = 10
34 cmd = "DATA_TEST_CONFIG 1"
36 cmd = cmd + " ifname=" + ifname1
38 res = dev1.group_request(cmd)
40 res = dev1.request(cmd)
42 raise Exception("Failed to enable data test functionality")
44 cmd = "DATA_TEST_CONFIG 1"
46 cmd = cmd + " ifname=" + ifname2
48 res = dev2.group_request(cmd)
50 res = dev2.request(cmd)
52 raise Exception("Failed to enable data test functionality")
54 cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
56 dev1.group_request(cmd)
60 ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
62 ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
64 raise Exception("dev1->dev2 unicast data delivery failed")
65 if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
66 raise Exception("Unexpected dev1->dev2 unicast data result")
68 cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
69 for i in xrange(broadcast_retry_c):
72 dev1.group_request(cmd)
76 ev = dev2.wait_group_event(["DATA-TEST-RX"],
79 ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
81 raise Exception("dev1->dev2 broadcast data delivery failed")
82 if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
83 raise Exception("Unexpected dev1->dev2 broadcast data result")
85 except Exception as e:
86 if i == broadcast_retry_c - 1:
89 cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
91 dev2.group_request(cmd)
95 ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
97 ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
99 raise Exception("dev2->dev1 unicast data delivery failed")
100 if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
101 raise Exception("Unexpected dev2->dev1 unicast data result")
103 cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
104 for i in xrange(broadcast_retry_c):
107 dev2.group_request(cmd)
111 ev = dev1.wait_group_event(["DATA-TEST-RX"],
114 ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
116 raise Exception("dev2->dev1 broadcast data delivery failed")
117 if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
118 raise Exception("Unexpected dev2->dev1 broadcast data result")
120 except Exception as e:
121 if i == broadcast_retry_c - 1:
126 dev1.group_request("DATA_TEST_CONFIG 0")
128 dev1.request("DATA_TEST_CONFIG 0")
130 dev2.group_request("DATA_TEST_CONFIG 0")
132 dev2.request("DATA_TEST_CONFIG 0")
134 def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
135 dev1group=False, dev2group=False,
136 ifname1=None, ifname2=None, config=True, timeout=5):
144 for i in range(0, max_tries):
146 run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
147 ifname1, ifname2, config=config,
153 if i + 1 < max_tries:
156 raise Exception(last_err)
158 def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
160 test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
163 def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
164 test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
166 def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
167 test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
169 def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
170 test_connectivity(dev1, dev2, dscp, tos)
172 (PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = range(4)
174 def set_powersave(dev, val):
175 phy = dev.get_driver_status_field("phyname")
176 fname = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy
178 (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
180 raise Exception("Failed to set power save for device")