X-Git-Url: http://www.project-moonshot.org/gitweb/?a=blobdiff_plain;f=tests%2Fhwsim%2Fwpasupplicant.py;h=d3f5c9f19a59368859d528452d718242a2c1199e;hb=67a0d4f94ecebbf72ed20310684192f7aa51f13a;hp=3f5bd6f08c2be1feeab8339940b7a43c424dfc23;hpb=a42f88f8e7fc7467c54683878dba6df4243cac34;p=mech_eap.git diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 3f5bd6f..d3f5c9f 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -10,33 +10,84 @@ import logging import binascii import re import struct -import subprocess import wpaspy +import remotehost +import subprocess logger = logging.getLogger() wpas_ctrl = '/var/run/wpa_supplicant' class WpaSupplicant: - def __init__(self, ifname=None, global_iface=None): + def __init__(self, ifname=None, global_iface=None, hostname=None, + port=9877, global_port=9878): + self.hostname = hostname self.group_ifname = None self.gctrl_mon = None + self.host = remotehost.Host(hostname, ifname) + self._group_dbg = None if ifname: - self.set_ifname(ifname) + self.set_ifname(ifname, hostname, port) + res = self.get_driver_status() + if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000: + self.p2p_dev_ifname = 'p2p-dev-' + self.ifname + else: + self.p2p_dev_ifname = ifname else: self.ifname = None self.global_iface = global_iface if global_iface: - self.global_ctrl = wpaspy.Ctrl(global_iface) - self.global_mon = wpaspy.Ctrl(global_iface) + if hostname != None: + self.global_ctrl = wpaspy.Ctrl(hostname, global_port) + self.global_mon = wpaspy.Ctrl(hostname, global_port) + self.global_dbg = hostname + "/" + str(global_port) + "/" + else: + self.global_ctrl = wpaspy.Ctrl(global_iface) + self.global_mon = wpaspy.Ctrl(global_iface) + self.global_dbg = "" self.global_mon.attach() else: self.global_mon = None - def set_ifname(self, ifname): + def cmd_execute(self, cmd_array, shell=False): + if self.hostname is None: + if shell: + cmd = ' '.join(cmd_array) + else: + cmd = cmd_array + proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, shell=shell) + out = proc.communicate()[0] + ret = proc.returncode + return ret, out + else: + return self.host.execute(cmd_array) + + def terminate(self): + if self.global_mon: + self.global_mon.detach() + self.global_mon = None + self.global_ctrl.terminate() + self.global_ctrl = None + + def close_ctrl(self): + if self.global_mon: + self.global_mon.detach() + self.global_mon = None + self.global_ctrl = None + self.remove_ifname() + + def set_ifname(self, ifname, hostname=None, port=9877): self.ifname = ifname - self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) - self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + if hostname != None: + self.ctrl = wpaspy.Ctrl(hostname, port) + self.mon = wpaspy.Ctrl(hostname, port) + self.host = remotehost.Host(hostname, ifname) + self.dbg = hostname + "/" + ifname + else: + self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + self.dbg = ifname self.mon.attach() def remove_ifname(self): @@ -46,39 +97,101 @@ class WpaSupplicant: self.ctrl = None self.ifname = None - def interface_add(self, ifname, config="", driver="nl80211", drv_params=None): - try: - groups = subprocess.check_output(["id"]) - group = "admin" if "(admin)" in groups else "adm" - except Exception, e: + def get_ctrl_iface_port(self, ifname): + if self.hostname is None: + return None + + res = self.global_request("INTERFACES ctrl") + lines = res.splitlines() + found = False + for line in lines: + words = line.split() + if words[0] == ifname: + found = True + break + if not found: + raise Exception("Could not find UDP port for " + ifname) + res = line.find("ctrl_iface=udp:") + if res == -1: + raise Exception("Wrong ctrl_interface format") + words = line.split(":") + return int(words[1]) + + def interface_add(self, ifname, config="", driver="nl80211", + drv_params=None, br_ifname=None, create=False, + set_ifname=True, all_params=False, if_type=None): + status, groups = self.host.execute(["id"]) + if status != 0: group = "admin" + group = "admin" if "(admin)" in groups else "adm" cmd = "INTERFACE_ADD " + ifname + "\t" + config + "\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group if drv_params: cmd = cmd + '\t' + drv_params + if br_ifname: + if not drv_params: + cmd += '\t' + cmd += '\t' + br_ifname + if create: + if not br_ifname: + cmd += '\t' + if not drv_params: + cmd += '\t' + cmd += '\tcreate' + if if_type: + cmd += '\t' + if_type + if all_params and not create: + if not br_ifname: + cmd += '\t' + if not drv_params: + cmd += '\t' + cmd += '\t' if "FAIL" in self.global_request(cmd): raise Exception("Failed to add a dynamic wpa_supplicant interface") - self.set_ifname(ifname) + if not create and set_ifname: + port = self.get_ctrl_iface_port(ifname) + self.set_ifname(ifname, self.hostname, port) + res = self.get_driver_status() + if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000: + self.p2p_dev_ifname = 'p2p-dev-' + self.ifname + else: + self.p2p_dev_ifname = ifname def interface_remove(self, ifname): self.remove_ifname() self.global_request("INTERFACE_REMOVE " + ifname) - def request(self, cmd): - logger.debug(self.ifname + ": CTRL: " + cmd) - return self.ctrl.request(cmd) + def request(self, cmd, timeout=10): + logger.debug(self.dbg + ": CTRL: " + cmd) + return self.ctrl.request(cmd, timeout=timeout) def global_request(self, cmd): if self.global_iface is None: - self.request(cmd) + return self.request(cmd) else: ifname = self.ifname or self.global_iface - logger.debug(ifname + ": CTRL(global): " + cmd) + logger.debug(self.global_dbg + ifname + ": CTRL(global): " + cmd) return self.global_ctrl.request(cmd) + @property + def group_dbg(self): + if self._group_dbg is not None: + return self._group_dbg + if self.group_ifname is None: + raise Exception("Cannot have group_dbg without group_ifname") + if self.hostname is None: + self._group_dbg = self.group_ifname + else: + self._group_dbg = self.hostname + "/" + self.group_ifname + return self._group_dbg + def group_request(self, cmd): if self.group_ifname and self.group_ifname != self.ifname: - logger.debug(self.group_ifname + ": CTRL: " + cmd) - gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname)) + if self.hostname is None: + gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname)) + else: + port = self.get_ctrl_iface_port(self.group_ifname) + gctrl = wpaspy.Ctrl(self.hostname, port) + logger.debug(self.group_dbg + ": CTRL(group): " + cmd) return gctrl.request(cmd) return self.request(cmd) @@ -93,12 +206,9 @@ class WpaSupplicant: res = self.request("FLUSH") if not "OK" in res: logger.info("FLUSH to " + self.ifname + " failed: " + res) - self.request("SET p2p_add_cli_chan 0") - self.request("SET p2p_no_go_freq ") - self.request("SET p2p_pref_chan ") - self.request("SET p2p_no_group_iface 1") - self.request("SET p2p_go_intent 7") - self.request("SET ignore_old_scan_res 0") + self.global_request("REMOVE_NETWORK all") + self.global_request("SET p2p_no_group_iface 1") + self.global_request("P2P_FLUSH") if self.gctrl_mon: try: self.gctrl_mon.detach() @@ -110,8 +220,11 @@ class WpaSupplicant: iter = 0 while iter < 60: - state = self.get_driver_status_field("scan_state") - if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state: + state1 = self.get_driver_status_field("scan_state") + p2pdev = "p2p-dev-" + self.ifname + state2 = self.get_driver_status_field("scan_state", ifname=p2pdev) + states = str(state1) + " " + str(state2) + if "SCAN_STARTED" in states or "SCAN_REQUESTED" in states: logger.info(self.ifname + ": Waiting for scan operation to complete before continuing") time.sleep(1) else: @@ -120,18 +233,14 @@ class WpaSupplicant: if iter == 60: logger.error(self.ifname + ": Driver scan state did not clear") print "Trying to clear cfg80211/mac80211 scan state" - try: - cmd = ["sudo", "ifconfig", self.ifname, "down"] - subprocess.call(cmd) - except subprocess.CalledProcessError, e: - logger.info("ifconfig failed: " + str(e.returncode)) - logger.info(e.output) - try: - cmd = ["sudo", "ifconfig", self.ifname, "up"] - subprocess.call(cmd) - except subprocess.CalledProcessError, e: - logger.info("ifconfig failed: " + str(e.returncode)) - logger.info(e.output) + status, buf = self.host.execute(["ifconfig", self.ifname, "down"]) + if status != 0: + logger.info("ifconfig failed: " + buf) + logger.info(status) + status, buf = self.host.execute(["ifconfig", self.ifname, "up"]) + if status != 0: + logger.info("ifconfig failed: " + buf) + logger.info(status) if iter > 0: # The ongoing scan could have discovered BSSes or P2P peers logger.info("Run FLUSH again since scan was in progress") @@ -171,8 +280,32 @@ class WpaSupplicant: raise Exception("SET_NETWORK failed") return None - def list_networks(self): - res = self.request("LIST_NETWORKS") + def p2pdev_request(self, cmd): + return self.global_request("IFNAME=" + self.p2p_dev_ifname + " " + cmd) + + def p2pdev_add_network(self): + id = self.p2pdev_request("ADD_NETWORK") + if "FAIL" in id: + raise Exception("p2pdev ADD_NETWORK failed") + return int(id) + + def p2pdev_set_network(self, id, field, value): + res = self.p2pdev_request("SET_NETWORK " + str(id) + " " + field + " " + value) + if "FAIL" in res: + raise Exception("p2pdev SET_NETWORK failed") + return None + + def p2pdev_set_network_quoted(self, id, field, value): + res = self.p2pdev_request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') + if "FAIL" in res: + raise Exception("p2pdev SET_NETWORK failed") + return None + + def list_networks(self, p2p=False): + if p2p: + res = self.global_request("LIST_NETWORKS") + else: + res = self.request("LIST_NETWORKS") lines = res.splitlines() networks = [] for l in lines: @@ -195,6 +328,12 @@ class WpaSupplicant: else: self.request("SET auto_interworking 0") + def interworking_add_network(self, bssid): + id = self.request("INTERWORKING_ADD_NETWORK " + bssid) + if "FAIL" in id or "OK" in id: + raise Exception("INTERWORKING_ADD_NETWORK failed") + return int(id) + def add_cred(self): id = self.request("ADD_CRED") if "FAIL" in id: @@ -228,7 +367,7 @@ class WpaSupplicant: quoted = [ "realm", "username", "password", "domain", "imsi", "excluded_ssid", "milenage", "ca_cert", "client_cert", "private_key", "domain_suffix_match", "provisioning_sp", - "roaming_partner", "phase1", "phase2" ] + "roaming_partner", "phase1", "phase2", "private_key_passwd" ] for field in quoted: if field in params: self.set_cred_quoted(id, field, params[field]) @@ -242,11 +381,11 @@ class WpaSupplicant: if field in params: self.set_cred(id, field, params[field]) - return id; + return id def select_network(self, id, freq=None): if freq: - extra = " freq=" + freq + extra = " freq=" + str(freq) else: extra = "" id = self.request("SELECT_NETWORK " + str(id) + extra) @@ -266,7 +405,9 @@ class WpaSupplicant: raise Exception("MESH_GROUP_REMOVE failed") return None - def connect_network(self, id, timeout=10): + def connect_network(self, id, timeout=None): + if timeout is None: + timeout = 10 if self.hostname is None else 60 self.dump_monitor() self.select_network(id) self.wait_connected(timeout=timeout) @@ -317,8 +458,13 @@ class WpaSupplicant: return vals[field] return None - def get_driver_status(self): - res = self.request("STATUS-DRIVER") + def get_driver_status(self, ifname=None): + if ifname is None: + res = self.request("STATUS-DRIVER") + else: + res = self.global_request("IFNAME=%s STATUS-DRIVER" % ifname) + if res.startswith("FAIL"): + return dict() lines = res.splitlines() vals = dict() for l in lines: @@ -330,15 +476,15 @@ class WpaSupplicant: vals[name] = value return vals - def get_driver_status_field(self, field): - vals = self.get_driver_status() + def get_driver_status_field(self, field, ifname=None): + vals = self.get_driver_status(ifname) if field in vals: return vals[field] return None def get_mcc(self): - mcc = int(self.get_driver_status_field('capa.num_multichan_concurrent')) - return 1 if mcc < 2 else mcc + mcc = int(self.get_driver_status_field('capa.num_multichan_concurrent')) + return 1 if mcc < 2 else mcc def get_mib(self): res = self.request("MIB") @@ -368,8 +514,14 @@ class WpaSupplicant: def p2p_listen(self): return self.global_request("P2P_LISTEN") + def p2p_ext_listen(self, period, interval): + return self.global_request("P2P_EXT_LISTEN %d %d" % (period, interval)) + + def p2p_cancel_ext_listen(self): + return self.global_request("P2P_EXT_LISTEN") + def p2p_find(self, social=False, progressive=False, dev_id=None, - dev_type=None, delay=None): + dev_type=None, delay=None, freq=None): cmd = "P2P_FIND" if social: cmd = cmd + " type=social" @@ -381,6 +533,8 @@ class WpaSupplicant: cmd = cmd + " dev_type=" + dev_type if delay: cmd = cmd + " delay=" + str(delay) + if freq: + cmd = cmd + " freq=" + str(freq) return self.global_request(cmd) def p2p_stop_find(self): @@ -400,11 +554,12 @@ class WpaSupplicant: return True return "[PROBE_REQ_ONLY]" not in res - def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False): + def discover_peer(self, peer, full=True, timeout=15, social=True, + force_find=False, freq=None): logger.info(self.ifname + ": Trying to discover peer " + peer) if not force_find and self.peer_known(peer, full): return True - self.p2p_find(social) + self.p2p_find(social, freq=freq) count = 0 while count < timeout * 4: time.sleep(0.25) @@ -453,7 +608,12 @@ class WpaSupplicant: res['ifname'] = s[2] self.group_ifname = s[2] try: - self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname)) + if self.hostname is None: + self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, + self.group_ifname)) + else: + port = self.get_ctrl_iface_port(self.group_ifname) + self.gctrl_mon = wpaspy.Ctrl(self.hostname, port) self.gctrl_mon.attach() except: logger.debug("Could not open monitor socket for group interface") @@ -490,15 +650,28 @@ class WpaSupplicant: return res - def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None): + def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, + persistent=False, freq=None, freq2=None, + max_oper_chwidth=None, ht40=False, vht=False): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() - cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" + if pin: + cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" + else: + cmd = "P2P_CONNECT " + peer + " " + method + " auth" if go_intent: cmd = cmd + ' go_intent=' + str(go_intent) if freq: cmd = cmd + ' freq=' + str(freq) + if freq2: + cmd = cmd + ' freq2=' + str(freq2) + if max_oper_chwidth: + cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth) + if ht40: + cmd = cmd + ' ht40' + if vht: + cmd = cmd + ' vht' if persistent: cmd = cmd + " persistent" if "OK" in self.global_request(cmd): @@ -508,14 +681,14 @@ class WpaSupplicant: def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False): go_neg_res = None ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", - "P2P-GO-NEG-FAILURE"], timeout); + "P2P-GO-NEG-FAILURE"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") if "P2P-GO-NEG-SUCCESS" in ev: go_neg_res = ev - ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout); + ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) if ev is None: if expect_failure: return None @@ -523,7 +696,11 @@ class WpaSupplicant: self.dump_monitor() return self.group_form_result(ev, expect_failure, go_neg_res) - def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, persistent_id=None, freq=None, provdisc=False, wait_group=True): + def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, + expect_failure=False, persistent=False, + persistent_id=None, freq=None, provdisc=False, + wait_group=True, freq2=None, max_oper_chwidth=None, + ht40=False, vht=False): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() @@ -531,10 +708,18 @@ class WpaSupplicant: cmd = "P2P_CONNECT " + peer + " " + pin + " " + method else: cmd = "P2P_CONNECT " + peer + " " + method - if go_intent: + if go_intent is not None: cmd = cmd + ' go_intent=' + str(go_intent) if freq: cmd = cmd + ' freq=' + str(freq) + if freq2: + cmd = cmd + ' freq2=' + str(freq2) + if max_oper_chwidth: + cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth) + if ht40: + cmd = cmd + ' ht40' + if vht: + cmd = cmd + ' vht' if persistent: cmd = cmd + " persistent" elif persistent_id: @@ -543,7 +728,6 @@ class WpaSupplicant: cmd = cmd + " provdisc" if "OK" in self.global_request(cmd): if timeout == 0: - self.dump_monitor() return None go_neg_res = None ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", @@ -570,7 +754,7 @@ class WpaSupplicant: while True: while self.mon.pending(): ev = self.mon.recv() - logger.debug(self.ifname + ": " + ev) + logger.debug(self.dbg + ": " + ev) for event in events: if event in ev: return ev @@ -590,7 +774,7 @@ class WpaSupplicant: while True: while self.global_mon.pending(): ev = self.global_mon.recv() - logger.debug(self.ifname + "(global): " + ev) + logger.debug(self.global_dbg + self.ifname + "(global): " + ev) for event in events: if event in ev: return ev @@ -610,7 +794,7 @@ class WpaSupplicant: while True: while self.gctrl_mon.pending(): ev = self.gctrl_mon.recv() - logger.debug(self.group_ifname + ": " + ev) + logger.debug(self.group_dbg + "(group): " + ev) for event in events: if event in ev: return ev @@ -631,19 +815,24 @@ class WpaSupplicant: except: pass self.gctrl_mon = None - ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3) + ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=3) if ev is None: raise Exception("Group removal event timed out") if "reason=GO_ENDING_SESSION" not in ev: raise Exception("Unexpected group removal reason") def dump_monitor(self): + count_iface = 0 + count_global = 0 while self.mon.pending(): ev = self.mon.recv() - logger.debug(self.ifname + ": " + ev) + logger.debug(self.dbg + ": " + ev) + count_iface += 1 while self.global_mon and self.global_mon.pending(): ev = self.global_mon.recv() - logger.debug(self.ifname + "(global): " + ev) + logger.debug(self.global_dbg + self.ifname + "(global): " + ev) + count_global += 1 + return (count_iface, count_global) def remove_group(self, ifname=None): if self.gctrl_mon: @@ -693,9 +882,10 @@ class WpaSupplicant: def p2p_connect_group(self, go_addr, pin, timeout=0, social=False, freq=None): self.dump_monitor() - if not self.discover_peer(go_addr, social=social): + if not self.discover_peer(go_addr, social=social, freq=freq): if social or not self.discover_peer(go_addr, social=social): raise Exception("GO " + go_addr + " not found") + self.p2p_stop_find() self.dump_monitor() cmd = "P2P_CONNECT " + go_addr + " " + pin + " join" if freq: @@ -704,9 +894,13 @@ class WpaSupplicant: if timeout == 0: self.dump_monitor() return None - ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) + ev = self.wait_global_event(["P2P-GROUP-STARTED", + "P2P-GROUP-FORMATION-FAILURE"], + timeout) if ev is None: raise Exception("Joining the group timed out") + if "P2P-GROUP-STARTED" not in ev: + raise Exception("Failed to join the group") self.dump_monitor() return self.group_form_result(ev) raise Exception("P2P_CONNECT(join) failed") @@ -723,6 +917,22 @@ class WpaSupplicant: raise Exception("Failed to request TDLS teardown") return None + def tdls_link_status(self, peer): + cmd = "TDLS_LINK_STATUS " + peer + ret = self.group_request(cmd) + if "FAIL" in ret: + raise Exception("Failed to request TDLS link status") + return ret + + def tspecs(self): + """Return (tsid, up) tuples representing current tspecs""" + res = self.request("WMM_AC_STATUS") + tspecs = re.findall(r"TSID=(\d+) UP=(\d+)", res) + tspecs = [tuple(map(int, tspec)) for tspec in tspecs] + + logger.debug("tspecs: " + str(tspecs)) + return tspecs + def add_ts(self, tsid, up, direction="downlink", expect_failure=False, extra=None): params = { @@ -754,6 +964,9 @@ class WpaSupplicant: if "tsid=%d" % (tsid) not in ev: raise Exception("ADDTS failed (invalid tsid in TSPEC-ADDED)") + if not (tsid, up) in self.tspecs(): + raise Exception("ADDTS failed (tsid not in tspec list)") + def del_ts(self, tsid): if self.request("WMM_AC_DELTS %d" % (tsid)).strip() != "OK": raise Exception("DELTS failed") @@ -764,6 +977,10 @@ class WpaSupplicant: if "tsid=%d" % (tsid) not in ev: raise Exception("DELTS failed (invalid tsid in TSPEC-REMOVED)") + tspecs = [(t, u) for (t, u) in self.tspecs() if t == tsid] + if tspecs: + raise Exception("DELTS failed (still in tspec list)") + def connect(self, ssid=None, ssid2=None, **kwargs): logger.info("Connect STA " + self.ifname + " to AP") id = self.add_network() @@ -777,20 +994,23 @@ class WpaSupplicant: "private_key_passwd", "ca_cert2", "client_cert2", "private_key2", "phase1", "phase2", "domain_suffix_match", "altsubject_match", "subject_match", "pac_file", "dh_file", - "bgscan", "ht_mcs", "id_str", "openssl_ciphers" ] + "bgscan", "ht_mcs", "id_str", "openssl_ciphers", + "domain_match" ] for field in quoted: if field in kwargs and kwargs[field]: self.set_network_quoted(id, field, kwargs[field]) not_quoted = [ "proto", "key_mgmt", "ieee80211w", "pairwise", "group", "wep_key0", "wep_key1", "wep_key2", "wep_key3", - "wep_tx_keyidx", "scan_freq", "eap", + "wep_tx_keyidx", "scan_freq", "freq_list", "eap", "eapol_flags", "fragment_size", "scan_ssid", "auth_alg", "wpa_ptk_rekey", "disable_ht", "disable_vht", "bssid", "disable_max_amsdu", "ampdu_factor", "ampdu_density", "disable_ht40", "disable_sgi", "disable_ldpc", "ht40_intolerant", "update_identifier", "mac_addr", - "erp", "bg_scan_period" ] + "erp", "bg_scan_period", "bssid_blacklist", + "bssid_whitelist", "mem_only_psk", "eap_workaround", + "engine" ] for field in not_quoted: if field in kwargs and kwargs[field]: self.set_network(id, field, kwargs[field]) @@ -836,11 +1056,11 @@ class WpaSupplicant: if ev is None: raise Exception("Scan timed out") - def scan_for_bss(self, bssid, freq=None, force_scan=False): + def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False): if not force_scan and self.get_bss(bssid) is not None: return for i in range(0, 10): - self.scan(freq=freq, type="ONLY") + self.scan(freq=freq, type="ONLY", only_new=only_new) if self.get_bss(bssid) is not None: return raise Exception("Could not find BSS " + bssid + " in scan") @@ -848,6 +1068,13 @@ class WpaSupplicant: def flush_scan_cache(self, freq=2417): self.request("BSS_FLUSH 0") self.scan(freq=freq, only_new=True) + res = self.request("SCAN_RESULTS") + if len(res.splitlines()) > 1: + self.request("BSS_FLUSH 0") + self.scan(freq=2422, only_new=True) + res = self.request("SCAN_RESULTS") + if len(res.splitlines()) > 1: + logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + res) def roam(self, bssid, fail_test=False): self.dump_monitor() @@ -913,8 +1140,14 @@ class WpaSupplicant: return None return res.split(' ') - def get_bss(self, bssid): - res = self.request("BSS " + bssid) + def get_bss(self, bssid, ifname=None): + if not ifname or ifname == self.ifname: + res = self.request("BSS " + bssid) + elif ifname == self.group_ifname: + res = self.group_request("BSS " + bssid) + else: + return None + if "FAIL" in res: return None lines = res.splitlines() @@ -971,6 +1204,17 @@ class WpaSupplicant: if field != "freq": raise Exception("Unexpected MGMT-RX event format: " + ev) msg['freq'] = val + + field,val = items[2].split('=') + if field != "datarate": + raise Exception("Unexpected MGMT-RX event format: " + ev) + msg['datarate'] = val + + field,val = items[3].split('=') + if field != "ssi_signal": + raise Exception("Unexpected MGMT-RX event format: " + ev) + msg['ssi_signal'] = val + frame = binascii.unhexlify(items[4]) msg['frame'] = frame @@ -997,8 +1241,42 @@ class WpaSupplicant: raise Exception(error) return ev - def wait_disconnected(self, timeout=10, error="Disconnection timed out"): + def wait_disconnected(self, timeout=None, error="Disconnection timed out"): + if timeout is None: + timeout = 10 if self.hostname is None else 30 ev = self.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=timeout) if ev is None: raise Exception(error) return ev + + def get_group_ifname(self): + return self.group_ifname if self.group_ifname else self.ifname + + def get_config(self): + res = self.request("DUMP") + if res.startswith("FAIL"): + raise Exception("DUMP failed") + lines = res.splitlines() + vals = dict() + for l in lines: + [name,value] = l.split('=', 1) + vals[name] = value + return vals + + def asp_provision(self, peer, adv_id, adv_mac, session_id, session_mac, + method="1000", info="", status=None, cpt=None, role=None): + if status is None: + cmd = "P2P_ASP_PROVISION" + params = "info='%s' method=%s" % (info, method) + else: + cmd = "P2P_ASP_PROVISION_RESP" + params = "status=%d" % status + + if role is not None: + params += " role=" + role + if cpt is not None: + params += " cpt=" + cpt + + if "OK" not in self.global_request("%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" % + (cmd, peer, adv_id, adv_mac, session_id, session_mac, params)): + raise Exception("%s request failed" % cmd)