8f3d358660f4716d2a12f649ca979ef3e1294483
[mech_eap.git] / tests / hwsim / test_mbo.py
1 # MBO tests
2 # Copyright (c) 2016, Intel Deutschland GmbH
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 from remotehost import remote_compatible
8 import logging
9 logger = logging.getLogger()
10
11 import hostapd
12 import os
13 import time
14
15 import hostapd
16 from tshark import run_tshark
17 from utils import alloc_fail, fail_test
18
19 def set_reg(country_code, apdev0=None, apdev1=None, dev0=None):
20     if apdev0:
21         hostapd.cmd_execute(apdev0, ['iw', 'reg', 'set', country_code])
22     if apdev1:
23         hostapd.cmd_execute(apdev1, ['iw', 'reg', 'set', country_code])
24     if dev0:
25         dev0.cmd_execute(['iw', 'reg', 'set', country_code])
26
27 def run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, country):
28     """MBO and supported operating classes"""
29     addr = dev[0].own_addr()
30
31     res2 = None
32     res5 = None
33
34     dev[0].flush_scan_cache()
35     dev[0].dump_monitor()
36
37     logger.info("Country: " + country)
38     set_reg(country, apdev[0], apdev[1], dev[0])
39     for j in range(5):
40         ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=5)
41         if ev is None:
42             raise Exception("No regdom change event")
43         if "alpha2=" + country in ev:
44             break
45     dev[0].dump_monitor()
46     dev[1].dump_monitor()
47     dev[2].dump_monitor()
48     if hapd:
49         hapd.set("country_code", country)
50         hapd.enable()
51         dev[0].scan_for_bss(hapd.own_addr(), 5180, force_scan=True)
52         dev[0].connect("test-wnm-mbo", key_mgmt="NONE", scan_freq="5180")
53         sta = hapd.get_sta(addr)
54         res5 = sta['supp_op_classes'][2:]
55         dev[0].request("REMOVE_NETWORK all")
56         hapd.disable()
57         dev[0].wait_disconnected()
58         dev[0].dump_monitor()
59
60     hapd2.set("country_code", country)
61     hapd2.enable()
62     dev[0].scan_for_bss(hapd2.own_addr(), 2412, force_scan=True)
63     dev[0].connect("test-wnm-mbo-2", key_mgmt="NONE", scan_freq="2412")
64     sta = hapd2.get_sta(addr)
65     res2 = sta['supp_op_classes'][2:]
66     dev[0].request("REMOVE_NETWORK all")
67     hapd2.disable()
68     dev[0].wait_disconnected()
69     dev[0].dump_monitor()
70
71     return res2, res5
72
73 def test_mbo_supp_oper_classes(dev, apdev):
74     """MBO and supported operating classes"""
75     params = { 'ssid': "test-wnm-mbo",
76                'mbo': '1',
77                "country_code": "US",
78                'ieee80211d': '1',
79                "ieee80211n": "1",
80                "hw_mode": "a",
81                "channel": "36" }
82     hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
83
84     params = { 'ssid': "test-wnm-mbo-2",
85                'mbo': '1',
86                "country_code": "US",
87                'ieee80211d': '1',
88                "ieee80211n": "1",
89                "hw_mode": "g",
90                "channel": "1" }
91     hapd2 = hostapd.add_ap(apdev[1], params, no_enable=True)
92
93     try:
94         za2, za5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "ZA")
95         fi2, fi5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "FI")
96         us2, us5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "US")
97         jp2, jp5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, "JP")
98         bd2, bd5 = run_mbo_supp_oper_classes(dev, apdev, None, hapd2, "BD")
99         kz2, kz5 = run_mbo_supp_oper_classes(dev, apdev, None, hapd2, "KZ")
100     finally:
101         dev[0].dump_monitor()
102         set_reg("00", apdev[0], apdev[1], dev[0])
103         ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
104
105     za = "515354737475767778797a7b808182"
106     fi = "515354737475767778797a7b808182"
107     us = "515354737475767778797a7b7c7d7e7f808182"
108     jp = "51525354737475767778797a7b808182"
109     bd = "5153547c7d7e7f80"
110     kz = "515354"
111
112     tests = [ ("ZA", za, za2, za5, True),
113               ("FI", fi, fi2, fi5, True),
114               ("US", us, us2, us5, True),
115               ("JP", jp, jp2, jp5, True),
116               ("BD", bd, bd2, bd5, False),
117               ("KZ", kz, kz2, kz5, False) ]
118     for country, expected, res2, res5, inc5 in tests:
119         # For now, allow operating class 129 to be missing since not all
120         # installed regdb files include the 160 MHz channels.
121         expected2 = expected.replace('808182', '8082')
122         # For now, allow operating classes 121-123 to be missing since not all
123         # installed regdb files include the related US DFS channels.
124         expected2 = expected2.replace('78797a7b7c', '787c')
125         if res2 != expected and res2 != expected2:
126             raise Exception("Unexpected supp_op_class string (country=%s, 2.4 GHz): %s (expected: %s)" % (country, res2, expected))
127         if inc5 and res5 != expected and res5 != expected2:
128             raise Exception("Unexpected supp_op_class string (country=%s, 5 GHz): %s (expected: %s)" % (country, res5, expected))
129
130 def test_mbo_assoc_disallow(dev, apdev, params):
131     hapd1 = hostapd.add_ap(apdev[0], { "ssid": "MBO", "mbo": "1" })
132     hapd2 = hostapd.add_ap(apdev[1], { "ssid": "MBO", "mbo": "1" })
133
134     logger.debug("Set mbo_assoc_disallow with invalid value")
135     if "FAIL" not in hapd1.request("SET mbo_assoc_disallow 2"):
136         raise Exception("Set mbo_assoc_disallow for AP1 succeeded unexpectedly with value 2")
137
138     logger.debug("Disallow associations to AP1 and allow association to AP2")
139     if "OK" not in hapd1.request("SET mbo_assoc_disallow 1"):
140         raise Exception("Failed to set mbo_assoc_disallow for AP1")
141     if "OK" not in hapd2.request("SET mbo_assoc_disallow 0"):
142         raise Exception("Failed to set mbo_assoc_disallow for AP2")
143
144     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")
145
146     out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
147                      "wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00",
148                      wait=False)
149     if "Destination address: " + hapd1.own_addr() in out:
150         raise Exception("Association request sent to disallowed AP")
151
152     timestamp = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
153                            "wlan.fc.type_subtype == 0x00",
154                            display=['frame.time'], wait=False)
155
156     logger.debug("Allow associations to AP1 and disallow assications to AP2")
157     if "OK" not in hapd1.request("SET mbo_assoc_disallow 0"):
158         raise Exception("Failed to set mbo_assoc_disallow for AP1")
159     if "OK" not in hapd2.request("SET mbo_assoc_disallow 1"):
160         raise Exception("Failed to set mbo_assoc_disallow for AP2")
161
162     dev[0].request("DISCONNECT")
163     dev[0].wait_disconnected()
164
165     # Force new scan, so the assoc_disallowed indication is updated */
166     dev[0].request("FLUSH")
167
168     dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")
169
170     filter = 'wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00 && frame.time > "' + timestamp.rstrip() + '"'
171     out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
172                      filter, wait=False)
173     if "Destination address: " + hapd2.own_addr() in out:
174         raise Exception("Association request sent to disallowed AP 2")
175
176 @remote_compatible
177 def test_mbo_cell_capa_update(dev, apdev):
178     """MBO cellular data capability update"""
179     ssid = "test-wnm-mbo"
180     params = { 'ssid': ssid, 'mbo': '1' }
181     hapd = hostapd.add_ap(apdev[0], params)
182     bssid = apdev[0]['bssid']
183     if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
184         raise Exception("Failed to set STA as cellular data capable")
185
186     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
187
188     addr = dev[0].own_addr()
189     sta = hapd.get_sta(addr)
190     if 'mbo_cell_capa' not in sta or sta['mbo_cell_capa'] != '1':
191         raise Exception("mbo_cell_capa missing after association")
192
193     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
194         raise Exception("Failed to set STA as cellular data not-capable")
195     # Duplicate update for additional code coverage
196     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
197         raise Exception("Failed to set STA as cellular data not-capable")
198
199     time.sleep(0.2)
200     sta = hapd.get_sta(addr)
201     if 'mbo_cell_capa' not in sta:
202         raise Exception("mbo_cell_capa missing after update")
203     if sta['mbo_cell_capa'] != '3':
204         raise Exception("mbo_cell_capa not updated properly")
205
206 @remote_compatible
207 def test_mbo_cell_capa_update_pmf(dev, apdev):
208     """MBO cellular data capability update with PMF required"""
209     ssid = "test-wnm-mbo"
210     passphrase = "12345678"
211     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
212     params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
213     params["ieee80211w"] = "2"
214     params['mbo'] = '1'
215     hapd = hostapd.add_ap(apdev[0], params)
216     bssid = apdev[0]['bssid']
217     if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
218         raise Exception("Failed to set STA as cellular data capable")
219
220     dev[0].connect(ssid, psk=passphrase, key_mgmt="WPA-PSK-SHA256",
221                    proto="WPA2", ieee80211w="2", scan_freq="2412")
222
223     addr = dev[0].own_addr()
224     sta = hapd.get_sta(addr)
225     if 'mbo_cell_capa' not in sta or sta['mbo_cell_capa'] != '1':
226         raise Exception("mbo_cell_capa missing after association")
227
228     if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
229         raise Exception("Failed to set STA as cellular data not-capable")
230
231     time.sleep(0.2)
232     sta = hapd.get_sta(addr)
233     if 'mbo_cell_capa' not in sta:
234         raise Exception("mbo_cell_capa missing after update")
235     if sta['mbo_cell_capa'] != '3':
236         raise Exception("mbo_cell_capa not updated properly")
237
238 def test_mbo_wnm_token_wrap(dev, apdev):
239     """MBO WNM token wrap around"""
240     ssid = "test-wnm-mbo"
241     params = { 'ssid': ssid, 'mbo': '1' }
242     hapd = hostapd.add_ap(apdev[0], params)
243     bssid = apdev[0]['bssid']
244
245     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
246
247     # Trigger transmission of 256 WNM-Notification frames to wrap around the
248     # 8-bit mbo_wnm_token counter.
249     for i in range(128):
250         if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
251             raise Exception("Failed to set STA as cellular data capable")
252         if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
253             raise Exception("Failed to set STA as cellular data not-capable")
254
255 @remote_compatible
256 def test_mbo_non_pref_chan(dev, apdev):
257     """MBO non-preferred channel list"""
258     ssid = "test-wnm-mbo"
259     params = { 'ssid': ssid, 'mbo': '1' }
260     hapd = hostapd.add_ap(apdev[0], params)
261     bssid = apdev[0]['bssid']
262     if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:99"):
263         raise Exception("Invalid non_pref_chan value accepted")
264     if "FAIL" not in dev[0].request("SET non_pref_chan 81:15:200:3"):
265         raise Exception("Invalid non_pref_chan value accepted")
266     if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3 81:7:201:3"):
267         raise Exception("Invalid non_pref_chan value accepted")
268     if "OK" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
269         raise Exception("Failed to set non-preferred channel list")
270     if "OK" not in dev[0].request("SET non_pref_chan 81:7:200:1:123 81:9:100:2"):
271         raise Exception("Failed to set non-preferred channel list")
272
273     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
274
275     addr = dev[0].own_addr()
276     sta = hapd.get_sta(addr)
277     logger.debug("STA: " + str(sta))
278     if 'non_pref_chan[0]' not in sta:
279         raise Exception("Missing non_pref_chan[0] value (assoc)")
280     if sta['non_pref_chan[0]'] != '81:200:1:123:7':
281         raise Exception("Unexpected non_pref_chan[0] value (assoc)")
282     if 'non_pref_chan[1]' not in sta:
283         raise Exception("Missing non_pref_chan[1] value (assoc)")
284     if sta['non_pref_chan[1]'] != '81:100:2:0:9':
285         raise Exception("Unexpected non_pref_chan[1] value (assoc)")
286     if 'non_pref_chan[2]' in sta:
287         raise Exception("Unexpected non_pref_chan[2] value (assoc)")
288
289     if "OK" not in dev[0].request("SET non_pref_chan 81:9:100:2"):
290         raise Exception("Failed to update non-preferred channel list")
291     time.sleep(0.1)
292     sta = hapd.get_sta(addr)
293     logger.debug("STA: " + str(sta))
294     if 'non_pref_chan[0]' not in sta:
295         raise Exception("Missing non_pref_chan[0] value (update 1)")
296     if sta['non_pref_chan[0]'] != '81:100:2:0:9':
297         raise Exception("Unexpected non_pref_chan[0] value (update 1)")
298     if 'non_pref_chan[1]' in sta:
299         raise Exception("Unexpected non_pref_chan[2] value (update 1)")
300
301     if "OK" not in dev[0].request("SET non_pref_chan 81:9:100:2 81:10:100:2 81:8:100:2 81:7:100:1:123 81:5:100:1:124"):
302         raise Exception("Failed to update non-preferred channel list")
303     time.sleep(0.1)
304     sta = hapd.get_sta(addr)
305     logger.debug("STA: " + str(sta))
306     if 'non_pref_chan[0]' not in sta:
307         raise Exception("Missing non_pref_chan[0] value (update 2)")
308     if sta['non_pref_chan[0]'] != '81:100:1:123:7':
309         raise Exception("Unexpected non_pref_chan[0] value (update 2)")
310     if 'non_pref_chan[1]' not in sta:
311         raise Exception("Missing non_pref_chan[1] value (update 2)")
312     if sta['non_pref_chan[1]'] != '81:100:1:124:5':
313         raise Exception("Unexpected non_pref_chan[1] value (update 2)")
314     if 'non_pref_chan[2]' not in sta:
315         raise Exception("Missing non_pref_chan[2] value (update 2)")
316     if sta['non_pref_chan[2]'] != '81:100:2:0:9,10,8':
317         raise Exception("Unexpected non_pref_chan[2] value (update 2)")
318     if 'non_pref_chan[3]' in sta:
319         raise Exception("Unexpected non_pref_chan[3] value (update 2)")
320
321     if "OK" not in dev[0].request("SET non_pref_chan 81:5:90:2 82:14:91:2"):
322         raise Exception("Failed to update non-preferred channel list")
323     time.sleep(0.1)
324     sta = hapd.get_sta(addr)
325     logger.debug("STA: " + str(sta))
326     if 'non_pref_chan[0]' not in sta:
327         raise Exception("Missing non_pref_chan[0] value (update 3)")
328     if sta['non_pref_chan[0]'] != '81:90:2:0:5':
329         raise Exception("Unexpected non_pref_chan[0] value (update 3)")
330     if 'non_pref_chan[1]' not in sta:
331         raise Exception("Missing non_pref_chan[1] value (update 3)")
332     if sta['non_pref_chan[1]'] != '82:91:2:0:14':
333         raise Exception("Unexpected non_pref_chan[1] value (update 3)")
334     if 'non_pref_chan[2]' in sta:
335         raise Exception("Unexpected non_pref_chan[2] value (update 3)")
336
337     if "OK" not in dev[0].request("SET non_pref_chan "):
338         raise Exception("Failed to update non-preferred channel list")
339     time.sleep(0.1)
340     sta = hapd.get_sta(addr)
341     logger.debug("STA: " + str(sta))
342     if 'non_pref_chan[0]' in sta:
343         raise Exception("Unexpected non_pref_chan[0] value (update 4)")
344
345 @remote_compatible
346 def test_mbo_sta_supp_op_classes(dev, apdev):
347     """MBO STA supported operating classes"""
348     ssid = "test-wnm-mbo"
349     params = { 'ssid': ssid, 'mbo': '1' }
350     hapd = hostapd.add_ap(apdev[0], params)
351
352     dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
353
354     addr = dev[0].own_addr()
355     sta = hapd.get_sta(addr)
356     logger.debug("STA: " + str(sta))
357     if 'supp_op_classes' not in sta:
358         raise Exception("No supp_op_classes")
359     supp = bytearray(sta['supp_op_classes'].decode("hex"))
360     if supp[0] != 81:
361         raise Exception("Unexpected current operating class %d" % supp[0])
362     if 115 not in supp:
363         raise Exception("Operating class 115 missing")
364
365 def test_mbo_failures(dev, apdev):
366     """MBO failure cases"""
367     ssid = "test-wnm-mbo"
368     params = { 'ssid': ssid, 'mbo': '1' }
369     hapd = hostapd.add_ap(apdev[0], params)
370
371     with alloc_fail(dev[0], 1, "wpas_mbo_ie"):
372         dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
373
374     with alloc_fail(dev[0], 1, "wpas_mbo_send_wnm_notification"):
375         if "OK" not in dev[0].request("SET mbo_cell_capa 1"):
376             raise Exception("Failed to set STA as cellular data capable")
377     with fail_test(dev[0], 1, "wpas_mbo_send_wnm_notification"):
378         if "OK" not in dev[0].request("SET mbo_cell_capa 3"):
379             raise Exception("Failed to set STA as cellular data not-capable")
380     with alloc_fail(dev[0], 1, "wpas_mbo_update_non_pref_chan"):
381         if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
382             raise Exception("non_pref_chan value accepted during OOM")
383     with alloc_fail(dev[0], 2, "wpas_mbo_update_non_pref_chan"):
384         if "FAIL" not in dev[0].request("SET non_pref_chan 81:7:200:3"):
385             raise Exception("non_pref_chan value accepted during OOM")
386
387 def test_mbo_wnm_bss_tm_ie_parsing(dev, apdev):
388     """MBO BSS transition request MBO IE parsing"""
389     ssid = "test-wnm-mbo"
390     params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
391     hapd = hostapd.add_ap(apdev[0], params)
392     bssid = apdev[0]['bssid']
393     addr = dev[0].own_addr()
394     dev[0].connect(ssid, psk="12345678", key_mgmt="WPA-PSK",
395                    proto="WPA2", ieee80211w="0", scan_freq="2412")
396
397     dev[0].request("SET ext_mgmt_frame_handling 1")
398     hdr = "d0003a01" + addr.replace(':', '') + bssid.replace(':', '') + bssid.replace(':', '') + "3000"
399     btm_hdr = "0a070100030001"
400
401     tests = [ ("Truncated attribute in MBO IE", "dd06506f9a160101"),
402               ("Unexpected cell data capa attribute length in MBO IE",
403                "dd09506f9a160501030500"),
404               ("Unexpected transition reason attribute length in MBO IE",
405                "dd06506f9a160600"),
406               ("Unexpected assoc retry delay attribute length in MBO IE",
407                "dd0c506f9a160100080200000800"),
408               ("Unknown attribute id 255 in MBO IE",
409                "dd06506f9a16ff00") ]
410
411     for test, mbo_ie in tests:
412         logger.info(test)
413         dev[0].request("NOTE " + test)
414         frame = hdr + btm_hdr + mbo_ie
415         if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
416             raise Exception("MGMT_RX_PROCESS failed")
417
418     logger.info("Unexpected association retry delay")
419     dev[0].request("NOTE Unexpected association retry delay")
420     btm_hdr = "0a070108030001112233445566778899aabbcc"
421     mbo_ie = "dd08506f9a1608020000"
422     frame = hdr + btm_hdr + mbo_ie
423     if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
424         raise Exception("MGMT_RX_PROCESS failed")
425
426     dev[0].request("SET ext_mgmt_frame_handling 0")