Updated through tag hostap_2_5 from git://w1.fi/hostap.git
[mech_eap.git] / libeap / tests / hwsim / test_dbus_old.py
1 # wpa_supplicant D-Bus old interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import logging
8 logger = logging.getLogger()
9
10 try:
11     import gobject
12     import dbus
13     dbus_imported = True
14 except ImportError:
15     dbus_imported = False
16
17 import hostapd
18 from utils import HwsimSkip
19 from test_dbus import TestDbus, alloc_fail_dbus, start_ap
20
21 WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
22 WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
23 WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
24 WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
25 WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
26
27 def prepare_dbus(dev):
28     if not dbus_imported:
29         raise HwsimSkip("No dbus module available")
30     try:
31         from dbus.mainloop.glib import DBusGMainLoop
32         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
33         bus = dbus.SystemBus()
34         wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
35         wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
36         path = wpas.getInterface(dev.ifname)
37         if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
38         return (bus,wpas_obj,path,if_obj)
39     except Exception, e:
40         raise HwsimSkip("Could not connect to D-Bus: %s" % e)
41
42 class TestDbusOldWps(TestDbus):
43     def __init__(self, bus):
44         TestDbus.__init__(self, bus)
45         self.event_ok = False
46
47     def __enter__(self):
48         gobject.timeout_add(1, self.run_wps)
49         gobject.timeout_add(15000, self.timeout)
50         self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
51         self.loop.run()
52         return self
53
54     def wpsCred(self, cred):
55         logger.debug("wpsCred: " + str(cred))
56         self.event_ok = True
57         self.loop.quit()
58
59     def success(self):
60         return self.event_ok
61
62 def test_dbus_old(dev, apdev):
63     """The old D-Bus interface"""
64     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
65
66     res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
67     logger.debug("capabilities(): " + str(res))
68     if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
69         raise Exception("Unexpected capabilities")
70     res2 = if_obj.capabilities(dbus.Boolean(True),
71                                dbus_interface=WPAS_DBUS_OLD_IFACE)
72     logger.debug("capabilities(strict): " + str(res2))
73     res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
74     logger.debug("State: " + res)
75
76     res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
77     if res != 0:
78         raise Exception("Unexpected scanning: " + str(res))
79
80     if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
81
82     for t in [ dbus.UInt32(123), "foo" ]:
83         try:
84             if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
85             raise Exception("Invalid setAPScan() accepted")
86         except dbus.exceptions.DBusException, e:
87             if "InvalidOptions" not in str(e):
88                 raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
89
90     for p in [ path + "/Networks/12345",
91                path + "/Networks/foo" ]:
92         obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
93         try:
94             obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
95             raise Exception("Invalid disable() accepted")
96         except dbus.exceptions.DBusException, e:
97             if "InvalidNetwork" not in str(e):
98                 raise Exception("Unexpected error message for invalid disable: " + str(e))
99
100     for p in [ path + "/BSSIDs/foo",
101                path + "/BSSIDs/001122334455"]:
102         obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
103         try:
104             obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
105             raise Exception("Invalid properties() accepted")
106         except dbus.exceptions.DBusException, e:
107             if "InvalidBSSID" not in str(e):
108                 raise Exception("Unexpected error message for invalid properties: " + str(e))
109
110 def test_dbus_old_scan(dev, apdev):
111     """The old D-Bus interface - scanning"""
112     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
113
114     hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
115
116     params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
117     params['wpa'] = '3'
118     hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
119
120     class TestDbusScan(TestDbus):
121         def __init__(self, bus):
122             TestDbus.__init__(self, bus)
123             self.scan_completed = False
124
125         def __enter__(self):
126             gobject.timeout_add(1, self.run_scan)
127             gobject.timeout_add(7000, self.timeout)
128             self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
129                             "ScanResultsAvailable")
130             self.loop.run()
131             return self
132
133         def scanDone(self):
134             logger.debug("scanDone")
135             self.scan_completed = True
136             self.loop.quit()
137
138         def run_scan(self, *args):
139             logger.debug("run_scan")
140             if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
141                 raise Exception("Failed to trigger scan")
142             return False
143
144         def success(self):
145             return self.scan_completed
146
147     with TestDbusScan(bus) as t:
148         if not t.success():
149             raise Exception("Expected signals not seen")
150
151     res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
152     if len(res) != 2:
153         raise Exception("Unexpected number of scan results: " + str(res))
154     for i in range(2):
155         logger.debug("Scan result BSS path: " + res[i])
156         bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
157         bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
158                                  byte_arrays=True)
159         logger.debug("BSS: " + str(bss))
160
161     obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
162     try:
163         bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
164         raise Exception("Unknown BSSID method accepted")
165     except Exception, e:
166         logger.debug("Unknown BSSID method exception: " + str(e))
167
168     if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
169         raise Exception("Failed to issue flush(0)")
170     res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
171     if len(res) != 0:
172         raise Exception("Unexpected BSS entry after flush")
173     if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
174         raise Exception("Failed to issue flush(1)")
175     try:
176         if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
177         raise Exception("Invalid flush arguments accepted")
178     except dbus.exceptions.DBusException, e:
179         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
180             raise Exception("Unexpected error message for invalid flush: " + str(e))
181     try:
182         bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
183                            byte_arrays=True)
184     except dbus.exceptions.DBusException, e:
185         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
186             raise Exception("Unexpected error message for invalid BSS: " + str(e))
187
188 def test_dbus_old_debug(dev, apdev):
189     """The old D-Bus interface - debug"""
190     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
191     wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
192
193     try:
194         wpas.setDebugParams(123)
195         raise Exception("Invalid setDebugParams accepted")
196     except dbus.exceptions.DBusException, e:
197         if "InvalidOptions" not in str(e):
198             raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
199
200     try:
201         wpas.setDebugParams(123, True, True)
202         raise Exception("Invalid setDebugParams accepted")
203     except dbus.exceptions.DBusException, e:
204         if "InvalidOptions" not in str(e):
205             raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
206
207     wpas.setDebugParams(1, True, True)
208     dev[0].request("LOG_LEVEL MSGDUMP")
209
210 def test_dbus_old_smartcard(dev, apdev):
211     """The old D-Bus interface - smartcard"""
212     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
213
214     params = dbus.Dictionary(signature='sv')
215     if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
216
217     params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
218                                'pkcs11_engine_path': "foobar2",
219                                'pkcs11_module_path': "foobar3",
220                                'foo': 'bar' },
221                              signature='sv')
222     params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
223                                 'foo': 'bar' },
224                               signature='sv')
225     params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
226                                 'foo2': 'bar' },
227                               signature='sv')
228     params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
229                                 'foo3': 'bar' },
230                               signature='sv')
231     tests = [ 1, params, params2, params3, params4 ]
232     for t in tests:
233         try:
234             if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
235             raise Exception("Invalid setSmartcardModules accepted: " + str(t))
236         except dbus.exceptions.DBusException, e:
237             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
238                 raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
239
240 def test_dbus_old_smartcard_oom(dev, apdev):
241     """The old D-Bus interface - smartcard (OOM)"""
242     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
243
244     for arg in [ 'opensc_engine_path', 'pkcs11_engine_path', 'pkcs11_module_path' ]:
245         with alloc_fail_dbus(dev[0], 1,
246                              "=wpas_dbus_iface_set_smartcard_modules",
247                              "setSmartcardModules",
248                              "InvalidOptions"):
249             params = dbus.Dictionary({ arg : "foo", }, signature='sv')
250             if_obj.setSmartcardModules(params,
251                                        dbus_interface=WPAS_DBUS_OLD_IFACE)
252
253     with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_smartcard_modules",
254                          "setSmartcardModules", "InvalidOptions"):
255         params = dbus.Dictionary({ arg : "foo", }, signature='sv')
256         if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
257
258 def test_dbus_old_interface(dev, apdev):
259     """The old D-Bus interface - interface get/add/remove"""
260     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
261     wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
262
263     tests = [ (123, "InvalidOptions"),
264               ("foo", "InvalidInterface") ]
265     for (ifname,err) in tests:
266         try:
267             wpas.getInterface(ifname)
268             raise Exception("Invalid getInterface accepted")
269         except dbus.exceptions.DBusException, e:
270             if err not in str(e):
271                 raise Exception("Unexpected error message for invalid getInterface: " + str(e))
272
273     params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
274     wpas.addInterface("lo", params)
275     path = wpas.getInterface("lo")
276     logger.debug("New interface path: " + str(path))
277     wpas.removeInterface(path)
278     try:
279         wpas.removeInterface(path)
280         raise Exception("Invalid removeInterface() accepted")
281     except dbus.exceptions.DBusException, e:
282         if "InvalidInterface" not in str(e):
283             raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
284
285     params1 = dbus.Dictionary({ 'driver': 'foo',
286                                 'driver-params': 'foo',
287                                 'config-file': 'foo',
288                                 'bridge-ifname': 'foo' },
289                               signature='sv')
290     params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
291     tests = [ (123, None, "InvalidOptions"),
292               ("", None, "InvalidOptions"),
293               ("foo", None, "AddError"),
294               ("foo", params1, "AddError"),
295               ("foo", params2, "InvalidOptions"),
296               ("foo", 1234, "InvalidOptions"),
297               (dev[0].ifname, None, "ExistsError" ) ]
298     for (ifname,params,err) in tests:
299         try:
300             if params is None:
301                 wpas.addInterface(ifname)
302             else:
303                 wpas.addInterface(ifname, params)
304             raise Exception("Invalid addInterface accepted: " + str(params))
305         except dbus.exceptions.DBusException, e:
306             if err not in str(e):
307                 raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
308
309     try:
310         wpas.removeInterface(123)
311         raise Exception("Invalid removeInterface accepted")
312     except dbus.exceptions.DBusException, e:
313         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
314             raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
315
316 def test_dbus_old_interface_oom(dev, apdev):
317     """The old D-Bus interface - interface get/add/remove (OOM)"""
318     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
319     wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
320
321     with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_global_add_interface",
322                          "addInterface", "InvalidOptions"):
323         params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
324         wpas.addInterface("lo", params)
325
326     for arg in [ "driver", "driver-params", "config-file", "bridge-ifname" ]:
327         with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_global_add_interface",
328                              "addInterface", "InvalidOptions"):
329             params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
330             wpas.addInterface("lo", params)
331
332 def test_dbus_old_blob(dev, apdev):
333     """The old D-Bus interface - blob operations"""
334     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
335
336     param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
337     param2 = dbus.Dictionary({ 'blob3': "foo" })
338     param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
339                              signature='sv')
340     tests = [ (1, "InvalidOptions"),
341               (param1, "InvalidOptions"),
342               (param2, "InvalidOptions"),
343               (param3, "InvalidOptions") ]
344     for (arg,err) in tests:
345         try:
346             if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
347             raise Exception("Invalid setBlobs() accepted: " + str(arg))
348         except dbus.exceptions.DBusException, e:
349             logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
350             if err not in str(e):
351                 raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
352
353     tests = [ (["foo"], "RemoveError: Error removing blob"),
354               ([""], "RemoveError: Invalid blob name"),
355               ([1], "InvalidOptions"),
356               ("foo", "InvalidOptions") ]
357     for (arg,err) in tests:
358         try:
359             if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
360             raise Exception("Invalid removeBlobs() accepted: " + str(arg))
361         except dbus.exceptions.DBusException, e:
362             logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
363             if err not in str(e):
364                 raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
365
366     blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
367                               'blob2': dbus.ByteArray([ 1, 2 ]) },
368                             signature='sv')
369     if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
370     if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
371
372 def test_dbus_old_blob_oom(dev, apdev):
373     """The old D-Bus interface - blob operations (OOM)"""
374     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
375
376     blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
377                               'blob2': dbus.ByteArray([ 1, 2 ]) },
378                             signature='sv')
379
380     with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_blobs", "setBlobs",
381                          "AddError: Not enough memory to add blob"):
382         if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
383
384     with alloc_fail_dbus(dev[0], 2, "=wpas_dbus_iface_set_blobs", "setBlobs",
385                          "AddError: Not enough memory to add blob data"):
386         if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
387
388     with alloc_fail_dbus(dev[0], 3, "=wpas_dbus_iface_set_blobs", "setBlobs",
389                          "AddError: Error adding blob"):
390         if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
391
392     with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_decompose_object_path;wpas_iface_message_handler",
393                          "setBlobs",
394                          "InvalidInterface: wpa_supplicant knows nothing about this interface"):
395         if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
396
397 def test_dbus_old_connect(dev, apdev):
398     """The old D-Bus interface - add a network and connect"""
399     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
400
401     ssid = "test-wpa2-psk"
402     passphrase = 'qwertyuiop'
403     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
404     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
405
406     for p in [ "/no/where/to/be/found",
407                path + "/Networks/12345",
408                path + "/Networks/foo",
409                "/fi/epitest/hostap/WPASupplicant/Interfaces",
410                "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
411         obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
412         try:
413             if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
414             raise Exception("Invalid removeNetwork accepted: " + p)
415         except dbus.exceptions.DBusException, e:
416             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
417                 raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
418
419     try:
420         if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
421         raise Exception("Invalid removeNetwork accepted")
422     except dbus.exceptions.DBusException, e:
423         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
424             raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
425
426     try:
427         if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
428         raise Exception("Invalid removeNetwork accepted")
429     except dbus.exceptions.DBusException, e:
430         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
431             raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
432
433     tests = [ (path, "InvalidNetwork"),
434               (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
435                "InvalidInterface"),
436               (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
437                "InvalidNetwork"),
438               (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
439                "InvalidNetwork"),
440               (1, "InvalidOptions") ]
441     for t,err in tests:
442         try:
443             if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
444             raise Exception("Invalid selectNetwork accepted: " + str(t))
445         except dbus.exceptions.DBusException, e:
446             if err not in str(e):
447                 raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
448
449     npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
450     if not npath.startswith(WPAS_DBUS_OLD_PATH):
451         raise Exception("Unexpected addNetwork result: " + path)
452     netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
453     tests = [ 123,
454               dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
455     for t in tests:
456         try:
457             netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
458             raise Exception("Invalid set() accepted: " + str(t))
459         except dbus.exceptions.DBusException, e:
460             if "InvalidOptions" not in str(e):
461                 raise Exception("Unexpected error message for invalid set: " + str(e))
462     params = dbus.Dictionary({ 'ssid': ssid,
463                                'key_mgmt': 'WPA-PSK',
464                                'psk': passphrase,
465                                'identity': dbus.ByteArray([ 1, 2 ]),
466                                'priority': dbus.Int32(0),
467                                'scan_freq': dbus.UInt32(2412) },
468                              signature='sv')
469     netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
470     id = int(dev[0].list_networks()[0]['id'])
471     val = dev[0].get_network(id, "scan_freq")
472     if val != "2412":
473         raise Exception("Invalid scan_freq value: " + str(val))
474     params = dbus.Dictionary({ 'scan_freq': "2412 2432",
475                                'freq_list': "2412 2417 2432" },
476                              signature='sv')
477     netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
478     val = dev[0].get_network(id, "scan_freq")
479     if val != "2412 2432":
480         raise Exception("Invalid scan_freq value (2): " + str(val))
481     val = dev[0].get_network(id, "freq_list")
482     if val != "2412 2417 2432":
483         raise Exception("Invalid freq_list value: " + str(val))
484     if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
485
486     class TestDbusConnect(TestDbus):
487         def __init__(self, bus):
488             TestDbus.__init__(self, bus)
489             self.state = 0
490
491         def __enter__(self):
492             gobject.timeout_add(1, self.run_connect)
493             gobject.timeout_add(15000, self.timeout)
494             self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
495                             "ScanResultsAvailable")
496             self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
497                             "StateChange")
498             self.loop.run()
499             return self
500
501         def scanDone(self):
502             logger.debug("scanDone")
503
504         def stateChange(self, new, old):
505             logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
506             if new == "COMPLETED":
507                 if self.state == 0:
508                     self.state = 1
509                     self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
510                 elif self.state == 2:
511                     self.state = 3
512                     if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
513                 elif self.state == 4:
514                     self.state = 5
515                     if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
516                 elif self.state == 6:
517                     self.state = 7
518                     if_obj.removeNetwork(self.path,
519                                          dbus_interface=WPAS_DBUS_OLD_IFACE)
520                     try:
521                         if_obj.removeNetwork(self.path,
522                                              dbus_interface=WPAS_DBUS_OLD_IFACE)
523                         raise Exception("Invalid removeNetwork accepted")
524                     except dbus.exceptions.DBusException, e:
525                         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
526                             raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
527
528                     self.loop.quit()
529             elif new == "DISCONNECTED":
530                 if self.state == 1:
531                     self.state = 2
532                     self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
533                 elif self.state == 3:
534                     self.state = 4
535                     if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
536                 elif self.state == 5:
537                     self.state = 6
538                     if_obj.selectNetwork(self.path,
539                                          dbus_interface=WPAS_DBUS_OLD_IFACE)
540
541         def run_connect(self, *args):
542             logger.debug("run_connect")
543             path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
544             netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
545             netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
546             params = dbus.Dictionary({ 'ssid': ssid,
547                                        'key_mgmt': 'WPA-PSK',
548                                        'psk': passphrase,
549                                        'scan_freq': 2412 },
550                                      signature='sv')
551             netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
552             netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
553             self.path = path
554             self.netw_obj = netw_obj
555             return False
556
557         def success(self):
558             return self.state == 7
559
560     with TestDbusConnect(bus) as t:
561         if not t.success():
562             raise Exception("Expected signals not seen")
563
564     if len(dev[0].list_networks()) != 0:
565         raise Exception("Unexpected network")
566
567 def test_dbus_old_connect_eap(dev, apdev):
568     """The old D-Bus interface - add an EAP network and connect"""
569     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
570
571     ssid = "test-wpa2-eap"
572     params = hostapd.wpa2_eap_params(ssid=ssid)
573     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
574
575     class TestDbusConnect(TestDbus):
576         def __init__(self, bus):
577             TestDbus.__init__(self, bus)
578             self.connected = False
579             self.certification_received = False
580
581         def __enter__(self):
582             gobject.timeout_add(1, self.run_connect)
583             gobject.timeout_add(15000, self.timeout)
584             self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
585                             "StateChange")
586             self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
587                             "Certification")
588             self.loop.run()
589             return self
590
591         def stateChange(self, new, old):
592             logger.debug("stateChange: %s --> %s" % (old, new))
593             if new == "COMPLETED":
594                 self.connected = True
595                 self.loop.quit()
596
597         def certification(self, depth, subject, hash, cert_hex):
598             logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
599             self.certification_received = True
600
601         def run_connect(self, *args):
602             logger.debug("run_connect")
603             path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
604             netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
605             params = dbus.Dictionary({ 'ssid': ssid,
606                                        'key_mgmt': 'WPA-EAP',
607                                        'eap': 'TTLS',
608                                        'anonymous_identity': 'ttls',
609                                        'identity': 'pap user',
610                                        'ca_cert': 'auth_serv/ca.pem',
611                                        'phase2': 'auth=PAP',
612                                        'password': 'password',
613                                        'scan_freq': 2412 },
614                                      signature='sv')
615             netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
616             netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
617             self.path = path
618             self.netw_obj = netw_obj
619             return False
620
621         def success(self):
622             return self.connected and self.certification_received
623
624     with TestDbusConnect(bus) as t:
625         if not t.success():
626             raise Exception("Expected signals not seen")
627
628 def test_dbus_old_network_set(dev, apdev):
629     """The old D-Bus interface and network set method"""
630     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
631
632     path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
633     netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
634     netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
635
636     params = dbus.Dictionary({ 'priority': dbus.UInt64(1) }, signature='sv')
637     try:
638         netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
639         raise Exception("set succeeded with unexpected type")
640     except dbus.exceptions.DBusException, e:
641         if "InvalidOptions" not in str(e):
642             raise Exception("Unexpected error message for unexpected type: " + str(e))
643
644 def test_dbus_old_wps_pbc(dev, apdev):
645     """The old D-Bus interface and WPS/PBC"""
646     try:
647         _test_dbus_old_wps_pbc(dev, apdev)
648     finally:
649         dev[0].request("SET wps_cred_processing 0")
650
651 def _test_dbus_old_wps_pbc(dev, apdev):
652     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
653
654     dev[0].flush_scan_cache()
655     hapd = start_ap(apdev[0])
656     hapd.request("WPS_PBC")
657     bssid = apdev[0]['bssid']
658     dev[0].scan_for_bss(bssid, freq="2412")
659     dev[0].request("SET wps_cred_processing 2")
660
661     for arg in [ 123, "123" ]:
662         try:
663             if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
664             raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
665         except dbus.exceptions.DBusException, e:
666             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
667                 raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
668
669     class TestDbusWps(TestDbusOldWps):
670         def __init__(self, bus, pbc_param):
671             TestDbusOldWps.__init__(self, bus)
672             self.pbc_param = pbc_param
673
674         def run_wps(self, *args):
675             logger.debug("run_wps: pbc_param=" + self.pbc_param)
676             if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
677             return False
678
679     with TestDbusWps(bus, "any") as t:
680         if not t.success():
681             raise Exception("Expected signals not seen")
682
683     res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
684     if len(res) != 1:
685         raise Exception("Unexpected number of scan results: " + str(res))
686     for i in range(1):
687         logger.debug("Scan result BSS path: " + res[i])
688         bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
689         bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
690                                  byte_arrays=True)
691         logger.debug("BSS: " + str(bss))
692
693     dev[0].wait_connected(timeout=10)
694     dev[0].request("DISCONNECT")
695     dev[0].wait_disconnected(timeout=10)
696     dev[0].request("FLUSH")
697
698     hapd.request("WPS_PBC")
699     dev[0].scan_for_bss(bssid, freq="2412")
700
701     with TestDbusWps(bus, bssid) as t:
702         if not t.success():
703             raise Exception("Expected signals not seen")
704
705     dev[0].wait_connected(timeout=10)
706     dev[0].request("DISCONNECT")
707     dev[0].wait_disconnected(timeout=10)
708
709     hapd.disable()
710     dev[0].flush_scan_cache()
711
712 def test_dbus_old_wps_pin(dev, apdev):
713     """The old D-Bus interface and WPS/PIN"""
714     try:
715         _test_dbus_old_wps_pin(dev, apdev)
716     finally:
717         dev[0].request("SET wps_cred_processing 0")
718
719 def _test_dbus_old_wps_pin(dev, apdev):
720     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
721
722     hapd = start_ap(apdev[0])
723     hapd.request("WPS_PIN any 12345670")
724     bssid = apdev[0]['bssid']
725     dev[0].scan_for_bss(bssid, freq="2412")
726     dev[0].request("SET wps_cred_processing 2")
727
728     for arg in [ (123, "12345670"),
729                  ("123", "12345670") ]:
730         try:
731             if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
732             raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
733         except dbus.exceptions.DBusException, e:
734             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
735                 raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
736
737     class TestDbusWps(TestDbusOldWps):
738         def __init__(self, bus, bssid, pin):
739             TestDbusOldWps.__init__(self, bus)
740             self.bssid = bssid
741             self.pin = pin
742
743         def run_wps(self, *args):
744             logger.debug("run_wps %s %s" % (self.bssid, self.pin))
745             pin = if_obj.wpsPin(self.bssid, self.pin,
746                                 dbus_interface=WPAS_DBUS_OLD_IFACE)
747             if len(self.pin) == 0:
748                 h = hostapd.Hostapd(apdev[0]['ifname'])
749                 h.request("WPS_PIN any " + pin)
750             return False
751
752     with TestDbusWps(bus, bssid, "12345670") as t:
753         if not t.success():
754             raise Exception("Expected signals not seen")
755
756     dev[0].wait_connected(timeout=10)
757     dev[0].request("DISCONNECT")
758     dev[0].wait_disconnected(timeout=10)
759     dev[0].request("FLUSH")
760
761     dev[0].scan_for_bss(bssid, freq="2412")
762
763     with TestDbusWps(bus, "any", "") as t:
764         if not t.success():
765             raise Exception("Expected signals not seen")
766
767 def test_dbus_old_wps_reg(dev, apdev):
768     """The old D-Bus interface and WPS/Registar"""
769     try:
770         _test_dbus_old_wps_reg(dev, apdev)
771     finally:
772         dev[0].request("SET wps_cred_processing 0")
773
774 def _test_dbus_old_wps_reg(dev, apdev):
775     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
776
777     hapd = start_ap(apdev[0])
778     bssid = apdev[0]['bssid']
779     dev[0].scan_for_bss(bssid, freq="2412")
780     dev[0].request("SET wps_cred_processing 2")
781
782     for arg in [ (123, "12345670"),
783                  ("123", "12345670") ]:
784         try:
785             if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
786             raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
787         except dbus.exceptions.DBusException, e:
788             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
789                 raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
790
791     class TestDbusWps(TestDbusOldWps):
792         def run_wps(self, *args):
793             logger.debug("run_wps")
794             if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
795             return False
796
797     with TestDbusWps(bus) as t:
798         if not t.success():
799             raise Exception("Expected signals not seen")
800
801     dev[0].wait_connected(timeout=10)
802
803 def test_dbus_old_wps_oom(dev, apdev):
804     """The old D-Bus interface and WPS (OOM)"""
805     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
806     bssid = apdev[0]['bssid']
807
808     with alloc_fail_dbus(dev[0], 1,
809                          "=wpa_config_add_network;wpas_dbus_iface_wps_pbc",
810                          "wpsPbc",
811                          "WpsPbcError: Could not start PBC negotiation"):
812         if_obj.wpsPbc("any", dbus_interface=WPAS_DBUS_OLD_IFACE)
813
814     with alloc_fail_dbus(dev[0], 1,
815                          "=wpa_config_add_network;wpas_dbus_iface_wps_pin",
816                          "wpsPin", "WpsPinError: Could not init PIN"):
817         if_obj.wpsPin("any", "", dbus_interface=WPAS_DBUS_OLD_IFACE)
818
819     with alloc_fail_dbus(dev[0], 1,
820                          "=wpa_config_add_network;wpas_dbus_iface_wps_reg",
821                          "wpsReg",
822                          "WpsRegError: Could not request credentials"):
823         if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
824
825 def test_dbus_old_network_set_oom(dev, apdev):
826     """The old D-Bus interface and network set method (OOM)"""
827     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
828
829     with alloc_fail_dbus(dev[0], 1,
830                          "=wpa_config_add_network;wpas_dbus_iface_add_network",
831                          "addNetwork",
832                          "AddNetworkError: wpa_supplicant could not add"):
833         if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
834
835     path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
836     netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
837     netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
838
839     with alloc_fail_dbus(dev[0], 1,
840                          "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_network",
841                          "set", "InvalidOptions"):
842         params = dbus.Dictionary({ 'ssid': "foo" }, signature='sv')
843         netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
844
845     tests = [ { 'identity': dbus.ByteArray([ 1, 2 ]) },
846               { 'scan_freq': dbus.UInt32(2412) },
847               { 'priority': dbus.Int32(0) },
848               { 'identity': "user" },
849               { 'eap': "TLS" }]
850     for arg in tests:
851         with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_network",
852                              "set", "InvalidOptions"):
853             params = dbus.Dictionary(arg, signature='sv')
854             netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)