1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
9 logger = logging.getLogger()
21 from wpasupplicant import WpaSupplicant
22 from utils import HwsimSkip, alloc_fail, fail_test
23 from p2p_utils import *
24 from test_ap_tdls import connect_2sta_open
25 from test_ap_eap import check_altsubject_match_support
# D-Bus bus name, root object path and interface names used by these tests.
# Every interface name is derived from the bus name so the common prefix is
# spelled only once.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
WPAS_DBUS_IFACE = WPAS_DBUS_SERVICE + ".Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_NETWORK = WPAS_DBUS_SERVICE + ".Network"
WPAS_DBUS_BSS = WPAS_DBUS_SERVICE + ".BSS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
WPAS_DBUS_P2P_PEER = WPAS_DBUS_SERVICE + ".Peer"
WPAS_DBUS_GROUP = WPAS_DBUS_SERVICE + ".Group"
WPAS_DBUS_PERSISTENT_GROUP = WPAS_DBUS_SERVICE + ".PersistentGroup"
def prepare_dbus(dev):
    """Connect to wpa_supplicant over the system D-Bus for one device.

    Returns the tuple (bus, wpas_obj, path, if_obj): the system bus, the
    proxy for the wpa_supplicant root object, the object path returned by
    GetInterface(dev.ifname), and the proxy object for that path.

    Raises HwsimSkip when the dbus module cannot be imported or when any
    step of the connection setup fails.
    """
    # NOTE(review): the numbered listing this block was recovered from skips
    # the line that guards the "No dbus module available" branch and the
    # "try:" / "except" lines. The guard below is an import check so that it
    # does not depend on a module-level flag that is not visible here -
    # confirm against the original file.
    try:
        import dbus
        from dbus.mainloop.glib import DBusGMainLoop
    except ImportError:
        logger.info("No dbus module available")
        raise HwsimSkip("No dbus module available")

    try:
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
        path = wpas.GetInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
        return (bus, wpas_obj, path, if_obj)
    except Exception as e:  # "as" form parses on Python 2.6+ and Python 3
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
class TestDbus(object):
    """Base helper that owns a gobject main loop and D-Bus signal receivers.

    Signal receivers registered through add_signal() are remembered and
    removed again in __exit__().
    """
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this class was recovered from; they are inferred from the
    # attributes and parameters the visible lines use - confirm against the
    # original file.

    def __init__(self, bus):
        self.loop = gobject.MainLoop()
        self.signals = []  # restored (appended to in add_signal)
        self.bus = bus  # restored (read in add_signal)

    def __exit__(self, type, value, traceback):
        # Drop every receiver that add_signal() registered.
        for s in self.signals:
            s.remove()  # restored

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal `name` on `interface` and track it."""
        s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
                                         signal_name=name,  # restored
                                         byte_arrays=byte_arrays)
        self.signals.append(s)

    def timeout(self, *args):
        """Timer callback: log the timeout and stop the main loop."""
        logger.debug("timeout")
        self.loop.quit()  # restored
        return False  # restored; presumably makes the timer one-shot
class alloc_fail_dbus(object):
    """Context manager that arms TEST_ALLOC_FAIL around one D-Bus operation.

    On entry it sends "TEST_ALLOC_FAIL <count>:<funcs>" to the device. On
    exit it requires that the wrapped operation failed: a DBusException
    whose text contains `expected` is swallowed, anything else is checked
    against GET_ALLOC_FAIL to verify that the allocation failure triggered.
    """
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this class was recovered from and are inferred from the
    # attributes the visible lines read - confirm against the original file.

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):  # restored; default value is an assumption
        self._dev = dev  # restored
        self._count = count  # restored
        self._funcs = funcs  # restored
        self._operation = operation
        self._expected = expected

    def __enter__(self):  # restored
        cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
        if "OK" not in self._dev.request(cmd):
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:  # restored
            raise Exception("%s succeeded during out-of-memory" % self._operation)
        if type == dbus.exceptions.DBusException and self._expected in str(value):
            # The expected failure was reported; suppress the exception.
            return True  # restored
        if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        # Let the unexpected exception propagate.
        return False  # restored
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Start a WPS-enabled WPA2-PSK AP via hostapd.add_ap() and return its result."""
    config = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "ap_pin": "12345670",
        "uuid": ap_uuid,
    }
    return hostapd.add_ap(ap, config)
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    # NOTE(review): the docstring and the lines tagged "restored" are absent
    # from the numbered listing this block was recovered from. The restored
    # conditions are inferred from the exception messages that follow them -
    # confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    # The previous message hard-coded a misspelled service name
    # ("fi.w1.wpa.supplicant1"); log the real constants instead.
    logger.debug("GetAll(%s, %s) ==> %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                            str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))

    # Before any scan or network is added both lists are expected empty.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:  # restored
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:  # restored
        raise Exception("Unexpected Networks entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:  # restored
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # Format the BSSID octets as colon-separated lowercase hex.
    bssid_str = ':'.join('%02x' % item for item in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:  # restored
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    ssid = props['Properties']['ssid']
    # restored; the network was configured through set_network_quoted(), so
    # the value is presumably reported with the surrounding double quotes.
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read property `prop` of the fi.w1.wpa_supplicant1 interface.

    dbus is the dbus module (used for PROPERTIES_IFACE), wpas_obj the proxy
    object to query. When `expect` is not None the value must equal it,
    otherwise an Exception is raised. Returns the value that was read.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       dbus_interface=dbus.PROPERTIES_IFACE,
                       byte_arrays=byte_arrays)
    if expect is not None and val != expect:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(val), str(expect)))
    # NOTE(review): this return line is absent from the numbered listing the
    # block was recovered from; callers in this file use the result
    # (res = dbus_get(...)), so it is restored here - confirm.
    return val
def dbus_set(dbus, wpas_obj, prop, val):
    """Write `val` to property `prop` of the fi.w1.wpa_supplicant1 interface."""
    properties_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=properties_iface)
def _expect_dbus_error(operation, expected, accepted_msg):
    """Call operation() and require a DBusException containing `expected`.

    Raises Exception(accepted_msg) if the call succeeds and an
    "Unexpected error message" Exception if the error text does not match.
    """
    try:
        operation()
    except dbus.exceptions.DBusException as e:
        if expected not in str(e):
            raise Exception("Unexpected error message: " + str(e))
        return
    raise Exception(accepted_msg)


def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    # NOTE(review): the numbered listing this block was recovered from skips
    # every "try:" line plus the lines tagged "restored" below. The
    # try/except pairs are expressed through _expect_dbus_error(); restored
    # conditions are inferred from the messages that follow them - confirm
    # against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # DebugLevel: round trip a valid value, then reject wrong type/value.
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    for (val, err) in [(3, "Error.Failed: wrong property type"),
                       ("foo", "Error.Failed: wrong debug level value")]:
        _expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, "DebugLevel", val), err,
            "Invalid DebugLevel value accepted: " + str(val))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # The two boolean debug properties go through the same sequence.
    for prop in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, prop, expect=True)
        dbus_set(dbus, wpas_obj, prop, False)
        dbus_get(dbus, wpas_obj, prop, expect=False)
        _expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, prop, "foo"),
            "Error.Failed: wrong property type",
            "Invalid %s value accepted" % prop)
        dbus_set(dbus, wpas_obj, prop, True)
        dbus_get(dbus, wpas_obj, prop, expect=True)

    res = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(res) != 1:  # restored; the expected count is an assumption
        raise Exception("Unexpected Interfaces value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(res) < 5 or "TTLS" not in res:
        raise Exception("Unexpected EapMethods value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(res) < 2 or "p2p" not in res:
        raise Exception("Unexpected Capabilities value: " + str(res))

    # WFDIEs: set, read back, reject a malformed value, then clear.
    # Byte literals keep dbus.ByteArray() usable on Python 3 as well.
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    val = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if val != res:  # restored
        raise Exception("WFDIEs value changed")
    _expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')),
        "InvalidArgs", "Invalid WFDIEs value accepted")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(res) != 0:  # restored
        raise Exception("WFDIEs not cleared properly")

    # Writing a read-only property must fail.
    res = dbus_get(dbus, wpas_obj, "EapMethods")
    _expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "EapMethods", res),
        "InvalidArgs: Property is read-only", "Invalid Set accepted")

    # Unknown method on the properties interface.
    _expect_dbus_error(
        lambda: wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                                dbus_interface=dbus.PROPERTIES_IFACE),
        "UnknownMethod", "Unknown method accepted")

    # Unknown interface name in Get().
    _expect_dbus_error(
        lambda: wpas_obj.Get("foo", "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: No such property", "Invalid Get accepted")

    # restored closing argument; presumably introspection is disabled so the
    # wrongly typed arguments below reach the service - confirm.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    _expect_dbus_error(
        lambda: test_obj.Get(123, "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: Invalid arguments", "Invalid Get accepted")
    _expect_dbus_error(
        lambda: test_obj.Get(WPAS_DBUS_SERVICE, 123,
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: Invalid arguments", "Invalid Get accepted")

    # Value wrapped in an extra variant level.
    _expect_dbus_error(
        lambda: wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                             dbus.ByteArray(b'', variant_level=2),
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: invalid message format", "Invalid Set accepted")
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from; the loop header and comparisons
    # are inferred from the p[0]/p[2] uses and the messages in the visible
    # lines - confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # (property name, expected initial value, value to set)
    props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')]

    for (name, initial, updated) in props:  # restored
        res = if_obj.Get(WPAS_DBUS_IFACE, name,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != initial:  # restored
            raise Exception("Unexpected " + name + " value: " + str(res))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                   dbus_interface=dbus.PROPERTIES_IFACE)

        res = if_obj.Get(WPAS_DBUS_IFACE, name,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != updated:  # restored
            raise Exception("Unexpected " + name + " value after set: " + str(res))
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from. The two invoking calls are
    # inferred from the error messages ("Unknown method", "WPS.Start with
    # incorrect signature"); their exact names/arguments are assumptions -
    # confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    try:  # restored
        wps.Foo()  # restored; any method the interface does not define
        raise Exception("Unknown method accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
    try:  # restored
        test_wps.Start(123)  # restored; other tests pass Start() a dict
        raise Exception("WPS.Start with incorrect signature accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    # NOTE(review): the "try:" and "finally:" lines are absent from the
    # numbered listing this block was recovered from and are restored here -
    # confirm against the original file.
    try:  # restored
        _test_dbus_get_set_wps(dev, apdev)
    finally:  # restored
        # Put back the settings the test body changes, even if it raised.
        dev[0].request("SET wps_cred_processing 0")
        dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
def _test_dbus_get_set_wps(dev, apdev):
    """Exercise Get/Set of the WPS ConfigMethods and ProcessCredentials properties."""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from the
    # surrounding code - confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:  # restored
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:  # restored
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    for i in range(3):  # restored; the range bound is an assumption
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        # ProcessCredentials is expected False only for wps_cred_processing 1.
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    class TestDbusGetSet(TestDbus):
        """Toggle ProcessCredentials and wait for both change signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):  # restored
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")  # restored; name inferred from handler
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")  # restored; name inferred from handler
            self.loop.run()  # restored
            return self  # restored

        def propertiesChanged(self, properties):
            logger.debug("PropertiesChanged: " + str(properties))
            # "in" replaces dict.has_key(), which no longer exists on Python 3.
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()  # restored

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return  # restored
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()  # restored

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),  # restored
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),  # restored
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # Fix: this used to assign self.dbus_sets_done, an attribute that
            # nothing reads, so the handlers' "self.sets_done" check never
            # became true and the loop could only end through the timeout.
            self.sets_done = True
            return False  # restored

        def success(self):  # restored
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():  # restored
            raise Exception("No signal received for ProcessCredentials change")
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context.
    # One short entry of the failures list (between the Role=123 and
    # Role-only cases) is also missing and has NOT been reconstructed -
    # confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Each dict is an argument set that WPS.Start() is expected to reject.
    # Byte literals keep dbus.ByteArray() usable on Python 3 as well.
    failures = [{'Role': 'foo', 'Type': 'pbc'},
                {'Role': 123, 'Type': 'pbc'},
                {'Role': 'enrollee'},
                {'Role': 'registrar'},
                {'Role': 'enrollee', 'Type': 123},
                {'Role': 'enrollee', 'Type': 'foo'},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'Bssid': '02:33:44:55:66:77'},
                {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'Bssid': dbus.ByteArray(b'12345')},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'P2PDeviceAddress': 12345},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'P2PDeviceAddress': dbus.ByteArray(b'12345')},
                {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'}]
    for args in failures:
        try:  # restored
            wps.Start(args)  # restored
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context.
    # One short statement between scan_for_bss() and the first BSSs loop is
    # also missing and has NOT been reconstructed - confirm against the
    # original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State",
                   dbus_interface=dbus.PROPERTIES_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)

    for i in range(1, 3):
        with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)
    with alloc_fail(dev[0], 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        try:  # restored
            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                        dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException:
            # A D-Bus error is acceptable here; alloc_fail() does the check.
            pass  # restored

    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    for i in range(1, 3):
        with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for i in range(1, 6):
        with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        val2 = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",  # restored; operation label is an assumption
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    # NOTE(review): the "try:" and "finally:" lines are absent from the
    # numbered listing this block was recovered from and are restored here -
    # confirm against the original file.
    try:  # restored
        _test_dbus_wps_pbc(dev, apdev)
    finally:  # restored
        # Reset wps_cred_processing even if the test body raised.
        dev[0].request("SET wps_cred_processing 0")
def _test_dbus_wps_pbc(dev, apdev):
    """Run WPS PBC through D-Bus and wait for the success and Credentials signals."""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context.
    # One short statement between DISCONNECT and flush_scan_cache() is also
    # missing and has NOT been reconstructed - confirm against the original.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:  # restored
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    if 'Type' not in props['WPS']:
        raise Exception("No Type field in the WPS dictionary")
    if props['WPS']['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])

    class TestDbusWps(TestDbus):
        """Start PBC from the main loop and record the resulting signals."""

        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False
            self.wps = wps  # restored (read in start_pbc)

        def __enter__(self):  # restored
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")  # restored; name inferred from handler
            self.loop.run()  # restored
            return self  # restored

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()  # restored

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()  # restored

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False  # restored

        def success(self):  # restored
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():  # restored
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].flush_scan_cache()
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context.
    # One short statement between DISCONNECT and flush_scan_cache() is also
    # missing and has NOT been reconstructed - confirm against the original.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs with PBC active at the same time.
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        """Start PBC from the main loop and wait for the pbc-overlap event."""

        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.overlap_seen = False
            self.wps = wps  # restored (read in start_pbc)

        def __enter__(self):  # restored
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()  # restored
            return self  # restored

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "pbc-overlap":
                self.overlap_seen = True
                self.loop.quit()  # restored

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False  # restored

        def success(self):  # restored
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():  # restored
            raise Exception("Failure in D-Bus operations")

    dev[0].request("WPS_CANCEL")
    dev[0].request("DISCONNECT")
    dev[0].flush_scan_cache()
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    # NOTE(review): the "try:" and "finally:" lines are absent from the
    # numbered listing this block was recovered from and are restored here -
    # confirm against the original file.
    try:  # restored
        _test_dbus_wps_pin(dev, apdev)
    finally:  # restored
        # Reset wps_cred_processing even if the test body raised.
        dev[0].request("SET wps_cred_processing 0")
def _test_dbus_wps_pin(dev, apdev):
    """Run WPS PIN enrollment through D-Bus and wait for success and Credentials."""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context -
    # confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Start PIN enrollment from the main loop and record the signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):  # restored
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")  # restored; name inferred from handler
            self.loop.run()  # restored
            return self  # restored

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()  # restored

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()  # restored

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify() replaces str.decode('hex'), which only
            # exists on Python 2; binascii is already used in this file.
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})  # restored (bssid_ay is otherwise unused)
            return False  # restored

        def success(self):  # restored
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():  # restored
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    # NOTE(review): the "try:" and "finally:" lines are absent from the
    # numbered listing this block was recovered from and are restored here -
    # confirm against the original file.
    try:  # restored
        _test_dbus_wps_pin2(dev, apdev)
    finally:  # restored
        # Reset wps_cred_processing even if the test body raised.
        dev[0].request("SET wps_cred_processing 0")
def _test_dbus_wps_pin2(dev, apdev):
    """Run WPS PIN enrollment where the PIN comes from the WPS.Start() reply."""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context.
    # One short line at the end of __init__ is also missing and has NOT been
    # reconstructed - confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Start PIN enrollment, hand the PIN to the AP, record the signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Fix: wpsEvent() and success() read this flag, but the visible
            # __init__ never set it, so a "success" event arriving before
            # Credentials would raise AttributeError.
            self.credentials_received = False

        def __enter__(self):  # restored
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")  # restored; name inferred from handler
            self.loop.run()  # restored
            return self  # restored

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()  # restored

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()  # restored

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify() replaces str.decode('hex'), which only
            # exists on Python 2; binascii is already used in this file.
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})  # restored (bssid_ay is otherwise unused)
            pin = res['Pin']  # restored; the reply key is an assumption
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False  # restored

        def success(self):  # restored
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():  # restored
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    # NOTE(review): the "try:" and "finally:" lines are absent from the
    # numbered listing this block was recovered from and are restored here -
    # confirm against the original file.
    try:  # restored
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:  # restored
        # Reset wps_cred_processing even if the test body raised.
        dev[0].request("SET wps_cred_processing 0")
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Run WPS PIN enrollment where the AP learns the PIN only after an event."""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context -
    # confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Unlike the plain PIN test, no WPS_PIN is configured on the AP up front.
    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Start PIN enrollment and add the PIN on the AP from wpsEvent()."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):  # restored
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")  # restored; name inferred from handler
            self.loop.run()  # restored
            return self  # restored

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()  # restored
            elif name == "m2d":  # restored; event name assumed from the test title
                h = hostapd.Hostapd(apdev[0]['ifname'])
                h.request("WPS_PIN any 12345670")

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()  # restored

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify() replaces str.decode('hex'), which only
            # exists on Python 2; binascii is already used in this file.
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})  # restored (bssid_ay is otherwise unused)
            return False  # restored

        def success(self):  # restored
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():  # restored
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    # NOTE(review): the "try:" and "finally:" lines are absent from the
    # numbered listing this block was recovered from and are restored here -
    # confirm against the original file.
    try:  # restored
        _test_dbus_wps_reg(dev, apdev)
    finally:  # restored
        # Reset wps_cred_processing even if the test body raised.
        dev[0].request("SET wps_cred_processing 0")
def _test_dbus_wps_reg(dev, apdev):
    """Run WPS as registrar through D-Bus and wait for the Credentials signal."""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context -
    # confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Start the registrar operation and record the Credentials signal."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):  # restored
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")  # restored; name inferred from handler
            self.loop.run()  # restored
            return self  # restored

        def wpsEvent(self, name, args):
            # Events are only logged in this test.
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()  # restored

        def start_reg(self, *args):
            logger.debug("start_reg")
            # binascii.unhexlify() replaces str.decode('hex'), which only
            # exists on Python 2; binascii is already used in this file.
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'registrar', 'Type': 'pin',
                       'Pin': '12345670', 'Bssid': bssid_ay})
            return False  # restored

        def success(self):  # restored
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():  # restored
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from and are inferred from context.
    # One short statement before scan_for_bss() is also missing and has NOT
    # been reconstructed - confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']

    dev[0].scan_for_bss(bssid, freq="2412")
    # binascii.unhexlify() replaces str.decode('hex'), which only exists on
    # Python 2; binascii is already used in this file.
    bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
    wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
               'Bssid': bssid_ay})  # restored (bssid_ay is otherwise unused)
    wps.Cancel()  # restored; the operation named in the test description
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from. The tuple openers and the
    # expected-error strings of the multi-line entries are inferred from the
    # neighbouring single-line entries - confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # 17 SSIDs "1" .. "17"; byte strings keep dbus.ByteArray() usable on
    # Python 3 as well.
    many_ssids = [dbus.ByteArray(str(i).encode()) for i in range(1, 18)]

    # (Scan() argument dict, substring expected in the D-Bus error)
    tests = [({}, "InvalidArgs"),
             ({'Type': 123}, "InvalidArgs"),
             ({'Type': 'foo'}, "InvalidArgs"),
             ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
             ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
             ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
             ({'Type': 'active',  # restored
               'SSIDs': many_ssids},
              "InvalidArgs"),  # restored
             ({'Type': 'active',  # restored
               # 33-byte SSID
               'SSIDs': [dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")]},
              "InvalidArgs"),  # restored
             ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
             ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
             ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"),
             ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"),
             ({'Type': 'active',  # restored
               'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
              "InvalidArgs"),  # restored
             ({'Type': 'active',  # restored
               'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
              "InvalidArgs"),  # restored
             ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"),
             ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
              "InvalidArgs"),  # restored
             ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
              "InvalidArgs")]  # restored
    for (t, err) in tests:
        try:  # restored
            iface.Scan(t)  # restored
            raise Exception("Invalid Scan() arguments accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    # NOTE(review): lines tagged "restored" are absent from the numbered
    # listing this block was recovered from. The "Scan" operation label is
    # copied from the first, fully visible call. The SSIDs list is closed
    # right after the visible element; the missing line may have carried a
    # further element - confirm against the original file.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan({'Type': 'passive',
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})

    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
                         "Scan"):  # restored
        iface.Scan({'Type': 'passive',
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})

    # Byte literals keep dbus.ByteArray() usable on Python 3 as well.
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
                         "Scan"):  # restored
        iface.Scan({'Type': 'active',
                    'IEs': [dbus.ByteArray(b"\xdd\x00")],
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})

    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
                         "Scan"):  # restored
        iface.Scan({'Type': 'active',
                    'SSIDs': [dbus.ByteArray(b"open")],  # list close restored
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
996 def test_dbus_scan(dev, apdev):
997 """D-Bus scan and related signals"""
998 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
999 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1001 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
1003 class TestDbusScan(TestDbus):
1004 def __init__(self, bus):
1005 TestDbus.__init__(self, bus)
1006 self.scan_completed = 0
1007 self.bss_added = False
1008 self.fail_reason = None
1010 def __enter__(self):
1011 gobject.timeout_add(1, self.run_scan)
1012 gobject.timeout_add(15000, self.timeout)
1013 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
1014 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
1015 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
1019 def scanDone(self, success):
1020 logger.debug("scanDone: success=%s" % success)
1021 self.scan_completed += 1
1022 if self.scan_completed == 1:
1023 iface.Scan({'Type': 'passive',
1025 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1026 elif self.scan_completed == 2:
1027 iface.Scan({'Type': 'passive',
1028 'AllowRoam': False})
1029 elif self.bss_added and self.scan_completed == 3:
1032 def bssAdded(self, bss, properties):
1033 logger.debug("bssAdded: %s" % bss)
1034 logger.debug(str(properties))
1035 if 'WPS' in properties:
1036 if 'Type' in properties['WPS']:
1037 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
1039 self.bss_added = True
1040 if self.scan_completed == 3:
1043 def bssRemoved(self, bss):
1044 logger.debug("bssRemoved: %s" % bss)
1046 def run_scan(self, *args):
1047 logger.debug("run_scan")
1048 iface.Scan({'Type': 'active',
1049 'SSIDs': [ dbus.ByteArray("open"),
1051 'IEs': [ dbus.ByteArray("\xdd\x00"),
1054 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1058 return self.scan_completed == 3 and self.bss_added
1060 with TestDbusScan(bus) as t:
1062 raise Exception(t.fail_reason)
1064 raise Exception("Expected signals not seen")
1066 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1067 dbus_interface=dbus.PROPERTIES_IFACE)
1069 raise Exception("Scan result not in BSSs property")
1071 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1072 dbus_interface=dbus.PROPERTIES_IFACE)
1074 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1077 def test_dbus_scan_busy(dev, apdev):
1078 """D-Bus scan trigger rejection when busy with previous scan"""
1079 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1080 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1082 if "OK" not in dev[0].request("SCAN freq=2412-2462"):
1083 raise Exception("Failed to start scan")
1084 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1086 raise Exception("Scan start timed out")
1089 iface.Scan({'Type': 'active', 'AllowRoam': False})
1090 raise Exception("Scan() accepted when busy")
1091 except dbus.exceptions.DBusException, e:
1092 if "ScanError: Scan request reject" not in str(e):
1093 raise Exception("Unexpected error message: " + str(e))
1095 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1097 raise Exception("Scan timed out")
1099 def test_dbus_connect(dev, apdev):
1100 """D-Bus AddNetwork and connect"""
1101 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1102 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1104 ssid = "test-wpa2-psk"
1105 passphrase = 'qwertyuiop'
1106 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1107 hapd = hostapd.add_ap(apdev[0], params)
1109 class TestDbusConnect(TestDbus):
1110 def __init__(self, bus):
1111 TestDbus.__init__(self, bus)
1112 self.network_added = False
1113 self.network_selected = False
1114 self.network_removed = False
1117 def __enter__(self):
1118 gobject.timeout_add(1, self.run_connect)
1119 gobject.timeout_add(15000, self.timeout)
1120 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1121 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1123 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1125 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1126 "PropertiesChanged")
1130 def networkAdded(self, network, properties):
1131 logger.debug("networkAdded: %s" % str(network))
1132 logger.debug(str(properties))
1133 self.network_added = True
1135 def networkRemoved(self, network):
1136 logger.debug("networkRemoved: %s" % str(network))
1137 self.network_removed = True
1139 def networkSelected(self, network):
1140 logger.debug("networkSelected: %s" % str(network))
1141 self.network_selected = True
1143 def propertiesChanged(self, properties):
1144 logger.debug("propertiesChanged: %s" % str(properties))
1145 if 'State' in properties and properties['State'] == "completed":
1149 elif self.state == 2:
1152 elif self.state == 4:
1155 elif self.state == 5:
1158 elif self.state == 7:
1160 res = iface.SignalPoll()
1161 logger.debug("SignalPoll: " + str(res))
1162 if 'frequency' not in res or res['frequency'] != 2412:
1164 logger.info("Unexpected SignalPoll result")
1165 iface.RemoveNetwork(self.netw)
1166 if 'State' in properties and properties['State'] == "disconnected":
1169 iface.SelectNetwork(self.netw)
1170 elif self.state == 3:
1173 elif self.state == 6:
1176 elif self.state == 8:
1180 def run_connect(self, *args):
1181 logger.debug("run_connect")
1182 args = dbus.Dictionary({ 'ssid': ssid,
1183 'key_mgmt': 'WPA-PSK',
1185 'scan_freq': 2412 },
1187 self.netw = iface.AddNetwork(args)
1188 iface.SelectNetwork(self.netw)
1192 if not self.network_added or \
1193 not self.network_removed or \
1194 not self.network_selected:
1196 return self.state == 9
1198 with TestDbusConnect(bus) as t:
1200 raise Exception("Expected signals not seen")
1202 def test_dbus_connect_psk_mem(dev, apdev):
1203 """D-Bus AddNetwork and connect with memory-only PSK"""
1204 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1205 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1207 ssid = "test-wpa2-psk"
1208 passphrase = 'qwertyuiop'
1209 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1210 hapd = hostapd.add_ap(apdev[0], params)
1212 class TestDbusConnect(TestDbus):
1213 def __init__(self, bus):
1214 TestDbus.__init__(self, bus)
1215 self.connected = False
1217 def __enter__(self):
1218 gobject.timeout_add(1, self.run_connect)
1219 gobject.timeout_add(15000, self.timeout)
1220 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1221 "PropertiesChanged")
1222 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1227 def propertiesChanged(self, properties):
1228 logger.debug("propertiesChanged: %s" % str(properties))
1229 if 'State' in properties and properties['State'] == "completed":
1230 self.connected = True
1233 def networkRequest(self, path, field, txt):
1234 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1235 if field == "PSK_PASSPHRASE":
1236 iface.NetworkReply(path, field, '"' + passphrase + '"')
1238 def run_connect(self, *args):
1239 logger.debug("run_connect")
1240 args = dbus.Dictionary({ 'ssid': ssid,
1241 'key_mgmt': 'WPA-PSK',
1243 'scan_freq': 2412 },
1245 self.netw = iface.AddNetwork(args)
1246 iface.SelectNetwork(self.netw)
1250 return self.connected
1252 with TestDbusConnect(bus) as t:
1254 raise Exception("Expected signals not seen")
1256 def test_dbus_connect_oom(dev, apdev):
1257 """D-Bus AddNetwork and connect when out-of-memory"""
1258 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1259 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1261 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1262 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1264 ssid = "test-wpa2-psk"
1265 passphrase = 'qwertyuiop'
1266 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1267 hapd = hostapd.add_ap(apdev[0], params)
1269 class TestDbusConnect(TestDbus):
1270 def __init__(self, bus):
1271 TestDbus.__init__(self, bus)
1272 self.network_added = False
1273 self.network_selected = False
1274 self.network_removed = False
1277 def __enter__(self):
1278 gobject.timeout_add(1, self.run_connect)
1279 gobject.timeout_add(1500, self.timeout)
1280 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1281 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1283 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1285 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1286 "PropertiesChanged")
1290 def networkAdded(self, network, properties):
1291 logger.debug("networkAdded: %s" % str(network))
1292 logger.debug(str(properties))
1293 self.network_added = True
1295 def networkRemoved(self, network):
1296 logger.debug("networkRemoved: %s" % str(network))
1297 self.network_removed = True
1299 def networkSelected(self, network):
1300 logger.debug("networkSelected: %s" % str(network))
1301 self.network_selected = True
1303 def propertiesChanged(self, properties):
1304 logger.debug("propertiesChanged: %s" % str(properties))
1305 if 'State' in properties and properties['State'] == "completed":
1309 elif self.state == 2:
1312 elif self.state == 4:
1315 elif self.state == 5:
1317 res = iface.SignalPoll()
1318 logger.debug("SignalPoll: " + str(res))
1319 if 'frequency' not in res or res['frequency'] != 2412:
1321 logger.info("Unexpected SignalPoll result")
1322 iface.RemoveNetwork(self.netw)
1323 if 'State' in properties and properties['State'] == "disconnected":
1326 iface.SelectNetwork(self.netw)
1327 elif self.state == 3:
1330 elif self.state == 6:
1334 def run_connect(self, *args):
1335 logger.debug("run_connect")
1336 args = dbus.Dictionary({ 'ssid': ssid,
1337 'key_mgmt': 'WPA-PSK',
1339 'scan_freq': 2412 },
1342 self.netw = iface.AddNetwork(args)
1343 except Exception, e:
1344 logger.info("Exception on AddNetwork: " + str(e))
1348 iface.SelectNetwork(self.netw)
1349 except Exception, e:
1350 logger.info("Exception on SelectNetwork: " + str(e))
1356 if not self.network_added or \
1357 not self.network_removed or \
1358 not self.network_selected:
1360 return self.state == 7
1363 for i in range(1, 1000):
1365 dev[j].dump_monitor()
1366 dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1368 with TestDbusConnect(bus) as t:
1370 logger.info("Iteration %d - Expected signals not seen" % i)
1372 logger.info("Iteration %d - success" % i)
1374 state = dev[0].request('GET_ALLOC_FAIL')
1375 logger.info("GET_ALLOC_FAIL: " + state)
1376 dev[0].dump_monitor()
1377 dev[0].request("TEST_ALLOC_FAIL 0:")
1379 raise Exception("Connection succeeded during out-of-memory")
1380 if not state.startswith('0:'):
1387 # Force regulatory update to re-fetch hw capabilities for the following
1390 dev[0].dump_monitor()
1391 subprocess.call(['iw', 'reg', 'set', 'US'])
1392 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1394 dev[0].dump_monitor()
1395 subprocess.call(['iw', 'reg', 'set', '00'])
1396 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1398 def test_dbus_while_not_connected(dev, apdev):
1399 """D-Bus invalid operations while not connected"""
1400 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1401 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1405 raise Exception("Disconnect() accepted when not connected")
1406 except dbus.exceptions.DBusException, e:
1407 if "NotConnected" not in str(e):
1408 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1412 raise Exception("Reattach() accepted when not connected")
1413 except dbus.exceptions.DBusException, e:
1414 if "NotConnected" not in str(e):
1415 raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1417 def test_dbus_connect_eap(dev, apdev):
1418 """D-Bus AddNetwork and connect to EAP network"""
1419 check_altsubject_match_support(dev[0])
1420 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1421 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1423 ssid = "ieee8021x-open"
1424 params = hostapd.radius_params()
1425 params["ssid"] = ssid
1426 params["ieee8021x"] = "1"
1427 hapd = hostapd.add_ap(apdev[0], params)
1429 class TestDbusConnect(TestDbus):
1430 def __init__(self, bus):
1431 TestDbus.__init__(self, bus)
1432 self.certification_received = False
1433 self.eap_status = False
1436 def __enter__(self):
1437 gobject.timeout_add(1, self.run_connect)
1438 gobject.timeout_add(15000, self.timeout)
1439 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1440 "PropertiesChanged")
1441 self.add_signal(self.certification, WPAS_DBUS_IFACE,
1442 "Certification", byte_arrays=True)
1443 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1445 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1449 def propertiesChanged(self, properties):
1450 logger.debug("propertiesChanged: %s" % str(properties))
1451 if 'State' in properties and properties['State'] == "completed":
1455 logger.info("Set dNSName constraint")
1456 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1457 args = dbus.Dictionary({ 'altsubject_match':
1458 self.server_dnsname },
1460 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1461 dbus_interface=dbus.PROPERTIES_IFACE)
1462 elif self.state == 2:
1465 logger.info("Set non-matching dNSName constraint")
1466 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1467 args = dbus.Dictionary({ 'altsubject_match':
1468 self.server_dnsname + "FOO" },
1470 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1471 dbus_interface=dbus.PROPERTIES_IFACE)
1472 if 'State' in properties and properties['State'] == "disconnected":
1476 iface.SelectNetwork(self.netw)
1479 iface.SelectNetwork(self.netw)
1481 def certification(self, args):
1482 logger.debug("certification: %s" % str(args))
1483 self.certification_received = True
1484 if args['depth'] == 0:
1485 # The test server certificate is supposed to have dNSName
1486 if len(args['altsubject']) < 1:
1487 raise Exception("Missing dNSName")
1488 dnsname = args['altsubject'][0]
1489 if not dnsname.startswith("DNS:"):
1490 raise Exception("Expected dNSName not found: " + dnsname)
1491 logger.info("altsubject: " + dnsname)
1492 self.server_dnsname = dnsname
1494 def eap(self, status, parameter):
1495 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1496 if status == 'completion' and parameter == 'success':
1497 self.eap_status = True
1498 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1502 def networkRequest(self, path, field, txt):
1503 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1504 if field == "PASSWORD":
1505 iface.NetworkReply(path, field, "password")
1507 def run_connect(self, *args):
1508 logger.debug("run_connect")
1509 args = dbus.Dictionary({ 'ssid': ssid,
1510 'key_mgmt': 'IEEE8021X',
1513 'anonymous_identity': 'ttls',
1514 'identity': 'pap user',
1515 'ca_cert': 'auth_serv/ca.pem',
1516 'phase2': 'auth=PAP',
1517 'scan_freq': 2412 },
1519 self.netw = iface.AddNetwork(args)
1520 iface.SelectNetwork(self.netw)
1524 if not self.eap_status or not self.certification_received:
1526 return self.state == 5
1528 with TestDbusConnect(bus) as t:
1530 raise Exception("Expected signals not seen")
1532 def test_dbus_network(dev, apdev):
1533 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1534 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1535 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1537 args = dbus.Dictionary({ 'ssid': "foo",
1538 'key_mgmt': 'WPA-PSK',
1540 'identity': dbus.ByteArray([ 1, 2 ]),
1541 'priority': dbus.Int32(0),
1542 'scan_freq': dbus.UInt32(2412) },
1544 netw = iface.AddNetwork(args)
1545 id = int(dev[0].list_networks()[0]['id'])
1546 val = dev[0].get_network(id, "scan_freq")
1548 raise Exception("Invalid scan_freq value: " + str(val))
1549 iface.RemoveNetwork(netw)
1551 args = dbus.Dictionary({ 'ssid': "foo",
1553 'scan_freq': "2412 2432",
1554 'freq_list': "2412 2417 2432" },
1556 netw = iface.AddNetwork(args)
1557 id = int(dev[0].list_networks()[0]['id'])
1558 val = dev[0].get_network(id, "scan_freq")
1559 if val != "2412 2432":
1560 raise Exception("Invalid scan_freq value (2): " + str(val))
1561 val = dev[0].get_network(id, "freq_list")
1562 if val != "2412 2417 2432":
1563 raise Exception("Invalid freq_list value: " + str(val))
1564 iface.RemoveNetwork(netw)
1566 iface.RemoveNetwork(netw)
1567 raise Exception("Invalid RemoveNetwork() accepted")
1568 except dbus.exceptions.DBusException, e:
1569 if "NetworkUnknown" not in str(e):
1570 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1572 iface.SelectNetwork(netw)
1573 raise Exception("Invalid SelectNetwork() accepted")
1574 except dbus.exceptions.DBusException, e:
1575 if "NetworkUnknown" not in str(e):
1576 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1578 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1579 'identity': "testuser", 'scan_freq': '2412' },
1581 netw1 = iface.AddNetwork(args)
1582 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1584 netw2 = iface.AddNetwork(args)
1585 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1586 dbus_interface=dbus.PROPERTIES_IFACE)
1588 raise Exception("Unexpected number of networks")
1590 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1591 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1592 dbus_interface=dbus.PROPERTIES_IFACE)
1594 raise Exception("Added network was unexpectedly enabled by default")
1595 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1596 dbus_interface=dbus.PROPERTIES_IFACE)
1597 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1598 dbus_interface=dbus.PROPERTIES_IFACE)
1600 raise Exception("Set(Enabled,True) did not seem to change property value")
1601 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1602 dbus_interface=dbus.PROPERTIES_IFACE)
1603 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1604 dbus_interface=dbus.PROPERTIES_IFACE)
1606 raise Exception("Set(Enabled,False) did not seem to change property value")
1608 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1609 dbus_interface=dbus.PROPERTIES_IFACE)
1610 raise Exception("Invalid Set(Enabled,1) accepted")
1611 except dbus.exceptions.DBusException, e:
1612 if "Error.Failed: wrong property type" not in str(e):
1613 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1615 args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
1616 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1617 dbus_interface=dbus.PROPERTIES_IFACE)
1618 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1619 dbus_interface=dbus.PROPERTIES_IFACE)
1620 if res['ssid'] != '"foo1new"':
1621 raise Exception("Set(Properties) failed to update ssid")
1622 if res['identity'] != '"testuser"':
1623 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1625 iface.RemoveAllNetworks()
1626 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1627 dbus_interface=dbus.PROPERTIES_IFACE)
1629 raise Exception("Unexpected number of networks")
1630 iface.RemoveAllNetworks()
1632 tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
1633 dbus.Dictionary({ 'identity': dbus.ByteArray() },
1635 dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
1636 dbus.Dictionary({ 'identity': "" }, signature='sv') ]
1639 iface.AddNetwork(args)
1640 raise Exception("Invalid AddNetwork args accepted: " + str(args))
1641 except dbus.exceptions.DBusException, e:
1642 if "InvalidArgs" not in str(e):
1643 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1645 def test_dbus_network_oom(dev, apdev):
1646 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1647 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1648 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1650 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1651 'identity': "testuser", 'scan_freq': '2412' },
1653 netw1 = iface.AddNetwork(args)
1654 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1656 with alloc_fail_dbus(dev[0], 1,
1657 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1659 net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1660 dbus_interface=dbus.PROPERTIES_IFACE)
1662 iface.RemoveAllNetworks()
1664 with alloc_fail_dbus(dev[0], 1,
1665 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1666 "RemoveNetwork", "InvalidArgs"):
1667 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1669 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1670 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1673 netw = iface.AddNetwork(args)
1674 # Currently, AddNetwork() succeeds even if os_strdup() for path
1675 # fails, so remove the network if that occurs.
1676 iface.RemoveNetwork(netw)
1677 except dbus.exceptions.DBusException, e:
1680 for i in range(1, 3):
1681 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1683 netw = iface.AddNetwork(args)
1684 # Currently, AddNetwork() succeeds even if network registration
1685 # fails, so remove the network if that occurs.
1686 iface.RemoveNetwork(netw)
1687 except dbus.exceptions.DBusException, e:
1690 with alloc_fail_dbus(dev[0], 1,
1691 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1693 "UnknownError: wpa_supplicant could not add a network"):
1694 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1696 netw = iface.AddNetwork(args)
1699 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1700 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1702 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1703 dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
1704 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1705 dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
1706 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1707 dbus.Dictionary({ 'priority': dbus.UInt32(1) },
1709 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1710 dbus.Dictionary({ 'priority': dbus.Int32(1) },
1712 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1713 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1715 for (count,funcs,args) in tests:
1716 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
1717 netw = iface.AddNetwork(args)
1719 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
1720 dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
1721 raise Exception("Unexpected network block added")
1722 if len(dev[0].list_networks()) > 0:
1723 raise Exception("Unexpected network block visible")
1725 def test_dbus_interface(dev, apdev):
1726 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
1728 _test_dbus_interface(dev, apdev)
1730 # Need to force P2P channel list update since the 'lo' interface
1731 # with driver=none ends up configuring default dualband channels.
1732 dev[0].request("SET country US")
1733 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1735 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1737 dev[0].request("SET country 00")
1738 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1740 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1742 subprocess.call(['iw', 'reg', 'set', '00'])
1744 def _test_dbus_interface(dev, apdev):
1745 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1746 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1748 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1750 path = wpas.CreateInterface(params)
1751 logger.debug("New interface path: " + str(path))
1752 path2 = wpas.GetInterface("lo")
1754 raise Exception("Interface object mismatch")
1756 params = dbus.Dictionary({ 'Ifname': 'lo',
1758 'ConfigFile': 'foo',
1759 'BridgeIfname': 'foo', },
1762 wpas.CreateInterface(params)
1763 raise Exception("Invalid CreateInterface() accepted")
1764 except dbus.exceptions.DBusException, e:
1765 if "InterfaceExists" not in str(e):
1766 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1768 wpas.RemoveInterface(path)
1770 wpas.RemoveInterface(path)
1771 raise Exception("Invalid RemoveInterface() accepted")
1772 except dbus.exceptions.DBusException, e:
1773 if "InterfaceUnknown" not in str(e):
1774 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1776 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1780 wpas.CreateInterface(params)
1781 raise Exception("Invalid CreateInterface() accepted")
1782 except dbus.exceptions.DBusException, e:
1783 if "InvalidArgs" not in str(e):
1784 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1786 params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
1788 wpas.CreateInterface(params)
1789 raise Exception("Invalid CreateInterface() accepted")
1790 except dbus.exceptions.DBusException, e:
1791 if "InvalidArgs" not in str(e):
1792 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1795 wpas.GetInterface("lo")
1796 raise Exception("Invalid GetInterface() accepted")
1797 except dbus.exceptions.DBusException, e:
1798 if "InterfaceUnknown" not in str(e):
1799 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1801 def test_dbus_interface_oom(dev, apdev):
1802 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1803 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1804 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1806 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1807 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1809 wpas.CreateInterface(params)
1811 for i in range(1, 1000):
1812 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
1813 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1816 npath = wpas.CreateInterface(params)
1817 wpas.RemoveInterface(npath)
1818 logger.info("CreateInterface succeeds after %d allocation failures" % i)
1819 state = dev[0].request('GET_ALLOC_FAIL')
1820 logger.info("GET_ALLOC_FAIL: " + state)
1821 dev[0].dump_monitor()
1822 dev[0].request("TEST_ALLOC_FAIL 0:")
1824 raise Exception("CreateInterface succeeded during out-of-memory")
1825 if not state.startswith('0:'):
1827 except dbus.exceptions.DBusException, e:
1830 for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1831 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
1833 params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
1834 wpas.CreateInterface(params)
1836 def test_dbus_blob(dev, apdev):
1837 """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
1838 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1839 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1841 blob = dbus.ByteArray("\x01\x02\x03")
1842 iface.AddBlob('blob1', blob)
1844 iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
1845 raise Exception("Invalid AddBlob() accepted")
1846 except dbus.exceptions.DBusException, e:
1847 if "BlobExists" not in str(e):
1848 raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
1849 res = iface.GetBlob('blob1')
1850 if len(res) != len(blob):
1851 raise Exception("Unexpected blob data length")
1852 for i in range(len(res)):
1853 if res[i] != dbus.Byte(blob[i]):
1854 raise Exception("Unexpected blob data")
1855 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
1856 dbus_interface=dbus.PROPERTIES_IFACE)
1857 if 'blob1' not in res:
1858 raise Exception("Added blob missing from Blobs property")
1859 iface.RemoveBlob('blob1')
1861 iface.RemoveBlob('blob1')
1862 raise Exception("Invalid RemoveBlob() accepted")
1863 except dbus.exceptions.DBusException, e:
1864 if "BlobUnknown" not in str(e):
1865 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
1867 iface.GetBlob('blob1')
1868 raise Exception("Invalid GetBlob() accepted")
1869 except dbus.exceptions.DBusException, e:
1870 if "BlobUnknown" not in str(e):
1871 raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
1873 class TestDbusBlob(TestDbus):
1874 def __init__(self, bus):
1875 TestDbus.__init__(self, bus)
1876 self.blob_added = False
1877 self.blob_removed = False
1879 def __enter__(self):
1880 gobject.timeout_add(1, self.run_blob)
1881 gobject.timeout_add(15000, self.timeout)
1882 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
1883 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
1887 def blobAdded(self, blobName):
1888 logger.debug("blobAdded: %s" % blobName)
1889 if blobName == 'blob2':
1890 self.blob_added = True
1892 def blobRemoved(self, blobName):
1893 logger.debug("blobRemoved: %s" % blobName)
1894 if blobName == 'blob2':
1895 self.blob_removed = True
1898 def run_blob(self, *args):
1899 logger.debug("run_blob")
1900 iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
1901 iface.RemoveBlob('blob2')
1905 return self.blob_added and self.blob_removed
1907 with TestDbusBlob(bus) as t:
1909 raise Exception("Expected signals not seen")
1911 def test_dbus_blob_oom(dev, apdev):
1912 """D-Bus AddBlob OOM error cases"""
1913 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1914 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1916 for i in range(1, 4):
1917 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
1919 iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1921 def test_dbus_autoscan(dev, apdev):
1922 """D-Bus Autoscan()"""
1923 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1924 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1926 iface.AutoScan("foo")
1927 iface.AutoScan("periodic:1")
1929 dev[0].request("AUTOSCAN ")
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    sta = dev[0]
    # prepare_dbus() hands back (bus, wpas_obj, path, if_obj); only the
    # interface object is used in this test.
    _, _, _, iface_obj = prepare_dbus(sta)
    autoscan_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE)

    # Invoke AutoScan() under alloc_fail_dbus with count 1 and the
    # wpas_dbus_handler_autoscan function filter.
    with alloc_fail_dbus(sta, 1, "wpas_dbus_handler_autoscan", "AutoScan"):
        autoscan_iface.AutoScan("foo")

    # Send AUTOSCAN with an empty parameter over the control interface
    # (presumably resets the autoscan setting - confirm against the
    # wpa_supplicant control interface documentation).
    sta.request("AUTOSCAN ")
1940 def test_dbus_tdls_invalid(dev, apdev):
1941 """D-Bus invalid TDLS operations"""
1942 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1943 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1945 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
1946 connect_2sta_open(dev, hapd)
1947 addr1 = dev[1].p2p_interface_addr()
1950 iface.TDLSDiscover("foo")
1951 raise Exception("Invalid TDLSDiscover() accepted")
1952 except dbus.exceptions.DBusException, e:
1953 if "InvalidArgs" not in str(e):
1954 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
1957 iface.TDLSStatus("foo")
1958 raise Exception("Invalid TDLSStatus() accepted")
1959 except dbus.exceptions.DBusException, e:
1960 if "InvalidArgs" not in str(e):
1961 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
1963 res = iface.TDLSStatus(addr1)
1964 if res != "peer does not exist":
1965 raise Exception("Unexpected TDLSStatus response")
1968 iface.TDLSSetup("foo")
1969 raise Exception("Invalid TDLSSetup() accepted")
1970 except dbus.exceptions.DBusException, e:
1971 if "InvalidArgs" not in str(e):
1972 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
1975 iface.TDLSTeardown("foo")
1976 raise Exception("Invalid TDLSTeardown() accepted")
1977 except dbus.exceptions.DBusException, e:
1978 if "InvalidArgs" not in str(e):
1979 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
1982 iface.TDLSTeardown("00:11:22:33:44:55")
1983 raise Exception("TDLSTeardown accepted for unknown peer")
1984 except dbus.exceptions.DBusException, e:
1985 if "UnknownError: error performing TDLS teardown" not in str(e):
1986 raise Exception("Unexpected error message: " + str(e))
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    sta = dev[0]
    # prepare_dbus() hands back (bus, wpas_obj, path, if_obj); only the
    # interface object is used in this test.
    _, _, _, iface_obj = prepare_dbus(sta)
    tdls_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE)

    peer_addr = "00:11:22:33:44:55"
    expected_error = "UnknownError: error performing TDLS setup"

    # Invoke TDLSSetup() under alloc_fail_dbus with count 1 and the
    # wpa_tdls_add_peer function filter; the fifth argument is the error
    # text passed through to alloc_fail_dbus.
    with alloc_fail_dbus(sta, 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        tdls_iface.TDLSSetup(peer_addr)
# test_dbus_tdls: runs a TDLS discover -> setup -> status -> teardown -> status
# sequence through the D-Bus interface object of dev[0], using the address
# returned by dev[1].p2p_interface_addr() as the peer.
# NOTE(review): the embedded line numbers skip values (1998, 2018-2020, 2041,
# 2052, 2054-2057, 2061, ...), so some original lines are not visible in this
# listing; the comments here describe only the visible lines.
1997 def test_dbus_tdls(dev, apdev):
1999 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2000 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
# Open AP with SSID "test-open"; connect_2sta_open() receives dev and the AP.
2002 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2003 connect_2sta_open(dev, hapd)
2005 addr1 = dev[1].p2p_interface_addr()
# Helper built on TestDbus; the two flags record whether the expected
# TDLSStatus results were seen after setup and after teardown.
2007 class TestDbusTdls(TestDbus):
2008 def __init__(self, bus):
2009 TestDbus.__init__(self, bus)
2010 self.tdls_setup = False
2011 self.tdls_teardown = False
2013 def __enter__(self):
# Schedule the first step (1 ms) and the timeout handler (15000 ms), then
# register for PropertiesChanged signals.
2014 gobject.timeout_add(1, self.run_tdls)
2015 gobject.timeout_add(15000, self.timeout)
2016 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2017 "PropertiesChanged")
2021 def propertiesChanged(self, properties):
2022 logger.debug("propertiesChanged: %s" % str(properties))
# Step 1: TDLSDiscover towards the peer, then schedule step 2 after 100 ms.
2024 def run_tdls(self, *args):
2025 logger.debug("run_tdls")
2026 iface.TDLSDiscover(addr1)
2027 gobject.timeout_add(100, self.run_tdls2)
# Step 2: TDLSSetup, then schedule step 3 after 500 ms.
2030 def run_tdls2(self, *args):
2031 logger.debug("run_tdls2")
2032 iface.TDLSSetup(addr1)
2033 gobject.timeout_add(500, self.run_tdls3)
# Step 3: a "connected" status marks setup as seen; then request teardown and
# schedule step 4 after 200 ms.
2036 def run_tdls3(self, *args):
2037 logger.debug("run_tdls3")
2038 res = iface.TDLSStatus(addr1)
2039 if res == "connected":
2040 self.tdls_setup = True
2042 logger.info("Unexpected TDLSStatus: " + res)
2043 iface.TDLSTeardown(addr1)
2044 gobject.timeout_add(200, self.run_tdls4)
# Step 4: after teardown the status "peer does not exist" marks teardown as seen.
2047 def run_tdls4(self, *args):
2048 logger.debug("run_tdls4")
2049 res = iface.TDLSStatus(addr1)
2050 if res == "peer does not exist":
2051 self.tdls_teardown = True
2053 logger.info("Unexpected TDLSStatus: " + res)
# Both flags must be set. NOTE(review): the def line this return belongs to is
# not visible in this listing.
2058 return self.tdls_setup and self.tdls_teardown
2060 with TestDbusTdls(bus) as t:
2062 raise Exception("Expected signals not seen")
# test_dbus_pkcs11: calls SetPKCS11EngineAndModulePath() on the dev[0]
# interface with ("foo","bar"), ("foo",""), ("","bar") and ("","") and reads
# the PKCS11EnginePath / PKCS11ModulePath properties back for the last two.
# NOTE(review): the embedded line numbers skip values (2069, 2075, 2084, 2088,
# 2094, 2098, ...); the try: lines and the conditions guarding the raise
# statements after each Get() are not visible in this listing.
2064 def test_dbus_pkcs11(dev, apdev):
2065 """D-Bus SetPKCS11EngineAndModulePath()"""
2066 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2067 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
# Engine path "foo": a DBusException is tolerated only if it contains
# "Error.Failed: Reinit of the EAPOL". No "accepted" failure follows the call
# in the visible lines, so success appears to be tolerated as well.
2070 iface.SetPKCS11EngineAndModulePath("foo", "bar")
2071 except dbus.exceptions.DBusException, e:
2072 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2073 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2076 iface.SetPKCS11EngineAndModulePath("foo", "")
2077 except dbus.exceptions.DBusException, e:
2078 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2079 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
# Empty engine path with module path "bar": read both properties back.
2081 iface.SetPKCS11EngineAndModulePath("", "bar")
2082 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2083 dbus_interface=dbus.PROPERTIES_IFACE)
2085 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2086 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2087 dbus_interface=dbus.PROPERTIES_IFACE)
2089 raise Exception("Unexpected PKCS11ModulePath value: " + res)
# Both paths empty: read both properties back again.
2091 iface.SetPKCS11EngineAndModulePath("", "")
2092 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2093 dbus_interface=dbus.PROPERTIES_IFACE)
2095 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2096 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2097 dbus_interface=dbus.PROPERTIES_IFACE)
2099 raise Exception("Unexpected PKCS11ModulePath value: " + res)
# test_dbus_apscan: entry point that calls _test_dbus_apscan() and then issues
# "AP_SCAN 1" on dev[0].
# NOTE(review): embedded lines 2103 and 2105 are not visible in this listing;
# presumably they form a try/finally so the reset always runs - confirm
# against the full file.
2101 def test_dbus_apscan(dev, apdev):
2102 """D-Bus Get/Set ApScan"""
2104 _test_dbus_apscan(dev, apdev)
2106 dev[0].request("AP_SCAN 1")
# _test_dbus_apscan: reads and writes the ApScan property of the dev[0]
# interface object through the D-Bus properties interface and checks that
# invalid writes are rejected with the expected error text.
# NOTE(review): the embedded line numbers skip values (2113, 2115-2116, 2121,
# 2124, 2132, ...); the conditions, the line that binds i, and the try: lines
# are not visible in this listing.
2108 def _test_dbus_apscan(dev, apdev):
2109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
# Read the initial value.
2111 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2112 dbus_interface=dbus.PROPERTIES_IFACE)
2114 raise Exception("Unexpected initial ApScan value: %d" % res)
# Write i as UInt32 and read it back (i is presumably a loop variable bound on
# a line that is not visible here).
2117 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
2118 dbus_interface=dbus.PROPERTIES_IFACE)
2119 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2120 dbus_interface=dbus.PROPERTIES_IFACE)
2122 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
# An Int16 value must be rejected with "wrong property type".
2125 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
2126 dbus_interface=dbus.PROPERTIES_IFACE)
2127 raise Exception("Invalid Set(ApScan,-1) accepted")
2128 except dbus.exceptions.DBusException, e:
2129 if "Error.Failed: wrong property type" not in str(e):
2130 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
# UInt32 123 must be rejected with "ap_scan must be 0, 1, or 2".
2133 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
2134 dbus_interface=dbus.PROPERTIES_IFACE)
2135 raise Exception("Invalid Set(ApScan,123) accepted")
2136 except dbus.exceptions.DBusException, e:
2137 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
2138 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
# Leave ApScan set to 1.
2140 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
2141 dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # The test leaves FastReauth set to True on exit, so True is treated as
    # the expected initial value.
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Write both boolean values and verify that each one reads back.
    for i in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value (Int16) must be rejected with "wrong property type".
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # The message used to name ApScan here (copy/paste from the
            # ApScan test); report the property that was actually written.
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave FastReauth enabled.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Write a value to each property and verify that it reads back unchanged.
    # The mismatch messages used to format an undefined name (i), so a real
    # mismatch would have surfaced as NameError instead of this report.
    age = 179
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(age),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, age))

    count = 3
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(count),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, count))

    # Invalid writes: each must fail with a DBusException that carries the
    # expected error text. The "accepted" Exception raised inside the try body
    # is not a DBusException, so it propagates past the except clause.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,9) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,0) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))

    # Leave the properties at 180 and 2 (presumably the defaults) for the
    # following test cases.
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
# test_dbus_country: entry point that calls _test_dbus_country(), then issues
# "SET country 00" on dev[0] and runs "iw reg set 00".
# NOTE(review): embedded lines 2228 and 2230 are not visible in this listing;
# presumably they form a try/finally so the regulatory reset always runs -
# confirm against the full file.
2226 def test_dbus_country(dev, apdev):
2227 """D-Bus Get/Set Country"""
2229 _test_dbus_country(dev, apdev)
2231 dev[0].request("SET country 00")
2232 subprocess.call(['iw', 'reg', 'set', '00'])
# _test_dbus_country: writes the Country property of the dev[0] interface
# object ("FI", two invalid values, then "00") and checks the resulting
# CTRL-EVENT-REGDOM-CHANGE events and error texts.
# NOTE(review): the embedded line numbers skip values (2239, 2246, 2250, 2253,
# 2258, 2278, 2281, ...); several conditions and try: lines are not visible in
# this listing.
2234 def _test_dbus_country(dev, apdev):
2235 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2237 # work around issues with possible pending regdom event from the end of
2238 # the previous test case
2240 dev[0].dump_monitor()
# Set the country to "FI" and read it back.
2242 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2243 dbus_interface=dbus.PROPERTIES_IFACE)
2244 res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2245 dbus_interface=dbus.PROPERTIES_IFACE)
2247 raise Exception("Unexpected Country value %s (expected FI)" % res)
# Wait for the regulatory domain change event and check its contents.
2249 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2251 # For now, work around separate P2P Device interface event delivery
2252 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2254 raise Exception("regdom change event not seen")
2255 if "init=USER type=COUNTRY alpha2=FI" not in ev:
2256 raise Exception("Unexpected event contents: " + ev)
# An Int16 value must be rejected with "wrong property type".
2259 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2260 dbus_interface=dbus.PROPERTIES_IFACE)
2261 raise Exception("Invalid Set(Country,-1) accepted")
2262 except dbus.exceptions.DBusException, e:
2263 if "Error.Failed: wrong property type" not in str(e):
2264 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
# A one-character string must be rejected with "invalid country code".
2267 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2268 dbus_interface=dbus.PROPERTIES_IFACE)
2269 raise Exception("Invalid Set(Country,F) accepted")
2270 except dbus.exceptions.DBusException, e:
2271 if "Error.Failed: invalid country code" not in str(e):
2272 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
# Set the country to "00" and wait for the matching event.
2274 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2275 dbus_interface=dbus.PROPERTIES_IFACE)
2277 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2279 # For now, work around separate P2P Device interface event delivery
2280 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2282 raise Exception("regdom change event not seen")
2283 # init=CORE was previously used due to invalid db.txt data for 00. For
2284 # now, allow both it and the new init=USER after fixed db.txt.
2285 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
2286 raise Exception("Unexpected event contents: " + ev)
# test_dbus_scan_interval: entry point that calls _test_dbus_scan_interval()
# and then issues "SCAN_INTERVAL 5" on dev[0].
# NOTE(review): embedded lines 2290 and 2292 are not visible in this listing;
# presumably they form a try/finally so the reset always runs - confirm
# against the full file.
2288 def test_dbus_scan_interval(dev, apdev):
2289 """D-Bus Get/Set ScanInterval"""
2291 _test_dbus_scan_interval(dev, apdev)
2293 dev[0].request("SCAN_INTERVAL 5")
def _test_dbus_scan_interval(dev, apdev):
    """Exercise Get/Set of the ScanInterval property on dev[0] over D-Bus."""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Write a value and verify that it reads back unchanged. The mismatch
    # message used to format an undefined name (i), so a real mismatch would
    # have surfaced as NameError instead of this report.
    interval = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(interval),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != interval:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, interval))

    # A UInt16 value must be rejected with "wrong property type".
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    # A negative Int32 must be rejected with "scan_interval must be >= 0".
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    # Leave ScanInterval set to 5.
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
# test_dbus_probe_req_reporting: starts a P2P group on dev[0] through D-Bus
# (GroupAdd on 2412), subscribes to Probe Request reporting on the group
# interface and waits for a ProbeRequest signal while dev[1] runs p2p_find.
# NOTE(review): the embedded line numbers skip values (2339, 2341-2344, 2356,
# 2363-2365, 2369, 2372, 2381, 2384-2385, ...); some lines, including parts of
# the add_signal() calls, are not visible in this listing.
2324 def test_dbus_probe_req_reporting(dev, apdev):
2325 """D-Bus Probe Request reporting"""
2326 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2328 dev[1].p2p_find(social=True)
# Helper built on TestDbus; self.reported is set once a ProbeRequest signal
# handler has run.
2330 class TestDbusProbe(TestDbus):
2331 def __init__(self, bus):
2332 TestDbus.__init__(self, bus)
2333 self.reported = False
2335 def __enter__(self):
2336 gobject.timeout_add(1, self.run_test)
2337 gobject.timeout_add(15000, self.timeout)
2338 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2340 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
# Signal handler: look up the group interface object from the signal
# properties, subscribe to Probe Request reporting on it, and keep both the
# Interface and P2PDevice proxies for use after the main loop.
2345 def groupStarted(self, properties):
2346 logger.debug("groupStarted: " + str(properties))
2347 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
2348 properties['interface_object'])
2349 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
2350 self.iface.SubscribeProbeReq()
2351 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2353 def probeRequest(self, args):
2354 logger.debug("probeRequest: args=%s" % str(args))
2355 self.reported = True
# First scheduled step: request a new group on frequency 2412.
2358 def run_test(self, *args):
2359 logger.debug("run_test")
2360 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2361 params = dbus.Dictionary({ 'frequency': 2412 })
2362 p2p.GroupAdd(params)
2366 return self.reported
# First run: unsubscribe once, then verify that a second unsubscribe is
# rejected with NoSubscription before disconnecting the group.
2368 with TestDbusProbe(bus) as t:
2370 raise Exception("Expected signals not seen")
2371 t.iface.UnsubscribeProbeReq()
2373 t.iface.UnsubscribeProbeReq()
2374 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2375 except dbus.exceptions.DBusException, e:
2376 if "NoSubscription" not in str(e):
2377 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2378 t.group_p2p.Disconnect()
# Second run: the subscription is deliberately left in place (see the
# original comment below).
2380 with TestDbusProbe(bus) as t:
2382 raise Exception("Expected signals not seen")
2383 # On purpose, leave ProbeReq subscription in place to test automatic
2386 dev[1].p2p_stop_find()
# test_dbus_probe_req_reporting_oom: calls SubscribeProbeReq() on the dev[0]
# interface inside alloc_fail_dbus() targeting
# wpas_dbus_handler_subscribe_preq.
# NOTE(review): the embedded line numbers skip values (2395-2396, 2401-2402,
# 2406-2407, 2409, ...); the try: line, the end of the original comments and
# the conditions around the later calls are not visible in this listing.
2388 def test_dbus_probe_req_reporting_oom(dev, apdev):
2389 """D-Bus Probe Request reporting (OOM)"""
2390 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2391 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2393 # Need to make sure this process has not already subscribed to avoid false
2394 # failures due to the operation succeeding due to os_strdup() not even
# Drop any existing subscription first; was_subscribed records whether the
# unsubscribe call succeeded.
2397 iface.UnsubscribeProbeReq()
2398 was_subscribed = True
2399 except dbus.exceptions.DBusException, e:
2400 was_subscribed = False
2403 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2404 "SubscribeProbeReq"):
2405 iface.SubscribeProbeReq()
2408 # On purpose, leave ProbeReq subscription in place to test automatic
2410 iface.SubscribeProbeReq()
# test_dbus_p2p_invalid: negative tests for the P2PDevice D-Bus interface of
# dev[0]. Each visible case calls a method with invalid arguments, or with P2P
# disabled through "P2P_SET disabled 1", and accepts only a DBusException
# whose text contains the expected substring.
# NOTE(review): the embedded line numbers skip many values (for example
# 2430-2438, 2449, 2461-2463, 2482, 2540, 2548, 2573-2574); try:/finally:
# lines, loop headers and several short method calls are not visible in this
# listing, so the comments below describe only the visible lines.
2412 def test_dbus_p2p_invalid(dev, apdev):
2413 """D-Bus invalid P2P operations"""
2414 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2415 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
# RejectPeer with a peer path under this interface, then with "/foo".
2418 p2p.RejectPeer(path + "/Peers/00112233445566")
2419 raise Exception("Invalid RejectPeer accepted")
2420 except dbus.exceptions.DBusException, e:
2421 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2422 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2425 p2p.RejectPeer("/foo")
2426 raise Exception("Invalid RejectPeer accepted")
2427 except dbus.exceptions.DBusException, e:
2428 if "InvalidArgs" not in str(e):
2429 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2439 raise Exception("Invalid RemoveClient accepted")
2440 except dbus.exceptions.DBusException, e:
2441 if "InvalidArgs" not in str(e):
2442 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
# Argument dictionaries that Find() is expected to reject with InvalidArgs.
2444 tests = [ {'DiscoveryType': 'foo'},
2445 {'RequestedDeviceTypes': 'foo'},
2446 {'RequestedDeviceTypes': ['foo']},
2447 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2448 '10','11','12','13','14','15','16',
2450 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2451 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2452 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2453 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2454 dbus.ByteArray('1234567')]},
2455 {'Foo': dbus.Int16(1)},
2456 {'Foo': dbus.UInt16(1)},
2457 {'Foo': dbus.Int64(1)},
2458 {'Foo': dbus.UInt64(1)},
2459 {'Foo': dbus.Double(1.23)},
2460 {'Foo': dbus.Signature('s')},
2464 p2p.Find(dbus.Dictionary(t))
2465 raise Exception("Invalid Find accepted")
2466 except dbus.exceptions.DBusException, e:
2467 if "InvalidArgs" not in str(e):
2468 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2471 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2472 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2474 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2475 raise Exception("Invalid RemovePersistentGroup accepted")
2476 except dbus.exceptions.DBusException, e:
2477 if "InvalidArgs" not in str(e):
2478 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
# Listen with P2P disabled (the call itself is not visible in this listing).
2481 dev[0].request("P2P_SET disabled 1")
2483 raise Exception("Invalid Listen accepted")
2484 except dbus.exceptions.DBusException, e:
2485 if "UnknownError: Could not start P2P listen" not in str(e):
2486 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2488 dev[0].request("P2P_SET disabled 0")
# Listen with a string argument through a proxy created with introspect=False.
2490 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2491 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2493 test_p2p.Listen("foo")
2494 raise Exception("Invalid Listen accepted")
2495 except dbus.exceptions.DBusException, e:
2496 if "InvalidArgs" not in str(e):
2497 raise Exception("Unexpected error message for invalid Listen: " + str(e))
# ExtendedListen with P2P disabled.
2500 dev[0].request("P2P_SET disabled 1")
2501 p2p.ExtendedListen(dbus.Dictionary({}))
2502 raise Exception("Invalid ExtendedListen accepted")
2503 except dbus.exceptions.DBusException, e:
2504 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2505 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2507 dev[0].request("P2P_SET disabled 0")
# PresenceRequest with P2P disabled.
2510 dev[0].request("P2P_SET disabled 1")
2511 args = { 'duration1': 30000, 'interval1': 102400,
2512 'duration2': 20000, 'interval2': 102400 }
2513 p2p.PresenceRequest(args)
2514 raise Exception("Invalid PresenceRequest accepted")
2515 except dbus.exceptions.DBusException, e:
2516 if "UnknownError: Failed to invoke presence request" not in str(e):
2517 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2519 dev[0].request("P2P_SET disabled 0")
# GroupAdd with a negative frequency, then with the interface path passed as
# persistent_group_object.
2522 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2523 p2p.GroupAdd(params)
2524 raise Exception("Invalid GroupAdd accepted")
2525 except dbus.exceptions.DBusException, e:
2526 if "InvalidArgs" not in str(e):
2527 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2530 params = dbus.Dictionary({'persistent_group_object':
2531 dbus.ObjectPath(path),
2533 p2p.GroupAdd(params)
2534 raise Exception("Invalid GroupAdd accepted")
2535 except dbus.exceptions.DBusException, e:
2536 if "InvalidArgs" not in str(e):
2537 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2541 raise Exception("Invalid Disconnect accepted")
2542 except dbus.exceptions.DBusException, e:
2543 if "UnknownError: failed to disconnect" not in str(e):
2544 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
# Flush with P2P disabled (the call itself is not visible in this listing).
2547 dev[0].request("P2P_SET disabled 1")
2549 raise Exception("Invalid Flush accepted")
2550 except dbus.exceptions.DBusException, e:
2551 if "Error.Failed: P2P is not available for this interface" not in str(e):
2552 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2554 dev[0].request("P2P_SET disabled 0")
# Connect with P2P disabled, then with invalid argument dictionaries.
2557 dev[0].request("P2P_SET disabled 1")
2558 args = { 'peer': path,
2560 'wps_method': 'pbc',
2562 pin = p2p.Connect(args)
2563 raise Exception("Invalid Connect accepted")
2564 except dbus.exceptions.DBusException, e:
2565 if "Error.Failed: P2P is not available for this interface" not in str(e):
2566 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2568 dev[0].request("P2P_SET disabled 0")
2570 tests = [ { 'frequency': dbus.Int32(-1) },
2571 { 'wps_method': 'pbc' },
2572 { 'wps_method': 'foo' } ]
2575 pin = p2p.Connect(args)
2576 raise Exception("Invalid Connect accepted")
2577 except dbus.exceptions.DBusException, e:
2578 if "InvalidArgs" not in str(e):
2579 raise Exception("Unexpected error message for invalid Connect: " + str(e))
# Invite with P2P disabled, then with an unknown dictionary key.
2582 dev[0].request("P2P_SET disabled 1")
2583 args = { 'peer': path }
2584 pin = p2p.Invite(args)
2585 raise Exception("Invalid Invite accepted")
2586 except dbus.exceptions.DBusException, e:
2587 if "Error.Failed: P2P is not available for this interface" not in str(e):
2588 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2590 dev[0].request("P2P_SET disabled 0")
2593 args = { 'foo': 'bar' }
2594 pin = p2p.Invite(args)
2595 raise Exception("Invalid Invite accepted")
2596 except dbus.exceptions.DBusException, e:
2597 if "InvalidArgs" not in str(e):
# NOTE(review): this case calls Invite() but the message below says "invalid
# Connect" - looks like a copy/paste slip; confirm and correct.
2598 raise Exception("Unexpected error message for invalid Connect: " + str(e))
# ProvisionDiscoveryRequest cases as (peer path, method, expected error text).
2600 tests = [ (path, 'display', "InvalidArgs"),
2601 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2603 "UnknownError: Failed to send provision discovery request"),
2604 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2606 "UnknownError: Failed to send provision discovery request"),
2607 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2609 "UnknownError: Failed to send provision discovery request"),
2610 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2612 "UnknownError: Failed to send provision discovery request"),
2613 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2614 'foo', "InvalidArgs") ]
2615 for (p,method,err) in tests:
2617 p2p.ProvisionDiscoveryRequest(p, method)
2618 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2619 except dbus.exceptions.DBusException, e:
2620 if err not in str(e):
2621 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
# Reading the Peers property with P2P disabled must fail as well.
2624 dev[0].request("P2P_SET disabled 1")
2625 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2626 dbus_interface=dbus.PROPERTIES_IFACE)
2627 raise Exception("Invalid Get(Peers) accepted")
2628 except dbus.exceptions.DBusException, e:
2629 if "Error.Failed: P2P is not available for this interface" not in str(e):
2630 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2632 dev[0].request("P2P_SET disabled 0")
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    _bus, _wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def blob():
        # Fresh three-byte array for each list element.
        return dbus.ByteArray('123')

    # Find() cases: (allocation failure count, function match string,
    # argument dictionary). Every case expects an InvalidArgs error.
    find_cases = [
        (1, "_wpa_dbus_dict_entry_get_string_array",
         { 'Foo': [ 'bar' ] }),
        (2, "_wpa_dbus_dict_entry_get_string_array",
         { 'Foo': [ 'bar' ] }),
        (10, "_wpa_dbus_dict_entry_get_string_array",
         { 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         { 'Foo': [ blob() ] }),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         { 'Foo': [ blob() ] }),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         { 'Foo': [ blob() for _ in range(11) ] }),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         { 'Foo': [ blob() ] }),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         { 'Foo': path }),
    ]
    for count, funcs, find_args in find_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary(find_args))

    # AddService() with a 500-byte response, failing the first and then the
    # second matching allocation; the same argument dictionary is reused.
    service = { 'service_type': 'bonjour',
                'response': dbus.ByteArray(500*'b') }
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service)
# test_dbus_p2p_discovery: runs P2P Find on dev[0] through D-Bus and checks
# the peer objects reported for dev[1] (secondary device type and vendor
# extension) and dev[2] (Wi-Fi Display IEs), then exercises ExtendedListen.
# NOTE(review): the embedded line numbers skip many values (for example
# 2723-2724, 2728-2729, 2731, 2738, 2742, 2747-2750, 2755, 2757, 2762, 2769,
# 2777, 2781-2782, 2788-2789, 2793, 2799-2800, 2806-2807, 2816-2817,
# 2826-2828, 2832, 2837-2838, 2843, 2845-2846); conditions, flag assignments
# and several short calls are not visible in this listing.
2691 def test_dbus_p2p_discovery(dev, apdev):
2692 """D-Bus P2P discovery"""
2693 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2694 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2696 addr0 = dev[0].p2p_dev_addr()
# dev[1] gets a secondary device type and a vendor element; a1 is its P2P
# device address converted to raw bytes.
2698 dev[1].request("SET sec_device_type 1-0050F204-2")
2699 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2701 addr1 = dev[1].p2p_dev_addr()
2702 a1 = binascii.unhexlify(addr1.replace(':',''))
# dev[2] enables Wi-Fi Display with subelement 0; wfd holds the bytes that are
# later compared against the peer's IEs property.
2704 wfd_devinfo = "00001c440028"
2705 dev[2].request("SET wifi_display 1")
2706 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2707 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2709 addr2 = dev[2].p2p_dev_addr()
2710 a2 = binascii.unhexlify(addr2.replace(':',''))
# The peer list must be present and empty before discovery starts.
2712 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2713 dbus_interface=dbus.PROPERTIES_IFACE)
2714 if 'Peers' not in res:
2715 raise Exception("GetAll result missing Peers")
2716 if len(res['Peers']) != 0:
2717 raise Exception("Unexpected peer(s) in the list")
2719 args = {'DiscoveryType': 'social',
2720 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2721 'Timeout': dbus.Int32(1) }
2722 p2p.Find(dbus.Dictionary(args))
# Helper built on TestDbus that records which discovery signals were seen.
2725 class TestDbusP2p(TestDbus):
2726 def __init__(self, bus):
2727 TestDbus.__init__(self, bus)
2730 self.found_prop = False
2732 self.find_stopped = False
2734 def __enter__(self):
2735 gobject.timeout_add(1, self.run_test)
2736 gobject.timeout_add(15000, self.timeout)
2737 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2739 self.add_signal(self.deviceFoundProperties,
2740 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
2741 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
2743 self.add_signal(self.provisionDiscoveryResponseEnterPin,
2744 WPAS_DBUS_IFACE_P2PDEVICE,
2745 "ProvisionDiscoveryResponseEnterPin")
2746 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
# Signal handler: fetch the Peers property and the properties of the reported
# peer object, then validate them according to the peer's DeviceAddress.
2751 def deviceFound(self, path):
2752 logger.debug("deviceFound: path=%s" % path)
2753 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2754 dbus_interface=dbus.PROPERTIES_IFACE)
2756 raise Exception("Unexpected number of peers")
2758 raise Exception("Mismatch in peer object path")
2759 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
2760 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2761 dbus_interface=dbus.PROPERTIES_IFACE,
2763 logger.debug("peer properties: " + str(res))
2765 if res['DeviceAddress'] == a1:
2766 if 'SecondaryDeviceTypes' not in res:
2767 raise Exception("Missing SecondaryDeviceTypes")
2768 sec = res['SecondaryDeviceTypes']
2770 raise Exception("Secondary device type missing")
2771 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
2772 raise Exception("Secondary device type mismatch")
2774 if 'VendorExtension' not in res:
2775 raise Exception("Missing VendorExtension")
2776 vendor = res['VendorExtension']
2778 raise Exception("Vendor extension missing")
2779 if "\x11\x22\x33\x44" not in vendor:
# NOTE(review): this check is on the VendorExtension value, but the message
# below says "Secondary device type mismatch" - looks like a copy/paste slip;
# confirm and correct.
2780 raise Exception("Secondary device type mismatch")
2783 elif res['DeviceAddress'] == a2:
2784 if 'IEs' not in res:
2785 raise Exception("IEs missing")
2786 if res['IEs'] != wfd:
2787 raise Exception("IEs mismatch")
2790 raise Exception("Unexpected peer device address")
2792 if self.found and self.found2:
2794 p2p.RejectPeer(path)
2795 p2p.ProvisionDiscoveryRequest(path, 'display')
# Signal handler: rejecting a peer that has just been lost is expected to fail
# with the wpas_p2p_reject error text.
2797 def deviceLost(self, path):
2798 logger.debug("deviceLost: path=%s" % path)
2801 p2p.RejectPeer(path)
2802 raise Exception("Invalid RejectPeer accepted")
2803 except dbus.exceptions.DBusException, e:
2804 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2805 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2808 def deviceFoundProperties(self, path, properties):
2809 logger.debug("deviceFoundProperties: path=%s" % path)
2810 logger.debug("peer properties: " + str(properties))
2811 if properties['DeviceAddress'] == a1:
2812 self.found_prop = True
2814 def provisionDiscoveryResponseEnterPin(self, peer_object):
2815 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
2818 def findStopped(self):
2819 logger.debug("findStopped")
2820 self.find_stopped = True
# First scheduled step: start a social Find with a Timeout value of 10.
2822 def run_test(self, *args):
2823 logger.debug("run_test")
2824 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
2825 'Timeout': dbus.Int32(10)}))
2829 return self.found and self.lost and self.found2 and self.find_stopped
2831 with TestDbusP2p(bus) as t:
2833 raise Exception("Expected signals not seen")
# Cleanup on the peers, then check that dev[2] can discover dev[0].
2835 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
2836 dev[1].p2p_stop_find()
2839 dev[2].p2p_stop_find()
2840 dev[2].request("P2P_FLUSH")
2841 if not dev[2].discover_peer(addr0):
2842 raise Exception("Peer not found")
2844 dev[2].p2p_stop_find()
# ExtendedListen: an unknown key must be rejected; valid and empty
# dictionaries are then passed without expecting an error.
2847 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
2848 raise Exception("Invalid ExtendedListen accepted")
2849 except dbus.exceptions.DBusException, e:
2850 if "InvalidArgs" not in str(e):
2851 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
2853 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
2854 p2p.ExtendedListen(dbus.Dictionary({}))
2855 dev[0].global_request("P2P_EXT_LISTEN")
2857 def test_dbus_p2p_service_discovery(dev, apdev):
2858 """D-Bus P2P service discovery"""
2859 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2860 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2862 addr0 = dev[0].p2p_dev_addr()
2863 addr1 = dev[1].p2p_dev_addr()
2865 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
2866 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
2868 args = { 'service_type': 'bonjour',
2869 'query': bonjour_query,
2870 'response': bonjour_response }
2871 p2p.AddService(args)
2873 p2p.AddService(args)
2876 p2p.DeleteService(args)
2877 raise Exception("Invalid DeleteService() accepted")
2878 except dbus.exceptions.DBusException, e:
2879 if "InvalidArgs" not in str(e):
2880 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2882 args = { 'service_type': 'bonjour',
2883 'query': bonjour_query }
2884 p2p.DeleteService(args)
2886 p2p.DeleteService(args)
2887 raise Exception("Invalid DeleteService() accepted")
2888 except dbus.exceptions.DBusException, e:
2889 if "InvalidArgs" not in str(e):
2890 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2892 args = { 'service_type': 'upnp',
2894 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
2895 p2p.AddService(args)
2896 p2p.DeleteService(args)
2898 p2p.DeleteService(args)
2899 raise Exception("Invalid DeleteService() accepted")
2900 except dbus.exceptions.DBusException, e:
2901 if "InvalidArgs" not in str(e):
2902 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2904 tests = [ { 'service_type': 'foo' },
2905 { 'service_type': 'foo', 'query': bonjour_query },
2906 { 'service_type': 'upnp' },
2907 { 'service_type': 'upnp', 'version': 0x10 },
2908 { 'service_type': 'upnp',
2909 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2911 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2912 { 'service_type': 'upnp', 'foo': 'bar' },
2913 { 'service_type': 'bonjour' },
2914 { 'service_type': 'bonjour', 'query': 'foo' },
2915 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2918 p2p.DeleteService(args)
2919 raise Exception("Invalid DeleteService() accepted")
2920 except dbus.exceptions.DBusException, e:
2921 if "InvalidArgs" not in str(e):
2922 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2924 tests = [ { 'service_type': 'foo' },
2925 { 'service_type': 'upnp' },
2926 { 'service_type': 'upnp', 'version': 0x10 },
2927 { 'service_type': 'upnp',
2928 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2930 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2931 { 'service_type': 'upnp', 'foo': 'bar' },
2932 { 'service_type': 'bonjour' },
2933 { 'service_type': 'bonjour', 'query': 'foo' },
2934 { 'service_type': 'bonjour', 'response': 'foo' },
2935 { 'service_type': 'bonjour', 'query': bonjour_query },
2936 { 'service_type': 'bonjour', 'response': bonjour_response },
2937 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
2938 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2941 p2p.AddService(args)
2942 raise Exception("Invalid AddService() accepted")
2943 except dbus.exceptions.DBusException, e:
2944 if "InvalidArgs" not in str(e):
2945 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2947 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
2948 ref = p2p.ServiceDiscoveryRequest(args)
2949 p2p.ServiceDiscoveryCancelRequest(ref)
2951 p2p.ServiceDiscoveryCancelRequest(ref)
2952 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2953 except dbus.exceptions.DBusException, e:
2954 if "InvalidArgs" not in str(e):
2955 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2957 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
2958 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2959 except dbus.exceptions.DBusException, e:
2960 if "InvalidArgs" not in str(e):
2961 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2963 args = { 'service_type': 'upnp',
2965 'service': 'ssdp:foo' }
2966 ref = p2p.ServiceDiscoveryRequest(args)
2967 p2p.ServiceDiscoveryCancelRequest(ref)
2969 tests = [ { 'service_type': 'foo' },
2974 { 'service_type': 'upnp',
2975 'service': 'ssdp:foo' },
2976 { 'service_type': 'upnp',
2978 { 'service_type': 'upnp',
2980 'service': 'ssdp:foo',
2981 'peer_object': dbus.ObjectPath(path + "/Peers") },
2982 { 'service_type': 'upnp',
2984 'service': 'ssdp:foo',
2985 'peer_object': path + "/Peers" },
2986 { 'service_type': 'upnp',
2988 'service': 'ssdp:foo',
2989 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
2992 p2p.ServiceDiscoveryRequest(args)
2993 raise Exception("Invalid ServiceDiscoveryRequest accepted")
2994 except dbus.exceptions.DBusException, e:
2995 if "InvalidArgs" not in str(e):
2996 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
2998 args = { 'foo': 'bar' }
3000 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3001 raise Exception("Invalid ServiceDiscoveryResponse accepted")
3002 except dbus.exceptions.DBusException, e:
3003 if "InvalidArgs" not in str(e):
3004 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3006 def test_dbus_p2p_service_discovery_query(dev, apdev):
3007 """D-Bus P2P service discovery query"""
# dev[0] is driven through the D-Bus P2PDevice interface obtained from
# prepare_dbus(); dev[1] is driven through its own request() interface and
# gets a bonjour service registered with P2P_SERVICE_ADD.
3008 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3009 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3011 addr0 = dev[0].p2p_dev_addr()
3012 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
3014 addr1 = dev[1].p2p_dev_addr()
# Signal-driven helper derived from TestDbus. The methods named after D-Bus
# signals are registered as receivers in __enter__().
3016 class TestDbusP2p(TestDbus):
3017 def __init__(self, bus):
3018 TestDbus.__init__(self, bus)
# Context-manager entry: schedule run_test() with a 1 ms timer and
# self.timeout() with a 15000 ms timer, then subscribe to the P2PDevice
# signals this test reacts to. ServiceDiscoveryResponse is received with
# byte_arrays=True.
3021 def __enter__(self):
3022 gobject.timeout_add(1, self.run_test)
3023 gobject.timeout_add(15000, self.timeout)
3024 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3026 self.add_signal(self.serviceDiscoveryResponse,
3027 WPAS_DBUS_IFACE_P2PDEVICE,
3028 "ServiceDiscoveryResponse", byte_arrays=True)
# Signal handler: for the reported peer object path, issue a
# ServiceDiscoveryRequest carrying the raw TLV bytes 02 00 00 01.
3032 def deviceFound(self, path):
3033 logger.debug("deviceFound: path=%s" % path)
3034 args = { 'peer_object': path,
3035 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3036 p2p.ServiceDiscoveryRequest(args)
# Signal handler: logs the received service discovery response.
3038 def serviceDiscoveryResponse(self, sd_request):
3039 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
# Timer callback that starts the test: begin a 'social' P2P Find with a
# Timeout value of 10 on dev[0].
3043 def run_test(self, *args):
3044 logger.debug("run_test")
3045 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3046 'Timeout': dbus.Int32(10)}))
# Run the helper as a context manager; the exception below reports that the
# expected signals were not observed.
3052 with TestDbusP2p(bus) as t:
3054 raise Exception("Expected signals not seen")
# Stop the P2P find operation on dev[1] once the D-Bus part is finished.
3056 dev[1].p2p_stop_find()
3058 def test_dbus_p2p_service_discovery_external(dev, apdev):
3059 """D-Bus P2P service discovery with external response"""
# Thin wrapper: run the actual test body, then send
# "P2P_SERV_DISC_EXTERNAL 0" to dev[0] through its request() interface.
# NOTE(review): presumably the reset is meant as cleanup that also runs when
# the body raises - confirm against the complete file.
3061 _test_dbus_p2p_service_discovery_external(dev, apdev)
3063 dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3065 def _test_dbus_p2p_service_discovery_external(dev, apdev):
# Body of the external service discovery test: dev[0] (D-Bus) answers a
# service discovery request from dev[1] by calling ServiceDiscoveryResponse
# itself, and dev[1] verifies the response it receives.
3066 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3067 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3069 addr0 = dev[0].p2p_dev_addr()
3070 addr1 = dev[1].p2p_dev_addr()
# dev[1]: flush P2P state, queue a service discovery request (hex payload
# 02000001) addressed to dev[0], and start a social-channel find.
3073 dev[1].request("P2P_FLUSH")
3074 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
3075 dev[1].p2p_find(social=True)
# Signal-driven helper derived from TestDbus.
3077 class TestDbusP2p(TestDbus):
3078 def __init__(self, bus):
3079 TestDbus.__init__(self, bus)
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to the P2PDevice signals used below.
3082 def __enter__(self):
3083 gobject.timeout_add(1, self.run_test)
3084 gobject.timeout_add(15000, self.timeout)
3085 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3087 self.add_signal(self.serviceDiscoveryRequest,
3088 WPAS_DBUS_IFACE_P2PDEVICE,
3089 "ServiceDiscoveryRequest")
# Signal handler: only logs the discovered peer path.
3093 def deviceFound(self, path):
3094 logger.debug("deviceFound: path=%s" % path)
# Signal handler: answer the incoming request. peer_object, frequency and
# dialog_token are copied from the request dictionary; the TLV payload is
# the hex string 'resp' converted to bytes.
# NOTE(review): the assignment of 'resp' is not visible in this excerpt;
# the same name is compared against the received event further down.
3096 def serviceDiscoveryRequest(self, sd_request):
3097 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
3099 args = { 'peer_object': sd_request['peer_object'],
3100 'frequency': sd_request['frequency'],
3101 'dialog_token': sd_request['dialog_token'],
3102 'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
3103 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
# Timer callback that starts the test by calling
# ServiceDiscoveryExternal(1) on dev[0].
3106 def run_test(self, *args):
3107 logger.debug("run_test")
3108 p2p.ServiceDiscoveryExternal(1)
# Run the helper; the exception below reports missing signals.
3116 with TestDbusP2p(bus) as t:
3118 raise Exception("Expected signals not seen")
# dev[1] side: wait up to 5 seconds for a P2P-SERV-DISC-RESP event. The
# checks below cover a timeout, the address in the event, and the fifth
# space-separated field, which must equal 'resp'.
3120 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
3122 raise Exception("Service discovery timed out")
3124 raise Exception("Unexpected address in SD Response: " + ev)
3125 if ev.split(' ')[4] != resp:
3126 raise Exception("Unexpected response data SD Response: " + ev)
3127 dev[1].p2p_stop_find()
# Call ServiceDiscoveryExternal(0) on dev[0] at the end of the test.
3130 p2p.ServiceDiscoveryExternal(0)
3132 def test_dbus_p2p_autogo(dev, apdev):
3133 """D-Bus P2P autonomous GO"""
# dev[0] is driven over D-Bus and creates a persistent group; wlan1 is driven
# through WpaSupplicant('wlan1', ...) and joins that group using a PIN.
3134 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3135 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3137 addr0 = dev[0].p2p_dev_addr()
# Signal-driven helper derived from TestDbus. State flags:
#   waiting_end  - set once the final group Disconnect() has been requested
#   exceptions   - set right before each check failure is raised in a handler
#   deauthorized - set when the StaDeauthorized handler has run
3139 class TestDbusP2p(TestDbus):
3140 def __init__(self, bus):
3141 TestDbus.__init__(self, bus)
3143 self.waiting_end = False
3144 self.exceptions = False
3145 self.deauthorized = False
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to the P2PDevice and Interface signals handled
# by the methods below.
3148 def __enter__(self):
3149 gobject.timeout_add(1, self.run_test)
3150 gobject.timeout_add(15000, self.timeout)
3151 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3153 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3155 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3157 self.add_signal(self.persistentGroupAdded,
3158 WPAS_DBUS_IFACE_P2PDEVICE,
3159 "PersistentGroupAdded")
3160 self.add_signal(self.persistentGroupRemoved,
3161 WPAS_DBUS_IFACE_P2PDEVICE,
3162 "PersistentGroupRemoved")
3163 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3164 WPAS_DBUS_IFACE_P2PDEVICE,
3165 "ProvisionDiscoveryRequestDisplayPin")
3166 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3168 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
# Signal handler: remember the group object path and the group interface
# object, then read the Role, Group and PeerGO properties of the group
# interface and verify them. Group must equal the 'group_object' reported
# in the signal.
# NOTE(review): self.exceptions is set before every raise; presumably an
# exception raised inside a signal handler does not reach the test body, so
# the flag carries the failure into the final result - confirm.
3173 def groupStarted(self, properties):
3174 logger.debug("groupStarted: " + str(properties))
3175 self.group = properties['group_object']
3176 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3177 properties['interface_object'])
3178 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3179 dbus_interface=dbus.PROPERTIES_IFACE)
3181 self.exceptions = True
3182 raise Exception("Unexpected role reported: " + role)
3183 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3184 dbus_interface=dbus.PROPERTIES_IFACE)
3185 if group != properties['group_object']:
3186 self.exceptions = True
3187 raise Exception("Unexpected Group reported: " + str(group))
3188 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3189 dbus_interface=dbus.PROPERTIES_IFACE)
3191 self.exceptions = True
3192 raise Exception("Unexpected PeerGO value: " + str(go))
# Two follow-up actions are visible here: disconnecting the group instance
# through the group's P2PDevice interface, and asking wlan1 to join the
# group with PIN 12345670.
# NOTE(review): the condition selecting between them is not visible in this
# excerpt.
3195 logger.info("Remove persistent group instance")
3196 group_p2p = dbus.Interface(self.g_if_obj,
3197 WPAS_DBUS_IFACE_P2PDEVICE)
3198 group_p2p.Disconnect()
3200 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3201 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
# Signal handler: when waiting_end is set, remove the stored persistent
# group; the other visible action re-starts the persistent group with
# GroupAdd().
3203 def groupFinished(self, properties):
3204 logger.debug("groupFinished: " + str(properties))
3205 if self.waiting_end:
3206 logger.info("Remove persistent group")
3207 p2p.RemovePersistentGroup(self.persistent)
3209 logger.info("Re-start persistent group")
3210 params = dbus.Dictionary({'persistent_group_object':
3213 p2p.GroupAdd(params)
# Signal handler: remember the object path of the added persistent group.
3215 def persistentGroupAdded(self, path, properties):
3216 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3217 self.persistent = path
# Signal handler: logs the removed persistent group path.
3219 def persistentGroupRemoved(self, path):
3220 logger.debug("persistentGroupRemoved: %s" % path)
# Signal handler: fetch all Peer properties of the discovered peer and keep
# them in self.peer for the WPS parameters built later.
3224 def deviceFound(self, path):
3225 logger.debug("deviceFound: path=%s" % path)
3226 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3227 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3228 dbus_interface=dbus.PROPERTIES_IFACE,
3230 logger.debug('peer properties: ' + str(self.peer))
# Signal handler: remember the peer object path, decode the last path
# component from hex, and format its bytes as two-digit hex values. A
# WPS.Start() call that is accepted is treated as a failure, and the error
# string must contain "InvalidArgs". A second parameter set is then built
# and the peer is authorized to connect to the group.
3232 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3233 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3234 self.peer_path = peer_object
3235 peer = binascii.unhexlify(peer_object.split('/')[-1])
3240 addr += '%02x' % ord(p)
3242 params = { 'Role': 'registrar',
3243 'P2PDeviceAddress': self.peer['DeviceAddress'],
3244 'Bssid': self.peer['DeviceAddress'],
3246 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
3249 self.exceptions = True
3250 raise Exception("Invalid WPS.Start() accepted")
3251 except dbus.exceptions.DBusException, e:
3252 if "InvalidArgs" not in str(e):
3253 self.exceptions = True
3254 raise Exception("Unexpected error message: " + str(e))
3255 params = { 'Role': 'registrar',
3256 'P2PDeviceAddress': self.peer['DeviceAddress'],
3259 logger.info("Authorize peer to connect to the group")
# Signal handler run when a station is authorized. In order it:
#   1. checks that the peer's Groups property has exactly one entry and
#      that it equals self.group;
#   2. checks that the group's Members property has exactly one entry;
#   3. exercises Set/Get of the group's WPSVendorExtensions property with
#      valid and invalid values;
#   4. removes the client and disconnects the group, setting waiting_end.
3262 def staAuthorized(self, name):
3263 logger.debug("staAuthorized: " + name)
3264 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
3265 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3266 dbus_interface=dbus.PROPERTIES_IFACE,
3268 logger.debug("Peer properties: " + str(res))
3269 if 'Groups' not in res or len(res['Groups']) != 1:
3270 self.exceptions = True
3271 raise Exception("Unexpected number of peer Groups entries")
3272 if res['Groups'][0] != self.group:
3273 self.exceptions = True
3274 raise Exception("Unexpected peer Groups[0] value")
3276 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
3277 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3278 dbus_interface=dbus.PROPERTIES_IFACE,
3280 logger.debug("Group properties: " + str(res))
3281 if 'Members' not in res or len(res['Members']) != 1:
3282 self.exceptions = True
3283 raise Exception("Unexpected number of group members")
3285 ext = dbus.ByteArray("\x11\x22\x33\x44")
3286 # Earlier implementation of this interface was a bit strange. The
3287 # property is defined to have aay signature and that is what the
3288 # getter returned. However, the setter expected there to be a
3289 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3290 # values.. The current implementations maintains support for that
3291 # for backwards compability reasons. Verify that encoding first.
3292 vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
3294 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3295 dbus_interface=dbus.PROPERTIES_IFACE)
3296 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3297 dbus_interface=dbus.PROPERTIES_IFACE,
3300 self.exceptions = True
3301 raise Exception("Unexpected number of vendor extensions")
3303 self.exceptions = True
3304 raise Exception("Vendor extension value changed")
3306 # And now verify that the more appropriate encoding is accepted as
# Append a second extension to the list read back above, write the plain
# list, and read it again; both entries must be unchanged.
3308 res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3309 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3310 dbus_interface=dbus.PROPERTIES_IFACE)
3311 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3312 dbus_interface=dbus.PROPERTIES_IFACE,
3315 self.exceptions = True
3316 raise Exception("Unexpected number of vendor extensions")
3317 if res[0] != res2[0] or res[1] != res2[1]:
3318 self.exceptions = True
3319 raise Exception("Vendor extension value changed")
# Negative case: after appending a two-byte entry, the Set() must be
# rejected with an error string containing "Error.Failed".
3322 res.append(dbus.ByteArray('\xaa\xbb'))
3324 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3325 dbus_interface=dbus.PROPERTIES_IFACE)
3326 self.exceptions = True
3327 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3328 except dbus.exceptions.DBusException, e:
3329 if "Error.Failed" not in str(e):
3330 self.exceptions = True
3331 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
# Negative case: a dictionary keyed 'Foo' instead of 'WPSVendorExtensions'
# must be rejected with "InvalidArgs".
3333 vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
3335 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3336 dbus_interface=dbus.PROPERTIES_IFACE)
3337 self.exceptions = True
3338 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3339 except dbus.exceptions.DBusException, e:
3340 if "InvalidArgs" not in str(e):
3341 self.exceptions = True
3342 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
# Negative case: another invalid value must be rejected with "Error.Failed".
# NOTE(review): the assignment of 'vals' for this case is not visible in
# this excerpt.
3346 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3347 dbus_interface=dbus.PROPERTIES_IFACE)
3348 self.exceptions = True
3349 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3350 except dbus.exceptions.DBusException, e:
3351 if "Error.Failed" not in str(e):
3352 self.exceptions = True
3353 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
# Negative case: a nested list holding a text string must be rejected with
# "Error.Failed".
3355 vals = [ [ "foo" ] ]
3357 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3358 dbus_interface=dbus.PROPERTIES_IFACE)
3359 self.exceptions = True
3360 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3361 except dbus.exceptions.DBusException, e:
3362 if "Error.Failed" not in str(e):
3363 self.exceptions = True
3364 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
# Remove the client from the group, then mark the end phase and disconnect
# the group so that groupFinished() removes the persistent group.
3366 p2p.RemoveClient({ 'peer': self.peer_path })
3368 self.waiting_end = True
3369 group_p2p = dbus.Interface(self.g_if_obj,
3370 WPAS_DBUS_IFACE_P2PDEVICE)
3371 group_p2p.Disconnect()
# Signal handler: record that a station deauthorization was reported.
3373 def staDeauthorized(self, name):
3374 logger.debug("staDeauthorized: " + name)
3375 self.deauthorized = True
# Timer callback that starts the test by adding a persistent group.
3377 def run_test(self, *args):
3378 logger.debug("run_test")
3379 params = dbus.Dictionary({'persistent': True,
3381 logger.info("Add a persistent group")
3382 p2p.GroupAdd(params)
# The returned result requires done and deauthorized to be set and no
# handler to have flagged a check failure.
3386 return self.done and self.deauthorized and not self.exceptions
# Run the helper; the exception below reports missing signals.
3388 with TestDbusP2p(bus) as t:
3390 raise Exception("Expected signals not seen")
# Wait for dev[1] to see the GO end the session.
3392 dev[1].wait_go_ending_session()
3394 def test_dbus_p2p_autogo_pbc(dev, apdev):
3395 """D-Bus P2P autonomous GO and PBC"""
# dev[0] is driven over D-Bus and starts a group with GroupAdd(); wlan1 joins
# it with "P2P_CONNECT <addr0> pbc join".
3396 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3397 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3399 addr0 = dev[0].p2p_dev_addr()
# Signal-driven helper derived from TestDbus.
3401 class TestDbusP2p(TestDbus):
3402 def __init__(self, bus):
3403 TestDbus.__init__(self, bus)
3405 self.waiting_end = False
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to the signals handled below.
3408 def __enter__(self):
3409 gobject.timeout_add(1, self.run_test)
3410 gobject.timeout_add(15000, self.timeout)
3411 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3413 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3415 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3417 self.add_signal(self.provisionDiscoveryPBCRequest,
3418 WPAS_DBUS_IFACE_P2PDEVICE,
3419 "ProvisionDiscoveryPBCRequest")
3420 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
# Signal handler: remember the group object path and the group interface
# object, then have wlan1 join the group using PBC.
3425 def groupStarted(self, properties):
3426 logger.debug("groupStarted: " + str(properties))
3427 self.group = properties['group_object']
3428 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3429 properties['interface_object'])
3430 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3431 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
# Signal handler: logs the properties of the finished group.
3433 def groupFinished(self, properties):
3434 logger.debug("groupFinished: " + str(properties))
# Signal handler: fetch all Peer properties of the discovered peer and keep
# them in self.peer.
3438 def deviceFound(self, path):
3439 logger.debug("deviceFound: path=%s" % path)
3440 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3441 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3442 dbus_interface=dbus.PROPERTIES_IFACE,
3444 logger.debug('peer properties: ' + str(self.peer))
# Signal handler: remember the peer object path, decode the last path
# component from hex and format its bytes as two-digit hex values, then
# build registrar parameters from the stored peer DeviceAddress and obtain
# the WPS interface of the group in order to authorize the peer.
3446 def provisionDiscoveryPBCRequest(self, peer_object):
3447 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
3448 self.peer_path = peer_object
3449 peer = binascii.unhexlify(peer_object.split('/')[-1])
3454 addr += '%02x' % ord(p)
3455 params = { 'Role': 'registrar',
3456 'P2PDeviceAddress': self.peer['DeviceAddress'],
3458 logger.info("Authorize peer to connect to the group")
3459 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
# Signal handler: once a station has been authorized, disconnect the group
# through the group's P2PDevice interface.
3462 def staAuthorized(self, name):
3463 logger.debug("staAuthorized: " + name)
3464 group_p2p = dbus.Interface(self.g_if_obj,
3465 WPAS_DBUS_IFACE_P2PDEVICE)
3466 group_p2p.Disconnect()
# Timer callback that starts the test: add a group with frequency 2412.
3468 def run_test(self, *args):
3469 logger.debug("run_test")
3470 params = dbus.Dictionary({'frequency': 2412})
3471 p2p.GroupAdd(params)
# Run the helper; the exception below reports missing signals.
3477 with TestDbusP2p(bus) as t:
3479 raise Exception("Expected signals not seen")
# dev[1] side: wait for the GO to end the session, then flush the scan cache.
3481 dev[1].wait_go_ending_session()
3482 dev[1].flush_scan_cache()
3484 def test_dbus_p2p_autogo_legacy(dev, apdev):
3485 """D-Bus P2P autonomous GO and legacy STA"""
# dev[0] is driven over D-Bus and starts a group; wlan1 connects to the
# group's BSSID with a plain WPS_PIN command rather than a P2P_CONNECT join.
3486 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3487 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3489 addr0 = dev[0].p2p_dev_addr()
# Signal-driven helper derived from TestDbus.
3491 class TestDbusP2p(TestDbus):
3492 def __init__(self, bus):
3493 TestDbus.__init__(self, bus)
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to the signals handled below.
3496 def __enter__(self):
3497 gobject.timeout_add(1, self.run_test)
3498 gobject.timeout_add(15000, self.timeout)
3499 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3501 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3503 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
# Signal handler: read the group's properties, format each element of the
# BSSID property as hex joined with ':', and prepare WPS parameters on the
# group interface. wlan1 then scans for that BSSID on 2412 and starts
# WPS_PIN with it. The group's P2PDevice interface is kept in
# self.group_p2p for the later disconnect.
# NOTE(review): the assignment of 'pin' is not visible in this excerpt.
3508 def groupStarted(self, properties):
3509 logger.debug("groupStarted: " + str(properties))
3510 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3511 properties['group_object'])
3512 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3513 dbus_interface=dbus.PROPERTIES_IFACE,
3515 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
3518 params = { 'Role': 'enrollee',
3521 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3522 properties['interface_object'])
3523 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
3525 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3526 dev1.scan_for_bss(bssid, freq=2412)
3527 dev1.request("WPS_PIN " + bssid + " " + pin)
3528 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
# Signal handler: logs the properties of the finished group.
3530 def groupFinished(self, properties):
3531 logger.debug("groupFinished: " + str(properties))
# Signal handler: once the station is authorized, disconnect wlan1 and then
# disconnect the group through the stored P2PDevice interface.
3535 def staAuthorized(self, name):
3536 logger.debug("staAuthorized: " + name)
3537 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3538 dev1.request("DISCONNECT")
3539 self.group_p2p.Disconnect()
# Timer callback that starts the test: add a group with frequency 2412.
3541 def run_test(self, *args):
3542 logger.debug("run_test")
3543 params = dbus.Dictionary({'frequency': 2412})
3544 p2p.GroupAdd(params)
# Run the helper; the exception below reports missing signals.
3550 with TestDbusP2p(bus) as t:
3552 raise Exception("Expected signals not seen")
3554 def test_dbus_p2p_join(dev, apdev):
3555 """D-Bus P2P join an autonomous GO"""
# dev[1] starts a GO on 2412 through its own interface; dev[0], driven over
# D-Bus, discovers peers, joins that group and later invites another peer.
# The group interface name of dev[1] is saved for the WpaSupplicant('wlan1')
# instances created inside the signal handlers.
3556 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3557 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3559 addr1 = dev[1].p2p_dev_addr()
3560 addr2 = dev[2].p2p_dev_addr()
3561 dev[1].p2p_start_go(freq=2412)
3562 dev1_group_ifname = dev[1].group_ifname
# Signal-driven helper derived from TestDbus.
3565 class TestDbusP2p(TestDbus):
3566 def __init__(self, bus):
3567 TestDbus.__init__(self, bus)
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to the P2PDevice signals handled below.
3572 def __enter__(self):
3573 gobject.timeout_add(1, self.run_test)
3574 gobject.timeout_add(15000, self.timeout)
3575 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3577 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3579 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3581 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
# Signal handler: log the peer's properties and classify the peer by
# checking whether the colon-less addr2 or addr1 occurs in its object path.
# Once both self.peer and self.go are known, call Connect() towards the GO
# with the 'pin' WPS method and hand the value returned by Connect() to
# wlan1's group interface with "WPS_PIN any <pin>".
3586 def deviceFound(self, path):
3587 logger.debug("deviceFound: path=%s" % path)
3588 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3589 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3590 dbus_interface=dbus.PROPERTIES_IFACE,
3592 logger.debug('peer properties: ' + str(res))
3593 if addr2.replace(':','') in path:
3595 elif addr1.replace(':','') in path:
3597 if self.peer and self.go:
3598 logger.info("Join the group")
3600 args = { 'peer': self.go,
3602 'wps_method': 'pin',
3604 pin = p2p.Connect(args)
3606 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3607 dev1.group_ifname = dev1_group_ifname
3608 dev1.group_request("WPS_PIN any " + pin)
# Signal handler: verify the group interface properties as seen by the
# joining device. Role must be "client" and Group must equal the signalled
# 'group_object'; PeerGO is checked as well.
3610 def groupStarted(self, properties):
3611 logger.debug("groupStarted: " + str(properties))
3612 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3613 properties['interface_object'])
3614 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3615 dbus_interface=dbus.PROPERTIES_IFACE)
3616 if role != "client":
3617 raise Exception("Unexpected role reported: " + role)
3618 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3619 dbus_interface=dbus.PROPERTIES_IFACE)
3620 if group != properties['group_object']:
3621 raise Exception("Unexpected Group reported: " + str(group))
3622 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3623 dbus_interface=dbus.PROPERTIES_IFACE)
3625 raise Exception("Unexpected PeerGO value: " + str(go))
# Read the group object's properties, then attempt a Set() of
# WPSVendorExtensions, which must fail with
# "Error.Failed: Failed to set property".
3627 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3628 properties['group_object'])
3629 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3630 dbus_interface=dbus.PROPERTIES_IFACE,
3632 logger.debug("Group properties: " + str(res))
3634 ext = dbus.ByteArray("\x11\x22\x33\x44")
3636 # Set(WPSVendorExtensions) not allowed for P2P Client
3637 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3638 dbus_interface=dbus.PROPERTIES_IFACE)
3639 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3640 except dbus.exceptions.DBusException, e:
3641 if "Error.Failed: Failed to set property" not in str(e):
3642 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
# Issue a PresenceRequest with two duration/interval pairs on the group
# interface, then invite the stored peer to the group.
3644 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3645 args = { 'duration1': 30000, 'interval1': 102400,
3646 'duration2': 20000, 'interval2': 102400 }
3647 group_p2p.PresenceRequest(args)
3649 args = { 'peer': self.peer }
3650 group_p2p.Invite(args)
# Signal handler: logs the properties of the finished group.
3652 def groupFinished(self, properties):
3653 logger.debug("groupFinished: " + str(properties))
# Signal handler: the invitation result must carry status 1; afterwards a
# WpaSupplicant('wlan1') instance is pointed at the saved group interface
# name.
3657 def invitationResult(self, result):
3658 logger.debug("invitationResult: " + str(result))
3659 if result['status'] != 1:
3660 raise Exception("Unexpected invitation result: " + str(result))
3661 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3662 dev1.group_ifname = dev1_group_ifname
# Timer callback that starts the test: begin a 'social' P2P Find.
3665 def run_test(self, *args):
3666 logger.debug("run_test")
3667 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
# Run the helper; the exception below reports missing signals.
3673 with TestDbusP2p(bus) as t:
3675 raise Exception("Expected signals not seen")
# Stop the P2P find operation on dev[2].
3677 dev[2].p2p_stop_find()
3679 def test_dbus_p2p_invitation_received(dev, apdev):
3680 """D-Bus P2P and InvitationReceived"""
# Checks that dev[0] reports the InvitationReceived D-Bus signal when dev[1]
# sends it a persistent group invitation.
3681 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3682 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
# Preparation: run form() on the two devices, set persistent_reconnect to 0
# on dev[0], and have dev[1] discover dev[0] so that its peer entry,
# including the 'persistent' field used below, can be read.
3684 form(dev[0], dev[1])
3685 addr0 = dev[0].p2p_dev_addr()
3687 dev[0].global_request("SET persistent_reconnect 0")
3689 if not dev[1].discover_peer(addr0, social=True):
3690 raise Exception("Peer " + addr0 + " not found")
3691 peer = dev[1].get_peer(addr0)
# Signal-driven helper derived from TestDbus.
3693 class TestDbusP2p(TestDbus):
3694 def __init__(self, bus):
3695 TestDbus.__init__(self, bus)
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to InvitationReceived.
3698 def __enter__(self):
3699 gobject.timeout_add(1, self.run_test)
3700 gobject.timeout_add(15000, self.timeout)
3701 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
3702 "InvitationReceived")
# Signal handler: logs the received invitation.
3706 def invitationReceived(self, result):
3707 logger.debug("invitationReceived: " + str(result))
# Timer callback that starts the test: wlan1 sends a P2P_INVITE for the
# persistent group from the peer entry, addressed to dev[0].
3711 def run_test(self, *args):
3712 logger.debug("run_test")
3713 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3714 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
3715 dev1.global_request(cmd)
# Run the helper; the exception below reports missing signals.
3721 with TestDbusP2p(bus) as t:
3723 raise Exception("Expected signals not seen")
# Stop the P2P find operation on both devices.
3725 dev[0].p2p_stop_find()
3726 dev[1].p2p_stop_find()
3728 def test_dbus_p2p_config(dev, apdev):
3729 """D-Bus Get/Set P2PDeviceConfig"""
# Thin wrapper: run the actual test body, then send "P2P_SET ssid_postfix "
# (empty value) to dev[0]; the body sets SsidPostfix to 'foo'.
# NOTE(review): presumably the reset is meant as cleanup that also runs when
# the body raises - confirm against the complete file.
3731 _test_dbus_p2p_config(dev, apdev)
3733 dev[0].request("P2P_SET ssid_postfix ")
3735 def _test_dbus_p2p_config(dev, apdev):
# Body of the P2PDeviceConfig test: exercises Get/Set of the P2PDeviceConfig
# property on dev[0] with unchanged, changed, cleared and invalid values, and
# while P2P is disabled.
3736 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3737 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
# Round trip: read the configuration, write it back unchanged and read it
# again; the number of parameters and each value must stay the same.
3739 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3740 dbus_interface=dbus.PROPERTIES_IFACE,
3742 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
3743 dbus_interface=dbus.PROPERTIES_IFACE)
3744 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3745 dbus_interface=dbus.PROPERTIES_IFACE,
3748 if len(res) != len(res2):
3749 raise Exception("Different number of parameters")
3751 if res[k] != res2[k]:
3752 raise Exception("Parameter %s value changes" % k)
# Set SsidPostfix plus one VendorExtension and one SecondaryDeviceTypes
# entry, then verify that each list is reported with exactly one element.
3754 changes = { 'SsidPostfix': 'foo',
3755 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
3756 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
3757 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3758 dbus.Dictionary(changes, signature='sv'),
3759 dbus_interface=dbus.PROPERTIES_IFACE)
3761 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3762 dbus_interface=dbus.PROPERTIES_IFACE,
3764 logger.debug("P2PDeviceConfig: " + str(res2))
3765 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
3766 raise Exception("VendorExtension does not match")
3767 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
3768 raise Exception("SecondaryDeviceType does not match")
# Clear the values again using an empty string and empty 'ay' arrays; both
# list keys must then be absent from the reported configuration.
3770 changes = { 'SsidPostfix': '',
3771 'VendorExtension': dbus.Array([], signature="ay"),
3772 'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
3773 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3774 dbus.Dictionary(changes, signature='sv'),
3775 dbus_interface=dbus.PROPERTIES_IFACE)
3777 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3778 dbus_interface=dbus.PROPERTIES_IFACE,
3780 logger.debug("P2PDeviceConfig: " + str(res3))
3781 if 'VendorExtension' in res3:
3782 raise Exception("VendorExtension not removed")
3783 if 'SecondaryDeviceTypes' in res3:
3784 raise Exception("SecondaryDeviceType not removed")
# With P2P disabled on dev[0], Get(P2PDeviceConfig) must fail with
# "Error.Failed: P2P is not available for this interface"; P2P is re-enabled
# afterwards.
# NOTE(review): the failure message below says "invalid Invite" although the
# failing call is Get(P2PDeviceConfig); it looks copied from another test.
3787 dev[0].request("P2P_SET disabled 1")
3788 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3789 dbus_interface=dbus.PROPERTIES_IFACE,
3791 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
3792 except dbus.exceptions.DBusException, e:
3793 if "Error.Failed: P2P is not available for this interface" not in str(e):
3794 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3796 dev[0].request("P2P_SET disabled 0")
# Same check for Set(P2PDeviceConfig) while P2P is disabled.
# NOTE(review): this failure message also says "invalid Invite".
3799 dev[0].request("P2P_SET disabled 1")
3800 changes = { 'SsidPostfix': 'foo' }
3801 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3802 dbus.Dictionary(changes, signature='sv'),
3803 dbus_interface=dbus.PROPERTIES_IFACE)
3804 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3805 except dbus.exceptions.DBusException, e:
3806 if "Error.Failed: P2P is not available for this interface" not in str(e):
3807 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3809 dev[0].request("P2P_SET disabled 0")
# Values of the wrong type (integers where DeviceName and SsidPostfix are
# given) must each be rejected with "InvalidArgs".
# NOTE(review): this failure message also says "invalid Invite".
3811 tests = [ { 'DeviceName': 123 },
3812 { 'SsidPostfix': 123 },
3814 for changes in tests:
3816 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3817 dbus.Dictionary(changes, signature='sv'),
3818 dbus_interface=dbus.PROPERTIES_IFACE)
3819 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3820 except dbus.exceptions.DBusException, e:
3821 if "InvalidArgs" not in str(e):
3822 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3824 def test_dbus_p2p_persistent(dev, apdev):
3825 """D-Bus P2P persistent group"""
# Creates a persistent group on dev[0] over D-Bus, then exercises the
# PersistentGroup object's Properties and the Add/Remove persistent group
# methods of the P2PDevice interface.
3826 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3827 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
# Signal-driven helper derived from TestDbus.
3829 class TestDbusP2p(TestDbus):
3830 def __init__(self, bus):
3831 TestDbus.__init__(self, bus)
# Context-manager entry: schedule run_test() (1 ms) and self.timeout()
# (15000 ms), then subscribe to the P2PDevice signals handled below.
3833 def __enter__(self):
3834 gobject.timeout_add(1, self.run_test)
3835 gobject.timeout_add(15000, self.timeout)
3836 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3838 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3840 self.add_signal(self.persistentGroupAdded,
3841 WPAS_DBUS_IFACE_P2PDEVICE,
3842 "PersistentGroupAdded")
# Signal handler: disconnect the group through its P2PDevice interface as
# soon as it has started.
3846 def groupStarted(self, properties):
3847 logger.debug("groupStarted: " + str(properties))
3848 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3849 properties['interface_object'])
3850 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3851 group_p2p.Disconnect()
# Signal handler: logs the properties of the finished group.
3853 def groupFinished(self, properties):
3854 logger.debug("groupFinished: " + str(properties))
# Signal handler: remember the object path of the added persistent group.
3857 def persistentGroupAdded(self, path, properties):
3858 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3859 self.persistent = path
# Timer callback that starts the test by adding a persistent group.
3861 def run_test(self, *args):
3862 logger.debug("run_test")
3863 params = dbus.Dictionary({'persistent': True,
3865 logger.info("Add a persistent group")
3866 p2p.GroupAdd(params)
# Run the helper; the exception below reports missing signals. The
# persistent group path captured by the helper is kept for the checks below.
3872 with TestDbusP2p(bus) as t:
3874 raise Exception("Expected signals not seen")
3875 persistent = t.persistent
# Read the persistent group's Properties, set only 'ssid', and read them
# back: the parameter count must match, every value other than ssid must be
# unchanged, and ssid must be reported as the quoted string '"DIRECT-foo"'.
3877 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
3878 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
3879 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
3880 logger.info("Persistent group Properties: " + str(res))
3881 vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
3882 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
3883 dbus_interface=dbus.PROPERTIES_IFACE)
3884 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
3885 dbus_interface=dbus.PROPERTIES_IFACE)
3886 if len(res) != len(res2):
3887 raise Exception("Different number of parameters")
3889 if k != 'ssid' and res[k] != res2[k]:
3890 raise Exception("Parameter %s value changes" % k)
3891 if res2['ssid'] != '"DIRECT-foo"':
3892 raise Exception("Unexpected ssid")
# Add a second persistent group explicitly; PersistentGroups must then list
# two entries.
3894 args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
3895 'psk': '1234567890' }, signature='sv')
3896 group = p2p.AddPersistentGroup(args)
3898 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
3899 dbus_interface=dbus.PROPERTIES_IFACE)
3900 if len(groups) != 2:
3901 raise Exception("Unexpected number of persistent groups: " + str(groups))
# Remove all persistent groups; PersistentGroups must then be empty.
3903 p2p.RemoveAllPersistentGroups()
3905 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
3906 dbus_interface=dbus.PROPERTIES_IFACE)
3907 if len(groups) != 0:
3908 raise Exception("Unexpected number of persistent groups: " + str(groups))
# Removing the already removed group by its old path must fail with
# "NetworkUnknown: There is no such persistent group".
3911 p2p.RemovePersistentGroup(persistent)
3912 raise Exception("Invalid RemovePersistentGroup accepted")
3913 except dbus.exceptions.DBusException, e:
3914 if "NetworkUnknown: There is no such persistent group" not in str(e):
3915 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Scenario: dev[0] starts a persistent autonomous GO over D-Bus, wlan1
    # joins it with a PIN, the group is removed, and dev[0] then re-invokes
    # the stored persistent group by inviting the same peer.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        """Signal-driven state machine for the reinvocation sequence."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.waiting_end = False
            # Becomes True once the Invite for the persistent group was sent
            self.invited = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            """Remember the group interface; on the first run make wlan1 join."""
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            """End the test after the reinvoked group; otherwise send Invite."""
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # Negative case: "path" is the interface object path from
                # prepare_dbus(), not a persistent group object.
                args = { 'persistent_group_object': dbus.ObjectPath(path),
                         'peer': self.peer_path }
                try:
                    pin = p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                args = { 'persistent_group_object': self.persistent,
                         'peer': self.peer_path }
                pin = p2p.Invite(args)
                self.invited = True

                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            """Store the object path of the newly added persistent group."""
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            """Cache the properties of the discovered peer."""
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            """Authorize the joining peer through WPS with a fixed PIN."""
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            # Colon-separated form of the peer address (not used below)
            addr = ""
            for p in peer:
                if len(addr) > 0:
                    addr += ':'
                addr += '%02x' % ord(p)
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            """Complete the client side, remove its group, then stop the GO."""
            logger.debug("staAuthorized: " + name)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            """Start the persistent autonomous GO."""
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # Scenario: wlan1 initiates GO Negotiation towards dev[0]; dev[0] answers
    # from the GONegotiationRequest signal handler using the Connect method.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        """Signal handlers for the responder side of GO Negotiation."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            """Reject an unsupported frequency first, then accept the request."""
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # Negative case: Connect with frequency 5175 must be refused
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False, 'frequency': 5175 }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False }
            p2p.Connect(args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Tear the group down as soon as it has started."""
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            """Listen on dev[0] and let wlan1 start GO Negotiation."""
            logger.debug("run_test")
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # Scenario: dev[0] pre-authorizes the peer (authorize_only) over D-Bus and
    # wlan1 then initiates GO Negotiation; the test also expects PeerJoined
    # and PeerDisconnected group signals.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        """Signal handlers for the authorize-only GO Negotiation flow."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Authorize the peer and have wlan1 start GO Negotiation."""
            logger.debug("deviceFound: path=%s" % path)
            # Negative case: keypad method without a PIN must be rejected
            args = { 'peer': path, 'wps_method': 'keypad',
                     'go_intent': 15, 'authorize_only': True }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 15, 'authorize_only': True }
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Finish the client side and make the client leave the group."""
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            """Stop the group once the client has been deauthorized."""
            logger.debug("staDeuthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # Scenario: dev[0] initiates GO Negotiation over D-Bus with go_intent 0,
    # wlan1 answers with go_intent=15; the test additionally tracks the peer
    # object's "Groups" property through PropertiesChanged.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        """Signal handlers for the initiator side of GO Negotiation."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Start GO Negotiation towards the peer and drive wlan1's reply."""
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)

            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Disconnect locally, then clean up the peer's group."""
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is stopped from propertiesChanged(), not from here
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            """Track additions to and removals from the peer's Groups list."""
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            if len(changed_properties["Groups"]) == 0:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # Scenario: dev[0] becomes P2P client of wlan1 (GO); wlan1 removes the
    # group and dev[0] must report both GroupFinished and an emptied peer
    # "Groups" property.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        """Signal handlers for the GO-terminates-group flow."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Start GO Negotiation towards the peer and drive wlan1's reply."""
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)

            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Let the peer (GO) terminate the group."""
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is stopped from propertiesChanged(), not from here
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            """Track the peer's Groups list; removal counts only after an add."""
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    # Run the real test with a 1 second group idle timeout and always restore
    # the default (0) afterwards, even if the test body raises.
    try:
        dev[0].global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        dev[0].global_request("SET p2p_group_idle 0")
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    """Body of test_dbus_p2p_group_idle_timeout.

    dev[0] joins a group as P2P client of wlan1 (GO); the GO then drops the
    client with a non-default reason code and the test expects dev[0] to
    report GroupFinished plus an emptied peer "Groups" property.
    """
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        """Signal handlers for the idle-timeout group removal flow."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Start GO Negotiation towards the peer and drive wlan1's reply."""
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)

            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Have the GO kick the client without a normal termination."""
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
            # Force disassociation with different reason code so that the
            # P2P Client using D-Bus does not get normal group termination event
            # from the GO.
            dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is stopped from propertiesChanged(), not from here
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            """Track the peer's Groups list once the group has started."""
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            # Ignore peer property updates seen before GroupStarted
            if not self.group_started:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            if len(changed_properties["Groups"]) == 0:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # Scenario: wlan1 uses PIN 87654321 while dev[0] answers with 12345670,
    # so provisioning must fail and both WpsFailed and GroupFormationFailure
    # signals are expected (in either order).
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        """Signal handlers for the mismatching-PIN group formation."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
                            "WpsFailed")
            self.add_signal(self.groupFormationFailure,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFormationFailure")
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            """Accept the negotiation with a PIN that differs from the peer's."""
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15 }
            p2p.Connect(args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop only once both failure signals have been seen
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            # Stop only once both failure signals have been seen
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            """Listen on dev[0] and let wlan1 connect with the wrong PIN."""
            logger.debug("run_test")
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    # Scenario: dev[1] runs a GO that dev[0] joins as client (group1); dev[0]
    # then starts its own autonomous GO (group2) that dev[2]/wlan2 joins.
    # With both groups up, the per-group and device roles are verified.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        """Signal handlers driving the two-concurrent-groups sequence."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None
            # Object paths of the two groups; reset to None on GroupFinished
            self.group1 = None
            self.group2 = None
            self.groups_removed = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            """Classify the found device and join dev[1]'s group once seen."""
            logger.debug("deviceFound: path=%s" % path)
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if self.go and not self.group1:
                logger.info("Join the group")
                p2p.StopFind()
                pin = '12345670'
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'pin': pin,
                         'frequency': 2412 }
                p2p.Connect(args)

        def groupStarted(self, properties):
            """Record group1/group2 and trigger the next step of the sequence."""
            logger.debug("groupStarted: " + str(properties))
            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            if not self.group1:
                # First group: dev[0] as client in dev[1]'s group
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                params = dbus.Dictionary({ 'frequency': 2412 })
                p2p.GroupAdd(params)
            elif not self.group2:
                # Second group: dev[0]'s own autonomous GO
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = res['BSSID']

            if self.group1 and self.group2:
                logger.info("Authorize peer to join the group")
                a2 = binascii.unhexlify(addr2.replace(':',''))
                params = { 'Role': 'enrollee',
                           'P2PDeviceAddress': dbus.ByteArray(a2),
                           'Bssid': dbus.ByteArray(a2),
                           'Type': 'pin',
                           'Pin': '12345670' }
                g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
                g_wps.Start(params)

                bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
                dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
                dev2.scan_for_bss(bssid, freq=2412)
                dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
                ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
                if ev is None:
                    raise Exception("Group join timed out")
                self.dev2_group_ev = ev

        def groupFinished(self, properties):
            """Forget the finished group; stop once both groups are gone."""
            logger.debug("groupFinished: " + str(properties))

            if self.group1 == properties['group_object']:
                self.group1 = None
            elif self.group2 == properties['group_object']:
                self.group2 = None

            if not self.group1 and not self.group2:
                self.done = True
                self.loop.quit()

        def peerJoined(self, peer):
            """Verify the state with both groups up, then tear both down."""
            logger.debug("peerJoined: " + peer)
            if self.groups_removed:
                return
            self.check_results()

            dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            dev2.group_form_result(self.dev2_group_ev)
            dev2.remove_group()

            logger.info("Disconnect group2")
            group_p2p = dbus.Interface(self.g2_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

            logger.info("Disconnect group1")
            group_p2p = dbus.Interface(self.g1_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            self.groups_removed = True

        def check_results(self):
            """Check Role of each group and of the P2P device, and Members."""
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if prop['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])

            if len(res2['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    # Cancel with nothing in progress is expected to fail with a D-Bus error
    try:
        p2p.Cancel()
        raise Exception("Unexpected p2p.Cancel() success")
    except dbus.exceptions.DBusException as e:
        pass

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        """Start a connection attempt on DeviceFound and cancel it at once."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)
            # This time there is a pending operation, so Cancel must succeed
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Baseline: a complete introspection document for the interface object
    res = if_obj.Introspect(WPAS_DBUS_IFACE,
                            dbus_interface=dbus.INTROSPECTABLE_IFACE)
    logger.info("Initial Introspect: " + str(res))
    if res is None or "Introspectable" not in res or "GroupStarted" not in res:
        raise Exception("Unexpected initial Introspect response: " + str(res))
    if "FastReauth" not in res or "PassiveScan" not in res:
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # Allocation failure directly in wpa_dbus_introspect: no response at all
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Allocation failures while adding interfaces: a response is still
    # returned, but it has to be shorter than the complete document.
    truncating_failures = [
        (1, "=add_interface;wpa_dbus_introspect"),
        (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
        (2, "=add_interface;wpa_dbus_introspect"),
    ]
    for count, funcs in truncating_failures:
        with alloc_fail(dev[0], count, funcs):
            res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
                                     dbus_interface=dbus.INTROSPECTABLE_IFACE)
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    # Scenario: dev[0] is configured over D-Bus to run a WPA-PSK network on
    # 2412 MHz; once it reports State "completed", dev[1] connects to it.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        """Add and select the network, then wait for State == completed."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                self.started = True
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-PSK',
                                     'psk': passphrase,
                                     'mode': 2,
                                     'frequency': 2412 },
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.started

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    params = hostapd.wpa_eap_params(ssid=ssid)
    # Mixed mode: enable both WPA and WPA2 on the AP
    params["wpa"] = "3"
    params["rsn_pairwise"] = "CCMP"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        """Add and select the EAP network, then wait for State == completed."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Kick off the test from the main loop and arm the 15 s watchdog
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                self.done = True
                self.loop.quit()

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-EAP',
                                     'eap': 'PEAP',
                                     'identity': 'user',
                                     'password': 'password',
                                     'ca_cert': 'auth_serv/ca.pem',
                                     'phase2': 'auth=MSCHAPV2',
                                     'scan_freq': 2412 },
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # Returning False keeps the gobject timeout from firing again
            return False

        def success(self):
            return self.done

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    # The helper switches dev[0] to AP_SCAN 2 and raises on any failed check.
    # Restore AP_SCAN 1 in a finally clause so that a failing run does not
    # leave the changed setting behind.
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
# Helper for test_dbus_ap_scan_2_ap_mode_scan: sets AP_SCAN 2 on dev[0],
# starts an open network "wpas-ap-open" on 2412 MHz from dev[0], requests a
# D-Bus Scan() while a scan failure is being injected, and finally connects
# dev[1] to the network. Raises Exception when an expected event is missing
# or an AP-DISABLED event is reported.
# NOTE(review): the embedded listing numbers skip values (5017 -> 5019,
# 5022 -> 5024, 5026 -> 5028, 5034 -> 5036), so the guards in front of some
# raise statements and one Scan() argument entry appear to be missing from
# this excerpt. Confirm against the complete file.
5002 def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
# Look up the D-Bus objects for dev[0] and wrap the interface object.
5003 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5004 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
# The control interface reply must contain "OK" for the setting to count as
# applied.
5006 if "OK" not in dev[0].request("AP_SCAN 2"):
5007 raise Exception("Failed to set AP_SCAN 2")
# Configure and select a network entry with mode 2, no key management and
# both frequency and scan_freq set to 2412, then wait up to 5 seconds for
# CTRL-EVENT-CONNECTED. The error text below indicates this is the point
# where the AP is expected to have started.
5009 id = dev[0].add_network()
5010 dev[0].set_network(id, "mode", "2")
5011 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
5012 dev[0].set_network(id, "key_mgmt", "NONE")
5013 dev[0].set_network(id, "frequency", "2412")
5014 dev[0].set_network(id, "scan_freq", "2412")
5015 dev[0].set_network(id, "disabled", "0")
5016 dev[0].select_network(id)
5017 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
5019 raise Exception("AP failed to start")
# Request an active scan over D-Bus inside the fail_test() context.
# NOTE(review): presumably fail_test(dev[0], 1, "wpa_driver_nl80211_scan")
# makes the first call to that driver function fail - confirm in utils.
# Either CTRL-EVENT-SCAN-FAILED or AP-DISABLED ends the wait; AP-DISABLED is
# treated as an error.
5021 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5022 iface.Scan({'Type': 'active',
5024 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5025 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5026 "AP-DISABLED"], timeout=5)
5028 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5029 if "AP-DISABLED" in ev:
5030 raise Exception("Unexpected AP-DISABLED event")
# Same wait and checks a second time, for the retried scan attempt.
5032 # Wait for the retry to scan happen
5033 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5034 "AP-DISABLED"], timeout=5)
5036 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5037 if "AP-DISABLED" in ev:
5038 raise Exception("Unexpected AP-DISABLED event - retry")
# Connect dev[1] to the network started by dev[0], then disconnect both
# devices and wait for each disconnection to complete.
5040 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5041 dev[1].request("DISCONNECT")
5042 dev[1].wait_disconnected()
5043 dev[0].request("DISCONNECT")
5044 dev[0].wait_disconnected()
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    sta = dev[0]

    # Only the service-level object is needed here; ExpectDisconnect() is
    # called on the top-level wpa_supplicant D-Bus interface.
    _bus, service_obj, _path, _if_obj = prepare_dbus(sta)
    supplicant = dbus.Interface(service_obj, WPAS_DBUS_SERVICE)

    # Bring up an open AP and associate the station with it.
    ap = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    sta.connect("test-open", key_mgmt="NONE", scan_freq="2412")

    # Coverage-only step: the call is made so that its code path gets
    # executed; nothing about its effect is checked afterwards.
    supplicant.ExpectDisconnect()

    sta.request("DISCONNECT")
    sta.wait_disconnected()
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # SaveConfig() is expected to be rejected in this setup. The call must
    # raise a D-Bus error whose text starts with the "Not allowed to update
    # configuration" UnknownError; a successful call is a test failure.
    # NOTE(review): the try/SaveConfig() lines were not present in the
    # reviewed excerpt and are restored here from the error messages - confirm
    # against the complete file.
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    # The helper adds vendor elements under ID 1 on dev[0] and raises on any
    # failed check. Remove them in a finally clause so that a failing run
    # does not leave vendor elements configured.
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
def _expect_vendor_elem_error(call, accepted_msg, label, detail):
    """Run call() and require a D-Bus InvalidArgs error mentioning detail.

    call -- zero-argument callable that performs the D-Bus method call
    accepted_msg -- exception text used when the call unexpectedly succeeds
    label -- operation label inserted into the unexpected-error message
    detail -- substring that must be present in the D-Bus error text

    Raises Exception when the call succeeds or when the D-Bus error text does
    not contain both "InvalidArgs" and detail.
    """
    try:
        call()
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or detail not in str(e):
            raise Exception("Unexpected error message for invalid " + label + ": " + str(e))
    else:
        raise Exception(accepted_msg)


def _test_dbus_vendor_elem(dev, apdev):
    """Exercise the D-Bus VendorElemAdd/VendorElemGet/VendorElemRem methods.

    Invalid arguments must be rejected with InvalidArgs errors; valid
    add/get/remove operations on vendor element ID 1 must round-trip the
    configured bytes. Raises Exception on the first failed check.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start without any vendor elements configured for ID 1.
    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # VendorElemAdd(): invalid ID, empty value, unparseable element.
    _expect_vendor_elem_error(
        lambda: iface.VendorElemAdd(-1, dbus.ByteArray("\x00\x00")),
        "Invalid VendorElemAdd() accepted", "VendorElemAdd[1]", "Invalid ID")
    _expect_vendor_elem_error(
        lambda: iface.VendorElemAdd(1, dbus.ByteArray("")),
        "Invalid VendorElemAdd() accepted", "VendorElemAdd[2]",
        "Invalid value")
    _expect_vendor_elem_error(
        lambda: iface.VendorElemAdd(1, dbus.ByteArray("\x00\x01")),
        "Invalid VendorElemAdd() accepted", "VendorElemAdd[3]", "Parse error")

    # VendorElemGet(): invalid ID, then a valid ID with nothing stored.
    _expect_vendor_elem_error(
        lambda: iface.VendorElemGet(-1),
        "Invalid VendorElemGet() accepted", "VendorElemGet[1]", "Invalid ID")
    _expect_vendor_elem_error(
        lambda: iface.VendorElemGet(1),
        "Invalid VendorElemGet() accepted", "VendorElemGet[2]",
        "ID value does not exist")

    # VendorElemRem(): invalid ID, then an empty value. The second check uses
    # its own label ([2]) so that a failure report identifies the exact case.
    _expect_vendor_elem_error(
        lambda: iface.VendorElemRem(-1, dbus.ByteArray("\x00\x00")),
        "Invalid VendorElemRemove() accepted", "VendorElemRemove[1]",
        "Invalid ID")
    _expect_vendor_elem_error(
        lambda: iface.VendorElemRem(1, dbus.ByteArray("")),
        "Invalid VendorElemRemove() accepted", "VendorElemRemove[2]",
        "Invalid value")

    # Wildcard removal is accepted even though nothing is stored.
    iface.VendorElemRem(1, "*")

    # Add one element and read it back byte by byte.
    ie = dbus.ByteArray("\x00\x01\x00")
    iface.VendorElemAdd(1, ie)

    val = iface.VendorElemGet(1)
    if len(val) != len(ie):
        raise Exception("Unexpected VendorElemGet length")
    for got, want in zip(val, ie):
        if got != dbus.Byte(want):
            raise Exception("Unexpected VendorElemGet data")

    # Add a second element under the same ID.
    ie2 = dbus.ByteArray("\xe0\x00")
    iface.VendorElemAdd(1, ie2)

    # NOTE(review): the assignment of `ies` was not present in the reviewed
    # excerpt. The concatenation of both added elements is what the checks
    # below (and the later length comparison against ie2 after removing ie)
    # imply - confirm against the complete file.
    ies = ie + ie2
    val = iface.VendorElemGet(1)
    if len(val) != len(ies):
        raise Exception("Unexpected VendorElemGet length[2]")
    for got, want in zip(val, ies):
        if got != dbus.Byte(want):
            raise Exception("Unexpected VendorElemGet data[2]")

    # Removing data that does not parse as an element must be rejected.
    _expect_vendor_elem_error(
        lambda: iface.VendorElemRem(1, dbus.ByteArray("\x01\x01")),
        "Invalid VendorElemRemove() accepted", "VendorElemRemove[3]",
        "Parse error")

    # Removing the first element must leave only the second one.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # After wildcard removal the ID must no longer have a value.
    iface.VendorElemRem(1, "*")
    _expect_vendor_elem_error(
        lambda: iface.VendorElemGet(1),
        "Invalid VendorElemGet() accepted after removal",
        "VendorElemGet after removal", "ID value does not exist")
# Test case: start an AP with max_listen_interval set to "1", add and select
# a network on dev[0] over D-Bus, and watch PropertiesChanged signals for the
# AssocStatusCode property. Fails with "Expected signals not seen" when the
# helper class below does not report the expected outcome.
# NOTE(review): the embedded listing numbers skip values in this block (for
# example 5186 -> 5189, 5210 -> 5212, 5219 -> 5221, 5224 -> 5228), so some
# statements - including the assignment of `ssid` and the comparison applied
# to the status code - appear to be missing from this excerpt. Confirm the
# notes below against the complete file.
5183 def test_dbus_assoc_reject(dev, apdev):
5184 """D-Bus AssocStatusCode"""
# Look up the D-Bus objects for dev[0] and wrap the interface object.
5185 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5186 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
# AP parameters: the SSID plus max_listen_interval "1".
# NOTE(review): presumably the low listen interval limit is what makes the AP
# reject the association - confirm against hostapd documentation.
5189 params = { "ssid": ssid,
5190 "max_listen_interval": "1" }
5191 hapd = hostapd.add_ap(apdev[0], params)
# Local helper derived from TestDbus; assoc_status_seen records whether the
# AssocStatusCode property has been handled.
5193 class TestDbusConnect(TestDbus):
5194 def __init__(self, bus):
5195 TestDbus.__init__(self, bus)
5196 self.assoc_status_seen = False
# Context-manager entry: schedule run_connect() and the inherited timeout()
# handler as gobject timers (intervals 1 and 15000) and subscribe to the
# PropertiesChanged signal of the interface.
5199 def __enter__(self):
5200 gobject.timeout_add(1, self.run_connect)
5201 gobject.timeout_add(15000, self.timeout)
5202 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5203 "PropertiesChanged")
# PropertiesChanged handler: logs the properties and, when AssocStatusCode is
# present, reads it into `status`.
# NOTE(review): the branch structure around the "Unexpected status code" log
# line and the assoc_status_seen update is not visible in this excerpt.
5207 def propertiesChanged(self, properties):
5208 logger.debug("propertiesChanged: %s" % str(properties))
5209 if 'AssocStatusCode' in properties:
5210 status = properties['AssocStatusCode']
5212 logger.info("Unexpected status code: " + str(status))
5214 self.assoc_status_seen = True
# Timer callback: build the network parameter dictionary, add the network
# over D-Bus and select it. The object path returned by AddNetwork() is kept
# in self.netw.
# NOTE(review): a dictionary entry after 'ssid' and the closing arguments of
# dbus.Dictionary(...) appear to be missing here.
5218 def run_connect(self, *args):
5219 args = dbus.Dictionary({ 'ssid': ssid,
5221 'scan_freq': 2412 },
5223 self.netw = iface.AddNetwork(args)
5224 iface.SelectNetwork(self.netw)
# NOTE(review): the `def` line of the method that this return statement
# belongs to is not visible; it returns the assoc_status_seen flag.
5228 return self.assoc_status_seen
# Run the helper as a context manager.
# NOTE(review): the condition guarding the exception below is not visible in
# this excerpt; presumably it is a success check on the helper - confirm.
5230 with TestDbusConnect(bus) as t:
5232 raise Exception("Expected signals not seen")