Updated to hostap_2_6
[mech_eap.git] / libeap / tests / hwsim / hwsim_utils.py
1 # hwsim testing utilities
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 logger = logging.getLogger()
11
12 from wpasupplicant import WpaSupplicant
13
def _data_test_request(dev, group, cmd):
    # Send a control interface command through the group interface when
    # requested, otherwise through the device's main interface.
    if group:
        return dev.group_request(cmd)
    return dev.request(cmd)


def _data_test_wait_rx(dev, group, timeout):
    # Wait for a DATA-TEST-RX event on the group or the main interface.
    if group:
        return dev.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
    return dev.wait_event(["DATA-TEST-RX"], timeout=timeout)


def _data_test_enable(dev, group, ifname):
    # Enable data test functionality on dev, optionally naming an interface.
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    if "OK" not in _data_test_request(dev, group, cmd):
        raise Exception("Failed to enable data test functionality")


def _data_test_tx_rx(src, dst, dst_addr, tos, timeout, desc, retries=1):
    # Transmit one frame from src to dst_addr and verify that dst reports it.
    # src is a (device, use_group, address) tuple, dst is (device, use_group).
    # The whole send/wait/verify sequence is repeated up to 'retries' times;
    # the failure of the final attempt is propagated to the caller.
    src_dev, src_group, src_addr = src
    dst_dev, dst_group = dst
    cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
    expected = "DATA-TEST-RX {} {}".format(dst_addr, src_addr)
    for attempt in range(retries):
        try:
            _data_test_request(src_dev, src_group, cmd)
            ev = _data_test_wait_rx(dst_dev, dst_group, timeout)
            if ev is None:
                raise Exception("{} data delivery failed".format(desc))
            if expected not in ev:
                raise Exception("Unexpected {} data result".format(desc))
            return
        except Exception:
            if attempt == retries - 1:
                raise


def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5):
    """Run one round of data connectivity checks between dev1 and dev2.

    Four transfers are verified in order: dev1->dev2 unicast, dev1->dev2
    broadcast, dev2->dev1 unicast and dev2->dev1 broadcast. Each transfer
    sends a DATA_TEST_TX command on the transmitter and waits up to
    'timeout' for a matching DATA-TEST-RX event on the receiver.

    dev1group/dev2group select the group request/event methods of the
    respective device instead of the main ones. ifname1/ifname2 are appended
    as "ifname=" to the DATA_TEST_CONFIG 1 command. When config is true,
    data test functionality is enabled before the transfers and disabled
    again afterwards, also when a transfer fails.

    Raises Exception if enabling fails, an event does not arrive, or the
    received event does not match the transmitted addresses.
    """
    addr1 = dev1.own_addr()
    if not dev1group and isinstance(dev1, WpaSupplicant):
        addr1 = dev1.get_driver_status_field('addr')

    addr2 = dev2.own_addr()
    if not dev2group and isinstance(dev2, WpaSupplicant):
        addr2 = dev2.get_driver_status_field('addr')

    dev1.dump_monitor()
    dev2.dump_monitor()

    # Broadcast delivery gets extra attempts when either device has a
    # hostname set (presumably a remote test host - confirm with callers).
    if dev1.hostname is None and dev2.hostname is None:
        broadcast_retry_c = 1
    else:
        broadcast_retry_c = 10

    broadcast = "ff:ff:ff:ff:ff:ff"
    end1 = (dev1, dev1group)
    end2 = (dev2, dev2group)

    try:
        if config:
            _data_test_enable(dev1, dev1group, ifname1)
            _data_test_enable(dev2, dev2group, ifname2)

        _data_test_tx_rx(end1 + (addr1,), end2, addr2, tos, timeout,
                         "dev1->dev2 unicast")
        _data_test_tx_rx(end1 + (addr1,), end2, broadcast, tos, timeout,
                         "dev1->dev2 broadcast", retries=broadcast_retry_c)
        _data_test_tx_rx(end2 + (addr2,), end1, addr1, tos, timeout,
                         "dev2->dev1 unicast")
        _data_test_tx_rx(end2 + (addr2,), end1, broadcast, tos, timeout,
                         "dev2->dev1 broadcast", retries=broadcast_retry_c)
    finally:
        if config:
            _data_test_request(dev1, dev1group, "DATA_TEST_CONFIG 0")
            _data_test_request(dev2, dev2group, "DATA_TEST_CONFIG 0")
133
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5):
    """Verify data connectivity between dev1 and dev2, retrying on failure.

    The TOS value passed to run_connectivity_test() is dscp << 2 when a
    non-zero dscp is given; otherwise tos is used, defaulting to 0. The
    remaining arguments are forwarded to run_connectivity_test() unchanged.

    Up to max_tries attempts are made with a one second pause between
    attempts. Raises Exception wrapping the last failure when no attempt
    succeeds (including when max_tries is less than 1 and no attempt is
    made).
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    success = False
    last_err = None
    for i in range(0, max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout)
            success = True
            break
        except Exception as e:
            last_err = e
            # Do not delay the final failure report with a pointless sleep.
            if i + 1 < max_tries:
                time.sleep(1)
    if not success:
        raise Exception(last_err)
157
def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1):
    """Run test_connectivity() with ifname used as dev2's interface name."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos, max_tries=max_tries,
                      ifname2=ifname)
162
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface of both devices."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=True)
165
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface of dev1 only."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=False)
168
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with default (non-group) settings."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos)
171
# Power save mode values; presumably the values passed to set_powersave()
# (confirm against callers).
PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL = 0, 1, 2, 3
173
def set_powersave(dev, val):
    """Write val to the hwsim power save debugfs file of dev's phy.

    The phy name is read from the driver status field "phyname" and the
    value is written by running an echo command through dev.cmd_execute().
    Raises Exception if that command reports a non-zero result.
    """
    ps_path = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % \
        dev.get_driver_status_field("phyname")
    value = '%d' % val
    (status, _output) = dev.cmd_execute(["echo", value, ">", ps_path],
                                        shell=True)
    if status == 0:
        return
    raise Exception("Failed to set power save for device")