tests: Verify behavior on incorrect GAS response type
[mech_eap.git] / tests / hwsim / test_gas.py
1 #!/usr/bin/python
2 #
3 # GAS tests
4 # Copyright (c) 2013, Qualcomm Atheros, Inc.
5 #
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
8
9 import time
10 import binascii
11 import logging
12 logger = logging.getLogger()
13 import re
14
15 import hostapd
16
17 def hs20_ap_params():
18     params = hostapd.wpa2_params(ssid="test-gas")
19     params['wpa_key_mgmt'] = "WPA-EAP"
20     params['ieee80211w'] = "1"
21     params['ieee8021x'] = "1"
22     params['auth_server_addr'] = "127.0.0.1"
23     params['auth_server_port'] = "1812"
24     params['auth_server_shared_secret'] = "radius"
25     params['interworking'] = "1"
26     params['access_network_type'] = "14"
27     params['internet'] = "1"
28     params['asra'] = "0"
29     params['esr'] = "0"
30     params['uesa'] = "0"
31     params['venue_group'] = "7"
32     params['venue_type'] = "1"
33     params['venue_name'] = [ "eng:Example venue", "fin:Esimerkkipaikka" ]
34     params['roaming_consortium'] = [ "112233", "1020304050", "010203040506",
35                                      "fedcba" ]
36     params['domain_name'] = "example.com,another.example.com"
37     params['nai_realm'] = [ "0,example.com,13[5:6],21[2:4][5:7]",
38                             "0,another.example.com" ]
39     params['anqp_3gpp_cell_net'] = "244,91"
40     return params
41
42 def get_gas_response(dev, bssid, info, allow_fetch_failure=False):
43     exp = r'<.>(GAS-RESPONSE-INFO) addr=([0-9a-f:]*) dialog_token=([0-9]*) status_code=([0-9]*) resp_len=([\-0-9]*)'
44     res = re.split(exp, info)
45     if len(res) < 6:
46         raise Exception("Could not parse GAS-RESPONSE-INFO")
47     if res[2] != bssid:
48         raise Exception("Unexpected BSSID in response")
49     token = res[3]
50     status = res[4]
51     if status != "0":
52         raise Exception("GAS query failed")
53     resp_len = res[5]
54     if resp_len == "-1":
55         raise Exception("GAS query reported invalid response length")
56     if int(resp_len) > 2000:
57         raise Exception("Unexpected long GAS response")
58
59     resp = dev.request("GAS_RESPONSE_GET " + bssid + " " + token)
60     if "FAIL" in resp:
61         if allow_fetch_failure:
62             logger.debug("GAS response was not available anymore")
63             return
64         raise Exception("Could not fetch GAS response")
65     if len(resp) != int(resp_len) * 2:
66         raise Exception("Unexpected GAS response length")
67     logger.debug("GAS response: " + resp)
68
69 def test_gas_generic(dev, apdev):
70     """Generic GAS query"""
71     bssid = apdev[0]['bssid']
72     params = hs20_ap_params()
73     params['hessid'] = bssid
74     hostapd.add_ap(apdev[0]['ifname'], params)
75
76     dev[0].scan()
77     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
78     if "FAIL" in req:
79         raise Exception("GAS query request rejected")
80     ev = dev[0].wait_event(["GAS-RESPONSE-INFO"], timeout=10)
81     if ev is None:
82         raise Exception("GAS query timed out")
83     get_gas_response(dev[0], bssid, ev)
84
85 def test_gas_concurrent_scan(dev, apdev):
86     """Generic GAS queries with concurrent scan operation"""
87     bssid = apdev[0]['bssid']
88     params = hs20_ap_params()
89     params['hessid'] = bssid
90     hostapd.add_ap(apdev[0]['ifname'], params)
91
92     dev[0].scan()
93
94     logger.info("Request concurrent operations")
95     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
96     if "FAIL" in req:
97         raise Exception("GAS query request rejected")
98     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000801")
99     if "FAIL" in req:
100         raise Exception("GAS query request rejected")
101     dev[0].request("SCAN")
102     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000201")
103     if "FAIL" in req:
104         raise Exception("GAS query request rejected")
105     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000501")
106     if "FAIL" in req:
107         raise Exception("GAS query request rejected")
108
109     responses = 0
110     for i in range(0, 5):
111         ev = dev[0].wait_event(["GAS-RESPONSE-INFO", "CTRL-EVENT-SCAN-RESULTS"],
112                                timeout=10)
113         if ev is None:
114             raise Exception("Operation timed out")
115         if "GAS-RESPONSE-INFO" in ev:
116             responses = responses + 1
117             get_gas_response(dev[0], bssid, ev, allow_fetch_failure=True)
118
119     if responses != 4:
120         raise Exception("Unexpected number of GAS responses")
121
122 def test_gas_concurrent_connect(dev, apdev):
123     """Generic GAS queries with concurrent connection operation"""
124     bssid = apdev[0]['bssid']
125     params = hs20_ap_params()
126     params['hessid'] = bssid
127     hostapd.add_ap(apdev[0]['ifname'], params)
128
129     dev[0].scan()
130
131     logger.debug("Start concurrent connect and GAS request")
132     dev[0].connect("test-gas", key_mgmt="WPA-EAP", eap="TTLS",
133                    identity="DOMAIN\mschapv2 user", anonymous_identity="ttls",
134                    password="password", phase2="auth=MSCHAPV2",
135                    ca_cert="auth_serv/ca.pem", wait_connect=False)
136     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
137     if "FAIL" in req:
138         raise Exception("GAS query request rejected")
139
140     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED", "GAS-RESPONSE-INFO"],
141                            timeout=20)
142     if ev is None:
143         raise Exception("Operation timed out")
144     if "CTRL-EVENT-CONNECTED" not in ev:
145         raise Exception("Unexpected operation order")
146
147     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED", "GAS-RESPONSE-INFO"],
148                            timeout=20)
149     if ev is None:
150         raise Exception("Operation timed out")
151     if "GAS-RESPONSE-INFO" not in ev:
152         raise Exception("Unexpected operation order")
153     get_gas_response(dev[0], bssid, ev)
154
155     dev[0].request("DISCONNECT")
156     ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
157     if ev is None:
158         raise Exception("Disconnection timed out")
159
160     logger.debug("Wait six seconds for expiration of connect-without-scan")
161     time.sleep(6)
162
163     logger.debug("Start concurrent GAS request and connect")
164     req = dev[0].request("GAS_REQUEST " + bssid + " 00 000102000101")
165     if "FAIL" in req:
166         raise Exception("GAS query request rejected")
167     dev[0].request("RECONNECT")
168
169     ev = dev[0].wait_event(["GAS-RESPONSE-INFO"], timeout=10)
170     if ev is None:
171         raise Exception("Operation timed out")
172     get_gas_response(dev[0], bssid, ev)
173
174     ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], timeout=20)
175     if ev is None:
176         raise Exception("No new scan results reported")
177
178     ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=20)
179     if ev is None:
180         raise Exception("Operation timed out")
181     if "CTRL-EVENT-CONNECTED" not in ev:
182         raise Exception("Unexpected operation order")
183
184 def test_gas_fragment(dev, apdev):
185     """GAS fragmentation"""
186     bssid = apdev[0]['bssid']
187     params = hs20_ap_params()
188     params['hessid'] = bssid
189     hostapd.add_ap(apdev[0]['ifname'], params)
190     hapd = hostapd.Hostapd(apdev[0]['ifname'])
191     hapd.set("gas_frag_limit", "50")
192
193     dev[0].scan(freq="2412")
194     dev[0].request("FETCH_ANQP")
195     for i in range(0, 6):
196         ev = dev[0].wait_event(["RX-ANQP"], timeout=5)
197         if ev is None:
198             raise Exception("Operation timed out")
199
200 def test_gas_comeback_delay(dev, apdev):
201     """GAS fragmentation"""
202     bssid = apdev[0]['bssid']
203     params = hs20_ap_params()
204     params['hessid'] = bssid
205     hostapd.add_ap(apdev[0]['ifname'], params)
206     hapd = hostapd.Hostapd(apdev[0]['ifname'])
207     hapd.set("gas_comeback_delay", "500")
208
209     dev[0].scan(freq="2412")
210     dev[0].request("FETCH_ANQP")
211     for i in range(0, 6):
212         ev = dev[0].wait_event(["RX-ANQP"], timeout=5)
213         if ev is None:
214             raise Exception("Operation timed out")
215
216 def expect_gas_result(dev, result):
217     ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=10)
218     if ev is None:
219         raise Exception("GAS query timed out")
220     if "result=" + result not in ev:
221         raise Exception("Unexpected GAS query result")
222
223 def test_gas_timeout(dev, apdev):
224     """GAS timeout"""
225     bssid = apdev[0]['bssid']
226     params = hs20_ap_params()
227     params['hessid'] = bssid
228     hostapd.add_ap(apdev[0]['ifname'], params)
229     hapd = hostapd.Hostapd(apdev[0]['ifname'])
230
231     dev[0].scan(freq="2412")
232     hapd.set("ext_mgmt_frame_handling", "1")
233
234     dev[0].request("ANQP_GET " + bssid + " 263")
235     ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
236     if ev is None:
237         raise Exception("GAS query start timed out")
238
239     ev = hapd.wait_event(["MGMT-RX"], timeout=5)
240     if ev is None:
241         raise Exception("MGMT RX wait timed out")
242
243     expect_gas_result(dev[0], "TIMEOUT")
244
245 def test_gas_invalid_response_type(dev, apdev):
246     """GAS invalid response type"""
247     bssid = apdev[0]['bssid']
248     params = hs20_ap_params()
249     params['hessid'] = bssid
250     hostapd.add_ap(apdev[0]['ifname'], params)
251     hapd = hostapd.Hostapd(apdev[0]['ifname'])
252
253     dev[0].scan(freq="2412")
254     hapd.set("ext_mgmt_frame_handling", "1")
255
256     dev[0].request("ANQP_GET " + bssid + " 263")
257     ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
258     if ev is None:
259         raise Exception("GAS query start timed out")
260
261     query = hapd.mgmt_rx()
262     if query is None:
263         raise Exception("GAS query request not received")
264     resp = {}
265     resp['fc'] = query['fc']
266     resp['da'] = query['sa']
267     resp['sa'] = query['da']
268     resp['bssid'] = query['bssid']
269     # GAS Comeback Response instead of GAS Initial Response
270     resp['payload'] = binascii.unhexlify("040d" + binascii.hexlify(query['payload'][2]) + "0000000000" + "6c027f00" + "0000")
271     hapd.mgmt_tx(resp)
272     ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
273     if ev is None:
274         raise Exception("Missing TX status for GAS response")
275     if "ok=1" not in ev:
276         raise Exception("GAS response not acknowledged")
277
278     # station drops the invalid frame, so this needs to result in GAS timeout
279     expect_gas_result(dev[0], "TIMEOUT")