Don't initialize the wait hash if we're not threaded
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/autoconf.h>
28
29 #include <stdlib.h>
30 #include <string.h>
31
32 /*
33  *      Other OS's have sem_init, OS X doesn't.
34  */
35 #ifndef DARWIN
36 #include <semaphore.h>
37 #else
38 #include <mach/task.h>
39 #include <mach/semaphore.h>
40
41 #undef sem_t
42 #define sem_t semaphore_t
43 #undef sem_init
44 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45 #undef sem_wait
46 #define sem_wait(s) semaphore_wait(*s)
47 #undef sem_post
48 #define sem_post(s) semaphore_signal(*s)
49 #endif
50
51 #include <signal.h>
52
53 #ifdef HAVE_SYS_WAIT_H
54 #include <sys/wait.h>
55 #endif
56
57 #include <freeradius-devel/radiusd.h>
58 #include <freeradius-devel/rad_assert.h>
59 #include <freeradius-devel/modules.h>
60
61 #ifdef HAVE_PTHREAD_H
62
63 #ifdef HAVE_OPENSSL_CRYPTO_H
64 #include <openssl/crypto.h>
65 #endif
66 #ifdef HAVE_OPENSSL_ERR_H
67 #include <openssl/err.h>
68 #endif
69
70 #define SEMAPHORE_LOCKED        (0)
71 #define SEMAPHORE_UNLOCKED      (1)
72
73 #define THREAD_RUNNING          (1)
74 #define THREAD_CANCELLED        (2)
75 #define THREAD_EXITED           (3)
76
77 #define NUM_FIFOS               (2)
78
79 /*
80  *     Ordered this way because we prefer proxy, then ongoing, then
81  *     start.
82  */
83 #define FIFO_START   (1)
84 #define FIFO_PROXY   (0)
85
86 /*
87  *  A data structure which contains the information about
88  *  the current thread.
89  *
90  *  pthread_id     pthread id
91  *  thread_num     server thread number, 1...number of threads
92  *  semaphore     used to block the thread until a request comes in
93  *  status        is the thread running or exited?
94  *  request_count the number of requests that this thread has handled
95  *  timestamp     when the thread started executing.
96  */
97 typedef struct THREAD_HANDLE {
98         struct THREAD_HANDLE *prev;
99         struct THREAD_HANDLE *next;
100         pthread_t            pthread_id;
101         int                  thread_num;
102         int                  status;
103         unsigned int         request_count;
104         time_t               timestamp;
105         REQUEST              *request;
106 } THREAD_HANDLE;
107
108 /*
109  *      For the request queue.
110  */
111 typedef struct request_queue_t {
112         REQUEST           *request;
113         RAD_REQUEST_FUNP  fun;
114 } request_queue_t;
115
116 typedef struct thread_fork_t {
117         pid_t           pid;
118         int             status;
119         int             exited;
120 } thread_fork_t;
121
122
123 /*
124  *      A data structure to manage the thread pool.  There's no real
125  *      need for a data structure, but it makes things conceptually
126  *      easier.
127  */
128 typedef struct THREAD_POOL {
129         THREAD_HANDLE *head;
130         THREAD_HANDLE *tail;
131
132         int total_threads;
133         int active_threads;     /* protected by queue_mutex */
134         int max_thread_num;
135         int start_threads;
136         int max_threads;
137         int min_spare_threads;
138         int max_spare_threads;
139         unsigned int max_requests_per_thread;
140         unsigned long request_count;
141         time_t time_last_spawned;
142         int cleanup_delay;
143         int spawn_flag;
144
145         pthread_mutex_t wait_mutex;
146         lrad_hash_table_t *waiters;
147
148         /*
149          *      All threads wait on this semaphore, for requests
150          *      to enter the queue.
151          */
152         sem_t           semaphore;
153
154         /*
155          *      To ensure only one thread at a time touches the queue.
156          */
157         pthread_mutex_t queue_mutex;
158
159         int             max_queue_size;
160         int             num_queued;
161         int             fifo_state;
162         lrad_fifo_t     *fifo[NUM_FIFOS];
163 } THREAD_POOL;
164
165 static THREAD_POOL thread_pool;
166 static int pool_initialized = FALSE;
167
168
169 /*
170  *      A mapping of configuration file names to internal integers
171  */
172 static const CONF_PARSER thread_config[] = {
173         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
174         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
175         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
176         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
177         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
178         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
179         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
180         { NULL, -1, 0, NULL, NULL }
181 };
182
183
184 #ifdef HAVE_OPENSSL_CRYPTO_H
185
186 /*
187  *      If we're linking against OpenSSL, then it is the
188  *      duty of the application, if it is multithreaded,
189  *      to provide OpenSSL with appropriate thread id
190  *      and mutex locking functions
191  *
192  *      Note: this only implements static callbacks.
193  *      OpenSSL does not use dynamic locking callbacks
194  *      right now, but may in the futiure, so we will have
195  *      to add them at some point.
196  */
197
198 static pthread_mutex_t *ssl_mutexes = NULL;
199
200 static unsigned long ssl_id_function(void)
201 {
202         return (unsigned long) pthread_self();
203 }
204
205 static void ssl_locking_function(int mode, int n, const char *file, int line)
206 {
207         file = file;            /* -Wunused */
208         line = line;            /* -Wunused */
209
210         if (mode & CRYPTO_LOCK) {
211                 pthread_mutex_lock(&(ssl_mutexes[n]));
212         } else {
213                 pthread_mutex_unlock(&(ssl_mutexes[n]));
214         }
215 }
216
217 static int setup_ssl_mutexes(void)
218 {
219         int i;
220
221         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
222         if (!ssl_mutexes) {
223                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
224                 return 0;
225         }
226
227         for (i = 0; i < CRYPTO_num_locks(); i++) {
228                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
229         }
230
231         CRYPTO_set_id_callback(ssl_id_function);
232         CRYPTO_set_locking_callback(ssl_locking_function);
233
234         return 1;
235 }
236 #endif
237
238
239 /*
240  *      We don't want to catch SIGCHLD for a host of reasons.
241  *
242  *      - exec_wait means that someone, somewhere, somewhen, will
243  *      call waitpid(), and catch the child.
244  *
245  *      - SIGCHLD is delivered to a random thread, not the one that
246  *      forked.
247  *
248  *      - if another thread catches the child, we have to coordinate
249  *      with the thread doing the waiting.
250  *
251  *      - if we don't waitpid() for non-wait children, they'll be zombies,
252  *      and will hang around forever.
253  *
254  */
255 static void reap_children(void)
256 {
257         pid_t pid;
258         int status;
259         thread_fork_t mytf, *tf;
260
261
262         pthread_mutex_lock(&thread_pool.wait_mutex);
263
264         do {
265                 pid = waitpid(0, &status, WNOHANG);
266                 if (pid <= 0) break;
267
268                 mytf.pid = pid;
269                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
270                 if (!tf) continue;
271                 
272                 tf->status = status;
273                 tf->exited = 1;
274         } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
275
276         pthread_mutex_unlock(&thread_pool.wait_mutex);
277 }
278
279 /*
280  *      Add a request to the list of waiting requests.
281  *      This function gets called ONLY from the main handler thread...
282  *
283  *      This function should never fail.
284  */
285 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
286 {
287         int fifo = FIFO_START;
288         request_queue_t *entry;
289
290         pthread_mutex_lock(&thread_pool.queue_mutex);
291
292         thread_pool.request_count++;
293
294         /*
295          *      FIXME: Handle proxy replies separately?
296          */
297         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
298                 pthread_mutex_unlock(&thread_pool.queue_mutex);
299                 
300                 /*
301                  *      Mark the request as done.
302                  */
303                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
304                 request->finished = TRUE;
305                 return 0;
306         }
307
308         /*
309          *      Requests get handled in priority.  First, we handle
310          *      replies from a home server, to finish ongoing requests.
311          *
312          *      Then, we handle requests with State, to finish
313          *      multi-packet transactions.
314          *
315          *      Finally, we handle new requests.
316          */
317         if (request->proxy_reply) {
318                 fifo = FIFO_PROXY;
319         } else {
320                 fifo = FIFO_START;
321         }
322
323         entry = rad_malloc(sizeof(*entry));
324         entry->request = request;
325         entry->fun = fun;
326
327         if (!lrad_fifo_push(thread_pool.fifo[fifo], entry)) {
328                 pthread_mutex_unlock(&thread_pool.queue_mutex);
329                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
330                 request->finished = TRUE;
331                 return 0;
332         }
333
334         thread_pool.num_queued++;
335
336         pthread_mutex_unlock(&thread_pool.queue_mutex);
337
338         /*
339          *      There's one more request in the queue.
340          *
341          *      Note that we're not touching the queue any more, so
342          *      the semaphore post is outside of the mutex.  This also
343          *      means that when the thread wakes up and tries to lock
344          *      the mutex, it will be unlocked, and there won't be
345          *      contention.
346          */
347         sem_post(&thread_pool.semaphore);
348
349         return 1;
350 }
351
352 /*
353  *      Remove a request from the queue.
354  */
355 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
356 {
357         int fifo_state;
358         request_queue_t *entry;
359
360         reap_children();
361
362         pthread_mutex_lock(&thread_pool.queue_mutex);
363
364         fifo_state = thread_pool.fifo_state;
365
366  retry:
367         do {
368                 /*
369                  *      Pop an entry from the current queue, and go to
370                  *      the next queue.
371                  */
372                 entry = lrad_fifo_pop(thread_pool.fifo[fifo_state]);
373                 fifo_state++;
374                 if (fifo_state >= NUM_FIFOS) fifo_state = 0;
375         } while ((fifo_state != thread_pool.fifo_state) && !entry);
376
377         if (!entry) {
378                 pthread_mutex_unlock(&thread_pool.queue_mutex);
379                 *request = NULL;
380                 *fun = NULL;
381                 return 0;
382         }
383
384         rad_assert(thread_pool.num_queued > 0);
385         thread_pool.num_queued--;
386         *request = entry->request;
387         *fun = entry->fun;
388         free(entry);
389
390         rad_assert(*request != NULL);
391         rad_assert((*request)->magic == REQUEST_MAGIC);
392         rad_assert(*fun != NULL);
393
394         /*
395          *      If the request has sat in the queue for too long,
396          *      kill it.
397          *
398          *      The main clean-up code won't delete the request from
399          *      the request list, until it's marked "finished"
400          */
401         if ((*request)->options & RAD_REQUEST_OPTION_STOP_NOW) {
402                 (*request)->finished = 1;
403                 goto retry;
404         }
405
406         /*
407          *      The thread is currently processing a request.
408          */
409         thread_pool.active_threads++;
410         thread_pool.fifo_state = fifo_state;
411
412         pthread_mutex_unlock(&thread_pool.queue_mutex);
413
414         rad_assert((*request)->child_pid == NO_SUCH_CHILD_PID);
415
416         return 1;
417 }
418
419
420 /*
421  *      The main thread handler for requests.
422  *
423  *      Wait on the semaphore until we have it, and process the request.
424  */
425 static void *request_handler_thread(void *arg)
426 {
427         RAD_REQUEST_FUNP  fun;
428         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
429 #ifdef HAVE_PTHREAD_SIGMASK
430         sigset_t set;
431
432         /*
433          *      Block SIGHUP handling for the child threads.
434          *
435          *      This ensures that only the main server thread will
436          *      process HUP signals.
437          *
438          *      If we don't have sigprocmask, then it shouldn't be
439          *      a problem, either, as the sig_hup handler should check
440          *      for this condition.
441          */
442         sigemptyset(&set);
443         sigaddset(&set, SIGHUP);
444         sigaddset(&set, SIGINT);
445         sigaddset(&set, SIGQUIT);
446         sigaddset(&set, SIGTERM);
447         pthread_sigmask(SIG_BLOCK, &set, NULL);
448 #endif
449
450         /*
451          *      Loop forever, until told to exit.
452          */
453         do {
454                 int finished;
455
456                 /*
457                  *      Wait to be signalled.
458                  */
459                 DEBUG2("Thread %d waiting to be assigned a request",
460                        self->thread_num);
461         re_wait:
462                 if (sem_wait(&thread_pool.semaphore) != 0) {
463                         /*
464                          *      Interrupted system call.  Go back to
465                          *      waiting, but DON'T print out any more
466                          *      text.
467                          */
468                         if (errno == EINTR) {
469                                 DEBUG2("Re-wait %d", self->thread_num);
470                                 goto re_wait;
471                         }
472                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
473                                self->thread_num, strerror(errno));
474                         break;
475                 }
476
477                 DEBUG2("Thread %d got semaphore", self->thread_num);
478
479                 /*
480                  *      Try to grab a request from the queue.
481                  *
482                  *      It may be empty, in which case we fail
483                  *      gracefully.
484                  */
485                 if (!request_dequeue(&self->request, &fun)) continue;
486
487                 self->request->child_pid = self->pthread_id;
488                 self->request_count++;
489
490                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
491                        self->thread_num, self->request->number,
492                        self->request_count);
493
494                 /*
495                  *      Respond, and reset request->child_pid
496                  */
497                 finished = rad_respond(self->request, fun);
498
499                 /*
500                  *      Update the active threads.
501                  */
502                 pthread_mutex_lock(&thread_pool.queue_mutex);
503
504                 /*
505                  *      We haven't replied to the client, but we HAVE
506                  *      sent a proxied packet, and we have NOT
507                  *      received a proxy response.  In that case, send
508                  *      the proxied packet now.  Doing this in the mutex
509                  *      avoids race conditions.
510                  *
511                  *      FIXME: this work should really depend on a
512                  *      "state", and "next handler", rather than
513                  *      horrid hacks like thise.
514                  */
515                 if (!self->request->reply->data &&
516                     self->request->proxy && self->request->proxy->data
517                     && !self->request->proxy_reply)
518                         self->request->proxy_listener->send(self->request->proxy_listener,
519                                                             self->request);
520
521                 self->request->child_pid = NO_SUCH_CHILD_PID;
522                 self->request->finished = finished;
523                 self->request = NULL;
524                 
525                 rad_assert(thread_pool.active_threads > 0);
526                 thread_pool.active_threads--;
527                 pthread_mutex_unlock(&thread_pool.queue_mutex);
528         } while (self->status != THREAD_CANCELLED);
529
530         DEBUG2("Thread %d exiting...", self->thread_num);
531
532 #ifdef HAVE_OPENSSL_ERR_H
533         /*
534          *      If we linked with OpenSSL, the application
535          *      must remove the thread's error queue before
536          *      exiting to prevent memory leaks.
537          */
538         ERR_remove_state(0);
539 #endif
540
541         /*
542          *  Do this as the LAST thing before exiting.
543          */
544         self->status = THREAD_EXITED;
545
546         return NULL;
547 }
548
549 /*
550  *      Take a THREAD_HANDLE, delete it from the thread pool and
551  *      free its resources.
552  *
553  *      This function is called ONLY from the main server thread,
554  *      ONLY after the thread has exited.
555  */
556 static void delete_thread(THREAD_HANDLE *handle)
557 {
558         THREAD_HANDLE *prev;
559         THREAD_HANDLE *next;
560
561         rad_assert(handle->request == NULL);
562
563         DEBUG2("Deleting thread %d", handle->thread_num);
564
565         prev = handle->prev;
566         next = handle->next;
567         rad_assert(thread_pool.total_threads > 0);
568         thread_pool.total_threads--;
569
570         /*
571          *      Remove the handle from the list.
572          */
573         if (prev == NULL) {
574                 rad_assert(thread_pool.head == handle);
575                 thread_pool.head = next;
576         } else {
577                 prev->next = next;
578         }
579
580         if (next == NULL) {
581                 rad_assert(thread_pool.tail == handle);
582                 thread_pool.tail = prev;
583         } else {
584                 next->prev = prev;
585         }
586
587         /*
588          *      Free the handle, now that it's no longer referencable.
589          */
590         free(handle);
591 }
592
593
594 /*
595  *      Spawn a new thread, and place it in the thread pool.
596  *
597  *      The thread is started initially in the blocked state, waiting
598  *      for the semaphore.
599  */
600 static THREAD_HANDLE *spawn_thread(time_t now)
601 {
602         int rcode;
603         THREAD_HANDLE *handle;
604         pthread_attr_t attr;
605
606         /*
607          *      Ensure that we don't spawn too many threads.
608          */
609         if (thread_pool.total_threads >= thread_pool.max_threads) {
610                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
611                 return NULL;
612         }
613
614         /*
615          *      Allocate a new thread handle.
616          */
617         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
618         memset(handle, 0, sizeof(THREAD_HANDLE));
619         handle->prev = NULL;
620         handle->next = NULL;
621         handle->pthread_id = NO_SUCH_CHILD_PID;
622         handle->thread_num = thread_pool.max_thread_num++;
623         handle->request_count = 0;
624         handle->status = THREAD_RUNNING;
625         handle->timestamp = time(NULL);
626
627         /*
628          *      Initialize the thread's attributes to detached.
629          *
630          *      We could call pthread_detach() later, but if the thread
631          *      exits between the create & detach calls, it will need to
632          *      be joined, which will never happen.
633          */
634         pthread_attr_init(&attr);
635         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
636
637         /*
638          *      Create the thread detached, so that it cleans up it's
639          *      own memory when it exits.
640          *
641          *      Note that the function returns non-zero on error, NOT
642          *      -1.  The return code is the error, and errno isn't set.
643          */
644         rcode = pthread_create(&handle->pthread_id, &attr,
645                         request_handler_thread, handle);
646         if (rcode != 0) {
647                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
648                        strerror(rcode));
649                 exit(1);
650         }
651         pthread_attr_destroy(&attr);
652
653         /*
654          *      One more thread to go into the list.
655          */
656         thread_pool.total_threads++;
657         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
658                         handle->thread_num, thread_pool.total_threads);
659
660         /*
661          *      Add the thread handle to the tail of the thread pool list.
662          */
663         if (thread_pool.tail) {
664                 thread_pool.tail->next = handle;
665                 handle->prev = thread_pool.tail;
666                 thread_pool.tail = handle;
667         } else {
668                 rad_assert(thread_pool.head == NULL);
669                 thread_pool.head = thread_pool.tail = handle;
670         }
671
672         /*
673          *      Update the time we last spawned a thread.
674          */
675         thread_pool.time_last_spawned = now;
676
677         /*
678          *      And return the new handle to the caller.
679          */
680         return handle;
681 }
682
683 /*
684  *      Temporary function to prevent server from executing a SIGHUP
685  *      until all threads are finished handling requests.  This returns
686  *      the number of active threads to 'radiusd.c'.
687  */
688 int total_active_threads(void)
689 {
690         /*
691          *      We don't acquire the mutex, so this is just an estimate.
692          *      We can't return with the lock held, so there's no point
693          *      in getting the guaranteed correct value; by the time
694          *      the caller sees it, it can be wrong again.
695          */
696         return thread_pool.active_threads;
697 }
698
699
700 static uint32_t pid_hash(const void *data)
701 {
702         const thread_fork_t *tf = data;
703
704         return lrad_hash(&tf->pid, sizeof(tf->pid));
705 }
706
707 static int pid_cmp(const void *one, const void *two)
708 {
709         const thread_fork_t *a = one;
710         const thread_fork_t *b = two;
711
712         return (a->pid - b->pid);
713 }
714
715 /*
716  *      Allocate the thread pool, and seed it with an initial number
717  *      of threads.
718  *
719  *      FIXME: What to do on a SIGHUP???
720  */
721 int thread_pool_init(int spawn_flag)
722 {
723         int             i, rcode;
724         CONF_SECTION    *pool_cf;
725         time_t          now;
726
727         DEBUG("Initializing the thread pool...");
728         now = time(NULL);
729
730         /*
731          *      We're not spawning new threads, don't do
732          *      anything.
733          */
734         if (!spawn_flag) return 0;
735
736         /*
737          *      After a SIGHUP, we don't over-write the previous values.
738          */
739         if (!pool_initialized) {
740                 /*
741                  *      Initialize the thread pool to some reasonable values.
742                  */
743                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
744                 thread_pool.head = NULL;
745                 thread_pool.tail = NULL;
746                 thread_pool.total_threads = 0;
747                 thread_pool.max_thread_num = 1;
748                 thread_pool.cleanup_delay = 5;
749                 thread_pool.spawn_flag = spawn_flag;
750
751                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
752                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
753                                strerror(errno));
754                         exit(1);
755                 }               
756                 
757                 /*
758                  *      Create the hash table of child PID's
759                  */
760                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
761                                                              pid_cmp,
762                                                              free);
763                 if (!thread_pool.waiters) {
764                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
765                         exit(1);
766                 }
767         }
768
769         pool_cf = cf_section_find("thread");
770         if (pool_cf != NULL) {
771                 /*
772                  *      FIXME: Check for errors?
773                  */
774                 cf_section_parse(pool_cf, NULL, thread_config);
775         }
776
777         /*
778          *      The pool has already been initialized.  Don't spawn
779          *      new threads, and don't forget about forked children,
780          */
781         if (pool_initialized) {
782                 return 0;
783         }
784
785         /*
786          *      Initialize the queue of requests.
787          */
788         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
789         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
790         if (rcode != 0) {
791                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
792                        strerror(errno));
793                 exit(1);
794         }
795
796         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
797         if (rcode != 0) {
798                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
799                        strerror(errno));
800                 exit(1);
801         }
802
803         /*
804          *      Allocate multiple fifos.
805          */
806         for (i = 0; i < NUM_FIFOS; i++) {
807                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
808                 if (!thread_pool.fifo[i]) {
809                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
810                         exit(1);
811                 }
812         }
813
814 #ifdef HAVE_OPENSSL_CRYPTO_H
815         /*
816          *      If we're linking with OpenSSL too, then we need
817          *      to set up the mutexes and enable the thread callbacks.
818          */
819         if (!setup_ssl_mutexes()) {
820                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
821                 exit(1);
822         }
823 #endif
824
825
826         /*
827          *      Create a number of waiting threads.
828          *
829          *      If we fail while creating them, do something intelligent.
830          */
831         for (i = 0; i < thread_pool.start_threads; i++) {
832                 if (spawn_thread(now) == NULL) {
833                         return -1;
834                 }
835         }
836
837         DEBUG2("Thread pool initialized");
838         pool_initialized = TRUE;
839         return 0;
840 }
841
842
843 /*
844  *      Assign a new request to a free thread.
845  *
846  *      If there isn't a free thread, then try to create a new one,
847  *      up to the configured limits.
848  */
849 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
850 {
851         /*
852          *      We've been told not to spawn threads, so don't.
853          */
854         if (!thread_pool.spawn_flag) {
855                 request->finished = rad_respond(request, fun);
856                 
857                 /*
858                  *      Requests that care about child process exit
859                  *      codes have already either called
860                  *      rad_waitpid(), or they've given up.
861                  */
862                 wait(NULL);
863                 return 1;
864         }
865
866         /*
867          *      Add the new request to the queue.
868          */
869         if (!request_enqueue(request, fun)) return 0;
870
871         /*
872          *      If the thread pool is busy handling requests, then
873          *      try to spawn another one.  We don't acquire the mutex
874          *      before reading active_threads, so our thread count is
875          *      just an estimate.  It's fine to go ahead and spawn an
876          *      extra thread in that case.
877          *      NOTE: the log message may be in error since active_threads
878          *      is an estimate, but it's only in error about the thread
879          *      count, not about the fact that we can't create a new one.
880          */
881         if (thread_pool.active_threads == thread_pool.total_threads) {
882                 if (spawn_thread(request->timestamp) == NULL) {
883                         radlog(L_INFO,
884                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
885                                thread_pool.max_threads);
886                         return 1;
887                 }
888         }
889
890         return 1;
891 }
892
893 /*
894  *      Check the min_spare_threads and max_spare_threads.
895  *
896  *      If there are too many or too few threads waiting, then we
897  *      either create some more, or delete some.
898  */
899 int thread_pool_clean(time_t now)
900 {
901         int spare;
902         int i, total;
903         THREAD_HANDLE *handle, *next;
904         int active_threads;
905         static time_t last_cleaned = 0;
906
907         /*
908          *      Loop over the thread pool deleting exited threads.
909          */
910         for (handle = thread_pool.head; handle; handle = next) {
911                 next = handle->next;
912
913                 /*
914                  *      Maybe we've asked the thread to exit, and it
915                  *      has agreed.
916                  */
917                 if (handle->status == THREAD_EXITED) {
918                         delete_thread(handle);
919                 }
920         }
921
922         /*
923          *      We don't need a mutex lock here, as we're reading
924          *      active_threads, and not modifying it.  We want a close
925          *      approximation of the number of active threads, and this
926          *      is good enough.
927          */
928         active_threads = thread_pool.active_threads;
929         spare = thread_pool.total_threads - active_threads;
930         if (debug_flag) {
931                 static int old_total = -1;
932                 static int old_active = -1;
933
934                 if ((old_total != thread_pool.total_threads) ||
935                                 (old_active != active_threads)) {
936                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
937                                         thread_pool.total_threads, active_threads, spare);
938                         old_total = thread_pool.total_threads;
939                         old_active = active_threads;
940                 }
941         }
942
943         /*
944          *      If there are too few spare threads, create some more.
945          */
946         if (spare < thread_pool.min_spare_threads) {
947                 total = thread_pool.min_spare_threads - spare;
948
949                 DEBUG2("Threads: Spawning %d spares", total);
950                 /*
951                  *      Create a number of spare threads.
952                  */
953                 for (i = 0; i < total; i++) {
954                         handle = spawn_thread(now);
955                         if (handle == NULL) {
956                                 return -1;
957                         }
958                 }
959
960                 /*
961                  *      And exit, as there can't be too many spare threads.
962                  */
963                 return 0;
964         }
965
966         /*
967          *      Only delete spare threads if we haven't already done
968          *      so this second.
969          */
970         if (now == last_cleaned) {
971                 return 0;
972         }
973         last_cleaned = now;
974
975         /*
976          *      Only delete the spare threads if sufficient time has
977          *      passed since we last created one.  This helps to minimize
978          *      the amount of create/delete cycles.
979          */
980         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
981                 return 0;
982         }
983
984         /*
985          *      If there are too many spare threads, delete one.
986          *
987          *      Note that we only delete ONE at a time, instead of
988          *      wiping out many.  This allows the excess servers to
989          *      be slowly reaped, just in case the load spike comes again.
990          */
991         if (spare > thread_pool.max_spare_threads) {
992
993                 spare -= thread_pool.max_spare_threads;
994
995                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
996
997                 /*
998                  *      Walk through the thread pool, deleting the
999                  *      first idle thread we come across.
1000                  */
1001                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1002                         next = handle->next;
1003
1004                         /*
1005                          *      If the thread is not handling a
1006                          *      request, but still live, then tell it
1007                          *      to exit.
1008                          *
1009                          *      It will eventually wake up, and realize
1010                          *      it's been told to commit suicide.
1011                          */
1012                         if ((handle->request == NULL) &&
1013                             (handle->status == THREAD_RUNNING)) {
1014                                 handle->status = THREAD_CANCELLED;
1015                                 /*
1016                                  *      Post an extra semaphore, as a
1017                                  *      signal to wake up, and exit.
1018                                  */
1019                                 sem_post(&thread_pool.semaphore);
1020                                 spare--;
1021                                 break;
1022                         }
1023                 }
1024         }
1025
1026         /*
1027          *      If the thread has handled too many requests, then make it
1028          *      exit.
1029          */
1030         if (thread_pool.max_requests_per_thread > 0) {
1031                 for (handle = thread_pool.head; handle; handle = next) {
1032                         next = handle->next;
1033
1034                         /*
1035                          *      Not handling a request, but otherwise
1036                          *      live, we can kill it.
1037                          */
1038                         if ((handle->request == NULL) &&
1039                             (handle->status == THREAD_RUNNING) &&
1040                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1041                                 handle->status = THREAD_CANCELLED;
1042                                 sem_post(&thread_pool.semaphore);
1043                         }
1044                 }
1045         }
1046
1047         /*
1048          *      Otherwise everything's kosher.  There are not too few,
1049          *      or too many spare threads.  Exit happily.
1050          */
1051         return 0;
1052 }
1053
1054
1055 /*
1056  *      Thread wrapper for fork().
1057  */
1058 pid_t rad_fork(void)
1059 {
1060         pid_t child_pid;
1061
1062         if (!pool_initialized) return fork();
1063
1064         reap_children();        /* be nice to non-wait thingies */
1065
1066         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1067                 return -1;
1068         }
1069
1070         /*
1071          *      Fork & save the PID for later reaping.
1072          */
1073         child_pid = fork();
1074         if (child_pid > 0) {
1075                 int rcode;
1076                 thread_fork_t *tf;
1077
1078                 tf = rad_malloc(sizeof(*tf));
1079                 memset(tf, 0, sizeof(*tf));
1080                 
1081                 tf->pid = child_pid;
1082
1083                 pthread_mutex_lock(&thread_pool.wait_mutex);
1084                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1085                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1086
1087                 if (!rcode) {
1088                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1089                                (int) child_pid);
1090                 }
1091         }
1092
1093         /*
1094          *      Return whatever we were told.
1095          */
1096         return child_pid;
1097 }
1098
1099
1100 /*
1101  *      Wait 10 seconds at most for a child to exit, then give up.
1102  */
1103 pid_t rad_waitpid(pid_t pid, int *status)
1104 {
1105         int i;
1106         thread_fork_t mytf, *tf;
1107
1108         if (!pool_initialized) return waitpid(pid, status, 0);
1109
1110         if (pid <= 0) return -1;
1111
1112         mytf.pid = pid;
1113
1114         pthread_mutex_lock(&thread_pool.wait_mutex);
1115         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1116         pthread_mutex_unlock(&thread_pool.wait_mutex);
1117
1118         if (!tf) return -1;
1119         
1120         for (i = 0; i < 100; i++) {
1121                 reap_children();
1122                 
1123                 if (tf->exited) {
1124                         *status = tf->status;
1125
1126                         pthread_mutex_lock(&thread_pool.wait_mutex);
1127                         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1128                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1129                         return pid;
1130                 }
1131                 usleep(100000); /* sleep for 1/10 of a second */
1132         }
1133         
1134         /*
1135          *      10 seconds have passed, give up on the child.
1136          */
1137         pthread_mutex_lock(&thread_pool.wait_mutex);
1138         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1139         pthread_mutex_unlock(&thread_pool.wait_mutex);
1140
1141         return 0;
1142 }
1143
1144 #else /* HAVE_PTHREAD_H */
1145 /*
1146  *      "thread" code when we don't have threads.
1147  */
1148 int thread_pool_init(int spawn_flag)
1149 {
1150         return 0;
1151 }
1152
1153 /*
1154  *      call "radrespond".
1155  */
1156 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1157 {
1158         rad_respond(request, fun);
1159         return 1;
1160 }
1161
1162 #endif /* HAVE_PTHREAD_H */