tests: Make ap_wps_er_http_proto more robust
[mech_eap.git] / tests / hwsim / test_hs20_filter.py
1 # Hotspot 2.0 filtering tests
2 # Copyright (c) 2015, Intel Deutschland GmbH
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import hostapd
8 import hwsim_utils
9 import socket
10 import subprocess
11 import binascii
12 from utils import HwsimSkip, require_under_vm
13 import os
14 import time
15 from test_ap_hs20 import build_arp, build_na, hs20_ap_params
16 from test_ap_hs20 import interworking_select, interworking_connect
17 import struct
18 import logging
19 logger = logging.getLogger()
20
class IPAssign(object):
    """Context manager that temporarily assigns an IP address to an interface.

    On entry the address is added with ``ip [-6] addr add``; on exit it is
    removed again with ``ip [-6] addr del``. For IPv6 the entry step also
    waits until the address is no longer listed as tentative.
    """

    # Upper bound (seconds) for waiting on IPv6 duplicate address detection.
    # Without a bound, an address that stays tentative would hang the test.
    DAD_TIMEOUT = 10

    def __init__(self, iface, addr, ipv6=False):
        """Remember the target and pre-build the ``ip [-6] addr`` prefix.

        iface -- name of the network interface
        addr -- address in the form passed to ``ip addr add``
        ipv6 -- True to operate on IPv6 addresses (adds ``-6``)
        """
        self._iface = iface
        self._addr = addr
        self._cmd = ['ip']
        if ipv6:
            self._cmd.append('-6')
        self._cmd.append('addr')
        self._ipv6 = ipv6

    def _tentative_addrs(self):
        """Return the ``ip addr show tentative`` output for the interface."""
        out = subprocess.check_output(self._cmd + ['show', 'tentative',
                                                   'dev', self._iface])
        # check_output() returns bytes on Python 3; decode so that the
        # membership test against the (str) address works on 2 and 3.
        return out.decode()

    def __enter__(self):
        subprocess.call(self._cmd + ['add', self._addr, 'dev', self._iface])
        if not self._ipv6:
            return
        # wait for DAD to finish
        deadline = time.time() + self.DAD_TIMEOUT
        while self._addr in self._tentative_addrs():
            if time.time() > deadline:
                # __exit__ is not invoked when __enter__ raises, so remove
                # the address here before reporting the failure.
                subprocess.call(self._cmd + ['del', self._addr,
                                             'dev', self._iface])
                raise Exception("Timeout while waiting for DAD to finish on " +
                                self._iface)
            time.sleep(0.1)

    def __exit__(self, type, value, traceback):
        subprocess.call(self._cmd + ['del', self._addr, 'dev', self._iface])
41
def hs20_filters_connect(dev, apdev, disable_dgaf=False, proxy_arp=False):
    """Start a bridged Hotspot 2.0 AP on apdev[0] and connect dev[0] to it.

    The AP is configured with disable_dgaf=0, proxy_arp=1, ap_isolate=1 and
    is attached to the bridge 'ap-br0', which is then brought up.

    NOTE: the disable_dgaf and proxy_arp arguments are accepted but not read;
    the corresponding hostapd parameters are hard-coded below.

    Returns a (dev[0], hapd) tuple.
    Raises HwsimSkip if the AP cannot be started.
    """
    bssid = apdev[0]['bssid']
    params = hs20_ap_params()
    params['hessid'] = bssid

    # Do not disable dgaf, to test that the station drops unicast IP packets
    # encrypted with GTK.
    params['disable_dgaf'] = '0'
    params['proxy_arp'] = '1'
    params['ap_isolate'] = '1'
    params['bridge'] = 'ap-br0'

    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception:
        # For now, do not report failures due to missing kernel support.
        # Only Exception is caught so that KeyboardInterrupt/SystemExit are
        # not converted into a test skip.
        raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in the kernel")

    subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
    subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])

    dev[0].hs20_enable()

    # The returned credential id is not needed here.
    dev[0].add_cred_values({ 'realm': "example.com",
                             'username': "hs20-test",
                             'password': "password",
                             'ca_cert': "auth_serv/ca.pem",
                             'domain': "example.com",
                             'update_identifier': "1234" })
    interworking_select(dev[0], bssid, "home", freq="2412")
    interworking_connect(dev[0], bssid, "TTLS")

    # Short pause after the connection before the caller starts using it.
    time.sleep(0.1)

    return dev[0], hapd
77
def _test_ip4_gtk_drop(devs, apdevs, params, dst):
    """Check that a unicast IPv4 packet in an L2 group frame is dropped.

    A UDP datagram addressed to the station's IP address (10.0.0.1:12345) is
    injected through the AP with the group destination MAC address dst. The
    test fails if the datagram is delivered to a socket bound on the station.

    dst -- destination MAC address as 12 hex digits without separators

    Raises HwsimSkip if the per-interface drop_unicast_in_l2_multicast
    procfile does not exist.
    """
    require_under_vm()
    procfile = '/proc/sys/net/ipv4/conf/%s/drop_unicast_in_l2_multicast' % devs[0].ifname
    if not os.path.exists(procfile):
        raise HwsimSkip("kernel doesn't have capability")

    [dev, hapd] = hs20_filters_connect(devs, apdevs)
    with IPAssign(dev.ifname, '10.0.0.1/24'):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.bind(("10.0.0.1", 12345))
            s.settimeout(0.1)

            # Ethernet header: destination, source (AP), ethertype IPv4
            pkt = dst
            pkt += hapd.own_addr().replace(':', '')
            pkt += '0800'
            # IPv4 header: UDP from 10.0.0.4 to 10.0.0.1
            pkt += '45000020786840004011ae600a0000040a000001'
            # UDP header: port 12345 -> 12345, length 12
            pkt += '30393039000c0000'
            pkt += '61736466' # "asdf"
            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
                raise Exception("DATA_TEST_FRAME failed")
            try:
                logger.info(s.recvfrom(1024))
            except socket.timeout:
                # this is the expected behaviour
                pass
            else:
                # Log the current sysctl value to help debug the failure.
                with open(procfile, 'r') as f:
                    val = f.read().rstrip()
                logger.info("procfile=" + procfile + " val=" + val)
                raise Exception("erroneously received frame!")
        finally:
            # Release the socket on every exit path.
            s.close()
105
def test_ip4_gtk_drop_bcast(devs, apdevs, params):
    """Hotspot 2.0 filtering - drop unicast IPv4 in L2 broadcast frame"""
    bcast_da = 'ffffffffffff'
    _test_ip4_gtk_drop(devs, apdevs, params, dst=bcast_da)
108
def test_ip4_gtk_drop_mcast(devs, apdevs, params):
    """Hotspot 2.0 filtering - drop unicast IPv4 in L2 multicast frame"""
    mcast_da = 'ff0000000000'
    _test_ip4_gtk_drop(devs, apdevs, params, dst=mcast_da)
111
def _test_ip6_gtk_drop(devs, apdevs, params, dst):
    """Check that a unicast IPv6 packet in an L2 group frame is dropped.

    A UDP datagram addressed to the station's IPv6 address (fdaa::1, port
    12345) is injected through the AP with the group destination MAC address
    dst. The test fails if the datagram is delivered to a socket bound on the
    station.

    dst -- destination MAC address as 12 hex digits without separators

    Raises HwsimSkip if the per-interface drop_unicast_in_l2_multicast
    procfile does not exist.
    """
    require_under_vm()
    procfile = '/proc/sys/net/ipv6/conf/%s/drop_unicast_in_l2_multicast' % devs[0].ifname
    if not os.path.exists(procfile):
        raise HwsimSkip("kernel doesn't have capability")

    [dev, hapd] = hs20_filters_connect(devs, apdevs)

    with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True):
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        try:
            s.bind(("fdaa::1", 12345))
            s.settimeout(0.1)

            # Ethernet header: destination, source (AP), ethertype IPv6
            pkt = dst
            pkt += hapd.own_addr().replace(':', '')
            pkt += '86dd'
            # IPv6 header: UDP from fdaa::2 to fdaa::1
            pkt += '60000000000c1140fdaa0000000000000000000000000002fdaa0000000000000000000000000001'
            # UDP header: port 12345 -> 12345, length 12
            pkt += '30393039000cde31'
            pkt += '61736466' # "asdf"
            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
                raise Exception("DATA_TEST_FRAME failed")
            try:
                logger.info(s.recvfrom(1024))
            except socket.timeout:
                # this is the expected behaviour
                pass
            else:
                # Log the current sysctl value to help debug the failure.
                with open(procfile, 'r') as f:
                    val = f.read().rstrip()
                logger.info("procfile=" + procfile + " val=" + val)
                raise Exception("erroneously received frame!")
        finally:
            # Release the socket on every exit path.
            s.close()
141
def test_ip6_gtk_drop_bcast(devs, apdevs, params):
    """Hotspot 2.0 filtering - drop unicast IPv6 in L2 broadcast frame"""
    bcast_da = 'ffffffffffff'
    _test_ip6_gtk_drop(devs, apdevs, params, dst=bcast_da)
144
def test_ip6_gtk_drop_mcast(devs, apdevs, params):
    """Hotspot 2.0 filtering - drop unicast IPv6 in L2 multicast frame"""
    mcast_da = 'ff0000000000'
    _test_ip6_gtk_drop(devs, apdevs, params, dst=mcast_da)
147
def test_ip4_drop_gratuitous_arp(devs, apdevs, params):
    """Hotspot 2.0 filtering - drop gratuitous ARP

    A neighbor entry for 10.0.0.1 is installed on the station, then an ARP
    frame built with the AP's MAC address for 10.0.0.1 is injected through
    the AP. The test fails if the AP's MAC address afterwards shows up in
    the ``ip neigh show`` output.

    Raises HwsimSkip if the per-interface drop_gratuitous_arp procfile does
    not exist.
    """
    require_under_vm()
    procfile = '/proc/sys/net/ipv4/conf/%s/drop_gratuitous_arp' % devs[0].ifname
    if not os.path.exists(procfile):
        raise HwsimSkip("kernel doesn't have capability")

    [dev, hapd] = hs20_filters_connect(devs, apdevs)

    with IPAssign(dev.ifname, '10.0.0.2/24'):
        # add an entry that can be updated by gratuitous ARP
        subprocess.call(['ip', 'neigh', 'add', '10.0.0.1', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
        # Everything after the add is covered by the cleanup below, including
        # the wait, so that the entry is removed on any failure/interruption.
        try:
            # wait for lock-time
            time.sleep(1)

            ap_addr = hapd.own_addr()
            cl_addr = dev.own_addr()
            pkt = build_arp(cl_addr, ap_addr, 2, ap_addr, '10.0.0.1', ap_addr, '10.0.0.1')
            # hexlify() returns bytes on Python 3; decode before building
            # the (str) control interface command.
            pkt = binascii.hexlify(pkt).decode()

            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
                raise Exception("DATA_TEST_FRAME failed")

            # check_output() returns bytes on Python 3; decode for the
            # membership test against the (str) MAC address.
            neigh = subprocess.check_output(['ip', 'neigh', 'show']).decode()
            if ap_addr in neigh:
                raise Exception("gratuitous ARP frame updated erroneously")
        finally:
            subprocess.call(['ip', 'neigh', 'del', '10.0.0.1', 'dev', dev.ifname])
174
def test_ip6_drop_unsolicited_na(devs, apdevs, params):
    """Hotspot 2.0 filtering - drop unsolicited NA

    A neighbor entry for fdaa::2 is installed on the station, then a
    neighbor advertisement for fdaa::2 carrying the AP's MAC address is
    injected through the AP. The test fails if the AP's MAC address
    afterwards shows up in the ``ip neigh show`` output.

    Raises HwsimSkip if the per-interface drop_unsolicited_na procfile does
    not exist.
    """
    require_under_vm()
    procfile = '/proc/sys/net/ipv6/conf/%s/drop_unsolicited_na' % devs[0].ifname
    if not os.path.exists(procfile):
        raise HwsimSkip("kernel doesn't have capability")

    [dev, hapd] = hs20_filters_connect(devs, apdevs)

    with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True):
        # add an entry that can be updated by unsolicited NA
        subprocess.call(['ip', '-6', 'neigh', 'add', 'fdaa::2', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
        try:
            ap_addr = hapd.own_addr()
            # Option bytes: '0201' followed by the AP's MAC address
            # (presumably the target link-layer address option - confirm
            # against build_na()).
            lladdr_opt = binascii.unhexlify('0201' + ap_addr.replace(':', ''))
            pkt = build_na(ap_addr, 'fdaa::2', 'ff02::1', 'fdaa::2', flags=0x20,
                           opt=lladdr_opt)
            # hexlify() returns bytes on Python 3; decode before building
            # the (str) control interface command.
            pkt = binascii.hexlify(pkt).decode()

            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
                raise Exception("DATA_TEST_FRAME failed")

            # check_output() returns bytes on Python 3; decode for the
            # membership test against the (str) MAC address.
            neigh = subprocess.check_output(['ip', 'neigh', 'show']).decode()
            if ap_addr in neigh:
                raise Exception("unsolicited NA frame updated erroneously")
        finally:
            subprocess.call(['ip', '-6', 'neigh', 'del', 'fdaa::2', 'dev', dev.ifname])