tests: Remove extra semicolons from python scripts
[mech_eap.git] / tests / hwsim / test_mbo.py
1 # MBO tests
2 # Copyright (c) 2016, Intel Deutschland GmbH
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 from remotehost import remote_compatible
8 import logging
9 logger = logging.getLogger()
10
11 import hostapd
12 import os
13 import time
14
15 import hostapd
16 from tshark import run_tshark
17 from utils import alloc_fail, fail_test
18
19 def set_reg(country_code, apdev0=None, apdev1=None, dev0=None):
20     if apdev0:
21         hostapd.cmd_execute(apdev0, ['iw', 'reg', 'set', country_code])
22     if apdev1:
23         hostapd.cmd_execute(apdev1, ['iw', 'reg', 'set', country_code])
24     if dev0:
25         dev0.cmd_execute(['iw', 'reg', 'set', country_code])
26
27 def run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, country):
28     """MBO and supported operating classes"""
29     addr = dev[0].own_addr()
30
31     res2 = None
32     res5 = None
33
34     dev[0].flush_scan_cache()
35     dev[0].dump_monitor()
36
37     logger.info("Country: " + country)
38     set_reg(country, apdev[0], apdev[1], dev[0])
39     for j in range(5):
40         ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=5)
41         if ev is None:
42             raise Exception("No regdom change event")
43         if "alpha2=" + country in ev:
44             break
45     dev[0].dump_monitor()
46     dev[1].dump_monitor()
47     dev[2].dump_monitor()
48     if hapd:
49         hapd.set("country_code", country)
50         hapd.enable()
51         dev[0].scan_for_bss(hapd.own_addr(), 5180, force_scan=True)
52         dev[0].connect("test-wnm-mbo", key_mgmt="NONE", scan_freq="5180")
53         sta = hapd.get_sta(addr)
54         res5 = sta['supp_op_classes'][2:]
55         dev[0].request("REMOVE_NETWORK all")
56         hapd.disable()
57         dev[0].wait_disconnected()
58         dev[0].dump_monitor()
59
60     hapd2.set("country_code", country)
61     hapd2.enable()
62     dev[0].scan_for_bss(hapd2.own_addr(), 2412, force_scan=True)
63     dev[0].connect("test-wnm-mbo-2", key_mgmt="NONE", scan_freq="2412")
64     sta = hapd2.get_sta(addr)
65     res2 = sta['supp_op_classes'][2:]
66     dev[0].request("REMOVE_NETWORK all")
67     hapd2.disable()
68     dev[0].wait_disconnected()
69     dev[0].dump_monitor()
70
71     return res2, res5
72
73 def test_mbo_supp_oper_classes(dev, apdev):
74     """MBO and supported operating classes"""
75     params = { 'ssid': "test-wnm-mbo",
76                'mbo': '1',
77                "country_code": "US",
78                'ieee80211d': '1',
79                "ieee80211n": "1",
80                "hw_mode": "a",
81                "channel": "36" }
82     hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
83
84     params = { 'ssid': "test-wnm-mbo-2",
85                'mbo': '1',
86                "country_code": "US",
87                'ieee80211d': '1',
88                "ieee80211n": "1",
89                "hw_mode": "g",
90                "channel": "1" }
91     hapd2 = hostapd.add_ap(apdev[1], params, no_enable=True)
92
93     try:
94         za2, za5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "ZA")
95         fi2, fi5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "FI")
96         us2, us5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "US")
97         jp2, jp5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "JP")
98         bd2, bd5 = run_mbo_supp_oper_classes(dev, apdev, None, hapd2, "BD")
99         kz2, kz5 = run_mbo_supp_oper_classes(dev, apdev, None, hapd2, "KZ")
100     finally:
101         dev[0].dump_monitor()
102         set_reg("00", apdev[0], apdev[1], dev[0])
103         ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
104
105     za = "515354737475767778797a7b808182"
106     fi = "515354737475767778797a7b808182"
107     us = "515354737475767778797a7b7c7d7e7f808182"
108     jp = "51525354737475767778797a7b808182"
109     bd = "5153547c7d7e7f80"
110     kz = "515354"
111
112     tests = [ ("ZA", za, za2, za5, True),
113               ("FI", fi, fi2, fi5, True),
114               ("US", us, us2, us5, True),
115               ("JP", jp, jp2, jp5, True),
116               ("BD", bd, bd2, bd5, False),
117               ("KZ", kz, kz2, kz5, False) ]
118     for country, expected, res2, res5, inc5 in tests:
119         if res2 != expected:
120             raise Exception("Unexpected supp_op_class string (country=%s, 2.4 GHz): %s (expected: %s)" % (country, res2, expected))
121         if inc5 and res5 != expected:
122             raise Exception("Unexpected supp_op_class string (country=%s, 5 GHz): %s (expected: %s)" % (country, res5, expected))
123
124 def test_mbo_assoc_disallow(dev, apdev, params):
125     hapd1 = hostapd.add_ap(apdev[0], { "ssid": "MBO", "mbo": "1" })
126     hapd2 = hostapd.add_ap(apdev[1], { "ssid": "MBO", "mbo": "1" })
127
128     logger.debug("Set mbo_assoc_disallow with invalid value")
129     if "FAIL" not in hapd1.request("SET mbo_assoc_disallow 2"):
130         raise Exception("Set mbo_assoc_disallow for AP1 succeeded unexpectedly with value 2")
131
132     logger.debug("Disallow associations to AP1 and allow association to AP2")
133     if "OK" not in hapd1.request("SET mbo_assoc_disallow 1"):
134         raise Exception("Failed to set mbo_assoc_disallow for AP1")
135     if "OK" not in hapd2.request("SET mbo_assoc_disallow 0"):
136         raise Exception("Failed to set mbo_assoc_disallow for AP2")
137
138     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")
139
140     out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
141                      "wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00",
142                      wait=False)
143     if "Destination address: " + hapd1.own_addr() in out:
144         raise Exception("Association request sent to disallowed AP")
145
146     timestamp = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
147                            "wlan.fc.type_subtype == 0x00",
148                            display=['frame.time'], wait=False)
149
150     logger.debug("Allow associations to AP1 and disallow assications to AP2")
151     if "OK" not in hapd1.request("SET mbo_assoc_disallow 0"):
152         raise Exception("Failed to set mbo_assoc_disallow for AP1")
153     if "OK" not in hapd2.request("SET mbo_assoc_disallow 1"):
154         raise Exception("Failed to set mbo_assoc_disallow for AP2")
155
156     dev[0].request("DISCONNECT")
157     dev[0].wait_disconnected()
158
159     # Force new scan, so the assoc_disallowed indication is updated */
160     dev[0].request("FLUSH")
161
162     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")
163
164     filter = 'wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00 && frame.time > "' + timestamp.rstrip() + '"'
165     out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
166                      filter, wait=False)
167     if "Destination address: " + hapd2.own_addr() in out:
168         raise Exception("Association request sent to disallowed AP 2")
169
170 @remote_compatible
171 def test_mbo_cell_capa_update(dev, apdev):
172     """MBO cellular data capability update"""
173     ssid = "test-wnm-mbo"
174     params = { 'ssid': ssid, 'mbo': '1' }
175     hapd = hostapd.add_ap(apdev[0], params)
176     bssid = apdev[0]['bssid']
177     if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
178         raise Exception("Failed to set STA as cellular data capable")
179
180     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
181
182     addr = dev[0].own_addr()
183     sta = hapd.get_sta(addr)
184     if 'mbo_cell_capa' not in sta or sta['mbo_cell_capa'] != '1':
185         raise Exception("mbo_cell_capa missing after association")
186
187     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
188         raise Exception("Failed to set STA as cellular data not-capable")
189     # Duplicate update for additional code coverage
190     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
191         raise Exception("Failed to set STA as cellular data not-capable")
192
193     time.sleep(0.2)
194     sta = hapd.get_sta(addr)
195     if 'mbo_cell_capa' not in sta:
196         raise Exception("mbo_cell_capa missing after update")
197     if sta['mbo_cell_capa'] != '3':
198         raise Exception("mbo_cell_capa not updated properly")
199
200 @remote_compatible
201 def test_mbo_cell_capa_update_pmf(dev, apdev):
202     """MBO cellular data capability update with PMF required"""
203     ssid = "test-wnm-mbo"
204     passphrase = "12345678"
205     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
206     params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
207     params["ieee80211w"] = "2"
208     params['mbo'] = '1'
209     hapd = hostapd.add_ap(apdev[0], params)
210     bssid = apdev[0]['bssid']
211     if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
212         raise Exception("Failed to set STA as cellular data capable")
213
214     dev[0].connect(ssid, psk=passphrase, key_mgmt="WPA-PSK-SHA256",
215                    proto="WPA2", ieee80211w="2", scan_freq="2412")
216
217     addr = dev[0].own_addr()
218     sta = hapd.get_sta(addr)
219     if 'mbo_cell_capa' not in sta or sta['mbo_cell_capa'] != '1':
220         raise Exception("mbo_cell_capa missing after association")
221
222     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
223         raise Exception("Failed to set STA as cellular data not-capable")
224
225     time.sleep(0.2)
226     sta = hapd.get_sta(addr)
227     if 'mbo_cell_capa' not in sta:
228         raise Exception("mbo_cell_capa missing after update")
229     if sta['mbo_cell_capa'] != '3':
230         raise Exception("mbo_cell_capa not updated properly")
231
232 def test_mbo_wnm_token_wrap(dev, apdev):
233     """MBO WNM token wrap around"""
234     ssid = "test-wnm-mbo"
235     params = { 'ssid': ssid, 'mbo': '1' }
236     hapd = hostapd.add_ap(apdev[0], params)
237     bssid = apdev[0]['bssid']
238
239     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
240
241     # Trigger transmission of 256 WNM-Notification frames to wrap around the
242     # 8-bit mbo_wnm_token counter.
243     for i in range(128):
244         if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
245             raise Exception("Failed to set STA as cellular data capable")
246         if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
247             raise Exception("Failed to set STA as cellular data not-capable")
248
249 @remote_compatible
250 def test_mbo_non_pref_chan(dev, apdev):
251     """MBO non-preferred channel list"""
252     ssid = "test-wnm-mbo"
253     params = { 'ssid': ssid, 'mbo': '1' }
254     hapd = hostapd.add_ap(apdev[0], params)
255     bssid = apdev[0]['bssid']
256     if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:99"):
257         raise Exception("Invalid non_pref_chan value accepted")
258     if "FAIL" not in dev[0].request("SET non_pref_chan 81:15:200:3"):
259         raise Exception("Invalid non_pref_chan value accepted")
260     if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3 81:7:201:3"):
261         raise Exception("Invalid non_pref_chan value accepted")
262     if "OK" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
263         raise Exception("Failed to set non-preferred channel list")
264     if "OK" not in dev[0].request("SET non_pref_chan 81:7:200:1:123 81:9:100:2"):
265         raise Exception("Failed to set non-preferred channel list")
266
267     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
268
269     addr = dev[0].own_addr()
270     sta = hapd.get_sta(addr)
271     logger.debug("STA: " + str(sta))
272     if 'non_pref_chan[0]' not in sta:
273         raise Exception("Missing non_pref_chan[0] value (assoc)")
274     if sta['non_pref_chan[0]'] != '81:200:1:123:7':
275         raise Exception("Unexpected non_pref_chan[0] value (assoc)")
276     if 'non_pref_chan[1]' not in sta:
277         raise Exception("Missing non_pref_chan[1] value (assoc)")
278     if sta['non_pref_chan[1]'] != '81:100:2:0:9':
279         raise Exception("Unexpected non_pref_chan[1] value (assoc)")
280     if 'non_pref_chan[2]' in sta:
281         raise Exception("Unexpected non_pref_chan[2] value (assoc)")
282
283     if "OK" not in dev[0].request("SET non_pref_chan 81:9:100:2"):
284         raise Exception("Failed to update non-preferred channel list")
285     time.sleep(0.1)
286     sta = hapd.get_sta(addr)
287     logger.debug("STA: " + str(sta))
288     if 'non_pref_chan[0]' not in sta:
289         raise Exception("Missing non_pref_chan[0] value (update 1)")
290     if sta['non_pref_chan[0]'] != '81:100:2:0:9':
291         raise Exception("Unexpected non_pref_chan[0] value (update 1)")
292     if 'non_pref_chan[1]' in sta:
293         raise Exception("Unexpected non_pref_chan[2] value (update 1)")
294
295     if "OK" not in dev[0].request("SET non_pref_chan 81:9:100:2 81:10:100:2 81:8:100:2 81:7:100:1:123 81:5:100:1:124"):
296         raise Exception("Failed to update non-preferred channel list")
297     time.sleep(0.1)
298     sta = hapd.get_sta(addr)
299     logger.debug("STA: " + str(sta))
300     if 'non_pref_chan[0]' not in sta:
301         raise Exception("Missing non_pref_chan[0] value (update 2)")
302     if sta['non_pref_chan[0]'] != '81:100:1:123:7':
303         raise Exception("Unexpected non_pref_chan[0] value (update 2)")
304     if 'non_pref_chan[1]' not in sta:
305         raise Exception("Missing non_pref_chan[1] value (update 2)")
306     if sta['non_pref_chan[1]'] != '81:100:1:124:5':
307         raise Exception("Unexpected non_pref_chan[1] value (update 2)")
308     if 'non_pref_chan[2]' not in sta:
309         raise Exception("Missing non_pref_chan[2] value (update 2)")
310     if sta['non_pref_chan[2]'] != '81:100:2:0:9,10,8':
311         raise Exception("Unexpected non_pref_chan[2] value (update 2)")
312     if 'non_pref_chan[3]' in sta:
313         raise Exception("Unexpected non_pref_chan[3] value (update 2)")
314
315     if "OK" not in dev[0].request("SET non_pref_chan 81:5:90:2 82:14:91:2"):
316         raise Exception("Failed to update non-preferred channel list")
317     time.sleep(0.1)
318     sta = hapd.get_sta(addr)
319     logger.debug("STA: " + str(sta))
320     if 'non_pref_chan[0]' not in sta:
321         raise Exception("Missing non_pref_chan[0] value (update 3)")
322     if sta['non_pref_chan[0]'] != '81:90:2:0:5':
323         raise Exception("Unexpected non_pref_chan[0] value (update 3)")
324     if 'non_pref_chan[1]' not in sta:
325         raise Exception("Missing non_pref_chan[1] value (update 3)")
326     if sta['non_pref_chan[1]'] != '82:91:2:0:14':
327         raise Exception("Unexpected non_pref_chan[1] value (update 3)")
328     if 'non_pref_chan[2]' in sta:
329         raise Exception("Unexpected non_pref_chan[2] value (update 3)")
330
331     if "OK" not in dev[0].request("SET non_pref_chan "):
332         raise Exception("Failed to update non-preferred channel list")
333     time.sleep(0.1)
334     sta = hapd.get_sta(addr)
335     logger.debug("STA: " + str(sta))
336     if 'non_pref_chan[0]' in sta:
337         raise Exception("Unexpected non_pref_chan[0] value (update 4)")
338
339 @remote_compatible
340 def test_mbo_sta_supp_op_classes(dev, apdev):
341     """MBO STA supported operating classes"""
342     ssid = "test-wnm-mbo"
343     params = { 'ssid': ssid, 'mbo': '1' }
344     hapd = hostapd.add_ap(apdev[0], params)
345
346     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
347
348     addr = dev[0].own_addr()
349     sta = hapd.get_sta(addr)
350     logger.debug("STA: " + str(sta))
351     if 'supp_op_classes' not in sta:
352         raise Exception("No supp_op_classes")
353     supp = bytearray(sta['supp_op_classes'].decode("hex"))
354     if supp[0] != 81:
355         raise Exception("Unexpected current operating class %d" % supp[0])
356     if 115 not in supp:
357         raise Exception("Operating class 115 missing")
358
359 def test_mbo_failures(dev, apdev):
360     """MBO failure cases"""
361     ssid = "test-wnm-mbo"
362     params = { 'ssid': ssid, 'mbo': '1' }
363     hapd = hostapd.add_ap(apdev[0], params)
364
365     with alloc_fail(dev[0], 1, "wpas_mbo_ie"):
366         dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
367
368     with alloc_fail(dev[0], 1, "wpas_mbo_send_wnm_notification"):
369         if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
370             raise Exception("Failed to set STA as cellular data capable")
371     with fail_test(dev[0], 1, "wpas_mbo_send_wnm_notification"):
372         if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
373             raise Exception("Failed to set STA as cellular data not-capable")
374     with alloc_fail(dev[0], 1, "wpas_mbo_update_non_pref_chan"):
375         if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
376             raise Exception("non_pref_chan value accepted during OOM")
377     with alloc_fail(dev[0], 2, "wpas_mbo_update_non_pref_chan"):
378         if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
379             raise Exception("non_pref_chan value accepted during OOM")
380
381 def test_mbo_wnm_bss_tm_ie_parsing(dev, apdev):
382     """MBO BSS transition request MBO IE parsing"""
383     ssid = "test-wnm-mbo"
384     params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
385     hapd = hostapd.add_ap(apdev[0], params)
386     bssid = apdev[0]['bssid']
387     addr = dev[0].own_addr()
388     dev[0].connect(ssid, psk="12345678", key_mgmt="WPA-PSK",
389                    proto="WPA2", ieee80211w="0", scan_freq="2412")
390
391     dev[0].request("SET ext_mgmt_frame_handling 1")
392     hdr = "d0003a01" + addr.replace(':', '') + bssid.replace(':', '') + bssid.replace(':', '') + "3000"
393     btm_hdr = "0a070100030001"
394
395     tests = [ ("Truncated attribute in MBO IE", "dd06506f9a160101"),
396               ("Unexpected cell data capa attribute length in MBO IE",
397                "dd09506f9a160501030500"),
398               ("Unexpected transition reason attribute length in MBO IE",
399                "dd06506f9a160600"),
400               ("Unexpected assoc retry delay attribute length in MBO IE",
401                "dd0c506f9a160100080200000800"),
402               ("Unknown attribute id 255 in MBO IE",
403                "dd06506f9a16ff00") ]
404
405     for test, mbo_ie in tests:
406         logger.info(test)
407         dev[0].request("NOTE " + test)
408         frame = hdr + btm_hdr + mbo_ie
409         if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
410             raise Exception("MGMT_RX_PROCESS failed")
411
412     logger.info("Unexpected association retry delay")
413     dev[0].request("NOTE Unexpected association retry delay")
414     btm_hdr = "0a070108030001112233445566778899aabbcc"
415     mbo_ie = "dd08506f9a1608020000"
416     frame = hdr + btm_hdr + mbo_ie
417     if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
418         raise Exception("MGMT_RX_PROCESS failed")
419
420     dev[0].request("SET ext_mgmt_frame_handling 0")