import from HEAD:
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include "libradius.h"
25
26 #ifdef HAVE_PTHREAD_H
27
28 #include <stdlib.h>
29 #include <string.h>
30
31 /*
32  *      Other OS's have sem_init, OS X doesn't.
33  */
34 #ifndef DARWIN
35 #include <semaphore.h>
36 #else
37 #include <mach/task.h>
38 #include <mach/semaphore.h>
39
40 #undef sem_t
41 #define sem_t semaphore_t
42 #undef sem_init
43 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
44 #undef sem_wait
45 #define sem_wait(s) semaphore_wait(*s)
46 #undef sem_post
47 #define sem_post(s) semaphore_signal(*s)
48 #endif
49
50 #include <signal.h>
51
52 #ifdef HAVE_SYS_WAIT_H
53 #include <sys/wait.h>
54 #endif
55
56 #include "radiusd.h"
57 #include "rad_assert.h"
58 #include "conffile.h"
59
60 static const char rcsid[] =
61 "$Id$";
62
63 #ifdef HAVE_OPENSSL_CRYPTO_H
64 #include <openssl/crypto.h>
65 #endif
66 #ifdef HAVE_OPENSSL_ERR_H
67 #include <openssl/err.h>
68 #endif
69
/*
 *      Semaphore counts.  thread_pool_init() creates the pool
 *      semaphore with SEMAPHORE_LOCKED, so worker threads block in
 *      sem_wait() until a request has been posted.
 */
#define SEMAPHORE_LOCKED        (0)
#define SEMAPHORE_UNLOCKED      (1)

/*
 *      Values of THREAD_HANDLE.status:
 *
 *      THREAD_RUNNING     set by spawn_thread() when the handle is created
 *      THREAD_CANCELLED   makes the worker leave its request loop
 *      THREAD_EXITED      set by the worker as the last thing it does
 */
#define THREAD_RUNNING          (1)
#define THREAD_CANCELLED        (2)
#define THREAD_EXITED           (3)
76
/*
 *  A data structure which contains the information about
 *  one worker thread in the pool.
 *
 *  prev, next     links in the doubly-linked list of pool threads
 *  pthread_id     pthread id, filled in by pthread_create()
 *  thread_num     server thread number, 1...number of threads
 *  status         THREAD_RUNNING, THREAD_CANCELLED or THREAD_EXITED
 *  request_count  the number of requests that this thread has handled
 *  timestamp      when the thread handle was created
 *  request        the request currently being handled, NULL when idle
 *
 *  Note that there is no per-thread semaphore: all threads wait on
 *  the single semaphore in THREAD_POOL.
 */
typedef struct THREAD_HANDLE {
        struct THREAD_HANDLE *prev;
        struct THREAD_HANDLE *next;
        pthread_t            pthread_id;
        int                  thread_num;
        int                  status;
        unsigned int         request_count;
        time_t               timestamp;
        REQUEST              *request;
} THREAD_HANDLE;
98
/*
 *      One entry in the request queue: a request, and the function
 *      which a worker thread will be asked to run on it.
 */
typedef struct request_queue_t {
        REQUEST           *request;
        RAD_REQUEST_FUNP  fun;
} request_queue_t;
106
107
/*
 *      An entry in the thread_pool.waiters hash, keyed by child PID.
 *
 *      pid      PID of the forked child (the hash / compare key)
 *      status   wait status stored by reap_children() from waitpid()
 *      exited   set to 1 by reap_children() once the child was reaped
 */
typedef struct thread_fork_t {
        pid_t           pid;
        int             status;
        int             exited;
} thread_fork_t;
113
114
/*
 *      A data structure to manage the thread pool.  There's no real
 *      need for a data structure, but it makes things conceptually
 *      easier.
 */
typedef struct THREAD_POOL {
        /*
         *      Doubly-linked list of thread handles.  New threads
         *      are appended at the tail.
         */
        THREAD_HANDLE *head;
        THREAD_HANDLE *tail;

        int total_threads;      /* number of handles in the list */
        int max_thread_num;     /* next thread_num to hand out */

        /*
         *      Filled in from the configuration, see thread_config.
         */
        int start_threads;
        int max_threads;
        int min_spare_threads;
        int max_spare_threads;
        unsigned int max_requests_per_thread;

        unsigned long request_count;    /* bumped on every enqueue attempt */
        time_t time_last_spawned;       /* set by spawn_thread() */
        int cleanup_delay;              /* from the configuration */

        /*
         *      If threaded, we have to pay more attention to
         *      child PID's when we fork...
         *
         *      "waiters" is a hash of thread_fork_t, and is
         *      protected by "wait_mutex".
         */
        pthread_mutex_t wait_mutex;
        lrad_hash_table_t *waiters;

        /*
         *      All threads wait on this semaphore, for requests
         *      to enter the queue.
         */
        sem_t           semaphore;

        /*
         *      To ensure only one thread at a time touches the queue.
         */
        pthread_mutex_t mutex;

        /*
         *      The request queue is a circular buffer of
         *      "queue_size" entries.  It is empty when
         *      queue_head == queue_tail.  queue_size is kept at a
         *      power of 2, as the indices are wrapped by masking
         *      with (queue_size - 1).
         */
        int             active_threads; /* threads currently handling a request */
        int             queue_head; /* first filled entry */
        int             queue_tail; /* first empty entry */
        int             queue_size;
        request_queue_t *queue;
} THREAD_POOL;
159
/*
 *      The one and only thread pool.  pool_initialized is set to
 *      TRUE at the end of the first successful thread_pool_init(),
 *      so that later calls only re-read the configuration.
 */
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
162
163
/*
 *      A mapping of configuration file names to internal integers.
 *
 *      thread_pool_init() hands this table to cf_section_parse()
 *      for the "thread" configuration section, which stores each
 *      item directly into the matching thread_pool field.  The last
 *      column is presumably the default used when the item is not
 *      configured -- confirm against the CONF_PARSER definition.
 */
static const CONF_PARSER thread_config[] = {
        { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
        { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
        { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
        { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
        { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
        { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
        { NULL, -1, 0, NULL, NULL }
};
176
177
178 #ifdef HAVE_OPENSSL_CRYPTO_H
179
180 /*
181  *      If we're linking against OpenSSL, then it is the
182  *      duty of the application, if it is multithreaded,
183  *      to provide OpenSSL with appropriate thread id
184  *      and mutex locking functions
185  *
186  *      Note: this only implements static callbacks.
187  *      OpenSSL does not use dynamic locking callbacks
 *      right now, but may in the future, so we will have
189  *      to add them at some point.
190  */
191
/*
 *      Array of mutexes allocated by setup_ssl_mutexes(), one per
 *      OpenSSL static lock (CRYPTO_num_locks() entries).  It is
 *      indexed by the lock number passed to ssl_locking_function().
 */
static  pthread_mutex_t *ssl_mutexes = NULL;
193
194
/*
 *      Thread id callback handed to OpenSSL: reports the id of the
 *      calling thread, converted to an unsigned long.
 */
static unsigned long ssl_id_function (void) {
        pthread_t me = pthread_self();

        return (unsigned long) me;
}
198
199 static void ssl_locking_function (int mode, int n, const char *file, int line) {
200         if (mode & CRYPTO_LOCK) {
201                 pthread_mutex_lock(&(ssl_mutexes[n]));
202         } else {
203                 pthread_mutex_unlock(&(ssl_mutexes[n]));
204         }
205 }
206
207 static int setup_ssl_mutexes (void) {
208         int i;
209
210         ssl_mutexes = (pthread_mutex_t *)rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
211         if(!ssl_mutexes) {
212                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
213                 return 0;
214         }
215
216         for(i = 0; i < CRYPTO_num_locks(); i++) {
217                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
218         }
219
220         CRYPTO_set_id_callback(ssl_id_function);
221         CRYPTO_set_locking_callback(ssl_locking_function);
222
223         return 1;
224 }
225
226 #endif
227
228
229 /*
230  *      We don't want to catch SIGCHLD for a host of reasons.
231  *
232  *      - exec_wait means that someone, somewhere, somewhen, will
233  *      call waitpid(), and catch the child.
234  *
235  *      - SIGCHLD is delivered to a random thread, not the one that
236  *      forked.
237  *
238  *      - if another thread catches the child, we have to coordinate
239  *      with the thread doing the waiting.
240  *
241  *      - if we don't waitpid() for non-wait children, they'll be zombies,
242  *      and will hang around forever.
243  *
244  */
245 static void reap_children(void)
246 {
247         pid_t pid;
248         int status;
249         thread_fork_t mytf, *tf;
250
251         pthread_mutex_lock(&thread_pool.wait_mutex);
252
253         do {
254                 pid = waitpid(0, &status, WNOHANG);
255                 if (pid <= 0) break;
256
257                 mytf.pid = pid;
258                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
259                 if (!tf) continue;
260                 
261                 tf->status = status;
262                 tf->exited = 1;
263         } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
264
265         pthread_mutex_unlock(&thread_pool.wait_mutex);
266 }
267
268
/*
 *      Add a request to the list of waiting requests.
 *      This function gets called ONLY from the main handler thread...
 *
 *      request   the request to queue
 *      fun       the function a worker thread should run on it
 *
 *      Returns 1 if the request was queued and the semaphore posted.
 *      Returns 0 if the queue is full and already at its maximum
 *      size; in that case the request is marked "finished" and is
 *      NOT queued.
 */
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
        int num_entries;

        pthread_mutex_lock(&thread_pool.mutex);

        /*
         *      Counts every attempt, including requests which end
         *      up being discarded below.
         */
        thread_pool.request_count++;

        /*
         *      If the queue is empty, re-set the indices to zero,
         *      for no particular reason...
         */
        if ((thread_pool.queue_head == thread_pool.queue_tail) &&
            (thread_pool.queue_head != 0)) {
                thread_pool.queue_head = thread_pool.queue_tail = 0;
        }

        /*
         *      If the queue is full, grow it.
         *
         *      The math is to take into account the fact that it's a
         *      circular queue.  One slot is always left empty, so
         *      that "full" can be told apart from "empty"
         *      (head == tail).
         */
        num_entries = ((thread_pool.queue_tail + thread_pool.queue_size) -
                       thread_pool.queue_head) % thread_pool.queue_size;
        if (num_entries == (thread_pool.queue_size - 1)) {
                int i;
                request_queue_t *new_queue;

                /*
                 *      If the queue becomes larger than 65536,
                 *      there's a serious problem.
                 */
                if (thread_pool.queue_size >= 65536) {
                        pthread_mutex_unlock(&thread_pool.mutex);

                        /*
                         *      Mark the request as done.
                         */
                        radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
                        request->finished = TRUE;
                        return 0;
                }

                /*
                 *      Malloc a new queue, doubled in size, copy the
                 *      data from the current queue over to it, zero
                 *      out the second half of the queue, free the old
                 *      one, and replace thread_pool.queue with the
                 *      new one.
                 */
                new_queue = rad_malloc(sizeof(*new_queue) * thread_pool.queue_size * 2);
                /*
                 *      Copy the queue element by element, starting
                 *      at the old head, so that the entries end up
                 *      un-wrapped at the start of the new queue.
                 */
                for (i = 0; i < thread_pool.queue_size; i++) {
                        new_queue[i] = thread_pool.queue[(i + thread_pool.queue_head) % thread_pool.queue_size];
                }
                memset(new_queue + thread_pool.queue_size,
                       0, sizeof(*new_queue) * thread_pool.queue_size);

                free(thread_pool.queue);
                thread_pool.queue = new_queue;

                /*
                 *      The new tail is the number of entries which
                 *      were in the old queue.  It has to be computed
                 *      before the head and the size are changed.
                 */
                thread_pool.queue_tail = ((thread_pool.queue_tail + thread_pool.queue_size) - thread_pool.queue_head) % thread_pool.queue_size;
                thread_pool.queue_head = 0;
                thread_pool.queue_size *= 2;
        }

        /*
         *      Add the data to the queue tail, increment the tail,
         *      and signal the semaphore that there's another request
         *      in the queue.
         *
         *      Wrapping the tail with a mask only works because the
         *      queue size is always a power of 2.
         */
        thread_pool.queue[thread_pool.queue_tail].request = request;
        thread_pool.queue[thread_pool.queue_tail].fun = fun;
        thread_pool.queue_tail++;
        thread_pool.queue_tail &= (thread_pool.queue_size - 1);

        pthread_mutex_unlock(&thread_pool.mutex);

        /*
         *      There's one more request in the queue.
         *
         *      Note that we're not touching the queue any more, so
         *      the semaphore post is outside of the mutex.  This also
         *      means that when the thread wakes up and tries to lock
         *      the mutex, it will be unlocked, and there won't be
         *      contention.
         */

        sem_post(&thread_pool.semaphore);

        return 1;
}
369
/*
 *      Remove a request from the queue.
 *
 *      request   set to the request at the head of the queue, or to
 *                NULL if the queue was empty
 *      fun       set to the function queued with that request, or
 *                to NULL if the queue was empty
 *
 *      Exited children are reaped first.  When a request is
 *      returned, thread_pool.active_threads has been incremented;
 *      the caller decrements it again once the request is handled.
 *
 *      If the request is still owned by another thread after the
 *      busy-wait below, the whole server exits.
 */
static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
{
        reap_children();

        pthread_mutex_lock(&thread_pool.mutex);

        /*
         *      Head & tail are the same.  There's nothing in
         *      the queue.
         */
        if (thread_pool.queue_head == thread_pool.queue_tail) {
                pthread_mutex_unlock(&thread_pool.mutex);
                *request = NULL;
                *fun = NULL;
                return;
        }

        *request = thread_pool.queue[thread_pool.queue_head].request;
        *fun = thread_pool.queue[thread_pool.queue_head].fun;

        rad_assert(*request != NULL);
        rad_assert((*request)->magic == REQUEST_MAGIC);
        rad_assert(*fun != NULL);

        /*
         *      Advance the head, wrapping around with a mask.  This
         *      relies on the queue size being a power of 2.
         */
        thread_pool.queue_head++;
        thread_pool.queue_head &= (thread_pool.queue_size - 1);

        /*
         *      FIXME: Check the request timestamp.  If it's more than
         *      "clean_delay" seconds old, then discard the request,
         *      log an error, and try to de-queue another request.
         *
         *      The main clean-up code won't delete the request from
         *      the request list, because it's not marked "finished"
         */

        /*
         *      The thread is currently processing a request.
         */
        thread_pool.active_threads++;

        pthread_mutex_unlock(&thread_pool.mutex);

        /*
         *      If the request is currently being processed, then that
         *      MAY be OK, if it's a proxy reply.  In that case,
         *      sending the packet may result in a reply being
         *      received before that thread clears the child_pid.
         *
         *      In that case, we busy-wait for the request to be free.
         *
         *      We COULD push it onto the queue and try to grab
         *      another request, but what if this is the only request?
         *      What if there are multiple such packets with race
         *      conditions?  We don't want to thrash the queue...
         *
         *      This busy-wait is less than optimal, but it's simple,
         *      fail-safe, and it works.
         *
         *      NOTE(review): child_pid is read here without holding
         *      any lock.
         */
        if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
                int count, ok;
                struct timeval tv;
#ifdef HAVE_PTHREAD_SIGMASK
                sigset_t set, old_set;

                /*
                 *      Block a large number of signals which could
                 *      cause the select to return EINTR
                 *
                 *      NOTE(review): POSIX does not allow SIGSTOP to
                 *      be blocked, so that entry should have no
                 *      effect.
                 */
                sigemptyset(&set);
                sigaddset(&set, SIGPIPE);
                sigaddset(&set, SIGCONT);
                sigaddset(&set, SIGSTOP);
                sigaddset(&set, SIGCHLD);
                pthread_sigmask(SIG_BLOCK, &set, &old_set);
#endif

                rad_assert((*request)->proxy_reply != NULL);

                ok = FALSE;

                /*
                 *      Sleep for up to 100 milliseconds, in 10 steps
                 *      of 10 milliseconds.  If the other thread
                 *      doesn't get serviced in this time, to clear
                 *      the "child_pid" entry, then the server is too
                 *      busy, so we die.
                 */
                for (count = 0; count < 10; count++) {
                        tv.tv_sec = 0;
                        tv.tv_usec = 10000; /* sleep for 10 milliseconds */

                        /*
                         *      Portable sleep that's thread-safe.
                         *
                         *      Don't worry about interrupts, as they're
                         *      blocked above.
                         */
                        select(0, NULL, NULL, NULL, &tv);
                        if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
                                ok = TRUE;
                                break;
                        }
                }

#ifdef HAVE_PTHREAD_SIGMASK
                /*
                 *      Restore the original thread signal mask.
                 */
                pthread_sigmask(SIG_SETMASK, &old_set, NULL);
#endif

                if (!ok) {
                        radlog(L_ERR, "FATAL!  Server is too busy to process requests");
                        exit(1);
                }
        }

        return;
}
492
493
494 /*
495  *      The main thread handler for requests.
496  *
497  *      Wait on the semaphore until we have it, and process the request.
498  */
499 static void *request_handler_thread(void *arg)
500 {
501         RAD_REQUEST_FUNP  fun;
502         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
503 #ifdef HAVE_PTHREAD_SIGMASK
504         sigset_t set;
505
506         /*
507          *      Block SIGHUP handling for the child threads.
508          *
509          *      This ensures that only the main server thread will
510          *      process HUP signals.
511          *
512          *      If we don't have sigprocmask, then it shouldn't be
513          *      a problem, either, as the sig_hup handler should check
514          *      for this condition.
515          */
516         sigemptyset(&set);
517         sigaddset(&set, SIGHUP);
518         sigaddset(&set, SIGINT);
519         sigaddset(&set, SIGQUIT);
520         sigaddset(&set, SIGTERM);
521         pthread_sigmask(SIG_BLOCK, &set, NULL);
522 #endif
523
524         /*
525          *      Loop forever, until told to exit.
526          */
527         do {
528                 /*
529                  *      Wait to be signalled.
530                  */
531                 DEBUG2("Thread %d waiting to be assigned a request",
532                        self->thread_num);
533         re_wait:
534                 if (sem_wait(&thread_pool.semaphore) != 0) {
535                         /*
536                          *      Interrupted system call.  Go back to
537                          *      waiting, but DON'T print out any more
538                          *      text.
539                          */
540                         if (errno == EINTR) {
541                                 DEBUG2("Re-wait %d", self->thread_num);
542                                 goto re_wait;
543                         }
544                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
545                                self->thread_num, strerror(errno));
546                         break;
547                 }
548
549                 DEBUG2("Thread %d got semaphore", self->thread_num);
550
551                 /*
552                  *      Try to grab a request from the queue.
553                  *
554                  *      It may be empty, in which case we fail
555                  *      gracefully.
556                  */
557                 request_dequeue(&self->request, &fun);
558                 if (!self->request) continue;
559
560                 self->request->child_pid = self->pthread_id;
561                 self->request_count++;
562
563                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
564                        self->thread_num, self->request->number,
565                        self->request_count);
566
567                 /*
568                  *      Respond, and reset request->child_pid
569                  */
570                 rad_respond(self->request, fun);
571                 self->request = NULL;
572
573                 /*
574                  *      Update the active threads.
575                  */
576                 pthread_mutex_lock(&thread_pool.mutex);
577                 rad_assert(thread_pool.active_threads > 0);
578                 thread_pool.active_threads--;
579                 pthread_mutex_unlock(&thread_pool.mutex);
580         } while (self->status != THREAD_CANCELLED);
581
582         DEBUG2("Thread %d exiting...", self->thread_num);
583
584 #ifdef HAVE_OPENSSL_ERR_H
585         /*
586          *      If we linked with OpenSSL, the application
587          *      must remove the thread's error queue before
588          *      exiting to prevent memory leaks.
589          */
590         ERR_remove_state(0);
591 #endif
592
593         /*
594          *  Do this as the LAST thing before exiting.
595          */
596         self->status = THREAD_EXITED;
597
598         return NULL;
599 }
600
601 /*
602  *      Take a THREAD_HANDLE, and delete it from the thread pool.
603  *
604  *      This function is called ONLY from the main server thread.
605  */
606 static void delete_thread(THREAD_HANDLE *handle)
607 {
608         THREAD_HANDLE *prev;
609         THREAD_HANDLE *next;
610
611         rad_assert(handle->request == NULL);
612
613         prev = handle->prev;
614         next = handle->next;
615         rad_assert(thread_pool.total_threads > 0);
616         thread_pool.total_threads--;
617
618         /*
619          *      Remove the handle from the list.
620          */
621         if (prev == NULL) {
622                 rad_assert(thread_pool.head == handle);
623                 thread_pool.head = next;
624         } else {
625                 prev->next = next;
626         }
627
628         if (next == NULL) {
629                 rad_assert(thread_pool.tail == handle);
630                 thread_pool.tail = prev;
631         } else {
632                 next->prev = prev;
633         }
634
635         DEBUG2("Deleting thread %d", handle->thread_num);
636
637         /*
638          *      This thread has exited.  Delete any additional
639          *      resources associated with it.
640          */
641
642         /*
643          *      Free the memory, now that we're sure the thread
644          *      exited.
645          */
646         free(handle);
647 }
648
649
650 /*
651  *      Spawn a new thread, and place it in the thread pool.
652  *
653  *      The thread is started initially in the blocked state, waiting
654  *      for the semaphore.
655  */
656 static THREAD_HANDLE *spawn_thread(time_t now)
657 {
658         int rcode;
659         THREAD_HANDLE *handle;
660         pthread_attr_t attr;
661
662         /*
663          *      Ensure that we don't spawn too many threads.
664          */
665         if (thread_pool.total_threads >= thread_pool.max_threads) {
666                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
667                 return NULL;
668         }
669
670         /*
671          *      Allocate a new thread handle.
672          */
673         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
674         memset(handle, 0, sizeof(THREAD_HANDLE));
675         handle->prev = NULL;
676         handle->next = NULL;
677         handle->pthread_id = NO_SUCH_CHILD_PID;
678         handle->thread_num = thread_pool.max_thread_num++;
679         handle->request_count = 0;
680         handle->status = THREAD_RUNNING;
681         handle->timestamp = time(NULL);
682
683         /*
684          *      Initialize the thread's attributes to detached.
685          *
686          *      We could call pthread_detach() later, but if the thread
687          *      exits between the create & detach calls, it will need to
688          *      be joined, which will never happen.
689          */
690         pthread_attr_init(&attr);
691         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
692
693         /*
694          *      Create the thread detached, so that it cleans up it's
695          *      own memory when it exits.
696          *
697          *      Note that the function returns non-zero on error, NOT
698          *      -1.  The return code is the error, and errno isn't set.
699          */
700         rcode = pthread_create(&handle->pthread_id, &attr,
701                         request_handler_thread, handle);
702         if (rcode != 0) {
703                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
704                        strerror(rcode));
705                 exit(1);
706         }
707         pthread_attr_destroy(&attr);
708
709         /*
710          *      One more thread to go into the list.
711          */
712         thread_pool.total_threads++;
713         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
714                         handle->thread_num, thread_pool.total_threads);
715
716         /*
717          *      Add the thread handle to the tail of the thread pool list.
718          */
719         if (thread_pool.tail) {
720                 thread_pool.tail->next = handle;
721                 handle->prev = thread_pool.tail;
722                 thread_pool.tail = handle;
723         } else {
724                 rad_assert(thread_pool.head == NULL);
725                 thread_pool.head = thread_pool.tail = handle;
726         }
727
728         /*
729          *      Update the time we last spawned a thread.
730          */
731         thread_pool.time_last_spawned = now;
732
733         /*
734          *      And return the new handle to the caller.
735          */
736         return handle;
737 }
738
739 /*
740  *      Temporary function to prevent server from executing a SIGHUP
741  *      until all threads are finished handling requests.  This returns
742  *      the number of active threads to 'radiusd.c'.
743  */
744 int total_active_threads(void)
745 {
746         int rcode = 0;
747         THREAD_HANDLE *handle;
748
749         for (handle = thread_pool.head; handle != NULL; handle = handle->next){
750                 if (handle->request != NULL) {
751                         rcode ++;
752                 }
753         }
754         return (rcode);
755 }
756
757
758 static uint32_t pid_hash(const void *data)
759 {
760         const thread_fork_t *tf = data;
761
762         return lrad_hash(&tf->pid, sizeof(tf->pid));
763 }
764
765 static int pid_cmp(const void *one, const void *two)
766 {
767         const thread_fork_t *a = one;
768         const thread_fork_t *b = two;
769
770         return (a->pid - b->pid);
771 }
772
773 /*
774  *      Allocate the thread pool, and seed it with an initial number
775  *      of threads.
776  *
777  *      FIXME: What to do on a SIGHUP???
778  */
779 int thread_pool_init(void)
780 {
781         int             i, rcode;
782         CONF_SECTION    *pool_cf;
783         time_t          now;
784
785         DEBUG("Initializing the thread pool...");
786         now = time(NULL);
787
788         /*
789          *      After a SIGHUP, we don't over-write the previous values.
790          */
791         if (!pool_initialized) {
792                 /*
793                  *      Initialize the thread pool to some reasonable values.
794                  */
795                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
796                 thread_pool.head = NULL;
797                 thread_pool.tail = NULL;
798                 thread_pool.total_threads = 0;
799                 thread_pool.max_thread_num = 1;
800                 thread_pool.cleanup_delay = 5;
801
802                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
803                         radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
804                                strerror(errno));
805                         exit(1);
806                 }               
807
808                 /*
809                  *      Create the hash table of child PID's
810                  */
811                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
812                                                              pid_cmp,
813                                                              free);
814                 if (!thread_pool.waiters) {
815                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
816                         exit(1);
817                 }
818         }
819
820         pool_cf = cf_section_find("thread");
821         if (!pool_cf) {
822                 radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
823                 exit(1);
824         }
825
826         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
827                 exit(1);
828         }
829
830         /*
831          *      The pool has already been initialized.  Don't spawn
832          *      new threads, and don't forget about forked children,
833          */
834         if (pool_initialized) {
835                 return 0;
836         }
837
838         /*
839          *      Initialize the queue of requests.
840          */
841         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
842         if (rcode != 0) {
843                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
844                        strerror(errno));
845                 exit(1);
846         }
847
848         rcode = pthread_mutex_init(&thread_pool.mutex,NULL);
849         if (rcode != 0) {
850                 radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
851                        strerror(errno));
852                 exit(1);
853         }
854
855         /*
856          *      Queue head & tail are set to zero by the memset,
857          *      above.
858          *
859          *      Allocate an initial queue, always as a power of 2.
860          */
861         thread_pool.queue_size = 256;
862         thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) *
863                                        thread_pool.queue_size);
864         memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) *
865                                       thread_pool.queue_size));
866
867 #ifdef HAVE_OPENSSL_CRYPTO_H
868         /*
869          *      If we're linking with OpenSSL too, then we need
870          *      to set up the mutexes and enable the thread callbacks.
871          */
872         if(!setup_ssl_mutexes()) {
873                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
874                 exit(1);
875         }
876 #endif
877
878
879         /*
880          *      Create a number of waiting threads.
881          *
882          *      If we fail while creating them, do something intelligent.
883          */
884         for (i = 0; i < thread_pool.start_threads; i++) {
885                 if (spawn_thread(now) == NULL) {
886                         return -1;
887                 }
888         }
889
890         DEBUG2("Thread pool initialized");
891         pool_initialized = TRUE;
892         return 0;
893 }
894
895
896 /*
897  *      Assign a new request to a free thread.
898  *
899  *      If there isn't a free thread, then try to create a new one,
900  *      up to the configured limits.
901  */
902 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
903 {
904         /*
905          *      Add the new request to the queue.
906          */
907         if (!request_enqueue(request, fun)) return 0;
908
909         /*
910          *      If the thread pool is busy handling requests, then
911          *      try to spawn another one.
912          */
913         if (thread_pool.active_threads == thread_pool.total_threads) {
914                 if (spawn_thread(request->timestamp) == NULL) {
915                         radlog(L_INFO,
916                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
917                                thread_pool.max_threads);
918                         return 1;
919                 }
920         }
921
922         return 1;
923 }
924
925 /*
926  *      Check the min_spare_threads and max_spare_threads.
927  *
928  *      If there are too many or too few threads waiting, then we
929  *      either create some more, or delete some.
930  */
931 int thread_pool_clean(time_t now)
932 {
933         int spare;
934         int i, total;
935         THREAD_HANDLE *handle, *next;
936         int active_threads;
937         static time_t last_cleaned = 0;
938
939         /*
940          *      Loop over the thread pool deleting exited threads.
941          */
942         for (handle = thread_pool.head; handle; handle = next) {
943                 next = handle->next;
944
945                 /*
946                  *      Maybe we've asked the thread to exit, and it
947                  *      has agreed.
948                  */
949                 if (handle->status == THREAD_EXITED) {
950                         delete_thread(handle);
951                 }
952         }
953
954         /*
955          *      We don't need a mutex lock here, as we're reading
956          *      the location, and not modifying it.  We want a close
957          *      approximation of the number of active threads, and this
958          *      is good enough.
959          */
960         active_threads = thread_pool.active_threads;
961         spare = thread_pool.total_threads - active_threads;
962         if (debug_flag) {
963                 static int old_total = -1;
964                 static int old_active = -1;
965
966                 if ((old_total != thread_pool.total_threads) ||
967                                 (old_active != active_threads)) {
968                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
969                                         thread_pool.total_threads, active_threads, spare);
970                         old_total = thread_pool.total_threads;
971                         old_active = active_threads;
972                 }
973         }
974
975         /*
976          *      If there are too few spare threads, create some more.
977          */
978         if (spare < thread_pool.min_spare_threads) {
979                 total = thread_pool.min_spare_threads - spare;
980
981                 DEBUG2("Threads: Spawning %d spares", total);
982                 /*
983                  *      Create a number of spare threads.
984                  */
985                 for (i = 0; i < total; i++) {
986                         handle = spawn_thread(now);
987                         if (handle == NULL) {
988                                 return -1;
989                         }
990                 }
991
992                 /*
993                  *      And exit, as there can't be too many spare threads.
994                  */
995                 return 0;
996         }
997
998         /*
999          *      Only delete spare threads if we haven't already done
1000          *      so this second.
1001          */
1002         if (now == last_cleaned) {
1003                 return 0;
1004         }
1005         last_cleaned = now;
1006
1007         /*
1008          *      Only delete the spare threads if sufficient time has
1009          *      passed since we last created one.  This helps to minimize
1010          *      the amount of create/delete cycles.
1011          */
1012         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
1013                 return 0;
1014         }
1015
1016         /*
1017          *      If there are too many spare threads, delete one.
1018          *
1019          *      Note that we only delete ONE at a time, instead of
1020          *      wiping out many.  This allows the excess servers to
1021          *      be slowly reaped, just in case the load spike comes again.
1022          */
1023         if (spare > thread_pool.max_spare_threads) {
1024
1025                 spare -= thread_pool.max_spare_threads;
1026
1027                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1028
1029                 /*
1030                  *      Walk through the thread pool, deleting the
1031                  *      first idle thread we come across.
1032                  */
1033                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1034                         next = handle->next;
1035
1036                         /*
1037                          *      If the thread is not handling a
1038                          *      request, but still live, then tell it
1039                          *      to exit.
1040                          *
1041                          *      It will eventually wake up, and realize
1042                          *      it's been told to commit suicide.
1043                          */
1044                         if ((handle->request == NULL) &&
1045                             (handle->status == THREAD_RUNNING)) {
1046                                 handle->status = THREAD_CANCELLED;
1047                                 /*
1048                                  *      Post an extra semaphore, as a
1049                                  *      signal to wake up, and exit.
1050                                  */
1051                                 sem_post(&thread_pool.semaphore);
1052                                 spare--;
1053                                 break;
1054                         }
1055                 }
1056         }
1057
1058         /*
1059          *      If the thread has handled too many requests, then make it
1060          *      exit.
1061          */
1062         if (thread_pool.max_requests_per_thread > 0) {
1063                 for (handle = thread_pool.head; handle; handle = next) {
1064                         next = handle->next;
1065
1066                         /*
1067                          *      Not handling a request, but otherwise
1068                          *      live, we can kill it.
1069                          */
1070                         if ((handle->request == NULL) &&
1071                             (handle->status == THREAD_RUNNING) &&
1072                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1073                                 handle->status = THREAD_CANCELLED;
1074                                 sem_post(&thread_pool.semaphore);
1075                         }
1076                 }
1077         }
1078
1079         /*
1080          *      Otherwise everything's kosher.  There are not too few,
1081          *      or too many spare threads.  Exit happily.
1082          */
1083         return 0;
1084 }
1085
1086
1087 /*
1088  *      Thread wrapper for fork().
1089  */
1090 pid_t rad_fork(void)
1091 {
1092         pid_t child_pid;
1093
1094         if (!pool_initialized) return fork();
1095
1096         reap_children();        /* be nice to non-wait thingies */
1097
1098         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1099                 return -1;
1100         }
1101
1102         /*
1103          *      Fork & save the PID for later reaping.
1104          */
1105         child_pid = fork();
1106         if (child_pid > 0) {
1107                 int rcode;
1108                 thread_fork_t *tf;
1109
1110                 tf = rad_malloc(sizeof(*tf));
1111                 memset(tf, 0, sizeof(*tf));
1112                 
1113                 tf->pid = child_pid;
1114
1115                 pthread_mutex_lock(&thread_pool.wait_mutex);
1116                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1117                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1118
1119                 if (!rcode) {
1120                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1121                                (int) child_pid);
1122                 }
1123         }
1124
1125         /*
1126          *      Return whatever we were told.
1127          */
1128         return child_pid;
1129 }
1130
1131 /*
1132  *      Wait 10 seconds at most for a child to exit, then give up.
1133  */
1134 pid_t rad_waitpid(pid_t pid, int *status)
1135 {
1136         int i;
1137         thread_fork_t mytf, *tf;
1138
1139         if (!pool_initialized) return waitpid(pid, status, 0);
1140
1141         if (pid <= 0) return -1;
1142
1143         mytf.pid = pid;
1144
1145         pthread_mutex_lock(&thread_pool.wait_mutex);
1146         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1147         pthread_mutex_unlock(&thread_pool.wait_mutex);
1148
1149         if (!tf) return -1;
1150         
1151         for (i = 0; i < 100; i++) {
1152                 reap_children();
1153                 
1154                 if (tf->exited) {
1155                         *status = tf->status;
1156
1157                         pthread_mutex_lock(&thread_pool.wait_mutex);
1158                         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1159                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1160                         return pid;
1161                 }
1162                 usleep(100000);
1163         }
1164         
1165         /*
1166          *      10 seconds have passed, give up on the child.
1167          */
1168         pthread_mutex_lock(&thread_pool.wait_mutex);
1169         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1170         pthread_mutex_unlock(&thread_pool.wait_mutex);
1171
1172         return 0;
1173 }
1174
1175 #endif