import logging
logger = logging.getLogger()
import os
+import struct
import subprocess
import time
# Test connectivity 0->1 and 1->0
hwsim_utils.test_connectivity(dev[0], dev[1])
-def add_mesh_secure_net(dev, psk=True):
+def add_mesh_secure_net(dev, psk=True, pmf=False):
id = dev.add_network()
dev.set_network(id, "mode", "5")
dev.set_network_quoted(id, "ssid", "wpas-mesh-sec")
dev.set_network(id, "frequency", "2412")
if psk:
dev.set_network_quoted(id, "psk", "thisismypassphrase!")
+ if pmf:
+ dev.set_network(id, "ieee80211w", "2")
return id
def test_wpas_mesh_secure(dev, apdev):
# Test connectivity 0->1 and 1->0
hwsim_utils.test_connectivity(dev[0], dev[1])
+def test_mesh_secure_pmf(dev, apdev):
+ """Secure mesh network connectivity with PMF enabled"""
+ check_mesh_support(dev[0], secure=True)
+ dev[0].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[0], pmf=True)
+ dev[0].mesh_group_add(id)
+
+ dev[1].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[1], pmf=True)
+ dev[1].mesh_group_add(id)
+
+ # Check for mesh joined
+ check_mesh_group_added(dev[0])
+ check_mesh_group_added(dev[1])
+
+ # Check for peer connected
+ check_mesh_peer_connected(dev[0])
+ check_mesh_peer_connected(dev[1])
+
+ # Test connectivity 0->1 and 1->0
+ hwsim_utils.test_connectivity(dev[0], dev[1])
+
def test_wpas_mesh_secure_sae_group_mismatch(dev, apdev):
"""wpa_supplicant secure MESH and SAE group mismatch"""
check_mesh_support(dev[0], secure=True)
if mesh1:
dev[1].request("MESH_GROUP_REMOVE " + mesh1)
-def test_wpas_mesh_max_peering(dev, apdev):
+def test_wpas_mesh_max_peering(dev, apdev, params):
"""Mesh max peering limit"""
check_mesh_support(dev[0])
try:
finally:
dev[0].request("SET max_peer_links 99")
+ addr0 = dev[0].own_addr()
+ addr1 = dev[1].own_addr()
+ addr2 = dev[2].own_addr()
+
+ capfile = os.path.join(params['logdir'], "hwsim0.pcapng")
+ filt = "wlan.fc.type_subtype == 8"
+ out = run_tshark(capfile, filt, [ "wlan.sa", "wlan.mesh.config.cap" ])
+ pkts = out.splitlines()
+ one = [ 0, 0, 0 ]
+ zero = [ 0, 0, 0 ]
+ for pkt in pkts:
+ addr, cap = pkt.split('\t')
+ cap = int(cap, 16)
+ if addr == addr0:
+ idx = 0
+ elif addr == addr1:
+ idx = 1
+ elif addr == addr2:
+ idx = 2
+ else:
+ continue
+ if cap & 0x01:
+ one[idx] += 1
+ else:
+ zero[idx] += 1
+ logger.info("one: " + str(one))
+ logger.info("zero: " + str(zero))
+ if zero[0] == 0:
+ raise Exception("Accepting Additional Mesh Peerings not cleared")
+ if one[0] == 0:
+ raise Exception("Accepting Additional Mesh Peerings was not set in the first Beacon frame")
+ if zero[1] > 0 or zero[2] > 0 or one[1] == 0 or one[2] == 0:
+ raise Exception("Unexpected value in Accepting Additional Mesh Peerings from other STAs")
+
def test_wpas_mesh_open_5ghz(dev, apdev):
"""wpa_supplicant open MESH network on 5 GHz band"""
try:
dev[0].mesh_group_add(id)
wait_fail_trigger(dev[0], "GET_FAIL")
+ dev[0].dump_monitor()
with alloc_fail(dev[0], 1, "mesh_rsn_auth_init"):
id = add_mesh_secure_net(dev[0])
dev[0].mesh_group_add(id)
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+ dev[0].dump_monitor()
+ with fail_test(dev[0], 1, "os_get_random;mesh_rsn_init_ampe_sta"):
+ id = add_mesh_secure_net(dev[0])
+ dev[0].mesh_group_add(id)
+ dev[1].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[1])
+ dev[1].mesh_group_add(id)
+ wait_fail_trigger(dev[0], "GET_FAIL")
+
def test_wpas_mesh_reconnect(dev, apdev):
"""Secure mesh network plink counting during reconnection"""
check_mesh_support(dev[0])
if ev is not None:
raise Exception("Unexpected connection(1)")
+ # Additional coverage in mesh_rsn_sae_group() with non-zero
+ # wpa_s->mesh_rsn->sae_group_index.
+ dev[0].dump_monitor()
+ dev[1].dump_monitor()
+ id = add_mesh_secure_net(dev[2])
+ dev[2].mesh_group_add(id)
+ check_mesh_group_added(dev[2])
+ check_mesh_peer_connected(dev[0])
+ check_mesh_peer_connected(dev[2])
+ ev = dev[1].wait_event(["new peer notification"], timeout=10)
+ if ev is None:
+ raise Exception("dev[1] did not see peer(2)")
+ dev[0].dump_monitor()
+ dev[1].dump_monitor()
+ dev[2].dump_monitor()
+
dev[0].request("SET sae_groups ")
dev[1].request("SET sae_groups ")
# This will fail in IE parsing due to the truncated IE in the Probe
# Response frame.
bss = dev[0].request("BSS " + bssid)
+
+def test_mesh_missing_mic(dev, apdev):
+ """Secure mesh network and missing MIC"""
+ check_mesh_support(dev[0], secure=True)
+
+ dev[0].request("SET ext_mgmt_frame_handling 1")
+ dev[0].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[0])
+ dev[0].mesh_group_add(id)
+
+ dev[1].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[1])
+ dev[1].mesh_group_add(id)
+
+ # Check for mesh joined
+ check_mesh_group_added(dev[0])
+ check_mesh_group_added(dev[1])
+
+ count = 0
+ remove_mic = True
+ while True:
+ count += 1
+ if count > 15:
+ raise Exception("Did not see Action frames")
+ rx_msg = dev[0].mgmt_rx()
+ if rx_msg is None:
+ ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
+ if ev:
+ break
+ raise Exception("MGMT-RX timeout")
+ if rx_msg['subtype'] == 13:
+ payload = rx_msg['payload']
+ frame = rx_msg['frame']
+ (categ, action) = struct.unpack('BB', payload[0:2])
+ if categ == 15 and action == 1 and remove_mic:
+ # Mesh Peering Open
+ pos = frame.find('\x8c\x10')
+ if not pos:
+ raise Exception("Could not find MIC element")
+ logger.info("Found MIC at %d" % pos)
+ # Remove MIC
+ rx_msg['frame'] = frame[0:pos]
+ remove_mic = False
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
+ raise Exception("MGMT_RX_PROCESS failed")
+ ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
+ if ev:
+ break
+
+def test_mesh_pmkid_mismatch(dev, apdev):
+ """Secure mesh network and PMKID mismatch"""
+ check_mesh_support(dev[0], secure=True)
+ addr0 = dev[0].own_addr()
+ addr1 = dev[1].own_addr()
+ dev[0].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[0])
+ dev[0].set_network(id, "no_auto_peer", "1")
+ dev[0].mesh_group_add(id)
+
+ dev[1].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[1])
+ dev[1].set_network(id, "no_auto_peer", "1")
+ dev[1].mesh_group_add(id)
+
+ # Check for mesh joined
+ check_mesh_group_added(dev[0])
+ check_mesh_group_added(dev[1])
+
+ # Check for peer connected
+ ev = dev[0].wait_event(["will not initiate new peer link"], timeout=10)
+ if ev is None:
+ raise Exception("Missing no-initiate message")
+ if "OK" not in dev[0].request("MESH_PEER_ADD " + addr1):
+ raise Exception("MESH_PEER_ADD failed")
+ check_mesh_peer_connected(dev[0])
+ check_mesh_peer_connected(dev[1])
+
+ if "OK" not in dev[0].request("MESH_PEER_REMOVE " + addr1):
+ raise Exception("Failed to remove peer")
+
+ ev = dev[0].wait_event(["will not initiate new peer link"], timeout=10)
+ if ev is None:
+ raise Exception("Missing no-initiate message (2)")
+ dev[0].dump_monitor()
+ dev[1].dump_monitor()
+ dev[0].request("SET ext_mgmt_frame_handling 1")
+ if "OK" not in dev[0].request("MESH_PEER_ADD " + addr1):
+ raise Exception("MESH_PEER_ADD failed (2)")
+
+ count = 0
+ break_pmkid = True
+ while True:
+ count += 1
+ if count > 50:
+ raise Exception("Did not see Action frames")
+ rx_msg = dev[0].mgmt_rx()
+ if rx_msg is None:
+ ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.1)
+ if ev:
+ break
+ raise Exception("MGMT-RX timeout")
+ if rx_msg['subtype'] == 13:
+ payload = rx_msg['payload']
+ frame = rx_msg['frame']
+ (categ, action) = struct.unpack('BB', payload[0:2])
+ if categ == 15 and action == 1 and break_pmkid:
+ # Mesh Peering Open
+ pos = frame.find('\x75\x14')
+ if not pos:
+ raise Exception("Could not find Mesh Peering Management element")
+ logger.info("Found Mesh Peering Management element at %d" % pos)
+ # Break PMKID to hit "Mesh RSN: Invalid PMKID (Chosen PMK did
+ # not match calculated PMKID)"
+ rx_msg['frame'] = frame[0:pos + 6] + '\x00\x00\x00\x00' + frame[pos + 10:]
+ break_pmkid = False
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
+ raise Exception("MGMT_RX_PROCESS failed")
+ ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
+ if ev:
+ break
+
+def test_mesh_peering_proto(dev, apdev):
+ """Mesh peering management protocol testing"""
+ check_mesh_support(dev[0])
+
+ dev[0].request("SET ext_mgmt_frame_handling 1")
+ add_open_mesh_network(dev[0], beacon_int=160)
+ add_open_mesh_network(dev[1], beacon_int=160)
+
+ count = 0
+ test = 1
+ while True:
+ count += 1
+ if count > 50:
+ raise Exception("Did not see Action frames")
+ rx_msg = dev[0].mgmt_rx()
+ if rx_msg is None:
+ ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
+ if ev:
+ break
+ raise Exception("MGMT-RX timeout")
+ if rx_msg['subtype'] == 13:
+ payload = rx_msg['payload']
+ frame = rx_msg['frame']
+ (categ, action) = struct.unpack('BB', payload[0:2])
+ if categ == 15 and action == 1 and test == 1:
+ # Mesh Peering Open
+ pos = frame.find('\x75\x04')
+ if not pos:
+ raise Exception("Could not find Mesh Peering Management element")
+ logger.info("Found Mesh Peering Management element at %d" % pos)
+ # Remove the element to hit
+ # "MPM: No Mesh Peering Management element"
+ rx_msg['frame'] = frame[0:pos]
+ test += 1
+ elif categ == 15 and action == 1 and test == 2:
+ # Mesh Peering Open
+ pos = frame.find('\x72\x0e')
+ if not pos:
+ raise Exception("Could not find Mesh ID element")
+ logger.info("Found Mesh ID element at %d" % pos)
+ # Remove the element to hit
+ # "MPM: No Mesh ID or Mesh Configuration element"
+ rx_msg['frame'] = frame[0:pos] + frame[pos + 16:]
+ test += 1
+ elif categ == 15 and action == 1 and test == 3:
+ # Mesh Peering Open
+ pos = frame.find('\x72\x0e')
+ if not pos:
+ raise Exception("Could not find Mesh ID element")
+ logger.info("Found Mesh ID element at %d" % pos)
+ # Replace Mesh ID to hit "MPM: Mesh ID or Mesh Configuration
+ # element do not match local MBSS"
+ rx_msg['frame'] = frame[0:pos] + '\x72\x0etest-test-test' + frame[pos + 16:]
+ test += 1
+ elif categ == 15 and action == 1 and test == 4:
+ # Mesh Peering Open
+ # Remove IEs to hit
+ # "MPM: Ignore too short action frame 1 ie_len 0"
+ rx_msg['frame'] = frame[0:26]
+ test += 1
+ elif categ == 15 and action == 1 and test == 5:
+ # Mesh Peering Open
+ # Truncate IEs to hit
+ # "MPM: Failed to parse PLINK IEs"
+ rx_msg['frame'] = frame[0:30]
+ test += 1
+ elif categ == 15 and action == 1 and test == 6:
+ # Mesh Peering Open
+ pos = frame.find('\x75\x04')
+ if not pos:
+ raise Exception("Could not find Mesh Peering Management element")
+ logger.info("Found Mesh Peering Management element at %d" % pos)
+ # Truncate the element to hit
+ # "MPM: Invalid peer mgmt ie" and
+ # "MPM: Mesh parsing rejected frame"
+ rx_msg['frame'] = frame[0:pos] + '\x75\x00\x00\x00' + frame[pos + 6:]
+ test += 1
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
+ raise Exception("MGMT_RX_PROCESS failed")
+ ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
+ if ev:
+ break
+
+ if test != 7:
+ raise Exception("Not all test frames completed")