Support forward proxy authentication
[mod_auth_gssapi.git] / tests / magtests.py
1 #!/usr/bin/python
2 # Copyright (C) 2015 - mod_auth_gssapi contributors, see COPYING for license.
3
4 import argparse
5 import glob
6 import os
7 import random
8 import shutil
9 import signal
10 from string import Template
11 import subprocess
12 import sys
13 import time
14
15
16 def parse_args():
17     parser = argparse.ArgumentParser(description='Mod Auth GSSAPI Tests Environment')
18     parser.add_argument('--path', default='%s/scratchdir' % os.getcwd(),
19                         help="Directory in which tests are run")
20
21     return vars(parser.parse_args())
22
23
24 WRAP_HOSTNAME = "kdc.mag.dev"
25 WRAP_IPADDR = '127.0.0.9'
26
27 def setup_wrappers(base):
28
29     pkgcfg = subprocess.Popen(['pkg-config', '--exists', 'socket_wrapper'])
30     pkgcfg.wait()
31     if pkgcfg.returncode != 0:
32         raise ValueError('Socket Wrappers not available')
33
34     pkgcfg = subprocess.Popen(['pkg-config', '--exists', 'nss_wrapper'])
35     pkgcfg.wait()
36     if pkgcfg.returncode != 0:
37         raise ValueError('Socket Wrappers not available')
38
39     wrapdir = os.path.join(base, 'wrapdir')
40     if not os.path.exists(wrapdir):
41         os.makedirs(wrapdir)
42
43     hosts_file = os.path.join(testdir, 'hosts')
44     with open(hosts_file, 'w+') as f:
45         f.write('%s %s' % (WRAP_IPADDR, WRAP_HOSTNAME))
46
47     wenv = {'LD_PRELOAD': 'libsocket_wrapper.so libnss_wrapper.so',
48             'SOCKET_WRAPPER_DIR': wrapdir,
49             'SOCKET_WRAPPER_DEFAULT_IFACE': '9',
50             'NSS_WRAPPER_HOSTNAME': WRAP_HOSTNAME,
51             'NSS_WRAPPER_HOSTS': hosts_file}
52
53     return wenv
54
55
56 TESTREALM = "MAG.DEV"
57 KDC_DBNAME = 'db.file'
58 KDC_STASH = 'stash.file'
59 KDC_PASSWORD = 'modauthgssapi'
60 KRB5_CONF_TEMPLATE = '''
61 [libdefaults]
62   default_realm = ${TESTREALM}
63   dns_lookup_realm = false
64   dns_lookup_kdc = false
65   rdns = false
66   ticket_lifetime = 24h
67   forwardable = yes
68   default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
69
70 [realms]
71   ${TESTREALM} = {
72     kdc =${WRAP_HOSTNAME}
73   }
74
75 [domain_realm]
76   .mag.dev = ${TESTREALM}
77   mag.dev = ${TESTREALM}
78
79 [dbmodules]
80   ${TESTREALM} = {
81     database_name = ${KDCDIR}/${KDC_DBNAME}
82   }
83 '''
84 KDC_CONF_TEMPLATE = '''
85 [kdcdefaults]
86  kdc_ports = 88
87  kdc_tcp_ports = 88
88  restrict_anonymous_to_tgt = true
89
90 [realms]
91  ${TESTREALM} = {
92   master_key_type = aes256-cts
93   max_life = 7d
94   max_renewable_life = 14d
95   acl_file = ${KDCDIR}/kadm5.acl
96   dict_file = /usr/share/dict/words
97   default_principal_flags = +preauth
98   admin_keytab = ${TESTREALM}/kadm5.keytab
99   key_stash_file = ${KDCDIR}/${KDC_STASH}
100  }
101 [logging]
102   kdc = FILE:${KDCLOG}
103 '''
104
105
106 def setup_kdc(testdir, wrapenv):
107
108     # setup kerberos environment
109     testlog = os.path.join(testdir, 'kerb.log')
110     krb5conf = os.path.join(testdir, 'krb5.conf')
111     kdcconf = os.path.join(testdir, 'kdc.conf')
112     kdcdir = os.path.join(testdir, 'kdc')
113     kdcstash = os.path.join(kdcdir, KDC_STASH)
114     kdcdb = os.path.join(kdcdir, KDC_DBNAME)
115     if os.path.exists(kdcdir):
116         shutil.rmtree(kdcdir)
117     os.makedirs(kdcdir)
118
119     t = Template(KRB5_CONF_TEMPLATE)
120     text = t.substitute({'TESTREALM': TESTREALM,
121                          'TESTDIR': testdir,
122                          'KDCDIR': kdcdir,
123                          'KDC_DBNAME': KDC_DBNAME,
124                          'WRAP_HOSTNAME': WRAP_HOSTNAME})
125     with open(krb5conf, 'w+') as f:
126         f.write(text)
127
128     t = Template(KDC_CONF_TEMPLATE)
129     text = t.substitute({'TESTREALM': TESTREALM,
130                          'KDCDIR': kdcdir,
131                          'KDCLOG': testlog,
132                          'KDC_STASH': KDC_STASH})
133     with open(kdcconf, 'w+') as f:
134         f.write(text)
135
136     kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
137               'KRB5_CONFIG': krb5conf,
138               'KRB5_KDC_PROFILE': kdcconf,
139               'KRB5_TRACE': os.path.join(testdir, 'krbtrace.log')}
140     kdcenv.update(wrapenv)
141
142     with (open(testlog, 'a')) as logfile:
143         ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
144                                    "-r", TESTREALM, "-P", KDC_PASSWORD],
145                                   stdout=logfile, stderr=logfile,
146                                   env=kdcenv, preexec_fn=os.setsid)
147     ksetup.wait()
148     if ksetup.returncode != 0:
149         raise ValueError('KDC Setup failed')
150
151     with (open(testlog, 'a')) as logfile:
152         kdcproc = subprocess.Popen(['krb5kdc', '-n'],
153                                    stdout=logfile, stderr=logfile,
154                                    env=kdcenv, preexec_fn=os.setsid)
155
156     return kdcproc, kdcenv
157
158
159 def kadmin_local(cmd, env, logfile):
160     ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
161                               stdout=logfile, stderr=logfile,
162                               env=env, preexec_fn=os.setsid)
163     ksetup.wait()
164     if ksetup.returncode != 0:
165         raise ValueError('Kadmin local [%s] failed' % cmd)
166
167
168 USR_NAME = "maguser"
169 USR_PWD = "magpwd"
170 USR_NAME_2 = "maguser2"
171 USR_PWD_2 = "magpwd2"
172 SVC_KTNAME = "httpd/http.keytab"
173 KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
174
175
176 def setup_keys(tesdir, env):
177
178     testlog = os.path.join(testdir, 'kerb.log')
179
180     svc_name = "HTTP/%s" % WRAP_HOSTNAME
181     svc_keytab = os.path.join(testdir, SVC_KTNAME)
182     cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
183     with (open(testlog, 'a')) as logfile:
184         kadmin_local(cmd, env, logfile)
185     cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
186     with (open(testlog, 'a')) as logfile:
187         kadmin_local(cmd, env, logfile)
188
189     cmd = "addprinc -pw %s -e %s %s" % (USR_PWD, KEY_TYPE, USR_NAME)
190     with (open(testlog, 'a')) as logfile:
191         kadmin_local(cmd, env, logfile)
192
193     cmd = "addprinc -pw %s -e %s %s" % (USR_PWD_2, KEY_TYPE, USR_NAME_2)
194     with (open(testlog, 'a')) as logfile:
195         kadmin_local(cmd, env, logfile)
196
197     keys_env = { "KRB5_KTNAME": svc_keytab }
198     keys_env.update(env)
199
200     return keys_env
201
202
203 def setup_http(testdir, wrapenv):
204
205     httpdir = os.path.join(testdir, 'httpd')
206     if os.path.exists(httpdir):
207         shutil.rmtree(httpdir)
208     os.makedirs(httpdir)
209     os.mkdir(os.path.join(httpdir, 'conf.d'))
210     os.mkdir(os.path.join(httpdir, 'html'))
211     os.mkdir(os.path.join(httpdir, 'logs'))
212     os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))
213
214     shutil.copy('src/.libs/mod_auth_gssapi.so', httpdir)
215
216     with open('tests/httpd.conf') as f:
217         t = Template(f.read())
218         text = t.substitute({'HTTPROOT': httpdir,
219                              'HTTPNAME': WRAP_HOSTNAME,
220                              'HTTPADDR': WRAP_IPADDR,
221                              'HTTPPORT': '80'})
222     config = os.path.join(httpdir, 'httpd.conf')
223     with open(config, 'w+') as f:
224         f.write(text)
225
226     httpenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
227                'MALLOC_CHECK_': '3',
228                'MALLOC_PERTURB_': str(random.randint(0, 32767) % 255 + 1)}
229     httpenv.update(wrapenv)
230
231     httpproc = subprocess.Popen(['httpd', '-DFOREGROUND', '-f', config],
232                                  env=httpenv, preexec_fn=os.setsid)
233
234     return httpproc
235
236
237 def kinit_user(testdir, kdcenv):
238     testlog = os.path.join(testdir, 'kinit.log')
239     ccache = os.path.join(testdir, 'k5ccache')
240     testenv = {'KRB5CCNAME': ccache}
241     testenv.update(kdcenv)
242
243     with (open(testlog, 'a')) as logfile:
244         kinit = subprocess.Popen(["kinit", USR_NAME],
245                                  stdin=subprocess.PIPE,
246                                  stdout=logfile, stderr=logfile,
247                                  env=testenv, preexec_fn=os.setsid)
248         kinit.communicate('%s\n' % USR_PWD)
249         kinit.wait()
250         if kinit.returncode != 0:
251             raise ValueError('kinit failed')
252
253     return testenv
254
255
256 def test_spnego_auth(testdir, testenv, testlog):
257
258     spnegodir = os.path.join(testdir, 'httpd', 'html', 'spnego')
259     os.mkdir(spnegodir)
260     shutil.copy('tests/index.html', spnegodir)
261
262     with (open(testlog, 'a')) as logfile:
263         spnego = subprocess.Popen(["tests/t_spnego.py"],
264                                   stdout=logfile, stderr=logfile,
265                                   env=testenv, preexec_fn=os.setsid)
266         spnego.wait()
267         if spnego.returncode != 0:
268             sys.stderr.write('SPNEGO: FAILED\n')
269         else:
270             sys.stderr.write('SPNEGO: SUCCESS\n')
271
272
273 def test_basic_auth_krb5(testdir, testenv, testlog):
274
275     basicdir = os.path.join(testdir, 'httpd', 'html', 'basic_auth_krb5')
276     os.mkdir(basicdir)
277     shutil.copy('tests/index.html', basicdir)
278
279     with (open(testlog, 'a')) as logfile:
280         basick5 = subprocess.Popen(["tests/t_basic_k5.py"],
281                                    stdout=logfile, stderr=logfile,
282                                    env=testenv, preexec_fn=os.setsid)
283         basick5.wait()
284         if basick5.returncode != 0:
285             sys.stderr.write('BASIC-AUTH: FAILED\n')
286         else:
287             sys.stderr.write('BASIC-AUTH: SUCCESS\n')
288
289     with (open(testlog, 'a')) as logfile:
290         basick5 = subprocess.Popen(["tests/t_basic_k5_two_users.py"],
291                                    stdout=logfile, stderr=logfile,
292                                    env=testenv, preexec_fn=os.setsid)
293         basick5.wait()
294         if basick5.returncode != 0:
295             sys.stderr.write('BASIC-AUTH Two Users: FAILED\n')
296         else:
297             sys.stderr.write('BASIC-AUTH Two Users: SUCCESS\n')
298
299
300 if __name__ == '__main__':
301
302     args = parse_args()
303
304     testdir = args['path']
305     if os.path.exists(testdir):
306         shutil.rmtree(testdir)
307     os.makedirs(testdir)
308
309     processes = dict()
310
311     testlog = os.path.join(testdir, 'tests.log')
312
313     try:
314         wrapenv = setup_wrappers(testdir)
315
316         kdcproc, kdcenv = setup_kdc(testdir, wrapenv)
317         processes['KDC(%d)' % kdcproc.pid] = kdcproc
318
319         httpproc = setup_http(testdir, kdcenv)
320         processes['HTTPD(%d)' % httpproc.pid] = httpproc
321
322         keysenv = setup_keys(testdir, kdcenv)
323         testenv = kinit_user(testdir, kdcenv)
324
325         test_spnego_auth(testdir, testenv, testlog)
326
327
328         testenv = {'MAG_USER_NAME': USR_NAME,
329                    'MAG_USER_PASSWORD': USR_PWD,
330                    'MAG_USER_NAME_2': USR_NAME_2,
331                    'MAG_USER_PASSWORD_2': USR_PWD_2}
332         testenv.update(kdcenv)
333         test_basic_auth_krb5(testdir, testenv, testlog)
334
335     finally:
336         with (open(testlog, 'a')) as logfile:
337             for name in processes:
338                 logfile.write("Killing %s\n" % name)
339                 os.killpg(processes[name].pid, signal.SIGTERM)