Updated through tag hostap_2_5 from git://w1.fi/hostap.git
[mech_eap.git] / libeap / tests / hwsim / hostapd.py
1 # Python class for controlling hostapd
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import struct
12 import wpaspy
13
14 logger = logging.getLogger()
15 hapd_ctrl = '/var/run/hostapd'
16 hapd_global = '/var/run/hostapd-global'
17
18 def mac2tuple(mac):
19     return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
20
21 class HostapdGlobal:
22     def __init__(self):
23         self.ctrl = wpaspy.Ctrl(hapd_global)
24         self.mon = wpaspy.Ctrl(hapd_global)
25         self.mon.attach()
26
27     def request(self, cmd):
28         return self.ctrl.request(cmd)
29
30     def wait_event(self, events, timeout):
31         start = os.times()[4]
32         while True:
33             while self.mon.pending():
34                 ev = self.mon.recv()
35                 logger.debug("(global): " + ev)
36                 for event in events:
37                     if event in ev:
38                         return ev
39             now = os.times()[4]
40             remaining = start + timeout - now
41             if remaining <= 0:
42                 break
43             if not self.mon.pending(timeout=remaining):
44                 break
45         return None
46
47     def request(self, cmd):
48         return self.ctrl.request(cmd)
49
50     def add(self, ifname, driver=None):
51         cmd = "ADD " + ifname + " " + hapd_ctrl
52         if driver:
53             cmd += " " + driver
54         res = self.ctrl.request(cmd)
55         if not "OK" in res:
56             raise Exception("Could not add hostapd interface " + ifname)
57
58     def add_iface(self, ifname, confname):
59         res = self.ctrl.request("ADD " + ifname + " config=" + confname)
60         if not "OK" in res:
61             raise Exception("Could not add hostapd interface")
62
63     def add_bss(self, phy, confname, ignore_error=False):
64         res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
65         if not "OK" in res:
66             if not ignore_error:
67                 raise Exception("Could not add hostapd BSS")
68
69     def remove(self, ifname):
70         self.ctrl.request("REMOVE " + ifname, timeout=30)
71
72     def relog(self):
73         self.ctrl.request("RELOG")
74
75     def flush(self):
76         self.ctrl.request("FLUSH")
77
78
79 class Hostapd:
80     def __init__(self, ifname, bssidx=0):
81         self.ifname = ifname
82         self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
83         self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
84         self.mon.attach()
85         self.bssid = None
86         self.bssidx = bssidx
87
88     def own_addr(self):
89         if self.bssid is None:
90             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
91         return self.bssid
92
93     def request(self, cmd):
94         logger.debug(self.ifname + ": CTRL: " + cmd)
95         return self.ctrl.request(cmd)
96
97     def ping(self):
98         return "PONG" in self.request("PING")
99
100     def set(self, field, value):
101         if not "OK" in self.request("SET " + field + " " + value):
102             raise Exception("Failed to set hostapd parameter " + field)
103
104     def set_defaults(self):
105         self.set("driver", "nl80211")
106         self.set("hw_mode", "g")
107         self.set("channel", "1")
108         self.set("ieee80211n", "1")
109         self.set("logger_stdout", "-1")
110         self.set("logger_stdout_level", "0")
111
112     def set_open(self, ssid):
113         self.set_defaults()
114         self.set("ssid", ssid)
115
116     def set_wpa2_psk(self, ssid, passphrase):
117         self.set_defaults()
118         self.set("ssid", ssid)
119         self.set("wpa_passphrase", passphrase)
120         self.set("wpa", "2")
121         self.set("wpa_key_mgmt", "WPA-PSK")
122         self.set("rsn_pairwise", "CCMP")
123
124     def set_wpa_psk(self, ssid, passphrase):
125         self.set_defaults()
126         self.set("ssid", ssid)
127         self.set("wpa_passphrase", passphrase)
128         self.set("wpa", "1")
129         self.set("wpa_key_mgmt", "WPA-PSK")
130         self.set("wpa_pairwise", "TKIP")
131
132     def set_wpa_psk_mixed(self, ssid, passphrase):
133         self.set_defaults()
134         self.set("ssid", ssid)
135         self.set("wpa_passphrase", passphrase)
136         self.set("wpa", "3")
137         self.set("wpa_key_mgmt", "WPA-PSK")
138         self.set("wpa_pairwise", "TKIP")
139         self.set("rsn_pairwise", "CCMP")
140
141     def set_wep(self, ssid, key):
142         self.set_defaults()
143         self.set("ssid", ssid)
144         self.set("wep_key0", key)
145
146     def enable(self):
147         if not "OK" in self.request("ENABLE"):
148             raise Exception("Failed to enable hostapd interface " + self.ifname)
149
150     def disable(self):
151         if not "OK" in self.request("DISABLE"):
152             raise Exception("Failed to disable hostapd interface " + self.ifname)
153
154     def dump_monitor(self):
155         while self.mon.pending():
156             ev = self.mon.recv()
157             logger.debug(self.ifname + ": " + ev)
158
159     def wait_event(self, events, timeout):
160         start = os.times()[4]
161         while True:
162             while self.mon.pending():
163                 ev = self.mon.recv()
164                 logger.debug(self.ifname + ": " + ev)
165                 for event in events:
166                     if event in ev:
167                         return ev
168             now = os.times()[4]
169             remaining = start + timeout - now
170             if remaining <= 0:
171                 break
172             if not self.mon.pending(timeout=remaining):
173                 break
174         return None
175
176     def get_status(self):
177         res = self.request("STATUS")
178         lines = res.splitlines()
179         vals = dict()
180         for l in lines:
181             [name,value] = l.split('=', 1)
182             vals[name] = value
183         return vals
184
185     def get_status_field(self, field):
186         vals = self.get_status()
187         if field in vals:
188             return vals[field]
189         return None
190
191     def get_driver_status(self):
192         res = self.request("STATUS-DRIVER")
193         lines = res.splitlines()
194         vals = dict()
195         for l in lines:
196             [name,value] = l.split('=', 1)
197             vals[name] = value
198         return vals
199
200     def get_driver_status_field(self, field):
201         vals = self.get_driver_status()
202         if field in vals:
203             return vals[field]
204         return None
205
206     def get_config(self):
207         res = self.request("GET_CONFIG")
208         lines = res.splitlines()
209         vals = dict()
210         for l in lines:
211             [name,value] = l.split('=', 1)
212             vals[name] = value
213         return vals
214
215     def mgmt_rx(self, timeout=5):
216         ev = self.wait_event(["MGMT-RX"], timeout=timeout)
217         if ev is None:
218             return None
219         msg = {}
220         frame = binascii.unhexlify(ev.split(' ')[1])
221         msg['frame'] = frame
222
223         hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
224         msg['fc'] = hdr[0]
225         msg['subtype'] = (hdr[0] >> 4) & 0xf
226         hdr = hdr[1:]
227         msg['duration'] = hdr[0]
228         hdr = hdr[1:]
229         msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
230         hdr = hdr[6:]
231         msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
232         hdr = hdr[6:]
233         msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
234         hdr = hdr[6:]
235         msg['seq_ctrl'] = hdr[0]
236         msg['payload'] = frame[24:]
237
238         return msg
239
240     def mgmt_tx(self, msg):
241         t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
242         hdr = struct.pack('<HH6B6B6BH', *t)
243         self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
244
245     def get_sta(self, addr, info=None, next=False):
246         cmd = "STA-NEXT " if next else "STA "
247         if addr is None:
248             res = self.request("STA-FIRST")
249         elif info:
250             res = self.request(cmd + addr + " " + info)
251         else:
252             res = self.request(cmd + addr)
253         lines = res.splitlines()
254         vals = dict()
255         first = True
256         for l in lines:
257             if first and '=' not in l:
258                 vals['addr'] = l
259                 first = False
260             else:
261                 [name,value] = l.split('=', 1)
262                 vals[name] = value
263         return vals
264
265     def get_mib(self, param=None):
266         if param:
267             res = self.request("MIB " + param)
268         else:
269             res = self.request("MIB")
270         lines = res.splitlines()
271         vals = dict()
272         for l in lines:
273             name_val = l.split('=', 1)
274             if len(name_val) > 1:
275                 vals[name_val[0]] = name_val[1]
276         return vals
277
278 def add_ap(ifname, params, wait_enabled=True, no_enable=False):
279         logger.info("Starting AP " + ifname)
280         hapd_global = HostapdGlobal()
281         hapd_global.remove(ifname)
282         hapd_global.add(ifname)
283         hapd = Hostapd(ifname)
284         if not hapd.ping():
285             raise Exception("Could not ping hostapd")
286         hapd.set_defaults()
287         fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
288                    "wpa",
289                    "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
290                    "acct_server_addr", "osu_server_uri" ]
291         for field in fields:
292             if field in params:
293                 hapd.set(field, params[field])
294         for f,v in params.items():
295             if f in fields:
296                 continue
297             if isinstance(v, list):
298                 for val in v:
299                     hapd.set(f, val)
300             else:
301                 hapd.set(f, v)
302         if no_enable:
303             return hapd
304         hapd.enable()
305         if wait_enabled:
306             ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=30)
307             if ev is None:
308                 raise Exception("AP startup timed out")
309             if "AP-ENABLED" not in ev:
310                 raise Exception("AP startup failed")
311         return hapd
312
313 def add_bss(phy, ifname, confname, ignore_error=False):
314     logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
315     hapd_global = HostapdGlobal()
316     hapd_global.add_bss(phy, confname, ignore_error)
317     hapd = Hostapd(ifname)
318     if not hapd.ping():
319         raise Exception("Could not ping hostapd")
320
321 def add_iface(ifname, confname):
322     logger.info("Starting interface " + ifname)
323     hapd_global = HostapdGlobal()
324     hapd_global.add_iface(ifname, confname)
325     hapd = Hostapd(ifname)
326     if not hapd.ping():
327         raise Exception("Could not ping hostapd")
328
329 def remove_bss(ifname):
330     logger.info("Removing BSS " + ifname)
331     hapd_global = HostapdGlobal()
332     hapd_global.remove(ifname)
333
334 def wpa2_params(ssid=None, passphrase=None):
335     params = { "wpa": "2",
336                "wpa_key_mgmt": "WPA-PSK",
337                "rsn_pairwise": "CCMP" }
338     if ssid:
339         params["ssid"] = ssid
340     if passphrase:
341         params["wpa_passphrase"] = passphrase
342     return params
343
344 def wpa_params(ssid=None, passphrase=None):
345     params = { "wpa": "1",
346                "wpa_key_mgmt": "WPA-PSK",
347                "wpa_pairwise": "TKIP" }
348     if ssid:
349         params["ssid"] = ssid
350     if passphrase:
351         params["wpa_passphrase"] = passphrase
352     return params
353
354 def wpa_mixed_params(ssid=None, passphrase=None):
355     params = { "wpa": "3",
356                "wpa_key_mgmt": "WPA-PSK",
357                "wpa_pairwise": "TKIP",
358                "rsn_pairwise": "CCMP" }
359     if ssid:
360         params["ssid"] = ssid
361     if passphrase:
362         params["wpa_passphrase"] = passphrase
363     return params
364
365 def radius_params():
366     params = { "auth_server_addr": "127.0.0.1",
367                "auth_server_port": "1812",
368                "auth_server_shared_secret": "radius",
369                "nas_identifier": "nas.w1.fi" }
370     return params
371
372 def wpa_eap_params(ssid=None):
373     params = radius_params()
374     params["wpa"] = "1"
375     params["wpa_key_mgmt"] = "WPA-EAP"
376     params["wpa_pairwise"] = "TKIP"
377     params["ieee8021x"] = "1"
378     if ssid:
379         params["ssid"] = ssid
380     return params
381
382 def wpa2_eap_params(ssid=None):
383     params = radius_params()
384     params["wpa"] = "2"
385     params["wpa_key_mgmt"] = "WPA-EAP"
386     params["rsn_pairwise"] = "CCMP"
387     params["ieee8021x"] = "1"
388     if ssid:
389         params["ssid"] = ssid
390     return params