X-Git-Url: http://www.project-moonshot.org/gitweb/?a=blobdiff_plain;f=libeap%2Fwpa_supplicant%2Feapol_test.py;fp=libeap%2Fwpa_supplicant%2Feapol_test.py;h=80e7dfcf531dc391bb6fa376e5f83daeb7044a54;hb=88e34eecb5769da6526361b35df00fdbe823eac5;hp=0000000000000000000000000000000000000000;hpb=2394142e4bf89e0a9e2a0091ac16bd6442e3d4c7;p=mech_eap.git diff --git a/libeap/wpa_supplicant/eapol_test.py b/libeap/wpa_supplicant/eapol_test.py new file mode 100755 index 0000000..80e7dfc --- /dev/null +++ b/libeap/wpa_supplicant/eapol_test.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python2 +# +# eapol_test controller +# Copyright (c) 2015, Jouni Malinen +# +# This software may be distributed under the terms of the BSD license. +# See README for more details. + +import argparse +import logging +import os +import Queue +import sys +import threading + +logger = logging.getLogger() +dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__)) +sys.path.append(os.path.join(dir, '..', 'wpaspy')) +import wpaspy +wpas_ctrl = '/tmp/eapol_test' + +class eapol_test: + def __init__(self, ifname): + self.ifname = ifname + self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + if "PONG" not in self.ctrl.request("PING"): + raise Exception("Failed to connect to eapol_test (%s)" % ifname) + self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) + self.mon.attach() + + def add_network(self): + id = self.request("ADD_NETWORK") + if "FAIL" in id: + raise Exception("ADD_NETWORK failed") + return int(id) + + def remove_network(self, id): + id = self.request("REMOVE_NETWORK " + str(id)) + if "FAIL" in id: + raise Exception("REMOVE_NETWORK failed") + return None + + def set_network(self, id, field, value): + res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value) + if "FAIL" in res: + raise Exception("SET_NETWORK failed") + return None + + def set_network_quoted(self, id, field, value): + res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') + if "FAIL" in res: + raise Exception("SET_NETWORK failed") + return None + + def request(self, cmd, timeout=10): + return self.ctrl.request(cmd, timeout=timeout) + + def wait_event(self, events, timeout=10): + start = os.times()[4] + while True: + while self.mon.pending(): + ev = self.mon.recv() + logger.debug(self.ifname + ": " + ev) + for event in events: + if event in ev: + return ev + now = os.times()[4] + remaining = start + timeout - now + if remaining <= 0: + break + if not self.mon.pending(timeout=remaining): + break + return None + +def run(ifname, count, no_fast_reauth, res): + et = eapol_test(ifname) + + et.request("AP_SCAN 0") + if no_fast_reauth: + et.request("SET fast_reauth 0") + else: + et.request("SET fast_reauth 1") + id = et.add_network() + et.set_network(id, "key_mgmt", "IEEE8021X") + et.set_network(id, "eapol_flags", "0") + et.set_network(id, "eap", "TLS") + et.set_network_quoted(id, "identity", "user") + et.set_network_quoted(id, "ca_cert", 'ca.pem') + et.set_network_quoted(id, "client_cert", 'client.pem') + et.set_network_quoted(id, "private_key", 'client.key') + et.set_network_quoted(id, "private_key_passwd", 'whatever') + et.set_network(id, "disabled", "0") + + fail = False + for i in range(count): + et.request("REASSOCIATE") + ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"]) + if ev is None or "CTRL-EVENT-CONNECTED" not in ev: + fail = True + break + + et.remove_network(id) + + if fail: + res.put("FAIL (%d OK)" % i) + else: + res.put("PASS %d" % (i + 1)) + +def main(): + parser = argparse.ArgumentParser(description='eapol_test controller') + parser.add_argument('--ctrl', help='control interface directory') + parser.add_argument('--num', help='number of processes') + parser.add_argument('--iter', help='number of iterations') + parser.add_argument('--no-fast-reauth', action='store_true', + dest='no_fast_reauth', + help='disable TLS session resumption') + args = parser.parse_args() + + num = int(args.num) + iter = int(args.iter) + if args.ctrl: + global wpas_ctrl + wpas_ctrl = args.ctrl + + t = {} + res = {} + for i in range(num): + res[i] = Queue.Queue() + t[i] = threading.Thread(target=run, args=(str(i), iter, + args.no_fast_reauth, res[i])) + for i in range(num): + t[i].start() + for i in range(num): + t[i].join() + try: + results = res[i].get(False) + except: + results = "N/A" + print "%d: %s" % (i, results) + +if __name__ == "__main__": + main()