from utils import HwsimSkip
from wlantest import Wlantest
from wpasupplicant import WpaSupplicant
-
-def autogo(go, freq=None, persistent=None):
- logger.info("Start autonomous GO " + go.ifname)
- res = go.p2p_start_go(freq=freq, persistent=persistent)
- logger.debug("res: " + str(res))
- return res
-
-def connect_cli(go, client, social=False, freq=None):
- logger.info("Try to connect the client to the GO")
- pin = client.wps_read_pin()
- go.p2p_go_authorize_client(pin)
- res = client.p2p_connect_group(go.p2p_dev_addr(), pin, timeout=60,
- social=social, freq=freq)
- logger.info("Client connected")
- hwsim_utils.test_connectivity_p2p(go, client)
- return res
+from p2p_utils import *
+from test_p2p_messages import mgmt_tx, parse_p2p_public_action
def test_autogo(dev):
"""P2P autonomous GO and client joining group"""
dev[1].flush_scan_cache()
dev[0].p2p_listen()
addr = dev[0].p2p_dev_addr()
+ if not dev[1].discover_peer(addr, social=True):
+ raise Exception("Peer not found")
+ dev[1].p2p_stop_find()
if "OK" not in dev[1].global_request("P2P_CONNECT " + addr + " pbc auto"):
raise Exception("P2P_CONNECT failed")
for i in [ name0, name2, name3 ]:
if i not in ev1 and i not in ev2 and i not in ev3:
raise Exception('name "%s" not found' % i)
+
+def rx_pd_req(dev):
+ msg = dev.mgmt_rx()
+ if msg is None:
+ raise Exception("MGMT-RX timeout")
+ p2p = parse_p2p_public_action(msg['payload'])
+ if p2p is None:
+ raise Exception("Not a P2P Public Action frame " + str(dialog_token))
+ if p2p['subtype'] != P2P_PROV_DISC_REQ:
+ raise Exception("Unexpected subtype %d" % p2p['subtype'])
+ p2p['freq'] = msg['freq']
+ return p2p
+
+def test_autogo_scan(dev):
+ """P2P autonomous GO and no P2P IE in Probe Response scan results"""
+ addr0 = dev[0].p2p_dev_addr()
+ addr1 = dev[1].p2p_dev_addr()
+ dev[0].p2p_start_go(freq=2412, persistent=True)
+ bssid = dev[0].p2p_interface_addr()
+
+ dev[1].discover_peer(addr0)
+ dev[1].p2p_stop_find()
+ ev = dev[1].wait_global_event(["P2P-FIND-STOPPED"], timeout=2)
+ time.sleep(0.1)
+ dev[1].flush_scan_cache()
+
+ pin = dev[1].wps_read_pin()
+ dev[0].group_request("WPS_PIN any " + pin)
+
+ try:
+ dev[1].request("SET p2p_disabled 1")
+ dev[1].request("SCAN freq=2412")
+ ev = dev[1].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
+ if ev is None:
+ raise Exception("Active scan did not complete")
+ finally:
+ dev[1].request("SET p2p_disabled 0")
+
+ for i in range(2):
+ dev[1].request("SCAN freq=2412 passive=1")
+ ev = dev[1].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
+ if ev is None:
+ raise Exception("Scan did not complete")
+
+ # Disable management frame processing for a moment to skip Probe Response
+ # frame with P2P IE.
+ dev[0].group_request("SET ext_mgmt_frame_handling 1")
+
+ dev[1].global_request("P2P_CONNECT " + bssid + " " + pin + " freq=2412 join")
+
+ # Skip the first Probe Request frame
+ ev = dev[0].wait_group_event(["MGMT-RX"], timeout=10)
+ if ev is None:
+ raise Exception("No Probe Request frame seen")
+ if not ev.split(' ')[4].startswith("40"):
+ raise Exception("Not a Probe Request frame")
+
+ # If a P2P Device is not used, the PD Request will be received on the group
+ # interface (which is actually wlan0, since a separate interface is not
+ # used), which was set to external management frame handling, so need to
+ # reply to it manually.
+ res = dev[0].get_driver_status()
+ if not (int(res['capa.flags'], 0) & 0x20000000):
+ # Reply to PD Request while still filtering Probe Request frames
+ msg = rx_pd_req(dev[0])
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(addr1, addr0, 2412, "0409506f9a0908%02xdd0a0050f204100800020008" % msg['dialog_token']))
+
+ # Skip Probe Request frames until something else is received
+ for i in range(10):
+ ev = dev[0].wait_group_event(["MGMT-RX"], timeout=10)
+ if ev is None:
+ raise Exception("No frame seen")
+ if not ev.split(' ')[4].startswith("40"):
+ break
+
+ # Allow wpa_supplicant to process authentication and association
+ dev[0].group_request("SET ext_mgmt_frame_handling 0")
+
+ # Joining the group should succeed and indicate persistent group based on
+ # Beacon frame P2P IE.
+ ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
+ if ev is None:
+ raise Exception("Failed to join group")
+ if "[PERSISTENT]" not in ev:
+ raise Exception("Did not recognize group as persistent")
+ dev[0].remove_group()
+ dev[1].wait_go_ending_session()
+
+def test_autogo_join_before_found(dev):
+ """P2P client joining a group before having found GO Device Address"""
+ dev[0].request("SET p2p_no_group_iface 0")
+ res = autogo(dev[0], freq=2412)
+ if "p2p-wlan" not in res['ifname']:
+ raise Exception("Unexpected group interface name on GO")
+ status = dev[0].get_group_status()
+ bssid = status['bssid']
+
+ pin = dev[1].wps_read_pin()
+ dev[0].p2p_go_authorize_client(pin)
+ cmd = "P2P_CONNECT " + bssid + " " + pin + " join freq=2412"
+ if "OK" not in dev[1].global_request(cmd):
+ raise Exception("P2P_CONNECT join failed")
+ ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
+ if ev is None:
+ raise Exception("Joining the group timed out")
+ dev[0].remove_group()
+ dev[1].wait_go_ending_session()