Updated through tag hostap_2_5 from git://w1.fi/hostap.git
[mech_eap.git] / libeap / tests / hwsim / hwsim_utils.py
1 # hwsim testing utilities
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import subprocess
9 import time
10 import logging
11 logger = logging.getLogger()
12
13 from wpasupplicant import WpaSupplicant
14
15 def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
16                           ifname1=None, ifname2=None, config=True, timeout=5):
17     addr1 = dev1.own_addr()
18     if not dev1group and isinstance(dev1, WpaSupplicant):
19         addr1 = dev1.get_driver_status_field('addr')
20
21     addr2 = dev2.own_addr()
22     if not dev2group and isinstance(dev2, WpaSupplicant):
23         addr2 = dev2.get_driver_status_field('addr')
24
25     dev1.dump_monitor()
26     dev2.dump_monitor()
27
28     try:
29         if config:
30             cmd = "DATA_TEST_CONFIG 1"
31             if ifname1:
32                 cmd = cmd + " ifname=" + ifname1
33             if dev1group:
34                 res = dev1.group_request(cmd)
35             else:
36                 res = dev1.request(cmd)
37             if "OK" not in res:
38                 raise Exception("Failed to enable data test functionality")
39
40             cmd = "DATA_TEST_CONFIG 1"
41             if ifname2:
42                 cmd = cmd + " ifname=" + ifname2
43             if dev2group:
44                 res = dev2.group_request(cmd)
45             else:
46                 res = dev2.request(cmd)
47             if "OK" not in res:
48                 raise Exception("Failed to enable data test functionality")
49
50         cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
51         if dev1group:
52             dev1.group_request(cmd)
53         else:
54             dev1.request(cmd)
55         if dev2group:
56             ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
57         else:
58             ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
59         if ev is None:
60             raise Exception("dev1->dev2 unicast data delivery failed")
61         if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
62             raise Exception("Unexpected dev1->dev2 unicast data result")
63
64         cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
65         if dev1group:
66             dev1.group_request(cmd)
67         else:
68             dev1.request(cmd)
69         if dev2group:
70             ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
71         else:
72             ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
73         if ev is None:
74             raise Exception("dev1->dev2 broadcast data delivery failed")
75         if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
76             raise Exception("Unexpected dev1->dev2 broadcast data result")
77
78         cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
79         if dev2group:
80             dev2.group_request(cmd)
81         else:
82             dev2.request(cmd)
83         if dev1group:
84             ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
85         else:
86             ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
87         if ev is None:
88             raise Exception("dev2->dev1 unicast data delivery failed")
89         if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
90             raise Exception("Unexpected dev2->dev1 unicast data result")
91
92         cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
93         if dev2group:
94             dev2.group_request(cmd)
95         else:
96             dev2.request(cmd)
97         if dev1group:
98             ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
99         else:
100             ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
101         if ev is None:
102             raise Exception("dev2->dev1 broadcast data delivery failed")
103         if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
104             raise Exception("Unexpected dev2->dev1 broadcast data result")
105     finally:
106         if config:
107             if dev1group:
108                 dev1.group_request("DATA_TEST_CONFIG 0")
109             else:
110                 dev1.request("DATA_TEST_CONFIG 0")
111             if dev2group:
112                 dev2.group_request("DATA_TEST_CONFIG 0")
113             else:
114                 dev2.request("DATA_TEST_CONFIG 0")
115
116 def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
117                       dev1group=False, dev2group=False,
118                       ifname1=None, ifname2=None, config=True, timeout=5):
119     if dscp:
120         tos = dscp << 2
121     if not tos:
122         tos = 0
123
124     success = False
125     last_err = None
126     for i in range(0, max_tries):
127         try:
128             run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
129                                   ifname1, ifname2, config=config,
130                                   timeout=timeout)
131             success = True
132             break
133         except Exception, e:
134             last_err = e
135             if i + 1 < max_tries:
136                 time.sleep(1)
137     if not success:
138         raise Exception(last_err)
139
140 def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
141                             max_tries=1):
142     test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
143                       max_tries=max_tries)
144
145 def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
146     test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
147
148 def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
149     test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
150
151 def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
152     test_connectivity(dev1, dev2, dscp, tos)
153
154 (PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = range(4)
155
156 def set_powersave(dev, val):
157     phy = dev.get_driver_status_field("phyname")
158     psf = open('/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy, 'w')
159     psf.write('%d\n' % val)
160     psf.close()