tests: hostapd behavior with interface in different bridge
[mech_eap.git] / tests / hwsim / test_ap_psk.py
1 # WPA2-Personal tests
2 # Copyright (c) 2014, Qualcomm Atheros, Inc.
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import hashlib
9 import hmac
10 import logging
11 logger = logging.getLogger()
12 import os
13 import struct
14 import subprocess
15 import time
16
17 import hostapd
18 import hwsim_utils
19
20 def check_mib(dev, vals):
21     mib = dev.get_mib()
22     for v in vals:
23         if mib[v[0]] != v[1]:
24             raise Exception("Unexpected {} = {} (expected {})".format(v[0], mib[v[0]], v[1]))
25
26 def test_ap_wpa2_psk(dev, apdev):
27     """WPA2-PSK AP with PSK instead of passphrase"""
28     ssid = "test-wpa2-psk"
29     passphrase = 'qwertyuiop'
30     psk = '602e323e077bc63bd80307ef4745b754b0ae0a925c2638ecd13a794b9527b9e6'
31     params = hostapd.wpa2_params(ssid=ssid)
32     params['wpa_psk'] = psk
33     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
34     key_mgmt = hapd.get_config()['key_mgmt']
35     if key_mgmt.split(' ')[0] != "WPA-PSK":
36         raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
37     dev[0].connect(ssid, raw_psk=psk, scan_freq="2412")
38     dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
39
40     sig = dev[0].request("SIGNAL_POLL").splitlines()
41     pkt = dev[0].request("PKTCNT_POLL").splitlines()
42     if "FREQUENCY=2412" not in sig:
43         raise Exception("Unexpected SIGNAL_POLL value: " + str(sig))
44     if "TXBAD=0" not in pkt:
45         raise Exception("Unexpected TXBAD value: " + str(pkt))
46
47 def test_ap_wpa2_psk_file(dev, apdev):
48     """WPA2-PSK AP with PSK from a file"""
49     ssid = "test-wpa2-psk"
50     passphrase = 'qwertyuiop'
51     psk = '602e323e077bc63bd80307ef4745b754b0ae0a925c2638ecd13a794b9527b9e6'
52     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
53     params['wpa_psk_file'] = 'hostapd.wpa_psk'
54     hostapd.add_ap(apdev[0]['ifname'], params)
55     dev[1].connect(ssid, psk="very secret", scan_freq="2412", wait_connect=False)
56     dev[2].connect(ssid, raw_psk=psk, scan_freq="2412")
57     dev[2].request("REMOVE_NETWORK all")
58     dev[0].connect(ssid, psk="very secret", scan_freq="2412")
59     dev[0].request("REMOVE_NETWORK all")
60     dev[2].connect(ssid, psk="another passphrase for all STAs", scan_freq="2412")
61     dev[0].connect(ssid, psk="another passphrase for all STAs", scan_freq="2412")
62     ev = dev[1].wait_event(["WPA: 4-Way Handshake failed"], timeout=10)
63     if ev is None:
64         raise Exception("Timed out while waiting for failure report")
65     dev[1].request("REMOVE_NETWORK all")
66
67 def test_ap_wpa2_ptk_rekey(dev, apdev):
68     """WPA2-PSK AP and PTK rekey enforced by station"""
69     ssid = "test-wpa2-psk"
70     passphrase = 'qwertyuiop'
71     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
72     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
73     dev[0].connect(ssid, psk=passphrase, wpa_ptk_rekey="1", scan_freq="2412")
74     ev = dev[0].wait_event(["WPA: Key negotiation completed"])
75     if ev is None:
76         raise Exception("PTK rekey timed out")
77     hwsim_utils.test_connectivity(dev[0], hapd)
78
79 def test_ap_wpa2_ptk_rekey_ap(dev, apdev):
80     """WPA2-PSK AP and PTK rekey enforced by AP"""
81     ssid = "test-wpa2-psk"
82     passphrase = 'qwertyuiop'
83     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
84     params['wpa_ptk_rekey'] = '2'
85     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
86     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
87     ev = dev[0].wait_event(["WPA: Key negotiation completed"])
88     if ev is None:
89         raise Exception("PTK rekey timed out")
90     hwsim_utils.test_connectivity(dev[0], hapd)
91
92 def test_ap_wpa2_sha256_ptk_rekey(dev, apdev):
93     """WPA2-PSK/SHA256 AKM AP and PTK rekey enforced by station"""
94     ssid = "test-wpa2-psk"
95     passphrase = 'qwertyuiop'
96     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
97     params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
98     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
99     dev[0].connect(ssid, psk=passphrase, key_mgmt="WPA-PSK-SHA256",
100                    wpa_ptk_rekey="1", scan_freq="2412")
101     ev = dev[0].wait_event(["WPA: Key negotiation completed"])
102     if ev is None:
103         raise Exception("PTK rekey timed out")
104     hwsim_utils.test_connectivity(dev[0], hapd)
105     check_mib(dev[0], [ ("dot11RSNAAuthenticationSuiteRequested", "00-0f-ac-6"),
106                         ("dot11RSNAAuthenticationSuiteSelected", "00-0f-ac-6") ])
107
108 def test_ap_wpa2_sha256_ptk_rekey_ap(dev, apdev):
109     """WPA2-PSK/SHA256 AKM AP and PTK rekey enforced by AP"""
110     ssid = "test-wpa2-psk"
111     passphrase = 'qwertyuiop'
112     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
113     params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
114     params['wpa_ptk_rekey'] = '2'
115     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
116     dev[0].connect(ssid, psk=passphrase, key_mgmt="WPA-PSK-SHA256",
117                    scan_freq="2412")
118     ev = dev[0].wait_event(["WPA: Key negotiation completed"])
119     if ev is None:
120         raise Exception("PTK rekey timed out")
121     hwsim_utils.test_connectivity(dev[0], hapd)
122     check_mib(dev[0], [ ("dot11RSNAAuthenticationSuiteRequested", "00-0f-ac-6"),
123                         ("dot11RSNAAuthenticationSuiteSelected", "00-0f-ac-6") ])
124
125 def test_ap_wpa_ptk_rekey(dev, apdev):
126     """WPA-PSK/TKIP AP and PTK rekey enforced by station"""
127     ssid = "test-wpa-psk"
128     passphrase = 'qwertyuiop'
129     params = hostapd.wpa_params(ssid=ssid, passphrase=passphrase)
130     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
131     dev[0].connect(ssid, psk=passphrase, wpa_ptk_rekey="1", scan_freq="2412")
132     if "[WPA-PSK-TKIP]" not in dev[0].request("SCAN_RESULTS"):
133         raise Exception("Scan results missing WPA element info")
134     ev = dev[0].wait_event(["WPA: Key negotiation completed"])
135     if ev is None:
136         raise Exception("PTK rekey timed out")
137     hwsim_utils.test_connectivity(dev[0], hapd)
138
139 def test_ap_wpa_ptk_rekey_ap(dev, apdev):
140     """WPA-PSK/TKIP AP and PTK rekey enforced by AP"""
141     ssid = "test-wpa-psk"
142     passphrase = 'qwertyuiop'
143     params = hostapd.wpa_params(ssid=ssid, passphrase=passphrase)
144     params['wpa_ptk_rekey'] = '2'
145     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
146     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
147     ev = dev[0].wait_event(["WPA: Key negotiation completed"], timeout=10)
148     if ev is None:
149         raise Exception("PTK rekey timed out")
150     hwsim_utils.test_connectivity(dev[0], hapd)
151
152 def test_ap_wpa_ccmp(dev, apdev):
153     """WPA-PSK/CCMP"""
154     ssid = "test-wpa-psk"
155     passphrase = 'qwertyuiop'
156     params = hostapd.wpa_params(ssid=ssid, passphrase=passphrase)
157     params['wpa_pairwise'] = "CCMP"
158     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
159     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
160     hwsim_utils.test_connectivity(dev[0], hapd)
161     check_mib(dev[0], [ ("dot11RSNAConfigGroupCipherSize", "128"),
162                         ("dot11RSNAGroupCipherRequested", "00-50-f2-4"),
163                         ("dot11RSNAPairwiseCipherRequested", "00-50-f2-4"),
164                         ("dot11RSNAAuthenticationSuiteRequested", "00-50-f2-2"),
165                         ("dot11RSNAGroupCipherSelected", "00-50-f2-4"),
166                         ("dot11RSNAPairwiseCipherSelected", "00-50-f2-4"),
167                         ("dot11RSNAAuthenticationSuiteSelected", "00-50-f2-2"),
168                         ("dot1xSuppSuppControlledPortStatus", "Authorized") ])
169
170 def test_ap_wpa2_psk_file(dev, apdev):
171     """WPA2-PSK AP with various PSK file error and success cases"""
172     addr0 = dev[0].p2p_dev_addr()
173     addr1 = dev[1].p2p_dev_addr()
174     addr2 = dev[2].p2p_dev_addr()
175     ssid = "psk"
176     pskfile = "/tmp/ap_wpa2_psk_file_errors.psk_file"
177     try:
178         os.remove(pskfile)
179     except:
180         pass
181
182     params = { "ssid": ssid, "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
183                "rsn_pairwise": "CCMP", "wpa_psk_file": pskfile }
184
185     try:
186         # missing PSK file
187         hapd = hostapd.add_ap(apdev[0]['ifname'], params, no_enable=True)
188         if "FAIL" not in hapd.request("ENABLE"):
189             raise Exception("Unexpected ENABLE success")
190         hapd.request("DISABLE")
191
192         # invalid MAC address
193         with open(pskfile, "w") as f:
194             f.write("\n")
195             f.write("foo\n")
196         if "FAIL" not in hapd.request("ENABLE"):
197             raise Exception("Unexpected ENABLE success")
198         hapd.request("DISABLE")
199
200         # no PSK on line
201         with open(pskfile, "w") as f:
202             f.write("00:11:22:33:44:55\n")
203         if "FAIL" not in hapd.request("ENABLE"):
204             raise Exception("Unexpected ENABLE success")
205         hapd.request("DISABLE")
206
207         # invalid PSK
208         with open(pskfile, "w") as f:
209             f.write("00:11:22:33:44:55 1234567\n")
210         if "FAIL" not in hapd.request("ENABLE"):
211             raise Exception("Unexpected ENABLE success")
212         hapd.request("DISABLE")
213
214         # valid PSK file
215         with open(pskfile, "w") as f:
216             f.write("00:11:22:33:44:55 12345678\n")
217             f.write(addr0 + " 123456789\n")
218             f.write(addr1 + " 123456789a\n")
219             f.write(addr2 + " 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef\n")
220         if "FAIL" in hapd.request("ENABLE"):
221             raise Exception("Unexpected ENABLE failure")
222
223         dev[0].connect(ssid, psk="123456789", scan_freq="2412")
224         dev[1].connect(ssid, psk="123456789a", scan_freq="2412")
225         dev[2].connect(ssid, raw_psk="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", scan_freq="2412")
226
227     finally:
228         try:
229             os.remove(pskfile)
230         except:
231             pass
232
233 def test_ap_wpa2_psk_wildcard_ssid(dev, apdev):
234     """WPA2-PSK AP and wildcard SSID configuration"""
235     ssid = "test-wpa2-psk"
236     passphrase = 'qwertyuiop'
237     psk = '602e323e077bc63bd80307ef4745b754b0ae0a925c2638ecd13a794b9527b9e6'
238     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
239     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
240     dev[0].connect("", bssid=apdev[0]['bssid'], psk=passphrase,
241                    scan_freq="2412")
242     dev[1].connect("", bssid=apdev[0]['bssid'], raw_psk=psk, scan_freq="2412")
243
244 def test_ap_wpa2_gtk_rekey(dev, apdev):
245     """WPA2-PSK AP and GTK rekey enforced by AP"""
246     ssid = "test-wpa2-psk"
247     passphrase = 'qwertyuiop'
248     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
249     params['wpa_group_rekey'] = '1'
250     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
251     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
252     ev = dev[0].wait_event(["WPA: Group rekeying completed"], timeout=2)
253     if ev is None:
254         raise Exception("GTK rekey timed out")
255     hwsim_utils.test_connectivity(dev[0], hapd)
256
257 def test_ap_wpa_gtk_rekey(dev, apdev):
258     """WPA-PSK/TKIP AP and GTK rekey enforced by AP"""
259     ssid = "test-wpa-psk"
260     passphrase = 'qwertyuiop'
261     params = hostapd.wpa_params(ssid=ssid, passphrase=passphrase)
262     params['wpa_group_rekey'] = '1'
263     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
264     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
265     ev = dev[0].wait_event(["WPA: Group rekeying completed"], timeout=2)
266     if ev is None:
267         raise Exception("GTK rekey timed out")
268     hwsim_utils.test_connectivity(dev[0], hapd)
269
270 def test_ap_wpa2_gmk_rekey(dev, apdev):
271     """WPA2-PSK AP and GMK and GTK rekey enforced by AP"""
272     ssid = "test-wpa2-psk"
273     passphrase = 'qwertyuiop'
274     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
275     params['wpa_group_rekey'] = '1'
276     params['wpa_gmk_rekey'] = '2'
277     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
278     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
279     for i in range(0, 3):
280         ev = dev[0].wait_event(["WPA: Group rekeying completed"], timeout=2)
281         if ev is None:
282             raise Exception("GTK rekey timed out")
283     hwsim_utils.test_connectivity(dev[0], hapd)
284
285 def test_ap_wpa2_strict_rekey(dev, apdev):
286     """WPA2-PSK AP and strict GTK rekey enforced by AP"""
287     ssid = "test-wpa2-psk"
288     passphrase = 'qwertyuiop'
289     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
290     params['wpa_strict_rekey'] = '1'
291     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
292     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
293     dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
294     dev[1].request("DISCONNECT")
295     ev = dev[0].wait_event(["WPA: Group rekeying completed"], timeout=2)
296     if ev is None:
297         raise Exception("GTK rekey timed out")
298     hwsim_utils.test_connectivity(dev[0], hapd)
299
300 def test_ap_wpa2_bridge_fdb(dev, apdev):
301     """Bridge FDB entry removal"""
302     try:
303         ssid = "test-wpa2-psk"
304         passphrase = "12345678"
305         params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
306         params['bridge'] = 'ap-br0'
307         hostapd.add_ap(apdev[0]['ifname'], params)
308         subprocess.call(['sudo', 'brctl', 'setfd', 'ap-br0', '0'])
309         subprocess.call(['sudo', 'ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
310         dev[0].connect(ssid, psk=passphrase, scan_freq="2412",
311                        bssid=apdev[0]['bssid'])
312         dev[1].connect(ssid, psk=passphrase, scan_freq="2412",
313                        bssid=apdev[0]['bssid'])
314         addr0 = dev[0].p2p_interface_addr()
315         hwsim_utils.test_connectivity_sta(dev[0], dev[1])
316         cmd = subprocess.Popen(['brctl', 'showmacs', 'ap-br0'],
317                                stdout=subprocess.PIPE)
318         macs1 = cmd.stdout.read()
319         dev[0].request("DISCONNECT")
320         dev[1].request("DISCONNECT")
321         time.sleep(1)
322         cmd = subprocess.Popen(['brctl', 'showmacs', 'ap-br0'],
323                                stdout=subprocess.PIPE)
324         macs2 = cmd.stdout.read()
325
326         addr1 = dev[1].p2p_interface_addr()
327         if addr0 not in macs1 or addr1 not in macs1:
328             raise Exception("Bridge FDB entry missing")
329         if addr0 in macs2 or addr1 in macs2:
330             raise Exception("Bridge FDB entry was not removed")
331     finally:
332         subprocess.call(['sudo', 'ip', 'link', 'set', 'dev', 'ap-br0', 'down'])
333         subprocess.call(['sudo', 'brctl', 'delbr', 'ap-br0'])
334
335 def test_ap_wpa2_already_in_bridge(dev, apdev):
336     """hostapd behavior with interface already in bridge"""
337     ifname = apdev[0]['ifname']
338     br_ifname = 'ext-ap-br0'
339     try:
340         ssid = "test-wpa2-psk"
341         passphrase = "12345678"
342         subprocess.call(['brctl', 'addbr', br_ifname])
343         subprocess.call(['brctl', 'setfd', br_ifname, '0'])
344         subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'up'])
345         subprocess.call(['iw', ifname, 'set', 'type', '__ap'])
346         subprocess.call(['brctl', 'addif', br_ifname, ifname])
347         params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
348         hapd = hostapd.add_ap(ifname, params)
349         if hapd.get_driver_status_field('brname') != br_ifname:
350             raise Exception("Bridge name not identified correctly")
351         dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
352     finally:
353         subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'down'])
354         subprocess.call(['brctl', 'delif', br_ifname, ifname])
355         subprocess.call(['iw', ifname, 'set', 'type', 'station'])
356         subprocess.call(['brctl', 'delbr', br_ifname])
357
358 def test_ap_wpa2_in_different_bridge(dev, apdev):
359     """hostapd behavior with interface in different bridge"""
360     ifname = apdev[0]['ifname']
361     br_ifname = 'ext-ap-br0'
362     try:
363         ssid = "test-wpa2-psk"
364         passphrase = "12345678"
365         subprocess.call(['brctl', 'addbr', br_ifname])
366         subprocess.call(['brctl', 'setfd', br_ifname, '0'])
367         subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'up'])
368         subprocess.call(['iw', ifname, 'set', 'type', '__ap'])
369         subprocess.call(['brctl', 'addif', br_ifname, ifname])
370         time.sleep(0.5)
371         params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
372         params['bridge'] = 'ap-br0'
373         hapd = hostapd.add_ap(ifname, params)
374         subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
375         subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
376         brname = hapd.get_driver_status_field('brname')
377         if brname != 'ap-br0':
378             raise Exception("Incorrect bridge: " + brname)
379         dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
380         hwsim_utils.test_connectivity_iface(dev[0], hapd, "ap-br0")
381         if hapd.get_driver_status_field("added_bridge") != "1":
382             raise Exception("Unexpected added_bridge value")
383         if hapd.get_driver_status_field("added_if_into_bridge") != "1":
384             raise Exception("Unexpected added_if_into_bridge value")
385         dev[0].request("DISCONNECT")
386         hapd.disable()
387         subprocess.call(['ip', 'link', 'show', 'ap-br0'])
388         subprocess.call(['brctl', 'show'])
389     finally:
390         subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'down'])
391         subprocess.call(['brctl', 'delif', br_ifname, ifname],
392                         stderr=open('/dev/null', 'w'))
393         subprocess.call(['brctl', 'delbr', br_ifname])
394         subprocess.call(['brctl', 'show'])
395
396 def test_ap_wpa2_ext_add_to_bridge(dev, apdev):
397     """hostapd behavior with interface added to bridge externally"""
398     ifname = apdev[0]['ifname']
399     br_ifname = 'ext-ap-br0'
400     try:
401         ssid = "test-wpa2-psk"
402         passphrase = "12345678"
403         params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
404         hapd = hostapd.add_ap(ifname, params)
405
406         subprocess.call(['brctl', 'addbr', br_ifname])
407         subprocess.call(['brctl', 'setfd', br_ifname, '0'])
408         subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'up'])
409         subprocess.call(['brctl', 'addif', br_ifname, ifname])
410         dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
411         if hapd.get_driver_status_field('brname') != br_ifname:
412             raise Exception("Bridge name not identified correctly")
413     finally:
414         subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'down'])
415         subprocess.call(['brctl', 'delif', br_ifname, ifname])
416         subprocess.call(['brctl', 'delbr', br_ifname])
417
418 def test_ap_wpa2_psk_ext(dev, apdev):
419     """WPA2-PSK AP using external EAPOL I/O"""
420     bssid = apdev[0]['bssid']
421     ssid = "test-wpa2-psk"
422     passphrase = 'qwertyuiop'
423     psk = '602e323e077bc63bd80307ef4745b754b0ae0a925c2638ecd13a794b9527b9e6'
424     params = hostapd.wpa2_params(ssid=ssid)
425     params['wpa_psk'] = psk
426     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
427     hapd.request("SET ext_eapol_frame_io 1")
428     dev[0].request("SET ext_eapol_frame_io 1")
429     dev[0].connect(ssid, psk=passphrase, scan_freq="2412", wait_connect=False)
430     addr = dev[0].p2p_interface_addr()
431     while True:
432         ev = hapd.wait_event(["EAPOL-TX", "AP-STA-CONNECTED"], timeout=15)
433         if ev is None:
434             raise Exception("Timeout on EAPOL-TX from hostapd")
435         if "AP-STA-CONNECTED" in ev:
436             dev[0].wait_connected(timeout=15)
437             break
438         res = dev[0].request("EAPOL_RX " + bssid + " " + ev.split(' ')[2])
439         if "OK" not in res:
440             raise Exception("EAPOL_RX to wpa_supplicant failed")
441         ev = dev[0].wait_event(["EAPOL-TX", "CTRL-EVENT-CONNECTED"], timeout=15)
442         if ev is None:
443             raise Exception("Timeout on EAPOL-TX from wpa_supplicant")
444         if "CTRL-EVENT-CONNECTED" in ev:
445             break
446         res = hapd.request("EAPOL_RX " + addr + " " + ev.split(' ')[2])
447         if "OK" not in res:
448             raise Exception("EAPOL_RX to hostapd failed")
449
450 def parse_eapol(data):
451     (version, type, length) = struct.unpack('>BBH', data[0:4])
452     payload = data[4:]
453     if length > len(payload):
454         raise Exception("Invalid EAPOL length")
455     if length < len(payload):
456         payload = payload[0:length]
457     eapol = {}
458     eapol['version'] = version
459     eapol['type'] = type
460     eapol['length'] = length
461     eapol['payload'] = payload
462     if type == 3:
463         # EAPOL-Key
464         (eapol['descr_type'],) = struct.unpack('B', payload[0:1])
465         payload = payload[1:]
466         if eapol['descr_type'] == 2 or eapol['descr_type'] == 254:
467             # RSN EAPOL-Key
468             (key_info, key_len) = struct.unpack('>HH', payload[0:4])
469             eapol['rsn_key_info'] = key_info
470             eapol['rsn_key_len'] = key_len
471             eapol['rsn_replay_counter'] = payload[4:12]
472             eapol['rsn_key_nonce'] = payload[12:44]
473             eapol['rsn_key_iv'] = payload[44:60]
474             eapol['rsn_key_rsc'] = payload[60:68]
475             eapol['rsn_key_id'] = payload[68:76]
476             eapol['rsn_key_mic'] = payload[76:92]
477             payload = payload[92:]
478             (eapol['rsn_key_data_len'],) = struct.unpack('>H', payload[0:2])
479             payload = payload[2:]
480             eapol['rsn_key_data'] = payload
481     return eapol
482
483 def build_eapol(msg):
484     data = struct.pack(">BBH", msg['version'], msg['type'], msg['length'])
485     if msg['type'] == 3:
486         data += struct.pack('>BHH', msg['descr_type'], msg['rsn_key_info'],
487                             msg['rsn_key_len'])
488         data += msg['rsn_replay_counter']
489         data += msg['rsn_key_nonce']
490         data += msg['rsn_key_iv']
491         data += msg['rsn_key_rsc']
492         data += msg['rsn_key_id']
493         data += msg['rsn_key_mic']
494         data += struct.pack('>H', msg['rsn_key_data_len'])
495         data += msg['rsn_key_data']
496     else:
497         data += msg['payload']
498     return data
499
500 def sha1_prf(key, label, data, outlen):
501     res = ''
502     counter = 0
503     while outlen > 0:
504         m = hmac.new(key, label, hashlib.sha1)
505         m.update(struct.pack('B', 0))
506         m.update(data)
507         m.update(struct.pack('B', counter))
508         counter += 1
509         hash = m.digest()
510         if outlen > len(hash):
511             res += hash
512             outlen -= len(hash)
513         else:
514             res += hash[0:outlen]
515             outlen = 0
516     return res
517
518 def pmk_to_ptk(pmk, addr1, addr2, nonce1, nonce2):
519     if addr1 < addr2:
520         data = binascii.unhexlify(addr1.replace(':','')) + binascii.unhexlify(addr2.replace(':',''))
521     else:
522         data = binascii.unhexlify(addr2.replace(':','')) + binascii.unhexlify(addr1.replace(':',''))
523     if nonce1 < nonce2:
524         data += nonce1 + nonce2
525     else:
526         data += nonce2 + nonce1
527     label = "Pairwise key expansion"
528     ptk = sha1_prf(pmk, label, data, 48)
529     kck = ptk[0:16]
530     kek = ptk[16:32]
531     return (ptk, kck, kek)
532
533 def eapol_key_mic(kck, msg):
534     msg['rsn_key_mic'] = binascii.unhexlify('00000000000000000000000000000000')
535     data = build_eapol(msg)
536     m = hmac.new(kck, data, hashlib.sha1)
537     msg['rsn_key_mic'] = m.digest()[0:16]
538
539 def rsn_eapol_key_set(msg, key_info, key_len, nonce, data):
540     msg['rsn_key_info'] = key_info
541     msg['rsn_key_len'] = key_len
542     if nonce:
543         msg['rsn_key_nonce'] = nonce
544     else:
545         msg['rsn_key_nonce'] = binascii.unhexlify('0000000000000000000000000000000000000000000000000000000000000000')
546     if data:
547         msg['rsn_key_data_len'] = len(data)
548         msg['rsn_key_data'] = data
549         msg['length'] = 95 + len(data)
550     else:
551         msg['rsn_key_data_len'] = 0
552         msg['rsn_key_data'] = ''
553         msg['length'] = 95
554
555 def recv_eapol(hapd):
556     ev = hapd.wait_event(["EAPOL-TX"], timeout=15)
557     if ev is None:
558         raise Exception("Timeout on EAPOL-TX from hostapd")
559     eapol = binascii.unhexlify(ev.split(' ')[2])
560     return parse_eapol(eapol)
561
562 def send_eapol(hapd, addr, data):
563     res = hapd.request("EAPOL_RX " + addr + " " + binascii.hexlify(data))
564     if "OK" not in res:
565         raise Exception("EAPOL_RX to hostapd failed")
566
567 def reply_eapol(info, hapd, addr, msg, key_info, nonce, data, kck):
568     logger.info("Send EAPOL-Key msg " + info)
569     rsn_eapol_key_set(msg, key_info, 0, nonce, data)
570     eapol_key_mic(kck, msg)
571     send_eapol(hapd, addr, build_eapol(msg))
572
573 def hapd_connected(hapd):
574     ev = hapd.wait_event(["AP-STA-CONNECTED"], timeout=15)
575     if ev is None:
576         raise Exception("Timeout on AP-STA-CONNECTED from hostapd")
577
578 def eapol_test(apdev, dev, wpa2=True):
579     bssid = apdev['bssid']
580     if wpa2:
581         ssid = "test-wpa2-psk"
582     else:
583         ssid = "test-wpa-psk"
584     psk = '602e323e077bc63bd80307ef4745b754b0ae0a925c2638ecd13a794b9527b9e6'
585     pmk = binascii.unhexlify(psk)
586     if wpa2:
587         params = hostapd.wpa2_params(ssid=ssid)
588     else:
589         params = hostapd.wpa_params(ssid=ssid)
590     params['wpa_psk'] = psk
591     hapd = hostapd.add_ap(apdev['ifname'], params)
592     hapd.request("SET ext_eapol_frame_io 1")
593     dev.request("SET ext_eapol_frame_io 1")
594     dev.connect(ssid, psk="not used", scan_freq="2412", wait_connect=False)
595     addr = dev.p2p_interface_addr()
596     if wpa2:
597         rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac020000')
598     else:
599         rsne = binascii.unhexlify('dd160050f20101000050f20201000050f20201000050f202')
600     snonce = binascii.unhexlify('1111111111111111111111111111111111111111111111111111111111111111')
601     return (bssid,ssid,hapd,snonce,pmk,addr,rsne)
602
603 def test_ap_wpa2_psk_ext_eapol(dev, apdev):
604     """WPA2-PSK AP using external EAPOL supplicant"""
605     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
606
607     msg = recv_eapol(hapd)
608     anonce = msg['rsn_key_nonce']
609     logger.info("Replay same data back")
610     send_eapol(hapd, addr, build_eapol(msg))
611
612     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
613
614     logger.info("Truncated Key Data in EAPOL-Key msg 2/4")
615     rsn_eapol_key_set(msg, 0x0101, 0, snonce, rsne)
616     msg['length'] = 95 + 22 - 1
617     send_eapol(hapd, addr, build_eapol(msg))
618
619     reply_eapol("2/4", hapd, addr, msg, 0x010a, snonce, rsne, kck)
620
621     msg = recv_eapol(hapd)
622     if anonce != msg['rsn_key_nonce']:
623         raise Exception("ANonce changed")
624     logger.info("Replay same data back")
625     send_eapol(hapd, addr, build_eapol(msg))
626
627     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
628     hapd_connected(hapd)
629
630 def test_ap_wpa2_psk_ext_eapol_retry1(dev, apdev):
631     """WPA2 4-way handshake with EAPOL-Key 1/4 retransmitted"""
632     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
633
634     msg1 = recv_eapol(hapd)
635     anonce = msg1['rsn_key_nonce']
636
637     msg2 = recv_eapol(hapd)
638     if anonce != msg2['rsn_key_nonce']:
639         raise Exception("ANonce changed")
640
641     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
642
643     logger.info("Send EAPOL-Key msg 2/4")
644     msg = msg2
645     rsn_eapol_key_set(msg, 0x010a, 0, snonce, rsne)
646     eapol_key_mic(kck, msg)
647     send_eapol(hapd, addr, build_eapol(msg))
648
649     msg = recv_eapol(hapd)
650     if anonce != msg['rsn_key_nonce']:
651         raise Exception("ANonce changed")
652
653     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
654     hapd_connected(hapd)
655
656 def test_ap_wpa2_psk_ext_eapol_retry1b(dev, apdev):
657     """WPA2 4-way handshake with EAPOL-Key 1/4 and 2/4 retransmitted"""
658     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
659
660     msg1 = recv_eapol(hapd)
661     anonce = msg1['rsn_key_nonce']
662     msg2 = recv_eapol(hapd)
663     if anonce != msg2['rsn_key_nonce']:
664         raise Exception("ANonce changed")
665
666     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
667     reply_eapol("2/4 (a)", hapd, addr, msg1, 0x010a, snonce, rsne, kck)
668     reply_eapol("2/4 (b)", hapd, addr, msg2, 0x010a, snonce, rsne, kck)
669
670     msg = recv_eapol(hapd)
671     if anonce != msg['rsn_key_nonce']:
672         raise Exception("ANonce changed")
673
674     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
675     hapd_connected(hapd)
676
677 def test_ap_wpa2_psk_ext_eapol_retry1c(dev, apdev):
678     """WPA2 4-way handshake with EAPOL-Key 1/4 and 2/4 retransmitted and SNonce changing"""
679     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
680
681     msg1 = recv_eapol(hapd)
682     anonce = msg1['rsn_key_nonce']
683
684     msg2 = recv_eapol(hapd)
685     if anonce != msg2['rsn_key_nonce']:
686         raise Exception("ANonce changed")
687     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
688     reply_eapol("2/4 (a)", hapd, addr, msg1, 0x010a, snonce, rsne, kck)
689
690     snonce2 = binascii.unhexlify('2222222222222222222222222222222222222222222222222222222222222222')
691     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce2, anonce)
692     reply_eapol("2/4 (b)", hapd, addr, msg2, 0x010a, snonce2, rsne, kck)
693
694     msg = recv_eapol(hapd)
695     if anonce != msg['rsn_key_nonce']:
696         raise Exception("ANonce changed")
697     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
698     hapd_connected(hapd)
699
700 def test_ap_wpa2_psk_ext_eapol_retry1d(dev, apdev):
701     """WPA2 4-way handshake with EAPOL-Key 1/4 and 2/4 retransmitted and SNonce changing and older used"""
702     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
703
704     msg1 = recv_eapol(hapd)
705     anonce = msg1['rsn_key_nonce']
706     msg2 = recv_eapol(hapd)
707     if anonce != msg2['rsn_key_nonce']:
708         raise Exception("ANonce changed")
709
710     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
711     reply_eapol("2/4 (a)", hapd, addr, msg1, 0x010a, snonce, rsne, kck)
712
713     snonce2 = binascii.unhexlify('2222222222222222222222222222222222222222222222222222222222222222')
714     (ptk2, kck2, kek2) = pmk_to_ptk(pmk, addr, bssid, snonce2, anonce)
715
716     reply_eapol("2/4 (b)", hapd, addr, msg2, 0x010a, snonce2, rsne, kck2)
717     msg = recv_eapol(hapd)
718     if anonce != msg['rsn_key_nonce']:
719         raise Exception("ANonce changed")
720     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
721     hapd_connected(hapd)
722
723 def test_ap_wpa2_psk_ext_eapol_type_diff(dev, apdev):
724     """WPA2 4-way handshake using external EAPOL supplicant"""
725     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
726
727     msg = recv_eapol(hapd)
728     anonce = msg['rsn_key_nonce']
729
730     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
731
732     # Incorrect descriptor type (frame dropped)
733     msg['descr_type'] = 253
734     rsn_eapol_key_set(msg, 0x010a, 0, snonce, rsne)
735     eapol_key_mic(kck, msg)
736     send_eapol(hapd, addr, build_eapol(msg))
737
738     # Incorrect descriptor type, but with a workaround (frame processed)
739     msg['descr_type'] = 254
740     rsn_eapol_key_set(msg, 0x010a, 0, snonce, rsne)
741     eapol_key_mic(kck, msg)
742     send_eapol(hapd, addr, build_eapol(msg))
743
744     msg = recv_eapol(hapd)
745     if anonce != msg['rsn_key_nonce']:
746         raise Exception("ANonce changed")
747     logger.info("Replay same data back")
748     send_eapol(hapd, addr, build_eapol(msg))
749
750     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
751     hapd_connected(hapd)
752
753 def test_ap_wpa_psk_ext_eapol(dev, apdev):
754     """WPA2-PSK AP using external EAPOL supplicant"""
755     (bssid,ssid,hapd,snonce,pmk,addr,wpae) = eapol_test(apdev[0], dev[0],
756                                                         wpa2=False)
757
758     msg = recv_eapol(hapd)
759     anonce = msg['rsn_key_nonce']
760     logger.info("Replay same data back")
761     send_eapol(hapd, addr, build_eapol(msg))
762     logger.info("Too short data")
763     send_eapol(hapd, addr, build_eapol(msg)[0:98])
764
765     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
766     msg['descr_type'] = 2
767     reply_eapol("2/4(invalid type)", hapd, addr, msg, 0x010a, snonce, wpae, kck)
768     msg['descr_type'] = 254
769     reply_eapol("2/4", hapd, addr, msg, 0x010a, snonce, wpae, kck)
770
771     msg = recv_eapol(hapd)
772     if anonce != msg['rsn_key_nonce']:
773         raise Exception("ANonce changed")
774     logger.info("Replay same data back")
775     send_eapol(hapd, addr, build_eapol(msg))
776
777     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
778     hapd_connected(hapd)
779
780 def test_ap_wpa2_psk_ext_eapol_key_info(dev, apdev):
781     """WPA2-PSK 4-way handshake with strange key info values"""
782     (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
783
784     msg = recv_eapol(hapd)
785     anonce = msg['rsn_key_nonce']
786
787     (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
788     rsn_eapol_key_set(msg, 0x0000, 0, snonce, rsne)
789     send_eapol(hapd, addr, build_eapol(msg))
790     rsn_eapol_key_set(msg, 0xffff, 0, snonce, rsne)
791     send_eapol(hapd, addr, build_eapol(msg))
792     # SMK M1
793     rsn_eapol_key_set(msg, 0x2802, 0, snonce, rsne)
794     send_eapol(hapd, addr, build_eapol(msg))
795     # SMK M3
796     rsn_eapol_key_set(msg, 0x2002, 0, snonce, rsne)
797     send_eapol(hapd, addr, build_eapol(msg))
798     # Request
799     rsn_eapol_key_set(msg, 0x0902, 0, snonce, rsne)
800     send_eapol(hapd, addr, build_eapol(msg))
801     # Request
802     rsn_eapol_key_set(msg, 0x0902, 0, snonce, rsne)
803     tmp_kck = binascii.unhexlify('00000000000000000000000000000000')
804     eapol_key_mic(tmp_kck, msg)
805     send_eapol(hapd, addr, build_eapol(msg))
806
807     reply_eapol("2/4", hapd, addr, msg, 0x010a, snonce, rsne, kck)
808
809     msg = recv_eapol(hapd)
810     if anonce != msg['rsn_key_nonce']:
811         raise Exception("ANonce changed")
812
813     # Request (valic MIC)
814     rsn_eapol_key_set(msg, 0x0902, 0, snonce, rsne)
815     eapol_key_mic(kck, msg)
816     send_eapol(hapd, addr, build_eapol(msg))
817     # Request (valid MIC, replayed counter)
818     rsn_eapol_key_set(msg, 0x0902, 0, snonce, rsne)
819     eapol_key_mic(kck, msg)
820     send_eapol(hapd, addr, build_eapol(msg))
821
822     reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
823     hapd_connected(hapd)