+++ /dev/null
-#!/usr/bin/python
-# Tests p2p_connect
-# Will try to connect to another peer
-# and form a group
-######### MAY NEED TO RUN AS SUDO #############
-
-import dbus
-import sys, os
-import time
-import gobject
-import getopt
-from dbus.mainloop.glib import DBusGMainLoop
-
-
-def usage():
- print "Usage:"
- print " %s -i <interface_name> -m <wps_method> \ " \
- % sys.argv[0]
- print " -a <addr> [-p <pin>] [-g <go_intent>] \ "
- print " [-w <wpas_dbus_interface>]"
- print "Options:"
- print " -i = interface name"
- print " -m = wps method"
- print " -a = peer address"
- print " -p = pin number (8 digits)"
- print " -g = group owner intent"
- print " -w = wpas dbus interface = fi.w1.wpa_supplicant1"
- print "Example:"
- print " %s -i wlan0 -a 0015008352c0 -m display -p 12345670" % sys.argv[0]
-
-
-# Required Signals
-def GONegotiationSuccess(status):
- print "Go Negotiation Success"
-
-def GONegotiationFailure(status):
- print 'Go Negotiation Failed. Status:'
- print format(status)
- os._exit(0)
-
-def GroupStarted(properties):
- if properties.has_key("group_object"):
- print 'Group Formation Complete %s' \
- % properties["group_object"]
- os._exit(0)
-
-def WpsFailure(status, etc):
- print "WPS Authentication Failure".format(status)
- print etc
- os._exit(0)
-
-class P2P_Connect():
- # Needed Variables
- global bus
- global wpas_object
- global interface_object
- global p2p_interface
- global ifname
- global wpas
- global wpas_dbus_interface
- global timeout
- global path
- global wps_method
- global go_intent
- global addr
- global pin
-
- # Dbus Paths
- global wpas_dbus_opath
- global wpas_dbus_interfaces_opath
- global wpas_dbus_interfaces_interface
- global wpas_dbus_interfaces_p2pdevice
-
- # Dictionary of Arguements
- global p2p_connect_arguements
-
- # Constructor
- def __init__(self,ifname,wpas_dbus_interface,addr,
- pin,wps_method,go_intent):
- # Initializes variables and threads
- self.ifname = ifname
- self.wpas_dbus_interface = wpas_dbus_interface
- self.wps_method = wps_method
- self.go_intent = go_intent
- self.addr = addr
- self.pin = pin
-
- # Generating interface/object paths
- self.wpas_dbus_opath = \
- "/" + self.wpas_dbus_interface.replace(".","/")
- self.wpas_wpas_dbus_interfaces_opath = \
- self.wpas_dbus_opath + "/Interfaces"
- self.wpas_dbus_interfaces_interface = \
- self.wpas_dbus_interface + ".Interface"
- self.wpas_dbus_interfaces_p2pdevice = \
- self.wpas_dbus_interfaces_interface + ".P2PDevice"
-
- # Getting interfaces and objects
- DBusGMainLoop(set_as_default=True)
- self.bus = dbus.SystemBus()
- self.wpas_object = self.bus.get_object(
- self.wpas_dbus_interface,
- self.wpas_dbus_opath)
- self.wpas = dbus.Interface(
- self.wpas_object, self.wpas_dbus_interface)
-
- # See if wpa_supplicant already knows about this interface
- self.path = None
- try:
- self.path = self.wpas.GetInterface(ifname)
- except:
- if not str(exc).startswith(
- self.wpas_dbus_interface + \
- ".InterfaceUnknown:"):
- raise exc
- try:
- path = self.wpas.CreateInterface(
- {'Ifname': ifname, 'Driver': 'test'})
- time.sleep(1)
-
- except dbus.DBusException, exc:
- if not str(exc).startswith(
- self.wpas_dbus_interface + \
- ".InterfaceExists:"):
- raise exc
-
- # Get Interface and objects
- self.interface_object = self.bus.get_object(
- self.wpas_dbus_interface,self.path)
- self.p2p_interface = dbus.Interface(
- self.interface_object,
- self.wpas_dbus_interfaces_p2pdevice)
-
- # Add signals
- self.bus.add_signal_receiver(GONegotiationSuccess,
- dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
- signal_name="GONegotiationSuccess")
- self.bus.add_signal_receiver(GONegotiationFailure,
- dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
- signal_name="GONegotiationFailure")
- self.bus.add_signal_receiver(GroupStarted,
- dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
- signal_name="GroupStarted")
- self.bus.add_signal_receiver(WpsFailure,
- dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
- signal_name="WpsFailed")
-
-
- #Constructing all the arguements needed to connect
- def constructArguements(self):
- # Adding required arguements
- self.p2p_connect_arguements = {'wps_method':self.wps_method,
- 'peer':dbus.ObjectPath(self.path+'/Peers/'+self.addr)}
-
- # Display requires a pin, and a go intent of 15
- if (self.wps_method == 'display'):
- if (self.pin != None):
- self.p2p_connect_arguements.update({'pin':self.pin})
- else:
- print "Error:\n Pin required for wps_method=display"
- usage()
- quit()
-
- if (self.go_intent != None and int(self.go_intent) != 15):
- print "go_intent overwritten to 15"
-
- self.go_intent = '15'
-
- # Keypad requires a pin, and a go intent of less than 15
- elif (self.wps_method == 'keypad'):
- if (self.pin != None):
- self.p2p_connect_arguements.update({'pin':self.pin})
- else:
- print "Error:\n Pin required for wps_method=keypad"
- usage()
- quit()
-
- if (self.go_intent != None and int(self.go_intent) == 15):
- error = "Error :\n Group Owner intent cannot be" + \
- " 15 for wps_method=keypad"
- print error
- usage()
- quit()
-
- # Doesn't require pin
- # for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad
- elif (self.wps_method == 'pin'):
- if (self.pin != None):
- print "pin ignored"
-
- # No pin is required for pbc so it is ignored
- elif (self.wps_method == 'pbc'):
- if (self.pin != None):
- print "pin ignored"
-
- else:
- print "Error:\n wps_method not supported or does not exist"
- usage()
- quit()
-
- # Go_intent is optional for all arguements
- if (self.go_intent != None):
- self.p2p_connect_arguements.update(
- {'go_intent':dbus.Int32(self.go_intent)})
-
- # Running p2p_connect
- def run(self):
- try:
- result_pin = self.p2p_interface.Connect(
- self.p2p_connect_arguements)
-
- except dbus.DBusException, exc:
- raise exc
-
- if (self.wps_method == 'pin' and \
- not self.p2p_connect_arguements.has_key('pin') ):
- print "Connect return with pin value of %d " % int(result_pin)
- gobject.MainLoop().run()
-
-if __name__ == "__main__":
-
- # Required
- interface_name = None
- wps_method = None
- addr = None
-
- # Conditionally optional
- pin = None
-
- # Optional
- wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
- go_intent = None
-
- # Using getopts to handle options
- try:
- options, args = getopt.getopt(sys.argv[1:],"hi:m:a:p:g:w:")
-
- except getopt.GetoptError:
- usage()
- quit()
-
- # If theres a switch, override default option
- for key, value in options:
- # Help
- if (key == "-h"):
- usage()
- quit()
- # Interface Name
- elif (key == "-i"):
- interface_name = value
- # WPS Method
- elif (key == "-m"):
- wps_method = value
- # Address
- elif (key == "-a"):
- addr = value
- # Pin
- elif (key == "-p"):
- pin = value
- # Group Owner Intent
- elif (key == "-g"):
- go_intent = value
- # Dbus interface
- elif (key == "-w"):
- wpas_dbus_interface = value
- else:
- assert False, "unhandled option"
-
- # Required Arguements check
- if (interface_name == None or wps_method == None or addr == None):
- print "Error:\n Required arguements not specified"
- usage()
- quit()
-
- # Group Owner Intent Check
- if (go_intent != None and (int(go_intent) > 15 or int(go_intent) < 0) ):
- print "Error:\n Group Owner Intent must be between 0 and 15 inclusive"
- usage()
- quit()
-
- # Pin Check
- if (pin != None and len(pin) != 8):
- print "Error:\n Pin is not 8 digits"
- usage()
- quit()
-
- try:
- p2p_connect_test = P2P_Connect(interface_name,wpas_dbus_interface,
- addr,pin,wps_method,go_intent)
-
- except:
- print "Error:\n Invalid Arguements"
- usage()
- quit()
-
- p2p_connect_test.constructArguements()
- p2p_connect_test.run()
-
- os._exit(0)