Updated to hostap_2_6
[mech_eap.git] / libeap / tests / hwsim / test_dbus.py
index dba899e..451434d 100644 (file)
@@ -20,7 +20,9 @@ except ImportError:
 import hostapd
 from wpasupplicant import WpaSupplicant
 from utils import HwsimSkip, alloc_fail, fail_test
+from p2p_utils import *
 from test_ap_tdls import connect_2sta_open
+from test_ap_eap import check_altsubject_match_support
 
 WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
 WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
@@ -97,7 +99,7 @@ def start_ap(ap, ssid="test-wps",
                "wpa_passphrase": "12345678", "wpa": "2",
                "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                "ap_pin": "12345670", "uuid": ap_uuid}
-    return hostapd.add_ap(ap['ifname'], params)
+    return hostapd.add_ap(ap, params)
 
 def test_dbus_getall(dev, apdev):
     """D-Bus GetAll"""
@@ -125,7 +127,7 @@ def test_dbus_getall(dev, apdev):
     if len(res) != 0:
         raise Exception("Unexpected Networks entry: " + str(res))
 
-    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
+    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
     bssid = apdev[0]['bssid']
     dev[0].scan_for_bss(bssid, freq=2412)
     id = dev[0].add_network()
@@ -159,6 +161,27 @@ def test_dbus_getall(dev, apdev):
     if ssid != '"test"':
         raise Exception("Unexpected SSID in network entry")
 
+def test_dbus_getall_oom(dev, apdev):
+    """D-Bus GetAll wpa_config_get_all() OOM"""
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+
+    id = dev[0].add_network()
+    dev[0].set_network(id, "disabled", "0")
+    dev[0].set_network_quoted(id, "ssid", "test")
+
+    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
+                     dbus_interface=dbus.PROPERTIES_IFACE)
+    if len(res) != 1:
+        raise Exception("Missing Networks entry: " + str(res))
+    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
+    for i in range(1, 50):
+        with alloc_fail(dev[0], i, "wpa_config_get_all"):
+            try:
+                props = net_obj.GetAll(WPAS_DBUS_NETWORK,
+                                       dbus_interface=dbus.PROPERTIES_IFACE)
+            except dbus.exceptions.DBusException, e:
+                pass
+
 def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
     val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                        dbus_interface=dbus.PROPERTIES_IFACE,
@@ -295,6 +318,26 @@ def test_dbus_properties(dev, apdev):
         if "InvalidArgs: invalid message format" not in str(e):
             raise Exception("Unexpected error message: " + str(e))
 
+def test_dbus_set_global_properties(dev, apdev):
+    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+
+    props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
+
+    for p in props:
+        res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
+                         dbus_interface=dbus.PROPERTIES_IFACE)
+        if res != p[1]:
+            raise Exception("Unexpected " + p[0] + " value: " + str(res))
+
+        if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
+                   dbus_interface=dbus.PROPERTIES_IFACE)
+
+        res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
+                         dbus_interface=dbus.PROPERTIES_IFACE)
+        if res != p[2]:
+            raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
+
 def test_dbus_invalid_method(dev, apdev):
     """D-Bus invalid method"""
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
@@ -397,13 +440,13 @@ def _test_dbus_get_set_wps(dev, apdev):
                        dbus_interface=dbus.PROPERTIES_IFACE)
             if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                           dbus_interface=dbus.PROPERTIES_IFACE) != True:
-                raise Exception("Unexpected Get(ProcessCredentials) result after Set");
+                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
             if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                        dbus.Boolean(0),
                        dbus_interface=dbus.PROPERTIES_IFACE)
             if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                           dbus_interface=dbus.PROPERTIES_IFACE) != False:
-                raise Exception("Unexpected Get(ProcessCredentials) result after Set");
+                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
 
             self.dbus_sets_done = True
             return False
@@ -454,10 +497,11 @@ def test_dbus_wps_oom(dev, apdev):
         if_obj.Get(WPAS_DBUS_IFACE, "State",
                    dbus_interface=dbus.PROPERTIES_IFACE)
 
-    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
+    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
     bssid = apdev[0]['bssid']
     dev[0].scan_for_bss(bssid, freq=2412)
 
+    time.sleep(0.05)
     for i in range(1, 3):
         with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
             if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
@@ -469,6 +513,13 @@ def test_dbus_wps_oom(dev, apdev):
     with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
         bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                     dbus_interface=dbus.PROPERTIES_IFACE)
+    with alloc_fail(dev[0], 1,
+                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
+        try:
+            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
+                        dbus_interface=dbus.PROPERTIES_IFACE)
+        except dbus.exceptions.DBusException, e:
+            pass
 
     id = dev[0].add_network()
     dev[0].set_network(id, "disabled", "0")
@@ -968,7 +1019,7 @@ def test_dbus_scan(dev, apdev):
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
     iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
 
-    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
+    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
 
     class TestDbusScan(TestDbus):
         def __init__(self, bus):
@@ -1074,7 +1125,7 @@ def test_dbus_connect(dev, apdev):
     ssid = "test-wpa2-psk"
     passphrase = 'qwertyuiop'
     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
-    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    hapd = hostapd.add_ap(apdev[0], params)
 
     class TestDbusConnect(TestDbus):
         def __init__(self, bus):
@@ -1177,7 +1228,7 @@ def test_dbus_connect_psk_mem(dev, apdev):
     ssid = "test-wpa2-psk"
     passphrase = 'qwertyuiop'
     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
-    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    hapd = hostapd.add_ap(apdev[0], params)
 
     class TestDbusConnect(TestDbus):
         def __init__(self, bus):
@@ -1234,7 +1285,7 @@ def test_dbus_connect_oom(dev, apdev):
     ssid = "test-wpa2-psk"
     passphrase = 'qwertyuiop'
     params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
-    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    hapd = hostapd.add_ap(apdev[0], params)
 
     class TestDbusConnect(TestDbus):
         def __init__(self, bus):
@@ -1354,6 +1405,17 @@ def test_dbus_connect_oom(dev, apdev):
         except:
             pass
 
+    # Force regulatory update to re-fetch hw capabilities for the following
+    # test cases.
+    try:
+        dev[0].dump_monitor()
+        subprocess.call(['iw', 'reg', 'set', 'US'])
+        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+    finally:
+        dev[0].dump_monitor()
+        subprocess.call(['iw', 'reg', 'set', '00'])
+        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+
 def test_dbus_while_not_connected(dev, apdev):
     """D-Bus invalid operations while not connected"""
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
@@ -1375,6 +1437,7 @@ def test_dbus_while_not_connected(dev, apdev):
 
 def test_dbus_connect_eap(dev, apdev):
     """D-Bus AddNetwork and connect to EAP network"""
+    check_altsubject_match_support(dev[0])
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
     iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
 
@@ -1382,7 +1445,7 @@ def test_dbus_connect_eap(dev, apdev):
     params = hostapd.radius_params()
     params["ssid"] = ssid
     params["ieee8021x"] = "1"
-    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    hapd = hostapd.add_ap(apdev[0], params)
 
     class TestDbusConnect(TestDbus):
         def __init__(self, bus):
@@ -1682,6 +1745,24 @@ def test_dbus_network_oom(dev, apdev):
 
 def test_dbus_interface(dev, apdev):
     """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
+    try:
+        _test_dbus_interface(dev, apdev)
+    finally:
+        # Need to force P2P channel list update since the 'lo' interface
+        # with driver=none ends up configuring default dualband channels.
+        dev[0].request("SET country US")
+        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+        if ev is None:
+            ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
+                                          timeout=1)
+        dev[0].request("SET country 00")
+        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+        if ev is None:
+            ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
+                                          timeout=1)
+        subprocess.call(['iw', 'reg', 'set', '00'])
+
+def _test_dbus_interface(dev, apdev):
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
     wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
 
@@ -1882,7 +1963,7 @@ def test_dbus_tdls_invalid(dev, apdev):
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
     iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
 
-    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
+    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
     connect_2sta_open(dev, hapd)
     addr1 = dev[1].p2p_interface_addr()
 
@@ -1939,7 +2020,7 @@ def test_dbus_tdls(dev, apdev):
     (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
     iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
 
-    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
+    hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
     connect_2sta_open(dev, hapd)
 
     addr1 = dev[1].p2p_interface_addr()
@@ -2220,7 +2301,9 @@ def _test_dbus_country(dev, apdev):
         ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
         if ev is None:
             raise Exception("regdom change event not seen")
-    if "init=CORE type=WORLD" not in ev:
+    # init=CORE was previously used due to invalid db.txt data for 00. For
+    # now, allow both it and the new init=USER after fixed db.txt.
+    if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
         raise Exception("Unexpected event contents: " + ev)
 
 def test_dbus_scan_interval(dev, apdev):
@@ -2665,6 +2748,7 @@ def test_dbus_p2p_discovery(dev, apdev):
             TestDbus.__init__(self, bus)
             self.found = False
             self.found2 = False
+            self.found_prop = False
             self.lost = False
             self.find_stopped = False
 
@@ -2673,6 +2757,8 @@ def test_dbus_p2p_discovery(dev, apdev):
             gobject.timeout_add(15000, self.timeout)
             self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                             "DeviceFound")
+            self.add_signal(self.deviceFoundProperties,
+                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
             self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                             "DeviceLost")
             self.add_signal(self.provisionDiscoveryResponseEnterPin,
@@ -2740,6 +2826,12 @@ def test_dbus_p2p_discovery(dev, apdev):
                     raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
             self.loop.quit()
 
+        def deviceFoundProperties(self, path, properties):
+            logger.debug("deviceFoundProperties: path=%s" % path)
+            logger.debug("peer properties: " + str(properties))
+            if properties['DeviceAddress'] == a1:
+                self.found_prop = True
+
         def provisionDiscoveryResponseEnterPin(self, peer_object):
             logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
             p2p.Flush()
@@ -3605,6 +3697,55 @@ def test_dbus_p2p_join(dev, apdev):
 
     dev[2].p2p_stop_find()
 
+def test_dbus_p2p_invitation_received(dev, apdev):
+    """D-Bus P2P and InvitationReceived"""
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+
+    form(dev[0], dev[1])
+    addr0 = dev[0].p2p_dev_addr()
+    dev[0].p2p_listen()
+    dev[0].global_request("SET persistent_reconnect 0")
+
+    if not dev[1].discover_peer(addr0, social=True):
+        raise Exception("Peer " + addr0 + " not found")
+    peer = dev[1].get_peer(addr0)
+
+    class TestDbusP2p(TestDbus):
+        def __init__(self, bus):
+            TestDbus.__init__(self, bus)
+            self.done = False
+
+        def __enter__(self):
+            gobject.timeout_add(1, self.run_test)
+            gobject.timeout_add(15000, self.timeout)
+            self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
+                            "InvitationReceived")
+            self.loop.run()
+            return self
+
+        def invitationReceived(self, result):
+            logger.debug("invitationReceived: " + str(result))
+            self.done = True
+            self.loop.quit()
+
+        def run_test(self, *args):
+            logger.debug("run_test")
+            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
+            cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
+            dev1.global_request(cmd)
+            return False
+
+        def success(self):
+            return self.done
+
+    with TestDbusP2p(bus) as t:
+        if not t.success():
+            raise Exception("Expected signals not seen")
+
+    dev[0].p2p_stop_find()
+    dev[1].p2p_stop_find()
+
 def test_dbus_p2p_config(dev, apdev):
     """D-Bus Get/Set P2PDeviceConfig"""
     try:
@@ -4070,7 +4211,7 @@ def test_dbus_p2p_go_neg_auth(dev, apdev):
             if not dev1.discover_peer(addr0):
                 raise Exception("Peer not found")
             dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
-            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
             if ev is None:
                 raise Exception("Group formation timed out")
             self.sta_group_ev = ev
@@ -4159,7 +4300,7 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
             if ev is None:
                 raise Exception("Timeout while waiting for GO Neg Request")
             dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
-            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
             if ev is None:
                 raise Exception("Group formation timed out")
             self.sta_group_ev = ev
@@ -4249,7 +4390,7 @@ def test_dbus_p2p_group_termination_by_go(dev, apdev):
             if ev is None:
                 raise Exception("Timeout while waiting for GO Neg Request")
             dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
-            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
             if ev is None:
                 raise Exception("Group formation timed out")
             self.sta_group_ev = ev
@@ -4312,6 +4453,7 @@ def _test_dbus_p2p_group_idle_timeout(dev, apdev):
         def __init__(self, bus):
             TestDbus.__init__(self, bus)
             self.done = False
+            self.group_started = False
             self.peer_group_added = False
             self.peer_group_removed = False
 
@@ -4344,7 +4486,7 @@ def _test_dbus_p2p_group_idle_timeout(dev, apdev):
             if ev is None:
                 raise Exception("Timeout while waiting for GO Neg Request")
             dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
-            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
             if ev is None:
                 raise Exception("Group formation timed out")
             self.sta_group_ev = ev
@@ -4354,6 +4496,7 @@ def _test_dbus_p2p_group_idle_timeout(dev, apdev):
 
         def groupStarted(self, properties):
             logger.debug("groupStarted: " + str(properties))
+            self.group_started = True
             g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
             dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
@@ -4374,6 +4517,8 @@ def _test_dbus_p2p_group_idle_timeout(dev, apdev):
             logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
             if interface_name != WPAS_DBUS_P2P_PEER:
                 return
+            if not self.group_started:
+                return
             if "Groups" not in changed_properties:
                 return
             if len(changed_properties["Groups"]) > 0:
@@ -4576,7 +4721,7 @@ def test_dbus_p2p_two_groups(dev, apdev):
                 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
                 dev2.scan_for_bss(bssid, freq=2412)
                 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
-                ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+                ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
                 if ev is None:
                     raise Exception("Group join timed out")
                 self.dev2_group_ev = ev
@@ -4717,6 +4862,8 @@ def test_dbus_introspect(dev, apdev):
     logger.info("Initial Introspect: " + str(res))
     if res is None or "Introspectable" not in res or "GroupStarted" not in res:
         raise Exception("Unexpected initial Introspect response: " + str(res))
+    if "FastReauth" not in res or "PassiveScan" not in res:
+        raise Exception("Unexpected initial Introspect response: " + str(res))
 
     with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
         res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
@@ -4819,7 +4966,7 @@ def test_dbus_connect_wpa_eap(dev, apdev):
     params = hostapd.wpa_eap_params(ssid=ssid)
     params["wpa"] = "3"
     params["rsn_pairwise"] = "CCMP"
-    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+    hapd = hostapd.add_ap(apdev[0], params)
 
     class TestDbusConnect(TestDbus):
         def __init__(self, bus):
@@ -4916,3 +5063,191 @@ def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
     dev[1].wait_disconnected()
     dev[0].request("DISCONNECT")
     dev[0].wait_disconnected()
+
+def test_dbus_expectdisconnect(dev, apdev):
+    """D-Bus ExpectDisconnect"""
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
+
+    params = { "ssid": "test-open" }
+    hapd = hostapd.add_ap(apdev[0], params)
+    dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")
+
+    # This does not really verify the behavior other than by going through the
+    # code path for additional coverage.
+    wpas.ExpectDisconnect()
+    dev[0].request("DISCONNECT")
+    dev[0].wait_disconnected()
+
+def test_dbus_save_config(dev, apdev):
+    """D-Bus SaveConfig"""
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+    try:
+        iface.SaveConfig()
+        raise Exception("SaveConfig() accepted unexpectedly")
+    except dbus.exceptions.DBusException, e:
+        if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
+            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
+
+def test_dbus_vendor_elem(dev, apdev):
+    """D-Bus vendor element operations"""
+    try:
+        _test_dbus_vendor_elem(dev, apdev)
+    finally:
+        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
+
+def _test_dbus_vendor_elem(dev, apdev):
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+    dev[0].request("VENDOR_ELEM_REMOVE 1 *")
+
+    try:
+        ie = dbus.ByteArray("\x00\x00")
+        iface.VendorElemAdd(-1, ie)
+        raise Exception("Invalid VendorElemAdd() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
+
+    try:
+        ie = dbus.ByteArray("")
+        iface.VendorElemAdd(1, ie)
+        raise Exception("Invalid VendorElemAdd() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
+
+    try:
+        ie = dbus.ByteArray("\x00\x01")
+        iface.VendorElemAdd(1, ie)
+        raise Exception("Invalid VendorElemAdd() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))
+
+    try:
+        iface.VendorElemGet(-1)
+        raise Exception("Invalid VendorElemGet() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))
+
+    try:
+        iface.VendorElemGet(1)
+        raise Exception("Invalid VendorElemGet() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
+
+    try:
+        ie = dbus.ByteArray("\x00\x00")
+        iface.VendorElemRem(-1, ie)
+        raise Exception("Invalid VendorElemRemove() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
+
+    try:
+        ie = dbus.ByteArray("")
+        iface.VendorElemRem(1, ie)
+        raise Exception("Invalid VendorElemRemove() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
+
+    iface.VendorElemRem(1, "*")
+
+    ie = dbus.ByteArray("\x00\x01\x00")
+    iface.VendorElemAdd(1, ie)
+
+    val = iface.VendorElemGet(1)
+    if len(val) != len(ie):
+        raise Exception("Unexpected VendorElemGet length")
+    for i in range(len(val)):
+        if val[i] != dbus.Byte(ie[i]):
+            raise Exception("Unexpected VendorElemGet data")
+
+    ie2 = dbus.ByteArray("\xe0\x00")
+    iface.VendorElemAdd(1, ie2)
+
+    ies = ie + ie2
+    val = iface.VendorElemGet(1)
+    if len(val) != len(ies):
+        raise Exception("Unexpected VendorElemGet length[2]")
+    for i in range(len(val)):
+        if val[i] != dbus.Byte(ies[i]):
+            raise Exception("Unexpected VendorElemGet data[2]")
+
+    try:
+        test_ie = dbus.ByteArray("\x01\x01")
+        iface.VendorElemRem(1, test_ie)
+        raise Exception("Invalid VendorElemRemove() accepted")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
+
+    iface.VendorElemRem(1, ie)
+    val = iface.VendorElemGet(1)
+    if len(val) != len(ie2):
+        raise Exception("Unexpected VendorElemGet length[3]")
+
+    iface.VendorElemRem(1, "*")
+    try:
+        iface.VendorElemGet(1)
+        raise Exception("Invalid VendorElemGet() accepted after removal")
+    except dbus.exceptions.DBusException, e:
+        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
+            raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
+
+def test_dbus_assoc_reject(dev, apdev):
+    """D-Bus AssocStatusCode"""
+    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+    ssid = "test-open"
+    params = { "ssid": ssid,
+               "max_listen_interval": "1" }
+    hapd = hostapd.add_ap(apdev[0], params)
+
+    class TestDbusConnect(TestDbus):
+        def __init__(self, bus):
+            TestDbus.__init__(self, bus)
+            self.assoc_status_seen = False
+            self.state = 0
+
+        def __enter__(self):
+            gobject.timeout_add(1, self.run_connect)
+            gobject.timeout_add(15000, self.timeout)
+            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
+                            "PropertiesChanged")
+            self.loop.run()
+            return self
+
+        def propertiesChanged(self, properties):
+            logger.debug("propertiesChanged: %s" % str(properties))
+            if 'AssocStatusCode' in properties:
+                status = properties['AssocStatusCode']
+                if status != 51:
+                    logger.info("Unexpected status code: " + str(status))
+                else:
+                    self.assoc_status_seen = True
+                iface.Disconnect()
+                self.loop.quit()
+
+        def run_connect(self, *args):
+            args = dbus.Dictionary({ 'ssid': ssid,
+                                     'key_mgmt': 'NONE',
+                                     'scan_freq': 2412 },
+                                   signature='sv')
+            self.netw = iface.AddNetwork(args)
+            iface.SelectNetwork(self.netw)
+            return False
+
+        def success(self):
+            return self.assoc_status_seen
+
+    with TestDbusConnect(bus) as t:
+        if not t.success():
+            raise Exception("Expected signals not seen")