class TestDbusP2p(TestDbus):
def __init__(self, bus):
TestDbus.__init__(self, bus)
- self.done = False
+ self.wps_failed = False
+ self.formation_failure = False
def __enter__(self):
gobject.timeout_add(1, self.run_test)
"GroupStarted")
self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
"WpsFailed")
+ self.add_signal(self.groupFormationFailure,
+ WPAS_DBUS_IFACE_P2PDEVICE,
+ "GroupFormationFailure")
self.loop.run()
return self
def wpsFailed(self, name, args):
logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
- self.done = True
- self.loop.quit()
+ self.wps_failed = True
+ if self.formation_failure:
+ self.loop.quit()
+
+ def groupFormationFailure(self, reason):
+ logger.debug("groupFormationFailure - reason=%s" % reason)
+ self.formation_failure = True
+ if self.wps_failed:
+ self.loop.quit()
def run_test(self, *args):
logger.debug("run_test")
return False
def success(self):
- return self.done
+ return self.wps_failed and self.formation_failure
with TestDbusP2p(bus) as t:
if not t.success():