import os
import time
import logging
+import binascii
+import struct
import wpaspy
logger = logging.getLogger()
hapd_ctrl = '/var/run/hostapd'
hapd_global = '/var/run/hostapd-global'
+def mac2tuple(mac):
+ return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
+
class HostapdGlobal:
def __init__(self):
self.ctrl = wpaspy.Ctrl(hapd_global)
return vals[field]
return None
+ def mgmt_rx(self, timeout=5):
+ ev = self.wait_event(["MGMT-RX"], timeout=timeout)
+ if ev is None:
+ return None
+ msg = {}
+ frame = binascii.unhexlify(ev.split(' ')[1])
+ msg['frame'] = frame
+
+ hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
+ msg['fc'] = hdr[0]
+ msg['subtype'] = (hdr[0] >> 4) & 0xf
+ hdr = hdr[1:]
+ msg['duration'] = hdr[0]
+ hdr = hdr[1:]
+ msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
+ hdr = hdr[6:]
+ msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
+ hdr = hdr[6:]
+ msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
+ hdr = hdr[6:]
+ msg['seq_ctrl'] = hdr[0]
+ msg['payload'] = frame[24:]
+
+ return msg
+
+ def mgmt_tx(self, msg):
+ t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
+ hdr = struct.pack('<HH6B6B6BH', *t)
+ self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
+
def add_ap(ifname, params):
logger.info("Starting AP " + ifname)
hapd_global = HostapdGlobal()
# See README for more details.
import time
+import binascii
import logging
logger = logging.getLogger()
import re
if ev is None:
raise Exception("Operation timed out")
+def expect_gas_result(dev, result):
+ ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+ if "result=" + result not in ev:
+ raise Exception("Unexpected GAS query result")
+
def test_gas_timeout(dev, apdev):
"""GAS timeout"""
bssid = apdev[0]['bssid']
if ev is None:
raise Exception("MGMT RX wait timed out")
- ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ expect_gas_result(dev[0], "TIMEOUT")
+
+def test_gas_invalid_response_type(dev, apdev):
+ """GAS invalid response type"""
+ bssid = apdev[0]['bssid']
+ params = hs20_ap_params()
+ params['hessid'] = bssid
+ hostapd.add_ap(apdev[0]['ifname'], params)
+ hapd = hostapd.Hostapd(apdev[0]['ifname'])
+
+ dev[0].scan(freq="2412")
+ hapd.set("ext_mgmt_frame_handling", "1")
+
+ dev[0].request("ANQP_GET " + bssid + " 263")
+ ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
if ev is None:
- raise Exception("GAS query timed out")
- if "result=TIMEOUT" not in ev:
- raise Exception("Unexpected GAS query result")
+ raise Exception("GAS query start timed out")
+
+ query = hapd.mgmt_rx()
+ if query is None:
+ raise Exception("GAS query request not received")
+ resp = {}
+ resp['fc'] = query['fc']
+ resp['da'] = query['sa']
+ resp['sa'] = query['da']
+ resp['bssid'] = query['bssid']
+ # GAS Comeback Response instead of GAS Initial Response
+ resp['payload'] = binascii.unhexlify("040d" + binascii.hexlify(query['payload'][2]) + "0000000000" + "6c027f00" + "0000")
+ hapd.mgmt_tx(resp)
+ ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
+ if ev is None:
+ raise Exception("Missing TX status for GAS response")
+ if "ok=1" not in ev:
+ raise Exception("GAS response not acknowledged")
+
+ # station drops the invalid frame, so this needs to result in GAS timeout
+ expect_gas_result(dev[0], "TIMEOUT")