1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
9 logger = logging.getLogger()
21 from wpasupplicant import WpaSupplicant
22 from utils import HwsimSkip, alloc_fail, fail_test
23 from p2p_utils import *
24 from test_ap_tdls import connect_2sta_open
25 from test_ap_eap import check_altsubject_match_support
# Name of the wpa_supplicant D-Bus service (also passed as the interface name
# of the root object) and the object path of that root object.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"

# Per-interface (network device) interface name and its sub-interfaces.
WPAS_DBUS_IFACE = WPAS_DBUS_SERVICE + ".Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"

# Interface names of the remaining object types, all rooted at the service
# name.
WPAS_DBUS_NETWORK = WPAS_DBUS_SERVICE + ".Network"
WPAS_DBUS_BSS = WPAS_DBUS_SERVICE + ".BSS"
WPAS_DBUS_P2P_PEER = WPAS_DBUS_SERVICE + ".Peer"
WPAS_DBUS_GROUP = WPAS_DBUS_SERVICE + ".Group"
WPAS_DBUS_PERSISTENT_GROUP = WPAS_DBUS_SERVICE + ".PersistentGroup"
38 def prepare_dbus(dev):
40 logger.info("No dbus module available")
41 raise HwsimSkip("No dbus module available")
43 from dbus.mainloop.glib import DBusGMainLoop
44 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
45 bus = dbus.SystemBus()
46 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
47 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
48 path = wpas.GetInterface(dev.ifname)
49 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
50 return (bus,wpas_obj,path,if_obj)
52 raise HwsimSkip("Could not connect to D-Bus: %s" % e)
54 class TestDbus(object):
55 def __init__(self, bus):
56 self.loop = gobject.MainLoop()
60 def __exit__(self, type, value, traceback):
61 for s in self.signals:
64 def add_signal(self, handler, interface, name, byte_arrays=False):
65 s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
67 byte_arrays=byte_arrays)
68 self.signals.append(s)
70 def timeout(self, *args):
71 logger.debug("timeout")
75 class alloc_fail_dbus(object):
76 def __init__(self, dev, count, funcs, operation="Operation",
81 self._operation = operation
82 self._expected = expected
84 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
85 if "OK" not in self._dev.request(cmd):
86 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
87 def __exit__(self, type, value, traceback):
89 raise Exception("%s succeeded during out-of-memory" % self._operation)
90 if type == dbus.exceptions.DBusException and self._expected in str(value):
92 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
93 raise Exception("%s did not trigger allocation failure" % self._operation)
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Add a WPS-enabled WPA2-PSK/CCMP AP and return hostapd.add_ap()'s result.

    ap is the AP device entry; only its 'ifname' item is used here. ssid and
    ap_uuid select the SSID and the WPS UUID of the AP. The passphrase
    ("12345678") and the AP PIN ("12345670") are fixed.
    """
    wps_config = dict(ssid=ssid, eap_server="1", wps_state="2",
                      wpa_passphrase="12345678", wpa="2",
                      wpa_key_mgmt="WPA-PSK", rsn_pairwise="CCMP",
                      ap_pin="12345670", uuid=ap_uuid)
    ifname = ap['ifname']
    return hostapd.add_ap(ifname, wps_config)
104 def test_dbus_getall(dev, apdev):
106 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
108 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
109 dbus_interface=dbus.PROPERTIES_IFACE)
110 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
112 props = if_obj.GetAll(WPAS_DBUS_IFACE,
113 dbus_interface=dbus.PROPERTIES_IFACE)
114 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
116 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
117 dbus_interface=dbus.PROPERTIES_IFACE)
118 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
120 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
121 dbus_interface=dbus.PROPERTIES_IFACE)
123 raise Exception("Unexpected BSSs entry: " + str(res))
125 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
126 dbus_interface=dbus.PROPERTIES_IFACE)
128 raise Exception("Unexpected Networks entry: " + str(res))
130 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
131 bssid = apdev[0]['bssid']
132 dev[0].scan_for_bss(bssid, freq=2412)
133 id = dev[0].add_network()
134 dev[0].set_network(id, "disabled", "0")
135 dev[0].set_network_quoted(id, "ssid", "test")
137 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
138 dbus_interface=dbus.PROPERTIES_IFACE)
140 raise Exception("Missing BSSs entry: " + str(res))
141 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
142 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
143 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
145 for item in props['BSSID']:
146 if len(bssid_str) > 0:
148 bssid_str += '%02x' % item
149 if bssid_str != bssid:
150 raise Exception("Unexpected BSSID in BSSs entry")
152 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
153 dbus_interface=dbus.PROPERTIES_IFACE)
155 raise Exception("Missing Networks entry: " + str(res))
156 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
157 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
158 dbus_interface=dbus.PROPERTIES_IFACE)
159 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
160 ssid = props['Properties']['ssid']
162 raise Exception("Unexpected SSID in network entry")
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read a fi.w1.wpa_supplicant1 property and return its value.

    dbus is the object providing PROPERTIES_IFACE (callers pass the dbus
    module), wpas_obj is the proxy object to query and prop is the property
    name. byte_arrays is passed through to the Get() call.

    If expect is not None, the value read is compared against it and an
    Exception is raised on mismatch.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       dbus_interface=dbus.PROPERTIES_IFACE,
                       byte_arrays=byte_arrays)
    if expect is not None and val != expect:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(val), str(expect)))
    # Callers inspect the fetched value (e.g. len(res), "TTLS" in res), so it
    # has to be handed back instead of being dropped.
    return val
def dbus_set(dbus, wpas_obj, prop, val):
    """Set property prop of the fi.w1.wpa_supplicant1 interface to val.

    dbus is the object providing PROPERTIES_IFACE (callers pass the dbus
    module) and wpas_obj is the proxy object on which Set() is invoked.
    Nothing is returned; errors raised by the Set() call propagate.
    """
    set_property = wpas_obj.Set
    set_property(WPAS_DBUS_SERVICE, prop, val,
                 dbus_interface=dbus.PROPERTIES_IFACE)
177 def test_dbus_properties(dev, apdev):
178 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
179 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
181 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
182 dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
183 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
184 for (val,err) in [ (3, "Error.Failed: wrong property type"),
185 ("foo", "Error.Failed: wrong debug level value") ]:
187 dbus_set(dbus, wpas_obj, "DebugLevel", val)
188 raise Exception("Invalid DebugLevel value accepted: " + str(val))
189 except dbus.exceptions.DBusException, e:
190 if err not in str(e):
191 raise Exception("Unexpected error message: " + str(e))
192 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
193 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
195 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
196 dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
197 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
199 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
200 raise Exception("Invalid DebugTimestamp value accepted")
201 except dbus.exceptions.DBusException, e:
202 if "Error.Failed: wrong property type" not in str(e):
203 raise Exception("Unexpected error message: " + str(e))
204 dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
205 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
207 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
208 dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
209 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
211 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
212 raise Exception("Invalid DebugShowKeys value accepted")
213 except dbus.exceptions.DBusException, e:
214 if "Error.Failed: wrong property type" not in str(e):
215 raise Exception("Unexpected error message: " + str(e))
216 dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
217 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
219 res = dbus_get(dbus, wpas_obj, "Interfaces")
221 raise Exception("Unexpected Interfaces value: " + str(res))
223 res = dbus_get(dbus, wpas_obj, "EapMethods")
224 if len(res) < 5 or "TTLS" not in res:
225 raise Exception("Unexpected EapMethods value: " + str(res))
227 res = dbus_get(dbus, wpas_obj, "Capabilities")
228 if len(res) < 2 or "p2p" not in res:
229 raise Exception("Unexpected Capabilities value: " + str(res))
231 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
232 val = binascii.unhexlify("010006020304050608")
233 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
234 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
236 raise Exception("WFDIEs value changed")
238 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
239 raise Exception("Invalid WFDIEs value accepted")
240 except dbus.exceptions.DBusException, e:
241 if "InvalidArgs" not in str(e):
242 raise Exception("Unexpected error message: " + str(e))
243 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
244 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
245 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
246 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
248 raise Exception("WFDIEs not cleared properly")
250 res = dbus_get(dbus, wpas_obj, "EapMethods")
252 dbus_set(dbus, wpas_obj, "EapMethods", res)
253 raise Exception("Invalid Set accepted")
254 except dbus.exceptions.DBusException, e:
255 if "InvalidArgs: Property is read-only" not in str(e):
256 raise Exception("Unexpected error message: " + str(e))
259 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
260 dbus_interface=dbus.PROPERTIES_IFACE)
261 raise Exception("Unknown method accepted")
262 except dbus.exceptions.DBusException, e:
263 if "UnknownMethod" not in str(e):
264 raise Exception("Unexpected error message: " + str(e))
267 wpas_obj.Get("foo", "DebugShowKeys",
268 dbus_interface=dbus.PROPERTIES_IFACE)
269 raise Exception("Invalid Get accepted")
270 except dbus.exceptions.DBusException, e:
271 if "InvalidArgs: No such property" not in str(e):
272 raise Exception("Unexpected error message: " + str(e))
274 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
277 test_obj.Get(123, "DebugShowKeys",
278 dbus_interface=dbus.PROPERTIES_IFACE)
279 raise Exception("Invalid Get accepted")
280 except dbus.exceptions.DBusException, e:
281 if "InvalidArgs: Invalid arguments" not in str(e):
282 raise Exception("Unexpected error message: " + str(e))
284 test_obj.Get(WPAS_DBUS_SERVICE, 123,
285 dbus_interface=dbus.PROPERTIES_IFACE)
286 raise Exception("Invalid Get accepted")
287 except dbus.exceptions.DBusException, e:
288 if "InvalidArgs: Invalid arguments" not in str(e):
289 raise Exception("Unexpected error message: " + str(e))
292 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
293 dbus.ByteArray('', variant_level=2),
294 dbus_interface=dbus.PROPERTIES_IFACE)
295 raise Exception("Invalid Set accepted")
296 except dbus.exceptions.DBusException, e:
297 if "InvalidArgs: invalid message format" not in str(e):
298 raise Exception("Unexpected error message: " + str(e))
300 def test_dbus_set_global_properties(dev, apdev):
301 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
302 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
304 props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
307 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
308 dbus_interface=dbus.PROPERTIES_IFACE)
310 raise Exception("Unexpected " + p[0] + " value: " + str(res))
312 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
313 dbus_interface=dbus.PROPERTIES_IFACE)
315 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
316 dbus_interface=dbus.PROPERTIES_IFACE)
318 raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
320 def test_dbus_invalid_method(dev, apdev):
321 """D-Bus invalid method"""
322 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
323 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
327 raise Exception("Unknown method accepted")
328 except dbus.exceptions.DBusException, e:
329 if "UnknownMethod" not in str(e):
330 raise Exception("Unexpected error message: " + str(e))
332 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
333 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
336 raise Exception("WPS.Start with incorrect signature accepted")
337 except dbus.exceptions.DBusException, e:
338 if "InvalidArgs: Invalid arg" not in str(e):
339 raise Exception("Unexpected error message: " + str(e))
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # The test body changes wps_cred_processing and config_methods on
        # dev[0]; put them back even when the body raises.
        dev[0].request("SET wps_cred_processing 0")
        dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
349 def _test_dbus_get_set_wps(dev, apdev):
350 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
352 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
353 dbus_interface=dbus.PROPERTIES_IFACE)
355 val = "display keypad virtual_display nfc_interface"
356 dev[0].request("SET config_methods " + val)
358 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
359 dbus_interface=dbus.PROPERTIES_IFACE)
361 raise Exception("Unexpected Get(ConfigMethods) result: " + config)
363 val2 = "push_button display"
364 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
365 dbus_interface=dbus.PROPERTIES_IFACE)
366 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
367 dbus_interface=dbus.PROPERTIES_IFACE)
369 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
371 dev[0].request("SET config_methods " + val)
374 dev[0].request("SET wps_cred_processing " + str(i))
375 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
376 dbus_interface=dbus.PROPERTIES_IFACE)
377 expected_val = False if i == 1 else True
378 if val != expected_val:
379 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
381 class TestDbusGetSet(TestDbus):
382 def __init__(self, bus):
383 TestDbus.__init__(self, bus)
384 self.signal_received = False
385 self.signal_received_deprecated = False
386 self.sets_done = False
389 gobject.timeout_add(1, self.run_sets)
390 gobject.timeout_add(1000, self.timeout)
391 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
393 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
398 def propertiesChanged(self, properties):
399 logger.debug("PropertiesChanged: " + str(properties))
400 if properties.has_key("ProcessCredentials"):
401 self.signal_received_deprecated = True
402 if self.sets_done and self.signal_received:
405 def propertiesChanged2(self, interface_name, changed_properties,
406 invalidated_properties):
407 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
408 if interface_name != WPAS_DBUS_IFACE_WPS:
410 if changed_properties.has_key("ProcessCredentials"):
411 self.signal_received = True
412 if self.sets_done and self.signal_received_deprecated:
415 def run_sets(self, *args):
416 logger.debug("run_sets")
417 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
419 dbus_interface=dbus.PROPERTIES_IFACE)
420 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
421 dbus_interface=dbus.PROPERTIES_IFACE) != True:
422 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
423 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
425 dbus_interface=dbus.PROPERTIES_IFACE)
426 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
427 dbus_interface=dbus.PROPERTIES_IFACE) != False:
428 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
430 self.dbus_sets_done = True
434 return self.signal_received and self.signal_received_deprecated
436 with TestDbusGetSet(bus) as t:
438 raise Exception("No signal received for ProcessCredentials change")
440 def test_dbus_wps_invalid(dev, apdev):
441 """D-Bus invalid WPS operation"""
442 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
443 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
445 failures = [ {'Role': 'foo', 'Type': 'pbc'},
446 {'Role': 123, 'Type': 'pbc'},
448 {'Role': 'enrollee'},
449 {'Role': 'registrar'},
450 {'Role': 'enrollee', 'Type': 123},
451 {'Role': 'enrollee', 'Type': 'foo'},
452 {'Role': 'enrollee', 'Type': 'pbc',
453 'Bssid': '02:33:44:55:66:77'},
454 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
455 {'Role': 'enrollee', 'Type': 'pbc',
456 'Bssid': dbus.ByteArray('12345')},
457 {'Role': 'enrollee', 'Type': 'pbc',
458 'P2PDeviceAddress': 12345},
459 {'Role': 'enrollee', 'Type': 'pbc',
460 'P2PDeviceAddress': dbus.ByteArray('12345')},
461 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
462 for args in failures:
465 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
466 except dbus.exceptions.DBusException, e:
467 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
468 raise Exception("Unexpected error message: " + str(e))
470 def test_dbus_wps_oom(dev, apdev):
471 """D-Bus WPS operation (OOM)"""
472 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
473 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
475 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
476 if_obj.Get(WPAS_DBUS_IFACE, "State",
477 dbus_interface=dbus.PROPERTIES_IFACE)
479 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
480 bssid = apdev[0]['bssid']
481 dev[0].scan_for_bss(bssid, freq=2412)
484 for i in range(1, 3):
485 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
486 if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
487 dbus_interface=dbus.PROPERTIES_IFACE)
489 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
490 dbus_interface=dbus.PROPERTIES_IFACE)
491 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
492 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
493 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
494 dbus_interface=dbus.PROPERTIES_IFACE)
496 id = dev[0].add_network()
497 dev[0].set_network(id, "disabled", "0")
498 dev[0].set_network_quoted(id, "ssid", "test")
500 for i in range(1, 3):
501 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
502 if_obj.Get(WPAS_DBUS_IFACE, "Networks",
503 dbus_interface=dbus.PROPERTIES_IFACE)
505 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
506 dbus_get(dbus, wpas_obj, "Interfaces")
508 for i in range(1, 6):
509 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
510 dbus_get(dbus, wpas_obj, "EapMethods")
512 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
513 expected="Error.Failed: Failed to set property"):
514 val2 = "push_button display"
515 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
516 dbus_interface=dbus.PROPERTIES_IFACE)
518 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
520 expected="UnknownError: WPS start failed"):
521 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2 on dev[0]; reset it
        # even when the body raises.
        dev[0].request("SET wps_cred_processing 0")
530 def _test_dbus_wps_pbc(dev, apdev):
531 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
532 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
534 hapd = start_ap(apdev[0])
535 hapd.request("WPS_PBC")
536 bssid = apdev[0]['bssid']
537 dev[0].scan_for_bss(bssid, freq="2412")
538 dev[0].request("SET wps_cred_processing 2")
540 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
541 dbus_interface=dbus.PROPERTIES_IFACE)
543 raise Exception("Missing BSSs entry: " + str(res))
544 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
545 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
546 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
547 if 'WPS' not in props:
548 raise Exception("No WPS information in the BSS entry")
549 if 'Type' not in props['WPS']:
550 raise Exception("No Type field in the WPS dictionary")
551 if props['WPS']['Type'] != 'pbc':
552 raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])
554 class TestDbusWps(TestDbus):
555 def __init__(self, bus, wps):
556 TestDbus.__init__(self, bus)
557 self.success_seen = False
558 self.credentials_received = False
562 gobject.timeout_add(1, self.start_pbc)
563 gobject.timeout_add(15000, self.timeout)
564 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
565 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
570 def wpsEvent(self, name, args):
571 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
572 if name == "success":
573 self.success_seen = True
574 if self.credentials_received:
577 def credentials(self, args):
578 logger.debug("credentials: " + str(args))
579 self.credentials_received = True
580 if self.success_seen:
583 def start_pbc(self, *args):
584 logger.debug("start_pbc")
585 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
589 return self.success_seen and self.credentials_received
591 with TestDbusWps(bus, wps) as t:
593 raise Exception("Failure in D-Bus operations")
595 dev[0].wait_connected(timeout=10)
596 dev[0].request("DISCONNECT")
598 dev[0].flush_scan_cache()
600 def test_dbus_wps_pbc_overlap(dev, apdev):
601 """D-Bus WPS/PBC operation and signal for PBC overlap"""
602 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
603 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
605 hapd = start_ap(apdev[0])
606 hapd2 = start_ap(apdev[1], ssid="test-wps2",
607 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
608 hapd.request("WPS_PBC")
609 hapd2.request("WPS_PBC")
610 bssid = apdev[0]['bssid']
611 dev[0].scan_for_bss(bssid, freq="2412")
612 bssid2 = apdev[1]['bssid']
613 dev[0].scan_for_bss(bssid2, freq="2412")
615 class TestDbusWps(TestDbus):
616 def __init__(self, bus, wps):
617 TestDbus.__init__(self, bus)
618 self.overlap_seen = False
622 gobject.timeout_add(1, self.start_pbc)
623 gobject.timeout_add(15000, self.timeout)
624 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
628 def wpsEvent(self, name, args):
629 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
630 if name == "pbc-overlap":
631 self.overlap_seen = True
634 def start_pbc(self, *args):
635 logger.debug("start_pbc")
636 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
640 return self.overlap_seen
642 with TestDbusWps(bus, wps) as t:
644 raise Exception("Failure in D-Bus operations")
646 dev[0].request("WPS_CANCEL")
647 dev[0].request("DISCONNECT")
649 dev[0].flush_scan_cache()
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2 on dev[0]; reset it
        # even when the body raises.
        dev[0].request("SET wps_cred_processing 0")
658 def _test_dbus_wps_pin(dev, apdev):
659 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
660 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
662 hapd = start_ap(apdev[0])
663 hapd.request("WPS_PIN any 12345670")
664 bssid = apdev[0]['bssid']
665 dev[0].scan_for_bss(bssid, freq="2412")
666 dev[0].request("SET wps_cred_processing 2")
668 class TestDbusWps(TestDbus):
669 def __init__(self, bus):
670 TestDbus.__init__(self, bus)
671 self.success_seen = False
672 self.credentials_received = False
675 gobject.timeout_add(1, self.start_pin)
676 gobject.timeout_add(15000, self.timeout)
677 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
678 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
683 def wpsEvent(self, name, args):
684 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
685 if name == "success":
686 self.success_seen = True
687 if self.credentials_received:
690 def credentials(self, args):
691 logger.debug("credentials: " + str(args))
692 self.credentials_received = True
693 if self.success_seen:
696 def start_pin(self, *args):
697 logger.debug("start_pin")
698 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
699 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
704 return self.success_seen and self.credentials_received
706 with TestDbusWps(bus) as t:
708 raise Exception("Failure in D-Bus operations")
710 dev[0].wait_connected(timeout=10)
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2 on dev[0]; reset it
        # even when the body raises.
        dev[0].request("SET wps_cred_processing 0")
719 def _test_dbus_wps_pin2(dev, apdev):
720 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
721 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
723 hapd = start_ap(apdev[0])
724 bssid = apdev[0]['bssid']
725 dev[0].scan_for_bss(bssid, freq="2412")
726 dev[0].request("SET wps_cred_processing 2")
728 class TestDbusWps(TestDbus):
729 def __init__(self, bus):
730 TestDbus.__init__(self, bus)
731 self.success_seen = False
735 gobject.timeout_add(1, self.start_pin)
736 gobject.timeout_add(15000, self.timeout)
737 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
738 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
743 def wpsEvent(self, name, args):
744 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
745 if name == "success":
746 self.success_seen = True
747 if self.credentials_received:
750 def credentials(self, args):
751 logger.debug("credentials: " + str(args))
752 self.credentials_received = True
753 if self.success_seen:
756 def start_pin(self, *args):
757 logger.debug("start_pin")
758 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
759 res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
762 h = hostapd.Hostapd(apdev[0]['ifname'])
763 h.request("WPS_PIN any " + pin)
767 return self.success_seen and self.credentials_received
769 with TestDbusWps(bus) as t:
771 raise Exception("Failure in D-Bus operations")
773 dev[0].wait_connected(timeout=10)
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2 on dev[0]; reset it
        # even when the body raises.
        dev[0].request("SET wps_cred_processing 0")
782 def _test_dbus_wps_pin_m2d(dev, apdev):
783 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
784 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
786 hapd = start_ap(apdev[0])
787 bssid = apdev[0]['bssid']
788 dev[0].scan_for_bss(bssid, freq="2412")
789 dev[0].request("SET wps_cred_processing 2")
791 class TestDbusWps(TestDbus):
792 def __init__(self, bus):
793 TestDbus.__init__(self, bus)
794 self.success_seen = False
795 self.credentials_received = False
798 gobject.timeout_add(1, self.start_pin)
799 gobject.timeout_add(15000, self.timeout)
800 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
801 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
806 def wpsEvent(self, name, args):
807 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
808 if name == "success":
809 self.success_seen = True
810 if self.credentials_received:
813 h = hostapd.Hostapd(apdev[0]['ifname'])
814 h.request("WPS_PIN any 12345670")
816 def credentials(self, args):
817 logger.debug("credentials: " + str(args))
818 self.credentials_received = True
819 if self.success_seen:
822 def start_pin(self, *args):
823 logger.debug("start_pin")
824 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
825 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
830 return self.success_seen and self.credentials_received
832 with TestDbusWps(bus) as t:
834 raise Exception("Failure in D-Bus operations")
836 dev[0].wait_connected(timeout=10)
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2 on dev[0]; reset it
        # even when the body raises.
        dev[0].request("SET wps_cred_processing 0")
845 def _test_dbus_wps_reg(dev, apdev):
846 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
847 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
849 hapd = start_ap(apdev[0])
850 hapd.request("WPS_PIN any 12345670")
851 bssid = apdev[0]['bssid']
852 dev[0].scan_for_bss(bssid, freq="2412")
853 dev[0].request("SET wps_cred_processing 2")
855 class TestDbusWps(TestDbus):
856 def __init__(self, bus):
857 TestDbus.__init__(self, bus)
858 self.credentials_received = False
861 gobject.timeout_add(100, self.start_reg)
862 gobject.timeout_add(15000, self.timeout)
863 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
864 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
869 def wpsEvent(self, name, args):
870 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
872 def credentials(self, args):
873 logger.debug("credentials: " + str(args))
874 self.credentials_received = True
877 def start_reg(self, *args):
878 logger.debug("start_reg")
879 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
880 wps.Start({'Role': 'registrar', 'Type': 'pin',
881 'Pin': '12345670', 'Bssid': bssid_ay})
885 return self.credentials_received
887 with TestDbusWps(bus) as t:
889 raise Exception("Failure in D-Bus operations")
891 dev[0].wait_connected(timeout=10)
893 def test_dbus_wps_cancel(dev, apdev):
894 """D-Bus WPS Cancel operation"""
895 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
896 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
898 hapd = start_ap(apdev[0])
899 bssid = apdev[0]['bssid']
902 dev[0].scan_for_bss(bssid, freq="2412")
903 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
904 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
907 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
909 def test_dbus_scan_invalid(dev, apdev):
910 """D-Bus invalid scan method"""
911 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
912 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
914 tests = [ ({}, "InvalidArgs"),
915 ({'Type': 123}, "InvalidArgs"),
916 ({'Type': 'foo'}, "InvalidArgs"),
917 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
918 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
919 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
921 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
922 dbus.ByteArray("3"), dbus.ByteArray("4"),
923 dbus.ByteArray("5"), dbus.ByteArray("6"),
924 dbus.ByteArray("7"), dbus.ByteArray("8"),
925 dbus.ByteArray("9"), dbus.ByteArray("10"),
926 dbus.ByteArray("11"), dbus.ByteArray("12"),
927 dbus.ByteArray("13"), dbus.ByteArray("14"),
928 dbus.ByteArray("15"), dbus.ByteArray("16"),
929 dbus.ByteArray("17") ]},
932 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
934 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
935 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
936 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
937 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
939 'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
942 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
944 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
945 ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
947 ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
949 for (t,err) in tests:
952 raise Exception("Invalid Scan() arguments accepted: " + str(t))
953 except dbus.exceptions.DBusException, e:
954 if err not in str(e):
955 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
957 def test_dbus_scan_oom(dev, apdev):
958 """D-Bus scan method and OOM"""
959 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
960 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
962 with alloc_fail_dbus(dev[0], 1,
963 "wpa_scan_clone_params;wpas_dbus_handler_scan",
964 "Scan", expected="ScanError: Scan request rejected"):
965 iface.Scan({ 'Type': 'passive',
966 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
968 with alloc_fail_dbus(dev[0], 1,
969 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
971 iface.Scan({ 'Type': 'passive',
972 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
974 with alloc_fail_dbus(dev[0], 1,
975 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
977 iface.Scan({ 'Type': 'active',
978 'IEs': [ dbus.ByteArray("\xdd\x00") ],
979 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
981 with alloc_fail_dbus(dev[0], 1,
982 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
984 iface.Scan({ 'Type': 'active',
985 'SSIDs': [ dbus.ByteArray("open"),
987 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })

    def bss_list():
        # Current value of the Interface "BSSs" property.
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    class TestDbusScan(TestDbus):
        # Runs three scans back to back (active, passive with AllowRoam,
        # passive without) and records the ScanDone/BSSAdded signals.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0
            self.bss_added = False
            self.fail_reason = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in [ (self.scanDone, "ScanDone"),
                                     (self.bssAdded, "BSSAdded"),
                                     (self.bssRemoved, "BSSRemoved") ]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            completed = self.scan_completed
            if completed == 1:
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
                return
            if completed == 2:
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
                return
            if completed == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            # The AP is not WPS enabled, so no WPS Type may be reported.
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            scan_args = {'Type': 'active',
                         'SSIDs': [ dbus.ByteArray("open"),
                                    dbus.ByteArray() ],
                         'IEs': [ dbus.ByteArray("\xdd\x00"),
                                  dbus.ByteArray() ],
                         'AllowRoam': False,
                         'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(scan_args)
            # One-shot timeout callback.
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    if len(bss_list()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(bss_list()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan over the control interface and wait until it is running
    # so that the D-Bus Scan() request below arrives while it is in progress.
    if "OK" not in dev[0].request("SCAN freq=2412-2462"):
        raise Exception("Failed to start scan")
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
    if ev is None:
        raise Exception("Scan start timed out")

    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Let the first scan run to completion before the test case ends.
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
    if ev is None:
        raise Exception("Scan timed out")
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Transition tables: current state -> (next state, name of the
        # argument-less Interface method to call). Transitions that need
        # an argument or extra checks are handled explicitly below.
        completed_steps = { 0: (1, "Disconnect"),
                            2: (3, "Disconnect"),
                            4: (5, "Reattach"),
                            5: (6, "Disconnect") }
        disconnected_steps = { 3: (4, "Reassociate"),
                               6: (7, "Reconnect") }

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in [ (self.networkAdded, "NetworkAdded"),
                                     (self.networkRemoved, "NetworkRemoved"),
                                     (self.networkSelected, "NetworkSelected"),
                                     (self.propertiesChanged,
                                      "PropertiesChanged") ]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            link_state = properties.get('State')
            if link_state == "completed":
                self.on_completed()
            elif link_state == "disconnected":
                self.on_disconnected()

        def on_completed(self):
            # Connection established: move on to the next step.
            if self.state in self.completed_steps:
                self.state, method = self.completed_steps[self.state]
                getattr(iface, method)()
            elif self.state == 7:
                self.state = 8
                poll = iface.SignalPoll()
                logger.debug("SignalPoll: " + str(poll))
                if poll.get('frequency') != 2412:
                    # Mark the run as failed; success() checks for state 9.
                    self.state = -1
                    logger.info("Unexpected SignalPoll result")
                iface.RemoveNetwork(self.netw)

        def on_disconnected(self):
            # Connection dropped: trigger the next way of reconnecting.
            if self.state == 1:
                self.state = 2
                iface.SelectNetwork(self.netw)
            elif self.state in self.disconnected_steps:
                self.state, method = self.disconnected_steps[self.state]
                getattr(iface, method)()
            elif self.state == 8:
                self.state = 9
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'WPA-PSK',
                                        'psk': passphrase,
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            signals_seen = self.network_added and \
                           self.network_removed and \
                           self.network_selected
            if not signals_seen:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Adds a network without a stored PSK and supplies the passphrase
        # through NetworkReply() when it is requested.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in [ (self.propertiesChanged,
                                      "PropertiesChanged"),
                                     (self.networkRequest,
                                      "NetworkRequest") ]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'WPA-PSK',
                                        'mem_only_psk': 1,
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Connect/disconnect state machine driven by PropertiesChanged
        # signals; success() requires it to reach state 7.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            # Short timeout since most iterations are expected to fail.
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-PSK',
                                     'psk': passphrase,
                                     'scan_freq': 2412 },
                                   signature='sv')
            # Either call may fail due to the forced allocation failure;
            # stop the main loop instead of waiting for the timeout.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Fail the i-th allocation under main() on each iteration until five
    # iterations have completed without triggering the failure.
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 3:
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception:
            # Failures are expected while allocations are being failed.
            # NOTE(review): this also swallows the "Connection succeeded
            # during out-of-memory" exception raised above - confirm intent.
            pass

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Both methods must be rejected with NotConnected when there is no
    # association.
    for method in [ "Disconnect", "Reattach" ]:
        try:
            getattr(iface, method)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" % (method, str(e)))
        else:
            raise Exception("%s() accepted when not connected" % method)
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Connects with EAP-TTLS/PAP, then reconnects with a matching and
        # finally with a non-matching altsubject_match constraint. The run
        # ends when the expected AltSubject mismatch is reported.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def set_altsubject_match(self, mismatch):
            # Update altsubject_match of the added network to the dNSName
            # learned from the server certificate, optionally corrupted.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            if mismatch:
                constraint = self.server_dnsname + "FOO"
            else:
                constraint = self.server_dnsname
            args = dbus.Dictionary({ 'altsubject_match': constraint },
                                   signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            link_state = properties.get('State')
            if link_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self.set_altsubject_match(False)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self.set_altsubject_match(True)
            elif link_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            if len(args['altsubject']) < 1:
                raise Exception("Missing dNSName")
            dnsname = args['altsubject'][0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = status == 'remote certificate verification' and \
                       parameter == 'AltSubject mismatch'
            if self.state == 4 and mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PASSWORD":
                return
            iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'IEEE8021X',
                                        'eapol_flags': 0,
                                        'eap': 'TTLS',
                                        'anonymous_identity': 'ttls',
                                        'identity': 'pap user',
                                        'ca_cert': 'auth_serv/ca.pem',
                                        'phase2': 'auth=PAP',
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Integer scan_freq is expected to be stored in string form.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'WPA-PSK',
                             'psk': "12345678",
                             'identity': dbus.ByteArray([ 1, 2 ]),
                             'priority': dbus.Int32(0),
                             'scan_freq': dbus.UInt32(2412) },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists given as strings are expected to be stored unchanged.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'NONE',
                             'scan_freq': "2412 2432",
                             'freq_list': "2412 2417 2432" },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network must be rejected.
    try:
        iface.RemoveNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    else:
        raise Exception("Invalid RemoveNetwork() accepted")
    try:
        iface.SelectNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))
    else:
        raise Exception("Invalid SelectNetwork() accepted")

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Network "Enabled" property: default value, toggling, and type check.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
    else:
        raise Exception("Invalid Set(Enabled,1) accepted")

    # Updating one network parameter must leave the others untouched.
    args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Calling RemoveAllNetworks() with no networks left must not fail.
    iface.RemoveAllNetworks()

    # Invalid network parameter values
    tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
              dbus.Dictionary({ 'identity': dbus.ByteArray() },
                              signature='sv'),
              dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
              dbus.Dictionary({ 'identity': "" }, signature='sv') ]
    for args in tests:
        try:
            iface.AddNetwork(args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
        else:
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    # Allocation failure while reading the network Properties
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    # Allocation failure while parsing the object path in RemoveNetwork()
    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        try:
            netw = iface.AddNetwork(args)
            # Currently, AddNetwork() succeeds even if os_strdup() for path
            # fails, so remove the network if that occurs.
            iface.RemoveNetwork(netw)
        except dbus.exceptions.DBusException:
            pass

    for i in range(1, 3):
        with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
            try:
                netw = iface.AddNetwork(args)
                # Currently, AddNetwork() succeeds even if network registration
                # fails, so remove the network if that occurs.
                iface.RemoveNetwork(netw)
            except dbus.exceptions.DBusException:
                pass

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        netw = iface.AddNetwork(args)

    # Each case: (allocation failure count, failure location, AddNetwork()
    # arguments). All are expected to be reported as InvalidArgs.
    tests = [ (1,
               'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
                               signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'priority': dbus.UInt32(1) },
                               signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'priority': dbus.Int32(1) },
                               signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
                               signature='sv')) ]
    for (count,funcs,args) in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # None of the failed attempts may leave a network block behind.
    if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                      dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    regdom_change = ["CTRL-EVENT-REGDOM-CHANGE"]

    def set_country(cmd):
        # Issue the country change and wait briefly for the regulatory
        # domain event, first on the interface and then on the global
        # monitor.
        dev[0].request(cmd)
        if dev[0].wait_event(regdom_change, timeout=1) is None:
            dev[0].wait_global_event(regdom_change, timeout=1)

    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        set_country("SET country US")
        set_country("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
def _test_dbus_interface(dev, apdev):
    # Body of test_dbus_interface(): exercises CreateInterface(),
    # GetInterface() and RemoveInterface() with valid and invalid arguments
    # using the 'lo' interface with driver 'none'.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Adding the same interface again must be rejected.
    params = dbus.Dictionary({ 'Ifname': 'lo',
                               'Driver': 'none',
                               'ConfigFile': 'foo',
                               'BridgeIfname': 'foo', },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # Removing the interface twice must fail on the second attempt.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
    else:
        raise Exception("Invalid RemoveInterface() accepted")

    # Unknown dictionary entry
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
                               'Foo': 123 },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # Missing Ifname
    params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # The interface was removed above, so it must not be found anymore.
    try:
        wpas.GetInterface("lo")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
    else:
        raise Exception("Invalid GetInterface() accepted")
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        wpas.CreateInterface(params)

    # Fail the i-th allocation within wpa_supplicant_add_iface() until
    # CreateInterface() goes through without hitting the failure.
    for i in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        try:
            npath = wpas.CreateInterface(params)
            wpas.RemoveInterface(npath)
            logger.info("CreateInterface succeeds after %d allocation failures" % i)
            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 5:
                raise Exception("CreateInterface succeeded during out-of-memory")
            if not state.startswith('0:'):
                break
        except dbus.exceptions.DBusException:
            # Expected while the forced allocation failure is being hit.
            pass

    for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
            wpas.CreateInterface(params)
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/RemoveBlob/GetBlob parameters and error cases"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    # A second blob with the same name must be rejected.
    try:
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
    else:
        raise Exception("Invalid AddBlob() accepted")
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, sent in zip(res, blob):
        if got != dbus.Byte(sent):
            raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")
    iface.RemoveBlob('blob1')
    # The blob is gone now, so both RemoveBlob() and GetBlob() must fail.
    try:
        iface.RemoveBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    else:
        raise Exception("Invalid RemoveBlob() accepted")
    try:
        iface.GetBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
    else:
        raise Exception("Invalid GetBlob() accepted")

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' and checks that both the BlobAdded and
        # the BlobRemoved signals are delivered for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # One-shot timeout callback.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first, second, and third allocation within the AddBlob()
    # handler in turn.
    for i in range(1, 4):
        with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Unknown module name, a periodic configuration, and an empty string.
    for autoscan_arg in [ "foo", "periodic:1", "" ]:
        iface.AutoScan(autoscan_arg)
    # Clear the autoscan configuration through the control interface.
    dev[0].request("AUTOSCAN ")
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation within the AutoScan() handler.
    oom = alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan")
    with oom:
        iface.AutoScan("foo")
    # Clear the autoscan configuration through the control interface.
    dev[0].request("AUTOSCAN ")
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def expect_invalid_args(method):
        # Call the named TDLS method with a malformed peer address and
        # require it to be rejected with InvalidArgs.
        try:
            getattr(iface, method)("foo")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" % (method, str(e)))
        else:
            raise Exception("Invalid %s() accepted" % method)

    expect_invalid_args("TDLSDiscover")
    expect_invalid_args("TDLSStatus")

    # No TDLS link has been set up with the other station.
    res = iface.TDLSStatus(addr1)
    if res != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_invalid_args("TDLSSetup")
    expect_invalid_args("TDLSTeardown")

    # Well-formed address of a peer that does not exist
    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: error performing TDLS teardown" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation in wpa_tdls_add_peer() and expect the
    # TDLS setup error to be reported.
    oom = alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                          "UnknownError: error performing TDLS setup")
    with oom:
        iface.TDLSSetup("00:11:22:33:44:55")
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Timer-driven sequence: discover -> setup -> status check and
        # teardown -> final status check.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    if_obj = prepare_dbus(dev[0])[3]
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def read_prop(name):
        # Fetch one property of the interface object over D-Bus.
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    # A non-empty engine path ("foo") is tried with and without a module
    # path. A D-Bus error is tolerated only if it carries the expected
    # text; success is not treated as a failure here.
    for module_path in [ "bar", "" ]:
        try:
            iface.SetPKCS11EngineAndModulePath("foo", module_path)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: Reinit of the EAPOL" not in str(e):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))

    # With an empty engine path the call must succeed and both properties
    # must read back as written.
    # NOTE(review): the comparison lines were missing from the reviewed copy
    # and are restored from the values set just before - confirm.
    for module_path in [ "bar", "" ]:
        iface.SetPKCS11EngineAndModulePath("", module_path)
        engine = read_prop("PKCS11EnginePath")
        if engine != "":
            raise Exception("Unexpected PKCS11EnginePath value: " + engine)
        module = read_prop("PKCS11ModulePath")
        if module != module_path:
            raise Exception("Unexpected PKCS11ModulePath value: " + module)
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    restore_cmd = "AP_SCAN 1"
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        # Always put ap_scan back to 1 through the control interface, even
        # when the test body raised.
        dev[0].request(restore_cmd)
def _test_dbus_apscan(dev, apdev):
    """Body of test_dbus_apscan: read, write and mis-write the ApScan property."""
    if_obj = prepare_dbus(dev[0])[3]

    def read_ap_scan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_ap_scan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    initial = read_ap_scan()
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    # Every accepted value must read back unchanged.
    # NOTE(review): the loop header was missing from the reviewed copy; the
    # value list below is restored from context - confirm.
    for i in [ 0, 2, 1 ]:
        write_ap_scan(dbus.UInt32(i))
        current = read_ap_scan()
        if current != i:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, i))

    # Wrong D-Bus type for the property.
    try:
        write_ap_scan(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    # Right type, value outside the accepted set.
    try:
        write_ap_scan(dbus.UInt32(123))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    write_ap_scan(dbus.UInt32(1))
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # NOTE(review): the two comparison lines below were missing from the
    # reviewed copy and are restored from context - confirm.
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must be accepted and read back unchanged.
    for i in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value must be rejected with a type error.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # Message previously named ApScan (copy/paste slip), which sent
            # anyone reading the failure to the wrong property.
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave the property enabled when the test ends.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Valid values must be accepted and read back unchanged.  The mismatch
    # messages used to format an undefined name "i", so a real mismatch
    # surfaced as NameError instead of the intended diagnostic.
    expected_age = 179
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(expected_age),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected_age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, expected_age))

    expected_count = 3
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(expected_count),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected_count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, expected_count))

    # Wrong D-Bus type for BSSExpireAge.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))

    # BSSExpireAge below the minimum named in the expected error text.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,9) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))

    # Wrong D-Bus type for BSSExpireCount.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))

    # BSSExpireCount of zero must be rejected.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,0) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))

    # Final values left behind for later tests (presumably the defaults -
    # verify against wpa_supplicant).
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    world_regdom = ['iw', 'reg', 'set', '00']
    try:
        _test_dbus_country(dev, apdev)
    finally:
        # Whatever happened in the body, reset the country both in
        # wpa_supplicant and through iw.
        dev[0].request("SET country 00")
        subprocess.call(world_regdom)
def _test_dbus_country(dev, apdev):
    """Body of test_dbus_country: set/get Country and check regdom events."""
    if_obj = prepare_dbus(dev[0])[3]

    def set_country(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def wait_regdom_change():
        # Prefer the per-interface event; fall back to the global monitor.
        event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is not None:
            return event
        # For now, work around separate P2P Device interface event delivery
        event = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if event is None:
            raise Exception("regdom change event not seen")
        return event

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    # NOTE(review): the statement between this comment and dump_monitor()
    # was missing from the reviewed copy; a short sleep is restored here -
    # confirm the call and its duration.
    time.sleep(0.2)
    dev[0].dump_monitor()

    set_country("FI")
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # Wrong D-Bus type for the property.
    try:
        set_country(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # String of the wrong length for a country code.
    try:
        set_country("F")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: invalid country code" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    set_country("00")

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
        raise Exception("Unexpected event contents: " + ev)
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    restore_cmd = "SCAN_INTERVAL 5"
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        # Always put the scan interval back to 5 through the control
        # interface, even when the test body raised.
        dev[0].request(restore_cmd)
def _test_dbus_scan_interval(dev, apdev):
    """Body of test_dbus_scan_interval: Get/Set of the ScanInterval property."""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # A valid value must be accepted and read back unchanged.  The mismatch
    # message used to format an undefined name "i", so a real mismatch
    # surfaced as NameError instead of the intended diagnostic.
    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # Wrong D-Bus type (UInt16 instead of Int32).
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    # Right type, negative value.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    # Leave the interval at 5 when the body completes.
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    # NOTE(review): short lines absent from the reviewed copy (signal name
    # "GroupStarted", byte_arrays=True, main loop run/quit, "return False",
    # success(), "try:", "if not t.success():") were restored from context -
    # confirm against the pristine file.
    handles = prepare_dbus(dev[0])
    bus = handles[0]
    if_obj = handles[3]

    # dev[1] runs a P2P social scan while the test group is up.
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        """Start a P2P group over D-Bus and wait for one ProbeRequest signal."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Subscribe on the group interface object named by the signal.
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_params = dbus.Dictionary({ 'frequency': 2412 })
            p2p.GroupAdd(group_params)
            # One-shot timer: do not reschedule this callback.
            return False

        def success(self):
            return self.reported

    # First round: unsubscribe explicitly and check that a second
    # unsubscribe is rejected.
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        t.iface.UnsubscribeProbeReq()
        try:
            t.iface.UnsubscribeProbeReq()
        except dbus.exceptions.DBusException as e:
            if "NoSubscription" not in str(e):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
        else:
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        t.group_p2p.Disconnect()

    # Second round: on purpose, leave the ProbeReq subscription in place to
    # test automatic cleanup.
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    if_obj = prepare_dbus(dev[0])[3]
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Drop any subscription this process already holds.  Otherwise the
    # subscribe call below could succeed without os_strdup() even being
    # called, producing a false failure.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        was_subscribed = False
    else:
        was_subscribed = True

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if was_subscribed:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2405 def test_dbus_p2p_invalid(dev, apdev):
2406 """D-Bus invalid P2P operations"""
2407 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2408 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2411 p2p.RejectPeer(path + "/Peers/00112233445566")
2412 raise Exception("Invalid RejectPeer accepted")
2413 except dbus.exceptions.DBusException, e:
2414 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2415 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2418 p2p.RejectPeer("/foo")
2419 raise Exception("Invalid RejectPeer accepted")
2420 except dbus.exceptions.DBusException, e:
2421 if "InvalidArgs" not in str(e):
2422 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2432 raise Exception("Invalid RemoveClient accepted")
2433 except dbus.exceptions.DBusException, e:
2434 if "InvalidArgs" not in str(e):
2435 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2437 tests = [ {'DiscoveryType': 'foo'},
2438 {'RequestedDeviceTypes': 'foo'},
2439 {'RequestedDeviceTypes': ['foo']},
2440 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2441 '10','11','12','13','14','15','16',
2443 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2444 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2445 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2446 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2447 dbus.ByteArray('1234567')]},
2448 {'Foo': dbus.Int16(1)},
2449 {'Foo': dbus.UInt16(1)},
2450 {'Foo': dbus.Int64(1)},
2451 {'Foo': dbus.UInt64(1)},
2452 {'Foo': dbus.Double(1.23)},
2453 {'Foo': dbus.Signature('s')},
2457 p2p.Find(dbus.Dictionary(t))
2458 raise Exception("Invalid Find accepted")
2459 except dbus.exceptions.DBusException, e:
2460 if "InvalidArgs" not in str(e):
2461 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2464 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2465 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2467 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2468 raise Exception("Invalid RemovePersistentGroup accepted")
2469 except dbus.exceptions.DBusException, e:
2470 if "InvalidArgs" not in str(e):
2471 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2474 dev[0].request("P2P_SET disabled 1")
2476 raise Exception("Invalid Listen accepted")
2477 except dbus.exceptions.DBusException, e:
2478 if "UnknownError: Could not start P2P listen" not in str(e):
2479 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2481 dev[0].request("P2P_SET disabled 0")
2483 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2484 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2486 test_p2p.Listen("foo")
2487 raise Exception("Invalid Listen accepted")
2488 except dbus.exceptions.DBusException, e:
2489 if "InvalidArgs" not in str(e):
2490 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2493 dev[0].request("P2P_SET disabled 1")
2494 p2p.ExtendedListen(dbus.Dictionary({}))
2495 raise Exception("Invalid ExtendedListen accepted")
2496 except dbus.exceptions.DBusException, e:
2497 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2498 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2500 dev[0].request("P2P_SET disabled 0")
2503 dev[0].request("P2P_SET disabled 1")
2504 args = { 'duration1': 30000, 'interval1': 102400,
2505 'duration2': 20000, 'interval2': 102400 }
2506 p2p.PresenceRequest(args)
2507 raise Exception("Invalid PresenceRequest accepted")
2508 except dbus.exceptions.DBusException, e:
2509 if "UnknownError: Failed to invoke presence request" not in str(e):
2510 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2512 dev[0].request("P2P_SET disabled 0")
2515 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2516 p2p.GroupAdd(params)
2517 raise Exception("Invalid GroupAdd accepted")
2518 except dbus.exceptions.DBusException, e:
2519 if "InvalidArgs" not in str(e):
2520 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2523 params = dbus.Dictionary({'persistent_group_object':
2524 dbus.ObjectPath(path),
2526 p2p.GroupAdd(params)
2527 raise Exception("Invalid GroupAdd accepted")
2528 except dbus.exceptions.DBusException, e:
2529 if "InvalidArgs" not in str(e):
2530 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2534 raise Exception("Invalid Disconnect accepted")
2535 except dbus.exceptions.DBusException, e:
2536 if "UnknownError: failed to disconnect" not in str(e):
2537 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2540 dev[0].request("P2P_SET disabled 1")
2542 raise Exception("Invalid Flush accepted")
2543 except dbus.exceptions.DBusException, e:
2544 if "Error.Failed: P2P is not available for this interface" not in str(e):
2545 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2547 dev[0].request("P2P_SET disabled 0")
2550 dev[0].request("P2P_SET disabled 1")
2551 args = { 'peer': path,
2553 'wps_method': 'pbc',
2555 pin = p2p.Connect(args)
2556 raise Exception("Invalid Connect accepted")
2557 except dbus.exceptions.DBusException, e:
2558 if "Error.Failed: P2P is not available for this interface" not in str(e):
2559 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2561 dev[0].request("P2P_SET disabled 0")
2563 tests = [ { 'frequency': dbus.Int32(-1) },
2564 { 'wps_method': 'pbc' },
2565 { 'wps_method': 'foo' } ]
2568 pin = p2p.Connect(args)
2569 raise Exception("Invalid Connect accepted")
2570 except dbus.exceptions.DBusException, e:
2571 if "InvalidArgs" not in str(e):
2572 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2575 dev[0].request("P2P_SET disabled 1")
2576 args = { 'peer': path }
2577 pin = p2p.Invite(args)
2578 raise Exception("Invalid Invite accepted")
2579 except dbus.exceptions.DBusException, e:
2580 if "Error.Failed: P2P is not available for this interface" not in str(e):
2581 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2583 dev[0].request("P2P_SET disabled 0")
2586 args = { 'foo': 'bar' }
2587 pin = p2p.Invite(args)
2588 raise Exception("Invalid Invite accepted")
2589 except dbus.exceptions.DBusException, e:
2590 if "InvalidArgs" not in str(e):
2591 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2593 tests = [ (path, 'display', "InvalidArgs"),
2594 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2596 "UnknownError: Failed to send provision discovery request"),
2597 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2599 "UnknownError: Failed to send provision discovery request"),
2600 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2602 "UnknownError: Failed to send provision discovery request"),
2603 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2605 "UnknownError: Failed to send provision discovery request"),
2606 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2607 'foo', "InvalidArgs") ]
2608 for (p,method,err) in tests:
2610 p2p.ProvisionDiscoveryRequest(p, method)
2611 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2612 except dbus.exceptions.DBusException, e:
2613 if err not in str(e):
2614 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2617 dev[0].request("P2P_SET disabled 1")
2618 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2619 dbus_interface=dbus.PROPERTIES_IFACE)
2620 raise Exception("Invalid Get(Peers) accepted")
2621 except dbus.exceptions.DBusException, e:
2622 if "Error.Failed: P2P is not available for this interface" not in str(e):
2623 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2625 dev[0].request("P2P_SET disabled 0")
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    handles = prepare_dbus(dev[0])
    path = handles[2]
    p2p = dbus.Interface(handles[3], WPAS_DBUS_IFACE_P2PDEVICE)

    # Each row: (count, function spec, factory for the Find() argument).
    # count and the function spec are handed to alloc_fail_dbus as-is; the
    # factories are called inside the context so the argument is built at
    # the same point as before.
    find_cases = [
        (1, "_wpa_dbus_dict_entry_get_string_array",
         lambda: { 'Foo': [ 'bar' ] }),
        (2, "_wpa_dbus_dict_entry_get_string_array",
         lambda: { 'Foo': [ 'bar' ] }),
        (10, "_wpa_dbus_dict_entry_get_string_array",
         lambda: { 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         lambda: { 'Foo': [ dbus.ByteArray('123') ] }),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         lambda: { 'Foo': [ dbus.ByteArray('123') ] }),
        # Eleven byte arrays in one list.
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         lambda: { 'Foo': [ dbus.ByteArray('123') for _ in range(11) ] }),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         lambda: { 'Foo': [ dbus.ByteArray('123') ] }),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         lambda: { 'Foo': path }),
    ]
    for count, funcs, make_arg in find_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary(make_arg()))

    # AddService with a 500-byte response; the same argument dict is sent
    # again with count 2.
    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
                         "AddService", "InvalidArgs"):
        args = { 'service_type': 'bonjour',
                 'response': dbus.ByteArray(500*'b') }
        p2p.AddService(args)

    with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
                         "AddService", "InvalidArgs"):
        p2p.AddService(args)
2684 def test_dbus_p2p_discovery(dev, apdev):
2685 """D-Bus P2P discovery"""
2686 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2687 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2689 addr0 = dev[0].p2p_dev_addr()
2691 dev[1].request("SET sec_device_type 1-0050F204-2")
2692 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2694 addr1 = dev[1].p2p_dev_addr()
2695 a1 = binascii.unhexlify(addr1.replace(':',''))
2697 wfd_devinfo = "00001c440028"
2698 dev[2].request("SET wifi_display 1")
2699 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2700 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2702 addr2 = dev[2].p2p_dev_addr()
2703 a2 = binascii.unhexlify(addr2.replace(':',''))
2705 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2706 dbus_interface=dbus.PROPERTIES_IFACE)
2707 if 'Peers' not in res:
2708 raise Exception("GetAll result missing Peers")
2709 if len(res['Peers']) != 0:
2710 raise Exception("Unexpected peer(s) in the list")
2712 args = {'DiscoveryType': 'social',
2713 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2714 'Timeout': dbus.Int32(1) }
2715 p2p.Find(dbus.Dictionary(args))
2718 class TestDbusP2p(TestDbus):
2719 def __init__(self, bus):
2720 TestDbus.__init__(self, bus)
2724 self.find_stopped = False
2726 def __enter__(self):
2727 gobject.timeout_add(1, self.run_test)
2728 gobject.timeout_add(15000, self.timeout)
2729 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2731 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
2733 self.add_signal(self.provisionDiscoveryResponseEnterPin,
2734 WPAS_DBUS_IFACE_P2PDEVICE,
2735 "ProvisionDiscoveryResponseEnterPin")
2736 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
2741 def deviceFound(self, path):
2742 logger.debug("deviceFound: path=%s" % path)
2743 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2744 dbus_interface=dbus.PROPERTIES_IFACE)
2746 raise Exception("Unexpected number of peers")
2748 raise Exception("Mismatch in peer object path")
2749 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
2750 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2751 dbus_interface=dbus.PROPERTIES_IFACE,
2753 logger.debug("peer properties: " + str(res))
2755 if res['DeviceAddress'] == a1:
2756 if 'SecondaryDeviceTypes' not in res:
2757 raise Exception("Missing SecondaryDeviceTypes")
2758 sec = res['SecondaryDeviceTypes']
2760 raise Exception("Secondary device type missing")
2761 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
2762 raise Exception("Secondary device type mismatch")
2764 if 'VendorExtension' not in res:
2765 raise Exception("Missing VendorExtension")
2766 vendor = res['VendorExtension']
2768 raise Exception("Vendor extension missing")
2769 if "\x11\x22\x33\x44" not in vendor:
2770 raise Exception("Secondary device type mismatch")
2773 elif res['DeviceAddress'] == a2:
2774 if 'IEs' not in res:
2775 raise Exception("IEs missing")
2776 if res['IEs'] != wfd:
2777 raise Exception("IEs mismatch")
2780 raise Exception("Unexpected peer device address")
2782 if self.found and self.found2:
2784 p2p.RejectPeer(path)
2785 p2p.ProvisionDiscoveryRequest(path, 'display')
2787 def deviceLost(self, path):
2788 logger.debug("deviceLost: path=%s" % path)
2791 p2p.RejectPeer(path)
2792 raise Exception("Invalid RejectPeer accepted")
2793 except dbus.exceptions.DBusException, e:
2794 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2795 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2798 def provisionDiscoveryResponseEnterPin(self, peer_object):
2799 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
2802 def findStopped(self):
2803 logger.debug("findStopped")
2804 self.find_stopped = True
2806 def run_test(self, *args):
2807 logger.debug("run_test")
2808 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
2809 'Timeout': dbus.Int32(10)}))
2813 return self.found and self.lost and self.found2 and self.find_stopped
2815 with TestDbusP2p(bus) as t:
2817 raise Exception("Expected signals not seen")
2819 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
2820 dev[1].p2p_stop_find()
2823 dev[2].p2p_stop_find()
2824 dev[2].request("P2P_FLUSH")
2825 if not dev[2].discover_peer(addr0):
2826 raise Exception("Peer not found")
2828 dev[2].p2p_stop_find()
2831 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
2832 raise Exception("Invalid ExtendedListen accepted")
2833 except dbus.exceptions.DBusException, e:
2834 if "InvalidArgs" not in str(e):
2835 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
2837 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
2838 p2p.ExtendedListen(dbus.Dictionary({}))
2839 dev[0].global_request("P2P_EXT_LISTEN")
2841 def test_dbus_p2p_service_discovery(dev, apdev):
2842 """D-Bus P2P service discovery"""
2843 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2844 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2846 addr0 = dev[0].p2p_dev_addr()
2847 addr1 = dev[1].p2p_dev_addr()
2849 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
2850 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
2852 args = { 'service_type': 'bonjour',
2853 'query': bonjour_query,
2854 'response': bonjour_response }
2855 p2p.AddService(args)
2857 p2p.AddService(args)
2860 p2p.DeleteService(args)
2861 raise Exception("Invalid DeleteService() accepted")
2862 except dbus.exceptions.DBusException, e:
2863 if "InvalidArgs" not in str(e):
2864 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2866 args = { 'service_type': 'bonjour',
2867 'query': bonjour_query }
2868 p2p.DeleteService(args)
2870 p2p.DeleteService(args)
2871 raise Exception("Invalid DeleteService() accepted")
2872 except dbus.exceptions.DBusException, e:
2873 if "InvalidArgs" not in str(e):
2874 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2876 args = { 'service_type': 'upnp',
2878 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
2879 p2p.AddService(args)
2880 p2p.DeleteService(args)
2882 p2p.DeleteService(args)
2883 raise Exception("Invalid DeleteService() accepted")
2884 except dbus.exceptions.DBusException, e:
2885 if "InvalidArgs" not in str(e):
2886 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2888 tests = [ { 'service_type': 'foo' },
2889 { 'service_type': 'foo', 'query': bonjour_query },
2890 { 'service_type': 'upnp' },
2891 { 'service_type': 'upnp', 'version': 0x10 },
2892 { 'service_type': 'upnp',
2893 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2895 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2896 { 'service_type': 'upnp', 'foo': 'bar' },
2897 { 'service_type': 'bonjour' },
2898 { 'service_type': 'bonjour', 'query': 'foo' },
2899 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2902 p2p.DeleteService(args)
2903 raise Exception("Invalid DeleteService() accepted")
2904 except dbus.exceptions.DBusException, e:
2905 if "InvalidArgs" not in str(e):
2906 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2908 tests = [ { 'service_type': 'foo' },
2909 { 'service_type': 'upnp' },
2910 { 'service_type': 'upnp', 'version': 0x10 },
2911 { 'service_type': 'upnp',
2912 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2914 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2915 { 'service_type': 'upnp', 'foo': 'bar' },
2916 { 'service_type': 'bonjour' },
2917 { 'service_type': 'bonjour', 'query': 'foo' },
2918 { 'service_type': 'bonjour', 'response': 'foo' },
2919 { 'service_type': 'bonjour', 'query': bonjour_query },
2920 { 'service_type': 'bonjour', 'response': bonjour_response },
2921 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
2922 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2925 p2p.AddService(args)
2926 raise Exception("Invalid AddService() accepted")
2927 except dbus.exceptions.DBusException, e:
2928 if "InvalidArgs" not in str(e):
2929 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2931 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
2932 ref = p2p.ServiceDiscoveryRequest(args)
2933 p2p.ServiceDiscoveryCancelRequest(ref)
2935 p2p.ServiceDiscoveryCancelRequest(ref)
2936 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2937 except dbus.exceptions.DBusException, e:
2938 if "InvalidArgs" not in str(e):
2939 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2941 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
2942 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2943 except dbus.exceptions.DBusException, e:
2944 if "InvalidArgs" not in str(e):
2945 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2947 args = { 'service_type': 'upnp',
2949 'service': 'ssdp:foo' }
2950 ref = p2p.ServiceDiscoveryRequest(args)
2951 p2p.ServiceDiscoveryCancelRequest(ref)
2953 tests = [ { 'service_type': 'foo' },
2958 { 'service_type': 'upnp',
2959 'service': 'ssdp:foo' },
2960 { 'service_type': 'upnp',
2962 { 'service_type': 'upnp',
2964 'service': 'ssdp:foo',
2965 'peer_object': dbus.ObjectPath(path + "/Peers") },
2966 { 'service_type': 'upnp',
2968 'service': 'ssdp:foo',
2969 'peer_object': path + "/Peers" },
2970 { 'service_type': 'upnp',
2972 'service': 'ssdp:foo',
2973 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
2976 p2p.ServiceDiscoveryRequest(args)
2977 raise Exception("Invalid ServiceDiscoveryRequest accepted")
2978 except dbus.exceptions.DBusException, e:
2979 if "InvalidArgs" not in str(e):
2980 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
2982 args = { 'foo': 'bar' }
2984 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
2985 raise Exception("Invalid ServiceDiscoveryResponse accepted")
2986 except dbus.exceptions.DBusException, e:
2987 if "InvalidArgs" not in str(e):
2988 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    # dev[0] is driven through D-Bus; dev[1] is configured through its
    # control interface to offer a Bonjour service and to stay discoverable.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryResponse signal has been received.
            self.done = False

        def __enter__(self):
            # run_test() starts the scenario from inside the main loop;
            # the second timer invokes the base class timeout() handler
            # after 15 seconds.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Send an SD query TLV to the peer that was just discovered.
            args = { 'peer_object': path,
                     'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
            p2p.ServiceDiscoveryRequest(args)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # Always switch external SD processing back off on dev[0], even when
        # the test body raised, so the setting does not carry over.
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # Body of test_dbus_p2p_service_discovery_external: dev[1] sends an SD
    # request to dev[0], whose D-Bus client answers it through
    # ServiceDiscoveryResponse(); dev[1] must then report exactly that data.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex encoded response TLVs returned by the D-Bus side and expected in
    # the P2P-SERV-DISC-RESP event on dev[1].
    resp = "0300000101"

    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryRequest signal has been handled.
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the identifying fields of the request back together with
            # the response TLVs.
            args = { 'peer_object': sd_request['peer_object'],
                     'frequency': sd_request['frequency'],
                     'dialog_token': sd_request['dialog_token'],
                     'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    # The fifth space separated field of the event is compared with the
    # hex string that was sent.
    if ev.split(' ')[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Signal driven sequence implemented by the handlers below:
    #  1. run_test() adds a persistent group.
    #  2. The first GroupStarted disconnects that group instance again.
    #  3. GroupFinished re-starts the group from the persistent entry.
    #  4. The second GroupStarted makes wlan1 join with a PIN.
    #  5. The provision discovery signal authorizes the PIN through WPS.
    #  6. StaAuthorized checks peer/group properties, exercises
    #     WPSVendorExtensions, removes the client and disconnects.
    #  7. The last GroupFinished removes the persistent group and
    #     PersistentGroupRemoved ends the main loop.
    # Handlers set self.exceptions before raising so that success() can
    # report a failure that happened inside a signal callback.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self.exceptions = True
                raise Exception("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self.exceptions = True
                raise Exception("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self.exceptions = True
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # Turn the last path element (hex string) into aa:bb:.. form.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ""
            for p in peer:
                if len(addr) > 0:
                    addr += ':'
                addr += '%02x' % ord(p)

            # This first parameter set is expected to be rejected with
            # InvalidArgs.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin' }
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
                self.exceptions = True
                raise Exception("Invalid WPS.Start() accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message: " + str(e))
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self.exceptions = True
                raise Exception("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of group members")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have the "aay" signature and that is what
            # the getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            if len(res2) != 2:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # Grow the list further; the Set() below is expected to fail.
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Dictionary with an unexpected key.
            vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Values of the wrong type.
            vals = [ "foo" ]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            vals = [ [ "foo" ] ]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            p2p.RemoveClient({ 'peer': self.peer_path })

            # From here on GroupFinished removes the persistent group
            # instead of re-starting it.
            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Sequence: run_test() starts a group, GroupStarted makes wlan1 join
    # with PBC, the provision discovery signal authorizes PBC through WPS,
    # StaAuthorized disconnects the group and GroupFinished ends the loop.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.provisionDiscoveryPBCRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryPBCRequest")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # Turn the last path element (hex string) into aa:bb:.. form.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ""
            for p in peer:
                if len(addr) > 0:
                    addr += ':'
                addr += '%02x' % ord(p)
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pbc' }
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Sequence: run_test() starts a group, GroupStarted enables a WPS PIN
    # on the group interface and has wlan1 connect with plain WPS_PIN,
    # StaAuthorized disconnects both sides and GroupFinished ends the loop.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            # Format the BSSID byte array as a colon separated hex string.
            bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])

            pin = '12345670'
            params = { 'Role': 'enrollee',
                       'Type': 'pin',
                       'Pin': pin }
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.scan_for_bss(bssid, freq=2412)
            dev1.request("WPS_PIN " + bssid + " " + pin)
            # Kept for staAuthorized(), which disconnects the group.
            self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # dev[1] runs the group as GO; dev[2] is a third device that gets
    # invited once dev[0] has joined.
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths of dev[2] and dev[1], filled in by
            # deviceFound().
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # The object path contains the device address without colons.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'frequency': 2412 }
                pin = p2p.Connect(args)

                # Hand the PIN returned by Connect() to the GO.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            args = { 'duration1': 30000, 'interval1': 102400,
                     'duration2': 20000, 'interval2': 102400 }
            group_p2p.PresenceRequest(args)

            args = { 'peer': self.peer }
            group_p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Remove the group on the GO side, which ends the session.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # form() comes from the p2p_utils star import; presumably it sets up
    # the persistent group that dev[1] refers to in P2P_INVITE below.
    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    if not dev[1].discover_peer(addr0, social=True):
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            # Send the invitation from wlan1 through its control interface.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
            dev1.global_request(cmd)
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].p2p_stop_find()
    dev[1].p2p_stop_find()
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        # Always clear the SSID postfix on dev[0], even when the test body
        # raised, so the setting does not carry over.
        dev[0].request("P2P_SET ssid_postfix ")
def _test_dbus_p2p_config(dev, apdev):
    # Body of test_dbus_p2p_config: round-trips the P2PDeviceConfig property
    # and checks that invalid Get()/Set() operations are rejected.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Writing back exactly what was read must not change any parameter.
    res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                     dbus_interface=dbus.PROPERTIES_IFACE,
                     byte_arrays=True)
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
               dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                      dbus_interface=dbus.PROPERTIES_IFACE,
                      byte_arrays=True)

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Add one vendor extension and one secondary device type.
    changes = { 'SsidPostfix': 'foo',
                'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
                'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
               dbus.Dictionary(changes, signature='sv'),
               dbus_interface=dbus.PROPERTIES_IFACE)

    res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                      dbus_interface=dbus.PROPERTIES_IFACE,
                      byte_arrays=True)
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Empty arrays must remove the entries again.
    changes = { 'SsidPostfix': '',
                'VendorExtension': dbus.Array([], signature="ay"),
                'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
               dbus.Dictionary(changes, signature='sv'),
               dbus_interface=dbus.PROPERTIES_IFACE)

    res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                      dbus_interface=dbus.PROPERTIES_IFACE,
                      byte_arrays=True)
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # With P2P disabled both Get() and Set() have to fail; P2P is enabled
    # again in the finally clause regardless of the outcome.
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                   dbus_interface=dbus.PROPERTIES_IFACE,
                   byte_arrays=True)
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    try:
        dev[0].request("P2P_SET disabled 1")
        changes = { 'SsidPostfix': 'foo' }
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                   dbus.Dictionary(changes, signature='sv'),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Wrong value types and an unknown key must be rejected as InvalidArgs.
    tests = [ { 'DeviceName': 123 },
              { 'SsidPostfix': 123 },
              { 'Foo': 'Bar' } ]
    for changes in tests:
        try:
            if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                       dbus.Dictionary(changes, signature='sv'),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # The main loop part only creates a persistent group and records its
    # object path; the property checks are done after the loop has ended.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Stop the running group instance right away.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timer from firing again.
            return False

        def success(self):
            return True

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Changing only the SSID must leave every other property untouched.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The SSID is compared including the surrounding double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
                             'psk': '1234567890' }, signature='sv')
    group = p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The group was removed above, so removing it by path must now fail.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3901 def test_dbus_p2p_reinvoke_persistent(dev, apdev):
3902 """D-Bus P2P reinvoke persistent group"""
# dev[0] is exercised through its P2PDevice D-Bus interface; the peer side is
# a WpaSupplicant('wlan1', ...) object created inside the signal handlers.
# NOTE(review): this listing has lost its indentation and some lines (signal
# names, try: statements, loop start/quit handling), so the comments below
# describe only what the remaining lines show.
3903 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3904 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3906 addr0 = dev[0].p2p_dev_addr()
# Signal-driven test helper built on TestDbus.
3908 class TestDbusP2p(TestDbus):
3909 def __init__(self, bus):
3910 TestDbus.__init__(self, bus)
3912 self.waiting_end = False
3914 self.invited = False
# Schedule run_test (after 1 ms) and self.timeout (after 15000 ms), then
# subscribe the handlers defined below to their D-Bus signals.
3916 def __enter__(self):
3917 gobject.timeout_add(1, self.run_test)
3918 gobject.timeout_add(15000, self.timeout)
3919 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3921 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3923 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3925 self.add_signal(self.persistentGroupAdded,
3926 WPAS_DBUS_IFACE_P2PDEVICE,
3927 "PersistentGroupAdded")
3928 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3929 WPAS_DBUS_IFACE_P2PDEVICE,
3930 "ProvisionDiscoveryRequestDisplayPin")
3931 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
# groupStarted handler: keeps the group's interface object in self.g_if_obj.
# The visible lines then read the group's BSSID over D-Bus and have wlan1
# scan for it on 2412 MHz and join dev[0]'s group with PIN 12345670.
# NOTE(review): how many of these lines sit under `if not self.invited`
# cannot be confirmed without the original indentation.
3936 def groupStarted(self, properties):
3937 logger.debug("groupStarted: " + str(properties))
3938 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3939 properties['interface_object'])
3940 if not self.invited:
3941 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3942 properties['group_object'])
3943 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3944 dbus_interface=dbus.PROPERTIES_IFACE,
3946 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
3947 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3948 dev1.scan_for_bss(bssid, freq=2412)
3949 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
# groupFinished handler: enables persistent_reconnect on wlan1, checks that
# Invite() is rejected with InvalidArgs when 'persistent_group_object' is the
# interface path returned by prepare_dbus(), then invites the peer to the
# stored persistent group and waits for wlan1 to report P2P-GROUP-STARTED.
3951 def groupFinished(self, properties):
3952 logger.debug("groupFinished: " + str(properties))
3957 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3958 dev1.global_request("SET persistent_reconnect 1")
3961 args = { 'persistent_group_object': dbus.ObjectPath(path),
3962 'peer': self.peer_path }
3964 pin = p2p.Invite(args)
3965 raise Exception("Invalid Invite accepted")
3966 except dbus.exceptions.DBusException, e:
3967 if "InvalidArgs" not in str(e):
3968 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3970 args = { 'persistent_group_object': self.persistent,
3971 'peer': self.peer_path }
3972 pin = p2p.Invite(args)
3975 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
3977 if self.sta_group_ev is None:
3978 raise Exception("P2P-GROUP-STARTED event not seen")
# persistentGroupAdded handler: remembers the persistent group object path
# that groupFinished() later passes to Invite().
3980 def persistentGroupAdded(self, path, properties):
3981 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3982 self.persistent = path
# deviceFound handler: fetches the peer's properties into self.peer.
3984 def deviceFound(self, path):
3985 logger.debug("deviceFound: path=%s" % path)
3986 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3987 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3988 dbus_interface=dbus.PROPERTIES_IFACE,
# provisionDiscoveryRequestDisplayPin handler: stores the peer object path,
# decodes the hex address from its last path component, and prepares WPS
# parameters (Role 'registrar') using the peer's DeviceAddress before waiting
# for wlan1 to report P2P-GROUP-STARTED.
# NOTE(review): the loop that builds `addr` and the call that uses `params`
# are not present in this listing.
3991 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3992 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3993 self.peer_path = peer_object
3994 peer = binascii.unhexlify(peer_object.split('/')[-1])
3999 addr += '%02x' % ord(p)
4000 params = { 'Role': 'registrar',
4001 'P2PDeviceAddress': self.peer['DeviceAddress'],
4002 'Bssid': self.peer['DeviceAddress'],
4005 logger.info("Authorize peer to connect to the group")
4006 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4007 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
4009 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4011 if self.sta_group_ev is None:
4012 raise Exception("P2P-GROUP-STARTED event not seen")
# staAuthorized handler: processes wlan1's stored group-start event, waits up
# to 10 s for wlan1 to report P2P-GROUP-REMOVED, then disconnects the group
# on dev[0] through the group interface object.
4014 def staAuthorized(self, name):
4015 logger.debug("staAuthorized: " + name)
4016 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4017 dev1.group_form_result(self.sta_group_ev)
4019 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
4021 raise Exception("Group removal timed out")
4022 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4023 group_p2p.Disconnect()
# run_test: entry point scheduled from __enter__(); asks dev[0] to add a
# persistent group through GroupAdd().
4025 def run_test(self, *args):
4026 logger.debug("run_test")
4027 params = dbus.Dictionary({'persistent': True,
4029 logger.info("Add a persistent group")
4030 p2p.GroupAdd(params)
# Run the helper as a context manager; the test fails with the exception
# below when the expected signals were not observed.
4036 with TestDbusP2p(bus) as t:
4038 raise Exception("Expected signals not seen")
4040 def test_dbus_p2p_go_neg_rx(dev, apdev):
4041 """D-Bus P2P GO Negotiation receive"""
# wlan1 initiates the connection towards dev[0] (see run_test); dev[0] reacts
# to the resulting D-Bus signals.
# NOTE(review): indentation and some lines (signal names, try: statements,
# the calls consuming `args`, loop handling) are missing from this listing.
4042 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4043 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4044 addr0 = dev[0].p2p_dev_addr()
4046 class TestDbusP2p(TestDbus):
4047 def __init__(self, bus):
4048 TestDbus.__init__(self, bus)
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below.
4051 def __enter__(self):
4052 gobject.timeout_add(1, self.run_test)
4053 gobject.timeout_add(15000, self.timeout)
4054 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4056 self.add_signal(self.goNegotiationRequest,
4057 WPAS_DBUS_IFACE_P2PDEVICE,
4058 "GONegotiationRequest",
4060 self.add_signal(self.goNegotiationSuccess,
4061 WPAS_DBUS_IFACE_P2PDEVICE,
4062 "GONegotiationSuccess",
4064 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4066 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
# deviceFound handler: only logs the peer path.
4071 def deviceFound(self, path):
4072 logger.debug("deviceFound: path=%s" % path)
# goNegotiationRequest handler: requires dev_passwd_id == 1. The first args
# dict carries 'frequency': 5175 and the except branch demands a
# ConnectChannelUnsupported error; the second dict omits the frequency.
# NOTE(review): the calls that use these dicts are not in this listing --
# presumably p2p.Connect(args); confirm against the full file.
4074 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4075 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
4076 if dev_passwd_id != 1:
4077 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4078 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4079 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
4082 raise Exception("Invalid Connect accepted")
4083 except dbus.exceptions.DBusException, e:
4084 if "ConnectChannelUnsupported" not in str(e):
4085 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4087 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4088 'go_intent': 15, 'persistent': False }
# goNegotiationSuccess handler: only logs the reported properties.
4091 def goNegotiationSuccess(self, properties):
4092 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
# groupStarted handler: immediately disconnects the new group through the
# P2PDevice interface of the group's interface object.
4094 def groupStarted(self, properties):
4095 logger.debug("groupStarted: " + str(properties))
4096 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4097 properties['interface_object'])
4098 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4099 group_p2p.Disconnect()
# groupFinished handler: logs the event (any further lines are missing here).
4101 def groupFinished(self, properties):
4102 logger.debug("groupFinished: " + str(properties))
# run_test: wlan1 discovers dev[0] and starts GO negotiation by entering PIN
# 12345670.
4106 def run_test(self, *args):
4107 logger.debug("run_test")
4109 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4110 if not dev1.discover_peer(addr0):
4111 raise Exception("Peer not found")
4112 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
# Run the helper; failure to observe the expected signals raises below.
4118 with TestDbusP2p(bus) as t:
4120 raise Exception("Expected signals not seen")
4122 def test_dbus_p2p_go_neg_auth(dev, apdev):
4123 """D-Bus P2P GO Negotiation authorized"""
# dev[0] pre-authorizes the peer ('authorize_only': True) and wlan1 then
# initiates the connection with go_intent=0.
# NOTE(review): indentation and some lines (signal names, try: statements,
# the calls consuming `args`, loop handling) are missing from this listing.
4124 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4125 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4126 addr0 = dev[0].p2p_dev_addr()
4129 class TestDbusP2p(TestDbus):
4130 def __init__(self, bus):
4131 TestDbus.__init__(self, bus)
# Flags set by peerJoined()/peerDisconnected(); both are part of the result
# expression further down.
4133 self.peer_joined = False
4134 self.peer_disconnected = False
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below.
4136 def __enter__(self):
4137 gobject.timeout_add(1, self.run_test)
4138 gobject.timeout_add(15000, self.timeout)
4139 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4141 self.add_signal(self.goNegotiationSuccess,
4142 WPAS_DBUS_IFACE_P2PDEVICE,
4143 "GONegotiationSuccess",
4145 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4147 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4149 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
4151 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4153 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
# deviceFound handler: the first args dict has no 'pin' and the except branch
# demands an InvalidArgs error; the second dict adds the PIN. wlan1 then
# discovers dev[0], connects with go_intent=0 and its P2P-GROUP-STARTED event
# is stored in self.sta_group_ev.
# NOTE(review): the calls that use the args dicts are not in this listing --
# presumably p2p.Connect(args); confirm against the full file.
4158 def deviceFound(self, path):
4159 logger.debug("deviceFound: path=%s" % path)
4160 args = { 'peer': path, 'wps_method': 'keypad',
4161 'go_intent': 15, 'authorize_only': True }
4164 raise Exception("Invalid Connect accepted")
4165 except dbus.exceptions.DBusException, e:
4166 if "InvalidArgs" not in str(e):
4167 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4169 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4170 'go_intent': 15, 'authorize_only': True }
4173 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4174 if not dev1.discover_peer(addr0):
4175 raise Exception("Peer not found")
4176 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
4177 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4179 raise Exception("Group formation timed out")
4180 self.sta_group_ev = ev
# goNegotiationSuccess handler: only logs the reported properties.
4182 def goNegotiationSuccess(self, properties):
4183 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
# groupStarted handler: keeps the group's interface object and lets wlan1
# process the group-start event captured in deviceFound().
4185 def groupStarted(self, properties):
4186 logger.debug("groupStarted: " + str(properties))
4187 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4188 properties['interface_object'])
4189 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4190 dev1.group_form_result(self.sta_group_ev)
# staDeauthorized handler: disconnects the group on dev[0].
# NOTE(review): the debug message below spells the handler name as
# "staDeuthorized" -- looks like a typo in the log text.
4193 def staDeauthorized(self, name):
4194 logger.debug("staDeuthorized: " + name)
4195 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4196 group_p2p.Disconnect()
# peerJoined / peerDisconnected handlers: record that each signal was seen.
4198 def peerJoined(self, peer):
4199 logger.debug("peerJoined: " + peer)
4200 self.peer_joined = True
4202 def peerDisconnected(self, peer):
4203 logger.debug("peerDisconnected: " + peer)
4204 self.peer_disconnected = True
# groupFinished handler: logs the event (any further lines are missing here).
4206 def groupFinished(self, properties):
4207 logger.debug("groupFinished: " + str(properties))
# run_test: starts a social-channel P2P Find on dev[0].
4211 def run_test(self, *args):
4212 logger.debug("run_test")
4213 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Result expression: self.done plus both peer signals must have been seen.
# NOTE(review): the `def` line owning this return is missing from the listing.
4217 return self.done and self.peer_joined and self.peer_disconnected
# Run the helper; failure to observe the expected signals raises below.
4219 with TestDbusP2p(bus) as t:
4221 raise Exception("Expected signals not seen")
4223 def test_dbus_p2p_go_neg_init(dev, apdev):
4224 """D-Bus P2P GO Negotiation initiation"""
# dev[0] initiates towards the discovered peer; wlan1 answers the GO
# Negotiation Request with go_intent=15.
# NOTE(review): indentation and some lines (signal names, the rest of the
# args dict and its use, `if ev is None:` checks, loop handling) are missing
# from this listing.
4225 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4226 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4227 addr0 = dev[0].p2p_dev_addr()
4230 class TestDbusP2p(TestDbus):
4231 def __init__(self, bus):
4232 TestDbus.__init__(self, bus)
# Flags set from propertiesChanged(); both are part of the result expression.
4234 self.peer_group_added = False
4235 self.peer_group_removed = False
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below.
4237 def __enter__(self):
4238 gobject.timeout_add(1, self.run_test)
4239 gobject.timeout_add(15000, self.timeout)
4240 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4242 self.add_signal(self.goNegotiationSuccess,
4243 WPAS_DBUS_IFACE_P2PDEVICE,
4244 "GONegotiationSuccess",
4246 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4248 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4250 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4251 "PropertiesChanged")
# deviceFound handler: prepares keypad/PIN connect args for the found peer,
# waits for wlan1 to see P2P-GO-NEG-REQUEST, lets wlan1 respond with
# go_intent=15 and stores wlan1's P2P-GROUP-STARTED event.
4255 def deviceFound(self, path):
4256 logger.debug("deviceFound: path=%s" % path)
4257 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4258 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4262 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4264 raise Exception("Timeout while waiting for GO Neg Request")
4265 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4266 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4268 raise Exception("Group formation timed out")
4269 self.sta_group_ev = ev
# goNegotiationSuccess handler: only logs the reported properties.
4271 def goNegotiationSuccess(self, properties):
4272 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
# groupStarted handler: dev[0] disconnects from the group right away, then
# wlan1 processes its stored group-start event.
4274 def groupStarted(self, properties):
4275 logger.debug("groupStarted: " + str(properties))
4276 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4277 properties['interface_object'])
4278 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4279 group_p2p.Disconnect()
4280 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4281 dev1.group_form_result(self.sta_group_ev)
# groupFinished handler: logs the event (any further lines are missing here).
4284 def groupFinished(self, properties):
4285 logger.debug("groupFinished: " + str(properties))
# propertiesChanged handler: looks at the "Groups" property of Peer objects;
# a non-empty list marks the group as added, an empty list as removed.
# NOTE(review): the statements under the two guard `if`s (presumably early
# returns) are missing from this listing.
4288 def propertiesChanged(self, interface_name, changed_properties,
4289 invalidated_properties):
4290 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4291 if interface_name != WPAS_DBUS_P2P_PEER:
4293 if "Groups" not in changed_properties:
4295 if len(changed_properties["Groups"]) > 0:
4296 self.peer_group_added = True
4297 if len(changed_properties["Groups"]) == 0:
4298 self.peer_group_removed = True
# run_test: starts a social-channel P2P Find on dev[0].
4301 def run_test(self, *args):
4302 logger.debug("run_test")
4303 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Result expression: self.done plus both peer "Groups" transitions.
# NOTE(review): the `def` line owning this return is missing from the listing.
4307 return self.done and self.peer_group_added and self.peer_group_removed
# Run the helper; failure to observe the expected signals raises below.
4309 with TestDbusP2p(bus) as t:
4311 raise Exception("Expected signals not seen")
4313 def test_dbus_p2p_group_termination_by_go(dev, apdev):
4314 """D-Bus P2P group removal on GO terminating the group"""
# dev[0] initiates towards the discovered peer; wlan1 answers with
# go_intent=15. Unlike the GO-negotiation-initiation test, the visible
# groupStarted() lines do not call Disconnect() on dev[0].
# NOTE(review): indentation and some lines (signal names, the rest of the
# args dict and its use, `if ev is None:` checks, loop handling) are missing
# from this listing.
4315 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4316 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4317 addr0 = dev[0].p2p_dev_addr()
4320 class TestDbusP2p(TestDbus):
4321 def __init__(self, bus):
4322 TestDbus.__init__(self, bus)
# Flags set from propertiesChanged(); both are part of the result expression.
4324 self.peer_group_added = False
4325 self.peer_group_removed = False
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below.
4327 def __enter__(self):
4328 gobject.timeout_add(1, self.run_test)
4329 gobject.timeout_add(15000, self.timeout)
4330 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4332 self.add_signal(self.goNegotiationSuccess,
4333 WPAS_DBUS_IFACE_P2PDEVICE,
4334 "GONegotiationSuccess",
4336 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4338 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4340 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4341 "PropertiesChanged")
# deviceFound handler: prepares keypad/PIN connect args for the found peer,
# waits for wlan1 to see P2P-GO-NEG-REQUEST, lets wlan1 respond with
# go_intent=15 and stores wlan1's P2P-GROUP-STARTED event.
4345 def deviceFound(self, path):
4346 logger.debug("deviceFound: path=%s" % path)
4347 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4348 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4352 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4354 raise Exception("Timeout while waiting for GO Neg Request")
4355 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4356 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4358 raise Exception("Group formation timed out")
4359 self.sta_group_ev = ev
# goNegotiationSuccess handler: only logs the reported properties.
4361 def goNegotiationSuccess(self, properties):
4362 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
# groupStarted handler: looks up the group's interface object and lets wlan1
# process its stored group-start event.
4364 def groupStarted(self, properties):
4365 logger.debug("groupStarted: " + str(properties))
4366 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4367 properties['interface_object'])
4368 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4369 dev1.group_form_result(self.sta_group_ev)
# groupFinished handler: logs the event (any further lines are missing here).
4372 def groupFinished(self, properties):
4373 logger.debug("groupFinished: " + str(properties))
# propertiesChanged handler: tracks the "Groups" property of Peer objects.
# An empty list only counts as removal after a non-empty list was seen.
# NOTE(review): the statements under the two guard `if`s (presumably early
# returns) are missing from this listing.
4376 def propertiesChanged(self, interface_name, changed_properties,
4377 invalidated_properties):
4378 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4379 if interface_name != WPAS_DBUS_P2P_PEER:
4381 if "Groups" not in changed_properties:
4383 if len(changed_properties["Groups"]) > 0:
4384 self.peer_group_added = True
4385 if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
4386 self.peer_group_removed = True
# run_test: starts a social-channel P2P Find on dev[0].
4389 def run_test(self, *args):
4390 logger.debug("run_test")
4391 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Result expression: self.done plus both peer "Groups" transitions.
# NOTE(review): the `def` line owning this return is missing from the listing.
4395 return self.done and self.peer_group_added and self.peer_group_removed
# Run the helper; failure to observe the expected signals raises below.
4397 with TestDbusP2p(bus) as t:
4399 raise Exception("Expected signals not seen")
4401 def test_dbus_p2p_group_idle_timeout(dev, apdev):
4402 """D-Bus P2P group removal on idle timeout"""
# Sets p2p_group_idle to 1 on dev[0], runs the helper test, then sets the
# parameter back to 0.
# NOTE(review): the lines between these statements are missing from this
# listing -- presumably try:/finally: so the reset always runs; confirm.
4404 dev[0].global_request("SET p2p_group_idle 1")
4405 _test_dbus_p2p_group_idle_timeout(dev, apdev)
4407 dev[0].global_request("SET p2p_group_idle 0")
4409 def _test_dbus_p2p_group_idle_timeout(dev, apdev):
# Body of the idle-timeout test: forms a group with wlan1 (wlan1 uses
# go_intent=15), then wlan1 deauthenticates its first station with reason=0.
# NOTE(review): indentation and some lines (signal names, the rest of the
# args dict and its use, `if ev is None:` checks, loop handling) are missing
# from this listing.
4410 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4411 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4412 addr0 = dev[0].p2p_dev_addr()
4415 class TestDbusP2p(TestDbus):
4416 def __init__(self, bus):
4417 TestDbus.__init__(self, bus)
# group_started gates propertiesChanged(); the other two flags are part of
# the result expression.
4419 self.group_started = False
4420 self.peer_group_added = False
4421 self.peer_group_removed = False
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below.
4423 def __enter__(self):
4424 gobject.timeout_add(1, self.run_test)
4425 gobject.timeout_add(15000, self.timeout)
4426 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4428 self.add_signal(self.goNegotiationSuccess,
4429 WPAS_DBUS_IFACE_P2PDEVICE,
4430 "GONegotiationSuccess",
4432 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4434 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4436 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4437 "PropertiesChanged")
# deviceFound handler: prepares keypad/PIN connect args for the found peer,
# waits for wlan1 to see P2P-GO-NEG-REQUEST, lets wlan1 respond with
# go_intent=15 and stores wlan1's P2P-GROUP-STARTED event.
4441 def deviceFound(self, path):
4442 logger.debug("deviceFound: path=%s" % path)
4443 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4444 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4448 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4450 raise Exception("Timeout while waiting for GO Neg Request")
4451 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4452 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4454 raise Exception("Group formation timed out")
4455 self.sta_group_ev = ev
# goNegotiationSuccess handler: only logs the reported properties.
4457 def goNegotiationSuccess(self, properties):
4458 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
# groupStarted handler: marks the group as started, lets wlan1 process its
# group-start event, takes the first line of wlan1's STA-FIRST reply as the
# station address and deauthenticates that station from wlan1's group.
4460 def groupStarted(self, properties):
4461 logger.debug("groupStarted: " + str(properties))
4462 self.group_started = True
4463 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4464 properties['interface_object'])
4465 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4466 dev1.group_form_result(self.sta_group_ev)
4467 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
4468 # Force disassociation with different reason code so that the
4469 # P2P Client using D-Bus does not get normal group termination event
4471 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
# groupFinished handler: logs the event (any further lines are missing here).
4474 def groupFinished(self, properties):
4475 logger.debug("groupFinished: " + str(properties))
# propertiesChanged handler: tracks the "Groups" property of Peer objects,
# with an additional guard on self.group_started.
# NOTE(review): the statements under the three guard `if`s (presumably early
# returns) are missing from this listing.
4478 def propertiesChanged(self, interface_name, changed_properties,
4479 invalidated_properties):
4480 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4481 if interface_name != WPAS_DBUS_P2P_PEER:
4483 if not self.group_started:
4485 if "Groups" not in changed_properties:
4487 if len(changed_properties["Groups"]) > 0:
4488 self.peer_group_added = True
4489 if len(changed_properties["Groups"]) == 0:
4490 self.peer_group_removed = True
# run_test: starts a social-channel P2P Find on dev[0].
4493 def run_test(self, *args):
4494 logger.debug("run_test")
4495 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Result expression: self.done plus both peer "Groups" transitions.
# NOTE(review): the `def` line owning this return is missing from the listing.
4499 return self.done and self.peer_group_added and self.peer_group_removed
# Run the helper; failure to observe the expected signals raises below.
4501 with TestDbusP2p(bus) as t:
4503 raise Exception("Expected signals not seen")
4505 def test_dbus_p2p_wps_failure(dev, apdev):
4506 """D-Bus P2P WPS failure"""
# wlan1 connects with PIN 87654321 while dev[0] prepares PIN 12345670 in its
# connect args; the test expects both the WPS failure and the group
# formation failure signals, and treats GroupStarted as an error.
# NOTE(review): indentation and some lines (signal names, the rest of the
# args dict and its use, loop handling) are missing from this listing.
4507 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4508 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4509 addr0 = dev[0].p2p_dev_addr()
4511 class TestDbusP2p(TestDbus):
4512 def __init__(self, bus):
4513 TestDbus.__init__(self, bus)
# Flags set by wpsFailed()/groupFormationFailure(); both must be set for the
# result expression to be true.
4514 self.wps_failed = False
4515 self.formation_failure = False
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below.
4517 def __enter__(self):
4518 gobject.timeout_add(1, self.run_test)
4519 gobject.timeout_add(15000, self.timeout)
4520 self.add_signal(self.goNegotiationRequest,
4521 WPAS_DBUS_IFACE_P2PDEVICE,
4522 "GONegotiationRequest",
4524 self.add_signal(self.goNegotiationSuccess,
4525 WPAS_DBUS_IFACE_P2PDEVICE,
4526 "GONegotiationSuccess",
4528 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4530 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
4532 self.add_signal(self.groupFormationFailure,
4533 WPAS_DBUS_IFACE_P2PDEVICE,
4534 "GroupFormationFailure")
# goNegotiationRequest handler: requires dev_passwd_id == 1 and prepares
# display/PIN connect args for the requesting peer.
4538 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4539 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
4540 if dev_passwd_id != 1:
4541 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4542 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
# goNegotiationSuccess handler: only logs the reported properties.
4546 def goNegotiationSuccess(self, properties):
4547 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
# groupStarted handler: a group must not start in this test.
4549 def groupStarted(self, properties):
4550 logger.debug("groupStarted: " + str(properties))
4551 raise Exception("Unexpected GroupStarted")
# wpsFailed handler: records the failure; the statement under the
# `if self.formation_failure` check is missing from this listing.
4553 def wpsFailed(self, name, args):
4554 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
4555 self.wps_failed = True
4556 if self.formation_failure:
# groupFormationFailure handler: records the failure.
4559 def groupFormationFailure(self, reason):
4560 logger.debug("groupFormationFailure - reason=%s" % reason)
4561 self.formation_failure = True
# run_test: wlan1 discovers dev[0] and connects by entering PIN 87654321.
4565 def run_test(self, *args):
4566 logger.debug("run_test")
4568 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4569 if not dev1.discover_peer(addr0):
4570 raise Exception("Peer not found")
4571 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
# Result expression: both failure signals must have been seen.
# NOTE(review): the `def` line owning this return is missing from the listing.
4575 return self.wps_failed and self.formation_failure
# Run the helper; failure to observe the expected signals raises below.
4577 with TestDbusP2p(bus) as t:
4579 raise Exception("Expected signals not seen")
4581 def test_dbus_p2p_two_groups(dev, apdev):
4582 """D-Bus P2P with two concurrent groups"""
# Setup: dev[0] gets p2p_no_group_iface set to 0 and dev[1] starts a GO on
# 2412 MHz. check_results() below expects dev[0]'s first group to report
# Role 'client' and its second group Role 'GO'.
# NOTE(review): indentation and some lines (signal names, several
# assignments and calls, loop handling) are missing from this listing.
4583 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4584 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4586 dev[0].request("SET p2p_no_group_iface 0")
4587 addr0 = dev[0].p2p_dev_addr()
4588 addr1 = dev[1].p2p_dev_addr()
4589 addr2 = dev[2].p2p_dev_addr()
4590 dev[1].p2p_start_go(freq=2412)
# Saved so a fresh WpaSupplicant('wlan1', ...) object created in a handler can
# address dev[1]'s group interface.
4591 dev1_group_ifname = dev[1].group_ifname
4593 class TestDbusP2p(TestDbus):
4594 def __init__(self, bus):
4595 TestDbus.__init__(self, bus)
4601 self.groups_removed = False
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the signal handlers defined below. PropertiesChanged is subscribed with
# byte_arrays=True.
4603 def __enter__(self):
4604 gobject.timeout_add(1, self.run_test)
4605 gobject.timeout_add(15000, self.timeout)
4606 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4607 "PropertiesChanged", byte_arrays=True)
4608 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4610 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4612 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4614 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
# propertiesChanged handler: only logs the change.
4619 def propertiesChanged(self, interface_name, changed_properties,
4620 invalidated_properties):
4621 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
# deviceFound handler: classifies the peer by looking for addr2/addr1 (colons
# removed) in the object path. Once self.go is known and no first group
# exists, dev[1]'s group gets "WPS_PIN any <pin>" and join args using
# wps_method 'pin' are prepared.
# NOTE(review): the branch bodies, the origin of `pin` and the call using
# `args` are missing from this listing.
4623 def deviceFound(self, path):
4624 logger.debug("deviceFound: path=%s" % path)
4625 if addr2.replace(':','') in path:
4627 elif addr1.replace(':','') in path:
4629 if self.go and not self.group1:
4630 logger.info("Join the group")
4633 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4634 dev1.group_ifname = dev1_group_ifname
4635 dev1.group_request("WPS_PIN any " + pin)
4636 args = { 'peer': self.go,
4638 'wps_method': 'pin',
# groupStarted handler: logs the P2PDevice and Group properties. The first
# group is recorded as group1 and an autonomous GO is then started on
# 2412 MHz via GroupAdd(); the next group is recorded as group2 together with
# its BSSID. With both groups known, addr2 is authorized as WPS enrollee on
# group2 and wlan2 scans for and joins that group.
# NOTE(review): the condition opening the group1 branch and the call using
# `params`/`g_wps` are missing from this listing.
4643 def groupStarted(self, properties):
4644 logger.debug("groupStarted: " + str(properties))
4645 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4646 dbus_interface=dbus.PROPERTIES_IFACE)
4647 logger.debug("p2pdevice properties: " + str(prop))
4649 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4650 properties['group_object'])
4651 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4652 dbus_interface=dbus.PROPERTIES_IFACE,
4654 logger.debug("Group properties: " + str(res))
4657 self.group1 = properties['group_object']
4658 self.group1iface = properties['interface_object']
4659 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4662 logger.info("Start autonomous GO")
4663 params = dbus.Dictionary({ 'frequency': 2412 })
4664 p2p.GroupAdd(params)
4665 elif not self.group2:
4666 self.group2 = properties['group_object']
4667 self.group2iface = properties['interface_object']
4668 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4670 self.g2_bssid = res['BSSID']
4672 if self.group1 and self.group2:
4673 logger.info("Authorize peer to join the group")
4674 a2 = binascii.unhexlify(addr2.replace(':',''))
4675 params = { 'Role': 'enrollee',
4676 'P2PDeviceAddress': dbus.ByteArray(a2),
4677 'Bssid': dbus.ByteArray(a2),
4680 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
4683 bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
4684 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4685 dev2.scan_for_bss(bssid, freq=2412)
4686 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
4687 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4689 raise Exception("Group join timed out")
4690 self.dev2_group_ev = ev
# groupFinished handler: matches the finished group against group1/group2.
# NOTE(review): the branch bodies are missing from this listing.
4692 def groupFinished(self, properties):
4693 logger.debug("groupFinished: " + str(properties))
4695 if self.group1 == properties['group_object']:
4697 elif self.group2 == properties['group_object']:
4700 if not self.group1 and not self.group2:
4704 def peerJoined(self, peer):
# peerJoined handler: verifies the two-group state via check_results(), lets
# wlan2 process its group-start event, then disconnects group2 followed by
# group1 and sets groups_removed.
# NOTE(review): the statement under `if self.groups_removed` is missing here.
4705 logger.debug("peerJoined: " + peer)
4706 if self.groups_removed:
4708 self.check_results()
4710 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4711 dev2.group_form_result(self.dev2_group_ev)
4714 logger.info("Disconnect group2")
4715 group_p2p = dbus.Interface(self.g2_if_obj,
4716 WPAS_DBUS_IFACE_P2PDEVICE)
4717 group_p2p.Disconnect()
4719 logger.info("Disconnect group1")
4720 group_p2p = dbus.Interface(self.g1_if_obj,
4721 WPAS_DBUS_IFACE_P2PDEVICE)
4722 group_p2p.Disconnect()
4723 self.groups_removed = True
# check_results: reads the Group properties of both groups and the P2PDevice
# properties, then requires Role 'client' for group1, 'GO' for group2,
# 'device' for the P2PDevice interface and exactly one member in group2.
4725 def check_results(self):
4726 logger.info("Check results with two concurrent groups in operation")
4728 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
4729 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
4730 dbus_interface=dbus.PROPERTIES_IFACE,
4733 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
4734 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
4735 dbus_interface=dbus.PROPERTIES_IFACE,
4738 logger.info("group1 = " + self.group1)
4739 logger.debug("Group properties: " + str(res1))
4741 logger.info("group2 = " + self.group2)
4742 logger.debug("Group properties: " + str(res2))
4744 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4745 dbus_interface=dbus.PROPERTIES_IFACE)
4746 logger.debug("p2pdevice properties: " + str(prop))
4748 if res1['Role'] != 'client':
4749 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
4750 if res2['Role'] != 'GO':
4751 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
4752 if prop['Role'] != 'device':
4753 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
4755 if len(res2['Members']) != 1:
4756 raise Exception("Unexpected Members value for group 2")
# run_test: starts a social-channel P2P Find on dev[0].
4758 def run_test(self, *args):
4759 logger.debug("run_test")
4760 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Run the helper; failure to observe the expected signals raises below.
4766 with TestDbusP2p(bus) as t:
4768 raise Exception("Expected signals not seen")
# Tear down the GO that dev[1] started during setup.
4770 dev[1].remove_group()
4772 def test_dbus_p2p_cancel(dev, apdev):
4773 """D-Bus P2P Cancel"""
# The first visible check expects p2p.Cancel() to raise a DBusException
# before any connection attempt has been started.
# NOTE(review): indentation and some lines (the try:/Cancel() call, the
# except body, signal names, the rest of the args dict, loop handling) are
# missing from this listing.
4774 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4775 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4778 raise Exception("Unexpected p2p.Cancel() success")
4779 except dbus.exceptions.DBusException, e:
4782 addr0 = dev[0].p2p_dev_addr()
4785 class TestDbusP2p(TestDbus):
4786 def __init__(self, bus):
4787 TestDbus.__init__(self, bus)
# Schedule run_test (1 ms) and the timeout handler (15000 ms), then register
# the deviceFound handler.
4790 def __enter__(self):
4791 gobject.timeout_add(1, self.run_test)
4792 gobject.timeout_add(15000, self.timeout)
4793 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
# deviceFound handler: prepares keypad/PIN connect args for the found peer;
# the statements that use them are not present in this listing.
4798 def deviceFound(self, path):
4799 logger.debug("deviceFound: path=%s" % path)
4800 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
# run_test: starts a social-channel P2P Find on dev[0].
4807 def run_test(self, *args):
4808 logger.debug("run_test")
4809 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Run the helper; failure to observe the expected signals raises below.
4815 with TestDbusP2p(bus) as t:
4817 raise Exception("Expected signals not seen")
4819 def test_dbus_introspect(dev, apdev):
4820 """D-Bus introspection"""
# NOTE(review): indentation and a few lines (the `if res2 is None:` style
# checks before "No Introspect response") are missing from this listing.
4821 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
# Baseline: the introspection data must mention the Introspectable interface,
# the GroupStarted signal and the FastReauth/PassiveScan properties.
4823 res = if_obj.Introspect(WPAS_DBUS_IFACE,
4824 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4825 logger.info("Initial Introspect: " + str(res))
4826 if res is None or "Introspectable" not in res or "GroupStarted" not in res:
4827 raise Exception("Unexpected initial Introspect response: " + str(res))
4828 if "FastReauth" not in res or "PassiveScan" not in res:
4829 raise Exception("Unexpected initial Introspect response: " + str(res))
# alloc_fail() comes from utils; presumably it makes the Nth allocation in the
# named call path fail -- confirm against utils. With the failure directly in
# wpa_dbus_introspect, no introspection data is expected at all.
4831 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
4832 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4833 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4834 logger.info("Introspect: " + str(res2))
4835 if res2 is not None:
4836 raise Exception("Unexpected Introspect response")
# For the remaining failure points a response is still required, but it must
# be shorter than the baseline response.
4838 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
4839 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4840 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4841 logger.info("Introspect: " + str(res2))
4843 raise Exception("No Introspect response")
4844 if len(res2) >= len(res):
4845 raise Exception("Unexpected Introspect response")
4847 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
4848 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4849 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4850 logger.info("Introspect: " + str(res2))
4852 raise Exception("No Introspect response")
4853 if len(res2) >= len(res):
4854 raise Exception("Unexpected Introspect response")
4856 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
4857 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4858 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4859 logger.info("Introspect: " + str(res2))
4861 raise Exception("No Introspect response")
4862 if len(res2) >= len(res):
4863 raise Exception("Unexpected Introspect response")
4865 def test_dbus_ap(dev, apdev):
4866 """D-Bus AddNetwork for AP mode"""
# dev[0] adds and selects a WPA-PSK network on 2412 MHz through D-Bus;
# afterwards dev[1] connects to that SSID with the same passphrase.
# NOTE(review): indentation and some lines (signal names, part of the
# network dict, loop handling) are missing from this listing.
4867 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4868 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4870 ssid = "test-wpa2-psk"
4871 passphrase = 'qwertyuiop'
4873 class TestDbusConnect(TestDbus):
4874 def __init__(self, bus):
4875 TestDbus.__init__(self, bus)
4876 self.started = False
# Schedule run_connect (1 ms) and the timeout handler (15000 ms), then
# register the signal handlers defined below.
4878 def __enter__(self):
4879 gobject.timeout_add(1, self.run_connect)
4880 gobject.timeout_add(15000, self.timeout)
4881 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
4882 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
4884 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
4885 "PropertiesChanged")
# networkAdded handler: logs the new network and its properties.
4889 def networkAdded(self, network, properties):
4890 logger.debug("networkAdded: %s" % str(network))
4891 logger.debug(str(properties))
# networkSelected handler: records that the selection signal was seen.
4893 def networkSelected(self, network):
4894 logger.debug("networkSelected: %s" % str(network))
4895 self.network_selected = True
# propertiesChanged handler: watches for State == "completed"; the statements
# under that check are missing from this listing.
4897 def propertiesChanged(self, properties):
4898 logger.debug("propertiesChanged: %s" % str(properties))
4899 if 'State' in properties and properties['State'] == "completed":
# run_connect: adds the network over D-Bus and selects it.
4903 def run_connect(self, *args):
4904 logger.debug("run_connect")
4905 args = dbus.Dictionary({ 'ssid': ssid,
4906 'key_mgmt': 'WPA-PSK',
4909 'frequency': 2412 },
4911 self.netw = iface.AddNetwork(args)
4912 iface.SelectNetwork(self.netw)
# Run the helper, then have dev[1] connect to the network on 2412 MHz.
4918 with TestDbusConnect(bus) as t:
4920 raise Exception("Expected signals not seen")
4921 dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
4923 def test_dbus_connect_wpa_eap(dev, apdev):
4924 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
# A hostapd AP is started from wpa_eap_params() with rsn_pairwise set to
# CCMP; dev[0] then adds and selects a matching WPA-EAP network over D-Bus.
# NOTE(review): indentation and some lines (further AP params, part of the
# network dict, loop handling) are missing from this listing.
4925 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4926 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4928 ssid = "test-wpa-eap"
4929 params = hostapd.wpa_eap_params(ssid=ssid)
4931 params["rsn_pairwise"] = "CCMP"
4932 hapd = hostapd.add_ap(apdev[0], params)
4934 class TestDbusConnect(TestDbus):
4935 def __init__(self, bus):
4936 TestDbus.__init__(self, bus)
# Schedule run_connect (1 ms) and the timeout handler (15000 ms), then
# register the PropertiesChanged and EAP signal handlers.
4939 def __enter__(self):
4940 gobject.timeout_add(1, self.run_connect)
4941 gobject.timeout_add(15000, self.timeout)
4942 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
4943 "PropertiesChanged")
4944 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
# propertiesChanged handler: watches for State == "completed"; the statements
# under that check are missing from this listing.
4948 def propertiesChanged(self, properties):
4949 logger.debug("propertiesChanged: %s" % str(properties))
4950 if 'State' in properties and properties['State'] == "completed":
# eap handler: only logs the EAP status and parameter.
4954 def eap(self, status, parameter):
4955 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
# run_connect: adds a WPA-EAP network (MSCHAPV2 in phase 2, CA certificate
# auth_serv/ca.pem, scan_freq 2412) over D-Bus and selects it.
4957 def run_connect(self, *args):
4958 logger.debug("run_connect")
4959 args = dbus.Dictionary({ 'ssid': ssid,
4960 'key_mgmt': 'WPA-EAP',
4963 'password': 'password',
4964 'ca_cert': 'auth_serv/ca.pem',
4965 'phase2': 'auth=MSCHAPV2',
4966 'scan_freq': 2412 },
4968 self.netw = iface.AddNetwork(args)
4969 iface.SelectNetwork(self.netw)
# Run the helper; failure to observe the expected signals raises below.
4975 with TestDbusConnect(bus) as t:
4977 raise Exception("Expected signals not seen")
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        # The helper switches dev[0] to AP_SCAN 2; undo that even when the
        # helper raises so the setting does not leak into later tests.
        dev[0].request("AP_SCAN 1")
4986 def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
# Body of the AP_SCAN 2 / AP mode scan test: brings up the "wpas-ap-open"
# network on dev[0], requests a D-Bus Scan() under
# fail_test(..., "wpa_driver_nl80211_scan"), checks the reported events and
# finally connects dev[1] to the network.
4987 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4988 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4990 if "OK" not in dev[0].request("AP_SCAN 2"):
4991 raise Exception("Failed to set AP_SCAN 2")
# Configure and select an open network with mode=2 on 2412 MHz, then wait up
# to 5 seconds for CTRL-EVENT-CONNECTED.
4993 id = dev[0].add_network()
4994 dev[0].set_network(id, "mode", "2")
4995 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
4996 dev[0].set_network(id, "key_mgmt", "NONE")
4997 dev[0].set_network(id, "frequency", "2412")
4998 dev[0].set_network(id, "scan_freq", "2412")
4999 dev[0].set_network(id, "disabled", "0")
5000 dev[0].select_network(id)
5001 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
# NOTE(review): no guard on ev is visible before this raise - presumably an
# "ev is None" check belongs here; confirm against the full file.
5003 raise Exception("AP failed to start")
# NOTE(review): how many of the following statements belong to the with body
# cannot be determined from this listing - confirm.
5005 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5006 iface.Scan({'Type': 'active',
5008 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5009 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5010 "AP-DISABLED"], timeout=5)
# NOTE(review): as above, the guard for the next raise is not visible.
5012 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
# The scan failure must not disable the AP.
5013 if "AP-DISABLED" in ev:
5014 raise Exception("Unexpected AP-DISABLED event")
5016 # Wait for the scan retry to happen
5017 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5018 "AP-DISABLED"], timeout=5)
# NOTE(review): as above, the guard for the next raise is not visible.
5020 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5021 if "AP-DISABLED" in ev:
5022 raise Exception("Unexpected AP-DISABLED event - retry")
# Check that a station can still connect to the network, then disconnect both
# devices.
5024 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5025 dev[1].request("DISCONNECT")
5026 dev[1].wait_disconnected()
5027 dev[0].request("DISCONNECT")
5028 dev[0].wait_disconnected()
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    sta = dev[0]
    open_ssid = "test-open"

    # Only the root wpa_supplicant object is needed from the returned handles.
    _bus, root_obj, _path, _if_obj = prepare_dbus(sta)
    supplicant = dbus.Interface(root_obj, WPAS_DBUS_SERVICE)

    # Bring up an open AP and associate the station with it.
    ap = hostapd.add_ap(apdev[0], {"ssid": open_ssid})
    sta.connect(open_ssid, key_mgmt="NONE", scan_freq="2412")

    # ExpectDisconnect() is called purely to exercise its code path; the
    # test does not check what effect the call has.
    supplicant.ExpectDisconnect()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # SaveConfig() is expected to be rejected with this specific error.
    expected = "fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith(expected):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
    else:
        # No D-Bus error at all means the call was accepted.
        raise Exception("SaveConfig() accepted unexpectedly")
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        # Always clear the elements registered for ID 1, including when the
        # helper raises part-way through, so they do not leak into later
        # tests.
        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
def _test_dbus_vendor_elem(dev, apdev):
    """Exercise VendorElemAdd(), VendorElemGet() and VendorElemRem() over D-Bus.

    Invalid calls must be rejected with an InvalidArgs error carrying the
    expected detail text; valid calls must add, return and remove the
    elements stored for ID 1.

    Raises:
        Exception: if an invalid call is accepted, an error message differs
            from the expected one, or returned data does not match.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start without any elements registered for ID 1.
    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # --- VendorElemAdd() argument validation ---
    try:
        ie = dbus.ByteArray("\x00\x00")
        iface.VendorElemAdd(-1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))

    try:
        ie = dbus.ByteArray("")
        iface.VendorElemAdd(1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))

    try:
        ie = dbus.ByteArray("\x00\x01")
        iface.VendorElemAdd(1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))

    # --- VendorElemGet() argument validation ---
    try:
        iface.VendorElemGet(-1)
        raise Exception("Invalid VendorElemGet() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))

    try:
        iface.VendorElemGet(1)
        raise Exception("Invalid VendorElemGet() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))

    # --- VendorElemRem() argument validation ---
    try:
        ie = dbus.ByteArray("\x00\x00")
        iface.VendorElemRem(-1, ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))

    try:
        ie = dbus.ByteArray("")
        iface.VendorElemRem(1, ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[2]: " + str(e))

    iface.VendorElemRem(1, "*")

    # --- Add one element and read it back ---
    ie = dbus.ByteArray("\x00\x01\x00")
    iface.VendorElemAdd(1, ie)

    val = iface.VendorElemGet(1)
    if len(val) != len(ie):
        raise Exception("Unexpected VendorElemGet length")
    for got, want in zip(val, ie):
        if got != dbus.Byte(want):
            raise Exception("Unexpected VendorElemGet data")

    # --- Add a second element; Get is expected to return both in order ---
    ie2 = dbus.ByteArray("\xe0\x00")
    iface.VendorElemAdd(1, ie2)

    ies = ie + ie2
    val = iface.VendorElemGet(1)
    if len(val) != len(ies):
        raise Exception("Unexpected VendorElemGet length[2]")
    for got, want in zip(val, ies):
        if got != dbus.Byte(want):
            raise Exception("Unexpected VendorElemGet data[2]")

    # A malformed element must be rejected by VendorElemRem() as well.
    try:
        test_ie = dbus.ByteArray("\x01\x01")
        iface.VendorElemRem(1, test_ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[3]: " + str(e))

    # Removing the first element must leave only the second one behind.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # After a wildcard removal nothing may remain for ID 1.
    iface.VendorElemRem(1, "*")
    try:
        iface.VendorElemGet(1)
        raise Exception("Invalid VendorElemGet() accepted after removal")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
5167 def test_dbus_assoc_reject(dev, apdev):
5168 """D-Bus AssocStatusCode"""
# Resolve the D-Bus handles for dev[0] and wrap the interface object in a
# proxy bound to WPAS_DBUS_IFACE.
5169 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5170 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
# Start an AP with max_listen_interval set to "1".
# NOTE(review): the definition of ssid is not visible in this listing -
# confirm against the full file.
5173 params = { "ssid": ssid,
5174 "max_listen_interval": "1" }
5175 hapd = hostapd.add_ap(apdev[0], params)
# Local TestDbus subclass that records whether an AssocStatusCode property
# update was observed.
5177 class TestDbusConnect(TestDbus):
5178 def __init__(self, bus):
5179 TestDbus.__init__(self, bus)
5180 self.assoc_status_seen = False
# Queue run_connect (after 1 ms) and the timeout handler (after 15000 ms),
# then subscribe to the PropertiesChanged signal.
5183 def __enter__(self):
5184 gobject.timeout_add(1, self.run_connect)
5185 gobject.timeout_add(15000, self.timeout)
5186 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5187 "PropertiesChanged")
# Logs every PropertiesChanged payload and inspects AssocStatusCode when it is
# present.
# NOTE(review): the comparison that separates the "Unexpected status code" log
# from setting assoc_status_seen is not visible here - confirm the expected
# status value.
5191 def propertiesChanged(self, properties):
5192 logger.debug("propertiesChanged: %s" % str(properties))
5193 if 'AssocStatusCode' in properties:
5194 status = properties['AssocStatusCode']
5196 logger.info("Unexpected status code: " + str(status))
5198 self.assoc_status_seen = True
# Main loop callback: adds a network through AddNetwork() and selects it.
# NOTE(review): the listing numbering skips entries inside this dictionary and
# its closing argument - confirm the full parameter set.
5202 def run_connect(self, *args):
5203 args = dbus.Dictionary({ 'ssid': ssid,
5205 'scan_freq': 2412 },
5207 self.netw = iface.AddNetwork(args)
5208 iface.SelectNetwork(self.netw)
# NOTE(review): the def line that owns the return below is not visible -
# presumably a success() accessor; confirm.
5212 return self.assoc_status_seen
# NOTE(review): the condition guarding the raise below is not visible here -
# presumably a success check on t; confirm.
5214 with TestDbusConnect(bus) as t:
5216 raise Exception("Expected signals not seen")