update comments re: thread_pool.active_threads and locking
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/autoconf.h>
25
26 #include <stdlib.h>
27 #include <string.h>
28
29 /*
30  *      Other OS's have sem_init, OS X doesn't.
31  */
32 #ifndef DARWIN
33 #include <semaphore.h>
34 #else
35 #include <mach/task.h>
36 #include <mach/semaphore.h>
37
38 #undef sem_t
39 #define sem_t semaphore_t
40 #undef sem_init
41 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
42 #undef sem_wait
43 #define sem_wait(s) semaphore_wait(*s)
44 #undef sem_post
45 #define sem_post(s) semaphore_signal(*s)
46 #endif
47
48 #include <signal.h>
49
50 #ifdef HAVE_SYS_WAIT_H
51 #include <sys/wait.h>
52 #endif
53
54 #include <freeradius-devel/radiusd.h>
55 #include <freeradius-devel/rad_assert.h>
56 #include <freeradius-devel/conffile.h>
57 #include <freeradius-devel/modules.h>
58
59 static const char rcsid[] =
60 "$Id$";
61
62 #ifdef HAVE_PTHREAD_H
63
64 #ifdef HAVE_OPENSSL_CRYPTO_H
65 #include <openssl/crypto.h>
66 #endif
67 #ifdef HAVE_OPENSSL_ERR_H
68 #include <openssl/err.h>
69 #endif
70
71 #define SEMAPHORE_LOCKED        (0)
72 #define SEMAPHORE_UNLOCKED      (1)
73
74 #define THREAD_RUNNING          (1)
75 #define THREAD_CANCELLED        (2)
76 #define THREAD_EXITED           (3)
77
78 #define NUM_FIFOS               (2)
79
80 /*
81  *     Ordered this way because we prefer proxy replies over newly
82  *     started requests (only these two FIFOs are defined here).
83  */
84 #define FIFO_START   (1)
85 #define FIFO_PROXY   (0)
86
87 /*
88  *  A data structure which contains the information about
89  *  the current thread.
90  *
91  *  pthread_id     pthread id
92  *  thread_num     server thread number, 1...number of threads
93  *  semaphore     used to block the thread until a request comes in
94  *  status        is the thread running or exited?
95  *  request_count the number of requests that this thread has handled
96  *  timestamp     when the thread started executing.
97  */
98 typedef struct THREAD_HANDLE {
99         struct THREAD_HANDLE *prev;
100         struct THREAD_HANDLE *next;
101         pthread_t            pthread_id;
102         int                  thread_num;
103         int                  status;
104         unsigned int         request_count;
105         time_t               timestamp;
106         REQUEST              *request;
107 } THREAD_HANDLE;
108
109 /*
110  *      For the request queue.
111  */
112 typedef struct request_queue_t {
113         REQUEST           *request;
114         RAD_REQUEST_FUNP  fun;
115 } request_queue_t;
116
117
118 /*
119  *      A data structure to manage the thread pool.  There's no real
120  *      need for a data structure, but it makes things conceptually
121  *      easier.
122  */
123 typedef struct THREAD_POOL {
124         THREAD_HANDLE *head;
125         THREAD_HANDLE *tail;
126
127         int total_threads;
128         int active_threads;     /* protected by queue_mutex */
129         int max_thread_num;
130         int start_threads;
131         int max_threads;
132         int min_spare_threads;
133         int max_spare_threads;
134         unsigned int max_requests_per_thread;
135         unsigned long request_count;
136         time_t time_last_spawned;
137         int cleanup_delay;
138         int spawn_flag;
139
140         pthread_mutex_t wait_mutex;
141         lrad_hash_table_t *waiters;
142
143         /*
144          *      All threads wait on this semaphore, for requests
145          *      to enter the queue.
146          */
147         sem_t           semaphore;
148
149         /*
150          *      To ensure only one thread at a time touches the queue.
151          */
152         pthread_mutex_t queue_mutex;
153
154         int             max_queue_size;
155         int             num_queued;
156         int             fifo_state;
157         lrad_fifo_t     *fifo[NUM_FIFOS];
158 } THREAD_POOL;
159
160 static THREAD_POOL thread_pool;
161 static int pool_initialized = FALSE;
162
163
164 /*
165  *      A mapping of configuration file names to internal integers
166  */
167 static const CONF_PARSER thread_config[] = {
168         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
169         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
170         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
171         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
172         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
173         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
174         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
175         { NULL, -1, 0, NULL, NULL }
176 };
177
178
179 #ifdef HAVE_OPENSSL_CRYPTO_H
180
181 /*
182  *      If we're linking against OpenSSL, then it is the
183  *      duty of the application, if it is multithreaded,
184  *      to provide OpenSSL with appropriate thread id
185  *      and mutex locking functions
186  *
187  *      Note: this only implements static callbacks.
188  *      OpenSSL does not use dynamic locking callbacks
189  *      right now, but may in the future, so we will have
190  *      to add them at some point.
191  */
192
193 static pthread_mutex_t *ssl_mutexes = NULL;
194
/*
 *	Thread id callback handed to OpenSSL: the calling thread's
 *	pthread id, converted to unsigned long.
 */
static unsigned long ssl_id_function(void)
{
	pthread_t self = pthread_self();

	return (unsigned long) self;
}
199
200 static void ssl_locking_function(int mode, int n, const char *file, int line)
201 {
202         file = file;            /* -Wunused */
203         line = line;            /* -Wunused */
204
205         if (mode & CRYPTO_LOCK) {
206                 pthread_mutex_lock(&(ssl_mutexes[n]));
207         } else {
208                 pthread_mutex_unlock(&(ssl_mutexes[n]));
209         }
210 }
211
212 static int setup_ssl_mutexes(void)
213 {
214         int i;
215
216         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
217         if (!ssl_mutexes) {
218                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
219                 return 0;
220         }
221
222         for (i = 0; i < CRYPTO_num_locks(); i++) {
223                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
224         }
225
226         CRYPTO_set_id_callback(ssl_id_function);
227         CRYPTO_set_locking_callback(ssl_locking_function);
228
229         return 1;
230 }
231 #endif
232
233
234 /*
235  *      Callback for reaping
236  */
237 static int reap_callback(void *ctx, void *data)
238 {
239         pid_t pid = *(pid_t *) data;
240         lrad_hash_table_t *ht = ctx;
241
242         /*
243          *      Child is still alive, do nothing.
244          */
245         if (waitpid(pid, NULL, WNOHANG) == 0) return 0;
246
247         /*
248          *      Else no child, or was already reaped
249          */
250
251         lrad_hash_table_delete(ht, lrad_hash(&pid, sizeof(pid)));
252
253         return 0;
254 }
255
256
257 /*
258  *      We don't want to catch SIGCHLD for a host of reasons.
259  *
260  *      - exec_wait means that someone, somewhere, somewhen, will
261  *      call waitpid(), and catch the child.
262  *
263  *      - SIGCHLD is delivered to a random thread, not the one that
264  *      forked.
265  *
266  *      - if another thread catches the child, we have to coordinate
267  *      with the thread doing the waiting.
268  *
269  *      - if we don't waitpid() for non-wait children, they'll be zombies,
270  *      and will hang around forever.
271  *
272  */
273 static void reap_children(void)
274 {
275         if (lrad_hash_table_num_elements(thread_pool.waiters) == 0) return;
276
277         pthread_mutex_lock(&thread_pool.wait_mutex);
278
279         lrad_hash_table_walk(thread_pool.waiters,
280                              reap_callback, thread_pool.waiters);
281
282         pthread_mutex_unlock(&thread_pool.wait_mutex);
283 }
284
285 /*
286  *      Add a request to the list of waiting requests.
287  *      This function gets called ONLY from the main handler thread...
288  *
289  *      This function should never fail.
290  */
291 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
292 {
293         int fifo = FIFO_START;
294         request_queue_t *entry;
295
296         pthread_mutex_lock(&thread_pool.queue_mutex);
297
298         thread_pool.request_count++;
299
300         /*
301          *      FIXME: Handle proxy replies separately?
302          */
303         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
304                 pthread_mutex_unlock(&thread_pool.queue_mutex);
305                 
306                 /*
307                  *      Mark the request as done.
308                  */
309                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
310                 request->finished = TRUE;
311                 return 0;
312         }
313
314         /*
315          *      Requests get handled in priority.  First, we handle
316          *      replies from a home server, to finish ongoing requests.
317          *
318          *      Then, we handle requests with State, to finish
319          *      multi-packet transactions.
320          *
321          *      Finally, we handle new requests.
322          */
323         if (request->proxy_reply) {
324                 fifo = FIFO_PROXY;
325         } else {
326                 fifo = FIFO_START;
327         }
328
329         entry = rad_malloc(sizeof(*entry));
330         entry->request = request;
331         entry->fun = fun;
332
333         if (!lrad_fifo_push(thread_pool.fifo[fifo], entry)) {
334                 pthread_mutex_unlock(&thread_pool.queue_mutex);
335                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
336                 request->finished = TRUE;
337                 return 0;
338         }
339
340         thread_pool.num_queued++;
341
342         pthread_mutex_unlock(&thread_pool.queue_mutex);
343
344         /*
345          *      There's one more request in the queue.
346          *
347          *      Note that we're not touching the queue any more, so
348          *      the semaphore post is outside of the mutex.  This also
349          *      means that when the thread wakes up and tries to lock
350          *      the mutex, it will be unlocked, and there won't be
351          *      contention.
352          */
353         sem_post(&thread_pool.semaphore);
354
355         return 1;
356 }
357
358 /*
359  *      Remove a request from the queue.
360  */
361 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
362 {
363         int fifo_state;
364         request_queue_t *entry;
365
366         reap_children();
367
368         pthread_mutex_lock(&thread_pool.queue_mutex);
369
370         fifo_state = thread_pool.fifo_state;
371
372  retry:
373         do {
374                 /*
375                  *      Pop an entry from the current queue, and go to
376                  *      the next queue.
377                  */
378                 entry = lrad_fifo_pop(thread_pool.fifo[fifo_state]);
379                 fifo_state++;
380                 if (fifo_state >= NUM_FIFOS) fifo_state = 0;
381         } while ((fifo_state != thread_pool.fifo_state) && !entry);
382
383         if (!entry) {
384                 pthread_mutex_unlock(&thread_pool.queue_mutex);
385                 *request = NULL;
386                 *fun = NULL;
387                 return 0;
388         }
389
390         rad_assert(thread_pool.num_queued > 0);
391         thread_pool.num_queued--;
392         *request = entry->request;
393         *fun = entry->fun;
394         free(entry);
395
396         rad_assert(*request != NULL);
397         rad_assert((*request)->magic == REQUEST_MAGIC);
398         rad_assert(*fun != NULL);
399
400         /*
401          *      If the request has sat in the queue for too long,
402          *      kill it.
403          *
404          *      The main clean-up code won't delete the request from
405          *      the request list, until it's marked "finished"
406          */
407         if ((*request)->options & RAD_REQUEST_OPTION_STOP_NOW) {
408                 (*request)->finished = 1;
409                 goto retry;
410         }
411
412         /*
413          *      The thread is currently processing a request.
414          */
415         thread_pool.active_threads++;
416         thread_pool.fifo_state = fifo_state;
417
418         pthread_mutex_unlock(&thread_pool.queue_mutex);
419
420         /*
421          *      If the request is currently being processed, then that
422          *      MAY be OK, if it's a proxy reply.  In that case,
423          *      sending the packet may result in a reply being
424          *      received before that thread clears the child_pid.
425          *
426          *      In that case, we busy-wait for the request to be free.
427          *
428          *      We COULD push it onto the queue and try to grab
429          *      another request, but what if this is the only request?
430          *      What if there are multiple such packets with race
431          *      conditions?  We don't want to thrash the queue...
432          *
433          *      This busy-wait is less than optimal, but it's simple,
434          *      fail-safe, and it works.
435          */
436         if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
437                 int count, ok;
438                 struct timeval tv;
439 #ifdef HAVE_PTHREAD_SIGMASK
440                 sigset_t set, old_set;
441
442                 /*
443                  *      Block a large number of signals which could
444                  *      cause the select to return EINTR
445                  */
446                 sigemptyset(&set);
447                 sigaddset(&set, SIGPIPE);
448                 sigaddset(&set, SIGCONT);
449                 sigaddset(&set, SIGSTOP);
450                 sigaddset(&set, SIGCHLD);
451                 pthread_sigmask(SIG_BLOCK, &set, &old_set);
452 #endif
453
454                 rad_assert((*request)->proxy_reply != NULL);
455
456                 ok = FALSE;
457
458                 /*
459                  *      Sleep for 100 milliseconds.  If the other thread
460                  *      doesn't get serviced in this time, to clear
461                  *      the "child_pid" entry, then the server is too
462                  *      busy, so we die.
463                  */
464                 for (count = 0; count < 10; count++) {
465                         tv.tv_sec = 0;
466                         tv.tv_usec = 10000; /* sleep for 10 milliseconds */
467
468                         /*
469                          *      Portable sleep that's thread-safe.
470                          *
471                          *      Don't worry about interrupts, as they're
472                          *      blocked above.
473                          */
474                         select(0, NULL, NULL, NULL, &tv);
475                         if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
476                                 ok = TRUE;
477                                 break;
478                         }
479                 }
480
481 #ifdef HAVE_PTHREAD_SIGMASK
482                 /*
483                  *      Restore the original thread signal mask.
484                  */
485                 pthread_sigmask(SIG_SETMASK, &old_set, NULL);
486 #endif
487
488                 if (!ok) {
489                         radlog(L_ERR, "FATAL!  Server is too busy to process requests");
490                         exit(1);
491                 }
492         }
493
494         return 1;
495 }
496
497
498 /*
499  *      The main thread handler for requests.
500  *
501  *      Wait on the semaphore until we have it, and process the request.
502  */
503 static void *request_handler_thread(void *arg)
504 {
505         RAD_REQUEST_FUNP  fun;
506         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
507 #ifdef HAVE_PTHREAD_SIGMASK
508         sigset_t set;
509
510         /*
511          *      Block SIGHUP handling for the child threads.
512          *
513          *      This ensures that only the main server thread will
514          *      process HUP signals.
515          *
516          *      If we don't have sigprocmask, then it shouldn't be
517          *      a problem, either, as the sig_hup handler should check
518          *      for this condition.
519          */
520         sigemptyset(&set);
521         sigaddset(&set, SIGHUP);
522         sigaddset(&set, SIGINT);
523         sigaddset(&set, SIGQUIT);
524         sigaddset(&set, SIGTERM);
525         pthread_sigmask(SIG_BLOCK, &set, NULL);
526 #endif
527
528         /*
529          *      Loop forever, until told to exit.
530          */
531         do {
532                 /*
533                  *      Wait to be signalled.
534                  */
535                 DEBUG2("Thread %d waiting to be assigned a request",
536                        self->thread_num);
537         re_wait:
538                 if (sem_wait(&thread_pool.semaphore) != 0) {
539                         /*
540                          *      Interrupted system call.  Go back to
541                          *      waiting, but DON'T print out any more
542                          *      text.
543                          */
544                         if (errno == EINTR) {
545                                 DEBUG2("Re-wait %d", self->thread_num);
546                                 goto re_wait;
547                         }
548                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
549                                self->thread_num, strerror(errno));
550                         break;
551                 }
552
553                 DEBUG2("Thread %d got semaphore", self->thread_num);
554
555                 /*
556                  *      Try to grab a request from the queue.
557                  *
558                  *      It may be empty, in which case we fail
559                  *      gracefully.
560                  */
561                 if (!request_dequeue(&self->request, &fun)) continue;
562
563                 self->request->child_pid = self->pthread_id;
564                 self->request_count++;
565
566                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
567                        self->thread_num, self->request->number,
568                        self->request_count);
569
570                 /*
571                  *      Respond, and reset request->child_pid
572                  */
573                 rad_respond(self->request, fun);
574                 self->request = NULL;
575
576                 /*
577                  *      Update the active threads.
578                  */
579                 pthread_mutex_lock(&thread_pool.queue_mutex);
580                 rad_assert(thread_pool.active_threads > 0);
581                 thread_pool.active_threads--;
582                 pthread_mutex_unlock(&thread_pool.queue_mutex);
583         } while (self->status != THREAD_CANCELLED);
584
585         DEBUG2("Thread %d exiting...", self->thread_num);
586
587 #ifdef HAVE_OPENSSL_ERR_H
588         /*
589          *      If we linked with OpenSSL, the application
590          *      must remove the thread's error queue before
591          *      exiting to prevent memory leaks.
592          */
593         ERR_remove_state(0);
594 #endif
595
596         /*
597          *  Do this as the LAST thing before exiting.
598          */
599         self->status = THREAD_EXITED;
600
601         return NULL;
602 }
603
604 /*
605  *      Take a THREAD_HANDLE, delete it from the thread pool and
606  *      free its resources.
607  *
608  *      This function is called ONLY from the main server thread,
609  *      ONLY after the thread has exited.
610  */
611 static void delete_thread(THREAD_HANDLE *handle)
612 {
613         THREAD_HANDLE *prev;
614         THREAD_HANDLE *next;
615
616         rad_assert(handle->request == NULL);
617
618         DEBUG2("Deleting thread %d", handle->thread_num);
619
620         prev = handle->prev;
621         next = handle->next;
622         rad_assert(thread_pool.total_threads > 0);
623         thread_pool.total_threads--;
624
625         /*
626          *      Remove the handle from the list.
627          */
628         if (prev == NULL) {
629                 rad_assert(thread_pool.head == handle);
630                 thread_pool.head = next;
631         } else {
632                 prev->next = next;
633         }
634
635         if (next == NULL) {
636                 rad_assert(thread_pool.tail == handle);
637                 thread_pool.tail = prev;
638         } else {
639                 next->prev = prev;
640         }
641
642         /*
643          *      Free the handle, now that it's no longer referencable.
644          */
645         free(handle);
646 }
647
648
649 /*
650  *      Spawn a new thread, and place it in the thread pool.
651  *
652  *      The thread is started initially in the blocked state, waiting
653  *      for the semaphore.
654  */
655 static THREAD_HANDLE *spawn_thread(time_t now)
656 {
657         int rcode;
658         THREAD_HANDLE *handle;
659         pthread_attr_t attr;
660
661         /*
662          *      Ensure that we don't spawn too many threads.
663          */
664         if (thread_pool.total_threads >= thread_pool.max_threads) {
665                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
666                 return NULL;
667         }
668
669         /*
670          *      Allocate a new thread handle.
671          */
672         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
673         memset(handle, 0, sizeof(THREAD_HANDLE));
674         handle->prev = NULL;
675         handle->next = NULL;
676         handle->pthread_id = NO_SUCH_CHILD_PID;
677         handle->thread_num = thread_pool.max_thread_num++;
678         handle->request_count = 0;
679         handle->status = THREAD_RUNNING;
680         handle->timestamp = time(NULL);
681
682         /*
683          *      Initialize the thread's attributes to detached.
684          *
685          *      We could call pthread_detach() later, but if the thread
686          *      exits between the create & detach calls, it will need to
687          *      be joined, which will never happen.
688          */
689         pthread_attr_init(&attr);
690         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
691
692         /*
693          *      Create the thread detached, so that it cleans up it's
694          *      own memory when it exits.
695          *
696          *      Note that the function returns non-zero on error, NOT
697          *      -1.  The return code is the error, and errno isn't set.
698          */
699         rcode = pthread_create(&handle->pthread_id, &attr,
700                         request_handler_thread, handle);
701         if (rcode != 0) {
702                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
703                        strerror(rcode));
704                 exit(1);
705         }
706         pthread_attr_destroy(&attr);
707
708         /*
709          *      One more thread to go into the list.
710          */
711         thread_pool.total_threads++;
712         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
713                         handle->thread_num, thread_pool.total_threads);
714
715         /*
716          *      Add the thread handle to the tail of the thread pool list.
717          */
718         if (thread_pool.tail) {
719                 thread_pool.tail->next = handle;
720                 handle->prev = thread_pool.tail;
721                 thread_pool.tail = handle;
722         } else {
723                 rad_assert(thread_pool.head == NULL);
724                 thread_pool.head = thread_pool.tail = handle;
725         }
726
727         /*
728          *      Update the time we last spawned a thread.
729          */
730         thread_pool.time_last_spawned = now;
731
732         /*
733          *      And return the new handle to the caller.
734          */
735         return handle;
736 }
737
738 /*
739  *      Temporary function to prevent server from executing a SIGHUP
740  *      until all threads are finished handling requests.  This returns
741  *      the number of active threads to 'radiusd.c'.
742  */
743 int total_active_threads(void)
744 {
745         /*
746          *      We don't acquire the mutex, so this is just an estimate.
747          *      We can't return with the lock held, so there's no point
748          *      in getting the guaranteed correct value; by the time
749          *      the caller sees it, it can be wrong again.
750          */
751         return thread_pool.active_threads;
752 }
753
754
755 /*
756  *      Allocate the thread pool, and seed it with an initial number
757  *      of threads.
758  *
759  *      FIXME: What to do on a SIGHUP???
760  */
761 int thread_pool_init(int spawn_flag)
762 {
763         int             i, rcode;
764         CONF_SECTION    *pool_cf;
765         time_t          now;
766
767         DEBUG("Initializing the thread pool...");
768         now = time(NULL);
769
770         /*
771          *      After a SIGHUP, we don't over-write the previous values.
772          */
773         if (!pool_initialized) {
774                 /*
775                  *      Initialize the thread pool to some reasonable values.
776                  */
777                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
778                 thread_pool.head = NULL;
779                 thread_pool.tail = NULL;
780                 thread_pool.total_threads = 0;
781                 thread_pool.max_thread_num = 1;
782                 thread_pool.cleanup_delay = 5;
783                 thread_pool.spawn_flag = spawn_flag;
784
785                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
786                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
787                                strerror(errno));
788                         exit(1);
789                 }               
790                 
791                 /*
792                  *      Create the hash table of child PID's
793                  */
794                 thread_pool.waiters = lrad_hash_table_create(free);
795                 if (!thread_pool.waiters) {
796                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
797                         exit(1);
798                 }
799         }
800
801         /*
802          *      We're not spawning new threads, don't do
803          *      anything.
804          */
805         if (!spawn_flag) return 0;
806
807         pool_cf = cf_section_find("thread");
808         if (pool_cf != NULL) {
809                 /*
810                  *      FIXME: Check for errors?
811                  */
812                 cf_section_parse(pool_cf, NULL, thread_config);
813         }
814
815         /*
816          *      The pool has already been initialized.  Don't spawn
817          *      new threads, and don't forget about forked children,
818          */
819         if (pool_initialized) {
820                 return 0;
821         }
822
823         /*
824          *      Initialize the queue of requests.
825          */
826         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
827         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
828         if (rcode != 0) {
829                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
830                        strerror(errno));
831                 exit(1);
832         }
833
834         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
835         if (rcode != 0) {
836                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
837                        strerror(errno));
838                 exit(1);
839         }
840
841         /*
842          *      Allocate multiple fifos.
843          */
844         for (i = 0; i < NUM_FIFOS; i++) {
845                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
846                 if (!thread_pool.fifo[i]) {
847                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
848                         exit(1);
849                 }
850         }
851
852 #ifdef HAVE_OPENSSL_CRYPTO_H
853         /*
854          *      If we're linking with OpenSSL too, then we need
855          *      to set up the mutexes and enable the thread callbacks.
856          */
857         if (!setup_ssl_mutexes()) {
858                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
859                 exit(1);
860         }
861 #endif
862
863
864         /*
865          *      Create a number of waiting threads.
866          *
867          *      If we fail while creating them, do something intelligent.
868          */
869         for (i = 0; i < thread_pool.start_threads; i++) {
870                 if (spawn_thread(now) == NULL) {
871                         return -1;
872                 }
873         }
874
875         DEBUG2("Thread pool initialized");
876         pool_initialized = TRUE;
877         return 0;
878 }
879
880
881 /*
882  *      Assign a new request to a free thread.
883  *
884  *      If there isn't a free thread, then try to create a new one,
885  *      up to the configured limits.
886  */
887 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
888 {
889         /*
890          *      We've been told not to spawn threads, so don't.
891          */
892         if (!thread_pool.spawn_flag) {
893                 rad_respond(request, fun);
894                 return 1;
895         }
896
897         /*
898          *      Add the new request to the queue.
899          */
900         if (!request_enqueue(request, fun)) return 0;
901
902         /*
903          *      If the thread pool is busy handling requests, then
904          *      try to spawn another one.  We don't acquire the mutex
905          *      before reading active_threads, so our thread count is
906          *      just an estimate.  It's fine to go ahead and spawn an
907          *      extra thread in that case.
908          *      NOTE: the log message may be in error since active_threads
909          *      is an estimate, but it's only in error about the thread
910          *      count, not about the fact that we can't create a new one.
911          */
912         if (thread_pool.active_threads == thread_pool.total_threads) {
913                 if (spawn_thread(request->timestamp) == NULL) {
914                         radlog(L_INFO,
915                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
916                                thread_pool.max_threads);
917                         return 1;
918                 }
919         }
920
921         return 1;
922 }
923
924 /*
925  *      Check the min_spare_threads and max_spare_threads.
926  *
927  *      If there are too many or too few threads waiting, then we
928  *      either create some more, or delete some.
929  */
930 int thread_pool_clean(time_t now)
931 {
932         int spare;
933         int i, total;
934         THREAD_HANDLE *handle, *next;
935         int active_threads;
936         static time_t last_cleaned = 0;
937
938         /*
939          *      Loop over the thread pool deleting exited threads.
940          */
941         for (handle = thread_pool.head; handle; handle = next) {
942                 next = handle->next;
943
944                 /*
945                  *      Maybe we've asked the thread to exit, and it
946                  *      has agreed.
947                  */
948                 if (handle->status == THREAD_EXITED) {
949                         delete_thread(handle);
950                 }
951         }
952
953         /*
954          *      We don't need a mutex lock here, as we're reading
955          *      active_threads, and not modifying it.  We want a close
956          *      approximation of the number of active threads, and this
957          *      is good enough.
958          */
959         active_threads = thread_pool.active_threads;
960         spare = thread_pool.total_threads - active_threads;
961         if (debug_flag) {
962                 static int old_total = -1;
963                 static int old_active = -1;
964
965                 if ((old_total != thread_pool.total_threads) ||
966                                 (old_active != active_threads)) {
967                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
968                                         thread_pool.total_threads, active_threads, spare);
969                         old_total = thread_pool.total_threads;
970                         old_active = active_threads;
971                 }
972         }
973
974         /*
975          *      If there are too few spare threads, create some more.
976          */
977         if (spare < thread_pool.min_spare_threads) {
978                 total = thread_pool.min_spare_threads - spare;
979
980                 DEBUG2("Threads: Spawning %d spares", total);
981                 /*
982                  *      Create a number of spare threads.
983                  */
984                 for (i = 0; i < total; i++) {
985                         handle = spawn_thread(now);
986                         if (handle == NULL) {
987                                 return -1;
988                         }
989                 }
990
991                 /*
992                  *      And exit, as there can't be too many spare threads.
993                  */
994                 return 0;
995         }
996
997         /*
998          *      Only delete spare threads if we haven't already done
999          *      so this second.
1000          */
1001         if (now == last_cleaned) {
1002                 return 0;
1003         }
1004         last_cleaned = now;
1005
1006         /*
1007          *      Only delete the spare threads if sufficient time has
1008          *      passed since we last created one.  This helps to minimize
1009          *      the amount of create/delete cycles.
1010          */
1011         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
1012                 return 0;
1013         }
1014
1015         /*
1016          *      If there are too many spare threads, delete one.
1017          *
1018          *      Note that we only delete ONE at a time, instead of
1019          *      wiping out many.  This allows the excess servers to
1020          *      be slowly reaped, just in case the load spike comes again.
1021          */
1022         if (spare > thread_pool.max_spare_threads) {
1023
1024                 spare -= thread_pool.max_spare_threads;
1025
1026                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1027
1028                 /*
1029                  *      Walk through the thread pool, deleting the
1030                  *      first idle thread we come across.
1031                  */
1032                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1033                         next = handle->next;
1034
1035                         /*
1036                          *      If the thread is not handling a
1037                          *      request, but still live, then tell it
1038                          *      to exit.
1039                          *
1040                          *      It will eventually wake up, and realize
1041                          *      it's been told to commit suicide.
1042                          */
1043                         if ((handle->request == NULL) &&
1044                             (handle->status == THREAD_RUNNING)) {
1045                                 handle->status = THREAD_CANCELLED;
1046                                 /*
1047                                  *      Post an extra semaphore, as a
1048                                  *      signal to wake up, and exit.
1049                                  */
1050                                 sem_post(&thread_pool.semaphore);
1051                                 spare--;
1052                                 break;
1053                         }
1054                 }
1055         }
1056
1057         /*
1058          *      If the thread has handled too many requests, then make it
1059          *      exit.
1060          */
1061         if (thread_pool.max_requests_per_thread > 0) {
1062                 for (handle = thread_pool.head; handle; handle = next) {
1063                         next = handle->next;
1064
1065                         /*
1066                          *      Not handling a request, but otherwise
1067                          *      live, we can kill it.
1068                          */
1069                         if ((handle->request == NULL) &&
1070                             (handle->status == THREAD_RUNNING) &&
1071                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1072                                 handle->status = THREAD_CANCELLED;
1073                                 sem_post(&thread_pool.semaphore);
1074                         }
1075                 }
1076         }
1077
1078         /*
1079          *      Otherwise everything's kosher.  There are not too few,
1080          *      or too many spare threads.  Exit happily.
1081          */
1082         return 0;
1083 }
1084
1085
1086 /*
1087  *      Thread wrapper for fork().
1088  */
1089 pid_t rad_fork(int exec_wait)
1090 {
1091         pid_t child_pid;
1092
1093         if (exec_wait) return fork();
1094
1095         reap_children();        /* be nice to non-wait thingies */
1096
1097         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1098                 return -1;
1099         }
1100
1101         /*
1102          *      Fork & save the PID for later reaping.
1103          */
1104         child_pid = fork();
1105         if (child_pid > 0) {
1106                 int rcode;
1107                 pid_t *ptr = rad_malloc(sizeof(*ptr));
1108
1109                 *ptr = child_pid;
1110
1111                 /*
1112                  *      Lock the mutex.
1113                  */
1114                 pthread_mutex_lock(&thread_pool.wait_mutex);
1115
1116                 rcode = lrad_hash_table_insert(thread_pool.waiters,
1117                                                lrad_hash(&child_pid,
1118                                                          sizeof(child_pid)),
1119                                                ptr);
1120                 
1121                 /*
1122                  *      Unlock the mutex.
1123                  */
1124                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1125
1126                 if (!rcode) {
1127                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1128                                (int) child_pid);
1129                 }
1130         }
1131
1132         /*
1133          *      Return whatever we were told.
1134          */
1135         return child_pid;
1136 }
1137
/*
 *	Wrapper for waitpid(): calls reap_children() first, then
 *	waits for the requested PID and returns whatever waitpid()
 *	returned.
 *
 *	We may not need this any more...
 */
pid_t rad_waitpid(pid_t pid, int *status, int options)
{
	pid_t result;

	reap_children();	/* be nice to non-wait thingies */

	result = waitpid(pid, status, options);

	return result;
}
1147
1148 #else /* HAVE_PTHREAD_H */
/*
 *	Thread pool "initialization" for builds without pthreads.
 *
 *	There is no pool to set up, so this always succeeds and
 *	returns 0.
 */
int thread_pool_init(int spawn_flag)
{
	(void) spawn_flag;	/* nothing to configure without threads */

	return 0;
}
1156
1157 /*
1158  *      call "radrespond".
1159  */
1160 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1161 {
1162         rad_respond(request, fun);
1163         return 1;
1164 }
1165
1166 #endif /* HAVE_PTHREAD_H */