tests: P2P_PRESENCE_REQ on group interface
[mech_eap.git] / tests / hwsim / test_p2p_autogo.py
1 # P2P autonomous GO test cases
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import time
8 import subprocess
9 import logging
10 logger = logging.getLogger()
11
12 import hwsim_utils
13 import utils
14 from wlantest import Wlantest
15 from wpasupplicant import WpaSupplicant
16
17 def autogo(go, freq=None, persistent=None):
18     logger.info("Start autonomous GO " + go.ifname)
19     res = go.p2p_start_go(freq=freq, persistent=persistent)
20     logger.debug("res: " + str(res))
21     return res
22
23 def connect_cli(go, client, social=False, freq=None):
24     logger.info("Try to connect the client to the GO")
25     pin = client.wps_read_pin()
26     go.p2p_go_authorize_client(pin)
27     res = client.p2p_connect_group(go.p2p_dev_addr(), pin, timeout=60,
28                                    social=social, freq=freq)
29     logger.info("Client connected")
30     hwsim_utils.test_connectivity_p2p(go, client)
31     return res
32
33 def test_autogo(dev):
34     """P2P autonomous GO and client joining group"""
35     addr0 = dev[0].p2p_dev_addr()
36     addr2 = dev[2].p2p_dev_addr()
37     res = autogo(dev[0])
38     if "p2p-wlan" in res['ifname']:
39         raise Exception("Unexpected group interface name on GO")
40     res = connect_cli(dev[0], dev[1])
41     if "p2p-wlan" in res['ifname']:
42         raise Exception("Unexpected group interface name on client")
43     bss = dev[1].get_bss("p2p_dev_addr=" + addr0)
44     if bss['bssid'] != dev[0].p2p_interface_addr():
45         raise Exception("Unexpected BSSID in the BSS entry for the GO")
46     id = bss['id']
47     bss = dev[1].get_bss("ID-" + id)
48     if bss['id'] != id:
49         raise Exception("Could not find BSS entry based on id")
50     res = dev[1].request("BSS RANGE=" + id + "- MASK=0x1")
51     if "id=" + id not in res:
52         raise Exception("Could not find BSS entry based on id range")
53
54     res = dev[1].request("SCAN_RESULTS")
55     if "[P2P]" not in res:
56         raise Exception("P2P flag missing from scan results: " + res)
57
58     # Presence request to increase testing coverage
59     if "FAIL" not in dev[1].group_request("P2P_PRESENCE_REQ 30000"):
60         raise Exception("Invald P2P_PRESENCE_REQ accepted")
61     if "FAIL" not in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400 30001"):
62         raise Exception("Invald P2P_PRESENCE_REQ accepted")
63     if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400"):
64         raise Exception("Could not send presence request")
65     ev = dev[1].wait_event(["P2P-PRESENCE-RESPONSE"])
66     if ev is None:
67         raise Exception("Timeout while waiting for Presence Response")
68     if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400 20000 102400"):
69         raise Exception("Could not send presence request")
70     ev = dev[1].wait_event(["P2P-PRESENCE-RESPONSE"])
71     if ev is None:
72         raise Exception("Timeout while waiting for Presence Response")
73     if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ"):
74         raise Exception("Could not send presence request")
75     ev = dev[1].wait_event(["P2P-PRESENCE-RESPONSE"])
76     if ev is None:
77         raise Exception("Timeout while waiting for Presence Response")
78
79     if not dev[2].discover_peer(addr0):
80         raise Exception("Could not discover GO")
81     dev[0].dump_monitor()
82     dev[2].global_request("P2P_PROV_DISC " + addr0 + " display join")
83     ev = dev[0].wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=10)
84     if ev is None:
85         raise Exception("GO did not report P2P-PROV-DISC-SHOW-PIN")
86     if "p2p_dev_addr=" + addr2 not in ev:
87         raise Exception("Unexpected P2P Device Address in event: " + ev)
88     if "group=" + dev[0].group_ifname not in ev:
89         raise Exception("Unexpected group interface in event: " + ev)
90     ev = dev[2].wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
91     if ev is None:
92         raise Exception("P2P-PROV-DISC-ENTER-PIN not reported")
93
94     dev[0].remove_group()
95     dev[1].wait_go_ending_session()
96
97 def test_autogo2(dev):
98     """P2P autonomous GO with a separate group interface and client joining group"""
99     dev[0].request("SET p2p_no_group_iface 0")
100     res = autogo(dev[0], freq=2437)
101     if "p2p-wlan" not in res['ifname']:
102         raise Exception("Unexpected group interface name on GO")
103     if res['ifname'] not in utils.get_ifnames():
104         raise Exception("Could not find group interface netdev")
105     connect_cli(dev[0], dev[1], social=True, freq=2437)
106     dev[0].remove_group()
107     dev[1].wait_go_ending_session()
108     if res['ifname'] in utils.get_ifnames():
109         raise Exception("Group interface netdev was not removed")
110
111 def test_autogo3(dev):
112     """P2P autonomous GO and client with a separate group interface joining group"""
113     dev[1].request("SET p2p_no_group_iface 0")
114     autogo(dev[0], freq=2462)
115     res = connect_cli(dev[0], dev[1], social=True, freq=2462)
116     if "p2p-wlan" not in res['ifname']:
117         raise Exception("Unexpected group interface name on client")
118     if res['ifname'] not in utils.get_ifnames():
119         raise Exception("Could not find group interface netdev")
120     dev[0].remove_group()
121     dev[1].wait_go_ending_session()
122     dev[1].ping()
123     if res['ifname'] in utils.get_ifnames():
124         raise Exception("Group interface netdev was not removed")
125
126 def test_autogo4(dev):
127     """P2P autonomous GO and client joining group (both with a separate group interface)"""
128     dev[0].request("SET p2p_no_group_iface 0")
129     dev[1].request("SET p2p_no_group_iface 0")
130     res1 = autogo(dev[0], freq=2412)
131     res2 = connect_cli(dev[0], dev[1], social=True, freq=2412)
132     if "p2p-wlan" not in res1['ifname']:
133         raise Exception("Unexpected group interface name on GO")
134     if "p2p-wlan" not in res2['ifname']:
135         raise Exception("Unexpected group interface name on client")
136     ifnames = utils.get_ifnames()
137     if res1['ifname'] not in ifnames:
138         raise Exception("Could not find GO group interface netdev")
139     if res2['ifname'] not in ifnames:
140         raise Exception("Could not find client group interface netdev")
141     dev[0].remove_group()
142     dev[1].wait_go_ending_session()
143     dev[1].ping()
144     ifnames = utils.get_ifnames()
145     if res1['ifname'] in ifnames:
146         raise Exception("GO group interface netdev was not removed")
147     if res2['ifname'] in ifnames:
148         raise Exception("Client group interface netdev was not removed")
149
150 def test_autogo_m2d(dev):
151     """P2P autonomous GO and clients not authorized"""
152     autogo(dev[0], freq=2412)
153     go_addr = dev[0].p2p_dev_addr()
154
155     dev[1].request("SET p2p_no_group_iface 0")
156     if not dev[1].discover_peer(go_addr, social=True):
157         raise Exception("GO " + go_addr + " not found")
158     dev[1].dump_monitor()
159
160     if not dev[2].discover_peer(go_addr, social=True):
161         raise Exception("GO " + go_addr + " not found")
162     dev[2].dump_monitor()
163
164     logger.info("Trying to join the group when GO has not authorized the client")
165     pin = dev[1].wps_read_pin()
166     cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
167     if "OK" not in dev[1].global_request(cmd):
168         raise Exception("P2P_CONNECT join failed")
169
170     pin = dev[2].wps_read_pin()
171     cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
172     if "OK" not in dev[2].global_request(cmd):
173         raise Exception("P2P_CONNECT join failed")
174
175     ev = dev[1].wait_global_event(["WPS-M2D"], timeout=10)
176     if ev is None:
177         raise Exception("No global M2D event")
178     ifaces = dev[1].request("INTERFACES").splitlines()
179     iface = ifaces[0] if "p2p-wlan" in ifaces[0] else ifaces[1]
180     wpas = WpaSupplicant(ifname=iface)
181     ev = wpas.wait_event(["WPS-M2D"], timeout=10)
182     if ev is None:
183         raise Exception("No M2D event on group interface")
184
185     ev = dev[2].wait_global_event(["WPS-M2D"], timeout=10)
186     if ev is None:
187         raise Exception("No global M2D event (2)")
188     ev = dev[2].wait_event(["WPS-M2D"], timeout=10)
189     if ev is None:
190         raise Exception("No M2D event on group interface (2)")
191
192 def test_autogo_fail(dev):
193     """P2P autonomous GO and incorrect PIN"""
194     autogo(dev[0], freq=2412)
195     go_addr = dev[0].p2p_dev_addr()
196     dev[0].p2p_go_authorize_client("00000000")
197
198     dev[1].request("SET p2p_no_group_iface 0")
199     if not dev[1].discover_peer(go_addr, social=True):
200         raise Exception("GO " + go_addr + " not found")
201     dev[1].dump_monitor()
202
203     logger.info("Trying to join the group when GO has not authorized the client")
204     pin = dev[1].wps_read_pin()
205     cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
206     if "OK" not in dev[1].global_request(cmd):
207         raise Exception("P2P_CONNECT join failed")
208
209     ev = dev[1].wait_global_event(["WPS-FAIL"], timeout=10)
210     if ev is None:
211         raise Exception("No global WPS-FAIL event")
212
213 def test_autogo_2cli(dev):
214     """P2P autonomous GO and two clients joining group"""
215     autogo(dev[0], freq=2412)
216     connect_cli(dev[0], dev[1], social=True, freq=2412)
217     connect_cli(dev[0], dev[2], social=True, freq=2412)
218     hwsim_utils.test_connectivity_p2p(dev[1], dev[2])
219     dev[0].global_request("P2P_REMOVE_CLIENT " + dev[1].p2p_dev_addr())
220     dev[1].wait_go_ending_session()
221     dev[0].global_request("P2P_REMOVE_CLIENT iface=" + dev[2].p2p_interface_addr())
222     dev[2].wait_go_ending_session()
223     if "FAIL" not in dev[0].global_request("P2P_REMOVE_CLIENT foo"):
224         raise Exception("Invalid P2P_REMOVE_CLIENT command accepted")
225     dev[0].remove_group()
226
227 def test_autogo_pbc(dev):
228     """P2P autonomous GO and PBC"""
229     dev[1].request("SET p2p_no_group_iface 0")
230     autogo(dev[0], freq=2412)
231     if "FAIL" not in dev[0].group_request("WPS_PBC p2p_dev_addr=00:11:22:33:44"):
232         raise Exception("Invalid WPS_PBC succeeded")
233     if "OK" not in dev[0].group_request("WPS_PBC p2p_dev_addr=" + dev[1].p2p_dev_addr()):
234         raise Exception("WPS_PBC failed")
235     dev[2].p2p_connect_group(dev[0].p2p_dev_addr(), "pbc", timeout=0,
236                              social=True)
237     ev = dev[2].wait_event(["WPS-M2D"], timeout=15)
238     if ev is None:
239         raise Exception("WPS-M2D not reported")
240     if "config_error=12" not in ev:
241         raise Exception("Unexpected config_error: " + ev)
242     dev[1].p2p_connect_group(dev[0].p2p_dev_addr(), "pbc", timeout=15,
243                              social=True)
244
245 def test_autogo_tdls(dev):
246     """P2P autonomous GO and two clients using TDLS"""
247     wt = Wlantest()
248     go = dev[0]
249     logger.info("Start autonomous GO with fixed parameters " + go.ifname)
250     id = go.add_network()
251     go.set_network_quoted(id, "ssid", "DIRECT-tdls")
252     go.set_network_quoted(id, "psk", "12345678")
253     go.set_network(id, "mode", "3")
254     go.set_network(id, "disabled", "2")
255     res = go.p2p_start_go(persistent=id, freq="2462")
256     logger.debug("res: " + str(res))
257     wt.flush()
258     wt.add_passphrase("12345678")
259     connect_cli(go, dev[1], social=True, freq=2462)
260     connect_cli(go, dev[2], social=True, freq=2462)
261     hwsim_utils.test_connectivity_p2p(dev[1], dev[2])
262     bssid = dev[0].p2p_interface_addr()
263     addr1 = dev[1].p2p_interface_addr()
264     addr2 = dev[2].p2p_interface_addr()
265     dev[1].tdls_setup(addr2)
266     time.sleep(1)
267     hwsim_utils.test_connectivity_p2p(dev[1], dev[2])
268     conf = wt.get_tdls_counter("setup_conf_ok", bssid, addr1, addr2);
269     if conf == 0:
270         raise Exception("No TDLS Setup Confirm (success) seen")
271     dl = wt.get_tdls_counter("valid_direct_link", bssid, addr1, addr2);
272     if dl == 0:
273         raise Exception("No valid frames through direct link")
274     wt.tdls_clear(bssid, addr1, addr2);
275     dev[1].tdls_teardown(addr2)
276     time.sleep(1)
277     teardown = wt.get_tdls_counter("teardown", bssid, addr1, addr2);
278     if teardown == 0:
279         raise Exception("No TDLS Setup Teardown seen")
280     wt.tdls_clear(bssid, addr1, addr2);
281     hwsim_utils.test_connectivity_p2p(dev[1], dev[2])
282     ap_path = wt.get_tdls_counter("valid_ap_path", bssid, addr1, addr2);
283     if ap_path == 0:
284         raise Exception("No valid frames via AP path")
285     direct_link = wt.get_tdls_counter("valid_direct_link", bssid, addr1, addr2);
286     if direct_link > 0:
287         raise Exception("Unexpected frames through direct link")
288     idirect_link = wt.get_tdls_counter("invalid_direct_link", bssid, addr1,
289                                        addr2);
290     if idirect_link > 0:
291         raise Exception("Unexpected frames through direct link (invalid)")
292     dev[2].remove_group()
293     dev[1].remove_group()
294     dev[0].remove_group()
295
296 def test_autogo_legacy(dev):
297     """P2P autonomous GO and legacy clients"""
298     res = autogo(dev[0], freq=2462)
299     if dev[0].get_group_status_field("passphrase", extra="WPS") != res['passphrase']:
300         raise Exception("passphrase mismatch")
301     if dev[0].request("P2P_GET_PASSPHRASE") != res['passphrase']:
302         raise Exception("passphrase mismatch(2)")
303
304     logger.info("Connect P2P client")
305     connect_cli(dev[0], dev[1], social=True, freq=2462)
306
307     if "FAIL" not in dev[1].request("P2P_GET_PASSPHRASE"):
308         raise Exception("P2P_GET_PASSPHRASE succeeded on P2P Client")
309
310     logger.info("Connect legacy WPS client")
311     pin = dev[2].wps_read_pin()
312     dev[0].p2p_go_authorize_client(pin)
313     dev[2].request("P2P_SET disabled 1")
314     dev[2].dump_monitor()
315     dev[2].request("WPS_PIN any " + pin)
316     ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
317     if ev is None:
318         raise Exception("Association with the GO timed out")
319     status = dev[2].get_status()
320     if status['wpa_state'] != 'COMPLETED':
321         raise Exception("Not fully connected")
322     hwsim_utils.test_connectivity_p2p_sta(dev[1], dev[2])
323     dev[2].request("DISCONNECT")
324
325     logger.info("Connect legacy non-WPS client")
326     dev[2].request("FLUSH")
327     dev[2].request("P2P_SET disabled 1")
328     dev[2].connect(ssid=res['ssid'], psk=res['passphrase'], proto='RSN',
329                    key_mgmt='WPA-PSK', pairwise='CCMP', group='CCMP',
330                    scan_freq=res['freq'])
331     hwsim_utils.test_connectivity_p2p_sta(dev[1], dev[2])
332     dev[2].request("DISCONNECT")
333
334     dev[0].remove_group()
335     dev[1].wait_go_ending_session()
336
337 def test_autogo_chan_switch(dev):
338     """P2P autonomous GO switching channels"""
339     autogo(dev[0], freq=2417)
340     connect_cli(dev[0], dev[1])
341     res = dev[0].request("CHAN_SWITCH 5 2422")
342     if "FAIL" in res:
343         # for now, skip test since mac80211_hwsim support is not yet widely
344         # deployed
345         return 'skip'
346     ev = dev[0].wait_event(["AP-CSA-FINISHED"], timeout=10)
347     if ev is None:
348         raise Exception("CSA finished event timed out")
349     if "freq=2422" not in ev:
350         raise Exception("Unexpected cahnnel in CSA finished event")
351     dev[0].dump_monitor()
352     dev[1].dump_monitor()
353     time.sleep(0.1)
354     hwsim_utils.test_connectivity_p2p(dev[0], dev[1])
355
356 def test_autogo_extra_cred(dev):
357     """P2P autonomous GO sending two WPS credentials"""
358     if "FAIL" in dev[0].request("SET wps_testing_dummy_cred 1"):
359         raise Exception("Failed to enable test mode")
360     autogo(dev[0], freq=2412)
361     connect_cli(dev[0], dev[1], social=True, freq=2412)
362     dev[0].remove_group()
363     dev[1].wait_go_ending_session()
364
365 def test_autogo_ifdown(dev):
366     """P2P autonomous GO and external ifdown"""
367     wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
368     wpas.interface_add("wlan5")
369     res = autogo(wpas)
370     wpas.dump_monitor()
371     wpas.interface_remove("wlan5")
372     wpas.interface_add("wlan5")
373     res = autogo(wpas)
374     wpas.dump_monitor()
375     subprocess.call(['sudo', 'ifconfig', res['ifname'], 'down'])
376     ev = wpas.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
377     if ev is None:
378         raise Exception("Group removal not reported")
379     if res['ifname'] not in ev:
380         raise Exception("Unexpected group removal event: " + ev)
381
382 def test_autogo_start_during_scan(dev):
383     """P2P autonomous GO started during ongoing manual scan"""
384     try:
385         # use autoscan to set scan_req = MANUAL_SCAN_REQ
386         if "OK" not in dev[0].request("AUTOSCAN periodic:1"):
387             raise Exception("Failed to set autoscan")
388         autogo(dev[0], freq=2462)
389         connect_cli(dev[0], dev[1], social=True, freq=2462)
390         dev[0].remove_group()
391         dev[1].wait_go_ending_session()
392     finally:
393         dev[0].request("AUTOSCAN ")
394
395 def test_autogo_passphrase_len(dev):
396     """P2P autonomous GO and longer passphrase"""
397     try:
398         if "OK" not in dev[0].request("SET p2p_passphrase_len 13"):
399             raise Exception("Failed to set passphrase length")
400         res = autogo(dev[0], freq=2412)
401         if len(res['passphrase']) != 13:
402             raise Exception("Unexpected passphrase length")
403         if dev[0].get_group_status_field("passphrase", extra="WPS") != res['passphrase']:
404             raise Exception("passphrase mismatch")
405
406         logger.info("Connect P2P client")
407         connect_cli(dev[0], dev[1], social=True, freq=2412)
408
409         logger.info("Connect legacy WPS client")
410         pin = dev[2].wps_read_pin()
411         dev[0].p2p_go_authorize_client(pin)
412         dev[2].request("P2P_SET disabled 1")
413         dev[2].dump_monitor()
414         dev[2].request("WPS_PIN any " + pin)
415         ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
416         if ev is None:
417             raise Exception("Association with the GO timed out")
418         status = dev[2].get_status()
419         if status['wpa_state'] != 'COMPLETED':
420             raise Exception("Not fully connected")
421         dev[2].request("DISCONNECT")
422
423         logger.info("Connect legacy non-WPS client")
424         dev[2].request("FLUSH")
425         dev[2].request("P2P_SET disabled 1")
426         dev[2].connect(ssid=res['ssid'], psk=res['passphrase'], proto='RSN',
427                        key_mgmt='WPA-PSK', pairwise='CCMP', group='CCMP',
428                        scan_freq=res['freq'])
429         hwsim_utils.test_connectivity_p2p_sta(dev[1], dev[2])
430         dev[2].request("DISCONNECT")
431
432         dev[0].remove_group()
433         dev[1].wait_go_ending_session()
434     finally:
435         dev[0].request("SET p2p_passphrase_len 8")
436
437 def test_autogo_bridge(dev):
438     """P2P autonomous GO in a bridge"""
439     try:
440         # use autoscan to set scan_req = MANUAL_SCAN_REQ
441         if "OK" not in dev[0].request("AUTOSCAN periodic:1"):
442             raise Exception("Failed to set autoscan")
443         autogo(dev[0])
444         subprocess.call(['sudo', 'brctl', 'addbr', 'p2p-br0'])
445         subprocess.call(['sudo', 'brctl', 'setfd', 'p2p-br0', '0'])
446         subprocess.call(['sudo', 'brctl', 'addif', 'p2p-br0', dev[0].ifname])
447         subprocess.call(['sudo', 'ip', 'link', 'set', 'dev', 'p2p-br0', 'up'])
448         time.sleep(0.1)
449         subprocess.call(['sudo', 'brctl', 'delif', 'p2p-br0', dev[0].ifname])
450         time.sleep(0.1)
451         subprocess.call(['sudo', 'ip', 'link', 'set', 'dev', 'p2p-br0', 'down'])
452         time.sleep(0.1)
453         subprocess.call(['sudo', 'brctl', 'delbr', 'p2p-br0'])
454         ev = dev[0].wait_global_event(["P2P-GROUP-REMOVED"], timeout=1)
455         if ev is not None:
456             raise Exception("P2P group removed unexpectedly")
457         if dev[0].get_status_field('wpa_state') != "COMPLETED":
458             raise Exception("Unexpected wpa_state")
459         dev[0].remove_group()
460     finally:
461         dev[0].request("AUTOSCAN ")
462         subprocess.Popen(['sudo', 'brctl', 'delif', 'p2p-br0', dev[0].ifname],
463                          stderr=open('/dev/null', 'w'))
464         subprocess.Popen(['sudo', 'ip', 'link', 'set', 'dev', 'p2p-br0', 'down'],
465                          stderr=open('/dev/null', 'w'))
466         subprocess.Popen(['sudo', 'brctl', 'delbr', 'p2p-br0'],
467                          stderr=open('/dev/null', 'w'))
468
469 def test_presence_req_on_group_interface(dev):
470     """P2P_PRESENCE_REQ on group interface"""
471     dev[1].request("SET p2p_no_group_iface 0")
472     res = autogo(dev[0], freq=2437)
473     res = connect_cli(dev[0], dev[1], social=True, freq=2437)
474     if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400"):
475         raise Exception("Could not send presence request")
476     ev = dev[1].wait_group_event(["P2P-PRESENCE-RESPONSE"])
477     if ev is None:
478         raise Exception("Timeout while waiting for Presence Response")
479     dev[0].remove_group()
480     dev[1].wait_go_ending_session()