Updated through tag hostap_2_5 from git://w1.fi/hostap.git
[mech_eap.git] / libeap / tests / hwsim / fst_module_aux.py
1 # FST tests related classes
2 # Copyright (c) 2015, Qualcomm Atheros, Inc.
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import logging
8 import subprocess
9 import os
10 import signal
11 import time
12 import re
13
14 import hostapd
15 import wpaspy
16 import utils
17 from wpasupplicant import WpaSupplicant
18
19 import fst_test_common
20
21 logger = logging.getLogger()
22
23 def parse_fst_iface_event(ev):
24     """Parses FST iface event that comes as a string, e.g.
25     "<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0"
26     Returns a dictionary with parsed "event_type", "ifname", and "group"; or
27     None if not an FST event or can't be parsed."""
28     event = {}
29     if ev.find("FST-EVENT-IFACE") == -1:
30         return None
31     if ev.find("attached") != -1:
32         event['event_type'] = 'attached'
33     elif ev.find("detached") != -1:
34         event['event_type'] = 'detached'
35     else:
36         return None
37     f = re.search("ifname=(\S+)", ev)
38     if f is not None:
39         event['ifname'] = f.group(1)
40     f = re.search("group=(\S+)", ev)
41     if f is not None:
42         event['group'] = f.group(1)
43     return event
44
45 def parse_fst_session_event(ev):
46     """Parses FST session event that comes as a string, e.g.
47     "<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT"
48     Returns a dictionary with parsed "type", "id", and "reason"; or None if not
49     a FST event or can't be parsed"""
50     event = {}
51     if ev.find("FST-EVENT-SESSION") == -1:
52         return None
53     event['new_state'] = '' # The field always exists in the dictionary
54     f = re.search("event_type=(\S+)", ev)
55     if f is None:
56         return None
57     event['type'] = f.group(1)
58     f = re.search("session_id=(\d+)", ev)
59     if f is not None:
60         event['id'] = f.group(1)
61     f = re.search("old_state=(\S+)", ev)
62     if f is not None:
63         event['old_state'] = f.group(1)
64     f = re.search("new_state=(\S+)", ev)
65     if f is not None:
66         event['new_state'] = f.group(1)
67     f = re.search("reason=(\S+)", ev)
68     if f is not None:
69         event['reason'] = f.group(1)
70     return event
71
72 def start_two_ap_sta_pairs(apdev, rsn=False):
73     """auxiliary function that creates two pairs of APs and STAs"""
74     ap1 = FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
75                 fst_test_common.fst_test_def_chan_a,
76                 fst_test_common.fst_test_def_group,
77                 fst_test_common.fst_test_def_prio_low,
78                 fst_test_common.fst_test_def_llt, rsn=rsn)
79     ap1.start()
80     ap2 = FstAP(apdev[1]['ifname'], 'fst_11g', 'g',
81                 fst_test_common.fst_test_def_chan_g,
82                 fst_test_common.fst_test_def_group,
83                 fst_test_common.fst_test_def_prio_high,
84                 fst_test_common.fst_test_def_llt, rsn=rsn)
85     ap2.start()
86
87     sta1 = FstSTA('wlan5',
88                   fst_test_common.fst_test_def_group,
89                   fst_test_common.fst_test_def_prio_low,
90                   fst_test_common.fst_test_def_llt, rsn=rsn)
91     sta1.start()
92     sta2 = FstSTA('wlan6',
93                   fst_test_common.fst_test_def_group,
94                   fst_test_common.fst_test_def_prio_high,
95                   fst_test_common.fst_test_def_llt, rsn=rsn)
96     sta2.start()
97
98     return ap1, ap2, sta1, sta2
99
100 def stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2):
101     sta1.stop()
102     sta2.stop()
103     ap1.stop()
104     ap2.stop()
105
106 def connect_two_ap_sta_pairs(ap1, ap2, dev1, dev2, rsn=False):
107     """Connects a pair of stations, each one to a separate AP"""
108     dev1.scan(freq=fst_test_common.fst_test_def_freq_a)
109     dev2.scan(freq=fst_test_common.fst_test_def_freq_g)
110
111     if rsn:
112         dev1.connect(ap1, psk="12345678",
113                      scan_freq=fst_test_common.fst_test_def_freq_a)
114         dev2.connect(ap2, psk="12345678",
115                      scan_freq=fst_test_common.fst_test_def_freq_g)
116     else:
117         dev1.connect(ap1, key_mgmt="NONE",
118                      scan_freq=fst_test_common.fst_test_def_freq_a)
119         dev2.connect(ap2, key_mgmt="NONE",
120                      scan_freq=fst_test_common.fst_test_def_freq_g)
121
122 def disconnect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
123     dev1.disconnect()
124     dev2.disconnect()
125
126 def external_sta_connect(sta, ap, **kwargs):
127     """Connects the external station to the given AP"""
128     if not isinstance(sta, WpaSupplicant):
129         raise Exception("Bad STA object")
130     if not isinstance(ap, FstAP):
131         raise Exception("Bad AP object to connect to")
132     hap = ap.get_instance()
133     sta.connect(ap.get_ssid(), **kwargs)
134
135 def disconnect_external_sta(sta, ap, check_disconnect=True):
136     """Disconnects the external station from the AP"""
137     if not isinstance(sta, WpaSupplicant):
138         raise Exception("Bad STA object")
139     if not isinstance(ap, FstAP):
140         raise Exception("Bad AP object to connect to")
141     sta.request("DISCONNECT")
142     if check_disconnect:
143         hap = ap.get_instance()
144         ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
145         if ev is None:
146             raise Exception("No disconnection event received from %s" % ap.get_ssid())
147
148 #
149 # FstDevice class
150 # This is the parent class for the AP (FstAP) and STA (FstSTA) that implements
151 # FST functionality.
152 #
153 class FstDevice:
154     def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
155         self.iface = iface
156         self.fst_group = fst_group
157         self.fst_pri = fst_pri
158         self.fst_llt = fst_llt  # None llt means no llt parameter will be set
159         self.instance = None    # Hostapd/WpaSupplicant instance
160         self.peer_obj = None    # Peer object, must be a FstDevice child object
161         self.new_peer_addr = None # Peer MAC address for new session iface
162         self.old_peer_addr = None # Peer MAC address for old session iface
163         self.role = 'initiator' # Role: initiator/responder
164         s = self.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
165         if not s.startswith('OK'):
166             raise utils.HwsimSkip("FST not supported")
167         self.rsn = rsn
168
169     def ifname(self):
170         return self.iface
171
172     def get_instance(self):
173         """Gets the Hostapd/WpaSupplicant instance"""
174         raise Exception("Virtual get_instance() called!")
175
176     def get_own_mac_address(self):
177         """Gets the device's own MAC address"""
178         raise Exception("Virtual get_own_mac_address() called!")
179
180     def get_new_peer_addr(self):
181         return self.new_peer_addr
182
183     def get_old_peer_addr(self):
184         return self.old_peer_addr
185
186     def get_actual_peer_addr(self):
187         """Gets the peer address. A connected AP/station address is returned."""
188         raise Exception("Virtual get_actual_peer_addr() called!")
189
190     def grequest(self, req):
191         """Send request on the global control interface"""
192         raise Exception, "Virtual grequest() called!"
193
194     def wait_gevent(self, events, timeout=None):
195         """Wait for a list of events on the global interface"""
196         raise Exception("Virtual wait_gevent() called!")
197
198     def request(self, req):
199         """Issue a request to the control interface"""
200         h = self.get_instance()
201         return h.request(req)
202
203     def wait_event(self, events, timeout=None):
204         """Wait for an event from the control interface"""
205         h = self.get_instance()
206         if timeout is not None:
207             return h.wait_event(events, timeout=timeout)
208         else:
209             return h.wait_event(events)
210
211     def set_old_peer_addr(self, peer_addr=None):
212         """Sets the peer address"""
213         if peer_addr is not None:
214             self.old_peer_addr = peer_addr
215         else:
216             self.old_peer_addr = self.get_actual_peer_addr()
217
218     def set_new_peer_addr(self, peer_addr=None):
219         """Sets the peer address"""
220         if peer_addr is not None:
221             self.new_peer_addr = peer_addr
222         else:
223             self.new_peer_addr = self.get_actual_peer_addr()
224
225     def add_peer(self, obj, old_peer_addr=None, new_peer_addr=None):
226         """Add peer for FST session(s). 'obj' is a FstDevice subclass object.
227         The method must be called before add_session().
228         If peer_addr is not specified, the address of the currently connected
229         station is used."""
230         if not isinstance(obj, FstDevice):
231             raise Exception("Peer must be a FstDevice object")
232         self.peer_obj = obj
233         self.set_old_peer_addr(old_peer_addr)
234         self.set_new_peer_addr(new_peer_addr)
235
236     def get_peer(self):
237         """Returns peer object"""
238         return self.peer_obj
239
240     def set_fst_parameters(self, group_id=None, pri=None, llt=None):
241         """Change/set new FST parameters. Can be used to start FST sessions with
242         different FST parameters than defined in the configuration file."""
243         if group_id is not None:
244             self.fst_group = group_id
245         if pri is not None:
246             self.fst_pri = pri
247         if llt is not None:
248             self.fst_llt = llt
249
250     def get_local_mbies(self, ifname=None):
251         if_name = ifname if ifname is not None else self.iface
252         return self.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name)
253
254     def add_session(self):
255         """Adds an FST session. add_peer() must be called calling this
256         function"""
257         if self.peer_obj is None:
258             raise Exception("Peer wasn't added before starting session")
259         grp = ' ' + self.fst_group if self.fst_group != '' else ''
260         sid = self.grequest("FST-MANAGER SESSION_ADD" + grp)
261         sid = sid.strip()
262         if sid.startswith("FAIL"):
263             raise Exception("Cannot add FST session with groupid ==" + grp)
264         return sid
265
266     def set_session_param(self, params):
267         request = "FST-MANAGER SESSION_SET"
268         if params is not None and params != '':
269             request = request + ' ' + params
270         return self.grequest(request)
271
272     def get_session_params(self, sid):
273         request = "FST-MANAGER SESSION_GET " + sid
274         res = self.grequest(request)
275         if res.startswith("FAIL"):
276             return None
277         params = {}
278         for i in res.splitlines():
279             p = i.split('=')
280             params[p[0]] = p[1]
281         return params
282
283     def iface_peers(self, ifname):
284         grp = self.fst_group if self.fst_group != '' else ''
285         res = self.grequest("FST-MANAGER IFACE_PEERS " + grp + ' ' + ifname)
286         if res.startswith("FAIL"):
287             return None
288         return res.splitlines()
289
290     def get_peer_mbies(self, ifname, peer_addr):
291         return self.grequest("FST-MANAGER GET_PEER_MBIES %s %s" % (ifname, peer_addr))
292
293     def list_ifaces(self):
294         grp = self.fst_group if self.fst_group != '' else ''
295         res = self.grequest("FST-MANAGER LIST_IFACES " + grp)
296         if res.startswith("FAIL"):
297             return None
298         ifaces = []
299         for i in res.splitlines():
300             p = i.split(':')
301             iface = {}
302             iface['name'] = p[0]
303             iface['priority'] = p[1]
304             iface['llt'] = p[2]
305             ifaces.append(iface)
306         return ifaces
307
308     def list_groups(self):
309         res = self.grequest("FST-MANAGER LIST_GROUPS")
310         if res.startswith("FAIL"):
311             return None
312         return res.splitlines()
313
314     def configure_session(self, sid, new_iface, old_iface = None):
315         """Calls session_set for a number of parameters some of which are stored
316         in "self" while others are passed to this function explicitly. If
317         old_iface is None, current iface is used; if old_iface is an empty
318         string."""
319         oldiface = old_iface if old_iface is not None else self.iface
320         s = self.set_session_param(sid + ' old_ifname=' + oldiface)
321         if not s.startswith("OK"):
322             raise Exception("Cannot set FST session old_ifname: " + s)
323         if new_iface is not None:
324             s = self.set_session_param(sid + " new_ifname=" + new_iface)
325             if not s.startswith("OK"):
326                 raise Exception("Cannot set FST session new_ifname:" + s)
327         if self.new_peer_addr is not None and self.new_peer_addr != '':
328             s = self.set_session_param(sid + " new_peer_addr=" + self.new_peer_addr)
329             if not s.startswith("OK"):
330                 raise Exception("Cannot set FST session peer address:" + s + " (new)")
331         if self.old_peer_addr is not None and self.old_peer_addr != '':
332             s = self.set_session_param(sid + " old_peer_addr=" + self.old_peer_addr)
333             if not s.startswith("OK"):
334                 raise Exception("Cannot set FST session peer address:" + s + " (old)")
335         if self.fst_llt is not None and self.fst_llt != '':
336             s = self.set_session_param(sid + " llt=" + self.fst_llt)
337             if not s.startswith("OK"):
338                 raise Exception("Cannot set FST session llt:" + s)
339
340     def send_iface_attach_request(self, ifname, group, llt, priority):
341         request = "FST-ATTACH " + ifname + ' ' + group
342         if llt is not None:
343             request += " llt=" + llt
344         if priority is not None:
345             request += " priority=" + priority
346         res = self.grequest(request)
347         if not res.startswith("OK"):
348             raise Exception("Cannot attach FST iface: " + res)
349
350     def send_iface_detach_request(self, ifname):
351         res = self.grequest("FST-DETACH " + ifname)
352         if not res.startswith("OK"):
353             raise Exception("Cannot detach FST iface: " + res)
354
355     def send_session_setup_request(self, sid):
356         s = self.grequest("FST-MANAGER SESSION_INITIATE " + sid)
357         if not s.startswith('OK'):
358             raise Exception("Cannot send setup request: %s" % s)
359         return s
360
361     def send_session_setup_response(self, sid, response):
362         request = "FST-MANAGER SESSION_RESPOND " + sid + " " + response
363         s = self.grequest(request)
364         if not s.startswith('OK'):
365             raise Exception("Cannot send setup response: %s" % s)
366         return s
367
368     def send_test_session_setup_request(self, fsts_id,
369                                         additional_parameter = None):
370         request = "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
371         if additional_parameter is not None:
372             request += " " + additional_parameter
373         s = self.grequest(request)
374         if not s.startswith('OK'):
375             raise Exception("Cannot send FST setup request: %s" % s)
376         return s
377
378     def send_test_session_setup_response(self, fsts_id,
379                                          response, additional_parameter = None):
380         request = "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id + " " + response
381         if additional_parameter is not None:
382             request += " " + additional_parameter
383         s = self.grequest(request)
384         if not s.startswith('OK'):
385             raise Exception("Cannot send FST setup response: %s" % s)
386         return s
387
388     def send_test_ack_request(self, fsts_id):
389         s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id)
390         if not s.startswith('OK'):
391             raise Exception("Cannot send FST ack request: %s" % s)
392         return s
393
394     def send_test_ack_response(self, fsts_id):
395         s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id)
396         if not s.startswith('OK'):
397             raise Exception("Cannot send FST ack response: %s" % s)
398         return s
399
400     def send_test_tear_down(self, fsts_id):
401         s = self.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id)
402         if not s.startswith('OK'):
403             raise Exception("Cannot send FST tear down: %s" % s)
404         return s
405
406     def get_fsts_id_by_sid(self, sid):
407         s = self.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid)
408         if s == ' ' or s.startswith('FAIL'):
409             raise Exception("Cannot get fsts_id for sid == %s" % sid)
410         return int(s)
411
412     def wait_for_iface_event(self, timeout):
413         while True:
414             ev = self.wait_gevent(["FST-EVENT-IFACE"], timeout)
415             if ev is None:
416                 raise Exception("No FST-EVENT-IFACE received")
417             event = parse_fst_iface_event(ev)
418             if event is None:
419                 # We can't parse so it's not our event, wait for next one
420                 continue
421             return event
422
423     def wait_for_session_event(self, timeout, events_to_ignore=[],
424                                events_to_count=[]):
425         while True:
426             ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout)
427             if ev is None:
428                 raise Exception("No FST-EVENT-SESSION received")
429             event = parse_fst_session_event(ev)
430             if event is None:
431                 # We can't parse so it's not our event, wait for next one
432                 continue
433             if len(events_to_ignore) > 0:
434                 if event['type'] in events_to_ignore:
435                     continue
436             elif len(events_to_count) > 0:
437                 if not event['type'] in events_to_count:
438                     continue
439             return event
440
441     def initiate_session(self, sid, response="accept"):
442         """Initiates FST session with given session id 'sid'.
443         'response' is the session respond answer: "accept", "reject", or a
444         special "timeout" value to skip the response in order to test session
445         timeouts.
446         Returns: "OK" - session has been initiated, otherwise the reason for the
447         reset: REASON_REJECT, REASON_STT."""
448         strsid = ' ' + sid if sid != '' else ''
449         s = self.grequest("FST-MANAGER SESSION_INITIATE"+ strsid)
450         if not s.startswith('OK'):
451             raise Exception("Cannot initiate fst session: %s" % s)
452         ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
453         if ev is None:
454             raise Exception("No FST-EVENT-SESSION received")
455         # We got FST event
456         event = parse_fst_session_event(ev)
457         if event == None:
458             raise Exception("Unrecognized FST event: " % ev)
459         if event['type'] != 'EVENT_FST_SETUP':
460             raise Exception("Expected FST_SETUP event, got: " + event['type'])
461         ev = self.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
462         if ev is None:
463             raise Exception("No FST-EVENT-SESSION received")
464         event = parse_fst_session_event(ev)
465         if event == None:
466             raise Exception("Unrecognized FST event: " % ev)
467         if event['type'] != 'EVENT_FST_SESSION_STATE':
468             raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
469         if event['new_state'] != "SETUP_COMPLETION":
470             raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
471         if response == '':
472             return 'OK'
473         if response != "timeout":
474             s = self.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " " + response)  # Or reject
475             if not s.startswith('OK'):
476                 raise Exception("Error session_respond: %s" % s)
477         # Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
478         # events. The 1st event will be EVENT_FST_SESSION_STATE
479         # old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
480         # either EVENT_FST_ESTABLISHED with the session id or
481         # EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
482         # reset, the reason field will tell why.
483         result = ''
484         while result == '':
485             ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
486             if ev is None:
487                 break # No session event received
488             event = parse_fst_session_event(ev)
489             if event == None:
490                 # We can't parse so it's not our event, wait for next one
491                 continue
492             if event['type'] == 'EVENT_FST_ESTABLISHED':
493                 result = "OK"
494                 break
495             elif event['type'] == "EVENT_FST_SESSION_STATE":
496                 if event['new_state'] == "INITIAL":
497                     # Session was reset, the only reason to get back to initial
498                     # state.
499                     result = event['reason']
500                     break
501         if result == '':
502             raise Exception("No event for session respond")
503         return result
504
505     def transfer_session(self, sid):
506         """Transfers the session. 'sid' is the session id. 'hsta' is the
507         station-responder object.
508         Returns: REASON_SWITCH - the session has been transferred successfully
509         or a REASON_... reported by the reset event."""
510         request = "FST-MANAGER SESSION_TRANSFER"
511         if sid != '':
512             request += ' ' + sid
513         s = self.grequest(request)
514         if not s.startswith('OK'):
515             raise Exception("Cannot transfer fst session: %s" % s)
516         result = ''
517         while result == '':
518             ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
519             if ev is None:
520                 raise Exception("Missing session transfer event")
521             # We got FST event. We expect TRANSITION_CONFIRMED state and then
522             # INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
523             # Right now we'll be waiting for the reset event and record the
524             # reason.
525             event = parse_fst_session_event(ev)
526             if event == None:
527                 raise Exception("Unrecognized FST event: " % ev)
528             if event['new_state'] == 'INITIAL':
529                 result = event['reason']
530         return result
531
532     def wait_for_tear_down(self):
533         ev = self.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
534         if ev is None:
535             raise Exception("No FST-EVENT-SESSION received")
536         # We got FST event
537         event = parse_fst_session_event(ev)
538         if event == None:
539             raise Exception("Unrecognized FST event: " % ev)
540         if event['type'] != 'EVENT_FST_SESSION_STATE':
541             raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
542         if event['new_state'] != "INITIAL":
543             raise Exception("Expected new state INITIAL, got: " + event['new_state'])
544         if event['reason'] != 'REASON_TEARDOWN':
545             raise Exception("Expected reason REASON_TEARDOWN, got: " + event['reason'])
546
547     def teardown_session(self, sid):
548         """Tears down FST session with a given session id ('sid')"""
549         strsid = ' ' + sid if sid != '' else ''
550         s = self.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid)
551         if not s.startswith('OK'):
552             raise Exception("Cannot tear down fst session: %s" % s)
553         self.peer_obj.wait_for_tear_down()
554
555
556     def remove_session(self, sid, wait_for_tear_down=True):
557         """Removes FST session with a given session id ('sid')"""
558         strsid = ' ' + sid if sid != '' else ''
559         s = self.grequest("FST-MANAGER SESSION_REMOVE" + strsid)
560         if not s.startswith('OK'):
561             raise Exception("Cannot remove fst session: %s" % s)
562         if wait_for_tear_down == True:
563             self.peer_obj.wait_for_tear_down()
564
565     def remove_all_sessions(self):
566         """Removes FST session with a given session id ('sid')"""
567         grp = ' ' + self.fst_group if self.fst_group != '' else ''
568         s = self.grequest("FST-MANAGER LIST_SESSIONS" + grp)
569         if not s.startswith('FAIL'):
570             for sid in s.splitlines():
571                 sid = sid.strip()
572                 if len(sid) != 0:
573                     self.remove_session(sid, wait_for_tear_down=False)
574
575
576 #
577 # FstAP class
578 #
579 class FstAP (FstDevice):
580     def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
581                  fst_llt=None, rsn=False):
582         """If fst_group is empty, then FST parameters will not be set
583         If fst_llt is empty, the parameter will not be set and the default value
584         is expected to be configured."""
585         self.ssid = ssid
586         self.mode = mode
587         self.chan = chan
588         self.reg_ctrl = fst_test_common.HapdRegCtrl()
589         self.reg_ctrl.add_ap(iface, self.chan)
590         self.global_instance = hostapd.HostapdGlobal()
591         FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)
592
593     def start(self, return_early=False):
594         """Starts AP the "standard" way as it was intended by hostapd tests.
595         This will work only when FST supports fully dynamically loading
596         parameters in hostapd."""
597         params = {}
598         params['ssid'] = self.ssid
599         params['hw_mode'] = self.mode
600         params['channel'] = self.chan
601         params['country_code'] = 'US'
602         if self.rsn:
603             params['wpa'] = '2'
604             params['wpa_key_mgmt'] = 'WPA-PSK'
605             params['rsn_pairwise'] = 'CCMP'
606             params['wpa_passphrase'] = '12345678'
607         self.hapd=hostapd.add_ap(self.iface, params)
608         if not self.hapd.ping():
609             raise Exception("Could not ping FST hostapd")
610         self.reg_ctrl.start()
611         self.get_global_instance()
612         if return_early:
613             return self.hapd
614         if len(self.fst_group) != 0:
615             self.send_iface_attach_request(self.iface, self.fst_group,
616                                            self.fst_llt, self.fst_pri)
617         return self.hapd
618
619     def stop(self):
620         """Removes the AP, To be used when dynamic fst APs are implemented in
621         hostapd."""
622         if len(self.fst_group) != 0:
623             self.remove_all_sessions()
624             self.send_iface_detach_request(self.iface)
625         self.reg_ctrl.stop()
626         del self.global_instance
627         self.global_instance = None
628
629     def get_instance(self):
630         """Return the Hostapd/WpaSupplicant instance"""
631         if self.instance is None:
632             self.instance = hostapd.Hostapd(self.iface)
633         return self.instance
634
635     def get_global_instance(self):
636         return self.global_instance
637
638     def get_own_mac_address(self):
639         """Gets the device's own MAC address"""
640         h = self.get_instance()
641         status = h.get_status()
642         return status['bssid[0]']
643
644     def get_actual_peer_addr(self):
645         """Gets the peer address. A connected station address is returned."""
646         # Use the device instance, the global control interface doesn't have
647         # station address
648         h = self.get_instance()
649         sta = h.get_sta(None)
650         if sta is None or 'addr' not in sta:
651             # Maybe station is not connected?
652             addr = None
653         else:
654             addr=sta['addr']
655         return addr
656
657     def grequest(self, req):
658         """Send request on the global control interface"""
659         logger.debug("FstAP::grequest: " + req)
660         h = self.get_global_instance()
661         return h.request(req)
662
663     def wait_gevent(self, events, timeout=None):
664         """Wait for a list of events on the global interface"""
665         h = self.get_global_instance()
666         if timeout is not None:
667             return h.wait_event(events, timeout=timeout)
668         else:
669             return h.wait_event(events)
670
671     def get_ssid(self):
672         return self.ssid
673
674 #
675 # FstSTA class
676 #
677 class FstSTA (FstDevice):
678     def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
679         """If fst_group is empty, then FST parameters will not be set
680         If fst_llt is empty, the parameter will not be set and the default value
681         is expected to be configured."""
682         FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)
683         self.connected = None # FstAP object the station is connected to
684
685     def start(self):
686         """Current implementation involves running another instance of
687         wpa_supplicant with fixed FST STAs configurations. When any type of
688         dynamic STA loading is implemented, rewrite the function similarly to
689         FstAP."""
690         h = self.get_instance()
691         h.interface_add(self.iface, drv_params="force_connect_cmd=1")
692         if not h.global_ping():
693             raise Exception("Could not ping FST wpa_supplicant")
694         if len(self.fst_group) != 0:
695             self.send_iface_attach_request(self.iface, self.fst_group,
696                                            self.fst_llt, self.fst_pri)
697         return None
698
699     def stop(self):
700         """Removes the STA. In a static (temporary) implementation does nothing,
701         the STA will be removed when the fst wpa_supplicant process is killed by
702         fstap.cleanup()."""
703         h = self.get_instance()
704         if len(self.fst_group) != 0:
705             self.remove_all_sessions()
706             self.send_iface_detach_request(self.iface)
707         h.interface_remove(self.iface)
708         h.close_ctrl()
709         del h
710         self.instance = None
711
712     def get_instance(self):
713         """Return the Hostapd/WpaSupplicant instance"""
714         if self.instance is None:
715              self.instance = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
716         return self.instance
717
718     def get_own_mac_address(self):
719         """Gets the device's own MAC address"""
720         h = self.get_instance()
721         status = h.get_status()
722         return status['address']
723
724     def get_actual_peer_addr(self):
725         """Gets the peer address. A connected station address is returned"""
726         h = self.get_instance()
727         status = h.get_status()
728         return status['bssid']
729
730     def grequest(self, req):
731         """Send request on the global control interface"""
732         logger.debug("FstSTA::grequest: " + req)
733         h = self.get_instance()
734         return h.global_request(req)
735
736     def wait_gevent(self, events, timeout=None):
737         """Wait for a list of events on the global interface"""
738         h = self.get_instance()
739         if timeout is not None:
740             return h.wait_global_event(events, timeout=timeout)
741         else:
742             return h.wait_global_event(events)
743
744     def scan(self, freq=None, no_wait=False, only_new=False):
745         """Issue Scan with given parameters. Returns the BSS dictionary for the
746         AP found (the 1st BSS found. TODO: What if the AP required is not the
747         1st in list?) or None if no BSS found. None call be also a result of
748         no_wait=True. Note, request("SCAN_RESULTS") can be used to get all the
749         results at once."""
750         h = self.get_instance()
751         h.scan(None, freq, no_wait, only_new)
752         r = h.get_bss('0')
753         return r
754
755     def connect(self, ap, **kwargs):
756         """Connects to the given AP"""
757         if not isinstance(ap, FstAP):
758             raise Exception("Bad AP object to connect to")
759         h = self.get_instance()
760         hap = ap.get_instance()
761         h.connect(ap.get_ssid(), **kwargs)
762         self.connected = ap
763
764     def connect_to_external_ap(self, ap, ssid, check_connection=True, **kwargs):
765         """Connects to the given external AP"""
766         if not isinstance(ap, hostapd.Hostapd):
767             raise Exception("Bad AP object to connect to")
768         h = self.get_instance()
769         h.connect(ssid, **kwargs)
770         self.connected = ap
771         if check_connection:
772             ev = ap.wait_event([ "AP-STA-CONNECTED" ], timeout=10)
773             if ev is None:
774                 self.connected = None
775                 raise Exception("No connection event received from %s" % ssid)
776
777     def disconnect(self, check_disconnect=True):
778         """Disconnects from the AP the station is currently connected to"""
779         if self.connected is not None:
780             h = self.get_instance()
781             h.request("DISCONNECT")
782             if check_disconnect:
783                 hap = self.connected.get_instance()
784                 ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
785                 if ev is None:
786                     raise Exception("No disconnection event received from %s" % self.connected.get_ssid())
787             self.connected = None
788
789
790     def disconnect_from_external_ap(self, check_disconnect=True):
791         """Disconnects from the external AP the station is currently connected
792         to"""
793         if self.connected is not None:
794             h = self.get_instance()
795             h.request("DISCONNECT")
796             if check_disconnect:
797                 hap = self.connected
798                 ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
799                 if ev is None:
800                     raise Exception("No disconnection event received from AP")
801             self.connected = None