1 # Python class for controlling wlantest
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
# Module-level logger. getLogger() is called without a name, which returns
# the root logger rather than a per-module one.
15 logger = logging.getLogger()
# Stop the wlantest process that was started on the remote host.
# Visible steps: when cls.exe_thread is None nothing was started remotely
# (local flow); otherwise "killall -9 wlantest" is executed on
# cls.remote_host and wait_execute_complete() is called for cls.exe_thread
# with the argument 5 (presumably a timeout in seconds - TODO confirm).
# NOTE(review): the embedded numbering skips lines 29-30 and 33-35, so the
# early exit for the local flow and any cleanup after the wait are not
# visible in this listing - confirm against the full file.
26 def stop_remote_wlantest(cls):
27 if cls.exe_thread is None:
28 # Local flow - no need for remote operations
31 cls.remote_host.execute(["killall", "-9", "wlantest"])
32 cls.remote_host.wait_execute_complete(cls.exe_thread, 5)
# Stop any remote wlantest via stop_remote_wlantest() and then clear the
# class-level registration state: remote_host, setup_params and monitor_mod
# are set to None and setup_done to False.
# NOTE(review): embedded lines 41-42 are not visible in this listing; other
# class attributes may be reset there - confirm against the full file.
37 def reset_remote_wlantest(cls):
38 cls.stop_remote_wlantest()
39 cls.remote_host = None
40 cls.setup_params = None
43 cls.monitor_mod = None
44 cls.setup_done = False
# Launch wlantest on the registered remote host and keep the handle returned
# by execute_run() in cls.exe_thread.
# Reads setup_params keys 'log_dir', "wlantest" and "tc_name".
# Raises Exception("Cannot start wlantest twice") when cls.exe_thread is
# already set.
# NOTE(review): the embedded numbering skips lines 50, 53, 56, 64 and 69-70.
# The early exit for the local flow, the assignment of `ifname`, the
# remaining format() arguments and whatever follows the final comment are
# not visible in this listing - confirm against the full file.
47 def start_remote_wlantest(cls):
48 if cls.remote_host is None:
49 # Local flow - no need for remote operations
51 if cls.exe_thread is not None:
52 raise Exception("Cannot start wlantest twice")
54 log_dir = cls.setup_params['log_dir']
# The host's ifname string is split on "; ", a single space, or ", ".
55 ifaces = re.split('; | |, ', cls.remote_host.ifname)
57 exe = cls.setup_params["wlantest"]
58 tc_name = cls.setup_params["tc_name"]
# Log and capture files share the base name
# <tc_name>_wlantest_<host name>_<ifname> inside log_dir; posixpath is used
# so the joined path always has "/" separators.
59 base_log_name = tc_name + "_wlantest_" + \
60 cls.remote_host.name + "_" + ifname
61 log_file = posixpath.join(log_dir, base_log_name + ".log")
62 pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
63 cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname,
65 cls.remote_host.add_log(log_file)
66 cls.remote_host.add_log(pcap_file)
# cmd.split() splits on whitespace, so the executable path, interface name
# and file paths must not contain spaces.
67 cls.exe_thread = cls.remote_host.execute_run(cmd.split(), cls.exe_res)
68 # Give wlantest a chance to start working
# Register the remote host that will run wlantest, together with its setup
# parameters and monitor module, in class-level attributes.
# Raises Exception("Cannot register remote wlantest twice") when a host is
# already registered.
# After storing the values, `which` is executed on the host for the
# configured 'wlantest' and 'wlantest_cli' executables; each call returns a
# (status, buf) pair and buf is included in the error text.
# NOTE(review): embedded lines 79 and 82 (the conditions guarding the two
# raise statements below) are not visible in this listing - presumably they
# test `status`; confirm against the full file.
72 def register_remote_wlantest(cls, host, setup_params, monitor_mod):
73 if cls.remote_host is not None:
74 raise Exception("Cannot register remote wlantest twice")
75 cls.remote_host = host
76 cls.setup_params = setup_params
77 cls.monitor_mod = monitor_mod
78 status, buf = host.execute(["which", setup_params['wlantest']])
80 raise Exception(host.name + " - wlantest: " + buf)
81 status, buf = host.execute(["which", setup_params['wlantest_cli']])
83 raise Exception(host.name + " - wlantest_cli: " + buf)
# Set up the remote monitor from the state of `wpa`.
# Returns the result of m.setup(), which is called with cls.remote_host and
# a one-element list holding m.get_monitor_params(wpa, is_p2p).
# NOTE(review): embedded lines 88-89 are not visible in this listing, so the
# handling of cls.monitor_mod being None and the binding of `m` (presumably
# cls.monitor_mod) cannot be confirmed from here.
86 def chan_from_wpa(cls, wpa, is_p2p=False):
87 if cls.monitor_mod is None:
90 return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])
# Prepare wlantest for a test case: run chan_from_wpa() for `wpa` (its return
# value is discarded) and then start_remote_wlantest().
# NOTE(review): embedded line 96 is not visible in this listing; presumably
# it marks setup as done - confirm against the full file.
93 def setup(cls, wpa, is_p2p=False):
94 cls.chan_from_wpa(wpa, is_p2p)
95 cls.start_remote_wlantest()
# NOTE(review): the `def` line for this body (embedded line 98) is not
# visible in this listing; the use of `self` and the error message suggest
# it is the Wlantest constructor - confirm against the full file.
# Refuses to continue until setup_done is set, then selects the wlantest_cli
# binary: '../../wlantest/wlantest_cli' when that file exists relative to the
# current working directory. The plain name 'wlantest_cli' is presumably the
# fallback; the branch keyword on embedded line 103 is not visible.
99 if not self.setup_done:
100 raise Exception("Cannot create Wlantest instance before setup()")
101 if os.path.isfile('../../wlantest/wlantest_cli'):
102 self.wlantest_cli = '../../wlantest/wlantest_cli'
104 self.wlantest_cli = 'wlantest_cli'
# Run wlantest_cli with the argument list `params`.
# With a registered remote host, the executable named by
# setup_params["wlantest_cli"] is run through remote_host.execute().
# Otherwise the local binary self.wlantest_cli is run with
# subprocess.check_output(), which returns the command's stdout (bytes unless
# a text mode is requested) and raises subprocess.CalledProcessError on a
# non-zero exit status.
# NOTE(review): embedded lines 110 and 112-113 are not visible in this
# listing, so the condition for "wlantest_cli failed" and the value returned
# on the remote path cannot be confirmed from here.
106 def cli_cmd(self, params):
107 if self.remote_host is not None:
108 exe = self.setup_params["wlantest_cli"]
109 ret = self.remote_host.execute([exe] + params)
111 raise Exception("wlantest_cli failed")
114 return subprocess.check_output([self.wlantest_cli] + params)
# NOTE(review): the `def` line for this body (embedded line 116) and the
# condition guarding the raise (line 118) are not visible in this listing.
# Sends the "flush" command through cli_cmd() and reports failure with an
# exception.
117 res = self.cli_cmd(["flush"])
119 raise Exception("wlantest_cli flush failed")
# NOTE(review): the `def` line for this body (embedded line 121) and the
# condition guarding the raise (line 123) are not visible in this listing.
# Sends the "relog" command through cli_cmd() and reports failure with an
# exception.
122 res = self.cli_cmd(["relog"])
124 raise Exception("wlantest_cli relog failed")
# Send "add_passphrase <passphrase>" through cli_cmd().
# Raises Exception("wlantest_cli add_passphrase failed") on failure.
# NOTE(review): the condition guarding the raise (embedded line 128) is not
# visible in this listing.
126 def add_passphrase(self, passphrase):
127 res = self.cli_cmd(["add_passphrase", passphrase])
129 raise Exception("wlantest_cli add_passphrase failed")
# Send "add_wepkey <key>" through cli_cmd().
# Raises an Exception on failure; note that the message text says "add_key"
# although the command that was sent is "add_wepkey".
# NOTE(review): the condition guarding the raise (embedded line 133) is not
# visible in this listing.
131 def add_wepkey(self, key):
132 res = self.cli_cmd(["add_wepkey", key])
134 raise Exception("wlantest_cli add_key failed")
# Query wlantest for `field` of the BSS `bssid` with the "info_bss" command.
# Raises Exception("Could not get BSS info from wlantest for <bssid>") on
# failure.
# NOTE(review): the condition guarding the raise (embedded line 138) and the
# return statement (line 140) are not visible in this listing; callers in
# this file apply substring tests to the returned value.
136 def info_bss(self, field, bssid):
137 res = self.cli_cmd(["info_bss", field, bssid])
139 raise Exception("Could not get BSS info from wlantest for " + bssid)
# Query the BSS counter `field` for `bssid` with the "get_bss_counter"
# command.
# NOTE(review): embedded lines 143 and 145-149 are not visible in this
# listing, so error handling and the returned value cannot be described from
# here - confirm against the full file.
142 def get_bss_counter(self, field, bssid):
144 res = self.cli_cmd(["get_bss_counter", field, bssid])
def clear_bss_counters(self, bssid):
    """Ask wlantest to clear its counters for one BSS.

    Sends the "clear_bss_counters" command through cli_cmd(). The command
    output is not inspected and nothing is returned.

    Args:
        bssid: BSSID of the BSS whose counters are cleared.
    """
    self.cli_cmd(["clear_bss_counters", bssid])
# Query wlantest for `field` of the station `addr` in BSS `bssid` with the
# "info_sta" command.
# Raises Exception("Could not get STA info from wlantest for <addr>") on
# failure.
# NOTE(review): the condition guarding the raise (embedded line 156) and the
# return statement (line 158) are not visible in this listing; callers in
# this file apply substring tests to the returned value.
154 def info_sta(self, field, bssid, addr):
155 res = self.cli_cmd(["info_sta", field, bssid, addr])
157 raise Exception("Could not get STA info from wlantest for " + addr)
# Query the station counter `field` for `addr` in BSS `bssid` with the
# "get_sta_counter" command.
# Raises Exception("wlantest_cli command failed") on failure.
# NOTE(review): the condition guarding the raise (embedded line 162) and the
# return statement (line 164) are not visible in this listing.
160 def get_sta_counter(self, field, bssid, addr):
161 res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
163 raise Exception("wlantest_cli command failed")
# Clear wlantest's counters for the station `addr` in BSS `bssid` with the
# "clear_sta_counters" command.
# Raises Exception("wlantest_cli command failed") on failure.
# NOTE(review): the condition guarding the raise (embedded line 168) is not
# visible in this listing.
166 def clear_sta_counters(self, bssid, addr):
167 res = self.cli_cmd(["clear_sta_counters", bssid, addr])
169 raise Exception("wlantest_cli command failed")
def tdls_clear(self, bssid, addr1, addr2):
    """Ask wlantest to clear its TDLS counters for a pair of stations.

    Sends the "clear_tdls_counters" command through cli_cmd(). The command
    output is not inspected and nothing is returned.

    Args:
        bssid: BSSID of the BSS both stations belong to.
        addr1: Address of the first station of the pair.
        addr2: Address of the second station of the pair.
    """
    self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])
# Query the TDLS counter `field` for the station pair addr1/addr2 in BSS
# `bssid` with the "get_tdls_counter" command.
# Raises Exception("wlantest_cli command failed") on failure.
# NOTE(review): the condition guarding the raise (embedded line 176) and the
# return statement (line 178) are not visible in this listing.
174 def get_tdls_counter(self, field, bssid, addr1, addr2):
175 res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
177 raise Exception("wlantest_cli command failed")
def require_ap_pmf_mandatory(self, bssid):
    """Verify that wlantest saw the AP advertise PMF as mandatory.

    Looks up the "rsn_capab" and "key_mgmt" fields recorded for the BSS via
    info_bss() and checks them with substring tests.

    Args:
        bssid: BSSID of the AP to check.

    Raises:
        Exception: if "MFPR" or "MFPC" is missing from rsn_capab, or if
            "PSK-SHA256" is missing from key_mgmt.
    """
    rsn_capab = self.info_bss("rsn_capab", bssid)
    if "MFPR" not in rsn_capab:
        raise Exception("AP did not require PMF")
    if "MFPC" not in rsn_capab:
        raise Exception("AP did not enable PMF")
    # key_mgmt is only queried once both capability flags have been seen.
    key_mgmt = self.info_bss("key_mgmt", bssid)
    if "PSK-SHA256" not in key_mgmt:
        raise Exception("AP did not enable SHA256-based AKM for PMF")
# Verify that the AP `bssid` enabled PMF without requiring it, based on the
# "rsn_capab" field reported by info_bss().
# Raises Exception("AP required PMF") or, when "MFPC" is absent from the
# field, Exception("AP did not enable PMF").
# NOTE(review): the condition guarding "AP required PMF" (embedded line 192)
# is not visible in this listing - presumably a test for "MFPR"; confirm
# against the full file.
190 def require_ap_pmf_optional(self, bssid):
191 res = self.info_bss("rsn_capab", bssid)
193 raise Exception("AP required PMF")
194 if "MFPC" not in res:
195 raise Exception("AP did not enable PMF")
# Verify that the AP `bssid` neither required nor enabled PMF, based on the
# "rsn_capab" field reported by info_bss().
# Raises Exception("AP required PMF") or Exception("AP enabled PMF").
# NOTE(review): the conditions guarding both raises (embedded lines 199 and
# 201) are not visible in this listing - presumably tests for "MFPR" and
# "MFPC"; confirm against the full file.
197 def require_ap_no_pmf(self, bssid):
198 res = self.info_bss("rsn_capab", bssid)
200 raise Exception("AP required PMF")
202 raise Exception("AP enabled PMF")
def require_sta_pmf_mandatory(self, bssid, addr):
    """Verify that wlantest saw the station advertise PMF as mandatory.

    Looks up the "rsn_capab" field recorded for the station via info_sta()
    and checks it with substring tests.

    Args:
        bssid: BSSID of the BSS the station belongs to.
        addr: Address of the station to check.

    Raises:
        Exception: if "MFPR" or "MFPC" is missing from rsn_capab.
    """
    rsn_capab = self.info_sta("rsn_capab", bssid, addr)
    if "MFPR" not in rsn_capab:
        raise Exception("STA did not require PMF")
    if "MFPC" not in rsn_capab:
        raise Exception("STA did not enable PMF")
def require_sta_pmf(self, bssid, addr):
    """Verify that wlantest saw the station enable PMF.

    Looks up the "rsn_capab" field recorded for the station via info_sta()
    and checks it with a substring test.

    Args:
        bssid: BSSID of the BSS the station belongs to.
        addr: Address of the station to check.

    Raises:
        Exception: if "MFPC" is missing from rsn_capab.
    """
    rsn_capab = self.info_sta("rsn_capab", bssid, addr)
    if "MFPC" not in rsn_capab:
        raise Exception("STA did not enable PMF")
# Verify that the station `addr` in BSS `bssid` did not enable PMF, based on
# the "rsn_capab" field reported by info_sta().
# Raises Exception("STA enabled PMF").
# NOTE(review): the condition guarding the raise (embedded line 218) is not
# visible in this listing - presumably a test for "MFPC"; confirm against
# the full file.
216 def require_sta_no_pmf(self, bssid, addr):
217 res = self.info_sta("rsn_capab", bssid, addr)
219 raise Exception("STA enabled PMF")
def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
    """Verify that wlantest recorded the expected key management for a STA.

    Looks up the "key_mgmt" field recorded for the station via info_sta()
    and checks that `key_mgmt` occurs in it as a substring.

    Args:
        bssid: BSSID of the BSS the station belongs to.
        addr: Address of the station to check.
        key_mgmt: Text expected to appear in the reported key_mgmt field.

    Raises:
        Exception: if `key_mgmt` is not found in the reported field.
    """
    reported = self.info_sta("key_mgmt", bssid, addr)
    if key_mgmt not in reported:
        raise Exception("Unexpected STA key_mgmt")
# Query wlantest with the "get_tx_tid" command for station `addr` in BSS
# `bssid`; `tid` is converted to a string for the command line.
# Raises Exception("wlantest_cli command failed") on failure.
# NOTE(review): the condition guarding the raise (embedded line 228) and the
# return statement (line 230) are not visible in this listing.
226 def get_tx_tid(self, bssid, addr, tid):
227 res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
229 raise Exception("wlantest_cli command failed")
# Query wlantest with the "get_rx_tid" command for station `addr` in BSS
# `bssid`; `tid` is converted to a string for the command line.
# Raises Exception("wlantest_cli command failed") on failure.
# NOTE(review): the condition guarding the raise (embedded line 234) and the
# return statement (line 236) are not visible in this listing.
232 def get_rx_tid(self, bssid, addr, tid):
233 res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
235 raise Exception("wlantest_cli command failed")
238 def get_tid_counters(self, bssid, addr):
241 for tid in range(0, 17):
242 tx[tid] = self.get_tx_tid(bssid, addr, tid)
243 rx[tid] = self.get_rx_tid(bssid, addr, tid)