tests: Add FST module tests
[mech_eap.git] / tests / hwsim / hostapd.py
1 # Python class for controlling hostapd
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import struct
12 import wpaspy
13
14 logger = logging.getLogger()
15 hapd_ctrl = '/var/run/hostapd'
16 hapd_global = '/var/run/hostapd-global'
17
18 def mac2tuple(mac):
19     return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
20
21 class HostapdGlobal:
22     def __init__(self):
23         self.ctrl = wpaspy.Ctrl(hapd_global)
24         self.mon = wpaspy.Ctrl(hapd_global)
25         self.mon.attach()
26
27     def request(self, cmd):
28         return self.ctrl.request(cmd)
29
30     def wait_event(self, events, timeout):
31         start = os.times()[4]
32         while True:
33             while self.mon.pending():
34                 ev = self.mon.recv()
35                 logger.debug("(global): " + ev)
36                 for event in events:
37                     if event in ev:
38                         return ev
39             now = os.times()[4]
40             remaining = start + timeout - now
41             if remaining <= 0:
42                 break
43             if not self.mon.pending(timeout=remaining):
44                 break
45         return None
46
47     def add(self, ifname):
48         res = self.ctrl.request("ADD " + ifname + " " + hapd_ctrl)
49         if not "OK" in res:
50             raise Exception("Could not add hostapd interface " + ifname)
51
52     def add_iface(self, ifname, confname):
53         res = self.ctrl.request("ADD " + ifname + " config=" + confname)
54         if not "OK" in res:
55             raise Exception("Could not add hostapd interface")
56
57     def add_bss(self, phy, confname, ignore_error=False):
58         res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
59         if not "OK" in res:
60             if not ignore_error:
61                 raise Exception("Could not add hostapd BSS")
62
63     def remove(self, ifname):
64         self.ctrl.request("REMOVE " + ifname, timeout=30)
65
66     def relog(self):
67         self.ctrl.request("RELOG")
68
69     def flush(self):
70         self.ctrl.request("FLUSH")
71
72
73 class Hostapd:
74     def __init__(self, ifname, bssidx=0):
75         self.ifname = ifname
76         self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
77         self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
78         self.mon.attach()
79         self.bssid = None
80         self.bssidx = bssidx
81
82     def own_addr(self):
83         if self.bssid is None:
84             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
85         return self.bssid
86
87     def request(self, cmd):
88         logger.debug(self.ifname + ": CTRL: " + cmd)
89         return self.ctrl.request(cmd)
90
91     def ping(self):
92         return "PONG" in self.request("PING")
93
94     def set(self, field, value):
95         if not "OK" in self.request("SET " + field + " " + value):
96             raise Exception("Failed to set hostapd parameter " + field)
97
98     def set_defaults(self):
99         self.set("driver", "nl80211")
100         self.set("hw_mode", "g")
101         self.set("channel", "1")
102         self.set("ieee80211n", "1")
103         self.set("logger_stdout", "-1")
104         self.set("logger_stdout_level", "0")
105
106     def set_open(self, ssid):
107         self.set_defaults()
108         self.set("ssid", ssid)
109
110     def set_wpa2_psk(self, ssid, passphrase):
111         self.set_defaults()
112         self.set("ssid", ssid)
113         self.set("wpa_passphrase", passphrase)
114         self.set("wpa", "2")
115         self.set("wpa_key_mgmt", "WPA-PSK")
116         self.set("rsn_pairwise", "CCMP")
117
118     def set_wpa_psk(self, ssid, passphrase):
119         self.set_defaults()
120         self.set("ssid", ssid)
121         self.set("wpa_passphrase", passphrase)
122         self.set("wpa", "1")
123         self.set("wpa_key_mgmt", "WPA-PSK")
124         self.set("wpa_pairwise", "TKIP")
125
126     def set_wpa_psk_mixed(self, ssid, passphrase):
127         self.set_defaults()
128         self.set("ssid", ssid)
129         self.set("wpa_passphrase", passphrase)
130         self.set("wpa", "3")
131         self.set("wpa_key_mgmt", "WPA-PSK")
132         self.set("wpa_pairwise", "TKIP")
133         self.set("rsn_pairwise", "CCMP")
134
135     def set_wep(self, ssid, key):
136         self.set_defaults()
137         self.set("ssid", ssid)
138         self.set("wep_key0", key)
139
140     def enable(self):
141         if not "OK" in self.request("ENABLE"):
142             raise Exception("Failed to enable hostapd interface " + self.ifname)
143
144     def disable(self):
145         if not "OK" in self.request("DISABLE"):
146             raise Exception("Failed to disable hostapd interface " + self.ifname)
147
148     def dump_monitor(self):
149         while self.mon.pending():
150             ev = self.mon.recv()
151             logger.debug(self.ifname + ": " + ev)
152
153     def wait_event(self, events, timeout):
154         start = os.times()[4]
155         while True:
156             while self.mon.pending():
157                 ev = self.mon.recv()
158                 logger.debug(self.ifname + ": " + ev)
159                 for event in events:
160                     if event in ev:
161                         return ev
162             now = os.times()[4]
163             remaining = start + timeout - now
164             if remaining <= 0:
165                 break
166             if not self.mon.pending(timeout=remaining):
167                 break
168         return None
169
170     def get_status(self):
171         res = self.request("STATUS")
172         lines = res.splitlines()
173         vals = dict()
174         for l in lines:
175             [name,value] = l.split('=', 1)
176             vals[name] = value
177         return vals
178
179     def get_status_field(self, field):
180         vals = self.get_status()
181         if field in vals:
182             return vals[field]
183         return None
184
185     def get_driver_status(self):
186         res = self.request("STATUS-DRIVER")
187         lines = res.splitlines()
188         vals = dict()
189         for l in lines:
190             [name,value] = l.split('=', 1)
191             vals[name] = value
192         return vals
193
194     def get_driver_status_field(self, field):
195         vals = self.get_driver_status()
196         if field in vals:
197             return vals[field]
198         return None
199
200     def get_config(self):
201         res = self.request("GET_CONFIG")
202         lines = res.splitlines()
203         vals = dict()
204         for l in lines:
205             [name,value] = l.split('=', 1)
206             vals[name] = value
207         return vals
208
209     def mgmt_rx(self, timeout=5):
210         ev = self.wait_event(["MGMT-RX"], timeout=timeout)
211         if ev is None:
212             return None
213         msg = {}
214         frame = binascii.unhexlify(ev.split(' ')[1])
215         msg['frame'] = frame
216
217         hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
218         msg['fc'] = hdr[0]
219         msg['subtype'] = (hdr[0] >> 4) & 0xf
220         hdr = hdr[1:]
221         msg['duration'] = hdr[0]
222         hdr = hdr[1:]
223         msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
224         hdr = hdr[6:]
225         msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
226         hdr = hdr[6:]
227         msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
228         hdr = hdr[6:]
229         msg['seq_ctrl'] = hdr[0]
230         msg['payload'] = frame[24:]
231
232         return msg
233
234     def mgmt_tx(self, msg):
235         t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
236         hdr = struct.pack('<HH6B6B6BH', *t)
237         self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
238
239     def get_sta(self, addr, info=None, next=False):
240         cmd = "STA-NEXT " if next else "STA "
241         if addr is None:
242             res = self.request("STA-FIRST")
243         elif info:
244             res = self.request(cmd + addr + " " + info)
245         else:
246             res = self.request(cmd + addr)
247         lines = res.splitlines()
248         vals = dict()
249         first = True
250         for l in lines:
251             if first and '=' not in l:
252                 vals['addr'] = l
253                 first = False
254             else:
255                 [name,value] = l.split('=', 1)
256                 vals[name] = value
257         return vals
258
259     def get_mib(self, param=None):
260         if param:
261             res = self.request("MIB " + param)
262         else:
263             res = self.request("MIB")
264         lines = res.splitlines()
265         vals = dict()
266         for l in lines:
267             name_val = l.split('=', 1)
268             if len(name_val) > 1:
269                 vals[name_val[0]] = name_val[1]
270         return vals
271
272 def add_ap(ifname, params, wait_enabled=True, no_enable=False):
273         logger.info("Starting AP " + ifname)
274         hapd_global = HostapdGlobal()
275         hapd_global.remove(ifname)
276         hapd_global.add(ifname)
277         hapd = Hostapd(ifname)
278         if not hapd.ping():
279             raise Exception("Could not ping hostapd")
280         hapd.set_defaults()
281         fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
282                    "wpa",
283                    "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
284                    "acct_server_addr", "osu_server_uri" ]
285         for field in fields:
286             if field in params:
287                 hapd.set(field, params[field])
288         for f,v in params.items():
289             if f in fields:
290                 continue
291             if isinstance(v, list):
292                 for val in v:
293                     hapd.set(f, val)
294             else:
295                 hapd.set(f, v)
296         if no_enable:
297             return hapd
298         hapd.enable()
299         if wait_enabled:
300             ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=30)
301             if ev is None:
302                 raise Exception("AP startup timed out")
303             if "AP-ENABLED" not in ev:
304                 raise Exception("AP startup failed")
305         return hapd
306
307 def add_bss(phy, ifname, confname, ignore_error=False):
308     logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
309     hapd_global = HostapdGlobal()
310     hapd_global.add_bss(phy, confname, ignore_error)
311     hapd = Hostapd(ifname)
312     if not hapd.ping():
313         raise Exception("Could not ping hostapd")
314
315 def add_iface(ifname, confname):
316     logger.info("Starting interface " + ifname)
317     hapd_global = HostapdGlobal()
318     hapd_global.add_iface(ifname, confname)
319     hapd = Hostapd(ifname)
320     if not hapd.ping():
321         raise Exception("Could not ping hostapd")
322
323 def remove_bss(ifname):
324     logger.info("Removing BSS " + ifname)
325     hapd_global = HostapdGlobal()
326     hapd_global.remove(ifname)
327
328 def wpa2_params(ssid=None, passphrase=None):
329     params = { "wpa": "2",
330                "wpa_key_mgmt": "WPA-PSK",
331                "rsn_pairwise": "CCMP" }
332     if ssid:
333         params["ssid"] = ssid
334     if passphrase:
335         params["wpa_passphrase"] = passphrase
336     return params
337
338 def wpa_params(ssid=None, passphrase=None):
339     params = { "wpa": "1",
340                "wpa_key_mgmt": "WPA-PSK",
341                "wpa_pairwise": "TKIP" }
342     if ssid:
343         params["ssid"] = ssid
344     if passphrase:
345         params["wpa_passphrase"] = passphrase
346     return params
347
348 def wpa_mixed_params(ssid=None, passphrase=None):
349     params = { "wpa": "3",
350                "wpa_key_mgmt": "WPA-PSK",
351                "wpa_pairwise": "TKIP",
352                "rsn_pairwise": "CCMP" }
353     if ssid:
354         params["ssid"] = ssid
355     if passphrase:
356         params["wpa_passphrase"] = passphrase
357     return params
358
359 def radius_params():
360     params = { "auth_server_addr": "127.0.0.1",
361                "auth_server_port": "1812",
362                "auth_server_shared_secret": "radius",
363                "nas_identifier": "nas.w1.fi" }
364     return params
365
366 def wpa_eap_params(ssid=None):
367     params = radius_params()
368     params["wpa"] = "1"
369     params["wpa_key_mgmt"] = "WPA-EAP"
370     params["wpa_pairwise"] = "TKIP"
371     params["ieee8021x"] = "1"
372     if ssid:
373         params["ssid"] = ssid
374     return params
375
376 def wpa2_eap_params(ssid=None):
377     params = radius_params()
378     params["wpa"] = "2"
379     params["wpa_key_mgmt"] = "WPA-EAP"
380     params["rsn_pairwise"] = "CCMP"
381     params["ieee8021x"] = "1"
382     if ssid:
383         params["ssid"] = ssid
384     return params