Cleaned up fork code (again) so that when a module exec's a program,
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/autoconf.h>
25
26 #include <stdlib.h>
27 #include <string.h>
28
29 /*
30  *      Other OS's have sem_init, OS X doesn't.
31  */
32 #ifndef DARWIN
33 #include <semaphore.h>
34 #else
35 #include <mach/task.h>
36 #include <mach/semaphore.h>
37
38 #undef sem_t
39 #define sem_t semaphore_t
40 #undef sem_init
41 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
42 #undef sem_wait
43 #define sem_wait(s) semaphore_wait(*s)
44 #undef sem_post
45 #define sem_post(s) semaphore_signal(*s)
46 #endif
47
48 #include <signal.h>
49
50 #ifdef HAVE_SYS_WAIT_H
51 #include <sys/wait.h>
52 #endif
53
54 #include <freeradius-devel/radiusd.h>
55 #include <freeradius-devel/rad_assert.h>
56 #include <freeradius-devel/conffile.h>
57 #include <freeradius-devel/modules.h>
58
59 static const char rcsid[] =
60 "$Id$";
61
62 #ifdef HAVE_PTHREAD_H
63
64 #ifdef HAVE_OPENSSL_CRYPTO_H
65 #include <openssl/crypto.h>
66 #endif
67 #ifdef HAVE_OPENSSL_ERR_H
68 #include <openssl/err.h>
69 #endif
70
71 #define SEMAPHORE_LOCKED        (0)
72 #define SEMAPHORE_UNLOCKED      (1)
73
74 #define THREAD_RUNNING          (1)
75 #define THREAD_CANCELLED        (2)
76 #define THREAD_EXITED           (3)
77
78 #define NUM_FIFOS               (2)
79
80 /*
81  *     Ordered this way because we prefer proxy, then ongoing, then
82  *     start.
83  */
84 #define FIFO_START   (1)
85 #define FIFO_PROXY   (0)
86
87 /*
88  *  A data structure which contains the information about
89  *  the current thread.
90  *
91  *  pthread_id     pthread id
92  *  thread_num     server thread number, 1...number of threads
93  *  semaphore     used to block the thread until a request comes in
94  *  status        is the thread running or exited?
95  *  request_count the number of requests that this thread has handled
96  *  timestamp     when the thread started executing.
97  */
98 typedef struct THREAD_HANDLE {
99         struct THREAD_HANDLE *prev;
100         struct THREAD_HANDLE *next;
101         pthread_t            pthread_id;
102         int                  thread_num;
103         int                  status;
104         unsigned int         request_count;
105         time_t               timestamp;
106         REQUEST              *request;
107 } THREAD_HANDLE;
108
109 /*
110  *      For the request queue.
111  */
112 typedef struct request_queue_t {
113         REQUEST           *request;
114         RAD_REQUEST_FUNP  fun;
115 } request_queue_t;
116
117 typedef struct thread_fork_t {
118         pid_t           pid;
119         int             status;
120         int             exited;
121 } thread_fork_t;
122
123
124 /*
125  *      A data structure to manage the thread pool.  There's no real
126  *      need for a data structure, but it makes things conceptually
127  *      easier.
128  */
129 typedef struct THREAD_POOL {
130         THREAD_HANDLE *head;
131         THREAD_HANDLE *tail;
132
133         int total_threads;
134         int active_threads;     /* protected by queue_mutex */
135         int max_thread_num;
136         int start_threads;
137         int max_threads;
138         int min_spare_threads;
139         int max_spare_threads;
140         unsigned int max_requests_per_thread;
141         unsigned long request_count;
142         time_t time_last_spawned;
143         int cleanup_delay;
144         int spawn_flag;
145
146         pthread_mutex_t wait_mutex;
147         lrad_hash_table_t *waiters;
148
149         /*
150          *      All threads wait on this semaphore, for requests
151          *      to enter the queue.
152          */
153         sem_t           semaphore;
154
155         /*
156          *      To ensure only one thread at a time touches the queue.
157          */
158         pthread_mutex_t queue_mutex;
159
160         int             max_queue_size;
161         int             num_queued;
162         int             fifo_state;
163         lrad_fifo_t     *fifo[NUM_FIFOS];
164 } THREAD_POOL;
165
166 static THREAD_POOL thread_pool;
167 static int pool_initialized = FALSE;
168
169
170 /*
171  *      A mapping of configuration file names to internal integers
172  */
173 static const CONF_PARSER thread_config[] = {
174         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
175         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
176         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
177         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
178         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
179         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
180         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
181         { NULL, -1, 0, NULL, NULL }
182 };
183
184
185 #ifdef HAVE_OPENSSL_CRYPTO_H
186
187 /*
188  *      If we're linking against OpenSSL, then it is the
189  *      duty of the application, if it is multithreaded,
190  *      to provide OpenSSL with appropriate thread id
191  *      and mutex locking functions
192  *
193  *      Note: this only implements static callbacks.
194  *      OpenSSL does not use dynamic locking callbacks
195  *      right now, but may in the futiure, so we will have
196  *      to add them at some point.
197  */
198
199 static pthread_mutex_t *ssl_mutexes = NULL;
200
201 static unsigned long ssl_id_function(void)
202 {
203         return (unsigned long) pthread_self();
204 }
205
206 static void ssl_locking_function(int mode, int n, const char *file, int line)
207 {
208         file = file;            /* -Wunused */
209         line = line;            /* -Wunused */
210
211         if (mode & CRYPTO_LOCK) {
212                 pthread_mutex_lock(&(ssl_mutexes[n]));
213         } else {
214                 pthread_mutex_unlock(&(ssl_mutexes[n]));
215         }
216 }
217
218 static int setup_ssl_mutexes(void)
219 {
220         int i;
221
222         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
223         if (!ssl_mutexes) {
224                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
225                 return 0;
226         }
227
228         for (i = 0; i < CRYPTO_num_locks(); i++) {
229                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
230         }
231
232         CRYPTO_set_id_callback(ssl_id_function);
233         CRYPTO_set_locking_callback(ssl_locking_function);
234
235         return 1;
236 }
237 #endif
238
239
240 /*
241  *      We don't want to catch SIGCHLD for a host of reasons.
242  *
243  *      - exec_wait means that someone, somewhere, somewhen, will
244  *      call waitpid(), and catch the child.
245  *
246  *      - SIGCHLD is delivered to a random thread, not the one that
247  *      forked.
248  *
249  *      - if another thread catches the child, we have to coordinate
250  *      with the thread doing the waiting.
251  *
252  *      - if we don't waitpid() for non-wait children, they'll be zombies,
253  *      and will hang around forever.
254  *
255  */
256 static void reap_children(void)
257 {
258         pid_t pid;
259         int status;
260         thread_fork_t mytf, *tf;
261
262         if (lrad_hash_table_num_elements(thread_pool.waiters) == 0) return;
263
264         pthread_mutex_lock(&thread_pool.wait_mutex);
265
266         while (1) {
267                 pid = waitpid(0, &status, WNOHANG);
268                 if (pid <= 0) break;
269
270                 mytf.pid = pid;
271                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
272                 if (!tf) continue;
273                 
274                 tf->status = status;
275                 tf->exited = 1;
276         }
277
278         pthread_mutex_unlock(&thread_pool.wait_mutex);
279 }
280
281 /*
282  *      Add a request to the list of waiting requests.
283  *      This function gets called ONLY from the main handler thread...
284  *
285  *      This function should never fail.
286  */
287 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
288 {
289         int fifo = FIFO_START;
290         request_queue_t *entry;
291
292         pthread_mutex_lock(&thread_pool.queue_mutex);
293
294         thread_pool.request_count++;
295
296         /*
297          *      FIXME: Handle proxy replies separately?
298          */
299         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
300                 pthread_mutex_unlock(&thread_pool.queue_mutex);
301                 
302                 /*
303                  *      Mark the request as done.
304                  */
305                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
306                 request->finished = TRUE;
307                 return 0;
308         }
309
310         /*
311          *      Requests get handled in priority.  First, we handle
312          *      replies from a home server, to finish ongoing requests.
313          *
314          *      Then, we handle requests with State, to finish
315          *      multi-packet transactions.
316          *
317          *      Finally, we handle new requests.
318          */
319         if (request->proxy_reply) {
320                 fifo = FIFO_PROXY;
321         } else {
322                 fifo = FIFO_START;
323         }
324
325         entry = rad_malloc(sizeof(*entry));
326         entry->request = request;
327         entry->fun = fun;
328
329         if (!lrad_fifo_push(thread_pool.fifo[fifo], entry)) {
330                 pthread_mutex_unlock(&thread_pool.queue_mutex);
331                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
332                 request->finished = TRUE;
333                 return 0;
334         }
335
336         thread_pool.num_queued++;
337
338         pthread_mutex_unlock(&thread_pool.queue_mutex);
339
340         /*
341          *      There's one more request in the queue.
342          *
343          *      Note that we're not touching the queue any more, so
344          *      the semaphore post is outside of the mutex.  This also
345          *      means that when the thread wakes up and tries to lock
346          *      the mutex, it will be unlocked, and there won't be
347          *      contention.
348          */
349         sem_post(&thread_pool.semaphore);
350
351         return 1;
352 }
353
354 /*
355  *      Remove a request from the queue.
356  */
357 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
358 {
359         int fifo_state;
360         request_queue_t *entry;
361
362         reap_children();
363
364         pthread_mutex_lock(&thread_pool.queue_mutex);
365
366         fifo_state = thread_pool.fifo_state;
367
368  retry:
369         do {
370                 /*
371                  *      Pop an entry from the current queue, and go to
372                  *      the next queue.
373                  */
374                 entry = lrad_fifo_pop(thread_pool.fifo[fifo_state]);
375                 fifo_state++;
376                 if (fifo_state >= NUM_FIFOS) fifo_state = 0;
377         } while ((fifo_state != thread_pool.fifo_state) && !entry);
378
379         if (!entry) {
380                 pthread_mutex_unlock(&thread_pool.queue_mutex);
381                 *request = NULL;
382                 *fun = NULL;
383                 return 0;
384         }
385
386         rad_assert(thread_pool.num_queued > 0);
387         thread_pool.num_queued--;
388         *request = entry->request;
389         *fun = entry->fun;
390         free(entry);
391
392         rad_assert(*request != NULL);
393         rad_assert((*request)->magic == REQUEST_MAGIC);
394         rad_assert(*fun != NULL);
395
396         /*
397          *      If the request has sat in the queue for too long,
398          *      kill it.
399          *
400          *      The main clean-up code won't delete the request from
401          *      the request list, until it's marked "finished"
402          */
403         if ((*request)->options & RAD_REQUEST_OPTION_STOP_NOW) {
404                 (*request)->finished = 1;
405                 goto retry;
406         }
407
408         /*
409          *      The thread is currently processing a request.
410          */
411         thread_pool.active_threads++;
412         thread_pool.fifo_state = fifo_state;
413
414         pthread_mutex_unlock(&thread_pool.queue_mutex);
415
416         /*
417          *      If the request is currently being processed, then that
418          *      MAY be OK, if it's a proxy reply.  In that case,
419          *      sending the packet may result in a reply being
420          *      received before that thread clears the child_pid.
421          *
422          *      In that case, we busy-wait for the request to be free.
423          *
424          *      We COULD push it onto the queue and try to grab
425          *      another request, but what if this is the only request?
426          *      What if there are multiple such packets with race
427          *      conditions?  We don't want to thrash the queue...
428          *
429          *      This busy-wait is less than optimal, but it's simple,
430          *      fail-safe, and it works.
431          */
432         if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
433                 int count, ok;
434                 struct timeval tv;
435 #ifdef HAVE_PTHREAD_SIGMASK
436                 sigset_t set, old_set;
437
438                 /*
439                  *      Block a large number of signals which could
440                  *      cause the select to return EINTR
441                  */
442                 sigemptyset(&set);
443                 sigaddset(&set, SIGPIPE);
444                 sigaddset(&set, SIGCONT);
445                 sigaddset(&set, SIGSTOP);
446                 sigaddset(&set, SIGCHLD);
447                 pthread_sigmask(SIG_BLOCK, &set, &old_set);
448 #endif
449
450                 rad_assert((*request)->proxy_reply != NULL);
451
452                 ok = FALSE;
453
454                 /*
455                  *      Sleep for 100 milliseconds.  If the other thread
456                  *      doesn't get serviced in this time, to clear
457                  *      the "child_pid" entry, then the server is too
458                  *      busy, so we die.
459                  */
460                 for (count = 0; count < 10; count++) {
461                         tv.tv_sec = 0;
462                         tv.tv_usec = 10000; /* sleep for 10 milliseconds */
463
464                         /*
465                          *      Portable sleep that's thread-safe.
466                          *
467                          *      Don't worry about interrupts, as they're
468                          *      blocked above.
469                          */
470                         select(0, NULL, NULL, NULL, &tv);
471                         if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
472                                 ok = TRUE;
473                                 break;
474                         }
475                 }
476
477 #ifdef HAVE_PTHREAD_SIGMASK
478                 /*
479                  *      Restore the original thread signal mask.
480                  */
481                 pthread_sigmask(SIG_SETMASK, &old_set, NULL);
482 #endif
483
484                 if (!ok) {
485                         radlog(L_ERR, "FATAL!  Server is too busy to process requests");
486                         exit(1);
487                 }
488         }
489
490         return 1;
491 }
492
493
494 /*
495  *      The main thread handler for requests.
496  *
497  *      Wait on the semaphore until we have it, and process the request.
498  */
499 static void *request_handler_thread(void *arg)
500 {
501         RAD_REQUEST_FUNP  fun;
502         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
503 #ifdef HAVE_PTHREAD_SIGMASK
504         sigset_t set;
505
506         /*
507          *      Block SIGHUP handling for the child threads.
508          *
509          *      This ensures that only the main server thread will
510          *      process HUP signals.
511          *
512          *      If we don't have sigprocmask, then it shouldn't be
513          *      a problem, either, as the sig_hup handler should check
514          *      for this condition.
515          */
516         sigemptyset(&set);
517         sigaddset(&set, SIGHUP);
518         sigaddset(&set, SIGINT);
519         sigaddset(&set, SIGQUIT);
520         sigaddset(&set, SIGTERM);
521         pthread_sigmask(SIG_BLOCK, &set, NULL);
522 #endif
523
524         /*
525          *      Loop forever, until told to exit.
526          */
527         do {
528                 /*
529                  *      Wait to be signalled.
530                  */
531                 DEBUG2("Thread %d waiting to be assigned a request",
532                        self->thread_num);
533         re_wait:
534                 if (sem_wait(&thread_pool.semaphore) != 0) {
535                         /*
536                          *      Interrupted system call.  Go back to
537                          *      waiting, but DON'T print out any more
538                          *      text.
539                          */
540                         if (errno == EINTR) {
541                                 DEBUG2("Re-wait %d", self->thread_num);
542                                 goto re_wait;
543                         }
544                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
545                                self->thread_num, strerror(errno));
546                         break;
547                 }
548
549                 DEBUG2("Thread %d got semaphore", self->thread_num);
550
551                 /*
552                  *      Try to grab a request from the queue.
553                  *
554                  *      It may be empty, in which case we fail
555                  *      gracefully.
556                  */
557                 if (!request_dequeue(&self->request, &fun)) continue;
558
559                 self->request->child_pid = self->pthread_id;
560                 self->request_count++;
561
562                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
563                        self->thread_num, self->request->number,
564                        self->request_count);
565
566                 /*
567                  *      Respond, and reset request->child_pid
568                  */
569                 rad_respond(self->request, fun);
570                 self->request = NULL;
571
572                 /*
573                  *      Update the active threads.
574                  */
575                 pthread_mutex_lock(&thread_pool.queue_mutex);
576                 rad_assert(thread_pool.active_threads > 0);
577                 thread_pool.active_threads--;
578                 pthread_mutex_unlock(&thread_pool.queue_mutex);
579         } while (self->status != THREAD_CANCELLED);
580
581         DEBUG2("Thread %d exiting...", self->thread_num);
582
583 #ifdef HAVE_OPENSSL_ERR_H
584         /*
585          *      If we linked with OpenSSL, the application
586          *      must remove the thread's error queue before
587          *      exiting to prevent memory leaks.
588          */
589         ERR_remove_state(0);
590 #endif
591
592         /*
593          *  Do this as the LAST thing before exiting.
594          */
595         self->status = THREAD_EXITED;
596
597         return NULL;
598 }
599
600 /*
601  *      Take a THREAD_HANDLE, delete it from the thread pool and
602  *      free its resources.
603  *
604  *      This function is called ONLY from the main server thread,
605  *      ONLY after the thread has exited.
606  */
607 static void delete_thread(THREAD_HANDLE *handle)
608 {
609         THREAD_HANDLE *prev;
610         THREAD_HANDLE *next;
611
612         rad_assert(handle->request == NULL);
613
614         DEBUG2("Deleting thread %d", handle->thread_num);
615
616         prev = handle->prev;
617         next = handle->next;
618         rad_assert(thread_pool.total_threads > 0);
619         thread_pool.total_threads--;
620
621         /*
622          *      Remove the handle from the list.
623          */
624         if (prev == NULL) {
625                 rad_assert(thread_pool.head == handle);
626                 thread_pool.head = next;
627         } else {
628                 prev->next = next;
629         }
630
631         if (next == NULL) {
632                 rad_assert(thread_pool.tail == handle);
633                 thread_pool.tail = prev;
634         } else {
635                 next->prev = prev;
636         }
637
638         /*
639          *      Free the handle, now that it's no longer referencable.
640          */
641         free(handle);
642 }
643
644
645 /*
646  *      Spawn a new thread, and place it in the thread pool.
647  *
648  *      The thread is started initially in the blocked state, waiting
649  *      for the semaphore.
650  */
651 static THREAD_HANDLE *spawn_thread(time_t now)
652 {
653         int rcode;
654         THREAD_HANDLE *handle;
655         pthread_attr_t attr;
656
657         /*
658          *      Ensure that we don't spawn too many threads.
659          */
660         if (thread_pool.total_threads >= thread_pool.max_threads) {
661                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
662                 return NULL;
663         }
664
665         /*
666          *      Allocate a new thread handle.
667          */
668         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
669         memset(handle, 0, sizeof(THREAD_HANDLE));
670         handle->prev = NULL;
671         handle->next = NULL;
672         handle->pthread_id = NO_SUCH_CHILD_PID;
673         handle->thread_num = thread_pool.max_thread_num++;
674         handle->request_count = 0;
675         handle->status = THREAD_RUNNING;
676         handle->timestamp = time(NULL);
677
678         /*
679          *      Initialize the thread's attributes to detached.
680          *
681          *      We could call pthread_detach() later, but if the thread
682          *      exits between the create & detach calls, it will need to
683          *      be joined, which will never happen.
684          */
685         pthread_attr_init(&attr);
686         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
687
688         /*
689          *      Create the thread detached, so that it cleans up it's
690          *      own memory when it exits.
691          *
692          *      Note that the function returns non-zero on error, NOT
693          *      -1.  The return code is the error, and errno isn't set.
694          */
695         rcode = pthread_create(&handle->pthread_id, &attr,
696                         request_handler_thread, handle);
697         if (rcode != 0) {
698                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
699                        strerror(rcode));
700                 exit(1);
701         }
702         pthread_attr_destroy(&attr);
703
704         /*
705          *      One more thread to go into the list.
706          */
707         thread_pool.total_threads++;
708         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
709                         handle->thread_num, thread_pool.total_threads);
710
711         /*
712          *      Add the thread handle to the tail of the thread pool list.
713          */
714         if (thread_pool.tail) {
715                 thread_pool.tail->next = handle;
716                 handle->prev = thread_pool.tail;
717                 thread_pool.tail = handle;
718         } else {
719                 rad_assert(thread_pool.head == NULL);
720                 thread_pool.head = thread_pool.tail = handle;
721         }
722
723         /*
724          *      Update the time we last spawned a thread.
725          */
726         thread_pool.time_last_spawned = now;
727
728         /*
729          *      And return the new handle to the caller.
730          */
731         return handle;
732 }
733
734 /*
735  *      Temporary function to prevent server from executing a SIGHUP
736  *      until all threads are finished handling requests.  This returns
737  *      the number of active threads to 'radiusd.c'.
738  */
739 int total_active_threads(void)
740 {
741         /*
742          *      We don't acquire the mutex, so this is just an estimate.
743          *      We can't return with the lock held, so there's no point
744          *      in getting the guaranteed correct value; by the time
745          *      the caller sees it, it can be wrong again.
746          */
747         return thread_pool.active_threads;
748 }
749
750
751 static uint32_t pid_hash(const void *data)
752 {
753         const thread_fork_t *tf = data;
754
755         return lrad_hash(&tf->pid, sizeof(tf->pid));
756 }
757
758 static int pid_cmp(const void *one, const void *two)
759 {
760         const thread_fork_t *a = one;
761         const thread_fork_t *b = two;
762
763         return (a->pid - b->pid);
764 }
765
766 /*
767  *      Allocate the thread pool, and seed it with an initial number
768  *      of threads.
769  *
770  *      FIXME: What to do on a SIGHUP???
771  */
772 int thread_pool_init(int spawn_flag)
773 {
774         int             i, rcode;
775         CONF_SECTION    *pool_cf;
776         time_t          now;
777
778         DEBUG("Initializing the thread pool...");
779         now = time(NULL);
780
781         /*
782          *      After a SIGHUP, we don't over-write the previous values.
783          */
784         if (!pool_initialized) {
785                 /*
786                  *      Initialize the thread pool to some reasonable values.
787                  */
788                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
789                 thread_pool.head = NULL;
790                 thread_pool.tail = NULL;
791                 thread_pool.total_threads = 0;
792                 thread_pool.max_thread_num = 1;
793                 thread_pool.cleanup_delay = 5;
794                 thread_pool.spawn_flag = spawn_flag;
795
796                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
797                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
798                                strerror(errno));
799                         exit(1);
800                 }               
801                 
802                 /*
803                  *      Create the hash table of child PID's
804                  */
805                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
806                                                              pid_cmp,
807                                                              free);
808                 if (!thread_pool.waiters) {
809                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
810                         exit(1);
811                 }
812         }
813
814         /*
815          *      We're not spawning new threads, don't do
816          *      anything.
817          */
818         if (!spawn_flag) return 0;
819
820         pool_cf = cf_section_find("thread");
821         if (pool_cf != NULL) {
822                 /*
823                  *      FIXME: Check for errors?
824                  */
825                 cf_section_parse(pool_cf, NULL, thread_config);
826         }
827
828         /*
829          *      The pool has already been initialized.  Don't spawn
830          *      new threads, and don't forget about forked children,
831          */
832         if (pool_initialized) {
833                 return 0;
834         }
835
836         /*
837          *      Initialize the queue of requests.
838          */
839         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
840         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
841         if (rcode != 0) {
842                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
843                        strerror(errno));
844                 exit(1);
845         }
846
847         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
848         if (rcode != 0) {
849                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
850                        strerror(errno));
851                 exit(1);
852         }
853
854         /*
855          *      Allocate multiple fifos.
856          */
857         for (i = 0; i < NUM_FIFOS; i++) {
858                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
859                 if (!thread_pool.fifo[i]) {
860                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
861                         exit(1);
862                 }
863         }
864
865 #ifdef HAVE_OPENSSL_CRYPTO_H
866         /*
867          *      If we're linking with OpenSSL too, then we need
868          *      to set up the mutexes and enable the thread callbacks.
869          */
870         if (!setup_ssl_mutexes()) {
871                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
872                 exit(1);
873         }
874 #endif
875
876
877         /*
878          *      Create a number of waiting threads.
879          *
880          *      If we fail while creating them, do something intelligent.
881          */
882         for (i = 0; i < thread_pool.start_threads; i++) {
883                 if (spawn_thread(now) == NULL) {
884                         return -1;
885                 }
886         }
887
888         DEBUG2("Thread pool initialized");
889         pool_initialized = TRUE;
890         return 0;
891 }
892
893
894 /*
895  *      Assign a new request to a free thread.
896  *
897  *      If there isn't a free thread, then try to create a new one,
898  *      up to the configured limits.
899  */
900 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
901 {
902         /*
903          *      We've been told not to spawn threads, so don't.
904          */
905         if (!thread_pool.spawn_flag) {
906                 rad_respond(request, fun);
907                 return 1;
908         }
909
910         /*
911          *      Add the new request to the queue.
912          */
913         if (!request_enqueue(request, fun)) return 0;
914
915         /*
916          *      If the thread pool is busy handling requests, then
917          *      try to spawn another one.  We don't acquire the mutex
918          *      before reading active_threads, so our thread count is
919          *      just an estimate.  It's fine to go ahead and spawn an
920          *      extra thread in that case.
921          *      NOTE: the log message may be in error since active_threads
922          *      is an estimate, but it's only in error about the thread
923          *      count, not about the fact that we can't create a new one.
924          */
925         if (thread_pool.active_threads == thread_pool.total_threads) {
926                 if (spawn_thread(request->timestamp) == NULL) {
927                         radlog(L_INFO,
928                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
929                                thread_pool.max_threads);
930                         return 1;
931                 }
932         }
933
934         return 1;
935 }
936
937 /*
938  *      Check the min_spare_threads and max_spare_threads.
939  *
940  *      If there are too many or too few threads waiting, then we
941  *      either create some more, or delete some.
942  */
943 int thread_pool_clean(time_t now)
944 {
945         int spare;
946         int i, total;
947         THREAD_HANDLE *handle, *next;
948         int active_threads;
949         static time_t last_cleaned = 0;
950
951         /*
952          *      Loop over the thread pool deleting exited threads.
953          */
954         for (handle = thread_pool.head; handle; handle = next) {
955                 next = handle->next;
956
957                 /*
958                  *      Maybe we've asked the thread to exit, and it
959                  *      has agreed.
960                  */
961                 if (handle->status == THREAD_EXITED) {
962                         delete_thread(handle);
963                 }
964         }
965
966         /*
967          *      We don't need a mutex lock here, as we're reading
968          *      active_threads, and not modifying it.  We want a close
969          *      approximation of the number of active threads, and this
970          *      is good enough.
971          */
972         active_threads = thread_pool.active_threads;
973         spare = thread_pool.total_threads - active_threads;
974         if (debug_flag) {
975                 static int old_total = -1;
976                 static int old_active = -1;
977
978                 if ((old_total != thread_pool.total_threads) ||
979                                 (old_active != active_threads)) {
980                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
981                                         thread_pool.total_threads, active_threads, spare);
982                         old_total = thread_pool.total_threads;
983                         old_active = active_threads;
984                 }
985         }
986
987         /*
988          *      If there are too few spare threads, create some more.
989          */
990         if (spare < thread_pool.min_spare_threads) {
991                 total = thread_pool.min_spare_threads - spare;
992
993                 DEBUG2("Threads: Spawning %d spares", total);
994                 /*
995                  *      Create a number of spare threads.
996                  */
997                 for (i = 0; i < total; i++) {
998                         handle = spawn_thread(now);
999                         if (handle == NULL) {
1000                                 return -1;
1001                         }
1002                 }
1003
1004                 /*
1005                  *      And exit, as there can't be too many spare threads.
1006                  */
1007                 return 0;
1008         }
1009
1010         /*
1011          *      Only delete spare threads if we haven't already done
1012          *      so this second.
1013          */
1014         if (now == last_cleaned) {
1015                 return 0;
1016         }
1017         last_cleaned = now;
1018
1019         /*
1020          *      Only delete the spare threads if sufficient time has
1021          *      passed since we last created one.  This helps to minimize
1022          *      the amount of create/delete cycles.
1023          */
1024         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
1025                 return 0;
1026         }
1027
1028         /*
1029          *      If there are too many spare threads, delete one.
1030          *
1031          *      Note that we only delete ONE at a time, instead of
1032          *      wiping out many.  This allows the excess servers to
1033          *      be slowly reaped, just in case the load spike comes again.
1034          */
1035         if (spare > thread_pool.max_spare_threads) {
1036
1037                 spare -= thread_pool.max_spare_threads;
1038
1039                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1040
1041                 /*
1042                  *      Walk through the thread pool, deleting the
1043                  *      first idle thread we come across.
1044                  */
1045                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1046                         next = handle->next;
1047
1048                         /*
1049                          *      If the thread is not handling a
1050                          *      request, but still live, then tell it
1051                          *      to exit.
1052                          *
1053                          *      It will eventually wake up, and realize
1054                          *      it's been told to commit suicide.
1055                          */
1056                         if ((handle->request == NULL) &&
1057                             (handle->status == THREAD_RUNNING)) {
1058                                 handle->status = THREAD_CANCELLED;
1059                                 /*
1060                                  *      Post an extra semaphore, as a
1061                                  *      signal to wake up, and exit.
1062                                  */
1063                                 sem_post(&thread_pool.semaphore);
1064                                 spare--;
1065                                 break;
1066                         }
1067                 }
1068         }
1069
1070         /*
1071          *      If the thread has handled too many requests, then make it
1072          *      exit.
1073          */
1074         if (thread_pool.max_requests_per_thread > 0) {
1075                 for (handle = thread_pool.head; handle; handle = next) {
1076                         next = handle->next;
1077
1078                         /*
1079                          *      Not handling a request, but otherwise
1080                          *      live, we can kill it.
1081                          */
1082                         if ((handle->request == NULL) &&
1083                             (handle->status == THREAD_RUNNING) &&
1084                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1085                                 handle->status = THREAD_CANCELLED;
1086                                 sem_post(&thread_pool.semaphore);
1087                         }
1088                 }
1089         }
1090
1091         /*
1092          *      Otherwise everything's kosher.  There are not too few,
1093          *      or too many spare threads.  Exit happily.
1094          */
1095         return 0;
1096 }
1097
1098
1099 /*
1100  *      Thread wrapper for fork().
1101  */
1102 pid_t rad_fork(int exec_wait)
1103 {
1104         pid_t child_pid;
1105
1106         if (exec_wait) return fork();
1107
1108         reap_children();        /* be nice to non-wait thingies */
1109
1110         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1111                 return -1;
1112         }
1113
1114         /*
1115          *      Fork & save the PID for later reaping.
1116          */
1117         child_pid = fork();
1118         if (child_pid > 0) {
1119                 int rcode;
1120                 thread_fork_t *tf;
1121
1122                 tf = rad_malloc(sizeof(*tf));
1123                 memset(tf, 0, sizeof(*tf));
1124                 
1125                 tf->pid = child_pid;
1126
1127                 /*
1128                  *      Lock the mutex.
1129                  */
1130                 pthread_mutex_lock(&thread_pool.wait_mutex);
1131
1132                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1133
1134                 /*
1135                  *      Unlock the mutex.
1136                  */
1137                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1138
1139                 if (!rcode) {
1140                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1141                                (int) child_pid);
1142                 }
1143         }
1144
1145         /*
1146          *      Return whatever we were told.
1147          */
1148         return child_pid;
1149 }
1150
1151 /*
1152  *      We may not need this any more...
1153  */
1154 pid_t rad_waitpid(pid_t pid, int *status, int options)
1155 {
1156         thread_fork_t mytf, *tf;
1157
1158         reap_children();        /* be nice to non-wait thingies */
1159
1160         if (pid <= 0) return -1;
1161
1162         if ((options & WNOHANG) == 0) return -1;
1163
1164         mytf.pid = pid;
1165
1166         pthread_mutex_lock(&thread_pool.wait_mutex);
1167         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1168
1169         if (!tf) {              /* not found.  It's a problem... */
1170                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1171                 return waitpid(pid, status, options);
1172         }
1173
1174         if (tf->exited) {
1175                 *status = tf->status;
1176                 lrad_hash_table_delete(thread_pool.waiters, &mytf);
1177                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1178                 return pid;
1179         }
1180         
1181         /*
1182          *      Don't wait, and it hasn't exited.  Return.
1183          */
1184         pthread_mutex_unlock(&thread_pool.wait_mutex);
1185         return 0;
1186 }
1187
1188 #else /* HAVE_PTHREAD_H */
1189 /*
1190  *      "thread" code when we don't have threads.
1191  */
1192 int thread_pool_init(int spawn_flag)
1193 {
1194         return 0;
1195 }
1196
1197 /*
1198  *      call "radrespond".
1199  */
1200 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1201 {
1202         rad_respond(request, fun);
1203         return 1;
1204 }
1205
1206 #endif /* HAVE_PTHREAD_H */