More basic-auth tests
[mod_auth_gssapi.git] / tests / magtests.py
1 #!/usr/bin/python
2 # Copyright (C) 2015 - mod_auth_gssapi contributors, see COPYING for license.
3
4 import argparse
5 import glob
6 import os
7 import random
8 import shutil
9 import signal
10 from string import Template
11 import subprocess
12 import sys
13 import time
14
15
16 def parse_args():
17     parser = argparse.ArgumentParser(description='Mod Auth GSSAPI Tests Environment')
18     parser.add_argument('--path', default='%s/scratchdir' % os.getcwd(),
19                         help="Directory in which tests are run")
20
21     return vars(parser.parse_args())
22
23
24 WRAP_HOSTNAME = "kdc.mag.dev"
25 WRAP_IPADDR = '127.0.0.9'
26 WRAP_HTTP_PORT = '80'
27 WRAP_PROXY_PORT = '8080'
28
29 def setup_wrappers(base):
30
31     pkgcfg = subprocess.Popen(['pkg-config', '--exists', 'socket_wrapper'])
32     pkgcfg.wait()
33     if pkgcfg.returncode != 0:
34         raise ValueError('Socket Wrappers not available')
35
36     pkgcfg = subprocess.Popen(['pkg-config', '--exists', 'nss_wrapper'])
37     pkgcfg.wait()
38     if pkgcfg.returncode != 0:
39         raise ValueError('Socket Wrappers not available')
40
41     wrapdir = os.path.join(base, 'wrapdir')
42     if not os.path.exists(wrapdir):
43         os.makedirs(wrapdir)
44
45     hosts_file = os.path.join(testdir, 'hosts')
46     with open(hosts_file, 'w+') as f:
47         f.write('%s %s' % (WRAP_IPADDR, WRAP_HOSTNAME))
48
49     wenv = {'LD_PRELOAD': 'libsocket_wrapper.so libnss_wrapper.so',
50             'SOCKET_WRAPPER_DIR': wrapdir,
51             'SOCKET_WRAPPER_DEFAULT_IFACE': '9',
52             'WRAP_PROXY_PORT': WRAP_PROXY_PORT,
53             'NSS_WRAPPER_HOSTNAME': WRAP_HOSTNAME,
54             'NSS_WRAPPER_HOSTS': hosts_file}
55
56     return wenv
57
58
59 TESTREALM = "MAG.DEV"
60 KDC_DBNAME = 'db.file'
61 KDC_STASH = 'stash.file'
62 KDC_PASSWORD = 'modauthgssapi'
63 KRB5_CONF_TEMPLATE = '''
64 [libdefaults]
65   default_realm = ${TESTREALM}
66   dns_lookup_realm = false
67   dns_lookup_kdc = false
68   rdns = false
69   ticket_lifetime = 24h
70   forwardable = yes
71   default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
72
73 [realms]
74   ${TESTREALM} = {
75     kdc =${WRAP_HOSTNAME}
76   }
77
78 [domain_realm]
79   .mag.dev = ${TESTREALM}
80   mag.dev = ${TESTREALM}
81
82 [dbmodules]
83   ${TESTREALM} = {
84     database_name = ${KDCDIR}/${KDC_DBNAME}
85   }
86 '''
87 KDC_CONF_TEMPLATE = '''
88 [kdcdefaults]
89  kdc_ports = 88
90  kdc_tcp_ports = 88
91  restrict_anonymous_to_tgt = true
92
93 [realms]
94  ${TESTREALM} = {
95   master_key_type = aes256-cts
96   max_life = 7d
97   max_renewable_life = 14d
98   acl_file = ${KDCDIR}/kadm5.acl
99   dict_file = /usr/share/dict/words
100   default_principal_flags = +preauth
101   admin_keytab = ${TESTREALM}/kadm5.keytab
102   key_stash_file = ${KDCDIR}/${KDC_STASH}
103  }
104 [logging]
105   kdc = FILE:${KDCLOG}
106 '''
107
108
109 def setup_kdc(testdir, wrapenv):
110
111     # setup kerberos environment
112     testlog = os.path.join(testdir, 'kerb.log')
113     krb5conf = os.path.join(testdir, 'krb5.conf')
114     kdcconf = os.path.join(testdir, 'kdc.conf')
115     kdcdir = os.path.join(testdir, 'kdc')
116     kdcstash = os.path.join(kdcdir, KDC_STASH)
117     kdcdb = os.path.join(kdcdir, KDC_DBNAME)
118     if os.path.exists(kdcdir):
119         shutil.rmtree(kdcdir)
120     os.makedirs(kdcdir)
121
122     t = Template(KRB5_CONF_TEMPLATE)
123     text = t.substitute({'TESTREALM': TESTREALM,
124                          'TESTDIR': testdir,
125                          'KDCDIR': kdcdir,
126                          'KDC_DBNAME': KDC_DBNAME,
127                          'WRAP_HOSTNAME': WRAP_HOSTNAME})
128     with open(krb5conf, 'w+') as f:
129         f.write(text)
130
131     t = Template(KDC_CONF_TEMPLATE)
132     text = t.substitute({'TESTREALM': TESTREALM,
133                          'KDCDIR': kdcdir,
134                          'KDCLOG': testlog,
135                          'KDC_STASH': KDC_STASH})
136     with open(kdcconf, 'w+') as f:
137         f.write(text)
138
139     kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
140               'KRB5_CONFIG': krb5conf,
141               'KRB5_KDC_PROFILE': kdcconf,
142               'KRB5_TRACE': os.path.join(testdir, 'krbtrace.log')}
143     kdcenv.update(wrapenv)
144
145     with (open(testlog, 'a')) as logfile:
146         ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
147                                    "-r", TESTREALM, "-P", KDC_PASSWORD],
148                                   stdout=logfile, stderr=logfile,
149                                   env=kdcenv, preexec_fn=os.setsid)
150     ksetup.wait()
151     if ksetup.returncode != 0:
152         raise ValueError('KDC Setup failed')
153
154     with (open(testlog, 'a')) as logfile:
155         kdcproc = subprocess.Popen(['krb5kdc', '-n'],
156                                    stdout=logfile, stderr=logfile,
157                                    env=kdcenv, preexec_fn=os.setsid)
158
159     return kdcproc, kdcenv
160
161
162 def kadmin_local(cmd, env, logfile):
163     ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
164                               stdout=logfile, stderr=logfile,
165                               env=env, preexec_fn=os.setsid)
166     ksetup.wait()
167     if ksetup.returncode != 0:
168         raise ValueError('Kadmin local [%s] failed' % cmd)
169
170
171 USR_NAME = "maguser"
172 USR_PWD = "magpwd"
173 USR_NAME_2 = "maguser2"
174 USR_PWD_2 = "magpwd2"
175 SVC_KTNAME = "httpd/http.keytab"
176 KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
177
178
179 def setup_keys(tesdir, env):
180
181     testlog = os.path.join(testdir, 'kerb.log')
182
183     svc_name = "HTTP/%s" % WRAP_HOSTNAME
184     svc_keytab = os.path.join(testdir, SVC_KTNAME)
185     cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
186     with (open(testlog, 'a')) as logfile:
187         kadmin_local(cmd, env, logfile)
188     cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
189     with (open(testlog, 'a')) as logfile:
190         kadmin_local(cmd, env, logfile)
191
192     cmd = "addprinc -pw %s -e %s %s" % (USR_PWD, KEY_TYPE, USR_NAME)
193     with (open(testlog, 'a')) as logfile:
194         kadmin_local(cmd, env, logfile)
195
196     cmd = "addprinc -pw %s -e %s %s" % (USR_PWD_2, KEY_TYPE, USR_NAME_2)
197     with (open(testlog, 'a')) as logfile:
198         kadmin_local(cmd, env, logfile)
199
200     keys_env = { "KRB5_KTNAME": svc_keytab }
201     keys_env.update(env)
202
203     return keys_env
204
205
206 def setup_http(testdir, wrapenv):
207
208     httpdir = os.path.join(testdir, 'httpd')
209     if os.path.exists(httpdir):
210         shutil.rmtree(httpdir)
211     os.makedirs(httpdir)
212     os.mkdir(os.path.join(httpdir, 'conf.d'))
213     os.mkdir(os.path.join(httpdir, 'html'))
214     os.mkdir(os.path.join(httpdir, 'logs'))
215     os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))
216
217     shutil.copy('src/.libs/mod_auth_gssapi.so', httpdir)
218
219     with open('tests/httpd.conf') as f:
220         t = Template(f.read())
221         text = t.substitute({'HTTPROOT': httpdir,
222                              'HTTPNAME': WRAP_HOSTNAME,
223                              'HTTPADDR': WRAP_IPADDR,
224                              'PROXYPORT': WRAP_PROXY_PORT,
225                              'HTTPPORT': WRAP_HTTP_PORT})
226     config = os.path.join(httpdir, 'httpd.conf')
227     with open(config, 'w+') as f:
228         f.write(text)
229
230     httpenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
231                'MALLOC_CHECK_': '3',
232                'MALLOC_PERTURB_': str(random.randint(0, 32767) % 255 + 1)}
233     httpenv.update(wrapenv)
234
235     httpproc = subprocess.Popen(['httpd', '-DFOREGROUND', '-f', config],
236                                  env=httpenv, preexec_fn=os.setsid)
237
238     return httpproc
239
240
241 def kinit_user(testdir, kdcenv):
242     testlog = os.path.join(testdir, 'kinit.log')
243     ccache = os.path.join(testdir, 'k5ccache')
244     testenv = {'KRB5CCNAME': ccache}
245     testenv.update(kdcenv)
246
247     with (open(testlog, 'a')) as logfile:
248         kinit = subprocess.Popen(["kinit", USR_NAME],
249                                  stdin=subprocess.PIPE,
250                                  stdout=logfile, stderr=logfile,
251                                  env=testenv, preexec_fn=os.setsid)
252         kinit.communicate('%s\n' % USR_PWD)
253         kinit.wait()
254         if kinit.returncode != 0:
255             raise ValueError('kinit failed')
256
257     return testenv
258
259
260 def test_spnego_auth(testdir, testenv, testlog):
261
262     spnegodir = os.path.join(testdir, 'httpd', 'html', 'spnego')
263     os.mkdir(spnegodir)
264     shutil.copy('tests/index.html', spnegodir)
265
266     with (open(testlog, 'a')) as logfile:
267         spnego = subprocess.Popen(["tests/t_spnego.py"],
268                                   stdout=logfile, stderr=logfile,
269                                   env=testenv, preexec_fn=os.setsid)
270         spnego.wait()
271         if spnego.returncode != 0:
272             sys.stderr.write('SPNEGO: FAILED\n')
273         else:
274             sys.stderr.write('SPNEGO: SUCCESS\n')
275
276
277 def test_basic_auth_krb5(testdir, testenv, testlog):
278
279     basicdir = os.path.join(testdir, 'httpd', 'html', 'basic_auth_krb5')
280     os.mkdir(basicdir)
281     shutil.copy('tests/index.html', basicdir)
282
283     with (open(testlog, 'a')) as logfile:
284         basick5 = subprocess.Popen(["tests/t_basic_k5.py"],
285                                    stdout=logfile, stderr=logfile,
286                                    env=testenv, preexec_fn=os.setsid)
287         basick5.wait()
288         if basick5.returncode != 0:
289             sys.stderr.write('BASIC-AUTH: FAILED\n')
290         else:
291             sys.stderr.write('BASIC-AUTH: SUCCESS\n')
292
293     with (open(testlog, 'a')) as logfile:
294         basick5 = subprocess.Popen(["tests/t_basic_k5_two_users.py"],
295                                    stdout=logfile, stderr=logfile,
296                                    env=testenv, preexec_fn=os.setsid)
297         basick5.wait()
298         if basick5.returncode != 0:
299             sys.stderr.write('BASIC-AUTH Two Users: FAILED\n')
300         else:
301             sys.stderr.write('BASIC-AUTH Two Users: SUCCESS\n')
302
303     with (open(testlog, 'a')) as logfile:
304         basick5 = subprocess.Popen(["tests/t_basic_k5_fail_second.py"],
305                                    stdout=logfile, stderr=logfile,
306                                    env=testenv, preexec_fn=os.setsid)
307         basick5.wait()
308         if basick5.returncode != 0:
309             sys.stderr.write('BASIC Fail Second User: FAILED\n')
310         else:
311             sys.stderr.write('BASIC Fail Second User: SUCCESS\n')
312
313     with (open(testlog, 'a')) as logfile:
314         basick5 = subprocess.Popen(["tests/t_basic_proxy.py"],
315                                    stdout=logfile, stderr=logfile,
316                                    env=testenv, preexec_fn=os.setsid)
317         basick5.wait()
318         if basick5.returncode != 0:
319             sys.stderr.write('BASIC Proxy Auth: FAILED\n')
320         else:
321             sys.stderr.write('BASIC Proxy Auth: SUCCESS\n')
322
323
324 if __name__ == '__main__':
325
326     args = parse_args()
327
328     testdir = args['path']
329     if os.path.exists(testdir):
330         shutil.rmtree(testdir)
331     os.makedirs(testdir)
332
333     processes = dict()
334
335     testlog = os.path.join(testdir, 'tests.log')
336
337     try:
338         wrapenv = setup_wrappers(testdir)
339
340         kdcproc, kdcenv = setup_kdc(testdir, wrapenv)
341         processes['KDC(%d)' % kdcproc.pid] = kdcproc
342
343         httpproc = setup_http(testdir, kdcenv)
344         processes['HTTPD(%d)' % httpproc.pid] = httpproc
345
346         keysenv = setup_keys(testdir, kdcenv)
347         testenv = kinit_user(testdir, kdcenv)
348
349         test_spnego_auth(testdir, testenv, testlog)
350
351
352         testenv = {'MAG_USER_NAME': USR_NAME,
353                    'MAG_USER_PASSWORD': USR_PWD,
354                    'MAG_USER_NAME_2': USR_NAME_2,
355                    'MAG_USER_PASSWORD_2': USR_PWD_2}
356         testenv.update(kdcenv)
357         test_basic_auth_krb5(testdir, testenv, testlog)
358
359     finally:
360         with (open(testlog, 'a')) as logfile:
361             for name in processes:
362                 logfile.write("Killing %s\n" % name)
363                 os.killpg(processes[name].pid, signal.SIGTERM)