Don't need this function.
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/rad_assert.h>
29
30 /*
31  *      Other OS's have sem_init, OS X doesn't.
32  */
33 #ifdef HAVE_SEMAPHORE_H
34 #include <semaphore.h>
35 #endif
36
37 #ifdef DARWIN
38 #include <mach/task.h>
39 #include <mach/semaphore.h>
40
41 #undef sem_t
42 #define sem_t semaphore_t
43 #undef sem_init
44 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45 #undef sem_wait
46 #define sem_wait(s) semaphore_wait(*s)
47 #undef sem_post
48 #define sem_post(s) semaphore_signal(*s)
49 #endif
50
51 #ifdef HAVE_SYS_WAIT_H
52 #include <sys/wait.h>
53 #endif
54
55 #ifdef HAVE_PTHREAD_H
56
57 #ifdef HAVE_OPENSSL_CRYPTO_H
58 #include <openssl/crypto.h>
59 #endif
60 #ifdef HAVE_OPENSSL_ERR_H
61 #include <openssl/err.h>
62 #endif
63 #ifdef HAVE_OPENSSL_EVP_H
64 #include <openssl/evp.h>
65 #endif
66
67 #define SEMAPHORE_LOCKED        (0)
68 #define SEMAPHORE_UNLOCKED      (1)
69
70 #define THREAD_RUNNING          (1)
71 #define THREAD_CANCELLED        (2)
72 #define THREAD_EXITED           (3)
73
74 #define NUM_FIFOS               RAD_LISTEN_MAX
75
76
77 /*
78  *  A data structure which contains the information about
79  *  the current thread.
80  *
81  *  pthread_id     pthread id
82  *  thread_num     server thread number, 1...number of threads
83  *  semaphore     used to block the thread until a request comes in
84  *  status        is the thread running or exited?
85  *  request_count the number of requests that this thread has handled
86  *  timestamp     when the thread started executing.
87  */
88 typedef struct THREAD_HANDLE {
89         struct THREAD_HANDLE *prev;
90         struct THREAD_HANDLE *next;
91         pthread_t            pthread_id;
92         int                  thread_num;
93         int                  status;
94         unsigned int         request_count;
95         time_t               timestamp;
96         REQUEST              *request;
97 } THREAD_HANDLE;
98
99 /*
100  *      For the request queue.
101  */
102 typedef struct request_queue_t {
103         REQUEST           *request;
104         RAD_REQUEST_FUNP  fun;
105 } request_queue_t;
106
107 typedef struct thread_fork_t {
108         pid_t           pid;
109         int             status;
110         int             exited;
111 } thread_fork_t;
112
113
114 /*
115  *      A data structure to manage the thread pool.  There's no real
116  *      need for a data structure, but it makes things conceptually
117  *      easier.
118  */
119 typedef struct THREAD_POOL {
120         THREAD_HANDLE *head;
121         THREAD_HANDLE *tail;
122
123         int total_threads;
124         int active_threads;     /* protected by queue_mutex */
125         int max_thread_num;
126         int start_threads;
127         int max_threads;
128         int min_spare_threads;
129         int max_spare_threads;
130         unsigned int max_requests_per_thread;
131         unsigned long request_count;
132         time_t time_last_spawned;
133         int cleanup_delay;
134         int spawn_flag;
135
136         pthread_mutex_t wait_mutex;
137         fr_hash_table_t *waiters;
138
139         /*
140          *      All threads wait on this semaphore, for requests
141          *      to enter the queue.
142          */
143         sem_t           semaphore;
144
145         /*
146          *      To ensure only one thread at a time touches the queue.
147          */
148         pthread_mutex_t queue_mutex;
149
150         int             max_queue_size;
151         int             num_queued;
152         fr_fifo_t       *fifo[NUM_FIFOS];
153 } THREAD_POOL;
154
155 static THREAD_POOL thread_pool;
156 static int pool_initialized = FALSE;
157 static time_t last_cleaned = 0;
158
159 static void thread_pool_manage(time_t now);
160
161 /*
162  *      A mapping of configuration file names to internal integers
163  */
164 static const CONF_PARSER thread_config[] = {
165         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
166         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
167         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
168         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
169         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
170         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
171         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
172         { NULL, -1, 0, NULL, NULL }
173 };
174
175
176 #ifdef HAVE_OPENSSL_CRYPTO_H
177
178 /*
179  *      If we're linking against OpenSSL, then it is the
180  *      duty of the application, if it is multithreaded,
181  *      to provide OpenSSL with appropriate thread id
182  *      and mutex locking functions
183  *
184  *      Note: this only implements static callbacks.
185  *      OpenSSL does not use dynamic locking callbacks
186  *      right now, but may in the futiure, so we will have
187  *      to add them at some point.
188  */
189
190 static pthread_mutex_t *ssl_mutexes = NULL;
191
192 static unsigned long ssl_id_function(void)
193 {
194         return (unsigned long) pthread_self();
195 }
196
197 static void ssl_locking_function(int mode, int n, const char *file, int line)
198 {
199         file = file;            /* -Wunused */
200         line = line;            /* -Wunused */
201
202         if (mode & CRYPTO_LOCK) {
203                 pthread_mutex_lock(&(ssl_mutexes[n]));
204         } else {
205                 pthread_mutex_unlock(&(ssl_mutexes[n]));
206         }
207 }
208
209 static int setup_ssl_mutexes(void)
210 {
211         int i;
212
213 #ifdef HAVE_OPENSSL_EVP_H
214         /*
215          *      Enable all ciphers and digests.
216          */
217         OpenSSL_add_all_algorithms();
218 #endif
219
220         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
221         if (!ssl_mutexes) {
222                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
223                 return 0;
224         }
225
226         for (i = 0; i < CRYPTO_num_locks(); i++) {
227                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
228         }
229
230         CRYPTO_set_id_callback(ssl_id_function);
231         CRYPTO_set_locking_callback(ssl_locking_function);
232
233         return 1;
234 }
235 #endif
236
237
238 /*
239  *      We don't want to catch SIGCHLD for a host of reasons.
240  *
241  *      - exec_wait means that someone, somewhere, somewhen, will
242  *      call waitpid(), and catch the child.
243  *
244  *      - SIGCHLD is delivered to a random thread, not the one that
245  *      forked.
246  *
247  *      - if another thread catches the child, we have to coordinate
248  *      with the thread doing the waiting.
249  *
250  *      - if we don't waitpid() for non-wait children, they'll be zombies,
251  *      and will hang around forever.
252  *
253  */
254 static void reap_children(void)
255 {
256         pid_t pid;
257         int status;
258         thread_fork_t mytf, *tf;
259
260
261         pthread_mutex_lock(&thread_pool.wait_mutex);
262
263         do {
264                 pid = waitpid(0, &status, WNOHANG);
265                 if (pid <= 0) break;
266
267                 mytf.pid = pid;
268                 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
269                 if (!tf) continue;
270
271                 tf->status = status;
272                 tf->exited = 1;
273         } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
274
275         pthread_mutex_unlock(&thread_pool.wait_mutex);
276 }
277
278 /*
279  *      Add a request to the list of waiting requests.
280  *      This function gets called ONLY from the main handler thread...
281  *
282  *      This function should never fail.
283  */
284 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
285 {
286         request_queue_t *entry;
287
288         pthread_mutex_lock(&thread_pool.queue_mutex);
289
290         thread_pool.request_count++;
291
292         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
293                 pthread_mutex_unlock(&thread_pool.queue_mutex);
294
295                 /*
296                  *      Mark the request as done.
297                  */
298                 radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
299                 request->child_state = REQUEST_DONE;
300                 return 0;
301         }
302
303         entry = rad_malloc(sizeof(*entry));
304         entry->request = request;
305         entry->fun = fun;
306
307         /*
308          *      Push the request onto the appropriate fifo for that
309          */
310         if (!fr_fifo_push(thread_pool.fifo[request->priority],
311                             entry)) {
312                 pthread_mutex_unlock(&thread_pool.queue_mutex);
313                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
314                 request->child_state = REQUEST_DONE;
315                 return 0;
316         }
317
318         thread_pool.num_queued++;
319
320         pthread_mutex_unlock(&thread_pool.queue_mutex);
321
322         /*
323          *      There's one more request in the queue.
324          *
325          *      Note that we're not touching the queue any more, so
326          *      the semaphore post is outside of the mutex.  This also
327          *      means that when the thread wakes up and tries to lock
328          *      the mutex, it will be unlocked, and there won't be
329          *      contention.
330          */
331         sem_post(&thread_pool.semaphore);
332
333         return 1;
334 }
335
336 /*
337  *      Remove a request from the queue.
338  */
339 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
340 {
341         RAD_LISTEN_TYPE i, start;
342         request_queue_t *entry;
343
344         reap_children();
345
346         pthread_mutex_lock(&thread_pool.queue_mutex);
347
348         /*
349          *      Clear old requests from all queues.
350          *
351          *      We only do one pass over the queue, in order to
352          *      amortize the work across the child threads.  Since we
353          *      do N checks for one request de-queued, the old
354          *      requests will be quickly cleared.
355          */
356         for (i = 0; i < RAD_LISTEN_MAX; i++) {
357                 entry = fr_fifo_peek(thread_pool.fifo[i]);
358                 if (!entry ||
359                     (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
360                         continue;
361 }
362                 /*
363                  *      This entry was marked to be stopped.  Acknowledge it.
364                  */
365                 entry = fr_fifo_pop(thread_pool.fifo[i]);
366                 rad_assert(entry != NULL);
367                 entry->request->child_state = REQUEST_DONE;
368                 thread_pool.num_queued--;
369                 free(entry);
370         }
371
372         start = 0;
373  retry:
374         /*
375          *      Pop results from the top of the queue
376          */
377         for (i = start; i < RAD_LISTEN_MAX; i++) {
378                 entry = fr_fifo_pop(thread_pool.fifo[i]);
379                 if (entry) {
380                         start = i;
381                         break;
382                 }
383         }
384
385         if (!entry) {
386                 pthread_mutex_unlock(&thread_pool.queue_mutex);
387                 *request = NULL;
388                 *fun = NULL;
389                 return 0;
390         }
391
392         rad_assert(thread_pool.num_queued > 0);
393         thread_pool.num_queued--;
394         *request = entry->request;
395         *fun = entry->fun;
396         free(entry);
397
398         rad_assert(*request != NULL);
399         rad_assert((*request)->magic == REQUEST_MAGIC);
400         rad_assert(*fun != NULL);
401
402         /*
403          *      If the request has sat in the queue for too long,
404          *      kill it.
405          *
406          *      The main clean-up code can't delete the request from
407          *      the queue, and therefore won't clean it up until we
408          *      have acknowledged it as "done".
409          */
410         if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
411                 (*request)->child_state = REQUEST_DONE;
412                 goto retry;
413         }
414
415         /*
416          *      The thread is currently processing a request.
417          */
418         thread_pool.active_threads++;
419
420         pthread_mutex_unlock(&thread_pool.queue_mutex);
421
422         return 1;
423 }
424
425
426 /*
427  *      The main thread handler for requests.
428  *
429  *      Wait on the semaphore until we have it, and process the request.
430  */
431 static void *request_handler_thread(void *arg)
432 {
433         RAD_REQUEST_FUNP  fun;
434         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
435
436         /*
437          *      Loop forever, until told to exit.
438          */
439         do {
440                 /*
441                  *      Wait to be signalled.
442                  */
443                 DEBUG2("Thread %d waiting to be assigned a request",
444                        self->thread_num);
445         re_wait:
446                 if (sem_wait(&thread_pool.semaphore) != 0) {
447                         /*
448                          *      Interrupted system call.  Go back to
449                          *      waiting, but DON'T print out any more
450                          *      text.
451                          */
452                         if (errno == EINTR) {
453                                 DEBUG2("Re-wait %d", self->thread_num);
454                                 goto re_wait;
455                         }
456                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
457                                self->thread_num, strerror(errno));
458                         break;
459                 }
460
461                 DEBUG2("Thread %d got semaphore", self->thread_num);
462
463                 /*
464                  *      Try to grab a request from the queue.
465                  *
466                  *      It may be empty, in which case we fail
467                  *      gracefully.
468                  */
469                 if (!request_dequeue(&self->request, &fun)) continue;
470
471                 self->request->child_pid = self->pthread_id;
472                 self->request_count++;
473
474                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
475                        self->thread_num, self->request->number,
476                        self->request_count);
477
478                 radius_handle_request(self->request, fun);
479
480                 /*
481                  *      Update the active threads.
482                  */
483                 pthread_mutex_lock(&thread_pool.queue_mutex);
484                 rad_assert(thread_pool.active_threads > 0);
485                 thread_pool.active_threads--;
486                 pthread_mutex_unlock(&thread_pool.queue_mutex);
487         } while (self->status != THREAD_CANCELLED);
488
489         DEBUG2("Thread %d exiting...", self->thread_num);
490
491 #ifdef HAVE_OPENSSL_ERR_H
492         /*
493          *      If we linked with OpenSSL, the application
494          *      must remove the thread's error queue before
495          *      exiting to prevent memory leaks.
496          */
497         ERR_remove_state(0);
498 #endif
499
500         /*
501          *  Do this as the LAST thing before exiting.
502          */
503         self->request = NULL;
504         self->status = THREAD_EXITED;
505
506         return NULL;
507 }
508
509 /*
510  *      Take a THREAD_HANDLE, delete it from the thread pool and
511  *      free its resources.
512  *
513  *      This function is called ONLY from the main server thread,
514  *      ONLY after the thread has exited.
515  */
516 static void delete_thread(THREAD_HANDLE *handle)
517 {
518         THREAD_HANDLE *prev;
519         THREAD_HANDLE *next;
520
521         rad_assert(handle->request == NULL);
522
523         DEBUG2("Deleting thread %d", handle->thread_num);
524
525         prev = handle->prev;
526         next = handle->next;
527         rad_assert(thread_pool.total_threads > 0);
528         thread_pool.total_threads--;
529
530         /*
531          *      Remove the handle from the list.
532          */
533         if (prev == NULL) {
534                 rad_assert(thread_pool.head == handle);
535                 thread_pool.head = next;
536         } else {
537                 prev->next = next;
538         }
539
540         if (next == NULL) {
541                 rad_assert(thread_pool.tail == handle);
542                 thread_pool.tail = prev;
543         } else {
544                 next->prev = prev;
545         }
546
547         /*
548          *      Free the handle, now that it's no longer referencable.
549          */
550         free(handle);
551 }
552
553
554 /*
555  *      Spawn a new thread, and place it in the thread pool.
556  *
557  *      The thread is started initially in the blocked state, waiting
558  *      for the semaphore.
559  */
560 static THREAD_HANDLE *spawn_thread(time_t now)
561 {
562         int rcode;
563         THREAD_HANDLE *handle;
564         pthread_attr_t attr;
565
566         /*
567          *      Ensure that we don't spawn too many threads.
568          */
569         if (thread_pool.total_threads >= thread_pool.max_threads) {
570                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
571                 return NULL;
572         }
573
574         /*
575          *      Allocate a new thread handle.
576          */
577         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
578         memset(handle, 0, sizeof(THREAD_HANDLE));
579         handle->prev = NULL;
580         handle->next = NULL;
581         handle->pthread_id = NO_SUCH_CHILD_PID;
582         handle->thread_num = thread_pool.max_thread_num++;
583         handle->request_count = 0;
584         handle->status = THREAD_RUNNING;
585         handle->timestamp = time(NULL);
586
587         /*
588          *      Initialize the thread's attributes to detached.
589          *
590          *      We could call pthread_detach() later, but if the thread
591          *      exits between the create & detach calls, it will need to
592          *      be joined, which will never happen.
593          */
594         pthread_attr_init(&attr);
595         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
596
597         /*
598          *      Create the thread detached, so that it cleans up it's
599          *      own memory when it exits.
600          *
601          *      Note that the function returns non-zero on error, NOT
602          *      -1.  The return code is the error, and errno isn't set.
603          */
604         rcode = pthread_create(&handle->pthread_id, &attr,
605                         request_handler_thread, handle);
606         if (rcode != 0) {
607                 radlog(L_ERR, "Thread create failed: %s",
608                        strerror(rcode));
609                 return NULL;
610         }
611         pthread_attr_destroy(&attr);
612
613         /*
614          *      One more thread to go into the list.
615          */
616         thread_pool.total_threads++;
617         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
618                         handle->thread_num, thread_pool.total_threads);
619
620         /*
621          *      Add the thread handle to the tail of the thread pool list.
622          */
623         if (thread_pool.tail) {
624                 thread_pool.tail->next = handle;
625                 handle->prev = thread_pool.tail;
626                 thread_pool.tail = handle;
627         } else {
628                 rad_assert(thread_pool.head == NULL);
629                 thread_pool.head = thread_pool.tail = handle;
630         }
631
632         /*
633          *      Update the time we last spawned a thread.
634          */
635         thread_pool.time_last_spawned = now;
636
637         /*
638          *      And return the new handle to the caller.
639          */
640         return handle;
641 }
642
643 /*
644  *      Temporary function to prevent server from executing a SIGHUP
645  *      until all threads are finished handling requests.  This returns
646  *      the number of active threads to 'radiusd.c'.
647  */
648 int total_active_threads(void)
649 {
650         /*
651          *      We don't acquire the mutex, so this is just an estimate.
652          *      We can't return with the lock held, so there's no point
653          *      in getting the guaranteed correct value; by the time
654          *      the caller sees it, it can be wrong again.
655          */
656         return thread_pool.active_threads;
657 }
658
659
660 static uint32_t pid_hash(const void *data)
661 {
662         const thread_fork_t *tf = data;
663
664         return fr_hash(&tf->pid, sizeof(tf->pid));
665 }
666
667 static int pid_cmp(const void *one, const void *two)
668 {
669         const thread_fork_t *a = one;
670         const thread_fork_t *b = two;
671
672         return (a->pid - b->pid);
673 }
674
675 /*
676  *      Allocate the thread pool, and seed it with an initial number
677  *      of threads.
678  *
679  *      FIXME: What to do on a SIGHUP???
680  */
681 int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
682 {
683         int             i, rcode;
684         CONF_SECTION    *pool_cf;
685         time_t          now;
686
687         now = time(NULL);
688
689         /*
690          *      We're not spawning new threads, don't do
691          *      anything.
692          */
693         if (!spawn_flag) return 0;
694
695         /*
696          *      After a SIGHUP, we don't over-write the previous values.
697          */
698         if (!pool_initialized) {
699                 /*
700                  *      Initialize the thread pool to some reasonable values.
701                  */
702                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
703                 thread_pool.head = NULL;
704                 thread_pool.tail = NULL;
705                 thread_pool.total_threads = 0;
706                 thread_pool.max_thread_num = 1;
707                 thread_pool.cleanup_delay = 5;
708                 thread_pool.spawn_flag = spawn_flag;
709
710                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
711                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
712                                strerror(errno));
713                         return -1;
714                 }
715
716                 /*
717                  *      Create the hash table of child PID's
718                  */
719                 thread_pool.waiters = fr_hash_table_create(pid_hash,
720                                                              pid_cmp,
721                                                              free);
722                 if (!thread_pool.waiters) {
723                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
724                         return -1;
725                 }
726         }
727
728         pool_cf = cf_subsection_find_next(cs, NULL, "thread");
729         if (!pool_cf) {
730                 radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
731                 return -1;
732         }
733
734         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
735                 return -1;
736         }
737
738         /*
739          *      Catch corner cases.
740          */
741         if (thread_pool.min_spare_threads < 1)
742                 thread_pool.min_spare_threads = 1;
743         if (thread_pool.max_spare_threads < 1)
744                 thread_pool.max_spare_threads = 1;
745         if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
746                 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
747
748         /*
749          *      The pool has already been initialized.  Don't spawn
750          *      new threads, and don't forget about forked children,
751          */
752         if (pool_initialized) {
753                 return 0;
754         }
755
756         /*
757          *      Initialize the queue of requests.
758          */
759         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
760         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
761         if (rcode != 0) {
762                 radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
763                        strerror(errno));
764                 return -1;
765         }
766
767         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
768         if (rcode != 0) {
769                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
770                        strerror(errno));
771                 return -1;
772         }
773
774         /*
775          *      Allocate multiple fifos.
776          */
777         for (i = 0; i < RAD_LISTEN_MAX; i++) {
778                 thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
779                 if (!thread_pool.fifo[i]) {
780                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
781                         return -1;
782                 }
783         }
784
785 #ifdef HAVE_OPENSSL_CRYPTO_H
786         /*
787          *      If we're linking with OpenSSL too, then we need
788          *      to set up the mutexes and enable the thread callbacks.
789          */
790         if (!setup_ssl_mutexes()) {
791                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
792                 return -1;
793         }
794 #endif
795
796
797         /*
798          *      Create a number of waiting threads.
799          *
800          *      If we fail while creating them, do something intelligent.
801          */
802         for (i = 0; i < thread_pool.start_threads; i++) {
803                 if (spawn_thread(now) == NULL) {
804                         return -1;
805                 }
806         }
807
808         DEBUG2("Thread pool initialized");
809         pool_initialized = TRUE;
810         return 0;
811 }
812
813
814 /*
815  *      Assign a new request to a free thread.
816  *
817  *      If there isn't a free thread, then try to create a new one,
818  *      up to the configured limits.
819  */
820 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
821 {
822         time_t now = request->timestamp;
823
824         /*
825          *      We've been told not to spawn threads, so don't.
826          */
827         if (!thread_pool.spawn_flag) {
828                 radius_handle_request(request, fun);
829
830                 /*
831                  *      Requests that care about child process exit
832                  *      codes have already either called
833                  *      rad_waitpid(), or they've given up.
834                  */
835                 wait(NULL);
836                 return 1;
837         }
838
839         /*
840          *      Add the new request to the queue.
841          */
842         if (!request_enqueue(request, fun)) return 0;
843
844         /*
845          *      If we haven't checked the number of child threads
846          *      in a while, OR if the thread pool appears to be full,
847          *      go manage it.
848          */
849         if ((last_cleaned < now) ||
850             (thread_pool.active_threads == thread_pool.total_threads)) {
851                 thread_pool_manage(now);
852         }
853
854         return 1;
855 }
856
857 /*
858  *      Check the min_spare_threads and max_spare_threads.
859  *
860  *      If there are too many or too few threads waiting, then we
861  *      either create some more, or delete some.
862  */
863 static void thread_pool_manage(time_t now)
864 {
865         int spare;
866         int i, total;
867         THREAD_HANDLE *handle, *next;
868         int active_threads;
869
870         /*
871          *      We don't need a mutex lock here, as we're reading
872          *      active_threads, and not modifying it.  We want a close
873          *      approximation of the number of active threads, and this
874          *      is good enough.
875          */
876         active_threads = thread_pool.active_threads;
877         spare = thread_pool.total_threads - active_threads;
878         if (debug_flag) {
879                 static int old_total = -1;
880                 static int old_active = -1;
881
882                 if ((old_total != thread_pool.total_threads) ||
883                                 (old_active != active_threads)) {
884                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
885                                         thread_pool.total_threads, active_threads, spare);
886                         old_total = thread_pool.total_threads;
887                         old_active = active_threads;
888                 }
889         }
890
891         /*
892          *      If there are too few spare threads.  Go create some more.
893          */
894         if (spare < thread_pool.min_spare_threads) {
895                 total = thread_pool.min_spare_threads - spare;
896
897                 DEBUG2("Threads: Spawning %d spares", total);
898
899                 /*
900                  *      Create a number of spare threads.
901                  */
902                 for (i = 0; i < total; i++) {
903                         handle = spawn_thread(now);
904                         if (handle == NULL) {
905                                 return;
906                         }
907                 }
908
909                 return;         /* there aren't too many spare threads */
910         }
911
912         /*
913          *      Only delete spare threads if we haven't already done
914          *      so this second.
915          */
916         if (now == last_cleaned) {
917                 return;
918         }
919         last_cleaned = now;
920
921         /*
922          *      Loop over the thread pool, deleting exited threads.
923          */
924         for (handle = thread_pool.head; handle; handle = next) {
925                 next = handle->next;
926
927                 /*
928                  *      Maybe we've asked the thread to exit, and it
929                  *      has agreed.
930                  */
931                 if (handle->status == THREAD_EXITED) {
932                         delete_thread(handle);
933                 }
934         }
935
936         /*
937          *      Only delete the spare threads if sufficient time has
938          *      passed since we last created one.  This helps to minimize
939          *      the amount of create/delete cycles.
940          */
941         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
942                 return;
943         }
944
945         /*
946          *      If there are too many spare threads, delete one.
947          *
948          *      Note that we only delete ONE at a time, instead of
949          *      wiping out many.  This allows the excess servers to
950          *      be slowly reaped, just in case the load spike comes again.
951          */
952         if (spare > thread_pool.max_spare_threads) {
953
954                 spare -= thread_pool.max_spare_threads;
955
956                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
957
958                 /*
959                  *      Walk through the thread pool, deleting the
960                  *      first idle thread we come across.
961                  */
962                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
963                         next = handle->next;
964
965                         /*
966                          *      If the thread is not handling a
967                          *      request, but still live, then tell it
968                          *      to exit.
969                          *
970                          *      It will eventually wake up, and realize
971                          *      it's been told to commit suicide.
972                          */
973                         if ((handle->request == NULL) &&
974                             (handle->status == THREAD_RUNNING)) {
975                                 handle->status = THREAD_CANCELLED;
976                                 /*
977                                  *      Post an extra semaphore, as a
978                                  *      signal to wake up, and exit.
979                                  */
980                                 sem_post(&thread_pool.semaphore);
981                                 spare--;
982                                 break;
983                         }
984                 }
985         }
986
987         /*
988          *      If the thread has handled too many requests, then make it
989          *      exit.
990          */
991         if (thread_pool.max_requests_per_thread > 0) {
992                 for (handle = thread_pool.head; handle; handle = next) {
993                         next = handle->next;
994
995                         /*
996                          *      Not handling a request, but otherwise
997                          *      live, we can kill it.
998                          */
999                         if ((handle->request == NULL) &&
1000                             (handle->status == THREAD_RUNNING) &&
1001                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1002                                 handle->status = THREAD_CANCELLED;
1003                                 sem_post(&thread_pool.semaphore);
1004                         }
1005                 }
1006         }
1007
1008         /*
1009          *      Otherwise everything's kosher.  There are not too few,
1010          *      or too many spare threads.  Exit happily.
1011          */
1012         return;
1013 }
1014
1015
1016 /*
1017  *      Thread wrapper for fork().
1018  */
1019 pid_t rad_fork(void)
1020 {
1021         pid_t child_pid;
1022
1023         if (!pool_initialized) return fork();
1024
1025         reap_children();        /* be nice to non-wait thingies */
1026
1027         if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1028                 return -1;
1029         }
1030
1031         /*
1032          *      Fork & save the PID for later reaping.
1033          */
1034         child_pid = fork();
1035         if (child_pid > 0) {
1036                 int rcode;
1037                 thread_fork_t *tf;
1038
1039                 tf = rad_malloc(sizeof(*tf));
1040                 memset(tf, 0, sizeof(*tf));
1041
1042                 tf->pid = child_pid;
1043
1044                 pthread_mutex_lock(&thread_pool.wait_mutex);
1045                 rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1046                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1047
1048                 if (!rcode) {
1049                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1050                                (int) child_pid);
1051                 }
1052         }
1053
1054         /*
1055          *      Return whatever we were told.
1056          */
1057         return child_pid;
1058 }
1059
1060
1061 /*
1062  *      Wait 10 seconds at most for a child to exit, then give up.
1063  */
1064 pid_t rad_waitpid(pid_t pid, int *status)
1065 {
1066         int i;
1067         thread_fork_t mytf, *tf;
1068
1069         if (!pool_initialized) return waitpid(pid, status, 0);
1070
1071         if (pid <= 0) return -1;
1072
1073         mytf.pid = pid;
1074
1075         pthread_mutex_lock(&thread_pool.wait_mutex);
1076         tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1077         pthread_mutex_unlock(&thread_pool.wait_mutex);
1078
1079         if (!tf) return -1;
1080
1081         for (i = 0; i < 100; i++) {
1082                 reap_children();
1083
1084                 if (tf->exited) {
1085                         *status = tf->status;
1086
1087                         pthread_mutex_lock(&thread_pool.wait_mutex);
1088                         fr_hash_table_delete(thread_pool.waiters, &mytf);
1089                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1090                         return pid;
1091                 }
1092                 usleep(100000); /* sleep for 1/10 of a second */
1093         }
1094
1095         /*
1096          *      10 seconds have passed, give up on the child.
1097          */
1098         pthread_mutex_lock(&thread_pool.wait_mutex);
1099         fr_hash_table_delete(thread_pool.waiters, &mytf);
1100         pthread_mutex_unlock(&thread_pool.wait_mutex);
1101
1102         return 0;
1103 }
1104
1105 void thread_pool_lock(void)
1106 {
1107         pthread_mutex_lock(&thread_pool.queue_mutex);
1108 }
1109
1110 void thread_pool_unlock(void)
1111 {
1112         pthread_mutex_unlock(&thread_pool.queue_mutex);
1113 }
1114 #endif /* HAVE_PTHREAD_H */