Updated to hostap_2_6
[mech_eap.git] / libeap / tests / hwsim / test_fst_config.py
1 # FST configuration tests
2 # Copyright (c) 2015, Qualcomm Atheros, Inc.
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import logging
8 logger = logging.getLogger()
9 import subprocess
10 import time
11 import os
12 import signal
13 import hostapd
14 import wpasupplicant
15 import utils
16
17 import fst_test_common
18
19 class FstLauncherConfig:
20     """FstLauncherConfig class represents configuration to be used for
21     FST config tests related hostapd/wpa_supplicant instances"""
22     def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
23         self.iface = iface
24         self.fst_group = fst_group
25         self.fst_pri = fst_pri
26         self.fst_llt = fst_llt # None llt means no llt parameter will be set
27
28     def ifname(self):
29         return self.iface
30
31     def is_ap(self):
32         """Returns True if the configuration is for AP, otherwise - False"""
33         raise Exception("Virtual is_ap() called!")
34
35     def to_file(self, pathname):
36         """Creates configuration file to be used by FST config tests related
37         hostapd/wpa_supplicant instances"""
38         raise Exception("Virtual to_file() called!")
39
40 class FstLauncherConfigAP(FstLauncherConfig):
41     """FstLauncherConfigAP class represents configuration to be used for
42     FST config tests related hostapd instance"""
43     def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
44                  fst_llt=None):
45         self.ssid = ssid
46         self.mode = mode
47         self.chan = chan
48         FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
49
50     def is_ap(self):
51         return True
52
53     def get_channel(self):
54         return self.chan
55
56     def to_file(self, pathname):
57         """Creates configuration file to be used by FST config tests related
58         hostapd instance"""
59         with open(pathname, "w") as f:
60             f.write("country_code=US\n"
61                     "interface=%s\n"
62                     "ctrl_interface=/var/run/hostapd\n"
63                     "ssid=%s\n"
64                     "channel=%s\n"
65                     "hw_mode=%s\n"
66                     "ieee80211n=1\n" % (self.iface, self.ssid, self.chan,
67                                         self.mode))
68             if len(self.fst_group) != 0:
69                 f.write("fst_group_id=%s\n"
70                         "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
71                 if self.fst_llt is not None:
72                     f.write("fst_llt=%s\n" % self.fst_llt)
73         with open(pathname, "r") as f:
74             logger.debug("wrote hostapd config file %s:\n%s" % (pathname,
75                                                                 f.read()))
76
77 class FstLauncherConfigSTA(FstLauncherConfig):
78     """FstLauncherConfig class represents configuration to be used for
79     FST config tests related wpa_supplicant instance"""
80     def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
81         FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
82
83     def is_ap(self):
84         return False
85
86     def to_file(self, pathname):
87         """Creates configuration file to be used by FST config tests related
88         wpa_supplicant instance"""
89         with open(pathname, "w") as f:
90             f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n"
91                 "p2p_no_group_iface=1\n")
92             if len(self.fst_group) != 0:
93                 f.write("fst_group_id=%s\n"
94                     "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
95                 if self.fst_llt is not None:
96                     f.write("fst_llt=%s\n" % self.fst_llt)
97         with open(pathname, "r") as f:
98             logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname, f.read()))
99
100 class FstLauncher:
101     """FstLauncher class is responsible for launching and cleaning up of FST
102     config tests related hostapd/wpa_supplicant instances"""
103     def __init__(self, logpath):
104         self.logger = logging.getLogger()
105         self.fst_logpath = logpath
106         self.cfgs_to_run = []
107         self.hapd_fst_global = '/var/run/hostapd-fst-global'
108         self.wsup_fst_global = '/tmp/fststa'
109         self.nof_aps = 0
110         self.nof_stas = 0
111         self.reg_ctrl = fst_test_common.HapdRegCtrl()
112         self.test_is_supported()
113
114     def __del__(self):
115         self.cleanup()
116
117     @staticmethod
118     def test_is_supported():
119         h = hostapd.HostapdGlobal()
120         resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
121         if not resp.startswith("OK"):
122             raise utils.HwsimSkip("FST not supported")
123         w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5')
124         resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
125         if not resp.startswith("OK"):
126             raise utils.HwsimSkip("FST not supported")
127
128     def get_cfg_pathname(self, cfg):
129         """Returns pathname of ifname based configuration file"""
130         return self.fst_logpath +'/'+ cfg.ifname() + '.conf'
131
132     def add_cfg(self, cfg):
133         """Adds configuration to be used for launching hostapd/wpa_supplicant
134         instances"""
135         if cfg not in self.cfgs_to_run:
136             self.cfgs_to_run.append(cfg)
137         if cfg.is_ap() == True:
138             self.nof_aps += 1
139         else:
140             self.nof_stas += 1
141
142     def remove_cfg(self, cfg):
143         """Removes configuration previously added with add_cfg"""
144         if cfg in self.cfgs_to_run:
145             self.cfgs_to_run.remove(cfg)
146         if cfg.is_ap() == True:
147             self.nof_aps -= 1
148         else:
149             self.nof_stas -= 1
150         config_file = self.get_cfg_pathname(cfg)
151         if os.path.exists(config_file):
152             os.remove(config_file)
153
154     def run_hostapd(self):
155         """Lauches hostapd with interfaces configured according to
156         FstLauncherConfigAP configurations added"""
157         if self.nof_aps == 0:
158             raise Exception("No FST APs to start")
159         pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
160         mylogfile = self.fst_logpath + '/' + 'fst-hostapd'
161         prg = os.path.join(self.fst_logpath,
162                            'alt-hostapd/hostapd/hostapd')
163         if not os.path.exists(prg):
164             prg = '../../hostapd/hostapd'
165         cmd = [ prg, '-B', '-dddt',
166                 '-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global]
167         for i in range(0, len(self.cfgs_to_run)):
168             cfg = self.cfgs_to_run[i]
169             if cfg.is_ap() == True:
170                 cfgfile = self.get_cfg_pathname(cfg)
171                 cfg.to_file(cfgfile)
172                 cmd.append(cfgfile)
173                 self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel())
174         self.logger.debug("Starting fst hostapd: " + ' '.join(cmd))
175         res = subprocess.call(cmd)
176         self.logger.debug("fst hostapd start result: %d" % res)
177         if res == 0:
178             self.reg_ctrl.start()
179         return res
180
181     def run_wpa_supplicant(self):
182         """Lauches wpa_supplicant with interfaces configured according to
183         FstLauncherConfigSTA configurations added"""
184         if self.nof_stas == 0:
185             raise Exception("No FST STAs to start")
186         pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
187         mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant'
188         prg = os.path.join(self.fst_logpath,
189                            'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant')
190         if not os.path.exists(prg):
191             prg = '../../wpa_supplicant/wpa_supplicant'
192         cmd = [ prg, '-B', '-dddt',
193                 '-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global ]
194         sta_no = 0
195         for i in range(0, len(self.cfgs_to_run)):
196             cfg = self.cfgs_to_run[i]
197             if cfg.is_ap() == False:
198                 cfgfile = self.get_cfg_pathname(cfg)
199                 cfg.to_file(cfgfile)
200                 cmd.append('-c' + cfgfile)
201                 cmd.append('-i' + cfg.ifname())
202                 cmd.append('-Dnl80211')
203                 if sta_no != self.nof_stas -1:
204                     cmd.append('-N')    # Next station configuration
205                 sta_no += 1
206         self.logger.debug("Starting fst supplicant: " + ' '.join(cmd))
207         res = subprocess.call(cmd)
208         self.logger.debug("fst supplicant start result: %d" % res)
209         return res
210
211     def cleanup(self):
212         """Terminates hostapd/wpa_supplicant processes previously launched with
213         run_hostapd/run_wpa_supplicant"""
214         pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
215         self.kill_pid(pidfile, self.nof_aps > 0)
216         pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
217         self.kill_pid(pidfile, self.nof_stas > 0)
218         self.reg_ctrl.stop()
219         while len(self.cfgs_to_run) != 0:
220             cfg = self.cfgs_to_run[0]
221             self.remove_cfg(cfg)
222
223     def kill_pid(self, pidfile, try_again=False):
224         """Kills process by PID file"""
225         if not os.path.exists(pidfile):
226             if not try_again:
227                 return
228             # It might take some time for the process to write the PID file,
229             # so wait a bit longer before giving up.
230             self.logger.info("kill_pid: pidfile %s does not exist - try again after a second" % pidfile)
231             time.sleep(1)
232             if not os.path.exists(pidfile):
233                 self.logger.info("kill_pid: pidfile %s does not exist - could not kill the process" % pidfile)
234                 return
235         pid = -1
236         try:
237             for i in range(3):
238                 pf = file(pidfile, 'r')
239                 pidtxt = pf.read().strip()
240                 self.logger.debug("kill_pid: %s: '%s'" % (pidfile, pidtxt))
241                 pf.close()
242                 try:
243                     pid = int(pidtxt)
244                     break
245                 except Exception, e:
246                     self.logger.debug("kill_pid: No valid PID found: %s" % str(e))
247                     time.sleep(1)
248             self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid))
249             os.kill(pid, signal.SIGTERM)
250             for i in range(10):
251                 try:
252                     # Poll the pid (Is the process still existing?)
253                     os.kill(pid, 0)
254                 except OSError:
255                     # No, already done
256                     break
257                 # Wait and check again
258                 time.sleep(1)
259         except Exception, e:
260             self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e)))
261
262
263 def parse_ies(iehex, el=-1):
264     """Parses the information elements hex string 'iehex' in format
265     "0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity.
266     If 'el' is defined returns the list of hex values of the specific IE (or
267     empty list if the element is not in the string."""
268     iel = [iehex[i:i + 2] for i in range(0, len(iehex), 2)]
269     for i in range(0, len(iel)):
270          iel[i] = int(iel[i], 16)
271     # Sanity check
272     i = 0
273     res = []
274     while i < len(iel):
275         logger.debug("IE found: %x" % iel[i])
276         if el != -1 and el == iel[i]:
277             res = iel[i + 2:i + 2 + iel[i + 1]]
278         i += 2 + iel[i + 1]
279     if i != len(iel):
280         logger.error("Bad IE string: " + iehex)
281         res = []
282     return res
283
284 def scan_and_get_bss(dev, frq):
285     """Issues a scan on given device on given frequency, returns the bss info
286     dictionary ('ssid','ie','flags', etc.) or None. Note, the function
287     implies there is only one AP on the given channel. If not a case,
288     the function must be changed to call dev.get_bss() till the AP with the
289     [b]ssid that we need is found"""
290     dev.scan(freq=frq)
291     return dev.get_bss('0')
292
293
294 # AP configuration tests
295
296 def run_test_ap_configuration(apdev, test_params,
297                               fst_group = fst_test_common.fst_test_def_group,
298                               fst_pri = fst_test_common.fst_test_def_prio_high,
299                               fst_llt = fst_test_common.fst_test_def_llt):
300     """Runs FST hostapd where the 1st AP configuration is fixed, the 2nd fst
301     configuration is provided by the parameters. Returns the result of the run:
302     0 - no errors discovered, an error otherwise. The function is used for
303     simplek "bad configuration" tests."""
304     logdir = test_params['logdir']
305     fst_launcher = FstLauncher(logdir)
306     ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a',
307                               fst_test_common.fst_test_def_chan_a,
308                               fst_test_common.fst_test_def_group,
309                               fst_test_common.fst_test_def_prio_low,
310                               fst_test_common.fst_test_def_llt)
311     ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b',
312                               fst_test_common.fst_test_def_chan_g, fst_group,
313                               fst_pri, fst_llt)
314     fst_launcher.add_cfg(ap1)
315     fst_launcher.add_cfg(ap2)
316     res = fst_launcher.run_hostapd()
317     return res
318
319 def run_test_sta_configuration(test_params,
320                                fst_group = fst_test_common.fst_test_def_group,
321                                fst_pri = fst_test_common.fst_test_def_prio_high,
322                                fst_llt = fst_test_common.fst_test_def_llt):
323     """Runs FST wpa_supplicant where the 1st STA configuration is fixed, the
324     2nd fst configuration is provided by the parameters. Returns the result of
325     the run: 0 - no errors discovered, an error otherwise. The function is used
326     for simple "bad configuration" tests."""
327     logdir = test_params['logdir']
328     fst_launcher = FstLauncher(logdir)
329     sta1 = FstLauncherConfigSTA('wlan5',
330                                 fst_test_common.fst_test_def_group,
331                                 fst_test_common.fst_test_def_prio_low,
332                                 fst_test_common.fst_test_def_llt)
333     sta2 = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt)
334     fst_launcher.add_cfg(sta1)
335     fst_launcher.add_cfg(sta2)
336     res = fst_launcher.run_wpa_supplicant()
337     return res
338
339 def test_fst_ap_config_llt_neg(dev, apdev, test_params):
340     """FST AP configuration negative LLT"""
341     res = run_test_ap_configuration(apdev, test_params, fst_llt = '-1')
342     if res == 0:
343         raise Exception("hostapd started with a negative llt")
344
345 def test_fst_ap_config_llt_zero(dev, apdev, test_params):
346     """FST AP configuration zero LLT"""
347     res = run_test_ap_configuration(apdev, test_params, fst_llt = '0')
348     if res == 0:
349         raise Exception("hostapd started with a zero llt")
350
351 def test_fst_ap_config_llt_too_big(dev, apdev, test_params):
352     """FST AP configuration LLT is too big"""
353     res = run_test_ap_configuration(apdev, test_params,
354                                     fst_llt = '4294967296') #0x100000000
355     if res == 0:
356         raise Exception("hostapd started with llt that is too big")
357
358 def test_fst_ap_config_llt_nan(dev, apdev, test_params):
359     """FST AP configuration LLT is not a number"""
360     res = run_test_ap_configuration(apdev, test_params, fst_llt = 'nan')
361     if res == 0:
362         raise Exception("hostapd started with llt not a number")
363
364 def test_fst_ap_config_pri_neg(dev, apdev, test_params):
365     """FST AP configuration Priority negative"""
366     res = run_test_ap_configuration(apdev, test_params, fst_pri = '-1')
367     if res == 0:
368         raise Exception("hostapd started with a negative fst priority")
369
370 def test_fst_ap_config_pri_zero(dev, apdev, test_params):
371     """FST AP configuration Priority zero"""
372     res = run_test_ap_configuration(apdev, test_params, fst_pri = '0')
373     if res == 0:
374         raise Exception("hostapd started with a zero fst priority")
375
376 def test_fst_ap_config_pri_large(dev, apdev, test_params):
377     """FST AP configuration Priority too large"""
378     res = run_test_ap_configuration(apdev, test_params, fst_pri = '256')
379     if res == 0:
380         raise Exception("hostapd started with too large fst priority")
381
382 def test_fst_ap_config_pri_nan(dev, apdev, test_params):
383     """FST AP configuration Priority not a number"""
384     res = run_test_ap_configuration(apdev, test_params, fst_pri = 'nan')
385     if res == 0:
386         raise Exception("hostapd started with fst priority not a number")
387
388 def test_fst_ap_config_group_len(dev, apdev, test_params):
389     """FST AP configuration Group max length"""
390     res = run_test_ap_configuration(apdev, test_params,
391                                     fst_group = 'fstg5678abcd34567')
392     if res == 0:
393         raise Exception("hostapd started with fst_group length too big")
394
395 def test_fst_ap_config_good(dev, apdev, test_params):
396     """FST AP configuration good parameters"""
397     res = run_test_ap_configuration(apdev, test_params)
398     if res != 0:
399         raise Exception("hostapd didn't start with valid config parameters")
400
401 def test_fst_ap_config_default(dev, apdev, test_params):
402     """FST AP configuration default parameters"""
403     res = run_test_ap_configuration(apdev, test_params, fst_llt = None)
404     if res != 0:
405         raise Exception("hostapd didn't start with valid config parameters")
406
407
408 # STA configuration tests
409
410 def test_fst_sta_config_llt_neg(dev, apdev, test_params):
411     """FST STA configuration negative LLT"""
412     res = run_test_sta_configuration(test_params, fst_llt = '-1')
413     if res == 0:
414         raise Exception("wpa_supplicant started with a negative llt")
415
416 def test_fst_sta_config_llt_zero(dev, apdev, test_params):
417     """FST STA configuration zero LLT"""
418     res = run_test_sta_configuration(test_params, fst_llt = '0')
419     if res == 0:
420         raise Exception("wpa_supplicant started with a zero llt")
421
422 def test_fst_sta_config_llt_large(dev, apdev, test_params):
423     """FST STA configuration LLT is too large"""
424     res = run_test_sta_configuration(test_params,
425                                      fst_llt = '4294967296') #0x100000000
426     if res == 0:
427         raise Exception("wpa_supplicant started with llt that is too large")
428
429 def test_fst_sta_config_llt_nan(dev, apdev, test_params):
430     """FST STA configuration LLT is not a number"""
431     res = run_test_sta_configuration(test_params, fst_llt = 'nan')
432     if res == 0:
433         raise Exception("wpa_supplicant started with llt not a number")
434
435 def test_fst_sta_config_pri_neg(dev, apdev, test_params):
436     """FST STA configuration Priority negative"""
437     res = run_test_sta_configuration(test_params, fst_pri = '-1')
438     if res == 0:
439         raise Exception("wpa_supplicant started with a negative fst priority")
440
441 def test_fst_sta_config_pri_zero(dev, apdev, test_params):
442     """FST STA configuration Priority zero"""
443     res = run_test_sta_configuration(test_params, fst_pri = '0')
444     if res == 0:
445         raise Exception("wpa_supplicant started with a zero fst priority")
446
447 def test_fst_sta_config_pri_big(dev, apdev, test_params):
448     """FST STA configuration Priority too large"""
449     res = run_test_sta_configuration(test_params, fst_pri = '256')
450     if res == 0:
451         raise Exception("wpa_supplicant started with too large fst priority")
452
453 def test_fst_sta_config_pri_nan(dev, apdev, test_params):
454     """FST STA configuration Priority not a number"""
455     res = run_test_sta_configuration(test_params, fst_pri = 'nan')
456     if res == 0:
457         raise Exception("wpa_supplicant started with fst priority not a number")
458
459 def test_fst_sta_config_group_len(dev, apdev, test_params):
460     """FST STA configuration Group max length"""
461     res = run_test_sta_configuration(test_params,
462                                      fst_group = 'fstg5678abcd34567')
463     if res == 0:
464         raise Exception("wpa_supplicant started with fst_group length too big")
465
466 def test_fst_sta_config_good(dev, apdev, test_params):
467     """FST STA configuration good parameters"""
468     res = run_test_sta_configuration(test_params)
469     if res != 0:
470         raise Exception("wpa_supplicant didn't start with valid config parameters")
471
472 def test_fst_sta_config_default(dev, apdev, test_params):
473     """FST STA configuration default parameters"""
474     res = run_test_sta_configuration(test_params, fst_llt = None)
475     if res != 0:
476         raise Exception("wpa_supplicant didn't start with valid config parameters")
477
478 def test_fst_scan_mb(dev, apdev, test_params):
479     """FST scan valid MB IE presence with normal start"""
480     logdir = test_params['logdir']
481
482     # Test valid MB IE in scan results
483     fst_launcher = FstLauncher(logdir)
484     ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
485                               fst_test_common.fst_test_def_chan_a,
486                               fst_test_common.fst_test_def_group,
487                               fst_test_common.fst_test_def_prio_high)
488     ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b',
489                               fst_test_common.fst_test_def_chan_g,
490                               fst_test_common.fst_test_def_group,
491                               fst_test_common.fst_test_def_prio_low)
492     fst_launcher.add_cfg(ap1)
493     fst_launcher.add_cfg(ap2)
494     res = fst_launcher.run_hostapd()
495     if res != 0:
496         raise Exception("hostapd didn't start properly")
497     try:
498         mbie1=[]
499         flags1 = ''
500         mbie2=[]
501         flags2 = ''
502         # Scan 1st AP
503         vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
504         if vals1 != None:
505             if 'ie' in vals1:
506                 mbie1 = parse_ies(vals1['ie'], 0x9e)
507             if 'flags' in vals1:
508                 flags1 = vals1['flags']
509         # Scan 2nd AP
510         vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g)
511         if vals2 != None:
512             if 'ie' in vals2:
513                 mbie2 = parse_ies(vals2['ie'],0x9e)
514             if 'flags' in vals2:
515                 flags2 = vals2['flags']
516     finally:
517          fst_launcher.cleanup()
518
519     if len(mbie1) == 0:
520         raise Exception("No MB IE created by 1st AP")
521     if len(mbie2) == 0:
522         raise Exception("No MB IE created by 2nd AP")
523
524 def test_fst_scan_nomb(dev, apdev, test_params):
525     """FST scan no MB IE presence with 1 AP start"""
526     logdir = test_params['logdir']
527
528     # Test valid MB IE in scan results
529     fst_launcher = FstLauncher(logdir)
530     ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
531                               fst_test_common.fst_test_def_chan_a,
532                               fst_test_common.fst_test_def_group,
533                               fst_test_common.fst_test_def_prio_high)
534     fst_launcher.add_cfg(ap1)
535     res = fst_launcher.run_hostapd()
536     if res != 0:
537         raise Exception("Hostapd didn't start properly")
538     try:
539         time.sleep(2)
540         mbie1=[]
541         flags1 = ''
542         vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
543         if vals1 != None:
544             if 'ie' in vals1:
545                 mbie1 = parse_ies(vals1['ie'], 0x9e)
546             if 'flags' in vals1:
547                 flags1 = vals1['flags']
548     finally:
549         fst_launcher.cleanup()
550
551     if len(mbie1) != 0:
552         raise Exception("MB IE exists with 1 AP")