tests: MBO and ignoring disallowed association
[mech_eap.git] / tests / hwsim / test_mbo.py
1 # MBO tests
2 # Copyright (c) 2016, Intel Deutschland GmbH
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 from remotehost import remote_compatible
8 import logging
9 logger = logging.getLogger()
10
11 import hostapd
12 import os
13 import time
14
15 import hostapd
16 from tshark import run_tshark
17 from utils import alloc_fail, fail_test
18
19 def set_reg(country_code, apdev0=None, apdev1=None, dev0=None):
20     if apdev0:
21         hostapd.cmd_execute(apdev0, ['iw', 'reg', 'set', country_code])
22     if apdev1:
23         hostapd.cmd_execute(apdev1, ['iw', 'reg', 'set', country_code])
24     if dev0:
25         dev0.cmd_execute(['iw', 'reg', 'set', country_code])
26
27 def run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, country):
28     """MBO and supported operating classes"""
29     addr = dev[0].own_addr()
30
31     res2 = None
32     res5 = None
33
34     dev[0].flush_scan_cache()
35     dev[0].dump_monitor()
36
37     logger.info("Country: " + country)
38     set_reg(country, apdev[0], apdev[1], dev[0])
39     for j in range(5):
40         ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=5)
41         if ev is None:
42             raise Exception("No regdom change event")
43         if "alpha2=" + country in ev:
44             break
45     dev[0].dump_monitor()
46     dev[1].dump_monitor()
47     dev[2].dump_monitor()
48     if hapd:
49         hapd.set("country_code", country)
50         hapd.enable()
51         dev[0].scan_for_bss(hapd.own_addr(), 5180, force_scan=True)
52         dev[0].connect("test-wnm-mbo", key_mgmt="NONE", scan_freq="5180")
53         sta = hapd.get_sta(addr)
54         res5 = sta['supp_op_classes'][2:]
55         dev[0].request("REMOVE_NETWORK all")
56         hapd.disable()
57         dev[0].wait_disconnected()
58         dev[0].dump_monitor()
59
60     hapd2.set("country_code", country)
61     hapd2.enable()
62     dev[0].scan_for_bss(hapd2.own_addr(), 2412, force_scan=True)
63     dev[0].connect("test-wnm-mbo-2", key_mgmt="NONE", scan_freq="2412")
64     sta = hapd2.get_sta(addr)
65     res2 = sta['supp_op_classes'][2:]
66     dev[0].request("REMOVE_NETWORK all")
67     hapd2.disable()
68     dev[0].wait_disconnected()
69     dev[0].dump_monitor()
70
71     return res2, res5
72
73 def test_mbo_supp_oper_classes(dev, apdev):
74     """MBO and supported operating classes"""
75     params = { 'ssid': "test-wnm-mbo",
76                'mbo': '1',
77                "country_code": "US",
78                'ieee80211d': '1',
79                "ieee80211n": "1",
80                "hw_mode": "a",
81                "channel": "36" }
82     hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
83
84     params = { 'ssid': "test-wnm-mbo-2",
85                'mbo': '1',
86                "country_code": "US",
87                'ieee80211d': '1',
88                "ieee80211n": "1",
89                "hw_mode": "g",
90                "channel": "1" }
91     hapd2 = hostapd.add_ap(apdev[1], params, no_enable=True)
92
93     try:
94         za2, za5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "ZA")
95         fi2, fi5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "FI")
96         us2, us5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "US")
97         jp2, jp5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "JP")
98         bd2, bd5 = run_mbo_supp_oper_classes(dev, apdev, None, hapd2, "BD")
99         kz2, kz5 = run_mbo_supp_oper_classes(dev, apdev, None, hapd2, "KZ")
100     finally:
101         dev[0].dump_monitor()
102         set_reg("00", apdev[0], apdev[1], dev[0])
103         ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
104
105     za = "515354737475767778797a7b808182"
106     fi = "515354737475767778797a7b808182"
107     us = "515354737475767778797a7b7c7d7e7f808182"
108     jp = "51525354737475767778797a7b808182"
109     bd = "5153547c7d7e7f80"
110     kz = "515354"
111
112     tests = [ ("ZA", za, za2, za5, True),
113               ("FI", fi, fi2, fi5, True),
114               ("US", us, us2, us5, True),
115               ("JP", jp, jp2, jp5, True),
116               ("BD", bd, bd2, bd5, False),
117               ("KZ", kz, kz2, kz5, False) ]
118     for country, expected, res2, res5, inc5 in tests:
119         # For now, allow operating class 129 to be missing since not all
120         # installed regdb files include the 160 MHz channels.
121         expected2 = expected.replace('808182', '8082')
122         # For now, allow operating classes 121-123 to be missing since not all
123         # installed regdb files include the related US DFS channels.
124         expected2 = expected2.replace('78797a7b7c', '787c')
125         if res2 != expected and res2 != expected2:
126             raise Exception("Unexpected supp_op_class string (country=%s, 2.4 GHz): %s (expected: %s)" % (country, res2, expected))
127         if inc5 and res5 != expected and res5 != expected2:
128             raise Exception("Unexpected supp_op_class string (country=%s, 5 GHz): %s (expected: %s)" % (country, res5, expected))
129
130 def test_mbo_assoc_disallow(dev, apdev, params):
131     """MBO and association disallowed"""
132     hapd1 = hostapd.add_ap(apdev[0], { "ssid": "MBO", "mbo": "1" })
133     hapd2 = hostapd.add_ap(apdev[1], { "ssid": "MBO", "mbo": "1" })
134
135     logger.debug("Set mbo_assoc_disallow with invalid value")
136     if "FAIL" not in hapd1.request("SET mbo_assoc_disallow 2"):
137         raise Exception("Set mbo_assoc_disallow for AP1 succeeded unexpectedly with value 2")
138
139     logger.debug("Disallow associations to AP1 and allow association to AP2")
140     if "OK" not in hapd1.request("SET mbo_assoc_disallow 1"):
141         raise Exception("Failed to set mbo_assoc_disallow for AP1")
142     if "OK" not in hapd2.request("SET mbo_assoc_disallow 0"):
143         raise Exception("Failed to set mbo_assoc_disallow for AP2")
144
145     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")
146
147     out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
148                      "wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00",
149                      wait=False)
150     if "Destination address: " + hapd1.own_addr() in out:
151         raise Exception("Association request sent to disallowed AP")
152
153     timestamp = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
154                            "wlan.fc.type_subtype == 0x00",
155                            display=['frame.time'], wait=False)
156
157     logger.debug("Allow associations to AP1 and disallow associations to AP2")
158     if "OK" not in hapd1.request("SET mbo_assoc_disallow 0"):
159         raise Exception("Failed to set mbo_assoc_disallow for AP1")
160     if "OK" not in hapd2.request("SET mbo_assoc_disallow 1"):
161         raise Exception("Failed to set mbo_assoc_disallow for AP2")
162
163     dev[0].request("DISCONNECT")
164     dev[0].wait_disconnected()
165
166     # Force new scan, so the assoc_disallowed indication is updated */
167     dev[0].request("FLUSH")
168
169     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")
170
171     filter = 'wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00 && frame.time > "' + timestamp.rstrip() + '"'
172     out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
173                      filter, wait=False)
174     if "Destination address: " + hapd2.own_addr() in out:
175         raise Exception("Association request sent to disallowed AP 2")
176
177 def test_mbo_assoc_disallow_ignore(dev, apdev):
178     """MBO and ignoring disallowed association"""
179     try:
180         _test_mbo_assoc_disallow_ignore(dev, apdev)
181     finally:
182         dev[0].request("SCAN_INTERVAL 5")
183
184 def _test_mbo_assoc_disallow_ignore(dev, apdev):
185     hapd1 = hostapd.add_ap(apdev[0], { "ssid": "MBO", "mbo": "1" })
186     if "OK" not in hapd1.request("SET mbo_assoc_disallow 1"):
187         raise Exception("Failed to set mbo_assoc_disallow for AP1")
188
189     if "OK" not in dev[0].request("SCAN_INTERVAL 1"):
190         raise Exception("Failed to set scan interval")
191     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412", wait_connect=False)
192     ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND"], timeout=10)
193     if ev is None:
194         raise Exception("CTRL-EVENT-NETWORK-NOT-FOUND not seen")
195
196     if "OK" not in dev[0].request("SET ignore_assoc_disallow 1"):
197         raise Exception("Failed to set ignore_assoc_disallow")
198     ev = dev[0].wait_event(["CTRL-EVENT-ASSOC-REJECT"], timeout=10)
199     if ev is None:
200         raise Exception("CTRL-EVENT-ASSOC-REJECT not seen")
201     if "status_code=17" not in ev:
202         raise Exception("Unexpected association reject reason: " + ev)
203
204     if "OK" not in hapd1.request("SET mbo_assoc_disallow 0"):
205         raise Exception("Failed to set mbo_assoc_disallow for AP1")
206     dev[0].wait_connected()
207
208 @remote_compatible
209 def test_mbo_cell_capa_update(dev, apdev):
210     """MBO cellular data capability update"""
211     ssid = "test-wnm-mbo"
212     params = { 'ssid': ssid, 'mbo': '1' }
213     hapd = hostapd.add_ap(apdev[0], params)
214     bssid = apdev[0]['bssid']
215     if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
216         raise Exception("Failed to set STA as cellular data capable")
217
218     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
219
220     addr = dev[0].own_addr()
221     sta = hapd.get_sta(addr)
222     if 'mbo_cell_capa' not in sta or sta['mbo_cell_capa'] != '1':
223         raise Exception("mbo_cell_capa missing after association")
224
225     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
226         raise Exception("Failed to set STA as cellular data not-capable")
227     # Duplicate update for additional code coverage
228     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
229         raise Exception("Failed to set STA as cellular data not-capable")
230
231     time.sleep(0.2)
232     sta = hapd.get_sta(addr)
233     if 'mbo_cell_capa' not in sta:
234         raise Exception("mbo_cell_capa missing after update")
235     if sta['mbo_cell_capa'] != '3':
236         raise Exception("mbo_cell_capa not updated properly")
237
238 @remote_compatible
239 def test_mbo_cell_capa_update_pmf(dev, apdev):
240     """MBO cellular data capability update with PMF required"""
241     ssid = "test-wnm-mbo"
242     passphrase = "12345678"
243     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
244     params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
245     params["ieee80211w"] = "2"
246     params['mbo'] = '1'
247     hapd = hostapd.add_ap(apdev[0], params)
248     bssid = apdev[0]['bssid']
249     if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
250         raise Exception("Failed to set STA as cellular data capable")
251
252     dev[0].connect(ssid, psk=passphrase, key_mgmt="WPA-PSK-SHA256",
253                    proto="WPA2", ieee80211w="2", scan_freq="2412")
254
255     addr = dev[0].own_addr()
256     sta = hapd.get_sta(addr)
257     if 'mbo_cell_capa' not in sta or sta['mbo_cell_capa'] != '1':
258         raise Exception("mbo_cell_capa missing after association")
259
260     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
261         raise Exception("Failed to set STA as cellular data not-capable")
262
263     time.sleep(0.2)
264     sta = hapd.get_sta(addr)
265     if 'mbo_cell_capa' not in sta:
266         raise Exception("mbo_cell_capa missing after update")
267     if sta['mbo_cell_capa'] != '3':
268         raise Exception("mbo_cell_capa not updated properly")
269
270 def test_mbo_wnm_token_wrap(dev, apdev):
271     """MBO WNM token wrap around"""
272     ssid = "test-wnm-mbo"
273     params = { 'ssid': ssid, 'mbo': '1' }
274     hapd = hostapd.add_ap(apdev[0], params)
275     bssid = apdev[0]['bssid']
276
277     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
278
279     # Trigger transmission of 256 WNM-Notification frames to wrap around the
280     # 8-bit mbo_wnm_token counter.
281     for i in range(128):
282         if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
283             raise Exception("Failed to set STA as cellular data capable")
284         if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
285             raise Exception("Failed to set STA as cellular data not-capable")
286
287 @remote_compatible
288 def test_mbo_non_pref_chan(dev, apdev):
289     """MBO non-preferred channel list"""
290     ssid = "test-wnm-mbo"
291     params = { 'ssid': ssid, 'mbo': '1' }
292     hapd = hostapd.add_ap(apdev[0], params)
293     bssid = apdev[0]['bssid']
294     if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:99"):
295         raise Exception("Invalid non_pref_chan value accepted")
296     if "FAIL" not in dev[0].request("SET non_pref_chan 81:15:200:3"):
297         raise Exception("Invalid non_pref_chan value accepted")
298     if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3 81:7:201:3"):
299         raise Exception("Invalid non_pref_chan value accepted")
300     if "OK" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
301         raise Exception("Failed to set non-preferred channel list")
302     if "OK" not in dev[0].request("SET non_pref_chan 81:7:200:1 81:9:100:2"):
303         raise Exception("Failed to set non-preferred channel list")
304
305     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
306
307     addr = dev[0].own_addr()
308     sta = hapd.get_sta(addr)
309     logger.debug("STA: " + str(sta))
310     if 'non_pref_chan[0]' not in sta:
311         raise Exception("Missing non_pref_chan[0] value (assoc)")
312     if sta['non_pref_chan[0]'] != '81:200:1:7':
313         raise Exception("Unexpected non_pref_chan[0] value (assoc)")
314     if 'non_pref_chan[1]' not in sta:
315         raise Exception("Missing non_pref_chan[1] value (assoc)")
316     if sta['non_pref_chan[1]'] != '81:100:2:9':
317         raise Exception("Unexpected non_pref_chan[1] value (assoc)")
318     if 'non_pref_chan[2]' in sta:
319         raise Exception("Unexpected non_pref_chan[2] value (assoc)")
320
321     if "OK" not in dev[0].request("SET non_pref_chan 81:9:100:2"):
322         raise Exception("Failed to update non-preferred channel list")
323     time.sleep(0.1)
324     sta = hapd.get_sta(addr)
325     logger.debug("STA: " + str(sta))
326     if 'non_pref_chan[0]' not in sta:
327         raise Exception("Missing non_pref_chan[0] value (update 1)")
328     if sta['non_pref_chan[0]'] != '81:100:2:9':
329         raise Exception("Unexpected non_pref_chan[0] value (update 1)")
330     if 'non_pref_chan[1]' in sta:
331         raise Exception("Unexpected non_pref_chan[1] value (update 1)")
332
333     if "OK" not in dev[0].request("SET non_pref_chan 81:9:100:2 81:10:100:2 81:8:100:2 81:7:100:1 81:5:100:1"):
334         raise Exception("Failed to update non-preferred channel list")
335     time.sleep(0.1)
336     sta = hapd.get_sta(addr)
337     logger.debug("STA: " + str(sta))
338     if 'non_pref_chan[0]' not in sta:
339         raise Exception("Missing non_pref_chan[0] value (update 2)")
340     if sta['non_pref_chan[0]'] != '81:100:1:7,5':
341         raise Exception("Unexpected non_pref_chan[0] value (update 2)")
342     if 'non_pref_chan[1]' not in sta:
343         raise Exception("Missing non_pref_chan[1] value (update 2)")
344     if sta['non_pref_chan[1]'] != '81:100:2:9,10,8':
345         raise Exception("Unexpected non_pref_chan[1] value (update 2)")
346     if 'non_pref_chan[2]' in sta:
347         raise Exception("Unexpected non_pref_chan[2] value (update 2)")
348
349     if "OK" not in dev[0].request("SET non_pref_chan 81:5:90:2 82:14:91:2"):
350         raise Exception("Failed to update non-preferred channel list")
351     time.sleep(0.1)
352     sta = hapd.get_sta(addr)
353     logger.debug("STA: " + str(sta))
354     if 'non_pref_chan[0]' not in sta:
355         raise Exception("Missing non_pref_chan[0] value (update 3)")
356     if sta['non_pref_chan[0]'] != '81:90:2:5':
357         raise Exception("Unexpected non_pref_chan[0] value (update 3)")
358     if 'non_pref_chan[1]' not in sta:
359         raise Exception("Missing non_pref_chan[1] value (update 3)")
360     if sta['non_pref_chan[1]'] != '82:91:2:14':
361         raise Exception("Unexpected non_pref_chan[1] value (update 3)")
362     if 'non_pref_chan[2]' in sta:
363         raise Exception("Unexpected non_pref_chan[2] value (update 3)")
364
365     if "OK" not in dev[0].request("SET non_pref_chan "):
366         raise Exception("Failed to update non-preferred channel list")
367     time.sleep(0.1)
368     sta = hapd.get_sta(addr)
369     logger.debug("STA: " + str(sta))
370     if 'non_pref_chan[0]' in sta:
371         raise Exception("Unexpected non_pref_chan[0] value (update 4)")
372
373 @remote_compatible
374 def test_mbo_sta_supp_op_classes(dev, apdev):
375     """MBO STA supported operating classes"""
376     ssid = "test-wnm-mbo"
377     params = { 'ssid': ssid, 'mbo': '1' }
378     hapd = hostapd.add_ap(apdev[0], params)
379
380     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
381
382     addr = dev[0].own_addr()
383     sta = hapd.get_sta(addr)
384     logger.debug("STA: " + str(sta))
385     if 'supp_op_classes' not in sta:
386         raise Exception("No supp_op_classes")
387     supp = bytearray(sta['supp_op_classes'].decode("hex"))
388     if supp[0] != 81:
389         raise Exception("Unexpected current operating class %d" % supp[0])
390     if 115 not in supp:
391         raise Exception("Operating class 115 missing")
392
393 def test_mbo_failures(dev, apdev):
394     """MBO failure cases"""
395     ssid = "test-wnm-mbo"
396     params = { 'ssid': ssid, 'mbo': '1' }
397     hapd = hostapd.add_ap(apdev[0], params)
398
399     with alloc_fail(dev[0], 1, "wpas_mbo_ie"):
400         dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
401
402     with alloc_fail(dev[0], 1, "wpas_mbo_send_wnm_notification"):
403         if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
404             raise Exception("Failed to set STA as cellular data capable")
405     with fail_test(dev[0], 1, "wpas_mbo_send_wnm_notification"):
406         if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
407             raise Exception("Failed to set STA as cellular data not-capable")
408     with alloc_fail(dev[0], 1, "wpas_mbo_update_non_pref_chan"):
409         if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
410             raise Exception("non_pref_chan value accepted during OOM")
411     with alloc_fail(dev[0], 2, "wpas_mbo_update_non_pref_chan"):
412         if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
413             raise Exception("non_pref_chan value accepted during OOM")
414
415 def test_mbo_wnm_bss_tm_ie_parsing(dev, apdev):
416     """MBO BSS transition request MBO IE parsing"""
417     ssid = "test-wnm-mbo"
418     params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
419     hapd = hostapd.add_ap(apdev[0], params)
420     bssid = apdev[0]['bssid']
421     addr = dev[0].own_addr()
422     dev[0].connect(ssid, psk="12345678", key_mgmt="WPA-PSK",
423                    proto="WPA2", ieee80211w="0", scan_freq="2412")
424
425     dev[0].request("SET ext_mgmt_frame_handling 1")
426     hdr = "d0003a01" + addr.replace(':', '') + bssid.replace(':', '') + bssid.replace(':', '') + "3000"
427     btm_hdr = "0a070100030001"
428
429     tests = [ ("Truncated attribute in MBO IE", "dd06506f9a160101"),
430               ("Unexpected cell data capa attribute length in MBO IE",
431                "dd09506f9a160501030500"),
432               ("Unexpected transition reason attribute length in MBO IE",
433                "dd06506f9a160600"),
434               ("Unexpected assoc retry delay attribute length in MBO IE",
435                "dd0c506f9a160100080200000800"),
436               ("Unknown attribute id 255 in MBO IE",
437                "dd06506f9a16ff00") ]
438
439     for test, mbo_ie in tests:
440         logger.info(test)
441         dev[0].request("NOTE " + test)
442         frame = hdr + btm_hdr + mbo_ie
443         if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
444             raise Exception("MGMT_RX_PROCESS failed")
445
446     logger.info("Unexpected association retry delay")
447     dev[0].request("NOTE Unexpected association retry delay")
448     btm_hdr = "0a070108030001112233445566778899aabbcc"
449     mbo_ie = "dd08506f9a1608020000"
450     frame = hdr + btm_hdr + mbo_ie
451     if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
452         raise Exception("MGMT_RX_PROCESS failed")
453
454     dev[0].request("SET ext_mgmt_frame_handling 0")