+
+def test_wpas_ctrl_event_burst(dev, apdev):
+ """wpa_supplicant control socket and event burst"""
+ if "OK" not in dev[0].request("EVENT_TEST 1000"):
+ raise Exception("Could not request event messages")
+
+ total_i = 0
+ total_g = 0
+ for i in range(100):
+ (i,g) = dev[0].dump_monitor()
+ total_i += i
+ total_g += g
+ logger.info("Received i=%d g=%d" % (i, g))
+ if total_i >= 1000 and total_g >= 1000:
+ break
+ time.sleep(0.05)
+
+ if total_i < 1000:
+ raise Exception("Some per-interface events not seen: %d" % total_i)
+ if total_g < 1000:
+ raise Exception("Some global events not seen: %d" % total_g)
+
+ if not dev[0].ping():
+ raise Exception("Could not ping wpa_supplicant at the end of the test")
+
+@remote_compatible
+def test_wpas_ctrl_sched_scan_plans(dev, apdev):
+ """wpa_supplicant sched_scan_plans parsing"""
+ dev[0].request("SET sched_scan_plans foo")
+ dev[0].request("SET sched_scan_plans 10:100 20:200 30")
+ with alloc_fail(dev[0], 1, "wpas_sched_scan_plans_set"):
+ dev[0].request("SET sched_scan_plans 10:100")
+ dev[0].request("SET sched_scan_plans 4294967295:0")
+ dev[0].request("SET sched_scan_plans 1 1")
+ dev[0].request("SET sched_scan_plans ")
+ dev[0].request("SET sched_scan_plans ")
+
+def test_wpas_ctrl_signal_monitor(dev, apdev):
+ """wpa_supplicant SIGNAL_MONITOR command"""
+ hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
+ dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
+ dev[1].connect("open", key_mgmt="NONE", scan_freq="2412",
+ bgscan="simple:1:-45:2")
+ dev[2].connect("open", key_mgmt="NONE", scan_freq="2412")
+
+ tests = [ " THRESHOLD=-45", " THRESHOLD=-44 HYSTERESIS=5", "" ]
+ try:
+ if "FAIL" in dev[2].request("SIGNAL_MONITOR THRESHOLD=-1 HYSTERESIS=5"):
+ raise Exception("SIGNAL_MONITOR command failed")
+ for t in tests:
+ if "OK" not in dev[0].request("SIGNAL_MONITOR" + t):
+ raise Exception("SIGNAL_MONITOR command failed: " + t)
+ if "FAIL" not in dev[1].request("SIGNAL_MONITOR THRESHOLD=-44 HYSTERESIS=5"):
+ raise Exception("SIGNAL_MONITOR command accepted while using bgscan")
+ ev = dev[2].wait_event(["CTRL-EVENT-SIGNAL-CHANGE"], timeout=10)
+ if ev is None:
+ raise Exception("No signal change event seen")
+ if "above=0" not in ev:
+ raise Exception("Unexpected signal change event contents: " + ev)
+ finally:
+ dev[0].request("SIGNAL_MONITOR")
+ dev[1].request("SIGNAL_MONITOR")
+ dev[2].request("SIGNAL_MONITOR")
+
+ dev[0].request("REMOVE_NETWORK all")
+ dev[1].request("REMOVE_NETWORK all")
+ dev[1].wait_disconnected()
+
+def test_wpas_ctrl_p2p_listen_offload(dev, apdev):
+ """wpa_supplicant P2P_LO_START and P2P_LO_STOP commands"""
+ dev[0].request("P2P_LO_STOP")
+ dev[0].request("P2P_LO_START ")
+ dev[0].request("P2P_LO_START 2412")
+ dev[0].request("P2P_LO_START 2412 100 200 3")
+ dev[0].request("P2P_LO_STOP")
+
+def test_wpas_ctrl_driver_flags(dev, apdev):
+ """DRIVER_FLAGS command"""
+ params = hostapd.wpa2_params(ssid="test", passphrase="12345678")
+ hapd = hostapd.add_ap(apdev[0], params)
+ hapd_flags = hapd.request("DRIVER_FLAGS")
+ wpas_flags = dev[0].request("DRIVER_FLAGS")
+ if "FAIL" in hapd_flags:
+ raise Exception("DRIVER_FLAGS failed")
+ if hapd_flags != wpas_flags:
+ raise Exception("Unexpected difference in hostapd vs. wpa_supplicant DRIVER_FLAGS output")
+ logger.info("DRIVER_FLAGS: " + hapd_flags)
+ flags = hapd_flags.split('\n')
+ if 'AP' not in flags:
+ raise Exception("AP flag missing from DRIVER_FLAGS")