From: Jouni Malinen Date: Fri, 1 Mar 2013 23:05:03 +0000 (+0200) Subject: Add a starting point for mac80211_hwsim-based testing X-Git-Tag: aosp-kk-from-upstream~510 X-Git-Url: http://www.project-moonshot.org/gitweb/?a=commitdiff_plain;h=1ae73b03be863f7b3e40fa8f1b5598a1d8b0cafa;p=mech_eap.git Add a starting point for mac80211_hwsim-based testing This will hopefully grow over time to become a much more complete testing mechanism that uses mac80211_hwsim to verify various wpa_supplicant and hostapd functions automatically. Signed-hostap: Jouni Malinen --- diff --git a/tests/hwsim/p2p-group-formation.py b/tests/hwsim/p2p-group-formation.py new file mode 100755 index 0000000..37f6709 --- /dev/null +++ b/tests/hwsim/p2p-group-formation.py @@ -0,0 +1,48 @@ +#!/usr/bin/python +# +# P2P group formation test +# Copyright (c) 2013, Jouni Malinen +# +# This software may be distributed under the terms of the BSD license. +# See README for more details. + +import os +import sys +import time + +import logging + +from wpasupplicant import WpaSupplicant + + +def main(): + if len(sys.argv) > 1 and sys.argv[1] == '-d': + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig() + + dev0 = WpaSupplicant('wlan0') + dev1 = WpaSupplicant('wlan1') + dev0.request("hello") + dev0.ping() + if not dev0.ping() or not dev1.ping(): + print "No response from wpa_supplicant" + return + addr0 = dev0.p2p_dev_addr() + addr1 = dev1.p2p_dev_addr() + print "dev0 P2P Device Address: " + addr0 + print "dev1 P2P Device Address: " + addr1 + dev0.reset() + dev1.reset() + + dev0.p2p_listen() + dev1.p2p_listen() + pin = dev0.wps_read_pin() + dev0.p2p_go_neg_auth(addr1, pin, "display") + print "Start GO negotiation" + dev1.p2p_go_neg_init(addr0, pin, "enter", timeout=15) + dev0.dump_monitor() + dev1.dump_monitor() + +if __name__ == "__main__": + main() diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py new file mode 100644 index 0000000..eca3fae --- /dev/null +++ b/tests/hwsim/wpasupplicant.py @@ -0,0 +1,122 @@ +#!/usr/bin/python +# +# Python class for controlling wpa_supplicant +# Copyright (c) 2013, Jouni Malinen +# +# This software may be distributed under the terms of the BSD license. +# See README for more details. + +import os +import time +import logging +import wpaspy + +logger = logging.getLogger(__name__) +wpas_ctrl = '/var/run/wpa_supplicant' + +class WpaSupplicant: + def __init__(self, ifname): + self.ifname = ifname + self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + self.mon.attach() + + def request(self, cmd): + logger.debug(self.ifname + ": CTRL: " + cmd) + return self.ctrl.request(cmd) + + def ping(self): + return "PONG" in self.request("PING") + + def reset(self): + self.request("P2P_STOP_FIND") + self.request("P2P_FLUSH") + self.request("P2P_GROUP_REMOVE *") + self.request("REMOVE_NETWORK *") + self.request("REMOVE_CRED *") + + def get_status(self, field): + res = self.request("STATUS") + lines = res.splitlines() + for l in lines: + [name,value] = l.split('=', 1) + if name == field: + return value + return None + + def p2p_dev_addr(self): + return self.get_status("p2p_device_address") + + def p2p_listen(self): + return self.request("P2P_LISTEN") + + def p2p_find(self, social=False): + if social: + return self.request("P2P_FIND type=social") + return self.request("P2P_FIND") + + def wps_read_pin(self): + #TODO: make this random + self.pin = "12345670" + return self.pin + + def peer_known(self, peer, full=True): + res = self.request("P2P_PEER " + peer) + if peer.lower() not in res.lower(): + return False + if not full: + return True + return "[PROBE_REQ_ONLY]" not in res + + def discover_peer(self, peer, full=True, timeout=15): + logger.info(self.ifname + ": Trying to discover peer " + peer) + if self.peer_known(peer, full): + return True + self.p2p_find() + count = 0 + while count < timeout: + time.sleep(1) + count = count + 1 + if self.peer_known(peer, full): + return True + return False + + def p2p_go_neg_auth(self, peer, pin, method): + if not self.discover_peer(peer): + raise Exception("Peer " + peer + " not found") + self.dump_monitor() + cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" + if "OK" in self.request(cmd): + return None + raise Exception("P2P_CONNECT (auth) failed") + + def p2p_go_neg_init(self, peer, pin, method, timeout=0): + if not self.discover_peer(peer): + raise Exception("Peer " + peer + " not found") + self.dump_monitor() + cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + if "OK" in self.request(cmd): + if timeout == 0: + self.dump_monitor() + return None + if self.wait_event("P2P-GROUP-STARTED", timeout): + self.dump_monitor() + return None + raise Exception("Group formation timed out") + raise Exception("P2P_CONNECT failed") + + def wait_event(self, event, timeout): + count = 0 + while count < timeout * 2: + count = count + 1 + time.sleep(0.5) + while self.mon.pending(): + ev = self.mon.recv() + if event in ev: + return True + return False + + def dump_monitor(self): + while self.mon.pending(): + ev = self.mon.recv() + logger.debug(self.ifname + ": " + ev)