tests: Authenticator 4-way handshake protocol testing
authorJouni Malinen <jouni@qca.qualcomm.com>
Fri, 21 Nov 2014 18:06:33 +0000 (20:06 +0200)
committerJouni Malinen <j@w1.fi>
Fri, 21 Nov 2014 18:50:41 +0000 (20:50 +0200)
This implements minimal RSN 4-way handshake Supplicant in Python and
uses that to test hostapd Authenticator implementation in various
possible protocol sequencies.

Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
tests/hwsim/test_ap_psk.py

index dfa3d3a..5408ada 100644 (file)
@@ -4,9 +4,13 @@
 # This software may be distributed under the terms of the BSD license.
 # See README for more details.
 
+import binascii
+import hashlib
+import hmac
 import logging
 logger = logging.getLogger()
 import os
+import struct
 import subprocess
 import time
 
@@ -406,3 +410,267 @@ def test_ap_wpa2_psk_ext(dev, apdev):
         res = hapd.request("EAPOL_RX " + addr + " " + ev.split(' ')[2])
         if "OK" not in res:
             raise Exception("EAPOL_RX to hostapd failed")
+
+def parse_eapol(data):
+    (version, type, length) = struct.unpack('>BBH', data[0:4])
+    payload = data[4:]
+    if length > len(payload):
+        raise Exception("Invalid EAPOL length")
+    if length < len(payload):
+        payload = payload[0:length]
+    eapol = {}
+    eapol['version'] = version
+    eapol['type'] = type
+    eapol['length'] = length
+    eapol['payload'] = payload
+    if type == 3:
+        # EAPOL-Key
+        (eapol['descr_type'],) = struct.unpack('B', payload[0:1])
+        payload = payload[1:]
+        if eapol['descr_type'] == 2:
+            # RSN EAPOL-Key
+            (key_info, key_len) = struct.unpack('>HH', payload[0:4])
+            eapol['rsn_key_info'] = key_info
+            eapol['rsn_key_len'] = key_len
+            eapol['rsn_replay_counter'] = payload[4:12]
+            eapol['rsn_key_nonce'] = payload[12:44]
+            eapol['rsn_key_iv'] = payload[44:60]
+            eapol['rsn_key_rsc'] = payload[60:68]
+            eapol['rsn_key_id'] = payload[68:76]
+            eapol['rsn_key_mic'] = payload[76:92]
+            payload = payload[92:]
+            (eapol['rsn_key_data_len'],) = struct.unpack('>H', payload[0:2])
+            payload = payload[2:]
+            eapol['rsn_key_data'] = payload
+    return eapol
+
+def build_eapol(msg):
+    data = struct.pack(">BBH", msg['version'], msg['type'], msg['length'])
+    if msg['type'] == 3:
+        data += struct.pack('>BHH', msg['descr_type'], msg['rsn_key_info'],
+                            msg['rsn_key_len'])
+        data += msg['rsn_replay_counter']
+        data += msg['rsn_key_nonce']
+        data += msg['rsn_key_iv']
+        data += msg['rsn_key_rsc']
+        data += msg['rsn_key_id']
+        data += msg['rsn_key_mic']
+        data += struct.pack('>H', msg['rsn_key_data_len'])
+        data += msg['rsn_key_data']
+    else:
+        data += msg['payload']
+    return data
+
+def sha1_prf(key, label, data, outlen):
+    res = ''
+    counter = 0
+    while outlen > 0:
+        m = hmac.new(key, label, hashlib.sha1)
+        m.update(struct.pack('B', 0))
+        m.update(data)
+        m.update(struct.pack('B', counter))
+        counter += 1
+        hash = m.digest()
+        if outlen > len(hash):
+            res += hash
+            outlen -= len(hash)
+        else:
+            res += hash[0:outlen]
+            outlen = 0
+    return res
+
+def pmk_to_ptk(pmk, addr1, addr2, nonce1, nonce2):
+    if addr1 < addr2:
+        data = binascii.unhexlify(addr1.replace(':','')) + binascii.unhexlify(addr2.replace(':',''))
+    else:
+        data = binascii.unhexlify(addr2.replace(':','')) + binascii.unhexlify(addr1.replace(':',''))
+    if nonce1 < nonce2:
+        data += nonce1 + nonce2
+    else:
+        data += nonce2 + nonce1
+    label = "Pairwise key expansion"
+    ptk = sha1_prf(pmk, label, data, 48)
+    kck = ptk[0:16]
+    kek = ptk[16:32]
+    return (ptk, kck, kek)
+
+def eapol_key_mic(kck, msg):
+    msg['rsn_key_mic'] = binascii.unhexlify('00000000000000000000000000000000')
+    data = build_eapol(msg)
+    m = hmac.new(kck, data, hashlib.sha1)
+    msg['rsn_key_mic'] = m.digest()[0:16]
+
+def rsn_eapol_key_set(msg, key_info, key_len, nonce, data):
+    msg['rsn_key_info'] = key_info
+    msg['rsn_key_len'] = key_len
+    if nonce:
+        msg['rsn_key_nonce'] = nonce
+    else:
+        msg['rsn_key_nonce'] = binascii.unhexlify('0000000000000000000000000000000000000000000000000000000000000000')
+    if data:
+        msg['rsn_key_data_len'] = len(data)
+        msg['rsn_key_data'] = data
+        msg['length'] = 95 + len(data)
+    else:
+        msg['rsn_key_data_len'] = 0
+        msg['rsn_key_data'] = ''
+        msg['length'] = 95
+
+def recv_eapol(hapd):
+    ev = hapd.wait_event(["EAPOL-TX"], timeout=15)
+    if ev is None:
+        raise Exception("Timeout on EAPOL-TX from hostapd")
+    eapol = binascii.unhexlify(ev.split(' ')[2])
+    return parse_eapol(eapol)
+
+def send_eapol(hapd, addr, data):
+    res = hapd.request("EAPOL_RX " + addr + " " + binascii.hexlify(data))
+    if "OK" not in res:
+        raise Exception("EAPOL_RX to hostapd failed")
+
+def reply_eapol(info, hapd, addr, msg, key_info, nonce, data, kck):
+    logger.info("Send EAPOL-Key msg " + info)
+    rsn_eapol_key_set(msg, key_info, 0, nonce, data)
+    eapol_key_mic(kck, msg)
+    send_eapol(hapd, addr, build_eapol(msg))
+
+def hapd_connected(hapd):
+    ev = hapd.wait_event(["AP-STA-CONNECTED"], timeout=15)
+    if ev is None:
+        raise Exception("Timeout on AP-STA-CONNECTED from hostapd")
+
+def eapol_test(apdev, dev):
+    bssid = apdev['bssid']
+    ssid = "test-wpa2-psk"
+    psk = '602e323e077bc63bd80307ef4745b754b0ae0a925c2638ecd13a794b9527b9e6'
+    pmk = binascii.unhexlify(psk)
+    params = hostapd.wpa2_params(ssid=ssid)
+    params['wpa_psk'] = psk
+    hapd = hostapd.add_ap(apdev['ifname'], params)
+    hapd.request("SET ext_eapol_frame_io 1")
+    dev.request("SET ext_eapol_frame_io 1")
+    dev.connect(ssid, psk="not used", scan_freq="2412", wait_connect=False)
+    addr = dev.p2p_interface_addr()
+    rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac020000')
+    snonce = binascii.unhexlify('1111111111111111111111111111111111111111111111111111111111111111')
+    return (bssid,ssid,hapd,snonce,pmk,addr,rsne)
+
+def test_ap_wpa2_psk_ext_eapol(dev, apdev):
+    """WPA2-PSK AP using external EAPOL supplicant"""
+    (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
+
+    msg = recv_eapol(hapd)
+    anonce = msg['rsn_key_nonce']
+    logger.info("Replay same data back")
+    send_eapol(hapd, addr, build_eapol(msg))
+
+    (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
+
+    logger.info("Truncated Key Data in EAPOL-Key msg 2/4")
+    rsn_eapol_key_set(msg, 0x0101, 0, snonce, rsne)
+    msg['length'] = 95 + 22 - 1
+    send_eapol(hapd, addr, build_eapol(msg))
+
+    reply_eapol("2/4", hapd, addr, msg, 0x010a, snonce, rsne, kck)
+
+    msg = recv_eapol(hapd)
+    if anonce != msg['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+    logger.info("Replay same data back")
+    send_eapol(hapd, addr, build_eapol(msg))
+
+    reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
+    hapd_connected(hapd)
+
+def test_ap_wpa2_psk_ext_eapol_retry1(dev, apdev):
+    """WPA2 4-way handshake with EAPOL-Key 1/4 retransmitted"""
+    (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
+
+    msg1 = recv_eapol(hapd)
+    anonce = msg1['rsn_key_nonce']
+
+    msg2 = recv_eapol(hapd)
+    if anonce != msg2['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+
+    (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
+
+    logger.info("Send EAPOL-Key msg 2/4")
+    msg = msg2
+    rsn_eapol_key_set(msg, 0x010a, 0, snonce, rsne)
+    eapol_key_mic(kck, msg)
+    send_eapol(hapd, addr, build_eapol(msg))
+
+    msg = recv_eapol(hapd)
+    if anonce != msg['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+
+    reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
+    hapd_connected(hapd)
+
+def test_ap_wpa2_psk_ext_eapol_retry1b(dev, apdev):
+    """WPA2 4-way handshake with EAPOL-Key 1/4 and 2/4 retransmitted"""
+    (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
+
+    msg1 = recv_eapol(hapd)
+    anonce = msg1['rsn_key_nonce']
+    msg2 = recv_eapol(hapd)
+    if anonce != msg2['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+
+    (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
+    reply_eapol("2/4 (a)", hapd, addr, msg1, 0x010a, snonce, rsne, kck)
+    reply_eapol("2/4 (b)", hapd, addr, msg2, 0x010a, snonce, rsne, kck)
+
+    msg = recv_eapol(hapd)
+    if anonce != msg['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+
+    reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
+    hapd_connected(hapd)
+
+def test_ap_wpa2_psk_ext_eapol_retry1c(dev, apdev):
+    """WPA2 4-way handshake with EAPOL-Key 1/4 and 2/4 retransmitted and SNonce changing"""
+    (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
+
+    msg1 = recv_eapol(hapd)
+    anonce = msg1['rsn_key_nonce']
+
+    msg2 = recv_eapol(hapd)
+    if anonce != msg2['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+    (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
+    reply_eapol("2/4 (a)", hapd, addr, msg1, 0x010a, snonce, rsne, kck)
+
+    snonce2 = binascii.unhexlify('2222222222222222222222222222222222222222222222222222222222222222')
+    (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce2, anonce)
+    reply_eapol("2/4 (b)", hapd, addr, msg2, 0x010a, snonce2, rsne, kck)
+
+    msg = recv_eapol(hapd)
+    if anonce != msg['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+    reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
+    hapd_connected(hapd)
+
+def test_ap_wpa2_psk_ext_eapol_retry1d(dev, apdev):
+    """WPA2 4-way handshake with EAPOL-Key 1/4 and 2/4 retransmitted and SNonce changing and older used"""
+    (bssid,ssid,hapd,snonce,pmk,addr,rsne) = eapol_test(apdev[0], dev[0])
+
+    msg1 = recv_eapol(hapd)
+    anonce = msg1['rsn_key_nonce']
+    msg2 = recv_eapol(hapd)
+    if anonce != msg2['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+
+    (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
+    reply_eapol("2/4 (a)", hapd, addr, msg1, 0x010a, snonce, rsne, kck)
+
+    snonce2 = binascii.unhexlify('2222222222222222222222222222222222222222222222222222222222222222')
+    (ptk2, kck2, kek2) = pmk_to_ptk(pmk, addr, bssid, snonce2, anonce)
+
+    reply_eapol("2/4 (b)", hapd, addr, msg2, 0x010a, snonce2, rsne, kck2)
+    msg = recv_eapol(hapd)
+    if anonce != msg['rsn_key_nonce']:
+        raise Exception("ANonce changed")
+    reply_eapol("4/4", hapd, addr, msg, 0x030a, None, None, kck)
+    hapd_connected(hapd)