Moved nearly all remaining code out of radiusd.c, and into
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/rad_assert.h>
29
/*
 *	Semaphore portability shims.
 *
 *	Other OS's have a working sem_init(); OS X doesn't.  On DARWIN
 *	the sem_t / sem_init / sem_wait / sem_post names used in this
 *	file are therefore redirected to Mach semaphores.
 */
#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif

#ifdef DARWIN
#include <mach/task.h>
#include <mach/semaphore.h>

/*
 *	Every macro argument is parenthesized so that any pointer
 *	expression can be passed safely.  The "p" (pshared) argument
 *	of sem_init() has no Mach equivalent and is ignored.
 *
 *	NOTE(review): the Mach calls report failure through their return
 *	value; they are not documented to set errno, which callers read
 *	after a failed sem_wait()/sem_init() -- confirm on OS X.
 */
#undef sem_t
#define sem_t semaphore_t
#undef sem_init
#define sem_init(s,p,c) semaphore_create(mach_task_self(), (s), SYNC_POLICY_FIFO, (c))
#undef sem_wait
#define sem_wait(s) semaphore_wait(*(s))
#undef sem_post
#define sem_post(s) semaphore_signal(*(s))
#endif
50
51 #ifdef HAVE_SYS_WAIT_H
52 #include <sys/wait.h>
53 #endif
54
55 #ifdef HAVE_PTHREAD_H
56
57 #ifdef HAVE_OPENSSL_CRYPTO_H
58 #include <openssl/crypto.h>
59 #endif
60 #ifdef HAVE_OPENSSL_ERR_H
61 #include <openssl/err.h>
62 #endif
63
/*
 *	Initial counts for the request semaphore.  thread_pool_init()
 *	creates it LOCKED (zero), so workers block until a request
 *	is queued and the semaphore is posted.
 */
#define SEMAPHORE_LOCKED	(0)
#define SEMAPHORE_UNLOCKED	(1)

/*
 *	Values of THREAD_HANDLE.status.
 */
#define THREAD_RUNNING		(1)
#define THREAD_CANCELLED	(2)
#define THREAD_EXITED		(3)

/*
 *	One request fifo per listener type; fifos are indexed by
 *	request->priority.
 */
#define NUM_FIFOS		RAD_LISTEN_MAX
72
73
74 /*
75  *  A data structure which contains the information about
76  *  the current thread.
77  *
78  *  pthread_id     pthread id
79  *  thread_num     server thread number, 1...number of threads
80  *  semaphore     used to block the thread until a request comes in
81  *  status        is the thread running or exited?
82  *  request_count the number of requests that this thread has handled
83  *  timestamp     when the thread started executing.
84  */
85 typedef struct THREAD_HANDLE {
86         struct THREAD_HANDLE *prev;
87         struct THREAD_HANDLE *next;
88         pthread_t            pthread_id;
89         int                  thread_num;
90         int                  status;
91         unsigned int         request_count;
92         time_t               timestamp;
93         REQUEST              *request;
94 } THREAD_HANDLE;
95
96 /*
97  *      For the request queue.
98  */
99 typedef struct request_queue_t {
100         REQUEST           *request;
101         RAD_REQUEST_FUNP  fun;
102 } request_queue_t;
103
/*
 *	A forked child tracked in thread_pool.waiters, looked up by pid.
 *	reap_children() fills in status/exited when waitpid() reports it.
 */
typedef struct thread_fork_t {
	pid_t	pid;		/* lookup key */
	int	status;		/* status as returned by waitpid() */
	int	exited;		/* set to 1 once the child has been reaped */
} thread_fork_t;
109
110
111 /*
112  *      A data structure to manage the thread pool.  There's no real
113  *      need for a data structure, but it makes things conceptually
114  *      easier.
115  */
116 typedef struct THREAD_POOL {
117         THREAD_HANDLE *head;
118         THREAD_HANDLE *tail;
119
120         int total_threads;
121         int active_threads;     /* protected by queue_mutex */
122         int max_thread_num;
123         int start_threads;
124         int max_threads;
125         int min_spare_threads;
126         int max_spare_threads;
127         unsigned int max_requests_per_thread;
128         unsigned long request_count;
129         time_t time_last_spawned;
130         int cleanup_delay;
131         int spawn_flag;
132
133         pthread_mutex_t wait_mutex;
134         lrad_hash_table_t *waiters;
135
136         /*
137          *      All threads wait on this semaphore, for requests
138          *      to enter the queue.
139          */
140         sem_t           semaphore;
141
142         /*
143          *      To ensure only one thread at a time touches the queue.
144          */
145         pthread_mutex_t queue_mutex;
146
147         int             max_queue_size;
148         int             num_queued;
149         int             can_read_detail;
150         lrad_fifo_t     *fifo[NUM_FIFOS];
151 } THREAD_POOL;
152
153 static THREAD_POOL thread_pool;
154 static int pool_initialized = FALSE;
155 static time_t last_cleaned = 0;
156
157 static void thread_pool_manage(time_t now);
158
159 /*
160  *      A mapping of configuration file names to internal integers
161  */
162 static const CONF_PARSER thread_config[] = {
163         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
164         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
165         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
166         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
167         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
168         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
169         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
170         { NULL, -1, 0, NULL, NULL }
171 };
172
173
#ifdef HAVE_OPENSSL_CRYPTO_H

/*
 *	When linked against OpenSSL, a multithreaded application must
 *	give OpenSSL a thread-id function and a mutex locking function.
 *
 *	Only the static locking callbacks are provided here.  OpenSSL
 *	does not use the dynamic locking callbacks right now, but may
 *	in the future, in which case they will have to be added.
 */

/* One mutex per OpenSSL lock, allocated by setup_ssl_mutexes(). */
static pthread_mutex_t *ssl_mutexes = NULL;

/*
 *	Thread-id callback: OpenSSL identifies the calling thread by
 *	the value of pthread_self().
 */
static unsigned long ssl_id_function(void)
{
	pthread_t me = pthread_self();

	return (unsigned long) me;
}

/*
 *	Locking callback: lock mutex "n" when CRYPTO_LOCK is set in
 *	"mode", otherwise unlock it.  "file" and "line" are unused.
 */
static void ssl_locking_function(int mode, int n, const char *file, int line)
{
	pthread_mutex_t *lock = &ssl_mutexes[n];

	(void) file;
	(void) line;

	if ((mode & CRYPTO_LOCK) == 0) {
		pthread_mutex_unlock(lock);
		return;
	}

	pthread_mutex_lock(lock);
}

/*
 *	Allocate and initialize CRYPTO_num_locks() mutexes, then
 *	register the two callbacks above with OpenSSL.
 *
 *	Returns 1 on success, 0 if the mutex array could not be allocated.
 */
static int setup_ssl_mutexes(void)
{
	int lock_idx;
	int lock_total = CRYPTO_num_locks();

	ssl_mutexes = rad_malloc(lock_total * sizeof(pthread_mutex_t));
	if (ssl_mutexes == NULL) {
		radlog(L_ERR, "Error allocating memory for SSL mutexes!");
		return 0;
	}

	for (lock_idx = 0; lock_idx < lock_total; lock_idx++) {
		pthread_mutex_init(&ssl_mutexes[lock_idx], NULL);
	}

	CRYPTO_set_id_callback(ssl_id_function);
	CRYPTO_set_locking_callback(ssl_locking_function);

	return 1;
}
#endif
227
228
229 /*
230  *      We don't want to catch SIGCHLD for a host of reasons.
231  *
232  *      - exec_wait means that someone, somewhere, somewhen, will
233  *      call waitpid(), and catch the child.
234  *
235  *      - SIGCHLD is delivered to a random thread, not the one that
236  *      forked.
237  *
238  *      - if another thread catches the child, we have to coordinate
239  *      with the thread doing the waiting.
240  *
241  *      - if we don't waitpid() for non-wait children, they'll be zombies,
242  *      and will hang around forever.
243  *
244  */
245 static void reap_children(void)
246 {
247         pid_t pid;
248         int status;
249         thread_fork_t mytf, *tf;
250
251
252         pthread_mutex_lock(&thread_pool.wait_mutex);
253
254         do {
255                 pid = waitpid(0, &status, WNOHANG);
256                 if (pid <= 0) break;
257
258                 mytf.pid = pid;
259                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
260                 if (!tf) continue;
261
262                 tf->status = status;
263                 tf->exited = 1;
264         } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
265
266         pthread_mutex_unlock(&thread_pool.wait_mutex);
267 }
268
269 /*
270  *      Add a request to the list of waiting requests.
271  *      This function gets called ONLY from the main handler thread...
272  *
273  *      This function should never fail.
274  */
275 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
276 {
277         request_queue_t *entry;
278
279         pthread_mutex_lock(&thread_pool.queue_mutex);
280
281         thread_pool.request_count++;
282
283         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
284                 pthread_mutex_unlock(&thread_pool.queue_mutex);
285
286                 /*
287                  *      Mark the request as done.
288                  */
289                 radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
290                 request->child_state = REQUEST_DONE;
291                 return 0;
292         }
293
294         entry = rad_malloc(sizeof(*entry));
295         entry->request = request;
296         entry->fun = fun;
297
298         /*
299          *      Push the request onto the appropriate fifo for that
300          */
301         if (!lrad_fifo_push(thread_pool.fifo[request->priority],
302                             entry)) {
303                 pthread_mutex_unlock(&thread_pool.queue_mutex);
304                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
305                 request->child_state = REQUEST_DONE;
306                 return 0;
307         }
308
309         /*
310          *      We've added an entry that didn't come from the detail
311          *      file.  Note that the child thread should signal the
312          *      main worker thread again when the queue becomes empty.
313          */
314         if (request->listener->type != RAD_LISTEN_DETAIL) {
315                 thread_pool.can_read_detail = FALSE;
316         }
317
318         thread_pool.num_queued++;
319
320         pthread_mutex_unlock(&thread_pool.queue_mutex);
321
322         /*
323          *      There's one more request in the queue.
324          *
325          *      Note that we're not touching the queue any more, so
326          *      the semaphore post is outside of the mutex.  This also
327          *      means that when the thread wakes up and tries to lock
328          *      the mutex, it will be unlocked, and there won't be
329          *      contention.
330          */
331         sem_post(&thread_pool.semaphore);
332
333         return 1;
334 }
335
336 /*
337  *      Remove a request from the queue.
338  */
339 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
340 {
341         RAD_LISTEN_TYPE i, start;
342         request_queue_t *entry;
343
344         reap_children();
345
346         pthread_mutex_lock(&thread_pool.queue_mutex);
347
348         /*
349          *      Clear old requests from all queues.
350          *
351          *      We only do one pass over the queue, in order to
352          *      amortize the work across the child threads.  Since we
353          *      do N checks for one request de-queued, the old
354          *      requests will be quickly cleared.
355          */
356         for (i = 0; i < RAD_LISTEN_MAX; i++) {
357                 entry = lrad_fifo_peek(thread_pool.fifo[i]);
358                 if (!entry ||
359                     (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
360                         continue;
361 }
362                 /*
363                  *      This entry was marked to be stopped.  Acknowledge it.
364                  */
365                 entry = lrad_fifo_pop(thread_pool.fifo[i]);
366                 rad_assert(entry != NULL);
367                 entry->request->child_state = REQUEST_DONE;
368         }
369
370         start = 0;
371  retry:
372         /*
373          *      Pop results from the top of the queue
374          */
375         for (i = start; i < RAD_LISTEN_MAX; i++) {
376                 entry = lrad_fifo_pop(thread_pool.fifo[i]);
377                 if (entry) {
378                         start = i;
379                         break;
380                 }
381         }
382
383         if (!entry) {
384                 pthread_mutex_unlock(&thread_pool.queue_mutex);
385                 *request = NULL;
386                 *fun = NULL;
387                 return 0;
388         }
389
390         rad_assert(thread_pool.num_queued > 0);
391         thread_pool.num_queued--;
392         *request = entry->request;
393         *fun = entry->fun;
394         free(entry);
395
396         rad_assert(*request != NULL);
397         rad_assert((*request)->magic == REQUEST_MAGIC);
398         rad_assert(*fun != NULL);
399
400         /*
401          *      If the request has sat in the queue for too long,
402          *      kill it.
403          *
404          *      The main clean-up code can't delete the request from
405          *      the queue, and therefore won't clean it up until we
406          *      have acknowledged it as "done".
407          */
408         if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
409                 (*request)->child_state = REQUEST_DONE;
410                 goto retry;
411         }
412
413         /*
414          *      The thread is currently processing a request.
415          */
416         thread_pool.active_threads++;
417
418         pthread_mutex_unlock(&thread_pool.queue_mutex);
419
420         return 1;
421 }
422
423
424 /*
425  *      The main thread handler for requests.
426  *
427  *      Wait on the semaphore until we have it, and process the request.
428  */
429 static void *request_handler_thread(void *arg)
430 {
431         RAD_REQUEST_FUNP  fun;
432         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
433
434         /*
435          *      Loop forever, until told to exit.
436          */
437         do {
438                 int can_read_detail;
439
440                 /*
441                  *      Wait to be signalled.
442                  */
443                 DEBUG2("Thread %d waiting to be assigned a request",
444                        self->thread_num);
445         re_wait:
446                 if (sem_wait(&thread_pool.semaphore) != 0) {
447                         /*
448                          *      Interrupted system call.  Go back to
449                          *      waiting, but DON'T print out any more
450                          *      text.
451                          */
452                         if (errno == EINTR) {
453                                 DEBUG2("Re-wait %d", self->thread_num);
454                                 goto re_wait;
455                         }
456                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
457                                self->thread_num, strerror(errno));
458                         break;
459                 }
460
461                 DEBUG2("Thread %d got semaphore", self->thread_num);
462
463                 /*
464                  *      Try to grab a request from the queue.
465                  *
466                  *      It may be empty, in which case we fail
467                  *      gracefully.
468                  */
469                 if (!request_dequeue(&self->request, &fun)) continue;
470
471                 self->request->child_pid = self->pthread_id;
472                 self->request_count++;
473
474                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
475                        self->thread_num, self->request->number,
476                        self->request_count);
477
478                 radius_handle_request(self->request, fun);
479
480                 /*
481                  *      Update the active threads.
482                  */
483                 pthread_mutex_lock(&thread_pool.queue_mutex);
484                 rad_assert(thread_pool.active_threads > 0);
485                 thread_pool.active_threads--;
486
487                 /*
488                  *      If we're not currently allowed to read the
489                  *      detail file, AND there are no requests queued,
490                  *      THEN signal the main worker thread that
491                  *      there's at least one waiting thread (us) who
492                  *      can accept a packet from the detail file.
493                  */
494                 can_read_detail = FALSE;
495                 if (!thread_pool.can_read_detail &&
496                     (thread_pool.num_queued == 0)) {
497                         can_read_detail = TRUE;
498                 }
499
500                 pthread_mutex_unlock(&thread_pool.queue_mutex);
501
502                 /*
503                  *      Do this out of the lock to be nice to everyone.
504                  */
505                 if (can_read_detail) {
506                         radius_signal_self(RADIUS_SIGNAL_SELF_DETAIL);
507                 }
508
509         } while (self->status != THREAD_CANCELLED);
510
511         DEBUG2("Thread %d exiting...", self->thread_num);
512
513 #ifdef HAVE_OPENSSL_ERR_H
514         /*
515          *      If we linked with OpenSSL, the application
516          *      must remove the thread's error queue before
517          *      exiting to prevent memory leaks.
518          */
519         ERR_remove_state(0);
520 #endif
521
522         /*
523          *  Do this as the LAST thing before exiting.
524          */
525         self->request = NULL;
526         self->status = THREAD_EXITED;
527
528         return NULL;
529 }
530
531 /*
532  *      Take a THREAD_HANDLE, delete it from the thread pool and
533  *      free its resources.
534  *
535  *      This function is called ONLY from the main server thread,
536  *      ONLY after the thread has exited.
537  */
538 static void delete_thread(THREAD_HANDLE *handle)
539 {
540         THREAD_HANDLE *prev;
541         THREAD_HANDLE *next;
542
543         rad_assert(handle->request == NULL);
544
545         DEBUG2("Deleting thread %d", handle->thread_num);
546
547         prev = handle->prev;
548         next = handle->next;
549         rad_assert(thread_pool.total_threads > 0);
550         thread_pool.total_threads--;
551
552         /*
553          *      Remove the handle from the list.
554          */
555         if (prev == NULL) {
556                 rad_assert(thread_pool.head == handle);
557                 thread_pool.head = next;
558         } else {
559                 prev->next = next;
560         }
561
562         if (next == NULL) {
563                 rad_assert(thread_pool.tail == handle);
564                 thread_pool.tail = prev;
565         } else {
566                 next->prev = prev;
567         }
568
569         /*
570          *      Free the handle, now that it's no longer referencable.
571          */
572         free(handle);
573 }
574
575
576 /*
577  *      Spawn a new thread, and place it in the thread pool.
578  *
579  *      The thread is started initially in the blocked state, waiting
580  *      for the semaphore.
581  */
582 static THREAD_HANDLE *spawn_thread(time_t now)
583 {
584         int rcode;
585         THREAD_HANDLE *handle;
586         pthread_attr_t attr;
587
588         /*
589          *      Ensure that we don't spawn too many threads.
590          */
591         if (thread_pool.total_threads >= thread_pool.max_threads) {
592                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
593                 return NULL;
594         }
595
596         /*
597          *      Allocate a new thread handle.
598          */
599         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
600         memset(handle, 0, sizeof(THREAD_HANDLE));
601         handle->prev = NULL;
602         handle->next = NULL;
603         handle->pthread_id = NO_SUCH_CHILD_PID;
604         handle->thread_num = thread_pool.max_thread_num++;
605         handle->request_count = 0;
606         handle->status = THREAD_RUNNING;
607         handle->timestamp = time(NULL);
608
609         /*
610          *      Initialize the thread's attributes to detached.
611          *
612          *      We could call pthread_detach() later, but if the thread
613          *      exits between the create & detach calls, it will need to
614          *      be joined, which will never happen.
615          */
616         pthread_attr_init(&attr);
617         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
618
619         /*
620          *      Create the thread detached, so that it cleans up it's
621          *      own memory when it exits.
622          *
623          *      Note that the function returns non-zero on error, NOT
624          *      -1.  The return code is the error, and errno isn't set.
625          */
626         rcode = pthread_create(&handle->pthread_id, &attr,
627                         request_handler_thread, handle);
628         if (rcode != 0) {
629                 radlog(L_ERR, "Thread create failed: %s",
630                        strerror(rcode));
631                 return NULL;
632         }
633         pthread_attr_destroy(&attr);
634
635         /*
636          *      One more thread to go into the list.
637          */
638         thread_pool.total_threads++;
639         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
640                         handle->thread_num, thread_pool.total_threads);
641
642         /*
643          *      Add the thread handle to the tail of the thread pool list.
644          */
645         if (thread_pool.tail) {
646                 thread_pool.tail->next = handle;
647                 handle->prev = thread_pool.tail;
648                 thread_pool.tail = handle;
649         } else {
650                 rad_assert(thread_pool.head == NULL);
651                 thread_pool.head = thread_pool.tail = handle;
652         }
653
654         /*
655          *      Update the time we last spawned a thread.
656          */
657         thread_pool.time_last_spawned = now;
658
659         /*
660          *      And return the new handle to the caller.
661          */
662         return handle;
663 }
664
665 /*
666  *      Temporary function to prevent server from executing a SIGHUP
667  *      until all threads are finished handling requests.  This returns
668  *      the number of active threads to 'radiusd.c'.
669  */
670 int total_active_threads(void)
671 {
672         /*
673          *      We don't acquire the mutex, so this is just an estimate.
674          *      We can't return with the lock held, so there's no point
675          *      in getting the guaranteed correct value; by the time
676          *      the caller sees it, it can be wrong again.
677          */
678         return thread_pool.active_threads;
679 }
680
681
682 static uint32_t pid_hash(const void *data)
683 {
684         const thread_fork_t *tf = data;
685
686         return lrad_hash(&tf->pid, sizeof(tf->pid));
687 }
688
689 static int pid_cmp(const void *one, const void *two)
690 {
691         const thread_fork_t *a = one;
692         const thread_fork_t *b = two;
693
694         return (a->pid - b->pid);
695 }
696
697 /*
698  *      Allocate the thread pool, and seed it with an initial number
699  *      of threads.
700  *
701  *      FIXME: What to do on a SIGHUP???
702  */
703 int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
704 {
705         int             i, rcode;
706         CONF_SECTION    *pool_cf;
707         time_t          now;
708
709         DEBUG("Initializing the thread pool...");
710         now = time(NULL);
711
712         /*
713          *      We're not spawning new threads, don't do
714          *      anything.
715          */
716         if (!spawn_flag) return 0;
717
718         /*
719          *      After a SIGHUP, we don't over-write the previous values.
720          */
721         if (!pool_initialized) {
722                 /*
723                  *      Initialize the thread pool to some reasonable values.
724                  */
725                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
726                 thread_pool.head = NULL;
727                 thread_pool.tail = NULL;
728                 thread_pool.total_threads = 0;
729                 thread_pool.max_thread_num = 1;
730                 thread_pool.cleanup_delay = 5;
731                 thread_pool.spawn_flag = spawn_flag;
732
733                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
734                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
735                                strerror(errno));
736                         return -1;
737                 }
738
739                 /*
740                  *      Create the hash table of child PID's
741                  */
742                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
743                                                              pid_cmp,
744                                                              free);
745                 if (!thread_pool.waiters) {
746                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
747                         return -1;
748                 }
749         }
750
751         pool_cf = cf_subsection_find_next(cs, NULL, "thread");
752         if (!pool_cf) {
753                 radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
754                 return -1;
755         }
756
757         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
758                 return -1;
759         }
760
761         /*
762          *      Catch corner cases.
763          */
764         if (thread_pool.min_spare_threads < 1)
765                 thread_pool.min_spare_threads = 1;
766         if (thread_pool.max_spare_threads < 1)
767                 thread_pool.max_spare_threads = 1;
768         if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
769                 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
770
771         /*
772          *      The pool has already been initialized.  Don't spawn
773          *      new threads, and don't forget about forked children,
774          */
775         if (pool_initialized) {
776                 return 0;
777         }
778
779         /*
780          *      Initialize the queue of requests.
781          */
782         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
783         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
784         if (rcode != 0) {
785                 radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
786                        strerror(errno));
787                 return -1;
788         }
789
790         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
791         if (rcode != 0) {
792                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
793                        strerror(errno));
794                 return -1;
795         }
796
797         /*
798          *      Allocate multiple fifos.
799          */
800         for (i = 0; i < RAD_LISTEN_MAX; i++) {
801                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
802                 if (!thread_pool.fifo[i]) {
803                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
804                         return -1;
805                 }
806         }
807
808 #ifdef HAVE_OPENSSL_CRYPTO_H
809         /*
810          *      If we're linking with OpenSSL too, then we need
811          *      to set up the mutexes and enable the thread callbacks.
812          */
813         if (!setup_ssl_mutexes()) {
814                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
815                 return -1;
816         }
817 #endif
818
819
820         /*
821          *      Create a number of waiting threads.
822          *
823          *      If we fail while creating them, do something intelligent.
824          */
825         for (i = 0; i < thread_pool.start_threads; i++) {
826                 if (spawn_thread(now) == NULL) {
827                         return -1;
828                 }
829         }
830
831         DEBUG2("Thread pool initialized");
832         pool_initialized = TRUE;
833         return 0;
834 }
835
836
837 /*
838  *      Assign a new request to a free thread.
839  *
840  *      If there isn't a free thread, then try to create a new one,
841  *      up to the configured limits.
842  */
843 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
844 {
845         time_t now = request->timestamp;
846
847         /*
848          *      We've been told not to spawn threads, so don't.
849          */
850         if (!thread_pool.spawn_flag) {
851                 radius_handle_request(request, fun);
852
853                 /*
854                  *      Requests that care about child process exit
855                  *      codes have already either called
856                  *      rad_waitpid(), or they've given up.
857                  */
858                 wait(NULL);
859                 return 1;
860         }
861
862         /*
863          *      Add the new request to the queue.
864          */
865         if (!request_enqueue(request, fun)) return 0;
866
867         /*
868          *      If we haven't checked the number of child threads
869          *      in a while, OR if the thread pool appears to be full,
870          *      go manage it.
871          */
872         if ((last_cleaned < now) ||
873             (thread_pool.active_threads == thread_pool.total_threads)) {
874                 thread_pool_manage(now);
875         }
876
877         return 1;
878 }
879
880 /*
881  *      Check the min_spare_threads and max_spare_threads.
882  *
883  *      If there are too many or too few threads waiting, then we
884  *      either create some more, or delete some.
885  */
886 static void thread_pool_manage(time_t now)
887 {
888         int spare;
889         int i, total;
890         THREAD_HANDLE *handle, *next;
891         int active_threads;
892
893         /*
894          *      We don't need a mutex lock here, as we're reading
895          *      active_threads, and not modifying it.  We want a close
896          *      approximation of the number of active threads, and this
897          *      is good enough.
898          */
899         active_threads = thread_pool.active_threads;
900         spare = thread_pool.total_threads - active_threads;
901         if (debug_flag) {
902                 static int old_total = -1;
903                 static int old_active = -1;
904
905                 if ((old_total != thread_pool.total_threads) ||
906                                 (old_active != active_threads)) {
907                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
908                                         thread_pool.total_threads, active_threads, spare);
909                         old_total = thread_pool.total_threads;
910                         old_active = active_threads;
911                 }
912         }
913
914         /*
915          *      If there are too few spare threads.  Go create some more.
916          */
917         if (spare < thread_pool.min_spare_threads) {
918                 total = thread_pool.min_spare_threads - spare;
919
920                 DEBUG2("Threads: Spawning %d spares", total);
921                 /*
922                  *      Create a number of spare threads.
923                  */
924                 for (i = 0; i < total; i++) {
925                         handle = spawn_thread(now);
926                         if (handle == NULL) {
927                                 return;
928                         }
929                 }
930
931                 return;         /* there aren't too many spare threads */
932         }
933
934         /*
935          *      Only delete spare threads if we haven't already done
936          *      so this second.
937          */
938         if (now == last_cleaned) {
939                 return;
940         }
941         last_cleaned = now;
942
943         /*
944          *      Loop over the thread pool, deleting exited threads.
945          */
946         for (handle = thread_pool.head; handle; handle = next) {
947                 next = handle->next;
948
949                 /*
950                  *      Maybe we've asked the thread to exit, and it
951                  *      has agreed.
952                  */
953                 if (handle->status == THREAD_EXITED) {
954                         delete_thread(handle);
955                 }
956         }
957
958         /*
959          *      Only delete the spare threads if sufficient time has
960          *      passed since we last created one.  This helps to minimize
961          *      the amount of create/delete cycles.
962          */
963         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
964                 return;
965         }
966
967         /*
968          *      If there are too many spare threads, delete one.
969          *
970          *      Note that we only delete ONE at a time, instead of
971          *      wiping out many.  This allows the excess servers to
972          *      be slowly reaped, just in case the load spike comes again.
973          */
974         if (spare > thread_pool.max_spare_threads) {
975
976                 spare -= thread_pool.max_spare_threads;
977
978                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
979
980                 /*
981                  *      Walk through the thread pool, deleting the
982                  *      first idle thread we come across.
983                  */
984                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
985                         next = handle->next;
986
987                         /*
988                          *      If the thread is not handling a
989                          *      request, but still live, then tell it
990                          *      to exit.
991                          *
992                          *      It will eventually wake up, and realize
993                          *      it's been told to commit suicide.
994                          */
995                         if ((handle->request == NULL) &&
996                             (handle->status == THREAD_RUNNING)) {
997                                 handle->status = THREAD_CANCELLED;
998                                 /*
999                                  *      Post an extra semaphore, as a
1000                                  *      signal to wake up, and exit.
1001                                  */
1002                                 sem_post(&thread_pool.semaphore);
1003                                 spare--;
1004                                 break;
1005                         }
1006                 }
1007         }
1008
1009         /*
1010          *      If the thread has handled too many requests, then make it
1011          *      exit.
1012          */
1013         if (thread_pool.max_requests_per_thread > 0) {
1014                 for (handle = thread_pool.head; handle; handle = next) {
1015                         next = handle->next;
1016
1017                         /*
1018                          *      Not handling a request, but otherwise
1019                          *      live, we can kill it.
1020                          */
1021                         if ((handle->request == NULL) &&
1022                             (handle->status == THREAD_RUNNING) &&
1023                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1024                                 handle->status = THREAD_CANCELLED;
1025                                 sem_post(&thread_pool.semaphore);
1026                         }
1027                 }
1028         }
1029
1030         /*
1031          *      Otherwise everything's kosher.  There are not too few,
1032          *      or too many spare threads.  Exit happily.
1033          */
1034         return;
1035 }
1036
1037
1038 /*
1039  *      Thread wrapper for fork().
1040  */
1041 pid_t rad_fork(void)
1042 {
1043         pid_t child_pid;
1044
1045         if (!pool_initialized) return fork();
1046
1047         reap_children();        /* be nice to non-wait thingies */
1048
1049         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1050                 return -1;
1051         }
1052
1053         /*
1054          *      Fork & save the PID for later reaping.
1055          */
1056         child_pid = fork();
1057         if (child_pid > 0) {
1058                 int rcode;
1059                 thread_fork_t *tf;
1060
1061                 tf = rad_malloc(sizeof(*tf));
1062                 memset(tf, 0, sizeof(*tf));
1063
1064                 tf->pid = child_pid;
1065
1066                 pthread_mutex_lock(&thread_pool.wait_mutex);
1067                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1068                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1069
1070                 if (!rcode) {
1071                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1072                                (int) child_pid);
1073                 }
1074         }
1075
1076         /*
1077          *      Return whatever we were told.
1078          */
1079         return child_pid;
1080 }
1081
1082
1083 /*
1084  *      Wait 10 seconds at most for a child to exit, then give up.
1085  */
1086 pid_t rad_waitpid(pid_t pid, int *status)
1087 {
1088         int i;
1089         thread_fork_t mytf, *tf;
1090
1091         if (!pool_initialized) return waitpid(pid, status, 0);
1092
1093         if (pid <= 0) return -1;
1094
1095         mytf.pid = pid;
1096
1097         pthread_mutex_lock(&thread_pool.wait_mutex);
1098         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1099         pthread_mutex_unlock(&thread_pool.wait_mutex);
1100
1101         if (!tf) return -1;
1102
1103         for (i = 0; i < 100; i++) {
1104                 reap_children();
1105
1106                 if (tf->exited) {
1107                         *status = tf->status;
1108
1109                         pthread_mutex_lock(&thread_pool.wait_mutex);
1110                         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1111                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1112                         return pid;
1113                 }
1114                 usleep(100000); /* sleep for 1/10 of a second */
1115         }
1116
1117         /*
1118          *      10 seconds have passed, give up on the child.
1119          */
1120         pthread_mutex_lock(&thread_pool.wait_mutex);
1121         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1122         pthread_mutex_unlock(&thread_pool.wait_mutex);
1123
1124         return 0;
1125 }
1126
1127 #else /* HAVE_PTHREAD_H */
/*
 *	Non-threaded build: there is no thread pool to set up.
 *	spawn_flag is ignored, and this always returns 0.
 */
int thread_pool_init(int spawn_flag)
{
	(void) spawn_flag;	/* unused without pthreads */

	return 0;
}
1135
1136 /*
1137  *      call "radrespond".
1138  */
1139 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1140 {
1141         radius_handle_request(request, fun);
1142         return 1;
1143 }
1144
1145 #endif /* HAVE_PTHREAD_H */