tests: VHT/HT preference in BSS selection
[mech_eap.git] / tests / hwsim / test_ap_ht.py
1 # Test cases for HT operations with hostapd
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import time
8 import logging
9 logger = logging.getLogger()
10 import struct
11 import subprocess
12
13 import hostapd
14 from utils import HwsimSkip, alloc_fail
15 import hwsim_utils
16 from test_ap_csa import csa_supported
17
def clear_scan_cache(ifname):
    """Flush the scan results held for interface ifname.

    Brings the interface up, runs a flushing scan on 2412 MHz through iw and
    takes the interface down again. The exit status of the external commands
    is not checked.
    """
    link_cmd = ['ifconfig', ifname]
    subprocess.call(link_cmd + ['up'])
    subprocess.call(['iw', ifname, 'scan', 'freq', '2412', 'flush'])
    # Short pause before the interface is taken down again.
    time.sleep(0.1)
    subprocess.call(link_cmd + ['down'])
23
def test_ap_ht40_scan(dev, apdev):
    """HT40 co-ex scan"""
    # dev: station control objects, apdev: AP device descriptors (dicts with
    # at least an 'ifname' entry), both supplied by the test runner.
    clear_scan_cache(apdev[0]['ifname'])
    params = { "ssid": "test-ht40",
               "channel": "5",
               "ht_capab": "[HT40-]"}
    # Do not wait for AP-ENABLED here so that the intermediate HT_SCAN state
    # can be observed.
    hapd = hostapd.add_ap(apdev[0]['ifname'], params, wait_enabled=False)

    # Allow one short retry in case the interface has not yet entered the
    # scan state.
    state = hapd.get_status_field("state")
    if state != "HT_SCAN":
        time.sleep(0.1)
        state = hapd.get_status_field("state")
        if state != "HT_SCAN":
            raise Exception("Unexpected interface state - expected HT_SCAN")

    ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
    if not ev:
        raise Exception("AP setup timed out")

    state = hapd.get_status_field("state")
    if state != "ENABLED":
        raise Exception("Unexpected interface state - expected ENABLED")

    # With no conflicting BSS the requested HT40- configuration is expected
    # to be kept. The observed value is included in each failure message
    # (str() keeps the message usable even if the field is missing).
    freq = hapd.get_status_field("freq")
    if freq != "2432":
        raise Exception("Unexpected frequency: " + str(freq))
    pri = hapd.get_status_field("channel")
    if pri != "5":
        raise Exception("Unexpected primary channel: " + str(pri))
    sec = hapd.get_status_field("secondary_channel")
    if sec != "-1":
        raise Exception("Unexpected secondary channel: " + str(sec))

    dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
58
def test_ap_ht40_scan_conflict(dev, apdev):
    """HT40 co-ex scan conflict"""
    clear_scan_cache(apdev[0]['ifname'])
    # Neighboring BSS on channel 6 using HT40+; the AP under test asks for
    # channel 5 with HT40-.
    params = { "ssid": "test-ht40",
               "channel": "6",
               "ht_capab": "[HT40+]"}
    hostapd.add_ap(apdev[1]['ifname'], params)

    params = { "ssid": "test-ht40",
               "channel": "5",
               "ht_capab": "[HT40-]"}
    # Do not wait for AP-ENABLED here so that the intermediate HT_SCAN state
    # can be observed.
    hapd = hostapd.add_ap(apdev[0]['ifname'], params, wait_enabled=False)

    # Allow one short retry in case the interface has not yet entered the
    # scan state.
    state = hapd.get_status_field("state")
    if state != "HT_SCAN":
        time.sleep(0.1)
        state = hapd.get_status_field("state")
        if state != "HT_SCAN":
            raise Exception("Unexpected interface state - expected HT_SCAN")

    ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
    if not ev:
        raise Exception("AP setup timed out")

    state = hapd.get_status_field("state")
    if state != "ENABLED":
        raise Exception("Unexpected interface state - expected ENABLED")

    # The AP is expected to stay on channel 5 but without a secondary
    # channel (secondary_channel=0). All failure messages report the
    # observed value; str() keeps them usable if a field is missing.
    freq = hapd.get_status_field("freq")
    if freq != "2432":
        raise Exception("Unexpected frequency: " + str(freq))
    pri = hapd.get_status_field("channel")
    if pri != "5":
        raise Exception("Unexpected primary channel: " + str(pri))
    sec = hapd.get_status_field("secondary_channel")
    if sec != "0":
        raise Exception("Unexpected secondary channel: " + str(sec))

    dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
98
def test_ap_ht40_scan_legacy_conflict(dev, apdev):
    """HT40 co-ex scan conflict with legacy 20 MHz AP"""
    ap_ifname = apdev[0]['ifname']
    clear_scan_cache(ap_ifname)

    # Neighbor with HT disabled (ieee80211n=0) on channel 7.
    legacy_params = {"ssid": "legacy-20",
                     "channel": "7", "ieee80211n": "0"}
    hostapd.add_ap(apdev[1]['ifname'], legacy_params)

    # AP under test asks for channel 5 with HT40-; AP-ENABLED is not awaited
    # so that the HT_SCAN state can be checked first.
    ht40_params = {"ssid": "test-ht40",
                   "channel": "5",
                   "ht_capab": "[HT40-]"}
    hapd = hostapd.add_ap(ap_ifname, ht40_params, wait_enabled=False)

    # One short retry if the scan state has not been reached yet.
    if hapd.get_status_field("state") != "HT_SCAN":
        time.sleep(0.1)
        if hapd.get_status_field("state") != "HT_SCAN":
            raise Exception("Unexpected interface state - expected HT_SCAN")

    if not hapd.wait_event(["AP-ENABLED"], timeout=10):
        raise Exception("AP setup timed out")

    if hapd.get_status_field("state") != "ENABLED":
        raise Exception("Unexpected interface state - expected ENABLED")

    # Expected outcome: same primary channel, no secondary channel.
    freq = hapd.get_status_field("freq")
    if freq != "2432":
        raise Exception("Unexpected frequency: " + freq)
    primary = hapd.get_status_field("channel")
    if primary != "5":
        raise Exception("Unexpected primary channel: " + primary)
    secondary = hapd.get_status_field("secondary_channel")
    if secondary != "0":
        raise Exception("Unexpected secondary channel: " + secondary)

    dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
137
def test_ap_ht40_scan_match(dev, apdev):
    """HT40 co-ex scan matching configuration"""
    clear_scan_cache(apdev[0]['ifname'])
    # Neighboring BSS that uses exactly the same primary/secondary channel
    # configuration as the AP under test.
    params = { "ssid": "test-ht40",
               "channel": "5",
               "ht_capab": "[HT40-]"}
    hostapd.add_ap(apdev[1]['ifname'], params)

    params = { "ssid": "test-ht40",
               "channel": "5",
               "ht_capab": "[HT40-]"}
    # Do not wait for AP-ENABLED here so that the intermediate HT_SCAN state
    # can be observed.
    hapd = hostapd.add_ap(apdev[0]['ifname'], params, wait_enabled=False)

    # Allow one short retry in case the interface has not yet entered the
    # scan state.
    state = hapd.get_status_field("state")
    if state != "HT_SCAN":
        time.sleep(0.1)
        state = hapd.get_status_field("state")
        if state != "HT_SCAN":
            raise Exception("Unexpected interface state - expected HT_SCAN")

    ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
    if not ev:
        raise Exception("AP setup timed out")

    state = hapd.get_status_field("state")
    if state != "ENABLED":
        raise Exception("Unexpected interface state - expected ENABLED")

    # The matching neighbor must not prevent 40 MHz operation. All failure
    # messages report the observed value; str() keeps them usable if a
    # field is missing.
    freq = hapd.get_status_field("freq")
    if freq != "2432":
        raise Exception("Unexpected frequency: " + str(freq))
    pri = hapd.get_status_field("channel")
    if pri != "5":
        raise Exception("Unexpected primary channel: " + str(pri))
    sec = hapd.get_status_field("secondary_channel")
    if sec != "-1":
        raise Exception("Unexpected secondary channel: " + str(sec))

    dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
177
def test_ap_ht40_5ghz_match(dev, apdev):
    """HT40 co-ex scan on 5 GHz with matching pri/sec channel"""
    clear_scan_cache(apdev[0]['ifname'])
    try:
        # Pre-set so the cleanup code can tell which APs were started.
        hapd = None
        hapd2 = None
        # Neighboring BSS on channel 36 with HT40+; it also sets the
        # country code.
        params = { "ssid": "test-ht40",
                   "hw_mode": "a",
                   "channel": "36",
                   "country_code": "US",
                   "ht_capab": "[HT40+]"}
        hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)

        # AP under test with the same primary/secondary channel pair.
        params = { "ssid": "test-ht40",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+]"}
        hapd = hostapd.add_ap(apdev[0]['ifname'], params, wait_enabled=False)

        # Allow one short retry in case the interface has not yet entered
        # the scan state.
        state = hapd.get_status_field("state")
        if state != "HT_SCAN":
            time.sleep(0.1)
            state = hapd.get_status_field("state")
            if state != "HT_SCAN":
                raise Exception("Unexpected interface state - expected HT_SCAN")

        ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
        if not ev:
            raise Exception("AP setup timed out")

        state = hapd.get_status_field("state")
        if state != "ENABLED":
            raise Exception("Unexpected interface state - expected ENABLED")

        # The configured channel pair is expected to be kept. All failure
        # messages report the observed value; str() keeps them usable if a
        # field is missing.
        freq = hapd.get_status_field("freq")
        if freq != "5180":
            raise Exception("Unexpected frequency: " + str(freq))
        pri = hapd.get_status_field("channel")
        if pri != "36":
            raise Exception("Unexpected primary channel: " + str(pri))
        sec = hapd.get_status_field("secondary_channel")
        if sec != "1":
            raise Exception("Unexpected secondary channel: " + str(sec))

        dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
    finally:
        # Cleanup runs on success and failure: drop the association, stop
        # the APs, reset the regulatory domain to "00" and flush the
        # station's scan cache.
        dev[0].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        if hapd2:
            hapd2.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].flush_scan_cache()
231
def test_ap_ht40_5ghz_switch(dev, apdev):
    """HT40 co-ex scan on 5 GHz switching pri/sec channel"""
    clear_scan_cache(apdev[0]['ifname'])
    try:
        # Pre-set so the cleanup code can tell which APs were started.
        hapd = None
        hapd2 = None
        # Neighboring BSS: primary channel 36 with the secondary above it.
        params = { "ssid": "test-ht40",
                   "hw_mode": "a",
                   "channel": "36",
                   "country_code": "US",
                   "ht_capab": "[HT40+]"}
        hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)

        # AP under test asks for the swapped arrangement: primary channel 40
        # with the secondary below it.
        params = { "ssid": "test-ht40",
                   "hw_mode": "a",
                   "channel": "40",
                   "ht_capab": "[HT40-]"}
        hapd = hostapd.add_ap(apdev[0]['ifname'], params, wait_enabled=False)

        # Allow one short retry in case the interface has not yet entered
        # the scan state.
        state = hapd.get_status_field("state")
        if state != "HT_SCAN":
            time.sleep(0.1)
            state = hapd.get_status_field("state")
            if state != "HT_SCAN":
                raise Exception("Unexpected interface state - expected HT_SCAN")

        ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
        if not ev:
            raise Exception("AP setup timed out")

        state = hapd.get_status_field("state")
        if state != "ENABLED":
            raise Exception("Unexpected interface state - expected ENABLED")

        # The AP is expected to have swapped its primary and secondary
        # channels to match the neighbor (36 / +1).
        freq = hapd.get_status_field("freq")
        if freq != "5180":
            raise Exception("Unexpected frequency: " + freq)
        pri = hapd.get_status_field("channel")
        if pri != "36":
            raise Exception("Unexpected primary channel: " + pri)
        sec = hapd.get_status_field("secondary_channel")
        if sec != "1":
            raise Exception("Unexpected secondary channel: " + sec)

        dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
    finally:
        # Cleanup runs on success and failure: drop the association, stop
        # the APs and reset the regulatory domain to "00".
        dev[0].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        if hapd2:
            hapd2.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        # Flush the station's scan cache as the other 5 GHz tests in this
        # file do, so that 5 GHz entries do not leak into later test cases.
        dev[0].flush_scan_cache()
284
def test_ap_ht40_5ghz_switch2(dev, apdev):
    """HT40 co-ex scan on 5 GHz switching pri/sec channel (2)"""
    clear_scan_cache(apdev[0]['ifname'])
    # Pre-set so the cleanup code can tell which APs were started.
    hapd = None
    hapd2 = None
    try:
        # Neighboring BSS: primary channel 36 with the secondary above it.
        neighbor_params = {"ssid": "test-ht40",
                           "hw_mode": "a",
                           "channel": "36",
                           "country_code": "US",
                           "ht_capab": "[HT40+]"}
        hapd2 = hostapd.add_ap(apdev[1]['ifname'], neighbor_params)

        # Additional open network started on dev[0] at 5200 MHz using
        # mode=2 (presumably wpa_supplicant AP mode, as the SSID suggests).
        sta = dev[0]
        net_id = sta.add_network()
        sta.set_network(net_id, "mode", "2")
        sta.set_network_quoted(net_id, "ssid", "wpas-ap-open")
        sta.set_network(net_id, "key_mgmt", "NONE")
        sta.set_network(net_id, "frequency", "5200")
        sta.set_network(net_id, "scan_freq", "5200")
        sta.select_network(net_id)
        # Give that network a moment to come up before starting the AP.
        time.sleep(1)

        # AP under test asks for primary channel 40 with the secondary
        # below it.
        ap_params = {"ssid": "test-ht40",
                     "hw_mode": "a",
                     "channel": "40",
                     "ht_capab": "[HT40-]"}
        hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params,
                              wait_enabled=False)

        # One short retry if the scan state has not been reached yet.
        if hapd.get_status_field("state") != "HT_SCAN":
            time.sleep(0.1)
            if hapd.get_status_field("state") != "HT_SCAN":
                raise Exception("Unexpected interface state - expected HT_SCAN")

        if not hapd.wait_event(["AP-ENABLED"], timeout=10):
            raise Exception("AP setup timed out")

        if hapd.get_status_field("state") != "ENABLED":
            raise Exception("Unexpected interface state - expected ENABLED")

        # Expected outcome: primary/secondary swapped to 36 / +1.
        freq = hapd.get_status_field("freq")
        if freq != "5180":
            raise Exception("Unexpected frequency: " + freq)
        primary = hapd.get_status_field("channel")
        if primary != "36":
            raise Exception("Unexpected primary channel: " + primary)
        secondary = hapd.get_status_field("secondary_channel")
        if secondary != "1":
            raise Exception("Unexpected secondary channel: " + secondary)

        dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
    finally:
        # Cleanup runs on success and failure.
        dev[0].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        if hapd2:
            hapd2.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].flush_scan_cache()
347
def test_obss_scan(dev, apdev):
    """Overlapping BSS scan request"""
    # 40 MHz AP that sets obss_interval=10.
    ap_params = {"ssid": "obss-scan",
                 "channel": "6",
                 "ht_capab": "[HT40-]",
                 "obss_interval": "10"}
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)

    # Second BSS with HT disabled on channel 9.
    other_params = {"ssid": "another-bss",
                    "channel": "9",
                    "ieee80211n": "0"}
    hostapd.add_ap(apdev[1]['ifname'], other_params)

    dev[0].connect("obss-scan", key_mgmt="NONE", scan_freq="2437")
    # From here on received management frames are fetched with mgmt_rx().
    hapd.set("ext_mgmt_frame_handling", "1")
    logger.info("Waiting for OBSS scan to occur")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], timeout=15) is None:
        raise Exception("Timed out while waiting for OBSS scan to start")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], timeout=10) is None:
        raise Exception("Timed out while waiting for OBSS scan results")

    # Inspect at most four received frames, looking for subtype 13 with
    # category 4, action 0 and a first element ID of 72.
    for _ in range(4):
        frame = hapd.mgmt_rx(timeout=5)
        if frame is None:
            raise Exception("MGMT RX wait timed out")
        if frame['subtype'] != 13:
            continue
        payload = frame['payload']
        if len(payload) < 3:
            continue
        category, action, ie = struct.unpack('BBB', payload[0:3])
        if category != 4 or action != 0 or ie != 72:
            continue
        logger.info("20/40 BSS Coexistence report received")
        break
    else:
        # None of the inspected frames matched.
        raise Exception("20/40 BSS Coexistence report not seen")
391
def test_obss_scan_40_intolerant(dev, apdev):
    """Overlapping BSS scan request with 40 MHz intolerant AP"""
    # 40 MHz AP that sets obss_interval=10.
    ap_params = {"ssid": "obss-scan",
                 "channel": "6",
                 "ht_capab": "[HT40-]",
                 "obss_interval": "10"}
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)

    # Second BSS on channel 7 configured with the 40-INTOLERANT capability.
    other_params = {"ssid": "another-bss",
                    "channel": "7",
                    "ht_capab": "[40-INTOLERANT]"}
    hostapd.add_ap(apdev[1]['ifname'], other_params)

    dev[0].connect("obss-scan", key_mgmt="NONE", scan_freq="2437")
    # From here on received management frames are fetched with mgmt_rx().
    hapd.set("ext_mgmt_frame_handling", "1")
    logger.info("Waiting for OBSS scan to occur")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], timeout=15) is None:
        raise Exception("Timed out while waiting for OBSS scan to start")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], timeout=10) is None:
        raise Exception("Timed out while waiting for OBSS scan results")

    # Inspect at most four received frames, looking for subtype 13 with
    # category 4, action 0 and a first element ID of 72.
    for _ in range(4):
        frame = hapd.mgmt_rx(timeout=5)
        if frame is None:
            raise Exception("MGMT RX wait timed out")
        if frame['subtype'] != 13:
            continue
        payload = frame['payload']
        if len(payload) < 3:
            continue
        category, action, ie = struct.unpack('BBB', payload[0:3])
        if category != 4 or action != 0 or ie != 72:
            continue
        logger.info("20/40 BSS Coexistence report received")
        break
    else:
        # None of the inspected frames matched.
        raise Exception("20/40 BSS Coexistence report not seen")
435
def test_obss_coex_report_handling(dev, apdev):
    """Overlapping BSS scan report handling with obss_interval=0"""
    ap_ifname = apdev[0]['ifname']
    clear_scan_cache(ap_ifname)
    ap_params = {"ssid": "obss-scan",
                 "channel": "6",
                 "ht_capab": "[HT40-]"}
    hapd = hostapd.add_ap(ap_ifname, ap_params)
    bssid = apdev[0]['bssid']
    dev[0].connect("obss-scan", key_mgmt="NONE", scan_freq="2437")

    # Precondition: the AP starts out with a secondary channel below the
    # primary one.
    if hapd.get_status_field("secondary_channel") != "-1":
        raise Exception("AP is not using 40 MHz channel")

    # 20/40 MHz co-ex report tests: number of invalid reports and a valid report
    # that forces 20 MHz channel.
    reports = ('0400', '040048', '04004801', '0400480000', '0400490100',
               '040048ff0000', '04004801ff49ff00', '04004801004900',
               '0400480100490101', '0400480100490201ff',
               '040048010449020005')
    for report in reports:
        command = "MGMT_TX {} {} freq=2437 action={}".format(bssid, bssid,
                                                             report)
        if "OK" not in dev[0].request(command):
            raise Exception("Could not send management frame")

    # Give the AP a moment to process the frames before checking the result.
    time.sleep(0.5)
    if hapd.get_status_field("secondary_channel") != "0":
        raise Exception("AP did not move to 20 MHz channel")
464
def test_obss_coex_report_handling1(dev, apdev):
    """Overlapping BSS scan report handling with obss_interval=1"""
    ap_ifname = apdev[0]['ifname']
    clear_scan_cache(ap_ifname)
    ap_params = {"ssid": "obss-scan",
                 "channel": "6",
                 "ht_capab": "[HT40+]",
                 "obss_interval": "1"}
    hapd = hostapd.add_ap(ap_ifname, ap_params)
    bssid = apdev[0]['bssid']
    dev[0].connect("obss-scan", key_mgmt="NONE", scan_freq="2437")

    # Precondition: the AP starts out with a secondary channel above the
    # primary one.
    if hapd.get_status_field("secondary_channel") != "1":
        raise Exception("AP is not using 40 MHz channel")

    # 20/40 MHz co-ex report forcing 20 MHz channel
    report = '040048010449020005'
    command = "MGMT_TX {} {} freq=2437 action={}".format(bssid, bssid, report)
    if "OK" not in dev[0].request(command):
        raise Exception("Could not send management frame")
    time.sleep(0.5)
    if hapd.get_status_field("secondary_channel") != "0":
        raise Exception("AP did not move to 20 MHz channel")

    # No 20/40 MHz co-ex reports forcing 20 MHz channel during next interval
    # Poll up to 20 times, 0.5 s apart, for the secondary channel to return.
    for _ in range(20):
        if hapd.get_status_field("secondary_channel") == "1":
            break
        time.sleep(0.5)
    else:
        raise Exception("AP did not return to 40 MHz channel")
498
def test_olbc(dev, apdev):
    """OLBC detection"""
    # AP under test; ap_table_expiration_time is set to 2 so that the OLBC
    # state can clear within the polling window below.
    params = { "ssid": "test-olbc",
               "channel": "6",
               "ht_capab": "[HT40-]",
               "ap_table_expiration_time": "2" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
    # Before the second AP is started no OLBC indication is expected.
    status = hapd.get_status()
    if status['olbc'] != '0' or status['olbc_ht'] != '0':
        raise Exception("Unexpected OLBC information")

    # Second AP on the same channel using hw_mode=b with WMM disabled.
    params = { "ssid": "olbc-ap",
               "hw_mode": "b",
               "channel": "6",
               "wmm_enabled": "0" }
    hostapd.add_ap(apdev[1]['ifname'], params)
    # Give the first AP time to notice the new BSS.
    time.sleep(0.5)
    status = hapd.get_status()
    if status['olbc'] != '1' or status['olbc_ht'] != '1':
        raise Exception("Missing OLBC information")

    # Remove the second AP again and wait for the indication to expire.
    hapd_global = hostapd.HostapdGlobal()
    hapd_global.remove(apdev[1]['ifname'])

    logger.info("Waiting for OLBC state to time out")
    cleared = False
    # Poll once per second, for at most 15 seconds.
    for _ in range(0, 15):
        time.sleep(1)
        status = hapd.get_status()
        if status['olbc'] == '0' and status['olbc_ht'] == '0':
            cleared = True
            break
    if not cleared:
        raise Exception("OLBC state did not time out")
533
def test_olbc_table_limit(dev, apdev):
    """OLBC AP table size limit"""
    # Three BSSes on phy3, named after the first AP interface with "-2" and
    # "-3" suffixes for the additional ones.
    base_ifname = apdev[0]['ifname']
    bss_list = ((base_ifname, 'bss-1.conf'),
                (base_ifname + '-2', 'bss-2.conf'),
                (base_ifname + '-3', 'bss-3.conf'))
    for ifname, conf in bss_list:
        hostapd.add_bss('phy3', ifname, conf)

    # AP under test starts with ap_table_max_size=2.
    ap_params = {"ssid": "test-olbc",
                 "channel": "1",
                 "ap_table_max_size": "2"}
    hapd = hostapd.add_ap(apdev[1]['ifname'], ap_params)

    time.sleep(0.3)
    # Let the AP run for a while with an allocation failure armed for
    # ap_list_process_beacon.
    with alloc_fail(hapd, 1, "ap_list_process_beacon"):
        time.sleep(0.3)
    # Shrink the table limit step by step, letting the AP run after each
    # change.
    for max_size in ("1", "0"):
        hapd.set("ap_table_max_size", max_size)
        time.sleep(0.3)
555
def test_olbc_5ghz(dev, apdev):
    """OLBC detection on 5 GHz"""
    # Pre-set so the cleanup code can tell which APs were started.
    hapd = None
    hapd2 = None
    try:
        ap_params = {"ssid": "test-olbc",
                     "country_code": "FI",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]"}
        hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)
        # Before the second AP is started no OLBC indication is expected.
        initial = hapd.get_status()
        if initial['olbc'] != '0' or initial['olbc_ht'] != '0':
            raise Exception("Unexpected OLBC information")

        # Second AP on the same channel with HT and WMM disabled.
        other_params = {"ssid": "olbc-ap",
                        "country_code": "FI",
                        "hw_mode": "a",
                        "channel": "36",
                        "ieee80211n": "0",
                        "wmm_enabled": "0"}
        hapd2 = hostapd.add_ap(apdev[1]['ifname'], other_params)

        # Poll up to 20 times, 0.1 s apart; only olbc_ht is checked here.
        for _ in range(20):
            time.sleep(0.1)
            olbc_ht = hapd.get_status()['olbc_ht']
            logger.debug('olbc_ht: ' + olbc_ht)
            if olbc_ht == '1':
                break
        else:
            raise Exception("Missing OLBC information")
    finally:
        # Cleanup runs on success and failure: stop the APs and reset the
        # regulatory domain to "00".
        if hapd:
            hapd.request("DISABLE")
        if hapd2:
            hapd2.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
594
def test_ap_require_ht(dev, apdev):
    """Require HT"""
    ssid = "require-ht"
    ap_params = {"ssid": ssid,
                 "require_ht": "1"}
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params, wait_enabled=False)

    # dev[1] has HT disabled; its connection attempt is started without
    # waiting for completion because a rejection is expected.
    dev[1].connect(ssid, key_mgmt="NONE", scan_freq="2412",
                   disable_ht="1", wait_connect=False)
    # dev[0] connects with its default settings.
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")

    rejection = dev[1].wait_event(["CTRL-EVENT-ASSOC-REJECT"])
    if rejection is None:
        raise Exception("Association rejection timed out")
    if "status_code=27" not in rejection:
        raise Exception("Unexpected rejection status code")

    # dev[2] keeps HT enabled but with a reduced set of HT parameters.
    dev[2].connect(ssid, key_mgmt="NONE", scan_freq="2412",
                   ht_mcs="0x01 00 00 00 00 00 00 00 00 00",
                   disable_max_amsdu="1", ampdu_factor="2",
                   ampdu_density="1", disable_ht40="1", disable_sgi="1",
                   disable_ldpc="1")
614
def test_ap_require_ht_limited_rates(dev, apdev):
    """Require HT with limited supported rates"""
    ssid = "require-ht"
    ap_params = {"ssid": ssid,
                 "supported_rates": "60 120 240 360 480 540",
                 "require_ht": "1"}
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params, wait_enabled=False)

    # dev[1] has HT disabled; its connection attempt is started without
    # waiting for completion because a rejection is expected.
    dev[1].connect(ssid, key_mgmt="NONE", scan_freq="2412",
                   disable_ht="1", wait_connect=False)
    # dev[0] connects with its default settings.
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")

    rejection = dev[1].wait_event(["CTRL-EVENT-ASSOC-REJECT"])
    if rejection is None:
        raise Exception("Association rejection timed out")
    if "status_code=27" not in rejection:
        raise Exception("Unexpected rejection status code")
630
def test_ap_ht_capab_not_supported(dev, apdev):
    """HT configuration with driver not supporting all ht_capab entries"""
    # Long list of HT capability flags, split over several literals only for
    # readability; the resulting string is a single concatenated value.
    capab = ("[HT40-][LDPC][SMPS-STATIC][SMPS-DYNAMIC][GF]"
             "[SHORT-GI-20][SHORT-GI-40][TX-STBC]"
             "[RX-STBC1][RX-STBC12][RX-STBC123]"
             "[DELAYED-BA][MAX-AMSDU-7935][DSSS_CCK-40][LSIG-TXOP-PROT]")
    ap_params = {"ssid": "test-ht40",
                 "channel": "5",
                 "ht_capab": capab}
    # The AP is only configured here (no_enable=True); enabling it is
    # attempted explicitly below and is expected to fail.
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params, no_enable=True)
    response = hapd.request("ENABLE")
    if "FAIL" not in response:
        raise Exception("Unexpected ENABLE success")
639
def test_ap_ht_40mhz_intolerant_sta(dev, apdev):
    """Associated STA indicating 40 MHz intolerant"""
    ap_ifname = apdev[0]['ifname']
    clear_scan_cache(ap_ifname)
    ap_params = {"ssid": "intolerant",
                 "channel": "6",
                 "ht_capab": "[HT40-]"}
    hapd = hostapd.add_ap(ap_ifname, ap_params)

    def expect_status(field, wanted, error):
        # Read one STATUS field from the AP and fail with the given message
        # unless it has the wanted value.
        if hapd.get_status_field(field) != wanted:
            raise Exception(error)

    # Initial state: no intolerant stations, secondary channel in use.
    expect_status("num_sta_ht40_intolerant", "0",
                  "Unexpected num_sta_ht40_intolerant value")
    expect_status("secondary_channel", "-1",
                  "Unexpected secondary_channel")

    # A station connecting with default settings must not change that.
    dev[0].connect("intolerant", key_mgmt="NONE", scan_freq="2437")
    expect_status("num_sta_ht40_intolerant", "0",
                  "Unexpected num_sta_ht40_intolerant value")
    expect_status("secondary_channel", "-1",
                  "Unexpected secondary_channel")

    # A station connecting with ht40_intolerant=1 is counted and the AP
    # drops its secondary channel.
    dev[2].connect("intolerant", key_mgmt="NONE", scan_freq="2437",
                   ht40_intolerant="1")
    time.sleep(1)
    expect_status("num_sta_ht40_intolerant", "1",
                  "Unexpected num_sta_ht40_intolerant value (expected 1)")
    expect_status("secondary_channel", "0",
                  "Unexpected secondary_channel (did not disable 40 MHz)")

    # Once that station leaves, the previous state is restored.
    dev[2].request("DISCONNECT")
    time.sleep(1)
    expect_status("num_sta_ht40_intolerant", "0",
                  "Unexpected num_sta_ht40_intolerant value (expected 0)")
    expect_status("secondary_channel", "-1",
                  "Unexpected secondary_channel (did not re-enable 40 MHz)")
672
def test_ap_ht_40mhz_intolerant_ap(dev, apdev):
    """Associated STA reports 40 MHz intolerant AP after association"""
    ap_ifname = apdev[0]['ifname']
    clear_scan_cache(ap_ifname)
    ap_params = {"ssid": "ht",
                 "channel": "6",
                 "ht_capab": "[HT40-]",
                 "obss_interval": "3"}
    hapd = hostapd.add_ap(ap_ifname, ap_params)

    dev[0].connect("ht", key_mgmt="NONE", scan_freq="2437")

    # Precondition: the AP starts out with its secondary channel in use.
    if hapd.get_status_field("secondary_channel") != "-1":
        raise Exception("Unexpected secondary channel information")

    logger.info("Start 40 MHz intolerant AP")
    intolerant_params = {"ssid": "intolerant",
                         "channel": "5",
                         "ht_capab": "[40-INTOLERANT]"}
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], intolerant_params)

    logger.info("Waiting for co-ex report from STA")
    # Poll once per second, for at most 20 seconds.
    for _ in range(20):
        time.sleep(1)
        if hapd.get_status_field("secondary_channel") == "0":
            logger.info("AP moved to 20 MHz channel")
            break
    else:
        raise Exception("AP did not move to 20 MHz channel")

    if "OK" not in hapd2.request("DISABLE"):
        raise Exception("Failed to disable 40 MHz intolerant AP")

    # make sure the intolerant AP disappears from scan results more quickly
    for _ in range(2):
        dev[0].scan(type="ONLY", freq="2432", only_new=True)
    dev[0].dump_monitor()

    logger.info("Waiting for AP to move back to 40 MHz channel")
    # Poll once per second, for at most 30 seconds.
    for _ in range(30):
        time.sleep(1)
        if hapd.get_status_field("secondary_channel") == "-1":
            logger.info("AP moved to 40 MHz channel")
            break
    else:
        raise Exception("AP did not move to 40 MHz channel")
722
def test_ap_ht40_csa(dev, apdev):
    """HT with 40 MHz channel width and CSA"""
    # Capability check from test_ap_csa (presumably skips the test when
    # channel switching is not supported - confirm in that module).
    csa_supported(dev[0])
    # Pre-set so the cleanup code can tell whether the AP was started.
    hapd = None
    try:
        ap_params = {"ssid": "ht",
                     "country_code": "US",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]",
                     "ieee80211n": "1"}
        hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)

        dev[0].connect("ht", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Switch to 5200 MHz with the secondary channel below, then back to
        # 5180 MHz with the secondary channel above. Each entry pairs the
        # command with the text expected in the AP-CSA-FINISHED event.
        switches = (
            ("CHAN_SWITCH 5 5200 ht sec_channel_offset=-1 bandwidth=40",
             "freq=5200"),
            ("CHAN_SWITCH 5 5180 ht sec_channel_offset=1 bandwidth=40",
             "freq=5180"),
        )
        for command, expected_freq in switches:
            hapd.request(command)
            finished = hapd.wait_event(["AP-CSA-FINISHED"], timeout=10)
            if finished is None:
                raise Exception("CSA finished event timed out")
            if expected_freq not in finished:
                raise Exception("Unexpected channel in CSA finished event")
            # The station must stay associated across the switch.
            dropped = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"],
                                        timeout=0.5)
            if dropped is not None:
                raise Exception("Unexpected STA disconnection during CSA")
            hwsim_utils.test_connectivity(dev[0], hapd)
    finally:
        # Cleanup runs on success and failure.
        dev[0].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].flush_scan_cache()
766
def test_ap_ht40_csa2(dev, apdev):
    """HT with 40 MHz channel width and CSA"""
    # Capability check from test_ap_csa (presumably skips the test when
    # channel switching is not supported - confirm in that module).
    csa_supported(dev[0])
    # Pre-set so the cleanup code can tell whether the AP was started.
    hapd = None
    try:
        ap_params = {"ssid": "ht",
                     "country_code": "US",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]",
                     "ieee80211n": "1"}
        hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)

        dev[0].connect("ht", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Switch to 5220 MHz and back to 5180 MHz, both with the secondary
        # channel above the primary. Each entry pairs the command with the
        # text expected in the AP-CSA-FINISHED event.
        switches = (
            ("CHAN_SWITCH 5 5220 ht sec_channel_offset=1 bandwidth=40",
             "freq=5220"),
            ("CHAN_SWITCH 5 5180 ht sec_channel_offset=1 bandwidth=40",
             "freq=5180"),
        )
        for command, expected_freq in switches:
            hapd.request(command)
            finished = hapd.wait_event(["AP-CSA-FINISHED"], timeout=10)
            if finished is None:
                raise Exception("CSA finished event timed out")
            if expected_freq not in finished:
                raise Exception("Unexpected channel in CSA finished event")
            # The station must stay associated across the switch.
            dropped = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"],
                                        timeout=0.5)
            if dropped is not None:
                raise Exception("Unexpected STA disconnection during CSA")
            hwsim_utils.test_connectivity(dev[0], hapd)
    finally:
        # Cleanup runs on success and failure.
        dev[0].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].flush_scan_cache()
810
def test_ap_ht40_csa3(dev, apdev):
    """HT with 40 MHz channel width and CSA"""
    csa_supported(dev[0])
    sta = dev[0]
    hapd = None

    def switch_and_verify(command, expected_freq):
        # Ask the AP to switch channels, wait for it to report completion on
        # the expected frequency, and confirm that the station stayed
        # associated and can still pass data.
        hapd.request(command)
        finished = hapd.wait_event(["AP-CSA-FINISHED"], timeout=10)
        if finished is None:
            raise Exception("CSA finished event timed out")
        if expected_freq not in finished:
            raise Exception("Unexpected channel in CSA finished event")
        dropped = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.5)
        if dropped is not None:
            raise Exception("Unexpected STA disconnection during CSA")
        hwsim_utils.test_connectivity(sta, hapd)

    try:
        params = { "ssid": "ht",
                   "country_code": "US",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+]",
                   "ieee80211n": "1" }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)

        sta.connect("ht", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(sta, hapd)

        # Move to 5240 MHz with the secondary channel below the primary,
        # then return to 5180 MHz with the secondary channel above.
        switch_and_verify(
            "CHAN_SWITCH 5 5240 ht sec_channel_offset=-1 bandwidth=40",
            "freq=5240")
        switch_and_verify(
            "CHAN_SWITCH 5 5180 ht sec_channel_offset=1 bandwidth=40",
            "freq=5180")
    finally:
        # Always restore state: disconnect the station, disable the AP if it
        # was created, reset the regulatory domain and drop cached scan
        # results.
        sta.request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        sta.flush_scan_cache()
854
def test_ap_ht_smps(dev, apdev):
    """SMPS AP configuration options"""
    # First AP advertises static SMPS. A failure to start it is taken to mean
    # that the running mac80211_hwsim does not support SMPS, so the test is
    # skipped instead of failed.
    params = { "ssid": "ht1", "ht_capab": "[SMPS-STATIC]" }
    try:
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)
    except Exception:
        # Catch only ordinary errors: a bare "except:" would also turn
        # KeyboardInterrupt/SystemExit into a reported test skip.
        raise HwsimSkip("Assume mac80211_hwsim was not recent enough to support SMPS")

    # Second AP advertises dynamic SMPS.
    params = { "ssid": "ht2", "ht_capab": "[SMPS-DYNAMIC]" }
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)

    # One station per AP; both must associate and pass data.
    dev[0].connect("ht1", key_mgmt="NONE", scan_freq="2412")
    dev[1].connect("ht2", key_mgmt="NONE", scan_freq="2412")
    hwsim_utils.test_connectivity(dev[0], hapd)
    hwsim_utils.test_connectivity(dev[1], hapd2)
869
def test_prefer_ht20(dev, apdev):
    """Preference on HT20 over no-HT"""
    sta = dev[0]

    # Two APs share the same SSID and channel; only the second enables
    # ieee80211n.
    legacy_params = { "ssid": "test",
                      "channel": "1",
                      "ieee80211n": "0" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], legacy_params)
    bssid = apdev[0]['bssid']
    ht_params = { "ssid": "test",
                  "channel": "1",
                  "ieee80211n": "1" }
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], ht_params)
    bssid2 = apdev[1]['bssid']

    # Make sure both BSSes are in the scan results before connecting so the
    # station has a real choice between them.
    for target in (bssid, bssid2):
        sta.scan_for_bss(target, freq=2412)
    sta.connect("test", key_mgmt="NONE", scan_freq="2412")
    selected = sta.get_status_field('bssid')
    if selected != bssid2:
        raise Exception("Unexpected BSS selected")

    # Each entry: (BSSID, expected est_throughput value, error prefix).
    expectations = (
        (bssid, "54000", "Unexpected BSS0 est_throughput: "),
        (bssid2, "65000", "Unexpected BSS1 est_throughput: "),
    )
    for target, expected, error_prefix in expectations:
        est = sta.get_bss(target)['est_throughput']
        if est != expected:
            raise Exception(error_prefix + est)
896
def test_prefer_ht40(dev, apdev):
    """Preference on HT40 over HT20"""
    sta = dev[0]

    def check_throughput(target, expected, error_prefix):
        # Compare the est_throughput reported for the given BSS against the
        # expected string value.
        est = sta.get_bss(target)['est_throughput']
        if est != expected:
            raise Exception(error_prefix + est)

    # Both APs use the same SSID and channel with ieee80211n enabled; only
    # the second one additionally sets ht_capab to [HT40+].
    ht20_params = { "ssid": "test",
                    "channel": "1",
                    "ieee80211n": "1" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], ht20_params)
    bssid = apdev[0]['bssid']
    ht40_params = { "ssid": "test",
                    "channel": "1",
                    "ieee80211n": "1",
                    "ht_capab": "[HT40+]" }
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], ht40_params)
    bssid2 = apdev[1]['bssid']

    # Populate scan results with both BSSes, then let the station pick one.
    sta.scan_for_bss(bssid, freq=2412)
    sta.scan_for_bss(bssid2, freq=2412)
    sta.connect("test", key_mgmt="NONE", scan_freq="2412")
    selected = sta.get_status_field('bssid')
    if selected != bssid2:
        raise Exception("Unexpected BSS selected")

    check_throughput(bssid, "65000", "Unexpected BSS0 est_throughput: ")
    check_throughput(bssid2, "135000", "Unexpected BSS1 est_throughput: ")
924
def test_prefer_ht20_during_roam(dev, apdev):
    """Preference on HT20 over no-HT in roaming consideration"""
    sta = dev[0]

    # Phase 1: only the AP with ieee80211n disabled exists, so the station
    # associates with it.
    legacy_params = { "ssid": "test",
                      "channel": "1",
                      "ieee80211n": "0" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], legacy_params)
    bssid = apdev[0]['bssid']

    sta.scan_for_bss(bssid, freq=2412)
    sta.connect("test", key_mgmt="NONE", scan_freq="2412")

    # Phase 2: bring up a second AP with the same SSID and channel but with
    # ieee80211n enabled, then run another scan while still associated.
    ht_params = { "ssid": "test",
                  "channel": "1",
                  "ieee80211n": "1" }
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], ht_params)
    bssid2 = apdev[1]['bssid']
    sta.scan_for_bss(bssid2, freq=2412)
    sta.scan(freq=2412)
    sta.wait_connected()

    # After that scan the station is expected to be on the second AP.
    current = sta.get_status_field('bssid')
    if current != bssid2:
        raise Exception("Unexpected BSS selected")