Made it 2.0.0, and removed the changes that are in 1.1.x, as
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/autoconf.h>
25
26 #include <stdlib.h>
27 #include <string.h>
28
29 /*
30  *      Other OS's have sem_init, OS X doesn't.
31  */
32 #ifndef DARWIN
33 #include <semaphore.h>
34 #else
35 #include <mach/task.h>
36 #include <mach/semaphore.h>
37
38 #undef sem_t
39 #define sem_t semaphore_t
40 #undef sem_init
41 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
42 #undef sem_wait
43 #define sem_wait(s) semaphore_wait(*s)
44 #undef sem_post
45 #define sem_post(s) semaphore_signal(*s)
46 #endif
47
48 #include <signal.h>
49
50 #ifdef HAVE_SYS_WAIT_H
51 #include <sys/wait.h>
52 #endif
53
54 #include <freeradius-devel/radiusd.h>
55 #include <freeradius-devel/rad_assert.h>
56 #include <freeradius-devel/conffile.h>
57 #include <freeradius-devel/modules.h>
58
/*
 *	Version-control identification string ("$Id$" keyword).
 */
static const char rcsid[] = "$Id$";
61
62 #ifdef HAVE_PTHREAD_H
63
64 #ifdef HAVE_OPENSSL_CRYPTO_H
65 #include <openssl/crypto.h>
66 #endif
67 #ifdef HAVE_OPENSSL_ERR_H
68 #include <openssl/err.h>
69 #endif
70
/*
 *	Initial count handed to sem_init() for the pool semaphore.
 *	LOCKED (0) means no request has been queued yet.
 */
#define SEMAPHORE_LOCKED        (0)
#define SEMAPHORE_UNLOCKED      (1)

/*
 *	Values stored in THREAD_HANDLE.status.  A worker thread keeps
 *	looping until its status is THREAD_CANCELLED, and sets
 *	THREAD_EXITED as the last thing it does.
 */
#define THREAD_RUNNING          (1)
#define THREAD_CANCELLED        (2)
#define THREAD_EXITED           (3)

/*
 *	Number of request queues (fifos) in the thread pool.
 */
#define NUM_FIFOS               (2)

/*
 *	Indexes into thread_pool.fifo[].  Replies from a home server
 *	(proxy replies) are queued in FIFO_PROXY; every other request
 *	is queued in FIFO_START.  There is no separate queue for
 *	ongoing (State-bearing) requests.
 *
 *	Both values must be less than NUM_FIFOS.
 */
#define FIFO_PROXY   (0)
#define FIFO_START   (1)
86
87 /*
88  *  A data structure which contains the information about
89  *  the current thread.
90  *
91  *  pthread_id     pthread id
92  *  thread_num     server thread number, 1...number of threads
93  *  semaphore     used to block the thread until a request comes in
94  *  status        is the thread running or exited?
95  *  request_count the number of requests that this thread has handled
96  *  timestamp     when the thread started executing.
97  */
98 typedef struct THREAD_HANDLE {
99         struct THREAD_HANDLE *prev;
100         struct THREAD_HANDLE *next;
101         pthread_t            pthread_id;
102         int                  thread_num;
103         int                  status;
104         unsigned int         request_count;
105         time_t               timestamp;
106         REQUEST              *request;
107 } THREAD_HANDLE;
108
109 /*
110  *      For the request queue.
111  */
112 typedef struct request_queue_t {
113         REQUEST           *request;
114         RAD_REQUEST_FUNP  fun;
115 } request_queue_t;
116
117
118 /*
119  *      A data structure to manage the thread pool.  There's no real
120  *      need for a data structure, but it makes things conceptually
121  *      easier.
122  */
123 typedef struct THREAD_POOL {
124         THREAD_HANDLE *head;
125         THREAD_HANDLE *tail;
126
127         int total_threads;
128         int active_threads;
129         int max_thread_num;
130         int start_threads;
131         int max_threads;
132         int min_spare_threads;
133         int max_spare_threads;
134         unsigned int max_requests_per_thread;
135         unsigned long request_count;
136         time_t time_last_spawned;
137         int cleanup_delay;
138         int spawn_flag;
139
140         pthread_mutex_t wait_mutex;
141         lrad_hash_table_t *waiters;
142
143         /*
144          *      All threads wait on this semaphore, for requests
145          *      to enter the queue.
146          */
147         sem_t           semaphore;
148
149         /*
150          *      To ensure only one thread at a time touches the queue.
151          */
152         pthread_mutex_t queue_mutex;
153
154         int             max_queue_size;
155         int             num_queued;
156         int             fifo_state;
157         lrad_fifo_t     *fifo[NUM_FIFOS];
158 } THREAD_POOL;
159
160 static THREAD_POOL thread_pool;
161 static int pool_initialized = FALSE;
162
163
164 /*
165  *      A mapping of configuration file names to internal integers
166  */
167 static const CONF_PARSER thread_config[] = {
168         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
169         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
170         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
171         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
172         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
173         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
174         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
175         { NULL, -1, 0, NULL, NULL }
176 };
177
178
#ifdef HAVE_OPENSSL_CRYPTO_H

/*
 *	A multithreaded application linking against OpenSSL must
 *	give it thread-id and locking callbacks.
 *
 *	Only the static locking callbacks are provided here.  Dynamic
 *	locking callbacks are not implemented.
 */

/* one mutex per OpenSSL static lock; filled in by setup_ssl_mutexes() */
static pthread_mutex_t *ssl_mutexes = NULL;

/*
 *	Thread-id callback: identify the calling thread.
 */
static unsigned long ssl_id_function(void)
{
	pthread_t me = pthread_self();

	return (unsigned long) me;
}

/*
 *	Locking callback: lock mutex "n" when CRYPTO_LOCK is set in
 *	"mode", otherwise unlock it.  file/line are unused.
 */
static void ssl_locking_function(int mode, int n, const char *file, int line)
{
	pthread_mutex_t *mutex = &ssl_mutexes[n];

	(void) file;
	(void) line;

	if ((mode & CRYPTO_LOCK) == 0) {
		pthread_mutex_unlock(mutex);
		return;
	}

	pthread_mutex_lock(mutex);
}

/*
 *	Allocate and initialize one mutex per OpenSSL lock, then
 *	register the callbacks above.
 *
 *	Returns 1 on success, 0 if the allocation failed.
 */
static int setup_ssl_mutexes(void)
{
	int idx;

	ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
	if (ssl_mutexes == NULL) {
		radlog(L_ERR, "Error allocating memory for SSL mutexes!");
		return 0;
	}

	idx = 0;
	while (idx < CRYPTO_num_locks()) {
		pthread_mutex_init(&ssl_mutexes[idx], NULL);
		idx++;
	}

	CRYPTO_set_id_callback(ssl_id_function);
	CRYPTO_set_locking_callback(ssl_locking_function);

	return 1;
}
#endif
232
233
234 /*
235  *      Callback for reaping
236  */
237 static int reap_callback(void *ctx, void *data)
238 {
239         pid_t pid = *(pid_t *) data;
240         lrad_hash_table_t *ht = ctx;
241
242         /*
243          *      Child is still alive, do nothing.
244          */
245         if (waitpid(pid, NULL, WNOHANG) == 0) return 0;
246
247         /*
248          *      Else no child, or was already reaped
249          */
250
251         lrad_hash_table_delete(ht, pid);
252
253         return 0;
254 }
255
256
257 /*
258  *      We don't want to catch SIGCHLD for a host of reasons.
259  *
260  *      - exec_wait means that someone, somewhere, somewhen, will
261  *      call waitpid(), and catch the child.
262  *
263  *      - SIGCHLD is delivered to a random thread, not the one that
264  *      forked.
265  *
266  *      - if another thread catches the child, we have to coordinate
267  *      with the thread doing the waiting.
268  *
269  *      - if we don't waitpid() for non-wait children, they'll be zombies,
270  *      and will hang around forever.
271  *
272  */
273 static void reap_children(void)
274 {
275         if (lrad_hash_table_num_elements(thread_pool.waiters) == 0) return;
276
277         pthread_mutex_lock(&thread_pool.wait_mutex);
278
279         lrad_hash_table_walk(thread_pool.waiters,
280                              reap_callback, thread_pool.waiters);
281
282         pthread_mutex_unlock(&thread_pool.wait_mutex);
283 }
284
285 /*
286  *      Add a request to the list of waiting requests.
287  *      This function gets called ONLY from the main handler thread...
288  *
289  *      This function should never fail.
290  */
291 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
292 {
293         int fifo = FIFO_START;
294         request_queue_t *entry;
295
296         pthread_mutex_lock(&thread_pool.queue_mutex);
297
298         thread_pool.request_count++;
299
300         /*
301          *      FIXME: Handle proxy replies separately?
302          */
303         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
304                 pthread_mutex_unlock(&thread_pool.queue_mutex);
305                 
306                 /*
307                  *      Mark the request as done.
308                  */
309                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
310                 request->finished = TRUE;
311                 return 0;
312         }
313
314         /*
315          *      Requests get handled in priority.  First, we handle
316          *      replies from a home server, to finish ongoing requests.
317          *
318          *      Then, we handle requests with State, to finish
319          *      multi-packet transactions.
320          *
321          *      Finally, we handle new requests.
322          */
323         if (request->proxy_reply) {
324                 fifo = FIFO_PROXY;
325         } else {
326                 fifo = FIFO_START;
327         }
328
329         entry = rad_malloc(sizeof(*entry));
330         entry->request = request;
331         entry->fun = fun;
332
333         if (!lrad_fifo_push(thread_pool.fifo[fifo], entry)) {
334                 pthread_mutex_unlock(&thread_pool.queue_mutex);
335                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
336                 request->finished = TRUE;
337                 return 0;
338         }
339
340         thread_pool.num_queued++;
341
342         pthread_mutex_unlock(&thread_pool.queue_mutex);
343
344         /*
345          *      There's one more request in the queue.
346          *
347          *      Note that we're not touching the queue any more, so
348          *      the semaphore post is outside of the mutex.  This also
349          *      means that when the thread wakes up and tries to lock
350          *      the mutex, it will be unlocked, and there won't be
351          *      contention.
352          */
353         sem_post(&thread_pool.semaphore);
354
355         return 1;
356 }
357
358 /*
359  *      Remove a request from the queue.
360  */
361 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
362 {
363         int fifo_state;
364         request_queue_t *entry;
365
366         reap_children();
367
368         pthread_mutex_lock(&thread_pool.queue_mutex);
369
370         fifo_state = thread_pool.fifo_state;
371
372  retry:
373         do {
374                 /*
375                  *      Pop an entry from the current queue, and go to
376                  *      the next queue.
377                  */
378                 entry = lrad_fifo_pop(thread_pool.fifo[fifo_state]);
379                 fifo_state++;
380                 if (fifo_state >= NUM_FIFOS) fifo_state = 0;
381         } while ((fifo_state != thread_pool.fifo_state) && !entry);
382
383         if (!entry) {
384                 pthread_mutex_unlock(&thread_pool.queue_mutex);
385                 *request = NULL;
386                 *fun = NULL;
387                 return 0;
388         }
389
390         rad_assert(thread_pool.num_queued > 0);
391         thread_pool.num_queued--;
392         *request = entry->request;
393         *fun = entry->fun;
394         free(entry);
395
396         rad_assert(*request != NULL);
397         rad_assert((*request)->magic == REQUEST_MAGIC);
398         rad_assert(*fun != NULL);
399
400         /*
401          *      If the request has sat in the queue for too long,
402          *      kill it.
403          *
404          *      The main clean-up code won't delete the request from
405          *      the request list, until it's marked "finished"
406          */
407         if ((*request)->options & RAD_REQUEST_OPTION_STOP_NOW) {
408                 (*request)->finished = 1;
409                 goto retry;
410         }
411
412         /*
413          *      The thread is currently processing a request.
414          */
415         thread_pool.active_threads++;
416         thread_pool.fifo_state = fifo_state;
417
418         pthread_mutex_unlock(&thread_pool.queue_mutex);
419
420         /*
421          *      If the request is currently being processed, then that
422          *      MAY be OK, if it's a proxy reply.  In that case,
423          *      sending the packet may result in a reply being
424          *      received before that thread clears the child_pid.
425          *
426          *      In that case, we busy-wait for the request to be free.
427          *
428          *      We COULD push it onto the queue and try to grab
429          *      another request, but what if this is the only request?
430          *      What if there are multiple such packets with race
431          *      conditions?  We don't want to thrash the queue...
432          *
433          *      This busy-wait is less than optimal, but it's simple,
434          *      fail-safe, and it works.
435          */
436         if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
437                 int count, ok;
438                 struct timeval tv;
439 #ifdef HAVE_PTHREAD_SIGMASK
440                 sigset_t set, old_set;
441
442                 /*
443                  *      Block a large number of signals which could
444                  *      cause the select to return EINTR
445                  */
446                 sigemptyset(&set);
447                 sigaddset(&set, SIGPIPE);
448                 sigaddset(&set, SIGCONT);
449                 sigaddset(&set, SIGSTOP);
450                 sigaddset(&set, SIGCHLD);
451                 pthread_sigmask(SIG_BLOCK, &set, &old_set);
452 #endif
453
454                 rad_assert((*request)->proxy_reply != NULL);
455
456                 ok = FALSE;
457
458                 /*
459                  *      Sleep for 100 milliseconds.  If the other thread
460                  *      doesn't get serviced in this time, to clear
461                  *      the "child_pid" entry, then the server is too
462                  *      busy, so we die.
463                  */
464                 for (count = 0; count < 10; count++) {
465                         tv.tv_sec = 0;
466                         tv.tv_usec = 10000; /* sleep for 10 milliseconds */
467
468                         /*
469                          *      Portable sleep that's thread-safe.
470                          *
471                          *      Don't worry about interrupts, as they're
472                          *      blocked above.
473                          */
474                         select(0, NULL, NULL, NULL, &tv);
475                         if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
476                                 ok = TRUE;
477                                 break;
478                         }
479                 }
480
481 #ifdef HAVE_PTHREAD_SIGMASK
482                 /*
483                  *      Restore the original thread signal mask.
484                  */
485                 pthread_sigmask(SIG_SETMASK, &old_set, NULL);
486 #endif
487
488                 if (!ok) {
489                         radlog(L_ERR, "FATAL!  Server is too busy to process requests");
490                         exit(1);
491                 }
492         }
493
494         return 1;
495 }
496
497
498 /*
499  *      The main thread handler for requests.
500  *
501  *      Wait on the semaphore until we have it, and process the request.
502  */
503 static void *request_handler_thread(void *arg)
504 {
505         RAD_REQUEST_FUNP  fun;
506         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
507 #ifdef HAVE_PTHREAD_SIGMASK
508         sigset_t set;
509
510         /*
511          *      Block SIGHUP handling for the child threads.
512          *
513          *      This ensures that only the main server thread will
514          *      process HUP signals.
515          *
516          *      If we don't have sigprocmask, then it shouldn't be
517          *      a problem, either, as the sig_hup handler should check
518          *      for this condition.
519          */
520         sigemptyset(&set);
521         sigaddset(&set, SIGHUP);
522         sigaddset(&set, SIGINT);
523         sigaddset(&set, SIGQUIT);
524         sigaddset(&set, SIGTERM);
525         pthread_sigmask(SIG_BLOCK, &set, NULL);
526 #endif
527
528         /*
529          *      Loop forever, until told to exit.
530          */
531         do {
532                 /*
533                  *      Wait to be signalled.
534                  */
535                 DEBUG2("Thread %d waiting to be assigned a request",
536                        self->thread_num);
537         re_wait:
538                 if (sem_wait(&thread_pool.semaphore) != 0) {
539                         /*
540                          *      Interrupted system call.  Go back to
541                          *      waiting, but DON'T print out any more
542                          *      text.
543                          */
544                         if (errno == EINTR) {
545                                 DEBUG2("Re-wait %d", self->thread_num);
546                                 goto re_wait;
547                         }
548                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
549                                self->thread_num, strerror(errno));
550                         break;
551                 }
552
553                 DEBUG2("Thread %d got semaphore", self->thread_num);
554
555                 /*
556                  *      Try to grab a request from the queue.
557                  *
558                  *      It may be empty, in which case we fail
559                  *      gracefully.
560                  */
561                 if (!request_dequeue(&self->request, &fun)) continue;
562
563                 self->request->child_pid = self->pthread_id;
564                 self->request_count++;
565
566                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
567                        self->thread_num, self->request->number,
568                        self->request_count);
569
570                 /*
571                  *      Respond, and reset request->child_pid
572                  */
573                 rad_respond(self->request, fun);
574                 self->request = NULL;
575
576                 /*
577                  *      Update the active threads.
578                  */
579                 pthread_mutex_lock(&thread_pool.queue_mutex);
580                 rad_assert(thread_pool.active_threads > 0);
581                 thread_pool.active_threads--;
582                 pthread_mutex_unlock(&thread_pool.queue_mutex);
583         } while (self->status != THREAD_CANCELLED);
584
585         DEBUG2("Thread %d exiting...", self->thread_num);
586
587 #ifdef HAVE_OPENSSL_ERR_H
588         /*
589          *      If we linked with OpenSSL, the application
590          *      must remove the thread's error queue before
591          *      exiting to prevent memory leaks.
592          */
593         ERR_remove_state(0);
594 #endif
595
596         /*
597          *  Do this as the LAST thing before exiting.
598          */
599         self->status = THREAD_EXITED;
600
601         return NULL;
602 }
603
604 /*
605  *      Take a THREAD_HANDLE, and delete it from the thread pool.
606  *
607  *      This function is called ONLY from the main server thread.
608  */
609 static void delete_thread(THREAD_HANDLE *handle)
610 {
611         THREAD_HANDLE *prev;
612         THREAD_HANDLE *next;
613
614         rad_assert(handle->request == NULL);
615
616         prev = handle->prev;
617         next = handle->next;
618         rad_assert(thread_pool.total_threads > 0);
619         thread_pool.total_threads--;
620
621         /*
622          *      Remove the handle from the list.
623          */
624         if (prev == NULL) {
625                 rad_assert(thread_pool.head == handle);
626                 thread_pool.head = next;
627         } else {
628                 prev->next = next;
629         }
630
631         if (next == NULL) {
632                 rad_assert(thread_pool.tail == handle);
633                 thread_pool.tail = prev;
634         } else {
635                 next->prev = prev;
636         }
637
638         DEBUG2("Deleting thread %d", handle->thread_num);
639
640         /*
641          *      This thread has exited.  Delete any additional
642          *      resources associated with it.
643          */
644
645         /*
646          *      Free the memory, now that we're sure the thread
647          *      exited.
648          */
649         free(handle);
650 }
651
652
653 /*
654  *      Spawn a new thread, and place it in the thread pool.
655  *
656  *      The thread is started initially in the blocked state, waiting
657  *      for the semaphore.
658  */
659 static THREAD_HANDLE *spawn_thread(time_t now)
660 {
661         int rcode;
662         THREAD_HANDLE *handle;
663         pthread_attr_t attr;
664
665         /*
666          *      Ensure that we don't spawn too many threads.
667          */
668         if (thread_pool.total_threads >= thread_pool.max_threads) {
669                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
670                 return NULL;
671         }
672
673         /*
674          *      Allocate a new thread handle.
675          */
676         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
677         memset(handle, 0, sizeof(THREAD_HANDLE));
678         handle->prev = NULL;
679         handle->next = NULL;
680         handle->pthread_id = NO_SUCH_CHILD_PID;
681         handle->thread_num = thread_pool.max_thread_num++;
682         handle->request_count = 0;
683         handle->status = THREAD_RUNNING;
684         handle->timestamp = time(NULL);
685
686         /*
687          *      Initialize the thread's attributes to detached.
688          *
689          *      We could call pthread_detach() later, but if the thread
690          *      exits between the create & detach calls, it will need to
691          *      be joined, which will never happen.
692          */
693         pthread_attr_init(&attr);
694         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
695
696         /*
697          *      Create the thread detached, so that it cleans up it's
698          *      own memory when it exits.
699          *
700          *      Note that the function returns non-zero on error, NOT
701          *      -1.  The return code is the error, and errno isn't set.
702          */
703         rcode = pthread_create(&handle->pthread_id, &attr,
704                         request_handler_thread, handle);
705         if (rcode != 0) {
706                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
707                        strerror(rcode));
708                 exit(1);
709         }
710         pthread_attr_destroy(&attr);
711
712         /*
713          *      One more thread to go into the list.
714          */
715         thread_pool.total_threads++;
716         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
717                         handle->thread_num, thread_pool.total_threads);
718
719         /*
720          *      Add the thread handle to the tail of the thread pool list.
721          */
722         if (thread_pool.tail) {
723                 thread_pool.tail->next = handle;
724                 handle->prev = thread_pool.tail;
725                 thread_pool.tail = handle;
726         } else {
727                 rad_assert(thread_pool.head == NULL);
728                 thread_pool.head = thread_pool.tail = handle;
729         }
730
731         /*
732          *      Update the time we last spawned a thread.
733          */
734         thread_pool.time_last_spawned = now;
735
736         /*
737          *      And return the new handle to the caller.
738          */
739         return handle;
740 }
741
742 /*
743  *      Temporary function to prevent server from executing a SIGHUP
744  *      until all threads are finished handling requests.  This returns
745  *      the number of active threads to 'radiusd.c'.
746  */
747 int total_active_threads(void)
748 {
749         int rcode = 0;
750         THREAD_HANDLE *handle;
751
752         for (handle = thread_pool.head; handle != NULL; handle = handle->next){
753                 if (handle->request != NULL) {
754                         rcode ++;
755                 }
756         }
757         return (rcode);
758 }
759
760
761 /*
762  *      Allocate the thread pool, and seed it with an initial number
763  *      of threads.
764  *
765  *      FIXME: What to do on a SIGHUP???
766  */
767 int thread_pool_init(int spawn_flag)
768 {
769         int             i, rcode;
770         CONF_SECTION    *pool_cf;
771         time_t          now;
772
773         DEBUG("Initializing the thread pool...");
774         now = time(NULL);
775
776         /*
777          *      After a SIGHUP, we don't over-write the previous values.
778          */
779         if (!pool_initialized) {
780                 /*
781                  *      Initialize the thread pool to some reasonable values.
782                  */
783                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
784                 thread_pool.head = NULL;
785                 thread_pool.tail = NULL;
786                 thread_pool.total_threads = 0;
787                 thread_pool.max_thread_num = 1;
788                 thread_pool.cleanup_delay = 5;
789                 thread_pool.spawn_flag = spawn_flag;
790
791                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
792                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
793                                strerror(errno));
794                         exit(1);
795                 }               
796                 
797                 /*
798                  *      Create the hash table of child PID's
799                  */
800                 thread_pool.waiters = lrad_hash_table_create(8, NULL, 0);
801                 if (!thread_pool.waiters) {
802                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
803                         exit(1);
804                 }
805                 lrad_hash_table_set_data_size(thread_pool.waiters,
806                                               sizeof(pid_t));
807         }
808
809         /*
810          *      We're not spawning new threads, don't do
811          *      anything.
812          */
813         if (!spawn_flag) return 0;
814
815         pool_cf = cf_section_find("thread");
816         if (pool_cf != NULL) {
817                 /*
818                  *      FIXME: Check for errors?
819                  */
820                 cf_section_parse(pool_cf, NULL, thread_config);
821         }
822
823         /*
824          *      The pool has already been initialized.  Don't spawn
825          *      new threads, and don't forget about forked children,
826          */
827         if (pool_initialized) {
828                 return 0;
829         }
830
831         /*
832          *      Initialize the queue of requests.
833          */
834         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
835         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
836         if (rcode != 0) {
837                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
838                        strerror(errno));
839                 exit(1);
840         }
841
842         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
843         if (rcode != 0) {
844                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
845                        strerror(errno));
846                 exit(1);
847         }
848
849         /*
850          *      Allocate multiple fifos.
851          */
852         for (i = 0; i < NUM_FIFOS; i++) {
853                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
854                 if (!thread_pool.fifo[i]) {
855                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
856                         exit(1);
857                 }
858         }
859
860 #ifdef HAVE_OPENSSL_CRYPTO_H
861         /*
862          *      If we're linking with OpenSSL too, then we need
863          *      to set up the mutexes and enable the thread callbacks.
864          */
865         if (!setup_ssl_mutexes()) {
866                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
867                 exit(1);
868         }
869 #endif
870
871
872         /*
873          *      Create a number of waiting threads.
874          *
875          *      If we fail while creating them, do something intelligent.
876          */
877         for (i = 0; i < thread_pool.start_threads; i++) {
878                 if (spawn_thread(now) == NULL) {
879                         return -1;
880                 }
881         }
882
883         DEBUG2("Thread pool initialized");
884         pool_initialized = TRUE;
885         return 0;
886 }
887
888
889 /*
890  *      Assign a new request to a free thread.
891  *
892  *      If there isn't a free thread, then try to create a new one,
893  *      up to the configured limits.
894  */
895 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
896 {
897         /*
898          *      We've been told not to spawn threads, so don't.
899          */
900         if (!thread_pool.spawn_flag) {
901                 rad_respond(request, fun);
902                 return 1;
903         }
904
905         /*
906          *      Add the new request to the queue.
907          */
908         if (!request_enqueue(request, fun)) return 0;
909
910         /*
911          *      If the thread pool is busy handling requests, then
912          *      try to spawn another one.
913          */
914         if (thread_pool.active_threads == thread_pool.total_threads) {
915                 if (spawn_thread(request->timestamp) == NULL) {
916                         radlog(L_INFO,
917                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
918                                thread_pool.max_threads);
919                         return 1;
920                 }
921         }
922
923         return 1;
924 }
925
926 /*
927  *      Check the min_spare_threads and max_spare_threads.
928  *
929  *      If there are too many or too few threads waiting, then we
930  *      either create some more, or delete some.
931  */
932 int thread_pool_clean(time_t now)
933 {
934         int spare;
935         int i, total;
936         THREAD_HANDLE *handle, *next;
937         int active_threads;
938         static time_t last_cleaned = 0;
939
940         /*
941          *      Loop over the thread pool deleting exited threads.
942          */
943         for (handle = thread_pool.head; handle; handle = next) {
944                 next = handle->next;
945
946                 /*
947                  *      Maybe we've asked the thread to exit, and it
948                  *      has agreed.
949                  */
950                 if (handle->status == THREAD_EXITED) {
951                         delete_thread(handle);
952                 }
953         }
954
955         /*
956          *      We don't need a mutex lock here, as we're reading
957          *      the location, and not modifying it.  We want a close
958          *      approximation of the number of active threads, and this
959          *      is good enough.
960          */
961         active_threads = thread_pool.active_threads;
962         spare = thread_pool.total_threads - active_threads;
963         if (debug_flag) {
964                 static int old_total = -1;
965                 static int old_active = -1;
966
967                 if ((old_total != thread_pool.total_threads) ||
968                                 (old_active != active_threads)) {
969                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
970                                         thread_pool.total_threads, active_threads, spare);
971                         old_total = thread_pool.total_threads;
972                         old_active = active_threads;
973                 }
974         }
975
976         /*
977          *      If there are too few spare threads, create some more.
978          */
979         if (spare < thread_pool.min_spare_threads) {
980                 total = thread_pool.min_spare_threads - spare;
981
982                 DEBUG2("Threads: Spawning %d spares", total);
983                 /*
984                  *      Create a number of spare threads.
985                  */
986                 for (i = 0; i < total; i++) {
987                         handle = spawn_thread(now);
988                         if (handle == NULL) {
989                                 return -1;
990                         }
991                 }
992
993                 /*
994                  *      And exit, as there can't be too many spare threads.
995                  */
996                 return 0;
997         }
998
999         /*
1000          *      Only delete spare threads if we haven't already done
1001          *      so this second.
1002          */
1003         if (now == last_cleaned) {
1004                 return 0;
1005         }
1006         last_cleaned = now;
1007
1008         /*
1009          *      Only delete the spare threads if sufficient time has
1010          *      passed since we last created one.  This helps to minimize
1011          *      the amount of create/delete cycles.
1012          */
1013         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
1014                 return 0;
1015         }
1016
1017         /*
1018          *      If there are too many spare threads, delete one.
1019          *
1020          *      Note that we only delete ONE at a time, instead of
1021          *      wiping out many.  This allows the excess servers to
1022          *      be slowly reaped, just in case the load spike comes again.
1023          */
1024         if (spare > thread_pool.max_spare_threads) {
1025
1026                 spare -= thread_pool.max_spare_threads;
1027
1028                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1029
1030                 /*
1031                  *      Walk through the thread pool, deleting the
1032                  *      first idle thread we come across.
1033                  */
1034                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1035                         next = handle->next;
1036
1037                         /*
1038                          *      If the thread is not handling a
1039                          *      request, but still live, then tell it
1040                          *      to exit.
1041                          *
1042                          *      It will eventually wake up, and realize
1043                          *      it's been told to commit suicide.
1044                          */
1045                         if ((handle->request == NULL) &&
1046                             (handle->status == THREAD_RUNNING)) {
1047                                 handle->status = THREAD_CANCELLED;
1048                                 /*
1049                                  *      Post an extra semaphore, as a
1050                                  *      signal to wake up, and exit.
1051                                  */
1052                                 sem_post(&thread_pool.semaphore);
1053                                 spare--;
1054                                 break;
1055                         }
1056                 }
1057         }
1058
1059         /*
1060          *      If the thread has handled too many requests, then make it
1061          *      exit.
1062          */
1063         if (thread_pool.max_requests_per_thread > 0) {
1064                 for (handle = thread_pool.head; handle; handle = next) {
1065                         next = handle->next;
1066
1067                         /*
1068                          *      Not handling a request, but otherwise
1069                          *      live, we can kill it.
1070                          */
1071                         if ((handle->request == NULL) &&
1072                             (handle->status == THREAD_RUNNING) &&
1073                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1074                                 handle->status = THREAD_CANCELLED;
1075                                 sem_post(&thread_pool.semaphore);
1076                         }
1077                 }
1078         }
1079
1080         /*
1081          *      Otherwise everything's kosher.  There are not too few,
1082          *      or too many spare threads.  Exit happily.
1083          */
1084         return 0;
1085 }
1086
1087
1088 /*
1089  *      Thread wrapper for fork().
1090  */
1091 pid_t rad_fork(int exec_wait)
1092 {
1093         pid_t child_pid;
1094
1095         if (exec_wait) return fork();
1096
1097         reap_children();        /* be nice to non-wait thingies */
1098
1099         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1100                 return -1;
1101         }
1102
1103         /*
1104          *      Fork & save the PID for later reaping.
1105          */
1106         child_pid = fork();
1107         if (child_pid > 0) {
1108                 int rcode;
1109
1110                 /*
1111                  *      Lock the mutex.
1112                  */
1113                 pthread_mutex_lock(&thread_pool.wait_mutex);
1114
1115                 rcode = lrad_hash_table_insert(thread_pool.waiters,
1116                                                child_pid, &child_pid);
1117                 
1118                 /*
1119                  *      Unlock the mutex.
1120                  */
1121                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1122
1123                 if (!rcode) {
1124                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1125                                (int) child_pid);
1126                 }
1127         }
1128
1129         /*
1130          *      Return whatever we were told.
1131          */
1132         return child_pid;
1133 }
1134
/*
 *	Thin wrapper around waitpid().
 *
 *	Before delegating, it calls reap_children().  Presumably this
 *	collects children forked by rad_fork() without exec_wait.  The
 *	arguments and the return value are exactly those of waitpid().
 *	(This wrapper may no longer be needed.)
 */
pid_t rad_waitpid(pid_t pid, int *status, int options)
{
	pid_t result;

	reap_children();	/* be nice to non-wait thingies */

	result = waitpid(pid, status, options);

	return result;
}
1144
1145 #else /* HAVE_PTHREAD_H */
/*
 *	Thread-pool initialisation when built without pthreads.
 *
 *	There is no pool to build, so spawn_flag is ignored and this
 *	always reports success (0).
 */
int thread_pool_init(int spawn_flag)
{
	(void) spawn_flag;	/* unused without threads */

	return 0;
}
1153
1154 /*
1155  *      call "radrespond".
1156  */
1157 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1158 {
1159         rad_respond(request, fun);
1160         return 1;
1161 }
1162
1163 #endif /* HAVE_PTHREAD_H */