Updated to hostap_2_6
[mech_eap.git] / libeap / tests / hwsim / wlantest.py
1 # Python class for controlling wlantest
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import re
8 import os
9 import posixpath
10 import time
11 import subprocess
12 import logging
13 import wpaspy
14
15 logger = logging.getLogger()
16
17 class Wlantest:
18     remote_host = None
19     setup_params = None
20     exe_thread = None
21     exe_res = []
22     monitor_mod = None
23     setup_done = False
24
25     @classmethod
26     def stop_remote_wlantest(cls):
27         if cls.exe_thread is None:
28             # Local flow - no need for remote operations
29             return
30
31         cls.remote_host.execute(["killall", "-9", "wlantest"])
32         cls.remote_host.wait_execute_complete(cls.exe_thread, 5)
33         cls.exe_thread = None
34         cls.exe_res = []
35
36     @classmethod
37     def reset_remote_wlantest(cls):
38         cls.stop_remote_wlantest()
39         cls.remote_host = None
40         cls.setup_params = None
41         cls.exe_thread = None
42         cls.exe_res = []
43         cls.monitor_mod = None
44         cls.setup_done = False
45
46     @classmethod
47     def start_remote_wlantest(cls):
48         if cls.remote_host is None:
49             # Local flow - no need for remote operations
50             return
51         if cls.exe_thread is not None:
52             raise Exception("Cannot start wlantest twice")
53
54         log_dir = cls.setup_params['log_dir']
55         ifaces = re.split('; | |, ', cls.remote_host.ifname)
56         ifname = ifaces[0]
57         exe = cls.setup_params["wlantest"]
58         tc_name = cls.setup_params["tc_name"]
59         base_log_name = tc_name + "_wlantest_" + \
60                         cls.remote_host.name + "_" + ifname
61         log_file = posixpath.join(log_dir, base_log_name + ".log")
62         pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
63         cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname,
64                                                     pcap_file, log_file)
65         cls.remote_host.add_log(log_file)
66         cls.remote_host.add_log(pcap_file)
67         cls.exe_thread = cls.remote_host.execute_run(cmd.split(), cls.exe_res)
68         # Give wlantest a chance to start working
69         time.sleep(1)
70
71     @classmethod
72     def register_remote_wlantest(cls, host, setup_params, monitor_mod):
73         if cls.remote_host is not None:
74             raise Exception("Cannot register remote wlantest twice")
75         cls.remote_host = host
76         cls.setup_params = setup_params
77         cls.monitor_mod = monitor_mod
78         status, buf = host.execute(["which", setup_params['wlantest']])
79         if status != 0:
80             raise Exception(host.name + " - wlantest: " + buf)
81         status, buf = host.execute(["which", setup_params['wlantest_cli']])
82         if status != 0:
83             raise Exception(host.name + " - wlantest_cli: " + buf)
84
85     @classmethod
86     def chan_from_wpa(cls, wpa, is_p2p=False):
87         if cls.monitor_mod is None:
88             return
89         m = cls.monitor_mod
90         return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])
91
92     @classmethod
93     def setup(cls, wpa, is_p2p=False):
94         cls.chan_from_wpa(wpa, is_p2p)
95         cls.start_remote_wlantest()
96         cls.setup_done = True
97
98     def __init__(self):
99         if not self.setup_done:
100             raise Exception("Cannot create Wlantest instance before setup()")
101         if os.path.isfile('../../wlantest/wlantest_cli'):
102             self.wlantest_cli = '../../wlantest/wlantest_cli'
103         else:
104             self.wlantest_cli = 'wlantest_cli'
105
106     def cli_cmd(self, params):
107         if self.remote_host is not None:
108             exe = self.setup_params["wlantest_cli"]
109             ret = self.remote_host.execute([exe] + params)
110             if ret[0] != 0:
111                 raise Exception("wlantest_cli failed")
112             return ret[1]
113         else:
114             return subprocess.check_output([self.wlantest_cli] + params)
115
116     def flush(self):
117         res = self.cli_cmd(["flush"])
118         if "FAIL" in res:
119             raise Exception("wlantest_cli flush failed")
120
121     def relog(self):
122         res = self.cli_cmd(["relog"])
123         if "FAIL" in res:
124             raise Exception("wlantest_cli relog failed")
125
126     def add_passphrase(self, passphrase):
127         res = self.cli_cmd(["add_passphrase", passphrase])
128         if "FAIL" in res:
129             raise Exception("wlantest_cli add_passphrase failed")
130
131     def add_wepkey(self, key):
132         res = self.cli_cmd(["add_wepkey", key])
133         if "FAIL" in res:
134             raise Exception("wlantest_cli add_key failed")
135
136     def info_bss(self, field, bssid):
137         res = self.cli_cmd(["info_bss", field, bssid])
138         if "FAIL" in res:
139             raise Exception("Could not get BSS info from wlantest for " + bssid)
140         return res
141
142     def get_bss_counter(self, field, bssid):
143         try:
144             res = self.cli_cmd(["get_bss_counter", field, bssid])
145         except Exception, e:
146             return 0
147         if "FAIL" in res:
148             return 0
149         return int(res)
150
151     def clear_bss_counters(self, bssid):
152         self.cli_cmd(["clear_bss_counters", bssid])
153
154     def info_sta(self, field, bssid, addr):
155         res = self.cli_cmd(["info_sta", field, bssid, addr])
156         if "FAIL" in res:
157             raise Exception("Could not get STA info from wlantest for " + addr)
158         return res
159
160     def get_sta_counter(self, field, bssid, addr):
161         res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
162         if "FAIL" in res:
163             raise Exception("wlantest_cli command failed")
164         return int(res)
165
166     def clear_sta_counters(self, bssid, addr):
167         res = self.cli_cmd(["clear_sta_counters", bssid, addr])
168         if "FAIL" in res:
169             raise Exception("wlantest_cli command failed")
170
171     def tdls_clear(self, bssid, addr1, addr2):
172         self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])
173
174     def get_tdls_counter(self, field, bssid, addr1, addr2):
175         res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
176         if "FAIL" in res:
177             raise Exception("wlantest_cli command failed")
178         return int(res)
179
180     def require_ap_pmf_mandatory(self, bssid):
181         res = self.info_bss("rsn_capab", bssid)
182         if "MFPR" not in res:
183             raise Exception("AP did not require PMF")
184         if "MFPC" not in res:
185             raise Exception("AP did not enable PMF")
186         res = self.info_bss("key_mgmt", bssid)
187         if "PSK-SHA256" not in res:
188             raise Exception("AP did not enable SHA256-based AKM for PMF")
189
190     def require_ap_pmf_optional(self, bssid):
191         res = self.info_bss("rsn_capab", bssid)
192         if "MFPR" in res:
193             raise Exception("AP required PMF")
194         if "MFPC" not in res:
195             raise Exception("AP did not enable PMF")
196
197     def require_ap_no_pmf(self, bssid):
198         res = self.info_bss("rsn_capab", bssid)
199         if "MFPR" in res:
200             raise Exception("AP required PMF")
201         if "MFPC" in res:
202             raise Exception("AP enabled PMF")
203
204     def require_sta_pmf_mandatory(self, bssid, addr):
205         res = self.info_sta("rsn_capab", bssid, addr)
206         if "MFPR" not in res:
207             raise Exception("STA did not require PMF")
208         if "MFPC" not in res:
209             raise Exception("STA did not enable PMF")
210
211     def require_sta_pmf(self, bssid, addr):
212         res = self.info_sta("rsn_capab", bssid, addr)
213         if "MFPC" not in res:
214             raise Exception("STA did not enable PMF")
215
216     def require_sta_no_pmf(self, bssid, addr):
217         res = self.info_sta("rsn_capab", bssid, addr)
218         if "MFPC" in res:
219             raise Exception("STA enabled PMF")
220
221     def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
222         res = self.info_sta("key_mgmt", bssid, addr)
223         if key_mgmt not in res:
224             raise Exception("Unexpected STA key_mgmt")
225
226     def get_tx_tid(self, bssid, addr, tid):
227         res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
228         if "FAIL" in res:
229             raise Exception("wlantest_cli command failed")
230         return int(res)
231
232     def get_rx_tid(self, bssid, addr, tid):
233         res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
234         if "FAIL" in res:
235             raise Exception("wlantest_cli command failed")
236         return int(res)
237
238     def get_tid_counters(self, bssid, addr):
239         tx = {}
240         rx = {}
241         for tid in range(0, 17):
242             tx[tid] = self.get_tx_tid(bssid, addr, tid)
243             rx[tid] = self.get_rx_tid(bssid, addr, tid)
244         return [ tx, rx ]