tests: EAP-MSCHAPv2 error cases
authorJouni Malinen <j@w1.fi>
Sun, 11 Oct 2015 22:05:37 +0000 (01:05 +0300)
committerJouni Malinen <j@w1.fi>
Sun, 11 Oct 2015 22:55:00 +0000 (01:55 +0300)
Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/auth_serv/eap_user.conf
tests/hwsim/test_ap_eap.py
tests/hwsim/test_eap_proto.py
tests/hwsim/utils.py

index 3738e6c..da42f5b 100644 (file)
@@ -46,6 +46,8 @@ radius_accept_attr=25:x:00112233445566778899aa
 "gpsk-user-session-timeout"    GPSK    "abcdefghijklmnop0123456789abcdef"
 radius_accept_attr=27:d:3
 
+"phase1-user"  MSCHAPV2,MD5,GTC        "password"
+
 "020000000000" MACACL  "020000000000"
 
 "020000000100" MACACL  "020000000100"
index 3071b86..311832c 100644 (file)
@@ -15,7 +15,7 @@ import os
 
 import hwsim_utils
 import hostapd
-from utils import HwsimSkip, alloc_fail, fail_test, skip_with_fips
+from utils import HwsimSkip, alloc_fail, fail_test, skip_with_fips, wait_fail_trigger
 from wpasupplicant import WpaSupplicant
 from test_ap_psk import check_mib, find_wpas_process, read_process_memory, verify_not_present, get_key_locations
 
@@ -3955,3 +3955,85 @@ def test_eap_tls_no_session_resumption_radius(dev, apdev):
         raise Exception("Key handshake with the AP timed out")
     if dev[0].get_status_field("tls_session_reused") != '0':
         raise Exception("Unexpected session resumption on the second connection")
+
+def test_eap_mschapv2_errors(dev, apdev):
+    """EAP-MSCHAPv2 error cases"""
+    check_eap_capa(dev[0], "MSCHAPV2")
+    check_eap_capa(dev[0], "FAST")
+
+    params = hostapd.wpa2_eap_params(ssid="test-wpa-eap")
+    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    dev[0].connect("test-wpa-eap", key_mgmt="WPA-EAP", eap="MSCHAPV2",
+                   identity="phase1-user", password="password",
+                   scan_freq="2412")
+    dev[0].request("REMOVE_NETWORK all")
+    dev[0].wait_disconnected()
+
+    tests = [ (1, "hash_nt_password_hash;mschapv2_derive_response"),
+              (1, "nt_password_hash;mschapv2_derive_response"),
+              (1, "nt_password_hash;=mschapv2_derive_response"),
+              (1, "generate_nt_response;mschapv2_derive_response"),
+              (1, "generate_authenticator_response;mschapv2_derive_response"),
+              (1, "nt_password_hash;=mschapv2_derive_response"),
+              (1, "get_master_key;mschapv2_derive_response"),
+              (1, "os_get_random;eap_mschapv2_challenge_reply") ]
+    for count, func in tests:
+        with fail_test(dev[0], count, func):
+            dev[0].connect("test-wpa-eap", key_mgmt="WPA-EAP", eap="MSCHAPV2",
+                           identity="phase1-user", password="password",
+                           wait_connect=False, scan_freq="2412")
+            wait_fail_trigger(dev[0], "GET_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected()
+
+    tests = [ (1, "hash_nt_password_hash;mschapv2_derive_response"),
+              (1, "hash_nt_password_hash;=mschapv2_derive_response"),
+              (1, "generate_nt_response_pwhash;mschapv2_derive_response"),
+              (1, "generate_authenticator_response_pwhash;mschapv2_derive_response") ]
+    for count, func in tests:
+        with fail_test(dev[0], count, func):
+            dev[0].connect("test-wpa-eap", key_mgmt="WPA-EAP", eap="MSCHAPV2",
+                           identity="phase1-user",
+                           password_hex="hash:8846f7eaee8fb117ad06bdd830b7586c",
+                           wait_connect=False, scan_freq="2412")
+            wait_fail_trigger(dev[0], "GET_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected()
+
+    tests = [ (1, "eap_mschapv2_init"),
+              (1, "eap_msg_alloc;eap_mschapv2_challenge_reply"),
+              (1, "eap_msg_alloc;eap_mschapv2_success"),
+              (1, "eap_mschapv2_getKey") ]
+    for count, func in tests:
+        with alloc_fail(dev[0], count, func):
+            dev[0].connect("test-wpa-eap", key_mgmt="WPA-EAP", eap="MSCHAPV2",
+                           identity="phase1-user", password="password",
+                           wait_connect=False, scan_freq="2412")
+            wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected()
+
+    tests = [ (1, "eap_msg_alloc;eap_mschapv2_failure") ]
+    for count, func in tests:
+        with alloc_fail(dev[0], count, func):
+            dev[0].connect("test-wpa-eap", key_mgmt="WPA-EAP", eap="MSCHAPV2",
+                           identity="phase1-user", password="wrong password",
+                           wait_connect=False, scan_freq="2412")
+            wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected()
+
+    tests = [ (2, "eap_mschapv2_init"),
+              (3, "eap_mschapv2_init") ]
+    for count, func in tests:
+        with alloc_fail(dev[0], count, func):
+            dev[0].connect("test-wpa-eap", key_mgmt="WPA-EAP", eap="FAST",
+                           anonymous_identity="FAST", identity="user",
+                           password="password",
+                           ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2",
+                           phase1="fast_provisioning=1",
+                           pac_file="blob://fast_pac",
+                           wait_connect=False, scan_freq="2412")
+            wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected()
index bbddea8..9b0d552 100644 (file)
@@ -15,7 +15,7 @@ import threading
 import time
 
 import hostapd
-from utils import HwsimSkip
+from utils import HwsimSkip, fail_test, wait_fail_trigger
 from test_ap_eap import check_eap_capa
 
 EAP_CODE_REQUEST = 1
@@ -4390,3 +4390,86 @@ def test_eap_proto_mschapv2(dev, apdev):
             dev[0].wait_disconnected(timeout=1)
     finally:
         stop_radius_server(srv)
+
+def test_eap_proto_mschapv2_errors(dev, apdev):
+    """EAP-MSCHAPv2 protocol tests (error paths)"""
+    check_eap_capa(dev[0], "MSCHAPV2")
+
+    def mschapv2_handler(ctx, req):
+        logger.info("mschapv2_handler - RX " + req.encode("hex"))
+        if 'num' not in ctx:
+            ctx['num'] = 0
+        ctx['num'] = ctx['num'] + 1
+        if 'id' not in ctx:
+            ctx['id'] = 1
+        ctx['id'] = (ctx['id'] + 1) % 256
+        idx = 0
+
+        idx += 1
+        if ctx['num'] == idx:
+            logger.info("Test: Failure before challenge - password expired")
+            payload = 'E=648 R=1 C=00112233445566778899aabbccddeeff V=3 M=Password expired'
+            return struct.pack(">BBHBBBH", EAP_CODE_REQUEST, ctx['id'],
+                               4 + 1 + 4 + len(payload),
+                               EAP_TYPE_MSCHAPV2,
+                               4, 0, 4 + len(payload)) + payload
+        idx += 1
+        if ctx['num'] == idx:
+            logger.info("Test: Success after password change")
+            payload = "S=1122334455667788990011223344556677889900"
+            return struct.pack(">BBHBBBH", EAP_CODE_REQUEST, ctx['id'],
+                               4 + 1 + 4 + len(payload),
+                               EAP_TYPE_MSCHAPV2,
+                               3, 0, 4 + len(payload)) + payload
+
+        idx += 1
+        if ctx['num'] == idx:
+            logger.info("Test: Failure before challenge - password expired")
+            payload = 'E=648 R=1 C=00112233445566778899aabbccddeeff V=3 M=Password expired'
+            return struct.pack(">BBHBBBH", EAP_CODE_REQUEST, ctx['id'],
+                               4 + 1 + 4 + len(payload),
+                               EAP_TYPE_MSCHAPV2,
+                               4, 0, 4 + len(payload)) + payload
+        idx += 1
+        if ctx['num'] == idx:
+            logger.info("Test: Success after password change")
+            payload = "S=1122334455667788990011223344556677889900"
+            return struct.pack(">BBHBBBH", EAP_CODE_REQUEST, ctx['id'],
+                               4 + 1 + 4 + len(payload),
+                               EAP_TYPE_MSCHAPV2,
+                               3, 0, 4 + len(payload)) + payload
+
+        return None
+
+    srv = start_radius_server(mschapv2_handler)
+
+    try:
+        hapd = start_ap(apdev[0]['ifname'])
+
+        with fail_test(dev[0], 1, "eap_mschapv2_change_password"):
+            dev[0].connect("eap-test", key_mgmt="WPA-EAP", scan_freq="2412",
+                           eap="MSCHAPV2", identity="user",
+                           password="password", wait_connect=False)
+            ev = dev[0].wait_event(["CTRL-REQ-NEW_PASSWORD"], timeout=10)
+            if ev is None:
+                raise Exception("Timeout on new password request")
+            id = ev.split(':')[0].split('-')[-1]
+            dev[0].request("CTRL-RSP-NEW_PASSWORD-" + id + ":new-pw")
+            wait_fail_trigger(dev[0], "GET_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected(timeout=1)
+
+        with fail_test(dev[0], 1, "get_master_key;eap_mschapv2_change_password"):
+            dev[0].connect("eap-test", key_mgmt="WPA-EAP", scan_freq="2412",
+                           eap="MSCHAPV2", identity="user",
+                           password="password", wait_connect=False)
+            ev = dev[0].wait_event(["CTRL-REQ-NEW_PASSWORD"], timeout=10)
+            if ev is None:
+                raise Exception("Timeout on new password request")
+            id = ev.split(':')[0].split('-')[-1]
+            dev[0].request("CTRL-RSP-NEW_PASSWORD-" + id + ":new-pw")
+            wait_fail_trigger(dev[0], "GET_FAIL")
+            dev[0].request("REMOVE_NETWORK all")
+            dev[0].wait_disconnected(timeout=1)
+    finally:
+        stop_radius_server(srv)
index daed84f..9cdd4bc 100644 (file)
@@ -5,6 +5,7 @@
 # See README for more details.
 
 import os
+import time
 
 def get_ifnames():
     ifnames = []
@@ -50,6 +51,14 @@ class fail_test(object):
             if self._dev.request("GET_FAIL") != "0:%s" % self._funcs:
                 raise Exception("Test failure did not trigger")
 
+def wait_fail_trigger(dev, cmd):
+    for i in range(0, 40):
+        if dev.request(cmd).startswith("0:"):
+            break
+        if i == 39:
+            raise Exception("Failure not triggered")
+        time.sleep(0.05)
+
 def require_under_vm():
     with open('/proc/1/cmdline', 'r') as f:
         cmd = f.read()