9d670768b9a1a98198c135056a6858266ffa1c88
[mech_eap.git] / tests / hwsim / hostapd.py
1 # Python class for controlling hostapd
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import struct
12 import wpaspy
13 import remotehost
14 import utils
15 import subprocess
16
17 logger = logging.getLogger()
18 hapd_ctrl = '/var/run/hostapd'
19 hapd_global = '/var/run/hostapd-global'
20
21 def mac2tuple(mac):
22     return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
23
24 class HostapdGlobal:
25     def __init__(self, apdev=None):
26         try:
27             hostname = apdev['hostname']
28             port = apdev['port']
29         except:
30             hostname = None
31             port = 8878
32         self.host = remotehost.Host(hostname)
33         self.hostname = hostname
34         self.port = port
35         if hostname is None:
36             self.ctrl = wpaspy.Ctrl(hapd_global)
37             self.mon = wpaspy.Ctrl(hapd_global)
38             self.dbg = ""
39         else:
40             self.ctrl = wpaspy.Ctrl(hostname, port)
41             self.mon = wpaspy.Ctrl(hostname, port)
42             self.dbg = hostname + "/" + str(port)
43         self.mon.attach()
44
45     def cmd_execute(self, cmd_array):
46         if self.hostname is None:
47             cmd = ' '.join(cmd_array)
48             proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
49                                     stdout=subprocess.PIPE, shell=True)
50             out = proc.communicate()[0]
51             ret = proc.returncode
52             return ret, out
53         else:
54             return self.host.execute(cmd_array)
55
56     def request(self, cmd, timeout=10):
57         logger.debug(self.dbg + ": CTRL(global): " + cmd)
58         return self.ctrl.request(cmd, timeout)
59
60     def wait_event(self, events, timeout):
61         start = os.times()[4]
62         while True:
63             while self.mon.pending():
64                 ev = self.mon.recv()
65                 logger.debug(self.dbg + "(global): " + ev)
66                 for event in events:
67                     if event in ev:
68                         return ev
69             now = os.times()[4]
70             remaining = start + timeout - now
71             if remaining <= 0:
72                 break
73             if not self.mon.pending(timeout=remaining):
74                 break
75         return None
76
77     def add(self, ifname, driver=None):
78         cmd = "ADD " + ifname + " " + hapd_ctrl
79         if driver:
80             cmd += " " + driver
81         res = self.request(cmd)
82         if not "OK" in res:
83             raise Exception("Could not add hostapd interface " + ifname)
84
85     def add_iface(self, ifname, confname):
86         res = self.request("ADD " + ifname + " config=" + confname)
87         if not "OK" in res:
88             raise Exception("Could not add hostapd interface")
89
90     def add_bss(self, phy, confname, ignore_error=False):
91         res = self.request("ADD bss_config=" + phy + ":" + confname)
92         if not "OK" in res:
93             if not ignore_error:
94                 raise Exception("Could not add hostapd BSS")
95
96     def remove(self, ifname):
97         self.request("REMOVE " + ifname, timeout=30)
98
99     def relog(self):
100         self.request("RELOG")
101
102     def flush(self):
103         self.request("FLUSH")
104
105     def get_ctrl_iface_port(self, ifname):
106         if self.hostname is None:
107             return None
108
109         res = self.request("INTERFACES ctrl")
110         lines = res.splitlines()
111         found = False
112         for line in lines:
113             words = line.split()
114             if words[0] == ifname:
115                 found = True
116                 break
117         if not found:
118             raise Exception("Could not find UDP port for " + ifname)
119         res = line.find("ctrl_iface=udp:")
120         if res == -1:
121             raise Exception("Wrong ctrl_interface format")
122         words = line.split(":")
123         return int(words[1])
124
125     def terminate(self):
126         self.mon.detach()
127         self.mon.close()
128         self.mon = None
129         self.ctrl.terminate()
130         self.ctrl = None
131
132 class Hostapd:
133     def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
134         self.hostname = hostname
135         self.host = remotehost.Host(hostname, ifname)
136         self.ifname = ifname
137         if hostname is None:
138             self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
139             self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
140             self.dbg = ifname
141         else:
142             self.ctrl = wpaspy.Ctrl(hostname, port)
143             self.mon = wpaspy.Ctrl(hostname, port)
144             self.dbg = hostname + "/" + ifname
145         self.mon.attach()
146         self.bssid = None
147         self.bssidx = bssidx
148
149     def cmd_execute(self, cmd_array):
150         if self.hostname is None:
151             cmd = ""
152             for arg in cmd_array:
153                 cmd += arg + " "
154             cmd = cmd.strip()
155             proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
156                                     stdout=subprocess.PIPE, shell=True)
157             out = proc.communicate()[0]
158             ret = proc.returncode
159             return ret, out
160         else:
161             return self.host.execute(cmd_array)
162
163     def close_ctrl(self):
164         if self.mon is not None:
165             self.mon.detach()
166             self.mon.close()
167             self.mon = None
168             self.ctrl.close()
169             self.ctrl = None
170
171     def own_addr(self):
172         if self.bssid is None:
173             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
174         return self.bssid
175
176     def request(self, cmd):
177         logger.debug(self.dbg + ": CTRL: " + cmd)
178         return self.ctrl.request(cmd)
179
180     def ping(self):
181         return "PONG" in self.request("PING")
182
183     def set(self, field, value):
184         if not "OK" in self.request("SET " + field + " " + value):
185             raise Exception("Failed to set hostapd parameter " + field)
186
187     def set_defaults(self):
188         self.set("driver", "nl80211")
189         self.set("hw_mode", "g")
190         self.set("channel", "1")
191         self.set("ieee80211n", "1")
192         self.set("logger_stdout", "-1")
193         self.set("logger_stdout_level", "0")
194
195     def set_open(self, ssid):
196         self.set_defaults()
197         self.set("ssid", ssid)
198
199     def set_wpa2_psk(self, ssid, passphrase):
200         self.set_defaults()
201         self.set("ssid", ssid)
202         self.set("wpa_passphrase", passphrase)
203         self.set("wpa", "2")
204         self.set("wpa_key_mgmt", "WPA-PSK")
205         self.set("rsn_pairwise", "CCMP")
206
207     def set_wpa_psk(self, ssid, passphrase):
208         self.set_defaults()
209         self.set("ssid", ssid)
210         self.set("wpa_passphrase", passphrase)
211         self.set("wpa", "1")
212         self.set("wpa_key_mgmt", "WPA-PSK")
213         self.set("wpa_pairwise", "TKIP")
214
215     def set_wpa_psk_mixed(self, ssid, passphrase):
216         self.set_defaults()
217         self.set("ssid", ssid)
218         self.set("wpa_passphrase", passphrase)
219         self.set("wpa", "3")
220         self.set("wpa_key_mgmt", "WPA-PSK")
221         self.set("wpa_pairwise", "TKIP")
222         self.set("rsn_pairwise", "CCMP")
223
224     def set_wep(self, ssid, key):
225         self.set_defaults()
226         self.set("ssid", ssid)
227         self.set("wep_key0", key)
228
229     def enable(self):
230         if not "OK" in self.request("ENABLE"):
231             raise Exception("Failed to enable hostapd interface " + self.ifname)
232
233     def disable(self):
234         if not "OK" in self.request("DISABLE"):
235             raise Exception("Failed to disable hostapd interface " + self.ifname)
236
237     def dump_monitor(self):
238         while self.mon.pending():
239             ev = self.mon.recv()
240             logger.debug(self.dbg + ": " + ev)
241
242     def wait_event(self, events, timeout):
243         start = os.times()[4]
244         while True:
245             while self.mon.pending():
246                 ev = self.mon.recv()
247                 logger.debug(self.dbg + ": " + ev)
248                 for event in events:
249                     if event in ev:
250                         return ev
251             now = os.times()[4]
252             remaining = start + timeout - now
253             if remaining <= 0:
254                 break
255             if not self.mon.pending(timeout=remaining):
256                 break
257         return None
258
259     def get_status(self):
260         res = self.request("STATUS")
261         lines = res.splitlines()
262         vals = dict()
263         for l in lines:
264             [name,value] = l.split('=', 1)
265             vals[name] = value
266         return vals
267
268     def get_status_field(self, field):
269         vals = self.get_status()
270         if field in vals:
271             return vals[field]
272         return None
273
274     def get_driver_status(self):
275         res = self.request("STATUS-DRIVER")
276         lines = res.splitlines()
277         vals = dict()
278         for l in lines:
279             [name,value] = l.split('=', 1)
280             vals[name] = value
281         return vals
282
283     def get_driver_status_field(self, field):
284         vals = self.get_driver_status()
285         if field in vals:
286             return vals[field]
287         return None
288
289     def get_config(self):
290         res = self.request("GET_CONFIG")
291         lines = res.splitlines()
292         vals = dict()
293         for l in lines:
294             [name,value] = l.split('=', 1)
295             vals[name] = value
296         return vals
297
298     def mgmt_rx(self, timeout=5):
299         ev = self.wait_event(["MGMT-RX"], timeout=timeout)
300         if ev is None:
301             return None
302         msg = {}
303         frame = binascii.unhexlify(ev.split(' ')[1])
304         msg['frame'] = frame
305
306         hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
307         msg['fc'] = hdr[0]
308         msg['subtype'] = (hdr[0] >> 4) & 0xf
309         hdr = hdr[1:]
310         msg['duration'] = hdr[0]
311         hdr = hdr[1:]
312         msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
313         hdr = hdr[6:]
314         msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
315         hdr = hdr[6:]
316         msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
317         hdr = hdr[6:]
318         msg['seq_ctrl'] = hdr[0]
319         msg['payload'] = frame[24:]
320
321         return msg
322
323     def mgmt_tx(self, msg):
324         t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
325         hdr = struct.pack('<HH6B6B6BH', *t)
326         self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
327
328     def get_sta(self, addr, info=None, next=False):
329         cmd = "STA-NEXT " if next else "STA "
330         if addr is None:
331             res = self.request("STA-FIRST")
332         elif info:
333             res = self.request(cmd + addr + " " + info)
334         else:
335             res = self.request(cmd + addr)
336         lines = res.splitlines()
337         vals = dict()
338         first = True
339         for l in lines:
340             if first and '=' not in l:
341                 vals['addr'] = l
342                 first = False
343             else:
344                 [name,value] = l.split('=', 1)
345                 vals[name] = value
346         return vals
347
348     def get_mib(self, param=None):
349         if param:
350             res = self.request("MIB " + param)
351         else:
352             res = self.request("MIB")
353         lines = res.splitlines()
354         vals = dict()
355         for l in lines:
356             name_val = l.split('=', 1)
357             if len(name_val) > 1:
358                 vals[name_val[0]] = name_val[1]
359         return vals
360
361     def get_pmksa(self, addr):
362         res = self.request("PMKSA")
363         lines = res.splitlines()
364         for l in lines:
365             if addr not in l:
366                 continue
367             vals = dict()
368             [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
369             vals['index'] = index
370             vals['pmkid'] = pmkid
371             vals['expiration'] = expiration
372             vals['opportunistic'] = opportunistic
373             return vals
374         return None
375
376 def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30):
377         if isinstance(apdev, dict):
378             ifname = apdev['ifname']
379             try:
380                 hostname = apdev['hostname']
381                 port = apdev['port']
382                 logger.info("Starting AP " + hostname + "/" + port + " " + ifname)
383             except:
384                 logger.info("Starting AP " + ifname)
385                 hostname = None
386                 port = 8878
387         else:
388             ifname = apdev
389             logger.info("Starting AP " + ifname + " (old add_ap argument type)")
390             hostname = None
391             port = 8878
392         hapd_global = HostapdGlobal(apdev)
393         hapd_global.remove(ifname)
394         hapd_global.add(ifname)
395         port = hapd_global.get_ctrl_iface_port(ifname)
396         hapd = Hostapd(ifname, hostname=hostname, port=port)
397         if not hapd.ping():
398             raise Exception("Could not ping hostapd")
399         hapd.set_defaults()
400         fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
401                    "wpa",
402                    "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
403                    "acct_server_addr", "osu_server_uri" ]
404         for field in fields:
405             if field in params:
406                 hapd.set(field, params[field])
407         for f,v in params.items():
408             if f in fields:
409                 continue
410             if isinstance(v, list):
411                 for val in v:
412                     hapd.set(f, val)
413             else:
414                 hapd.set(f, v)
415         if no_enable:
416             return hapd
417         hapd.enable()
418         if wait_enabled:
419             ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
420             if ev is None:
421                 raise Exception("AP startup timed out")
422             if "AP-ENABLED" not in ev:
423                 raise Exception("AP startup failed")
424         return hapd
425
426 def add_bss(apdev, ifname, confname, ignore_error=False):
427     phy = utils.get_phy(apdev)
428     try:
429         hostname = apdev['hostname']
430         port = apdev['port']
431         logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname)
432     except:
433         logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
434         hostname = None
435         port = 8878
436     hapd_global = HostapdGlobal(apdev)
437     hapd_global.add_bss(phy, confname, ignore_error)
438     port = hapd_global.get_ctrl_iface_port(ifname)
439     hapd = Hostapd(ifname, hostname=hostname, port=port)
440     if not hapd.ping():
441         raise Exception("Could not ping hostapd")
442     return hapd
443
444 def add_iface(apdev, confname):
445     ifname = apdev['ifname']
446     try:
447         hostname = apdev['hostname']
448         port = apdev['port']
449         logger.info("Starting interface " + hostname + "/" + port + " " + ifname)
450     except:
451         logger.info("Starting interface " + ifname)
452         hostname = None
453         port = 8878
454     hapd_global = HostapdGlobal(apdev)
455     hapd_global.add_iface(ifname, confname)
456     port = hapd_global.get_ctrl_iface_port(ifname)
457     hapd = Hostapd(ifname, hostname=hostname, port=port)
458     if not hapd.ping():
459         raise Exception("Could not ping hostapd")
460     return hapd
461
462 def remove_bss(apdev, ifname=None):
463     if ifname == None:
464         ifname = apdev['ifname']
465     try:
466         hostname = apdev['hostname']
467         port = apdev['port']
468         logger.info("Removing BSS " + hostname + "/" + port + " " + ifname)
469     except:
470         logger.info("Removing BSS " + ifname)
471     hapd_global = HostapdGlobal(apdev)
472     hapd_global.remove(ifname)
473
474 def terminate(apdev):
475     try:
476         hostname = apdev['hostname']
477         port = apdev['port']
478         logger.info("Terminating hostapd " + hostname + "/" + port)
479     except:
480         logger.info("Terminating hostapd")
481     hapd_global = HostapdGlobal(apdev)
482     hapd_global.terminate()
483
484 def wpa2_params(ssid=None, passphrase=None):
485     params = { "wpa": "2",
486                "wpa_key_mgmt": "WPA-PSK",
487                "rsn_pairwise": "CCMP" }
488     if ssid:
489         params["ssid"] = ssid
490     if passphrase:
491         params["wpa_passphrase"] = passphrase
492     return params
493
494 def wpa_params(ssid=None, passphrase=None):
495     params = { "wpa": "1",
496                "wpa_key_mgmt": "WPA-PSK",
497                "wpa_pairwise": "TKIP" }
498     if ssid:
499         params["ssid"] = ssid
500     if passphrase:
501         params["wpa_passphrase"] = passphrase
502     return params
503
504 def wpa_mixed_params(ssid=None, passphrase=None):
505     params = { "wpa": "3",
506                "wpa_key_mgmt": "WPA-PSK",
507                "wpa_pairwise": "TKIP",
508                "rsn_pairwise": "CCMP" }
509     if ssid:
510         params["ssid"] = ssid
511     if passphrase:
512         params["wpa_passphrase"] = passphrase
513     return params
514
515 def radius_params():
516     params = { "auth_server_addr": "127.0.0.1",
517                "auth_server_port": "1812",
518                "auth_server_shared_secret": "radius",
519                "nas_identifier": "nas.w1.fi" }
520     return params
521
522 def wpa_eap_params(ssid=None):
523     params = radius_params()
524     params["wpa"] = "1"
525     params["wpa_key_mgmt"] = "WPA-EAP"
526     params["wpa_pairwise"] = "TKIP"
527     params["ieee8021x"] = "1"
528     if ssid:
529         params["ssid"] = ssid
530     return params
531
532 def wpa2_eap_params(ssid=None):
533     params = radius_params()
534     params["wpa"] = "2"
535     params["wpa_key_mgmt"] = "WPA-EAP"
536     params["rsn_pairwise"] = "CCMP"
537     params["ieee8021x"] = "1"
538     if ssid:
539         params["ssid"] = ssid
540     return params
541
542 def b_only_params(channel="1", ssid=None, country=None):
543     params = { "hw_mode" : "b",
544                "channel" : channel }
545     if ssid:
546         params["ssid"] = ssid
547     if country:
548         params["country_code"] = country
549     return params
550
551 def g_only_params(channel="1", ssid=None, country=None):
552     params = { "hw_mode" : "g",
553                "channel" : channel }
554     if ssid:
555         params["ssid"] = ssid
556     if country:
557         params["country_code"] = country
558     return params
559
560 def a_only_params(channel="36", ssid=None, country=None):
561     params = { "hw_mode" : "a",
562                "channel" : channel }
563     if ssid:
564         params["ssid"] = ssid
565     if country:
566         params["country_code"] = country
567     return params
568
569 def ht20_params(channel="1", ssid=None, country=None):
570     params = { "ieee80211n" : "1",
571                "channel" : channel,
572                "hw_mode" : "g" }
573     if int(channel) > 14:
574         params["hw_mode"] = "a"
575     if ssid:
576         params["ssid"] = ssid
577     if country:
578         params["country_code"] = country
579     return params
580
581 def ht40_plus_params(channel="1", ssid=None, country=None):
582     params = ht20_params(channel, ssid, country)
583     params['ht_capab'] = "[HT40+]"
584     return params
585
586 def ht40_minus_params(channel="1", ssid=None, country=None):
587     params = ht20_params(channel, ssid, country)
588     params['ht_capab'] = "[HT40-]"
589     return params
590
591 def cmd_execute(apdev, cmd):
592     hapd_global = HostapdGlobal(apdev)
593     return hapd_global.cmd_execute(cmd)