Add a starting point for mac80211_hwsim-based testing
authorJouni Malinen <j@w1.fi>
Fri, 1 Mar 2013 23:05:03 +0000 (01:05 +0200)
committerJouni Malinen <j@w1.fi>
Fri, 1 Mar 2013 23:05:03 +0000 (01:05 +0200)
This will hopefully grow over time to become a much more complete
testing mechanism that uses mac80211_hwsim to verify various
wpa_supplicant and hostapd functions automatically.

Signed-hostap: Jouni Malinen <j@w1.fi>

tests/hwsim/p2p-group-formation.py [new file with mode: 0755]
tests/hwsim/wpasupplicant.py [new file with mode: 0644]

diff --git a/tests/hwsim/p2p-group-formation.py b/tests/hwsim/p2p-group-formation.py
new file mode 100755 (executable)
index 0000000..37f6709
--- /dev/null
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+#
+# P2P group formation test
+# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import os
+import sys
+import time
+
+import logging
+
+from wpasupplicant import WpaSupplicant
+
+
+def main():
+    if len(sys.argv) > 1 and sys.argv[1] == '-d':
+        logging.basicConfig(level=logging.DEBUG)
+    else:
+        logging.basicConfig()
+
+    dev0 = WpaSupplicant('wlan0')
+    dev1 = WpaSupplicant('wlan1')
+    dev0.request("hello")
+    dev0.ping()
+    if not dev0.ping() or not dev1.ping():
+        print "No response from wpa_supplicant"
+        return
+    addr0 = dev0.p2p_dev_addr()
+    addr1 = dev1.p2p_dev_addr()
+    print "dev0 P2P Device Address: " + addr0
+    print "dev1 P2P Device Address: " + addr1
+    dev0.reset()
+    dev1.reset()
+
+    dev0.p2p_listen()
+    dev1.p2p_listen()
+    pin = dev0.wps_read_pin()
+    dev0.p2p_go_neg_auth(addr1, pin, "display")
+    print "Start GO negotiation"
+    dev1.p2p_go_neg_init(addr0, pin, "enter", timeout=15)
+    dev0.dump_monitor()
+    dev1.dump_monitor()
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py
new file mode 100644 (file)
index 0000000..eca3fae
--- /dev/null
@@ -0,0 +1,122 @@
+#!/usr/bin/python
+#
+# Python class for controlling wpa_supplicant
+# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import os
+import time
+import logging
+import wpaspy
+
+logger = logging.getLogger(__name__)
+wpas_ctrl = '/var/run/wpa_supplicant'
+
+class WpaSupplicant:
+    def __init__(self, ifname):
+        self.ifname = ifname
+        self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
+        self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
+        self.mon.attach()
+
+    def request(self, cmd):
+        logger.debug(self.ifname + ": CTRL: " + cmd)
+        return self.ctrl.request(cmd)
+
+    def ping(self):
+        return "PONG" in self.request("PING")
+
+    def reset(self):
+        self.request("P2P_STOP_FIND")
+        self.request("P2P_FLUSH")
+        self.request("P2P_GROUP_REMOVE *")
+        self.request("REMOVE_NETWORK *")
+        self.request("REMOVE_CRED *")
+
+    def get_status(self, field):
+        res = self.request("STATUS")
+        lines = res.splitlines()
+        for l in lines:
+            [name,value] = l.split('=', 1)
+            if name == field:
+                return value
+        return None
+
+    def p2p_dev_addr(self):
+        return self.get_status("p2p_device_address")
+
+    def p2p_listen(self):
+        return self.request("P2P_LISTEN")
+
+    def p2p_find(self, social=False):
+        if social:
+            return self.request("P2P_FIND type=social")
+        return self.request("P2P_FIND")
+
+    def wps_read_pin(self):
+        #TODO: make this random
+        self.pin = "12345670"
+        return self.pin
+
+    def peer_known(self, peer, full=True):
+        res = self.request("P2P_PEER " + peer)
+        if peer.lower() not in res.lower():
+            return False
+        if not full:
+            return True
+        return "[PROBE_REQ_ONLY]" not in res
+
+    def discover_peer(self, peer, full=True, timeout=15):
+        logger.info(self.ifname + ": Trying to discover peer " + peer)
+        if self.peer_known(peer, full):
+            return True
+        self.p2p_find()
+        count = 0
+        while count < timeout:
+            time.sleep(1)
+            count = count + 1
+            if self.peer_known(peer, full):
+                return True
+        return False
+
+    def p2p_go_neg_auth(self, peer, pin, method):
+        if not self.discover_peer(peer):
+            raise Exception("Peer " + peer + " not found")
+        self.dump_monitor()
+        cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
+        if "OK" in self.request(cmd):
+            return None
+        raise Exception("P2P_CONNECT (auth) failed")
+
+    def p2p_go_neg_init(self, peer, pin, method, timeout=0):
+        if not self.discover_peer(peer):
+            raise Exception("Peer " + peer + " not found")
+        self.dump_monitor()
+        cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
+        if "OK" in self.request(cmd):
+            if timeout == 0:
+                self.dump_monitor()
+                return None
+            if self.wait_event("P2P-GROUP-STARTED", timeout):
+                self.dump_monitor()
+                return None
+            raise Exception("Group formation timed out")
+        raise Exception("P2P_CONNECT failed")
+
+    def wait_event(self, event, timeout):
+        count = 0
+        while count < timeout * 2:
+            count = count + 1
+            time.sleep(0.5)
+            while self.mon.pending():
+                ev = self.mon.recv()
+                if event in ev:
+                    return True
+        return False
+
+    def dump_monitor(self):
+        while self.mon.pending():
+            ev = self.mon.recv()
+            logger.debug(self.ifname + ": " + ev)