7f29c5a07664c0068ab8ed13e211c04f25b9875a
[mech_eap.git] / tests / hwsim / test_dbus_old.py
1 # wpa_supplicant D-Bus old interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import gobject
8 import logging
9 logger = logging.getLogger()
10
11 try:
12     import dbus
13     dbus_imported = True
14 except ImportError:
15     dbus_imported = False
16
17 import hostapd
18 from test_dbus import TestDbus, start_ap
19
20 WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
21 WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
22 WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
23 WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
24 WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
25
26 def prepare_dbus(dev):
27     if not dbus_imported:
28         logger.info("No dbus module available")
29         raise Exception("hwsim-SKIP")
30     try:
31         from dbus.mainloop.glib import DBusGMainLoop
32         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
33         bus = dbus.SystemBus()
34         wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
35         wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
36         path = wpas.getInterface(dev.ifname)
37         if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
38         return (bus,wpas_obj,path,if_obj)
39     except Exception, e:
40         logger.info("No D-Bus support available: " + str(e))
41         raise Exception("hwsim-SKIP")
42
43 class TestDbusOldWps(TestDbus):
44     def __init__(self, bus):
45         TestDbus.__init__(self, bus)
46         self.event_ok = False
47
48     def __enter__(self):
49         gobject.timeout_add(1, self.run_wps)
50         gobject.timeout_add(15000, self.timeout)
51         self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
52         self.loop.run()
53         return self
54
55     def wpsCred(self, cred):
56         logger.debug("wpsCred: " + str(cred))
57         self.event_ok = True
58         self.loop.quit()
59
60     def success(self):
61         return self.event_ok
62
63 def test_dbus_old(dev, apdev):
64     """The old D-Bus interface"""
65     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
66
67     res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
68     logger.debug("capabilities(): " + str(res))
69     if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
70         raise Exception("Unexpected capabilities")
71     res2 = if_obj.capabilities(dbus.Boolean(True),
72                                dbus_interface=WPAS_DBUS_OLD_IFACE)
73     logger.debug("capabilities(strict): " + str(res2))
74     res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
75     logger.debug("State: " + res)
76
77     res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
78     if res != 0:
79         raise Exception("Unexpected scanning: " + str(res))
80
81     if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
82
83     for t in [ dbus.UInt32(123), "foo" ]:
84         try:
85             if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
86             raise Exception("Invalid setAPScan() accepted")
87         except dbus.exceptions.DBusException, e:
88             if "InvalidOptions" not in str(e):
89                 raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
90
91     for p in [ path + "/Networks/12345",
92                path + "/Networks/foo" ]:
93         obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
94         try:
95             obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
96             raise Exception("Invalid disable() accepted")
97         except dbus.exceptions.DBusException, e:
98             if "InvalidNetwork" not in str(e):
99                 raise Exception("Unexpected error message for invalid disable: " + str(e))
100
101     for p in [ path + "/BSSIDs/foo",
102                path + "/BSSIDs/001122334455"]:
103         obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
104         try:
105             obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
106             raise Exception("Invalid properties() accepted")
107         except dbus.exceptions.DBusException, e:
108             if "InvalidBSSID" not in str(e):
109                 raise Exception("Unexpected error message for invalid properties: " + str(e))
110
111 def test_dbus_old_scan(dev, apdev):
112     """The old D-Bus interface - scanning"""
113     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
114
115     hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
116
117     params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
118     params['wpa'] = '3'
119     hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
120
121     class TestDbusScan(TestDbus):
122         def __init__(self, bus):
123             TestDbus.__init__(self, bus)
124             self.scan_completed = False
125
126         def __enter__(self):
127             gobject.timeout_add(1, self.run_scan)
128             gobject.timeout_add(7000, self.timeout)
129             self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
130                             "ScanResultsAvailable")
131             self.loop.run()
132             return self
133
134         def scanDone(self):
135             logger.debug("scanDone")
136             self.scan_completed = True
137             self.loop.quit()
138
139         def run_scan(self, *args):
140             logger.debug("run_scan")
141             if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
142                 raise Exception("Failed to trigger scan")
143             return False
144
145         def success(self):
146             return self.scan_completed
147
148     with TestDbusScan(bus) as t:
149         if not t.success():
150             raise Exception("Expected signals not seen")
151
152     res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
153     if len(res) != 2:
154         raise Exception("Unexpected number of scan results: " + str(res))
155     for i in range(2):
156         logger.debug("Scan result BSS path: " + res[i])
157         bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
158         bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
159                                  byte_arrays=True)
160         logger.debug("BSS: " + str(bss))
161
162     obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
163     try:
164         bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
165         raise Exception("Unknown BSSID method accepted")
166     except Exception, e:
167         logger.debug("Unknown BSSID method exception: " + str(e))
168
169     if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
170         raise Exception("Failed to issue flush(0)")
171     res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
172     if len(res) != 0:
173         raise Exception("Unexpected BSS entry after flush")
174     if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
175         raise Exception("Failed to issue flush(1)")
176     try:
177         if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
178         raise Exception("Invalid flush arguments accepted")
179     except dbus.exceptions.DBusException, e:
180         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
181             raise Exception("Unexpected error message for invalid flush: " + str(e))
182     try:
183         bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
184                            byte_arrays=True)
185     except dbus.exceptions.DBusException, e:
186         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
187             raise Exception("Unexpected error message for invalid BSS: " + str(e))
188
189 def test_dbus_old_debug(dev, apdev):
190     """The old D-Bus interface - debug"""
191     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
192     wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
193
194     try:
195         wpas.setDebugParams(123)
196         raise Exception("Invalid setDebugParams accepted")
197     except dbus.exceptions.DBusException, e:
198         if "InvalidOptions" not in str(e):
199             raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
200
201     try:
202         wpas.setDebugParams(123, True, True)
203         raise Exception("Invalid setDebugParams accepted")
204     except dbus.exceptions.DBusException, e:
205         if "InvalidOptions" not in str(e):
206             raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
207
208     wpas.setDebugParams(1, True, True)
209     dev[0].request("LOG_LEVEL MSGDUMP")
210
211 def test_dbus_old_smartcard(dev, apdev):
212     """The old D-Bus interface - smartcard"""
213     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
214
215     params = dbus.Dictionary(signature='sv')
216     if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
217
218     params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
219                                'pkcs11_engine_path': "foobar2",
220                                'pkcs11_module_path': "foobar3",
221                                'foo': 'bar' },
222                              signature='sv')
223     params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
224                                 'foo': 'bar' },
225                               signature='sv')
226     params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
227                                 'foo2': 'bar' },
228                               signature='sv')
229     params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
230                                 'foo3': 'bar' },
231                               signature='sv')
232     tests = [ 1, params, params2, params3, params4 ]
233     for t in tests:
234         try:
235             if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
236             raise Exception("Invalid setSmartcardModules accepted: " + str(t))
237         except dbus.exceptions.DBusException, e:
238             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
239                 raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
240
241 def test_dbus_old_interface(dev, apdev):
242     """The old D-Bus interface - interface get/add/remove"""
243     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
244     wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
245
246     tests = [ (123, "InvalidOptions"),
247               ("foo", "InvalidInterface") ]
248     for (ifname,err) in tests:
249         try:
250             wpas.getInterface(ifname)
251             raise Exception("Invalid getInterface accepted")
252         except dbus.exceptions.DBusException, e:
253             if err not in str(e):
254                 raise Exception("Unexpected error message for invalid getInterface: " + str(e))
255
256     params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
257     wpas.addInterface("lo", params)
258     path = wpas.getInterface("lo")
259     logger.debug("New interface path: " + str(path))
260     wpas.removeInterface(path)
261     try:
262         wpas.removeInterface(path)
263         raise Exception("Invalid removeInterface() accepted")
264     except dbus.exceptions.DBusException, e:
265         if "InvalidInterface" not in str(e):
266             raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
267
268     params1 = dbus.Dictionary({ 'driver': 'foo',
269                                 'driver-params': 'foo',
270                                 'config-file': 'foo',
271                                 'bridge-ifname': 'foo' },
272                               signature='sv')
273     params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
274     tests = [ (123, None, "InvalidOptions"),
275               ("", None, "InvalidOptions"),
276               ("foo", None, "AddError"),
277               ("foo", params1, "AddError"),
278               ("foo", params2, "InvalidOptions"),
279               ("foo", 1234, "InvalidOptions"),
280               (dev[0].ifname, None, "ExistsError" ) ]
281     for (ifname,params,err) in tests:
282         try:
283             if params is None:
284                 wpas.addInterface(ifname)
285             else:
286                 wpas.addInterface(ifname, params)
287             raise Exception("Invalid addInterface accepted: " + str(params))
288         except dbus.exceptions.DBusException, e:
289             if err not in str(e):
290                 raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
291
292     try:
293         wpas.removeInterface(123)
294         raise Exception("Invalid removeInterface accepted")
295     except dbus.exceptions.DBusException, e:
296         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
297             raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
298
299 def test_dbus_old_blob(dev, apdev):
300     """The old D-Bus interface - blob operations"""
301     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
302
303     param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
304     param2 = dbus.Dictionary({ 'blob3': "foo" })
305     param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
306                              signature='sv')
307     tests = [ (1, "InvalidOptions"),
308               (param1, "InvalidOptions"),
309               (param2, "InvalidOptions"),
310               (param3, "InvalidOptions") ]
311     for (arg,err) in tests:
312         try:
313             if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
314             raise Exception("Invalid setBlobs() accepted: " + str(arg))
315         except dbus.exceptions.DBusException, e:
316             logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
317             if err not in str(e):
318                 raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
319
320     tests = [ (["foo"], "RemoveError: Error removing blob"),
321               ([""], "RemoveError: Invalid blob name"),
322               ([1], "InvalidOptions"),
323               ("foo", "InvalidOptions") ]
324     for (arg,err) in tests:
325         try:
326             if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
327             raise Exception("Invalid removeBlobs() accepted: " + str(arg))
328         except dbus.exceptions.DBusException, e:
329             logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
330             if err not in str(e):
331                 raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
332
333     blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
334                               'blob2': dbus.ByteArray([ 1, 2 ]) },
335                             signature='sv')
336     if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
337     if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
338
339 def test_dbus_old_connect(dev, apdev):
340     """The old D-Bus interface - add a network and connect"""
341     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
342
343     ssid = "test-wpa2-psk"
344     passphrase = 'qwertyuiop'
345     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
346     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
347
348     for p in [ "/no/where/to/be/found",
349                path + "/Networks/12345",
350                path + "/Networks/foo",
351                "/fi/epitest/hostap/WPASupplicant/Interfaces",
352                "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
353         obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
354         try:
355             if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
356             raise Exception("Invalid removeNetwork accepted: " + p)
357         except dbus.exceptions.DBusException, e:
358             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
359                 raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
360
361     try:
362         if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
363         raise Exception("Invalid removeNetwork accepted")
364     except dbus.exceptions.DBusException, e:
365         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
366             raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
367
368     try:
369         if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
370         raise Exception("Invalid removeNetwork accepted")
371     except dbus.exceptions.DBusException, e:
372         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
373             raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
374
375     tests = [ (path, "InvalidNetwork"),
376               (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
377                "InvalidInterface"),
378               (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
379                "InvalidNetwork"),
380               (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
381                "InvalidNetwork"),
382               (1, "InvalidOptions") ]
383     for t,err in tests:
384         try:
385             if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
386             raise Exception("Invalid selectNetwork accepted: " + str(t))
387         except dbus.exceptions.DBusException, e:
388             if err not in str(e):
389                 raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
390
391     npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
392     if not npath.startswith(WPAS_DBUS_OLD_PATH):
393         raise Exception("Unexpected addNetwork result: " + path)
394     netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
395     tests = [ 123,
396               dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
397     for t in tests:
398         try:
399             netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
400             raise Exception("Invalid set() accepted: " + str(t))
401         except dbus.exceptions.DBusException, e:
402             if "InvalidOptions" not in str(e):
403                 raise Exception("Unexpected error message for invalid set: " + str(e))
404     params = dbus.Dictionary({ 'ssid': ssid,
405                                'key_mgmt': 'WPA-PSK',
406                                'psk': passphrase,
407                                'identity': dbus.ByteArray([ 1, 2 ]),
408                                'priority': dbus.Int32(0),
409                                'scan_freq': dbus.UInt32(2412) },
410                              signature='sv')
411     netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
412     if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
413
414     class TestDbusConnect(TestDbus):
415         def __init__(self, bus):
416             TestDbus.__init__(self, bus)
417             self.state = 0
418
419         def __enter__(self):
420             gobject.timeout_add(1, self.run_connect)
421             gobject.timeout_add(15000, self.timeout)
422             self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
423                             "ScanResultsAvailable")
424             self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
425                             "StateChange")
426             self.loop.run()
427             return self
428
429         def scanDone(self):
430             logger.debug("scanDone")
431
432         def stateChange(self, new, old):
433             logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
434             if new == "COMPLETED":
435                 if self.state == 0:
436                     self.state = 1
437                     self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
438                 elif self.state == 2:
439                     self.state = 3
440                     if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
441                 elif self.state == 4:
442                     self.state = 5
443                     if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
444                 elif self.state == 6:
445                     self.state = 7
446                     if_obj.removeNetwork(self.path,
447                                          dbus_interface=WPAS_DBUS_OLD_IFACE)
448                     try:
449                         if_obj.removeNetwork(self.path,
450                                              dbus_interface=WPAS_DBUS_OLD_IFACE)
451                         raise Exception("Invalid removeNetwork accepted")
452                     except dbus.exceptions.DBusException, e:
453                         if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
454                             raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
455
456                     self.loop.quit()
457             elif new == "DISCONNECTED":
458                 if self.state == 1:
459                     self.state = 2
460                     self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
461                 elif self.state == 3:
462                     self.state = 4
463                     if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
464                 elif self.state == 5:
465                     self.state = 6
466                     if_obj.selectNetwork(self.path,
467                                          dbus_interface=WPAS_DBUS_OLD_IFACE)
468
469         def run_connect(self, *args):
470             logger.debug("run_connect")
471             path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
472             netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
473             netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
474             params = dbus.Dictionary({ 'ssid': ssid,
475                                        'key_mgmt': 'WPA-PSK',
476                                        'psk': passphrase,
477                                        'scan_freq': 2412 },
478                                      signature='sv')
479             netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
480             netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
481             self.path = path
482             self.netw_obj = netw_obj
483             return False
484
485         def success(self):
486             return self.state == 7
487
488     with TestDbusConnect(bus) as t:
489         if not t.success():
490             raise Exception("Expected signals not seen")
491
492     if len(dev[0].list_networks()) != 0:
493         raise Exception("Unexpected network")
494
495 def test_dbus_old_connect_eap(dev, apdev):
496     """The old D-Bus interface - add an EAP network and connect"""
497     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
498
499     ssid = "test-wpa2-eap"
500     params = hostapd.wpa2_eap_params(ssid=ssid)
501     hapd = hostapd.add_ap(apdev[0]['ifname'], params)
502
503     class TestDbusConnect(TestDbus):
504         def __init__(self, bus):
505             TestDbus.__init__(self, bus)
506             self.connected = False
507             self.certification_received = False
508
509         def __enter__(self):
510             gobject.timeout_add(1, self.run_connect)
511             gobject.timeout_add(15000, self.timeout)
512             self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
513                             "StateChange")
514             self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
515                             "Certification")
516             self.loop.run()
517             return self
518
519         def stateChange(self, new, old):
520             logger.debug("stateChange: %s --> %s" % (old, new))
521             if new == "COMPLETED":
522                 self.connected = True
523                 self.loop.quit()
524
525         def certification(self, depth, subject, hash, cert_hex):
526             logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
527             self.certification_received = True
528
529         def run_connect(self, *args):
530             logger.debug("run_connect")
531             path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
532             netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
533             params = dbus.Dictionary({ 'ssid': ssid,
534                                        'key_mgmt': 'WPA-EAP',
535                                        'eap': 'TTLS',
536                                        'anonymous_identity': 'ttls',
537                                        'identity': 'pap user',
538                                        'ca_cert': 'auth_serv/ca.pem',
539                                        'phase2': 'auth=PAP',
540                                        'password': 'password',
541                                        'scan_freq': 2412 },
542                                      signature='sv')
543             netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
544             netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
545             self.path = path
546             self.netw_obj = netw_obj
547             return False
548
549         def success(self):
550             return self.connected and self.certification_received
551
552     with TestDbusConnect(bus) as t:
553         if not t.success():
554             raise Exception("Expected signals not seen")
555
556 def test_dbus_old_wps_pbc(dev, apdev):
557     """The old D-Bus interface and WPS/PBC"""
558     try:
559         return _test_dbus_old_wps_pbc(dev, apdev)
560     finally:
561         dev[0].request("SET wps_cred_processing 0")
562
563 def _test_dbus_old_wps_pbc(dev, apdev):
564     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
565
566     hapd = start_ap(apdev[0])
567     hapd.request("WPS_PBC")
568     bssid = apdev[0]['bssid']
569     dev[0].scan_for_bss(bssid, freq="2412")
570     dev[0].request("SET wps_cred_processing 2")
571
572     for arg in [ 123, "123" ]:
573         try:
574             if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
575             raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
576         except dbus.exceptions.DBusException, e:
577             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
578                 raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
579
580     class TestDbusWps(TestDbusOldWps):
581         def __init__(self, bus, pbc_param):
582             TestDbusOldWps.__init__(self, bus)
583             self.pbc_param = pbc_param
584
585         def run_wps(self, *args):
586             logger.debug("run_wps: pbc_param=" + self.pbc_param)
587             if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
588             return False
589
590     with TestDbusWps(bus, "any") as t:
591         if not t.success():
592             raise Exception("Expected signals not seen")
593
594     res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
595     if len(res) != 1:
596         raise Exception("Unexpected number of scan results: " + str(res))
597     for i in range(1):
598         logger.debug("Scan result BSS path: " + res[i])
599         bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
600         bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
601                                  byte_arrays=True)
602         logger.debug("BSS: " + str(bss))
603
604     dev[0].wait_connected(timeout=10)
605     dev[0].request("DISCONNECT")
606     dev[0].wait_disconnected(timeout=10)
607     dev[0].request("FLUSH")
608
609     hapd.request("WPS_PBC")
610     dev[0].scan_for_bss(bssid, freq="2412")
611
612     with TestDbusWps(bus, bssid) as t:
613         if not t.success():
614             raise Exception("Expected signals not seen")
615
616     dev[0].wait_connected(timeout=10)
617     dev[0].request("DISCONNECT")
618     dev[0].wait_disconnected(timeout=10)
619
620     hapd.disable()
621     dev[0].flush_scan_cache()
622
623 def test_dbus_old_wps_pin(dev, apdev):
624     """The old D-Bus interface and WPS/PIN"""
625     try:
626         return _test_dbus_old_wps_pin(dev, apdev)
627     finally:
628         dev[0].request("SET wps_cred_processing 0")
629
630 def _test_dbus_old_wps_pin(dev, apdev):
631     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
632
633     hapd = start_ap(apdev[0])
634     hapd.request("WPS_PIN any 12345670")
635     bssid = apdev[0]['bssid']
636     dev[0].scan_for_bss(bssid, freq="2412")
637     dev[0].request("SET wps_cred_processing 2")
638
639     for arg in [ (123, "12345670"),
640                  ("123", "12345670") ]:
641         try:
642             if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
643             raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
644         except dbus.exceptions.DBusException, e:
645             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
646                 raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
647
648     class TestDbusWps(TestDbusOldWps):
649         def __init__(self, bus, bssid, pin):
650             TestDbusOldWps.__init__(self, bus)
651             self.bssid = bssid
652             self.pin = pin
653
654         def run_wps(self, *args):
655             logger.debug("run_wps %s %s" % (self.bssid, self.pin))
656             pin = if_obj.wpsPin(self.bssid, self.pin,
657                                 dbus_interface=WPAS_DBUS_OLD_IFACE)
658             if len(self.pin) == 0:
659                 h = hostapd.Hostapd(apdev[0]['ifname'])
660                 h.request("WPS_PIN any " + pin)
661             return False
662
663     with TestDbusWps(bus, bssid, "12345670") as t:
664         if not t.success():
665             raise Exception("Expected signals not seen")
666
667     dev[0].wait_connected(timeout=10)
668     dev[0].request("DISCONNECT")
669     dev[0].wait_disconnected(timeout=10)
670     dev[0].request("FLUSH")
671
672     dev[0].scan_for_bss(bssid, freq="2412")
673
674     with TestDbusWps(bus, "any", "") as t:
675         if not t.success():
676             raise Exception("Expected signals not seen")
677
678 def test_dbus_old_wps_reg(dev, apdev):
679     """The old D-Bus interface and WPS/Registar"""
680     try:
681         return _test_dbus_old_wps_reg(dev, apdev)
682     finally:
683         dev[0].request("SET wps_cred_processing 0")
684
685 def _test_dbus_old_wps_reg(dev, apdev):
686     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
687
688     hapd = start_ap(apdev[0])
689     bssid = apdev[0]['bssid']
690     dev[0].scan_for_bss(bssid, freq="2412")
691     dev[0].request("SET wps_cred_processing 2")
692
693     for arg in [ (123, "12345670"),
694                  ("123", "12345670") ]:
695         try:
696             if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
697             raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
698         except dbus.exceptions.DBusException, e:
699             if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
700                 raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
701
702     class TestDbusWps(TestDbusOldWps):
703         def run_wps(self, *args):
704             logger.debug("run_wps")
705             if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
706             return False
707
708     with TestDbusWps(bus) as t:
709         if not t.success():
710             raise Exception("Expected signals not seen")
711
712     dev[0].wait_connected(timeout=10)