tests: Optimize ap_qosmap test cases
[mech_eap.git] / tests / hwsim / hwsim_utils.py
1 # hwsim testing utilities
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import subprocess
9 import time
10 import logging
11 logger = logging.getLogger()
12
13 from wpasupplicant import WpaSupplicant
14
def _data_test_request(dev, use_group, cmd):
    """Send cmd via dev.group_request() if use_group, else dev.request()."""
    if use_group:
        return dev.group_request(cmd)
    return dev.request(cmd)

def _data_test_addr(dev, use_group):
    """Return the address to use for dev in DATA_TEST_TX commands.

    Starts from dev.own_addr(). For a WpaSupplicant instance that is not
    addressed through its group interface, the 'addr' driver status field is
    preferred when it is available.
    """
    addr = dev.own_addr()
    if not use_group and isinstance(dev, WpaSupplicant):
        drv_addr = dev.get_driver_status_field('addr')
        # Keep own_addr() if the driver status lookup yields nothing;
        # otherwise a bogus address would end up in the TX command.
        if drv_addr:
            addr = drv_addr
    return addr

def _data_test_enable(dev, use_group, ifname):
    """Send DATA_TEST_CONFIG 1 to dev; raise unless the reply contains OK."""
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    res = _data_test_request(dev, use_group, cmd)
    if "OK" not in res:
        raise Exception("Failed to enable data test functionality")

def _data_test_transfer(src, src_group, dst, dst_group, dst_addr, src_addr,
                        tos, desc):
    """Send one DATA_TEST_TX from src and verify DATA-TEST-RX on dst.

    desc (e.g. "dev1->dev2 unicast") is only used to build the error text.
    """
    cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
    _data_test_request(src, src_group, cmd)
    if dst_group:
        ev = dst.wait_group_event(["DATA-TEST-RX"], timeout=5)
    else:
        ev = dst.wait_event(["DATA-TEST-RX"], timeout=5)
    if ev is None:
        raise Exception(desc + " data delivery failed")
    if "DATA-TEST-RX {} {}".format(dst_addr, src_addr) not in ev:
        raise Exception("Unexpected " + desc + " data result")

def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True):
    """Verify unicast and broadcast data delivery between dev1 and dev2.

    Four transfers are checked in order: dev1->dev2 unicast, dev1->dev2
    broadcast, dev2->dev1 unicast and dev2->dev1 broadcast. Each one issues a
    DATA_TEST_TX command on the sender and waits up to 5 seconds for a
    matching DATA-TEST-RX event on the receiver.

    Parameters:
    dev1, dev2 -- device objects providing own_addr(), dump_monitor(),
        request()/wait_event() and, when the matching *group flag is set,
        group_request()/wait_group_event()
    tos -- value passed as the last argument of DATA_TEST_TX
    dev1group, dev2group -- use the group variants of request/wait for the
        respective device
    ifname1, ifname2 -- optional interface name appended as ifname=<name> to
        the DATA_TEST_CONFIG 1 command of the respective device
    config -- when true, send DATA_TEST_CONFIG 1 before the transfers and
        DATA_TEST_CONFIG 0 afterwards (also on failure)

    Raises Exception if enabling the data test fails, if an expected event
    does not arrive, or if the event does not carry the expected addresses.
    """
    addr1 = _data_test_addr(dev1, dev1group)
    addr2 = _data_test_addr(dev2, dev2group)
    bcast = "ff:ff:ff:ff:ff:ff"

    # Drop pending events so that only events from this run are examined.
    dev1.dump_monitor()
    dev2.dump_monitor()

    try:
        if config:
            _data_test_enable(dev1, dev1group, ifname1)
            _data_test_enable(dev2, dev2group, ifname2)

        _data_test_transfer(dev1, dev1group, dev2, dev2group, addr2, addr1,
                            tos, "dev1->dev2 unicast")
        _data_test_transfer(dev1, dev1group, dev2, dev2group, bcast, addr1,
                            tos, "dev1->dev2 broadcast")
        _data_test_transfer(dev2, dev2group, dev1, dev1group, addr1, addr2,
                            tos, "dev2->dev1 unicast")
        _data_test_transfer(dev2, dev2group, dev1, dev1group, bcast, addr2,
                            tos, "dev2->dev1 broadcast")
    finally:
        if config:
            # Nested finally: dev2 must still be asked to disable the data
            # test even if the request to dev1 raises.
            try:
                _data_test_request(dev1, dev1group, "DATA_TEST_CONFIG 0")
            finally:
                _data_test_request(dev2, dev2group, "DATA_TEST_CONFIG 0")
115
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True):
    """Run run_connectivity_test() up to max_tries times until it passes.

    Parameters:
    dscp -- if set (non-zero), overrides tos with dscp << 2
    tos -- value handed to run_connectivity_test(); defaults to 0
    max_tries -- maximum number of attempts; one second is slept between
        failed attempts
    dev1group, dev2group, ifname1, ifname2, config -- passed through to
        run_connectivity_test() unchanged

    Raises Exception wrapping the error of the last attempt when no attempt
    succeeded.
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    last_err = None
    for attempt in range(max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config)
            return
        except Exception as e:
            # "as" form is valid on both Python 2.6+ and Python 3. The
            # error is kept in last_err since e is unbound after the except
            # block on Python 3.
            last_err = e
            if attempt + 1 < max_tries:
                time.sleep(1)
    raise Exception(last_err)
138
def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1):
    """Run test_connectivity() with ifname given as dev2's interface name."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos, max_tries=max_tries,
                      ifname2=ifname)
143
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with both dev1group and dev2group set."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=True)
146
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with dev1group set and dev2group cleared."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=False)
149
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with its default group/interface settings."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos)