tests: Verify GAS fragmentation
[mech_eap.git] / tests / hwsim / test_gas.py
1 #!/usr/bin/python
2 #
3 # GAS tests
4 # Copyright (c) 2013, Qualcomm Atheros, Inc.
5 #
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
8
9 import time
10 import logging
11 logger = logging.getLogger()
12 import re
13
14 import hostapd
15
16 def hs20_ap_params():
17     params = hostapd.wpa2_params(ssid="test-gas")
18     params['wpa_key_mgmt'] = "WPA-EAP"
19     params['ieee80211w'] = "1"
20     params['ieee8021x'] = "1"
21     params['auth_server_addr'] = "127.0.0.1"
22     params['auth_server_port'] = "1812"
23     params['auth_server_shared_secret'] = "radius"
24     params['interworking'] = "1"
25     params['access_network_type'] = "14"
26     params['internet'] = "1"
27     params['asra'] = "0"
28     params['esr'] = "0"
29     params['uesa'] = "0"
30     params['venue_group'] = "7"
31     params['venue_type'] = "1"
32     params['venue_name'] = [ "eng:Example venue", "fin:Esimerkkipaikka" ]
33     params['roaming_consortium'] = [ "112233", "1020304050", "010203040506",
34                                      "fedcba" ]
35     params['domain_name'] = "example.com,another.example.com"
36     params['nai_realm'] = [ "0,example.com,13[5:6],21[2:4][5:7]",
37                             "0,another.example.com" ]
38     params['anqp_3gpp_cell_net'] = "244,91"
39     return params
40
41 def get_gas_response(dev, bssid, info, allow_fetch_failure=False):
42     exp = r'<.>(GAS-RESPONSE-INFO) addr=([0-9a-f:]*) dialog_token=([0-9]*) status_code=([0-9]*) resp_len=([\-0-9]*)'
43     res = re.split(exp, info)
44     if len(res) < 6:
45         raise Exception("Could not parse GAS-RESPONSE-INFO")
46     if res[2] != bssid:
47         raise Exception("Unexpected BSSID in response")
48     token = res[3]
49     status = res[4]
50     if status != "0":
51         raise Exception("GAS query failed")
52     resp_len = res[5]
53     if resp_len == "-1":
54         raise Exception("GAS query reported invalid response length")
55     if int(resp_len) > 2000:
56         raise Exception("Unexpected long GAS response")
57
58     resp = dev.request("GAS_RESPONSE_GET " + bssid + " " + token)
59     if "FAIL" in resp:
60         if allow_fetch_failure:
61             logger.debug("GAS response was not available anymore")
62             return
63         raise Exception("Could not fetch GAS response")
64     if len(resp) != int(resp_len) * 2:
65         raise Exception("Unexpected GAS response length")
66     logger.debug("GAS response: " + resp)
67
68 def test_gas_generic(dev, apdev):
69     """Generic GAS query"""
70     bssid = apdev[0]['bssid']
71     params = hs20_ap_params()
72     params['hessid'] = bssid
73     hostapd.add_ap(apdev[0]['ifname'], params)
74
75     dev[0].scan()
76     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
77     if "FAIL" in req:
78         raise Exception("GAS query request rejected")
79     ev = dev[0].wait_event(["GAS-RESPONSE-INFO"], timeout=10)
80     if ev is None:
81         raise Exception("GAS query timed out")
82     get_gas_response(dev[0], bssid, ev)
83
84 def test_gas_concurrent_scan(dev, apdev):
85     """Generic GAS queries with concurrent scan operation"""
86     bssid = apdev[0]['bssid']
87     params = hs20_ap_params()
88     params['hessid'] = bssid
89     hostapd.add_ap(apdev[0]['ifname'], params)
90
91     dev[0].scan()
92
93     logger.info("Request concurrent operations")
94     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
95     if "FAIL" in req:
96         raise Exception("GAS query request rejected")
97     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000801")
98     if "FAIL" in req:
99         raise Exception("GAS query request rejected")
100     dev[0].request("SCAN")
101     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000201")
102     if "FAIL" in req:
103         raise Exception("GAS query request rejected")
104     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000501")
105     if "FAIL" in req:
106         raise Exception("GAS query request rejected")
107
108     responses = 0
109     for i in range(0, 5):
110         ev = dev[0].wait_event(["GAS-RESPONSE-INFO", "CTRL-EVENT-SCAN-RESULTS"],
111                                timeout=10)
112         if ev is None:
113             raise Exception("Operation timed out")
114         if "GAS-RESPONSE-INFO" in ev:
115             responses = responses + 1
116             get_gas_response(dev[0], bssid, ev, allow_fetch_failure=True)
117
118     if responses != 4:
119         raise Exception("Unexpected number of GAS responses")
120
121 def test_gas_concurrent_connect(dev, apdev):
122     """Generic GAS queries with concurrent connection operation"""
123     bssid = apdev[0]['bssid']
124     params = hs20_ap_params()
125     params['hessid'] = bssid
126     hostapd.add_ap(apdev[0]['ifname'], params)
127
128     dev[0].scan()
129
130     logger.debug("Start concurrent connect and GAS request")
131     dev[0].connect("test-gas", key_mgmt="WPA-EAP", eap="TTLS",
132                    identity="DOMAIN\mschapv2 user", anonymous_identity="ttls",
133                    password="password", phase2="auth=MSCHAPV2",
134                    ca_cert="auth_serv/ca.pem", wait_connect=False)
135     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
136     if "FAIL" in req:
137         raise Exception("GAS query request rejected")
138
139     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED", "GAS-RESPONSE-INFO"],
140                            timeout=20)
141     if ev is None:
142         raise Exception("Operation timed out")
143     if "CTRL-EVENT-CONNECTED" not in ev:
144         raise Exception("Unexpected operation order")
145
146     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED", "GAS-RESPONSE-INFO"],
147                            timeout=20)
148     if ev is None:
149         raise Exception("Operation timed out")
150     if "GAS-RESPONSE-INFO" not in ev:
151         raise Exception("Unexpected operation order")
152     get_gas_response(dev[0], bssid, ev)
153
154     dev[0].request("DISCONNECT")
155     ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
156     if ev is None:
157         raise Exception("Disconnection timed out")
158
159     logger.debug("Wait six seconds for expiration of connect-without-scan")
160     time.sleep(6)
161
162     logger.debug("Start concurrent GAS request and connect")
163     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
164     if "FAIL" in req:
165         raise Exception("GAS query request rejected")
166     dev[0].request("RECONNECT")
167
168     ev = dev[0].wait_event(["GAS-RESPONSE-INFO"], timeout=10)
169     if ev is None:
170         raise Exception("Operation timed out")
171     get_gas_response(dev[0], bssid, ev)
172
173     ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], timeout=20)
174     if ev is None:
175         raise Exception("No new scan results reported")
176
177     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=20)
178     if ev is None:
179         raise Exception("Operation timed out")
180     if "CTRL-EVENT-CONNECTED" not in ev:
181         raise Exception("Unexpected operation order")
182
183 def test_gas_fragment(dev, apdev):
184     """GAS fragmentation"""
185     bssid = apdev[0]['bssid']
186     params = hs20_ap_params()
187     params['hessid'] = bssid
188     hostapd.add_ap(apdev[0]['ifname'], params)
189     hapd = hostapd.Hostapd(apdev[0]['ifname'])
190     hapd.set("gas_frag_limit", "50")
191
192     dev[0].scan()
193     dev[0].request("FETCH_ANQP")
194     for i in range(0, 6):
195         ev = dev[0].wait_event(["RX-ANQP"], timeout=5)
196         if ev is None:
197             raise Exception("Operation timed out")