Updated to hostap_2_6
[mech_eap.git] / libeap / tests / hwsim / test_dbus.py
1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import logging
9 logger = logging.getLogger()
10 import subprocess
11 import time
12
13 try:
14     import gobject
15     import dbus
16     dbus_imported = True
17 except ImportError:
18     dbus_imported = False
19
20 import hostapd
21 from wpasupplicant import WpaSupplicant
22 from utils import HwsimSkip, alloc_fail, fail_test
23 from p2p_utils import *
24 from test_ap_tdls import connect_2sta_open
25 from test_ap_eap import check_altsubject_match_support
26
# Bus name and root object path used to reach wpa_supplicant over D-Bus.
# WPAS_DBUS_SERVICE is also passed as the interface name when calling methods
# and accessing properties on the root object (see prepare_dbus() and
# dbus_get()/dbus_set()).
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
# Interface names for the other object types; the WPS and P2PDevice names are
# derived from the per-interface name.
WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
37
def prepare_dbus(dev):
    """Connect to the system bus and look up the D-Bus objects for dev.

    Returns a (bus, wpas_obj, path, if_obj) tuple: the system bus, the
    wpa_supplicant root object, the object path returned by GetInterface()
    for dev.ifname, and the object at that path.

    Raises HwsimSkip if the dbus/gobject modules could not be imported or if
    any step of the lookup fails.
    """
    if not dbus_imported:
        logger.info("No dbus module available")
        raise HwsimSkip("No dbus module available")

    try:
        from dbus.mainloop.glib import DBusGMainLoop
        # Make the glib main loop the default before creating the bus
        # connection.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        system_bus = dbus.SystemBus()
        root_obj = system_bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        root_iface = dbus.Interface(root_obj, WPAS_DBUS_SERVICE)
        if_path = root_iface.GetInterface(dev.ifname)
        iface_obj = system_bus.get_object(WPAS_DBUS_SERVICE, if_path)
    except Exception as e:
        # Any failure here is reported as a skipped test, not a failed one.
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
    return (system_bus, root_obj, if_path, iface_obj)
53
class TestDbus(object):
    """Base class for tests that wait for D-Bus signals in a main loop.

    Only __exit__ is defined here; the subclasses in this file provide
    __enter__, where they register signal handlers and timeouts and run
    self.loop.
    """

    def __init__(self, bus):
        self.loop = gobject.MainLoop()
        # Signal receivers registered through add_signal().
        self.signals = []
        self.bus = bus

    def __exit__(self, type, value, traceback):
        # Unregister every signal receiver added through add_signal().
        for receiver in self.signals:
            receiver.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal name on interface and remember it."""
        receiver = self.bus.add_signal_receiver(handler,
                                                signal_name=name,
                                                dbus_interface=interface,
                                                byte_arrays=byte_arrays)
        self.signals.append(receiver)

    def timeout(self, *args):
        """Timeout callback: stop the main loop."""
        logger.debug("timeout")
        self.loop.quit()
        # The subclasses pass this to gobject.timeout_add(); it always
        # returns False.
        return False
74
class alloc_fail_dbus(object):
    """Context manager that expects a D-Bus call to fail under TEST_ALLOC_FAIL.

    On entry the TEST_ALLOC_FAIL command is sent to dev with count and funcs.
    The body of the with statement must raise: a DBusException whose text
    contains expected is swallowed; completing without an exception is an
    error.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev = dev
        self._count = count
        self._funcs = funcs
        # Name used in the error messages raised from __exit__.
        self._operation = operation
        # Substring that must appear in the DBusException text.
        self._expected = expected

    def __enter__(self):
        res = self._dev.request("TEST_ALLOC_FAIL %d:%s" %
                                (self._count, self._funcs))
        if "OK" not in res:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:
            raise Exception("%s succeeded during out-of-memory" % self._operation)

        expected_failure = (type == dbus.exceptions.DBusException and
                            self._expected in str(value))
        if expected_failure:
            # Swallow the expected D-Bus error.
            return True

        # Some other exception: report if GET_ALLOC_FAIL does not show
        # "0:<funcs>"; otherwise let the original exception propagate.
        status = self._dev.request("GET_ALLOC_FAIL")
        if status != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        return False
95
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Start an AP through hostapd.add_ap() with fixed WPS/WPA2-PSK settings.

    Only the SSID and the UUID can be changed by the caller; the return
    value is whatever hostapd.add_ap() returns.
    """
    params = dict(ssid=ssid,
                  eap_server="1",
                  wps_state="2",
                  wpa_passphrase="12345678",
                  wpa="2",
                  wpa_key_mgmt="WPA-PSK",
                  rsn_pairwise="CCMP",
                  ap_pin="12345670",
                  uuid=ap_uuid)
    return hostapd.add_ap(ap, params)
103
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # GetAll on the root object and on the interface object (both the main
    # and the WPS interface); the results are only logged.
    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                         str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))

    # Before any scan or network configuration both lists must be empty.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    # Create exactly one BSS entry (scan result) and one network entry.
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # The BSSID property is iterated byte by byte; format it as a
    # colon-separated hex string for comparison with the AP's address.
    bssid_str = ':'.join('%02x' % item for item in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    # The SSID is compared including the double quotes it was set with.
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
163
def test_dbus_getall_oom(dev, apdev):
    """D-Bus GetAll wpa_config_get_all() OOM"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # A single configured network provides the object to call GetAll on.
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(networks) != 1:
        raise Exception("Missing Networks entry: " + str(networks))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, networks[0])

    # Repeat GetAll with the allocation failure count going from 1 to 49.
    # A D-Bus error from the call itself is ignored.
    for count in range(1, 50):
        with alloc_fail(dev[0], count, "wpa_config_get_all"):
            try:
                net_obj.GetAll(WPAS_DBUS_NETWORK,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException:
                pass
184
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read property prop of the root interface from wpas_obj.

    The first argument is the dbus module (it supplies PROPERTIES_IFACE).
    When expect is not None, an Exception is raised if the value read
    differs from it. Returns the value read.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       byte_arrays=byte_arrays,
                       dbus_interface=dbus.PROPERTIES_IFACE)
    mismatch = expect is not None and val != expect
    if mismatch:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(val), str(expect)))
    return val
193
def dbus_set(dbus, wpas_obj, prop, val):
    """Set property prop of the root interface on wpas_obj to val.

    The first argument is the dbus module (it supplies PROPERTIES_IFACE).
    """
    props_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=props_iface)
197
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    props_iface = dbus.PROPERTIES_IFACE
    dbus_error = dbus.exceptions.DBusException

    # DebugLevel: valid change, two invalid values, then restore.
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    invalid_levels = [ (3, "Error.Failed: wrong property type"),
                       ("foo", "Error.Failed: wrong debug level value") ]
    for bad_val, expected_err in invalid_levels:
        try:
            dbus_set(dbus, wpas_obj, "DebugLevel", bad_val)
            raise Exception("Invalid DebugLevel value accepted: " + str(bad_val))
        except dbus_error as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message: " + str(e))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # The two boolean debug properties get the same sequence: toggle to
    # False, reject a string value, restore to True.
    for flag in [ "DebugTimestamp", "DebugShowKeys" ]:
        dbus_get(dbus, wpas_obj, flag, expect=True)
        dbus_set(dbus, wpas_obj, flag, False)
        dbus_get(dbus, wpas_obj, flag, expect=False)
        try:
            dbus_set(dbus, wpas_obj, flag, "foo")
            raise Exception("Invalid %s value accepted" % flag)
        except dbus_error as e:
            if "Error.Failed: wrong property type" not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        dbus_set(dbus, wpas_obj, flag, True)
        dbus_get(dbus, wpas_obj, flag, expect=True)

    # Read-only list properties.
    interfaces = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(interfaces) != 1:
        raise Exception("Unexpected Interfaces value: " + str(interfaces))

    eap_methods = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(eap_methods) < 5 or "TTLS" not in eap_methods:
        raise Exception("Unexpected EapMethods value: " + str(eap_methods))

    capabilities = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(capabilities) < 2 or "p2p" not in capabilities:
        raise Exception("Unexpected Capabilities value: " + str(capabilities))

    # WFDIEs: set and read back, reject a one-octet value, then clear.
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    ies = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(ies))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if ies != res:
        raise Exception("WFDIEs value changed")
    try:
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
        raise Exception("Invalid WFDIEs value accepted")
    except dbus_error as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    for ies_val in ('', ies, ''):
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(ies_val))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(res) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Set on a read-only property.
    eap_methods = dbus_get(dbus, wpas_obj, "EapMethods")
    try:
        dbus_set(dbus, wpas_obj, "EapMethods", eap_methods)
        raise Exception("Invalid Set accepted")
    except dbus_error as e:
        if "InvalidArgs: Property is read-only" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # Unknown method on the properties interface.
    try:
        wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                        dbus_interface=props_iface)
        raise Exception("Unknown method accepted")
    except dbus_error as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # Get with an unknown interface name.
    try:
        wpas_obj.Get("foo", "DebugShowKeys", dbus_interface=props_iface)
        raise Exception("Invalid Get accepted")
    except dbus_error as e:
        if "InvalidArgs: No such property" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # Get with integer arguments; the proxy is created with
    # introspect=False for these calls.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    for bad_args in [ (123, "DebugShowKeys"), (WPAS_DBUS_SERVICE, 123) ]:
        try:
            test_obj.Get(*bad_args, dbus_interface=props_iface)
            raise Exception("Invalid Get accepted")
        except dbus_error as e:
            if "InvalidArgs: Invalid arguments" not in str(e):
                raise Exception("Unexpected error message: " + str(e))

    # Set with a value using variant_level=2.
    try:
        wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                     dbus.ByteArray('', variant_level=2),
                     dbus_interface=props_iface)
        raise Exception("Invalid Set accepted")
    except dbus_error as e:
        if "InvalidArgs: invalid message format" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
320
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    props_iface = dbus.PROPERTIES_IFACE

    # (property name, value expected before Set, value to set)
    cases = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]

    for name, initial, updated in cases:
        current = if_obj.Get(WPAS_DBUS_IFACE, name, dbus_interface=props_iface)
        if current != initial:
            raise Exception("Unexpected " + name + " value: " + str(current))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated, dbus_interface=props_iface)

        current = if_obj.Get(WPAS_DBUS_IFACE, name, dbus_interface=props_iface)
        if current != updated:
            raise Exception("Unexpected " + name + " value after set: " + str(current))
340
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    dbus_error = dbus.exceptions.DBusException

    # Call a method that does not exist on the WPS interface.
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    try:
        wps.Foo()
        raise Exception("Unknown method accepted")
    except dbus_error as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # Call WPS.Start with an integer argument through a proxy created with
    # introspect=False.
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    raw_wps = dbus.Interface(raw_obj, WPAS_DBUS_IFACE_WPS)
    try:
        raw_wps.Start(123)
        raise Exception("WPS.Start with incorrect signature accepted")
    except dbus_error as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
361
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    # Wrapper around _test_dbus_get_set_wps(): the finally block runs whether
    # or not the test body raised.
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Reset the two settings that the test body modifies.
        dev[0].request("SET wps_cred_processing 0")
        dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
369
def _test_dbus_get_set_wps(dev, apdev):
    # Test body for test_dbus_get_set_wps(): exercises the ConfigMethods and
    # ProcessCredentials properties of the WPS interface and checks that a
    # ProcessCredentials change is signalled on both PropertiesChanged
    # signals subscribed to below.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    # A value set through the control interface must be visible over D-Bus.
    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    # ProcessCredentials is expected to read False only for
    # wps_cred_processing=1 and True for 0 and 2.
    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Handler for PropertiesChanged on the WPS interface itself.
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # Handler for PropertiesChanged on the D-Bus properties interface.
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # This must be the same attribute that __init__ creates and the
            # signal handlers check; otherwise the handlers never stop the
            # loop and the test always runs until the 1000 ms timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
460
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Each dictionary must be rejected by WPS.Start() with an InvalidArgs
    # error.
    failures = [ {'Role': 'foo', 'Type': 'pbc'},
                 {'Role': 123, 'Type': 'pbc'},
                 {'Type': 'pbc'},
                 {'Role': 'enrollee'},
                 {'Role': 'registrar'},
                 {'Role': 'enrollee', 'Type': 123},
                 {'Role': 'enrollee', 'Type': 'foo'},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': '02:33:44:55:66:77'},
                 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': dbus.ByteArray('12345')},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': 12345},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': dbus.ByteArray('12345')},
                 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
    for args in failures:
        try:
            wps.Start(args)
            # Not a DBusException, so this propagates past the handler below.
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
490
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    props_iface = dbus.PROPERTIES_IFACE
    sta = dev[0]

    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State", dbus_interface=props_iface)

    # Get a BSS entry into the scan results for the BSS getters below.
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq=2412)

    time.sleep(0.05)
    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs", dbus_interface=props_iface)

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=props_iface)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates", dbus_interface=props_iface)
    # For this case a D-Bus error from the call is accepted but not required.
    with alloc_fail(sta, 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        try:
            bss_obj.Get(WPAS_DBUS_BSS, "Rates", dbus_interface=props_iface)
        except dbus.exceptions.DBusException:
            pass

    # Add one network for the Networks getter.
    net_id = sta.add_network()
    sta.set_network(net_id, "disabled", "0")
    sta.set_network_quoted(net_id, "ssid", "test")

    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=props_iface)

    with alloc_fail_dbus(sta, 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for count in range(1, 6):
        with alloc_fail_dbus(sta, count, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    # The last two operations report a failure text other than the default
    # "NoMemory", so the expected text is given explicitly.
    with alloc_fail_dbus(sta, 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        val2 = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
                   dbus_interface=props_iface)

    with alloc_fail_dbus(sta, 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
550
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    # Wrapper around _test_dbus_wps_pbc(): the finally block runs whether or
    # not the test body raised.
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0.
        dev[0].request("SET wps_cred_processing 0")
557
def _test_dbus_wps_pbc(dev, apdev):
    # Test body for test_dbus_wps_pbc(): start WPS PBC over D-Bus and wait
    # for both the "success" WPS event and the Credentials signal.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # The BSS entry must report the WPS type as 'pbc'.
    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_paths[0], str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    if wps_info['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_info['Type'])

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False
            self.wps = wps

        def __enter__(self):
            # Schedule the PBC start and a 15 second timeout, subscribe to
            # the WPS signals, then run the loop.
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            start_args = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
627
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs with different SSIDs and UUIDs, both with WPS_PBC requested.
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    for ap in (hapd, hapd2):
        ap.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.overlap_seen = False
            self.wps = wps

        def __enter__(self):
            # Schedule the PBC start and a 15 second timeout, subscribe to
            # the WPS Event signal, then run the loop.
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            start_args = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(start_args)
            return False

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].request("WPS_CANCEL")
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
678
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    # Wrapper around _test_dbus_wps_pin(): the finally block runs whether or
    # not the test body raised.
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0.
        dev[0].request("SET wps_cred_processing 0")
685
def _test_dbus_wps_pin(dev, apdev):
    # Test body for test_dbus_wps_pin(): start WPS with a PIN and an explicit
    # BSSID over D-Bus and wait for both the "success" WPS event and the
    # Credentials signal.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Schedule the PIN start and a 15 second timeout, subscribe to
            # the WPS signals, then run the loop.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # Turn the colon-separated hex address into raw octets.
            bssid_hex = bssid.replace(':','')
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid_hex))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670', 'Bssid': bssid_ay}
            # Uses the wps proxy of the enclosing function.
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
739
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    # Always put wps_cred_processing back to 0 on dev[0], both when the
    # helper returns normally and when it raises.
    restore_cmd = "SET wps_cred_processing 0"
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
746
def _test_dbus_wps_pin2(dev, apdev):
    """Run a WPS PIN enrollment where the PIN comes from wpa_supplicant.

    Start() is called over D-Bus without a 'Pin' entry; the PIN is read from
    the dictionary that Start() returns and is then handed to hostapd with
    "WPS_PIN any <pin>". The run is considered successful only when both the
    WPS "success" Event signal and the Credentials signal have been seen;
    afterwards dev[0] must report being connected.

    Raises Exception if the expected signals were not seen.
    """
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal arrives: wpsEvent() and success()
            # read it, and it would otherwise be undefined until the
            # Credentials signal has been handled.
            self.credentials_received = False

        def __enter__(self):
            # Kick off the WPS operation almost immediately and give up
            # after 15 seconds if the signals never show up.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','')))
            # No 'Pin' is supplied here; the PIN to use is taken from the
            # reply and passed on to the AP.
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            # One-shot timer callback
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
802
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    # Always put wps_cred_processing back to 0 on dev[0], both when the
    # helper returns normally and when it raises.
    restore_cmd = "SET wps_cred_processing 0"
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
809
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Run a WPS PIN enrollment where the AP learns the PIN only after M2D.

    The D-Bus Start() call is issued with PIN 12345670 while hostapd has not
    been given any PIN. When the WPS "m2d" Event signal is seen, hostapd is
    given "WPS_PIN any 12345670". The run is considered successful only when
    both the WPS "success" Event signal and the Credentials signal have been
    seen; afterwards dev[0] must report being connected.

    Raises Exception if the expected signals were not seen.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Kick off the WPS operation almost immediately and give up
            # after 15 seconds if the signals never show up.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Only now tell the AP which PIN to accept.
                ap = hostapd.Hostapd(apdev[0]['ifname'])
                ap.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            # One-shot timer callback
            return False

        def success(self):
            if not self.success_seen:
                return False
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
865
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    # Always put wps_cred_processing back to 0 on dev[0], both when the
    # helper returns normally and when it raises.
    restore_cmd = "SET wps_cred_processing 0"
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
872
def _test_dbus_wps_reg(dev, apdev):
    """Run a WPS operation over D-Bus with dev[0] in the registrar role.

    hostapd is given "WPS_PIN any 12345670" and Start() is called with
    Role 'registrar' and the same PIN. The main loop is stopped as soon as
    the Credentials signal is seen; WPS Event signals are only logged.
    Afterwards dev[0] must report being connected.

    Raises Exception if the Credentials signal was not seen.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            # Start the registrar operation after 100 ms and give up after
            # 15 seconds if no credentials are delivered.
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            # Logged for debugging only; no state depends on these events.
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'registrar', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            # One-shot timer callback
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
920
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']

    # First Cancel() is issued before any WPS operation has been started.
    wps.Cancel()

    dev[0].scan_for_bss(bssid, freq="2412")
    raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
    start_args = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                  'Bssid': dbus.ByteArray(raw_bssid)}
    wps.Start(start_args)

    # Second Cancel() is issued right after Start().
    wps.Cancel()
    # Wait up to one second for a scan result event; the outcome of the wait
    # is not checked.
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
936
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Every case below must be rejected with an error containing this text.
    expected_err = "InvalidArgs"

    # Seventeen SSIDs: "1", "2", ..., "17" (presumably more than Scan()
    # accepts in one request).
    seventeen_ssids = [dbus.ByteArray(str(n)) for n in range(1, 18)]
    # A single 33-character SSID (presumably longer than the allowed
    # maximum).
    long_ssid = dbus.ByteArray("1234567890abcdef1234567890abcdef1")

    invalid_args = [
        {},
        {'Type': 123},
        {'Type': 'foo'},
        {'Type': 'active', 'Foo': 'bar'},
        {'Type': 'active', 'SSIDs': 'foo'},
        {'Type': 'active', 'SSIDs': ['foo']},
        {'Type': 'active', 'SSIDs': seventeen_ssids},
        {'Type': 'active', 'SSIDs': [long_ssid]},
        {'Type': 'active', 'IEs': 'foo'},
        {'Type': 'active', 'IEs': ['foo']},
        {'Type': 'active', 'Channels': 2412},
        {'Type': 'active', 'Channels': [2412]},
        {'Type': 'active',
         'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
        {'Type': 'active',
         'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
        {'Type': 'active', 'AllowRoam': "yes"},
        {'Type': 'passive', 'IEs': [dbus.ByteArray("\xdd\x00")]},
        {'Type': 'passive', 'SSIDs': [dbus.ByteArray("foo")]},
    ]

    for scan_args in invalid_args:
        try:
            iface.Scan(scan_args)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(scan_args), str(e)))
            continue
        # Reaching this point means Scan() did not raise a D-Bus error.
        raise Exception("Invalid Scan() arguments accepted: " + str(scan_args))
984
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def one_channel():
        # Fresh single-entry channel list: (2412, 20) as two UInt32 values
        return [(dbus.UInt32(2412), dbus.UInt32(20))]

    # Each section arms one allocation failure at the named call path and
    # then issues a Scan() request that is expected to hit it.

    # Failure in wpa_scan_clone_params; a specific error text is expected.
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan({'Type': 'passive', 'Channels': one_channel()})

    # Failure while parsing the Channels argument
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'passive', 'Channels': one_channel()})

    # Failure while parsing the IEs argument
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'IEs': [dbus.ByteArray("\xdd\x00")],
                    'Channels': one_channel()})

    # Failure while parsing the SSIDs argument
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'SSIDs': [dbus.ByteArray("open"), dbus.ByteArray()],
                    'Channels': one_channel()})
1016
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })

    def one_channel():
        # Fresh single-entry channel list: (2412, 20) as two UInt32 values
        return [(dbus.UInt32(2412), dbus.UInt32(20))]

    def current_bss_list():
        # Read the BSSs property of the interface object
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    class TestDbusScan(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0
            self.bss_added = False
            self.fail_reason = None

        def __enter__(self):
            # Start the first scan almost immediately and give up after
            # 15 seconds if the expected signals do not arrive.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
            self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
            self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            done = self.scan_completed
            if done == 1:
                # Second scan: passive, single channel, roaming allowed
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': one_channel()})
            elif done == 2:
                # Third scan: passive, no channel list, roaming not allowed
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
            elif done == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            # All three scans may already have completed before this signal.
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            # First scan: active, with SSIDs and IEs lists that each end in
            # an empty byte array
            first_scan = {'Type': 'active',
                          'SSIDs': [dbus.ByteArray("open"),
                                    dbus.ByteArray()],
                          'IEs': [dbus.ByteArray("\xdd\x00"),
                                  dbus.ByteArray()],
                          'AllowRoam': False,
                          'Channels': one_channel()}
            iface.Scan(first_scan)
            # One-shot timer callback
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    if len(current_bss_list()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(current_bss_list()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1097
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface and wait until it is
    # reported as started.
    reply = dev[0].request("SCAN freq=2412-2462")
    if "OK" not in reply:
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued now must fail with the expected error text.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Let the first scan run to completion.
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1119
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # self.state walks through 0..9, driven by the State values seen in
        # PropertiesChanged signals:
        #   0 -completed-> 1 (Disconnect)    1 -disconnected-> 2 (SelectNetwork)
        #   2 -completed-> 3 (Disconnect)    3 -disconnected-> 4 (Reassociate)
        #   4 -completed-> 5 (Reattach)      5 -completed-> 6 (Disconnect)
        #   6 -disconnected-> 7 (Reconnect)  7 -completed-> 8 (SignalPoll,
        #   RemoveNetwork)                   8 -disconnected-> 9 (done)
        # A failed SignalPoll check sets the state to -1.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # Start connecting almost immediately and give up after
            # 15 seconds if the sequence does not finish.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            reported = properties['State']
            if reported == "completed":
                self._on_completed()
            elif reported == "disconnected":
                self._on_disconnected()

        def _on_completed(self):
            # Advance the sequence after a State=completed notification.
            step = self.state
            if step == 0 or step == 2:
                self.state = step + 1
                iface.Disconnect()
            elif step == 4:
                self.state = 5
                iface.Reattach()
            elif step == 5:
                self.state = 6
                iface.Disconnect()
            elif step == 7:
                self.state = 8
                res = iface.SignalPoll()
                logger.debug("SignalPoll: " + str(res))
                if 'frequency' not in res or res['frequency'] != 2412:
                    # Leaves the sequence in a state from which it cannot
                    # reach 9.
                    self.state = -1
                    logger.info("Unexpected SignalPoll result")
                iface.RemoveNetwork(self.netw)

        def _on_disconnected(self):
            # Advance the sequence after a State=disconnected notification.
            step = self.state
            if step == 1:
                self.state = 2
                iface.SelectNetwork(self.netw)
            elif step == 3:
                self.state = 4
                iface.Reassociate()
            elif step == 6:
                self.state = 7
                iface.Reconnect()
            elif step == 8:
                self.state = 9
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'WPA-PSK',
                                        'psk': passphrase,
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timer callback
            return False

        def success(self):
            signals_seen = (self.network_added and
                            self.network_removed and
                            self.network_selected)
            if not signals_seen:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1222
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            # Start connecting almost immediately and give up after
            # 15 seconds if the connection does not complete.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            if properties['State'] == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # The network was added without a 'psk' entry; answer the
            # request with the passphrase wrapped in double quotes.
            iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'WPA-PSK',
                                        'mem_only_psk': 1,
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timer callback
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1276
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Skip unless the build accepts the TEST_ALLOC_FAIL command.
    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # self.state walks through 0..7, driven by the State values seen in
        # PropertiesChanged signals; a failed SignalPoll check sets it to -1.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # Each iteration gets only 1.5 seconds before the loop is
            # stopped by the timeout handler.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-PSK',
                                     'psk': passphrase,
                                     'scan_freq': 2412 },
                                   signature='sv')
            # Either call may fail because of the injected allocation
            # failure; stop the loop right away instead of waiting for the
            # timeout.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Arm the allocation failure at increasing counts. The loop ends after
    # five iterations in which GET_ALLOC_FAIL did not report a value
    # starting with "0:", or after 999 iterations.
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 3:
                # NOTE(review): this is raised inside the try block, so the
                # handler below catches it and the test does not fail here.
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception as e:
            # Best effort: errors in a single iteration are expected while
            # allocations are failing, so log and move on. Only Exception is
            # caught so that KeyboardInterrupt/SystemExit still propagate.
            logger.debug("Iteration %d - ignored exception: %s" % (i, str(e)))

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1418
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Each method must be rejected with an error mentioning NotConnected.
    for method in ("Disconnect", "Reattach"):
        try:
            getattr(iface, method)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid %s: " % method + str(e))
            continue
        # Reaching this point means the call did not raise a D-Bus error.
        raise Exception("%s() accepted when not connected" % method)
1437
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Progress is tracked in self.state:
        #   0: initial connection attempt started from run_connect()
        #   1: first "completed" seen; EAPLogoff() issued and the server
        #      dNSName written as altsubject_match
        #   2: "disconnected" seen; EAPLogon() issued, network reselected
        #   3: second "completed" seen; Disconnect() issued and a
        #      non-matching altsubject_match written
        #   4: "disconnected" seen; network reselected
        #   5: EAP signal reported 'AltSubject mismatch'; main loop stopped
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.state = 0
            self.eap_status = False
            self.certification_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra keyword arguments for add_signal)
            signals = [ (self.propertiesChanged, "PropertiesChanged", {}),
                        (self.certification, "Certification",
                         { 'byte_arrays': True }),
                        (self.networkRequest, "NetworkRequest", {}),
                        (self.eap, "EAP", {}) ]
            for handler, name, extra in signals:
                self.add_signal(handler, WPAS_DBUS_IFACE, name, **extra)
            self.loop.run()
            return self

        def set_dnsname_constraint(self, mismatch):
            # Write altsubject_match on the added network: the dNSName
            # learned from the Certification signal, with "FOO" appended
            # when a non-matching value is requested.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            match = self.server_dnsname
            if mismatch:
                match = match + "FOO"
            args = dbus.Dictionary({ 'altsubject_match': match },
                                   signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self.set_dnsname_constraint(False)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self.set_dnsname_constraint(True)
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            altsubject = args['altsubject']
            if len(altsubject) < 1:
                raise Exception("Missing dNSName")
            dnsname = altsubject[0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                # Final step: the non-matching constraint was rejected.
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PASSWORD":
                return
            # No password is configured in run_connect(); it is supplied
            # here in response to the request signal.
            iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = { 'ssid': ssid,
                        'key_mgmt': 'IEEE8021X',
                        'eapol_flags': 0,
                        'eap': 'TTLS',
                        'anonymous_identity': 'ttls',
                        'identity': 'pap user',
                        'ca_cert': 'auth_serv/ca.pem',
                        'phase2': 'auth=PAP',
                        'scan_freq': 2412 }
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # Returning False keeps this timeout callback one-shot.
            return False

        def success(self):
            if self.eap_status and self.certification_received:
                return self.state == 5
            return False

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1552
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Network parameters passed as explicitly typed D-Bus values.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'WPA-PSK',
                             'psk': "12345678",
                             'identity': dbus.ByteArray([ 1, 2 ]),
                             'priority': dbus.Int32(0),
                             'scan_freq': dbus.UInt32(2412) },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists passed as space-separated strings.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'NONE',
                             'scan_freq': "2412 2432",
                             'freq_list': "2412 2417 2432" },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # The network object is gone now, so both operations must be rejected.
    try:
        iface.RemoveNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    else:
        raise Exception("Invalid RemoveNetwork() accepted")
    try:
        iface.SelectNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))
    else:
        raise Exception("Invalid SelectNetwork() accepted")

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: read the initial value, toggle it both ways, and
    # verify that a non-boolean value is refused.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if not res:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
    else:
        raise Exception("Invalid Set(Enabled,1) accepted")

    # Updating one property must leave the other configured values alone.
    args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # A second call with no networks left is expected to succeed as well.
    iface.RemoveAllNetworks()

    # Argument sets that AddNetwork() is expected to refuse with InvalidArgs.
    tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
              dbus.Dictionary({ 'identity': dbus.ByteArray() },
                              signature='sv'),
              dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
              dbus.Dictionary({ 'identity': "" }, signature='sv') ]
    for args in tests:
        try:
            iface.AddNetwork(args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
        else:
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
1665
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    netw1 = iface.AddNetwork(dbus.Dictionary({ 'ssid': "foo1",
                                               'key_mgmt': 'NONE',
                                               'identity': "testuser",
                                               'scan_freq': '2412' },
                                             signature='sv'))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        bogus = dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")
        iface.RemoveNetwork(bogus)

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        open_net = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                                   signature='sv')
        try:
            # Currently, AddNetwork() succeeds even if os_strdup() for path
            # fails, so remove the network if that occurs.
            iface.RemoveNetwork(iface.AddNetwork(open_net))
        except dbus.exceptions.DBusException:
            pass

    # Same network parameters, now failing the 1st and 2nd allocation made
    # directly in wpas_dbus_register_network.
    for count in (1, 2):
        with alloc_fail(dev[0], count, "=wpas_dbus_register_network"):
            try:
                # Currently, AddNetwork() succeeds even if network registration
                # fails, so remove the network if that occurs.
                iface.RemoveNetwork(iface.AddNetwork(open_net))
            except dbus.exceptions.DBusException:
                pass

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        iface.AddNetwork(dbus.Dictionary({ 'ssid': "foo2",
                                           'key_mgmt': 'NONE' },
                                         signature='sv'))

    # (allocation failure location, AddNetwork properties); every case uses
    # failure count 1 and is expected to end in InvalidArgs.
    set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    cases = [ ('wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
               { 'ssid': dbus.ByteArray(' ') }),
              (set_props, { 'ssid': 'foo' }),
              (set_props, { 'eap': 'foo' }),
              (set_props, { 'priority': dbus.UInt32(1) }),
              (set_props, { 'priority': dbus.Int32(1) }),
              (set_props, { 'ssid': dbus.ByteArray(' ') }) ]
    for funcs, props in cases:
        with alloc_fail_dbus(dev[0], 1, funcs, "AddNetwork", "InvalidArgs"):
            iface.AddNetwork(dbus.Dictionary(props, signature='sv'))

    # None of the failed attempts may leave a network block behind.
    remaining = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(remaining) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
1745
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        regdom_event = ["CTRL-EVENT-REGDOM-CHANGE"]
        for country in ("US", "00"):
            dev[0].request("SET country " + country)
            # Look for the event on the per-interface monitor first and
            # fall back to the global one if it did not show up there.
            if dev[0].wait_event(regdom_event, timeout=1) is None:
                dev[0].wait_global_event(regdom_event, timeout=1)
        subprocess.call(['iw', 'reg', 'set', '00'])
1764
def _test_dbus_interface(dev, apdev):
    # Body of test_dbus_interface(): creates and removes an interface for
    # 'lo' with driver 'none' and verifies the error name returned for
    # each invalid CreateInterface/RemoveInterface/GetInterface call.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # A second interface for the same ifname must be refused.
    params = dbus.Dictionary({ 'Ifname': 'lo',
                               'Driver': 'none',
                               'ConfigFile': 'foo',
                               'BridgeIfname': 'foo', },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # Removing the same object path twice: the second attempt must fail.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
    else:
        raise Exception("Invalid RemoveInterface() accepted")

    # An unknown dictionary key ('Foo') and a missing 'Ifname' are both
    # expected to produce InvalidArgs.
    invalid = [ dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
                                  'Foo': 123 },
                                signature='sv'),
                dbus.Dictionary({ 'Driver': 'none' }, signature='sv') ]
    for params in invalid:
        try:
            wpas.CreateInterface(params)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
        else:
            raise Exception("Invalid CreateInterface() accepted")

    # The 'lo' interface was removed above, so the lookup must fail now.
    try:
        wpas.GetInterface("lo")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
    else:
        raise Exception("Invalid GetInterface() accepted")
1821
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        wpas.CreateInterface(params)

    # Fail the count-th allocation under wpa_supplicant_add_iface, raising
    # the count until CreateInterface() goes through with the failure
    # trigger left unconsumed.
    fail_cmd = "TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface"
    for count in range(1, 1000):
        dev[0].request(fail_cmd % count)
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        try:
            npath = wpas.CreateInterface(params)
            wpas.RemoveInterface(npath)
        except dbus.exceptions.DBusException:
            # The call failed with a D-Bus error; try the next count.
            continue

        logger.info("CreateInterface succeeds after %d allocation failures" % count)
        state = dev[0].request('GET_ALLOC_FAIL')
        logger.info("GET_ALLOC_FAIL: " + state)
        dev[0].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL 0:")
        if count < 5:
            raise Exception("CreateInterface succeeded during out-of-memory")
        if not state.startswith('0:'):
            break

    for key in ('Driver', 'Ifname', 'ConfigFile', 'BridgeIfname'):
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            wpas.CreateInterface(dbus.Dictionary({ key: 'foo' },
                                                 signature='sv'))
1856
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    # Adding another blob under an existing name must be refused.
    try:
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
    else:
        raise Exception("Invalid AddBlob() accepted")

    # The stored data must still be the first blob, byte for byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, expected in zip(res, blob):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both a second RemoveBlob() and a GetBlob() must fail.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    else:
        raise Exception("Invalid RemoveBlob() accepted")
    try:
        iface.GetBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
    else:
        raise Exception("Invalid GetBlob() accepted")

    class TestDbusBlob(TestDbus):
        # Verifies that adding and removing 'blob2' produces the BlobAdded
        # and BlobRemoved signals.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                # Removal is the last expected signal; stop the main loop.
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False keeps this timeout callback one-shot.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1931
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the 1st, 2nd and 3rd allocation under wpas_dbus_handler_add_blob
    # in turn while adding the same blob.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1941
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # None of these calls is expected to raise a D-Bus error.
    for autoscan in ("foo", "periodic:1", ""):
        iface.AutoScan(autoscan)
    # NOTE(review): presumably clears the autoscan setting again for the
    # following test cases - confirm.
    dev[0].request("AUTOSCAN ")
1951
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation under wpas_dbus_handler_autoscan.
    oom = alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan")
    with oom:
        iface.AutoScan("foo")
    dev[0].request("AUTOSCAN ")
1960
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def reject_bad_address(name):
        # Call the named TDLS method with "foo" as the peer address and
        # require an InvalidArgs D-Bus error.
        try:
            getattr(iface, name)("foo")
        except dbus.exceptions.DBusException as e:
            err = str(e)
            if "InvalidArgs" not in err:
                raise Exception("Unexpected error message for invalid " + name + ": " + err)
        else:
            raise Exception("Invalid " + name + "() accepted")

    reject_bad_address("TDLSDiscover")
    reject_bad_address("TDLSStatus")

    # A well-formed address without a TDLS link gives a status string
    # instead of an error.
    res = iface.TDLSStatus(addr1)
    if res != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    reject_bad_address("TDLSSetup")
    reject_bad_address("TDLSTeardown")

    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as e:
        err = str(e)
        if "UnknownError: error performing TDLS teardown" not in err:
            raise Exception("Unexpected error message: " + err)
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")
2008
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation under wpa_tdls_add_peer; TDLSSetup() is then
    # expected to report the UnknownError text given below.
    expected_error = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        iface.TDLSSetup("00:11:22:33:44:55")
2017
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Timer-driven sequence: discover -> setup -> status check and
        # teardown -> status check. Each step schedules the next one and
        # returns False so that its own timeout does not fire again.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_teardown = False
            self.tdls_setup = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Only logged; the result is decided by the TDLSStatus() polls.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            # Last step: stop the main loop regardless of the outcome.
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2084
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # With engine path "foo" the call may succeed or fail; if it fails,
    # only the EAPOL reinit error is accepted.
    for module in ("bar", ""):
        try:
            iface.SetPKCS11EngineAndModulePath("foo", module)
        except dbus.exceptions.DBusException as e:
            err = str(e)
            if "Error.Failed: Reinit of the EAPOL" not in err:
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + err)

    # With an empty engine path the call must succeed, and both properties
    # must read back the values that were just set.
    for module in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module)
        for prop, expected in (("PKCS11EnginePath", ""),
                               ("PKCS11ModulePath", module)):
            res = if_obj.Get(WPAS_DBUS_IFACE, prop,
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if res != expected:
                raise Exception("Unexpected " + prop + " value: " + res)
2121
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # The actual checks live in _test_dbus_apscan(); this wrapper only
    # guarantees the cleanup below.
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        # Issued whether the test body passed or raised, so a failure part
        # way through cannot leave a modified ap_scan value behind.
        dev[0].request("AP_SCAN 1")
2128
def _test_dbus_apscan(dev, apdev):
    """Body of test_dbus_apscan(): read, write and mis-write ApScan."""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    def get_ap_scan():
        # Read the ApScan property through the D-Bus properties interface
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_ap_scan(value):
        # Write the ApScan property through the D-Bus properties interface
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    initial = get_ap_scan()
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    # Each of 0, 1 and 2 must be accepted and read back unchanged
    for i in range(3):
        set_ap_scan(dbus.UInt32(i))
        current = get_ap_scan()
        if current != i:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, i))

    # (value to write, label used in messages, expected error substring)
    invalid = [ (dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                (dbus.UInt32(123), "123",
                 "Error.Failed: ap_scan must be 0, 1, or 2") ]
    for value, label, expected_err in invalid:
        try:
            set_ap_scan(value)
            raise Exception("Invalid Set(ApScan,%s) accepted" % label)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message for invalid Set(ApScan,%s): " % label + str(e))

    # Finish with ApScan set to 1 again
    set_ap_scan(dbus.UInt32(1))
2163
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # The test expects FastReauth to read as True before anything is changed.
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Write False and then True, reading the property back after each write.
    for i in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A value of the wrong D-Bus type must be rejected with a type error.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # Message names FastReauth (it previously said ApScan, a
            # copy-paste slip that pointed at the wrong property).
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Finish with the property set to True, the value checked at the start.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
2191
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Valid values must be accepted and read back unchanged. The expected
    # value is passed to the failure message explicitly; the message used to
    # reference an undefined name, which turned a mismatch into a NameError.
    for prop, expected in [ ("BSSExpireAge", 179), ("BSSExpireCount", 3) ]:
        if_obj.Set(WPAS_DBUS_IFACE, prop, dbus.UInt32(expected),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != expected:
            raise Exception("Unexpected %s value %d (expected %d)" % (prop, res, expected))

    # (property, value to write, label used in messages,
    #  expected error substring)
    invalid = [ ("BSSExpireAge", dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                ("BSSExpireAge", dbus.UInt32(9), "9",
                 "Error.Failed: BSSExpireAge must be >= 10"),
                ("BSSExpireCount", dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                ("BSSExpireCount", dbus.UInt32(0), "0",
                 "Error.Failed: BSSExpireCount must be > 0") ]
    for prop, value, label, expected_err in invalid:
        try:
            if_obj.Set(WPAS_DBUS_IFACE, prop, value,
                       dbus_interface=dbus.PROPERTIES_IFACE)
            raise Exception("Invalid Set(%s,%s) accepted" % (prop, label))
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message for invalid Set(%s,%s): " % (prop, label) + str(e))

    # Finish with BSSExpireAge=180 and BSSExpireCount=2.
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
2246
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # The actual checks live in _test_dbus_country(); this wrapper only
    # guarantees the cleanup below.
    try:
        _test_dbus_country(dev, apdev)
    finally:
        # Run on success and on failure: set the country back to 00 through
        # the control interface and through "iw reg set 00".
        dev[0].request("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
2254
def _test_dbus_country(dev, apdev):
    """Body of test_dbus_country(): set Country and watch regdom events."""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    def set_country(value):
        # Write the Country property through the D-Bus properties interface
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def wait_regdom_change():
        # Wait for the regulatory domain change event on the interface
        # monitor and return its text
        event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is None:
            # For now, work around separate P2P Device interface event delivery
            event = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                             timeout=1)
            if event is None:
                raise Exception("regdom change event not seen")
        return event

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    set_country("FI")
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # (value to write, label used in messages, expected error substring)
    invalid = [ (dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                ("F", "F",
                 "Error.Failed: invalid country code") ]
    for value, label, expected_err in invalid:
        try:
            set_country(value)
            raise Exception("Invalid Set(Country,%s) accepted" % label)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message for invalid Set(Country,%s): " % label + str(e))

    set_country("00")

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    if not ("init=CORE type=WORLD" in ev or "init=USER type=WORLD" in ev):
        raise Exception("Unexpected event contents: " + ev)
2308
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # The actual checks live in _test_dbus_scan_interval(); this wrapper only
    # guarantees the cleanup below.
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        # Issued whether the test body passed or raised, so a failure part
        # way through cannot leave a modified scan interval behind.
        dev[0].request("SCAN_INTERVAL 5")
2315
def _test_dbus_scan_interval(dev, apdev):
    """Body of test_dbus_scan_interval(): valid and invalid ScanInterval writes."""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # A valid value must be accepted and read back unchanged. The expected
    # value is named so the failure message can report it; the message used
    # to reference an undefined name, which turned a mismatch into a
    # NameError.
    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # A value of the wrong D-Bus type (UInt16 instead of Int32) must be
    # rejected with a type error.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    # A negative interval of the right type must be rejected by range check.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    # Finish with ScanInterval set to 5.
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2344
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # dev[1] keeps a social-channel P2P find running until the end of the
    # test (presumably the source of the Probe Requests being reported).
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # One run: start a P2P group on dev[0], subscribe to Probe Request
        # reporting on the group interface and wait for the first
        # ProbeRequest signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            # Schedule the test action and the 15 second timeout first, then
            # register the signal handlers and run the main loop until one
            # of the handlers quits it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            # Subscribe on the group interface; both proxies are kept on the
            # instance because the code after the main loop uses them.
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            p2p_iface.GroupAdd(dbus.Dictionary({ 'frequency': 2412 }))
            # Returning False keeps the glib timeout from firing again
            return False

        def success(self):
            return self.reported

    # First run: unsubscribe explicitly, verify that a second unsubscribe is
    # refused, then remove the group.
    with TestDbusProbe(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")
        probe.iface.UnsubscribeProbeReq()
        try:
            probe.iface.UnsubscribeProbeReq()
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        except dbus.exceptions.DBusException as e:
            if "NoSubscription" not in str(e):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
        probe.group_p2p.Disconnect()

    # Second run
    with TestDbusProbe(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2408
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        # The unsubscribe was refused, so there was nothing to restore later
        was_subscribed = False
    else:
        was_subscribed = True

    # Subscribe while an allocation failure is armed for the subscribe
    # handler
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if was_subscribed:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2432
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Every section below follows the same pattern: make a call that must be
    # refused, fail if it is accepted, and check the D-Bus error text.
    # Sections that need P2P disabled switch it off inside the try block and
    # always switch it back on in the finally block.

    # Peer object path under this interface that the RejectPeer and
    # ProvisionDiscoveryRequest checks expect to fail on.
    peer_path = path + "/Peers/00112233445566"

    # RejectPeer: path in the Peers/ namespace, then a path outside it
    try:
        p2p.RejectPeer(peer_path)
        raise Exception("Invalid RejectPeer accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))

    try:
        p2p.RejectPeer("/foo")
        raise Exception("Invalid RejectPeer accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))

    # RemoveClient: argument dictionaries that must all be refused
    tests = [ { },
              { 'peer': 'foo' },
              { 'foo': "bar" },
              { 'iface': "abc" },
              { 'iface': 123 } ]
    for t in tests:
        try:
            p2p.RemoveClient(t)
            raise Exception("Invalid RemoveClient accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))

    # Find: bad discovery type, malformed device type lists and unknown
    # dictionary entries of various D-Bus types
    tests = [ {'DiscoveryType': 'foo'},
              {'RequestedDeviceTypes': 'foo'},
              {'RequestedDeviceTypes': ['foo']},
              {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
                                        '10','11','12','13','14','15','16',
                                        '17']},
              {'RequestedDeviceTypes': dbus.Array([], signature="s")},
              {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
              {'RequestedDeviceTypes': dbus.Array([], signature="i")},
              {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
                                        dbus.ByteArray('1234567')]},
              {'Foo': dbus.Int16(1)},
              {'Foo': dbus.UInt16(1)},
              {'Foo': dbus.Int64(1)},
              {'Foo': dbus.UInt64(1)},
              {'Foo': dbus.Double(1.23)},
              {'Foo': dbus.Signature('s')},
              {'Foo': 'bar'}]
    for t in tests:
        try:
            p2p.Find(dbus.Dictionary(t))
            raise Exception("Invalid Find accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Find(): " + str(e))

    # RemovePersistentGroup: object paths that must all be refused
    for p in [ "/foo",
               "/fi/w1/wpa_supplicant1/Interfaces/1234",
               "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
        try:
            p2p.RemovePersistentGroup(dbus.ObjectPath(p))
            raise Exception("Invalid RemovePersistentGroup accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))

    # Listen while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Listen(5)
        raise Exception("Invalid Listen accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Could not start P2P listen" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Listen with a string argument, sent through a proxy object created
    # with introspect=False
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    try:
        test_p2p.Listen("foo")
        raise Exception("Invalid Listen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))

    # ExtendedListen while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.ExtendedListen(dbus.Dictionary({}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # PresenceRequest while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'duration1': 30000, 'interval1': 102400,
                 'duration2': 20000, 'interval2': 102400 }
        p2p.PresenceRequest(args)
        raise Exception("Invalid PresenceRequest accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to invoke presence request" not in str(e):
            raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # GroupAdd: negative frequency, then the interface path passed as the
    # persistent group object
    try:
        params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
        p2p.GroupAdd(params)
        raise Exception("Invalid GroupAdd accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))

    try:
        params = dbus.Dictionary({'persistent_group_object':
                                  dbus.ObjectPath(path),
                                  'frequency': 2412})
        p2p.GroupAdd(params)
        raise Exception("Invalid GroupAdd accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))

    # Disconnect called on the interface returned by prepare_dbus()
    try:
        p2p.Disconnect()
        raise Exception("Invalid Disconnect accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to disconnect" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))

    # Flush while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Flush()
        raise Exception("Invalid Flush accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Flush: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'peer': path,
                 'join': True,
                 'wps_method': 'pbc',
                 'frequency': 2412 }
        p2p.Connect(args)
        raise Exception("Invalid Connect accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Connect: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect: argument dictionaries that must all be refused
    tests = [ { 'frequency': dbus.Int32(-1) },
              { 'wps_method': 'pbc' },
              { 'wps_method': 'foo' } ]
    for args in tests:
        try:
            p2p.Connect(args)
            raise Exception("Invalid Connect accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Connect: " + str(e))

    # Invite while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'peer': path }
        p2p.Invite(args)
        raise Exception("Invalid Invite accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Invite with an unknown dictionary entry
    try:
        args = { 'foo': 'bar' }
        p2p.Invite(args)
        raise Exception("Invalid Invite accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            # Message names Invite (it previously said Connect, a copy-paste
            # slip that pointed at the wrong method).
            raise Exception("Unexpected error message for invalid Invite: " + str(e))

    # ProvisionDiscoveryRequest: (peer path, method, expected error substring)
    pd_failed = "UnknownError: Failed to send provision discovery request"
    tests = [ (path, 'display', "InvalidArgs"),
              (dbus.ObjectPath(peer_path), 'display', pd_failed),
              (dbus.ObjectPath(peer_path), 'keypad', pd_failed),
              (dbus.ObjectPath(peer_path), 'pbc', pd_failed),
              (dbus.ObjectPath(peer_path), 'pushbutton', pd_failed),
              (dbus.ObjectPath(peer_path), 'foo', "InvalidArgs") ]
    for (p,method,err) in tests:
        try:
            p2p.ProvisionDiscoveryRequest(p, method)
            raise Exception("Invalid ProvisionDiscoveryRequest accepted")
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))

    # Reading the Peers property while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Get(Peers) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")
2654
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Each Find case: (allocation failure count, function match string
    # handed to alloc_fail_dbus, value stored under the 'Foo' key).
    string_array = "_wpa_dbus_dict_entry_get_string_array"
    find_cases = [
        (1, string_array, [ 'bar' ]),
        (2, string_array, [ 'bar' ]),
        (10, string_array, [ str(n) for n in range(1, 10) ]),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray('123') ]),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray('123') ]),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray('123') for _ in range(11) ]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray('123') ]),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         path),
    ]
    for count, func, value in find_cases:
        with alloc_fail_dbus(dev[0], count, func, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({ 'Foo': value }))

    # The same AddService arguments are sent with the first and then the
    # second allocation in the byte array parser set to fail.
    service = { 'service_type': 'bonjour',
                'response': dbus.ByteArray(500*'b') }
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service)
2711
2712 def test_dbus_p2p_discovery(dev, apdev):
2713     """D-Bus P2P discovery"""
2714     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2715     p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2716
2717     addr0 = dev[0].p2p_dev_addr()
2718
2719     dev[1].request("SET sec_device_type 1-0050F204-2")
2720     dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2721     dev[1].p2p_listen()
2722     addr1 = dev[1].p2p_dev_addr()
2723     a1 = binascii.unhexlify(addr1.replace(':',''))
2724
2725     wfd_devinfo = "00001c440028"
2726     dev[2].request("SET wifi_display 1")
2727     dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2728     wfd = binascii.unhexlify('000006' + wfd_devinfo)
2729     dev[2].p2p_listen()
2730     addr2 = dev[2].p2p_dev_addr()
2731     a2 = binascii.unhexlify(addr2.replace(':',''))
2732
2733     res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2734                         dbus_interface=dbus.PROPERTIES_IFACE)
2735     if 'Peers' not in res:
2736         raise Exception("GetAll result missing Peers")
2737     if len(res['Peers']) != 0:
2738         raise Exception("Unexpected peer(s) in the list")
2739
2740     args = {'DiscoveryType': 'social',
2741             'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2742             'Timeout': dbus.Int32(1) }
2743     p2p.Find(dbus.Dictionary(args))
2744     p2p.StopFind()
2745
2746     class TestDbusP2p(TestDbus):
2747         def __init__(self, bus):
2748             TestDbus.__init__(self, bus)
2749             self.found = False
2750             self.found2 = False
2751             self.found_prop = False
2752             self.lost = False
2753             self.find_stopped = False
2754
2755         def __enter__(self):
2756             gobject.timeout_add(1, self.run_test)
2757             gobject.timeout_add(15000, self.timeout)
2758             self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2759                             "DeviceFound")
2760             self.add_signal(self.deviceFoundProperties,
2761                             WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
2762             self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
2763                             "DeviceLost")
2764             self.add_signal(self.provisionDiscoveryResponseEnterPin,
2765                             WPAS_DBUS_IFACE_P2PDEVICE,
2766                             "ProvisionDiscoveryResponseEnterPin")
2767             self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
2768                             "FindStopped")
2769             self.loop.run()
2770             return self
2771
2772         def deviceFound(self, path):
2773             logger.debug("deviceFound: path=%s" % path)
2774             res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2775                              dbus_interface=dbus.PROPERTIES_IFACE)
2776             if len(res) < 1:
2777                 raise Exception("Unexpected number of peers")
2778             if path not in res:
2779                 raise Exception("Mismatch in peer object path")
2780             peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
2781             res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2782                                   dbus_interface=dbus.PROPERTIES_IFACE,
2783                                   byte_arrays=True)
2784             logger.debug("peer properties: " + str(res))
2785
2786             if res['DeviceAddress'] == a1:
2787                 if 'SecondaryDeviceTypes' not in res:
2788                     raise Exception("Missing SecondaryDeviceTypes")
2789                 sec = res['SecondaryDeviceTypes']
2790                 if len(sec) < 1:
2791                     raise Exception("Secondary device type missing")
2792                 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
2793                     raise Exception("Secondary device type mismatch")
2794
2795                 if 'VendorExtension' not in res:
2796                     raise Exception("Missing VendorExtension")
2797                 vendor = res['VendorExtension']
2798                 if len(vendor) < 1:
2799                     raise Exception("Vendor extension missing")
2800                 if "\x11\x22\x33\x44" not in vendor:
2801                     raise Exception("Secondary device type mismatch")
2802
2803                 self.found = True
2804             elif res['DeviceAddress'] == a2:
2805                 if 'IEs' not in res:
2806                     raise Exception("IEs missing")
2807                 if res['IEs'] != wfd:
2808                     raise Exception("IEs mismatch")
2809                 self.found2 = True
2810             else:
2811                 raise Exception("Unexpected peer device address")
2812
2813             if self.found and self.found2:
2814                 p2p.StopFind()
2815                 p2p.RejectPeer(path)
2816                 p2p.ProvisionDiscoveryRequest(path, 'display')
2817
2818         def deviceLost(self, path):
2819             logger.debug("deviceLost: path=%s" % path)
2820             self.lost = True
2821             try:
2822                 p2p.RejectPeer(path)
2823                 raise Exception("Invalid RejectPeer accepted")
2824             except dbus.exceptions.DBusException, e:
2825                 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2826                     raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2827             self.loop.quit()
2828
2829         def deviceFoundProperties(self, path, properties):
2830             logger.debug("deviceFoundProperties: path=%s" % path)
2831             logger.debug("peer properties: " + str(properties))
2832             if properties['DeviceAddress'] == a1:
2833                 self.found_prop = True
2834
2835         def provisionDiscoveryResponseEnterPin(self, peer_object):
2836             logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
2837             p2p.Flush()
2838
2839         def findStopped(self):
2840             logger.debug("findStopped")
2841             self.find_stopped = True
2842
2843         def run_test(self, *args):
2844             logger.debug("run_test")
2845             p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
2846                                       'Timeout': dbus.Int32(10)}))
2847             return False
2848
2849         def success(self):
2850             return self.found and self.lost and self.found2 and self.find_stopped
2851
2852     with TestDbusP2p(bus) as t:
2853         if not t.success():
2854             raise Exception("Expected signals not seen")
2855
2856     dev[1].request("VENDOR_ELEM_REMOVE 1 *")
2857     dev[1].p2p_stop_find()
2858
2859     p2p.Listen(1)
2860     dev[2].p2p_stop_find()
2861     dev[2].request("P2P_FLUSH")
2862     if not dev[2].discover_peer(addr0):
2863         raise Exception("Peer not found")
2864     p2p.StopFind()
2865     dev[2].p2p_stop_find()
2866
2867     try:
2868         p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
2869         raise Exception("Invalid ExtendedListen accepted")
2870     except dbus.exceptions.DBusException, e:
2871         if "InvalidArgs" not in str(e):
2872             raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
2873
2874     p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
2875     p2p.ExtendedListen(dbus.Dictionary({}))
2876     dev[0].global_request("P2P_EXT_LISTEN")
2877
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    # Walks through the P2PDevice service discovery methods on dev[0]
    # (AddService, FlushService, DeleteService, ServiceDiscoveryRequest,
    # ServiceDiscoveryCancelRequest and ServiceDiscoveryResponse), mixing
    # calls that are expected to succeed with calls that must be rejected
    # with an InvalidArgs D-Bus error.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def expect_invalid_args(method, arg, name, accepted_name=None):
        """Call method(arg) and require that it fails with InvalidArgs.

        name is the D-Bus method name used in the failure messages.
        accepted_name overrides the label used in the "accepted" message for
        the checks whose message has never included "()".

        Raises Exception if the call succeeds or fails with another error.
        """
        if accepted_name is None:
            accepted_name = name + "()"
        try:
            method(arg)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " + name + "(): " + str(e))
        else:
            raise Exception("Invalid " + accepted_name + " accepted")

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))

    args = { 'service_type': 'bonjour',
             'query': bonjour_query,
             'response': bonjour_response }
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # The AddService() dictionary (with 'response' in addition to 'query')
    # is expected to be rejected by DeleteService().
    expect_invalid_args(p2p.DeleteService, args, "DeleteService")

    args = { 'service_type': 'bonjour',
             'query': bonjour_query }
    p2p.DeleteService(args)
    # Deleting the same service a second time has to fail.
    expect_invalid_args(p2p.DeleteService, args, "DeleteService")

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args, "DeleteService")

    # Argument dictionaries that DeleteService() is expected to reject.
    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'foo', 'query': bonjour_query },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'version': 0x10,
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(p2p.DeleteService, args, "DeleteService")

    # Argument dictionaries that AddService() is expected to reject.
    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'version': 0x10,
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'response': 'foo' },
              { 'service_type': 'bonjour', 'query': bonjour_query },
              { 'service_type': 'bonjour', 'response': bonjour_response },
              { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(p2p.AddService, args, "AddService")

    args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    # Cancelling an already cancelled reference, or reference 0, has to fail.
    # These checks used to report themselves as AddService() failures.
    expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, ref,
                        "ServiceDiscoveryCancelRequest")
    expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, dbus.UInt64(0),
                        "ServiceDiscoveryCancelRequest")

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'ssdp:foo' }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    # Argument dictionaries that ServiceDiscoveryRequest() is expected to
    # reject.
    tests =  [ { 'service_type': 'foo' },
               { 'foo': 'bar' },
               { 'tlv': 'foo' },
               { },
               { 'version': 0 },
               { 'service_type': 'upnp',
                 'service': 'ssdp:foo' },
               { 'service_type': 'upnp',
                 'version': 0x10 },
               { 'service_type': 'upnp',
                 'version': 0x10,
                 'service': 'ssdp:foo',
                 'peer_object': dbus.ObjectPath(path + "/Peers") },
               { 'service_type': 'upnp',
                 'version': 0x10,
                 'service': 'ssdp:foo',
                 'peer_object': path + "/Peers" },
               { 'service_type': 'upnp',
                 'version': 0x10,
                 'service': 'ssdp:foo',
                 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
    for args in tests:
        expect_invalid_args(p2p.ServiceDiscoveryRequest, args,
                            "ServiceDiscoveryRequest",
                            accepted_name="ServiceDiscoveryRequest")

    args = { 'foo': 'bar' }
    expect_invalid_args(p2p.ServiceDiscoveryResponse,
                        dbus.Dictionary(args, signature='sv'),
                        "ServiceDiscoveryResponse",
                        accepted_name="ServiceDiscoveryResponse")
3026
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    # dev[1] registers a Bonjour service and goes into listen state. dev[0]
    # runs a social find over D-Bus and sends a service discovery request to
    # each peer reported by DeviceFound. The test passes once a
    # ServiceDiscoveryResponse signal has been received.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        """Issue an SD query to found peers and wait for the response."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when ServiceDiscoveryResponse is signalled.
            self.done = False

        def __enter__(self):
            """Arm the timers, hook up the signals and run the main loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse, iface,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Send a service discovery request to the peer just found."""
            logger.debug("deviceFound: path=%s" % path)
            p2p.ServiceDiscoveryRequest(
                { 'peer_object': path,
                  'tlv': dbus.ByteArray("\x02\x00\x00\x01") })

        def serviceDiscoveryResponse(self, sd_request):
            """Mark the test as complete and stop the main loop."""
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            """Kick off a social P2P find with a Timeout value of 10."""
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social',
                                           'Timeout': dbus.Int32(10)})
            p2p.Find(find_params)
            # One-shot timer callback.
            return False

        def success(self):
            """Return True if the service discovery response was seen."""
            return self.done

    with TestDbusP2p(bus) as checker:
        if not checker.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
3078
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # The actual test lives in _test_dbus_p2p_service_discovery_external();
    # this wrapper only guarantees the cleanup below.
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # Always issue P2P_SERV_DISC_EXTERNAL 0 on dev[0], whether the test
        # body passed or raised.
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3085
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    """Answer a peer's service discovery request through the D-Bus API.

    dev[1] queues a service discovery request towards dev[0] and starts a
    social find. dev[0] enables external service discovery processing over
    D-Bus, listens, and replies to the ServiceDiscoveryRequest signal with a
    fixed TLV payload. The payload is then checked against the
    P2P-SERV-DISC-RESP event seen on dev[1].
    """
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex encoded response TLVs sent back to the peer.
    sd_resp_hex = "0300000101"

    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        """Reply to the first ServiceDiscoveryRequest signal and stop."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when a ServiceDiscoveryRequest signal is handled.
            self.sd = False

        def __enter__(self):
            """Arm the timers, hook up the signals and run the main loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest, iface,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Log the found peer; no further action is taken."""
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            """Send the canned response for the received request and quit."""
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            tlvs = dbus.ByteArray(binascii.unhexlify(sd_resp_hex))
            # Echo the addressing fields from the request back to the peer.
            reply = { 'peer_object': sd_request['peer_object'],
                      'frequency': sd_request['frequency'],
                      'dialog_token': sd_request['dialog_token'],
                      'tlvs': tlvs }
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                         signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            """Enable external SD handling and start listening."""
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # One-shot timer callback.
            return False

        def success(self):
            """Return True if a service discovery request was handled."""
            return self.sd

    with TestDbusP2p(bus) as checker:
        if not checker.success():
            raise Exception("Expected signals not seen")

    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    # The fifth space-separated field of the event is compared against the
    # TLVs that were sent.
    fields = ev.split(' ')
    if fields[4] != sd_resp_hex:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
3152
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    # Signal-driven flow on dev[0]:
    # - run_test() adds a persistent group on 2412 MHz.
    # - The first GroupStarted disconnects the group; the following
    #   GroupFinished re-starts it from the stored persistent group object.
    # - The second GroupStarted makes wlan1 join with PIN 12345670.
    # - ProvisionDiscoveryRequestDisplayPin authorizes the peer through
    #   WPS.Start() after checking that an invalid request is rejected.
    # - StaAuthorized verifies peer/group properties and the
    #   WPSVendorExtensions property, removes the client and disconnects.
    # - The last GroupFinished removes the persistent group and
    #   PersistentGroupRemoved stops the main loop.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        """Signal handlers implementing the autonomous GO test sequence."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # True until the first GroupStarted signal has been handled.
            self.first = True
            # True once the final group teardown has been requested.
            self.waiting_end = False
            # Set right before every failure raised from a handler and
            # checked by success(), so a failure is recorded even if the
            # raised exception does not reach the test function
            # (NOTE(review): presumably that is why the flag exists).
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def __enter__(self):
            """Register timers and signal handlers, then run the main loop.

            Returns self once the main loop has been stopped.
            """
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            """Verify the GO-side group properties and drive the next step."""
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self.exceptions = True
                raise Exception("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self.exceptions = True
                raise Exception("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self.exceptions = True
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                # Second group instance: have wlan1 join it with a PIN.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            """Re-start the persistent group, or remove it at the end."""
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            """Remember the object path of the persistent group."""
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            """Mark the sequence as complete and stop the main loop."""
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            """Store the properties of the found peer in self.peer."""
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            """Authorize the requesting peer on the group with WPS PIN."""
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            # A request with Bssid and no Pin is expected to be rejected
            # with InvalidArgs.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin' }
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
                self.exceptions = True
                raise Exception("Invalid WPS.Start() accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message: " + str(e))
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def check_invalid_vendor_ext_set(self, g_obj, vals, expected):
            """Require that Set(WPSVendorExtensions, vals) fails.

            expected is the substring that has to appear in the D-Bus error.
            Raises Exception (after setting self.exceptions) if the Set()
            call succeeds or fails with a different error.
            """
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if expected not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
            else:
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")

        def staAuthorized(self, name):
            """Verify peer and group state, then tear the group down."""
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self.exceptions = True
                raise Exception("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of group members")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value that was read back (res2). The local list res
            # always has two entries at this point, so testing it would make
            # this check a no-op.
            if len(res2) != 2:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # Grow the list to 12 entries; setting it is expected to fail.
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            self.check_invalid_vendor_ext_set(g_obj, res, "Error.Failed")

            # Dictionary form with an unexpected key.
            vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
            self.check_invalid_vendor_ext_set(g_obj, vals, "InvalidArgs")

            # Values that are not arrays of byte arrays.
            vals = [ "foo" ]
            self.check_invalid_vendor_ext_set(g_obj, vals, "Error.Failed")

            vals = [ [ "foo" ] ]
            self.check_invalid_vendor_ext_set(g_obj, vals, "Error.Failed")

            p2p.RemoveClient({ 'peer': self.peer_path })

            # From here on GroupFinished removes the persistent group
            # instead of re-starting it.
            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            """Record that the StaDeauthorized signal was received."""
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            """Start the sequence by adding a persistent group."""
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # False keeps the GLib timeout from firing again.
            return False

        def success(self):
            """Return True if the sequence completed without failures."""
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3414
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    # Scenario: dev[0] creates an autonomous GO over D-Bus, wlan1 joins it
    # with PBC, the GO side authorizes the peer through the WPS interface of
    # the group interface, and the group is disconnected once StaAuthorized
    # is signaled. The test passes when GroupFinished has been seen.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set by groupFinished(); reported by success().
            self.done = False

        def __enter__(self):
            # Start the scenario right away and arm a 15 second timeout.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.provisionDiscoveryPBCRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryPBCRequest")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            # Runs the main loop; groupFinished() stops it on success.
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            # Keep the group interface object; later handlers use it for
            # WPS authorization and for disconnecting the group.
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            # The group is up, so let the second device join it using PBC.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Cache the peer properties; the PBC request handler reads
            # DeviceAddress from them.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # self.peer is only assigned in deviceFound(), so this handler
            # relies on DeviceFound having been processed first.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pbc' }
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # The station got in; tear the group down to finish the test.
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            # Returning False keeps gobject from rescheduling this callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
3504
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    # Scenario: dev[0] creates an autonomous GO over D-Bus and wlan1 connects
    # to it as a plain WPS (PIN) station. Once StaAuthorized is signaled the
    # station disconnects and the group is removed; the test passes when
    # GroupFinished has been seen.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name), registered in order.
            handlers = (
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            )
            for handler, iface, signal in handlers:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # One-shot callback: False stops gobject from calling it again.
            return False

        def success(self):
            return self.done

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            # Render the BSSID byte array as colon-separated hex octets.
            bssid = ':'.join(binascii.hexlify(octet)
                             for octet in group_props['BSSID'])

            pin = '12345670'
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            wps = dbus.Interface(group_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start({ 'Role': 'enrollee',
                        'Type': 'pin',
                        'Pin': pin })
            # Have the station find the GO and start WPS with the same PIN.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Kept for staAuthorized(), which disconnects the group.
            self.group_p2p = dbus.Interface(group_if_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3574
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    # Scenario: dev[1] runs an autonomous GO and dev[2] listens. dev[0]
    # discovers both over D-Bus, joins dev[1]'s group with a PIN, verifies
    # the client-side group properties, invites dev[2], and finally the GO
    # removes the group. The test passes when GroupFinished has been seen.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    # Remembered so that the fresh WpaSupplicant('wlan1') wrappers created in
    # the signal handlers can address the GO's group interface.
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths of the peer to invite (dev[2]) and of the
            # GO to join (dev[1]); filled in by deviceFound().
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Peers are recognized by their address (without colons)
            # appearing in the object path.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'frequency': 2412 }
                pin = p2p.Connect(args)

                # Hand the PIN returned by Connect() to the GO.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                # NOTE(review): the value passed is the GetAll() result
                # rather than a vendor extension byte array (an unused
                # dbus.ByteArray("\x11\x22\x33\x44") local used to sit next
                # to this call); left unchanged because the expected error
                # may depend on it - confirm intent.
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            args = { 'duration1': 30000, 'interval1': 102400,
                     'duration2': 20000, 'interval2': 102400 }
            group_p2p.PresenceRequest(args)

            # Invite the third device; invitationResult() continues the test.
            args = { 'peer': self.peer }
            group_p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Removing the group on the GO ends the session for this client,
            # which is expected to produce GroupFinished.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot callback: False stops gobject from calling it again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
3699
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    # Scenario: after form(dev[0], dev[1]), dev[0] listens with
    # persistent_reconnect disabled and wlan1 sends a persistent-group
    # invitation to it. The test passes when dev[0] reports
    # InvitationReceived over D-Bus.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    found = dev[1].discover_peer(addr0, social=True)
    if not found:
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def run_test(self, *args):
            logger.debug("run_test")
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Invite dev[0] using the persistent group id that the peer
            # entry reports for it.
            invite_cmd = "".join(["P2P_INVITE persistent=",
                                  peer['persistent'],
                                  " peer=",
                                  addr0])
            inviter.global_request(invite_cmd)
            # One-shot callback: False stops gobject from calling it again.
            return False

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].p2p_stop_find()
    dev[1].p2p_stop_find()
3748
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    sta = dev[0]
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        # Always clear the SSID postfix, even when the test body fails, so
        # that the setting cannot leak into later tests.
        sta.request("P2P_SET ssid_postfix ")
3755
def _test_dbus_p2p_config(dev, apdev):
    """Exercise Get/Set of the P2PDeviceConfig D-Bus property on dev[0].

    Covers a read/write-back round trip, setting and clearing of
    SsidPostfix/VendorExtension/SecondaryDeviceTypes, rejection of Get and
    Set while P2P is disabled, and rejection of invalid values. Raises
    Exception on any unexpected result.
    """
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])

    def get_config():
        # Read the current P2PDeviceConfig property.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(changes):
        # Write P2PDeviceConfig from a plain dict of changes.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                   dbus.Dictionary(changes, signature='sv'),
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Writing back exactly what was read must not change anything. The value
    # is passed through as returned by Get(), without re-wrapping.
    res = get_config()
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
               dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Add one vendor extension and one secondary device type.
    set_config({ 'SsidPostfix': 'foo',
                 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
                 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]})

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Empty arrays are expected to remove the entries again.
    set_config({ 'SsidPostfix': '',
                 'VendorExtension': dbus.Array([], signature="ay"),
                 'SecondaryDeviceTypes': dbus.Array([], signature="ay") })

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Get must fail while P2P is disabled; P2P is always re-enabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Set must fail while P2P is disabled as well.
    try:
        dev[0].request("P2P_SET disabled 1")
        set_config({ 'SsidPostfix': 'foo' })
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Wrong value types and an unknown key must be rejected as InvalidArgs.
    tests = [ { 'DeviceName': 123 },
              { 'SsidPostfix': 123 },
              { 'Foo': 'Bar' } ]
    for changes in tests:
        try:
            set_config(changes)
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
3844
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Scenario: dev[0] creates a persistent autonomous GO over D-Bus and
    # disconnects it again as soon as it has started. The persistent group
    # object reported through PersistentGroupAdded is then used to exercise
    # the PersistentGroup properties and the Add/Remove persistent group
    # methods.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path of the persistent group; stays None until
            # PersistentGroupAdded has been received.
            self.persistent = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Only the persistent group entry is needed, so stop the running
            # group right away.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot callback: False stops gobject from calling it again.
            return False

        def success(self):
            # Everything after the main loop needs the persistent group
            # path, so a missing PersistentGroupAdded signal is a failure.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Changing the SSID through Properties must leave all other parameters
    # untouched.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The SSID is reported back enclosed in double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # A second, manually added persistent group must show up in the list.
    args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
                             'psk': '1234567890' }, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The original group is gone now, so removing it again must fail.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3937
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Scenario, driven by D-Bus signals on dev[0]:
    #  1. dev[0] starts a persistent autonomous GO and wlan1 joins with a PIN.
    #  2. When the station is authorized, wlan1 leaves and the GO
    #     disconnects the group (first GroupFinished).
    #  3. dev[0] re-invokes the persistent group with Invite(); wlan1 joins
    #     again and the group is torn down a second time.
    # The test passes when GroupFinished is seen after the invitation.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # True once the persistent group has been re-invoked.
            self.invited = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                # First round only: make wlan1 find the GO and join with a
                # PIN. In the second round the join is triggered by the
                # invitation instead.
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                # Second teardown: the re-invoked group has ended.
                self.done = True
                self.loop.quit()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # Negative case first: "path" is the interface object path
                # returned by prepare_dbus(), not a persistent group object,
                # so the invitation must be rejected.
                args = { 'persistent_group_object': dbus.ObjectPath(path),
                         'peer': self.peer_path }
                try:
                    p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                # Now re-invoke the real persistent group.
                args = { 'persistent_group_object': self.persistent,
                         'peer': self.peer_path }
                p2p.Invite(args)
                self.invited = True

                # Stored for staAuthorized(), which passes it to
                # group_form_result().
                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Cached for the provision discovery handler, which reads
            # DeviceAddress from these properties.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            # Remembered so that groupFinished() can invite the same peer.
            self.peer_path = peer_object
            # self.peer is only assigned in deviceFound(), so this handler
            # relies on DeviceFound having been processed first.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Let the client process its group start event and leave the
            # group before the GO side disconnects.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot callback: False stops gobject from calling it again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4076
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # Scenario: dev[0] listens, wlan1 discovers it and starts GO Negotiation
    # with a keypad PIN. dev[0] answers the GONegotiationRequest signal with
    # Connect() over D-Bus, and disconnects the resulting group as soon as it
    # has started. The test passes when GroupFinished has been seen.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # Negative case first: this frequency is expected to be rejected
            # with ConnectChannelUnsupported.
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False, 'frequency': 5175 }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            # Same request without a forced frequency must be accepted.
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False }
            p2p.Connect(args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Group formation worked; tear the group down to end the test.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            # One-shot callback: False stops gobject from calling it again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4158
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # dev[0] (D-Bus) authorizes GO Negotiation with the peer it discovers
    # ('authorize_only') and the station on wlan1 then initiates the
    # negotiation. Pass criteria: GroupFinished, PeerJoined and
    # PeerDisconnected have all been received.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Flags checked by success()
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra keyword arguments)
            receivers = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.staDeauthorized, WPAS_DBUS_IFACE,
                 "StaDeauthorized", {}),
                (self.peerJoined, WPAS_DBUS_GROUP, "PeerJoined", {}),
                (self.peerDisconnected, WPAS_DBUS_GROUP,
                 "PeerDisconnected", {}),
            )
            for handler, iface, signal, extra in receivers:
                self.add_signal(handler, iface, signal, **extra)
            # The main loop runs inside __enter__; the "with" body executes
            # only after it has returned.
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

            # The first request leaves out 'pin' and is expected to fail
            # with InvalidArgs.
            without_pin = { 'peer': path, 'wps_method': 'keypad',
                            'go_intent': 15, 'authorize_only': True }
            try:
                p2p.Connect(without_pin)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
            else:
                raise Exception("Invalid Connect accepted")

            with_pin = { 'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670',
                         'go_intent': 15, 'authorize_only': True }
            p2p.Connect(with_pin)
            p2p.Listen(10)

            # Let the other station find dev[0] and start the negotiation
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer.discover_peer(addr0):
                raise Exception("Peer not found")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which hands it to group_form_result()
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Remember the group interface object for staDeauthorized()
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer.group_form_result(self.sta_group_ev)
            peer.remove_group()

        def staDeauthorized(self, name):
            logger.debug("staDeuthorized: " + name)
            group_iface = dbus.Interface(self.g_if_obj,
                                         WPAS_DBUS_IFACE_P2PDEVICE)
            group_iface.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_params)
            # Returning False keeps this timeout callback from being
            # scheduled again.
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        passed = t.success()
        if not passed:
            raise Exception("Expected signals not seen")
4259
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # dev[0] (D-Bus) initiates GO Negotiation with go_intent 0 towards the
    # peer it discovers; the station on wlan1 answers with go_intent=15.
    # Pass criteria: GroupFinished has been received and the "Groups"
    # property of the Peer object was first reported non-empty and after
    # that empty.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Flags checked by success()
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Start the scenario from inside the main loop and arm the
            # timeout handler inherited from TestDbus.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            # The main loop runs inside __enter__; the "with" body executes
            # only after it has returned.
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)

            # Wait for the request to reach the peer before answering it
            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which hands it to group_form_result()
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Disconnect through the group's interface object first, then
            # clean up the group on the peer side.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is not stopped here; propertiesChanged() does that
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # Only an empty list that follows a non-empty one counts as
                # removal. An empty "Groups" update seen before the group
                # was added would otherwise stop the loop early and make
                # success() fail.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps this timeout callback from being
            # scheduled again.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4349
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # dev[0] (D-Bus) forms a group with go_intent 0 while the station on
    # wlan1 uses go_intent=15; the group is then removed from the wlan1 side
    # only. Pass criteria: GroupFinished has been received and the "Groups"
    # property of the Peer object went from non-empty to empty.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Flags checked by success()
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra keyword arguments)
            receivers = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal, extra in receivers:
                self.add_signal(handler, iface, signal, **extra)
            # The main loop runs inside __enter__; the "with" body executes
            # only after it has returned.
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_params = { 'peer': path, 'wps_method': 'keypad',
                               'pin': '12345670',
                               'go_intent': 0 }
            p2p.Connect(connect_params)

            # Wait for the request to reach the peer before answering it
            request = peer.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                             timeout=15)
            if request is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which hands it to group_form_result()
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy object is looked up but not used any further here;
            # the group is removed from the wlan1 side only.
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer.group_form_result(self.sta_group_ev)
            peer.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is not stopped here; propertiesChanged() does that
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only "Groups" updates on Peer objects are of interest
            if (interface_name != WPAS_DBUS_P2P_PEER or
                "Groups" not in changed_properties):
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # An empty list counts as removal only after a non-empty one
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_params)
            # Returning False keeps this timeout callback from being
            # scheduled again.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        passed = t.success()
        if not passed:
            raise Exception("Expected signals not seen")
4437
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    # p2p_group_idle is set to 1 only around the actual test body and is put
    # back to 0 afterwards, no matter how the body exits.
    wpas = dev[0]
    try:
        wpas.global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        wpas.global_request("SET p2p_group_idle 0")
4445
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Body of test_dbus_p2p_group_idle_timeout(). dev[0] (D-Bus) forms a
    # group with go_intent 0 while the station on wlan1 uses go_intent=15.
    # wlan1 then deauthenticates its first station and removes the group.
    # Pass criteria: GroupFinished has been received and, after GroupStarted,
    # the "Groups" property of the Peer object was reported both non-empty
    # and empty.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # Gate for propertiesChanged(); set by groupStarted()
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra keyword arguments)
            receivers = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal, extra in receivers:
                self.add_signal(handler, iface, signal, **extra)
            # The main loop runs inside __enter__; the "with" body executes
            # only after it has returned.
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_params = { 'peer': path, 'wps_method': 'keypad',
                               'pin': '12345670',
                               'go_intent': 0 }
            p2p.Connect(connect_params)

            # Wait for the request to reach the peer before answering it
            request = peer.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                             timeout=15)
            if request is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which hands it to group_form_result()
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            # The proxy object is looked up but not used any further here
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer.group_form_result(self.sta_group_ev)
            first_sta = peer.group_request("STA-FIRST")
            ifaddr = first_sta.splitlines()[0]
            # Deauthenticate with a non-default reason code so that the P2P
            # Client on the D-Bus side is not given the normal group
            # termination indication by the GO.
            peer.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
            peer.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is not stopped here; propertiesChanged() does that
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only "Groups" updates on Peer objects received after
            # GroupStarted are taken into account.
            if interface_name != WPAS_DBUS_P2P_PEER or not self.group_started:
                return
            if "Groups" not in changed_properties:
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_params)
            # Returning False keeps this timeout callback from being
            # scheduled again.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        passed = t.success()
        if not passed:
            raise Exception("Expected signals not seen")
4541
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # dev[0] (D-Bus) answers GO Negotiation with PIN 12345670 while the
    # station on wlan1 initiates with PIN 87654321. Pass criteria: both
    # WpsFailed and GroupFormationFailure have been received; GroupStarted
    # is treated as an error.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Flags checked by success(); the loop is stopped by whichever
            # of the two handlers runs last.
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal() keyword arguments)
            receivers = (
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.wpsFailed, "WpsFailed", {}),
                (self.groupFormationFailure, "GroupFormationFailure", {}),
            )
            for handler, signal, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                **extra)
            # The main loop runs inside __enter__; the "with" body executes
            # only after it has returned.
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            connect_params = { 'peer': path, 'wps_method': 'display',
                               'pin': '12345670',
                               'go_intent': 15 }
            p2p.Connect(connect_params)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            if not self.formation_failure:
                return
            self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            if not self.wps_failed:
                return
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            found = peer.discover_peer(addr0)
            if not found:
                raise Exception("Peer not found")
            connect_cmd = "P2P_CONNECT " + addr0 + " 87654321 enter"
            peer.global_request(connect_cmd)
            # Returning False keeps this timeout callback from being
            # scheduled again.
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        passed = t.success()
        if not passed:
            raise Exception("Expected signals not seen")
4617
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    # Scenario, as driven by the callbacks below:
    # 1. dev[1] starts a GO on 2412 MHz before the main loop is run.
    # 2. dev[0] (D-Bus) finds that GO and joins its group (group1).
    # 3. When group1 has started, dev[0] adds a group of its own on 2412 MHz
    #    (group2).
    # 4. When group2 has started, the station on wlan2 is authorized with a
    #    PIN and joins it.
    # 5. On PeerJoined the group properties are verified and both groups are
    #    disconnected; the loop is stopped once GroupFinished has been seen
    #    for both of them.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    # Saved so that deviceFound() can set it on its own WpaSupplicant
    # instance for wlan1
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Pass criterion; set once both groups have finished
            self.done = False
            # Object paths of the discovered peers (addr2 / addr1)
            self.peer = None
            self.go = None
            # Group object paths; cleared again in groupFinished()
            self.group1 = None
            self.group2 = None
            # Makes peerJoined() act only once
            self.groups_removed = False

        def __enter__(self):
            # Start the scenario from inside the main loop and arm the
            # timeout handler inherited from TestDbus.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            # The main loop runs inside __enter__; the "with" body executes
            # only after it has returned.
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            # Logged only; no test state depends on property changes here
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Peers are told apart by looking for their device address
            # (colons removed) in the object path.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            # Join the GO's group as long as group1 has not been started
            if self.go and not self.group1:
                logger.info("Join the group")
                p2p.StopFind()
                pin = '12345670'
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                # Enable the PIN on the GO side before requesting the join
                dev1.group_request("WPS_PIN any " + pin)
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'pin': pin,
                         'frequency': 2412 }
                p2p.Connect(args)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            # The first GroupStarted is recorded as group1 and triggers
            # adding a second group; the next one is recorded as group2.
            if not self.group1:
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                params = dbus.Dictionary({ 'frequency': 2412 })
                p2p.GroupAdd(params)
            elif not self.group2:
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = res['BSSID']

            if self.group1 and self.group2:
                logger.info("Authorize peer to join the group")
                # addr2 as raw bytes; used for both P2PDeviceAddress and
                # Bssid in the WPS Start parameters
                a2 = binascii.unhexlify(addr2.replace(':',''))
                params = { 'Role': 'enrollee',
                           'P2PDeviceAddress': dbus.ByteArray(a2),
                           'Bssid': dbus.ByteArray(a2),
                           'Type': 'pin',
                           'Pin': '12345670' }
                g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
                g_wps.Start(params)

                # Format the group2 BSSID bytes as colon-separated hex for
                # the control interface commands below
                bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
                dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
                dev2.scan_for_bss(bssid, freq=2412)
                dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
                ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
                if ev is None:
                    raise Exception("Group join timed out")
                # Kept for peerJoined(), which hands it to group_form_result()
                self.dev2_group_ev = ev

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            # Clear whichever group this signal belongs to
            if self.group1 == properties['group_object']:
                self.group1 = None
            elif self.group2 == properties['group_object']:
                self.group2 = None

            # Stop the loop once neither group is left
            if not self.group1 and not self.group2:
                self.done = True
                self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            # Ignore any PeerJoined received after teardown was started
            if self.groups_removed:
                return
            self.check_results()

            dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            dev2.group_form_result(self.dev2_group_ev)
            dev2.remove_group()

            logger.info("Disconnect group2")
            group_p2p = dbus.Interface(self.g2_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

            logger.info("Disconnect group1")
            group_p2p = dbus.Interface(self.g1_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            self.groups_removed = True

        def check_results(self):
            # Verify the Role values reported for both groups and for the
            # P2PDevice interface, and the number of Members in group2.
            # Raises Exception on the first mismatch.
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            # group1 was joined as a client, group2 was added locally
            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if prop['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])

            # Exactly one member is expected in group2 at this point
            if len(res2['Members']) != 1:
                   raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps this timeout callback from being
            # scheduled again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    # Tear down the GO that was started on dev[1] before the loop
    dev[1].remove_group()
4808
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Cancel() is expected to be rejected at this point since no Connect()
    # has been issued yet; the D-Bus error is the success case.
    try:
        p2p.Cancel()
    except dbus.exceptions.DBusException:
        pass
    else:
        raise Exception("Unexpected p2p.Cancel() success")

    # The returned address is not used; the call is kept so that the sequence
    # of operations performed on dev[0] stays unchanged.
    dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        """Find a peer, start Connect() towards it, and cancel immediately."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once Connect() followed by Cancel() has been issued
            self.done = False

        def __enter__(self):
            # run_test is scheduled after 1 ms and self.timeout (provided by
            # TestDbus) after 15 seconds before the main loop is entered.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Start group formation with the found peer and cancel it."""
            logger.debug("deviceFound: path=%s" % path)
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)
            # This Cancel() must succeed; a DBusException here propagates.
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            """Timeout callback: start a P2P find with DiscoveryType 'social'."""
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False from a GLib timeout callback removes the source
            return False

        def success(self):
            """Return True if Connect() and Cancel() were both issued."""
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4855
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    def introspect():
        # Single place for the Introspect() call used by every step below
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    full = introspect()
    logger.info("Initial Introspect: " + str(full))
    # Strings that have to be present in the unrestricted response
    required = ("Introspectable", "GroupStarted", "FastReauth", "PassiveScan")
    if full is None or any(item not in full for item in required):
        raise Exception("Unexpected initial Introspect response: " + str(full))

    # NOTE(review): alloc_fail() presumably injects an allocation failure at
    # the given call path; with this pattern no response at all is expected.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        reply = introspect()
        logger.info("Introspect: " + str(reply))
        if reply is not None:
            raise Exception("Unexpected Introspect response")

    # For these patterns a response is still expected, but it has to be
    # shorter than the unrestricted one.
    partial_cases = ((1, "=add_interface;wpa_dbus_introspect"),
                     (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
                     (2, "=add_interface;wpa_dbus_introspect"))
    for count, funcs in partial_cases:
        with alloc_fail(dev[0], count, funcs):
            reply = introspect()
            logger.info("Introspect: " + str(reply))
            if reply is None:
                raise Exception("No Introspect response")
            if len(reply) >= len(full):
                raise Exception("Unexpected Introspect response")
4901
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        """Add and select a mode=2 network; wait for State "completed"."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when the interface State property reaches "completed"
            self.started = False

        def __enter__(self):
            # run_connect is scheduled after 1 ms and self.timeout (provided
            # by TestDbus) after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.networkAdded, "NetworkAdded"),
                        (self.networkSelected, "NetworkSelected"),
                        (self.propertiesChanged, "PropertiesChanged"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            """Log the NetworkAdded signal."""
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            """Log the NetworkSelected signal and note that it was seen."""
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            """Stop the main loop once State changes to "completed"."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') == "completed":
                self.started = True
                self.loop.quit()

        def run_connect(self, *args):
            """Timeout callback: add the AP mode network and select it."""
            logger.debug("run_connect")
            net = { 'ssid': ssid,
                    'key_mgmt': 'WPA-PSK',
                    'psk': passphrase,
                    'mode': 2,
                    'frequency': 2412 }
            self.netw = iface.AddNetwork(dbus.Dictionary(net, signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            """Return True if State "completed" was reported."""
            return self.started

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        # Connect a second device to the network started on dev[0] while the
        # test object is still active.
        dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
4959
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    params = hostapd.wpa_eap_params(ssid=ssid)
    # Override the wpa and rsn_pairwise settings of the returned parameters
    params["wpa"] = "3"
    params["rsn_pairwise"] = "CCMP"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        """Add a PEAP/MSCHAPV2 network over D-Bus and wait for completion."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when the interface State property reaches "completed"
            self.done = False

        def __enter__(self):
            # run_connect is scheduled after 1 ms and self.timeout (provided
            # by TestDbus) after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal_name in ((self.propertiesChanged,
                                          "PropertiesChanged"),
                                         (self.eap, "EAP")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Stop the main loop once State changes to "completed"."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') == "completed":
                self.done = True
                self.loop.quit()

        def eap(self, status, parameter):
            """Log the EAP signal; it does not affect the test result."""
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            """Timeout callback: add the enterprise network and select it."""
            logger.debug("run_connect")
            net = { 'ssid': ssid,
                    'key_mgmt': 'WPA-EAP',
                    'eap': 'PEAP',
                    'identity': 'user',
                    'password': 'password',
                    'ca_cert': 'auth_serv/ca.pem',
                    'phase2': 'auth=MSCHAPV2',
                    'scan_freq': 2412 }
            self.netw = iface.AddNetwork(dbus.Dictionary(net, signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            """Return True if State "completed" was reported."""
            return self.done

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5015
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        # The helper switches dev[0] to AP_SCAN 2; set AP_SCAN 1 again
        # regardless of how the helper exits.
        restore_cmd = "AP_SCAN 1"
        dev[0].request(restore_cmd)
5022
def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """Run a failing D-Bus Scan() while dev[0] operates an open AP mode network.

    Raises Exception if AP_SCAN 2 cannot be set, if the AP mode network does
    not start, or if the scan failure is not reported as expected.
    """
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    netid = dev[0].add_network()
    dev[0].set_network(netid, "mode", "2")
    dev[0].set_network_quoted(netid, "ssid", "wpas-ap-open")
    for field, value in (("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412"),
                         ("disabled", "0")):
        dev[0].set_network(netid, field, value)
    dev[0].select_network(netid)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    def wait_scan_failed(suffix):
        # Wait for the scan failure report; AP-DISABLED must not show up.
        # suffix is appended to the error messages to tell attempts apart.
        event = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
                                   "AP-DISABLED"], timeout=5)
        if event is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen" + suffix)
        if "AP-DISABLED" in event:
            raise Exception("Unexpected AP-DISABLED event" + suffix)
        return event

    # NOTE(review): fail_test() presumably forces wpa_driver_nl80211_scan to
    # fail once - confirm against utils.fail_test.
    with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
        iface.Scan({'Type': 'active',
                    'AllowRoam': True,
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
        first = wait_scan_failed("")
        if "retry=1" in first:
            # Wait for the retry to scan happen
            wait_scan_failed(" - retry")

    # Connect a second device to the AP mode network after the failed scan
    dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    for sta in (dev[1], dev[0]):
        sta.request("DISCONNECT")
        sta.wait_disconnected()
5066
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
    dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")

    # Coverage only: the call is made to exercise the ExpectDisconnect code
    # path; nothing about its effect is verified afterwards.
    wpas.ExpectDisconnect()
    sta = dev[0]
    sta.request("DISCONNECT")
    sta.wait_disconnected()
5081
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    # SaveConfig() is expected to be rejected with a specific error message;
    # success or any other D-Bus error fails the test.
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
5092
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        # Remove whatever the helper left configured for vendor element ID 1,
        # regardless of how the helper exits.
        cleanup_cmd = "VENDOR_ELEM_REMOVE 1 *"
        dev[0].request(cleanup_cmd)
5099
def _test_dbus_vendor_elem(dev, apdev):
    """Exercise VendorElemAdd/VendorElemGet/VendorElemRem over D-Bus.

    Invalid arguments are tried first and must be rejected with an
    InvalidArgs error carrying the expected reason text; after that elements
    are added, read back and removed. Raises Exception on the first
    unexpected result.
    """
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    def expect_invalid(method, args, reason, accepted_msg, tag):
        # Call method(*args) and require a DBusException whose text contains
        # both "InvalidArgs" and reason. accepted_msg is raised if the call
        # succeeds; tag identifies the case when the error text is wrong.
        try:
            method(*args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e) or reason not in str(e):
                raise Exception("Unexpected error message for invalid " +
                                tag + ": " + str(e))
        else:
            raise Exception(accepted_msg)

    def check_elem_data(expected, tag):
        # Read back ID 1 and compare it byte by byte with expected; tag is
        # appended to the error messages to tell the checks apart.
        val = iface.VendorElemGet(1)
        if len(val) != len(expected):
            raise Exception("Unexpected VendorElemGet length" + tag)
        for got, want in zip(val, expected):
            if got != dbus.Byte(want):
                raise Exception("Unexpected VendorElemGet data" + tag)

    add_accepted = "Invalid VendorElemAdd() accepted"
    get_accepted = "Invalid VendorElemGet() accepted"
    rem_accepted = "Invalid VendorElemRemove() accepted"

    expect_invalid(iface.VendorElemAdd, (-1, dbus.ByteArray("\x00\x00")),
                   "Invalid ID", add_accepted, "VendorElemAdd[1]")
    expect_invalid(iface.VendorElemAdd, (1, dbus.ByteArray("")),
                   "Invalid value", add_accepted, "VendorElemAdd[2]")
    expect_invalid(iface.VendorElemAdd, (1, dbus.ByteArray("\x00\x01")),
                   "Parse error", add_accepted, "VendorElemAdd[3]")

    expect_invalid(iface.VendorElemGet, (-1,),
                   "Invalid ID", get_accepted, "VendorElemGet[1]")
    expect_invalid(iface.VendorElemGet, (1,),
                   "ID value does not exist", get_accepted, "VendorElemGet[2]")

    # Each VendorElemRemove case has its own tag so that a failure can be
    # attributed to the exact call.
    expect_invalid(iface.VendorElemRem, (-1, dbus.ByteArray("\x00\x00")),
                   "Invalid ID", rem_accepted, "VendorElemRemove[1]")
    expect_invalid(iface.VendorElemRem, (1, dbus.ByteArray("")),
                   "Invalid value", rem_accepted, "VendorElemRemove[2]")

    iface.VendorElemRem(1, "*")

    ie = dbus.ByteArray("\x00\x01\x00")
    iface.VendorElemAdd(1, ie)
    check_elem_data(ie, "")

    # A second add to the same ID is expected to append to the first element
    ie2 = dbus.ByteArray("\xe0\x00")
    iface.VendorElemAdd(1, ie2)
    check_elem_data(ie + ie2, "[2]")

    expect_invalid(iface.VendorElemRem, (1, dbus.ByteArray("\x01\x01")),
                   "Parse error", rem_accepted, "VendorElemRemove[3]")

    # After removing the first element only the length of the remaining data
    # is checked.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    iface.VendorElemRem(1, "*")
    expect_invalid(iface.VendorElemGet, (1,),
                   "ID value does not exist",
                   "Invalid VendorElemGet() accepted after removal",
                   "VendorElemGet after removal")
5203
def test_dbus_assoc_reject(dev, apdev):
    """D-Bus AssocStatusCode"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    # NOTE(review): max_listen_interval=1 presumably makes the AP reject the
    # association so that AssocStatusCode gets reported - confirm.
    hapd = hostapd.add_ap(apdev[0], { "ssid": ssid,
                                      "max_listen_interval": "1" })

    class TestDbusConnect(TestDbus):
        """Try to connect and wait for the AssocStatusCode property change."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when AssocStatusCode 51 has been reported
            self.assoc_status_seen = False
            self.state = 0

        def __enter__(self):
            # run_connect is scheduled after 1 ms and self.timeout (provided
            # by TestDbus) after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """End the test on the first AssocStatusCode report."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'AssocStatusCode' not in properties:
                return
            status = properties['AssocStatusCode']
            # Only status code 51 counts as the expected rejection
            if status == 51:
                self.assoc_status_seen = True
            else:
                logger.info("Unexpected status code: " + str(status))
            iface.Disconnect()
            self.loop.quit()

        def run_connect(self, *args):
            """Timeout callback: add the open network and select it."""
            net = { 'ssid': ssid,
                    'key_mgmt': 'NONE',
                    'scan_freq': 2412 }
            self.netw = iface.AddNetwork(dbus.Dictionary(net, signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            """Return True if AssocStatusCode 51 was reported."""
            return self.assoc_status_seen

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")