Massive change to the server core to remove horrid code in
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/autoconf.h>
28
29 #include <stdlib.h>
30 #include <string.h>
31
32 /*
33  *      Other OSes have sem_init(); OS X doesn't.
34  */
35 #ifndef DARWIN
36 #include <semaphore.h>
37 #else
38 #include <mach/task.h>
39 #include <mach/semaphore.h>
40
41 #undef sem_t
42 #define sem_t semaphore_t
43 #undef sem_init
44 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45 #undef sem_wait
46 #define sem_wait(s) semaphore_wait(*s)
47 #undef sem_post
48 #define sem_post(s) semaphore_signal(*s)
49 #endif
50
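/*
 *      With the macros above, Darwin builds use Mach semaphores in
 *      place of POSIX ones: sem_init() becomes semaphore_create() on
 *      the current task (the "pshared" argument is ignored), and
 *      sem_wait() / sem_post() become semaphore_wait() /
 *      semaphore_signal().
 */
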
51 #include <signal.h>
52
53 #ifdef HAVE_SYS_WAIT_H
54 #include <sys/wait.h>
55 #endif
56
57 #include <freeradius-devel/radiusd.h>
58 #include <freeradius-devel/rad_assert.h>
59 #include <freeradius-devel/modules.h>
60
61 #ifdef HAVE_PTHREAD_H
62
63 #ifdef HAVE_OPENSSL_CRYPTO_H
64 #include <openssl/crypto.h>
65 #endif
66 #ifdef HAVE_OPENSSL_ERR_H
67 #include <openssl/err.h>
68 #endif
69
70 #define SEMAPHORE_LOCKED        (0)
71 #define SEMAPHORE_UNLOCKED      (1)
72
73 #define THREAD_RUNNING          (1)
74 #define THREAD_CANCELLED        (2)
75 #define THREAD_EXITED           (3)
76
77 #define NUM_FIFOS               (2)
78
79 /*
80  *     Ordered this way because we prefer to handle proxy replies
81  *     before new requests.
82  */
83 #define FIFO_START   (1)
84 #define FIFO_PROXY   (0)
85
86 /*
87  *  A data structure which contains the information about
88  *  the current thread.
89  *
90  *  pthread_id     pthread id
91  *  thread_num     server thread number, 1...number of threads
92  *  request       the request currently being handled by this thread
93  *  status        is the thread running or exited?
94  *  request_count the number of requests that this thread has handled
95  *  timestamp     when the thread started executing.
96  */
97 typedef struct THREAD_HANDLE {
98         struct THREAD_HANDLE *prev;
99         struct THREAD_HANDLE *next;
100         pthread_t            pthread_id;
101         int                  thread_num;
102         int                  status;
103         unsigned int         request_count;
104         time_t               timestamp;
105         REQUEST              *request;
106 } THREAD_HANDLE;
107
108 /*
109  *      For the request queue.
110  */
111 typedef struct request_queue_t {
112         REQUEST           *request;
113         RAD_REQUEST_FUNP  fun;
114 } request_queue_t;
115
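/*
 *      Bookkeeping for one forked child.  "pid" is the hash key used by
 *      pid_hash() / pid_cmp() below; "status" and "exited" are filled in
 *      by reap_children() once waitpid() has collected the child.
 */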
116 typedef struct thread_fork_t {
117         pid_t           pid;
118         int             status;
119         int             exited;
120 } thread_fork_t;
121
122
123 /*
124  *      A data structure to manage the thread pool.  There's no real
125  *      need for a data structure, but it makes things conceptually
126  *      easier.
127  */
128 typedef struct THREAD_POOL {
129         THREAD_HANDLE *head;
130         THREAD_HANDLE *tail;
131
132         int total_threads;
133         int active_threads;     /* protected by queue_mutex */
134         int max_thread_num;
135         int start_threads;
136         int max_threads;
137         int min_spare_threads;
138         int max_spare_threads;
139         unsigned int max_requests_per_thread;
140         unsigned long request_count;
141         time_t time_last_spawned;
142         int cleanup_delay;
143         int spawn_flag;
144
145         pthread_mutex_t wait_mutex;
146         lrad_hash_table_t *waiters;
147
148         /*
149          *      All threads wait on this semaphore, for requests
150          *      to enter the queue.
151          */
152         sem_t           semaphore;
153
154         /*
155          *      To ensure only one thread at a time touches the queue.
156          */
157         pthread_mutex_t queue_mutex;
158
159         int             max_queue_size;
160         int             num_queued;
161         int             fifo_state;
162         lrad_fifo_t     *fifo[NUM_FIFOS];
163 } THREAD_POOL;
164
165 static THREAD_POOL thread_pool;
166 static int pool_initialized = FALSE;
167
168
169 /*
170  *      A mapping of configuration file names to internal integers
171  */
172 static const CONF_PARSER thread_config[] = {
173         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
174         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
175         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
176         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
177         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
178         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
179         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
180         { NULL, -1, 0, NULL, NULL }
181 };
182
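/*
 *      For reference, these entries correspond to a "thread pool"
 *      section in radiusd.conf (the section name is assumed from the
 *      cf_section_find("thread") call in thread_pool_init() below).
 *      A minimal example using the default values above:
 *
 *              thread pool {
 *                      start_servers = 5
 *                      max_servers = 32
 *                      min_spare_servers = 3
 *                      max_spare_servers = 10
 *                      max_requests_per_server = 0
 *                      cleanup_delay = 5
 *                      max_queue_size = 65536
 *              }
 */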
183
184 #ifdef HAVE_OPENSSL_CRYPTO_H
185
186 /*
187  *      If we're linking against OpenSSL, then it is the
188  *      duty of the application, if it is multithreaded,
189  *      to provide OpenSSL with appropriate thread id
190  *      and mutex locking functions
191  *
192  *      Note: this only implements static callbacks.
193  *      OpenSSL does not use dynamic locking callbacks
194  *      right now, but may in the future, so we will have
195  *      to add them at some point.
196  */
197
198 static pthread_mutex_t *ssl_mutexes = NULL;
199
200 static unsigned long ssl_id_function(void)
201 {
202         return (unsigned long) pthread_self();
203 }
204
205 static void ssl_locking_function(int mode, int n, const char *file, int line)
206 {
207         file = file;            /* -Wunused */
208         line = line;            /* -Wunused */
209
210         if (mode & CRYPTO_LOCK) {
211                 pthread_mutex_lock(&(ssl_mutexes[n]));
212         } else {
213                 pthread_mutex_unlock(&(ssl_mutexes[n]));
214         }
215 }
216
217 static int setup_ssl_mutexes(void)
218 {
219         int i;
220
221         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
222         if (!ssl_mutexes) {
223                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
224                 return 0;
225         }
226
227         for (i = 0; i < CRYPTO_num_locks(); i++) {
228                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
229         }
230
231         CRYPTO_set_id_callback(ssl_id_function);
232         CRYPTO_set_locking_callback(ssl_locking_function);
233
234         return 1;
235 }
236 #endif
237
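/*
 *      Note that thread_pool_init() below calls setup_ssl_mutexes()
 *      before any worker threads are spawned, so the locking callbacks
 *      are in place before OpenSSL can be used concurrently.
 */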
238
239 /*
240  *      We don't want to catch SIGCHLD for a host of reasons.
241  *
242  *      - exec_wait means that someone, somewhere, somewhen, will
243  *      call waitpid(), and catch the child.
244  *
245  *      - SIGCHLD is delivered to a random thread, not the one that
246  *      forked.
247  *
248  *      - if another thread catches the child, we have to coordinate
249  *      with the thread doing the waiting.
250  *
251  *      - if we don't waitpid() for non-wait children, they'll be zombies,
252  *      and will hang around forever.
253  *
254  */
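/*
 *      Instead, reap_children() is called opportunistically: from
 *      request_dequeue() whenever a worker picks up a request, and from
 *      rad_fork() / rad_waitpid() below.  Any child that has exited is
 *      recorded in thread_pool.waiters, so that a later rad_waitpid()
 *      can return its status.
 */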
255 static void reap_children(void)
256 {
257         pid_t pid;
258         int status;
259         thread_fork_t mytf, *tf;
260
261
262         pthread_mutex_lock(&thread_pool.wait_mutex);
263
264         do {
265                 pid = waitpid(0, &status, WNOHANG);
266                 if (pid <= 0) break;
267
268                 mytf.pid = pid;
269                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
270                 if (!tf) continue;
271                 
272                 tf->status = status;
273                 tf->exited = 1;
274         } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
275
276         pthread_mutex_unlock(&thread_pool.wait_mutex);
277 }
278
279 /*
280  *      Add a request to the list of waiting requests.
281  *      This function gets called ONLY from the main handler thread...
282  *
283  *      This function should never fail.
284  */
285 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
286 {
287         int fifo = FIFO_START;
288         request_queue_t *entry;
289
290         pthread_mutex_lock(&thread_pool.queue_mutex);
291
292         thread_pool.request_count++;
293
294         /*
295          *      FIXME: Handle proxy replies separately?
296          */
297         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
298                 pthread_mutex_unlock(&thread_pool.queue_mutex);
299                 
300                 /*
301                  *      Mark the request as done.
302                  */
303                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
304                 request->child_state = REQUEST_DONE;
305                 return 0;
306         }
307
308         /*
309          *      Requests get handled in priority.  First, we handle
310          *      replies from a home server, to finish ongoing requests.
311          *
312          *      Then, we handle requests with State, to finish
313          *      multi-packet transactions.
314          *
315          *      Finally, we handle new requests.
316          */
317         if (request->proxy_reply) {
318                 fifo = FIFO_PROXY;
319         } else {
320                 fifo = FIFO_START;
321         }
322
323         entry = rad_malloc(sizeof(*entry));
324         entry->request = request;
325         entry->fun = fun;
326
327         if (!lrad_fifo_push(thread_pool.fifo[fifo], entry)) {
328                 pthread_mutex_unlock(&thread_pool.queue_mutex);
329                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
330                 request->child_state = REQUEST_DONE;
331                 return 0;
332         }
333
334         thread_pool.num_queued++;
335
336         pthread_mutex_unlock(&thread_pool.queue_mutex);
337
338         /*
339          *      There's one more request in the queue.
340          *
341          *      Note that we're not touching the queue any more, so
342          *      the semaphore post is outside of the mutex.  This also
343          *      means that when the thread wakes up and tries to lock
344          *      the mutex, it will be unlocked, and there won't be
345          *      contention.
346          */
347         sem_post(&thread_pool.semaphore);
348
349         return 1;
350 }
351
352 /*
353  *      Remove a request from the queue.
354  */
355 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
356 {
357         int fifo_state;
358         request_queue_t *entry;
359
360         reap_children();
361
362         pthread_mutex_lock(&thread_pool.queue_mutex);
363
364         fifo_state = thread_pool.fifo_state;
365
366  retry:
367         do {
368                 /*
369                  *      Pop an entry from the current queue, and go to
370                  *      the next queue.
371                  */
372                 entry = lrad_fifo_pop(thread_pool.fifo[fifo_state]);
373                 fifo_state++;
374                 if (fifo_state >= NUM_FIFOS) fifo_state = 0;
375         } while ((fifo_state != thread_pool.fifo_state) && !entry);
376
377         if (!entry) {
378                 pthread_mutex_unlock(&thread_pool.queue_mutex);
379                 *request = NULL;
380                 *fun = NULL;
381                 return 0;
382         }
383
384         rad_assert(thread_pool.num_queued > 0);
385         thread_pool.num_queued--;
386         *request = entry->request;
387         *fun = entry->fun;
388         free(entry);
389
390         rad_assert(*request != NULL);
391         rad_assert((*request)->magic == REQUEST_MAGIC);
392         rad_assert(*fun != NULL);
393
394         /*
395          *      If the request has sat in the queue for too long,
396          *      kill it.
397          *
398          *      The main clean-up code won't delete the request from
399          *      the request list, until it's marked "finished"
400          */
401         if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
402                 (*request)->child_state = REQUEST_DONE;
403                 goto retry;
404         }
405
406         /*
407          *      The thread is currently processing a request.
408          */
409         thread_pool.active_threads++;
410         thread_pool.fifo_state = fifo_state;
411
412         pthread_mutex_unlock(&thread_pool.queue_mutex);
413
414         rad_assert((*request)->child_pid == NO_SUCH_CHILD_PID);
415
416         return 1;
417 }
418
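/*
 *      Note that fifo_state is saved as "one past" the queue which
 *      produced an entry, so the next dequeue starts its scan at the
 *      other queue.  When both queues are busy they are served
 *      alternately, which keeps new requests from being starved by a
 *      steady stream of proxy replies (and vice versa).
 */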
419
420 /*
421  *      The main thread handler for requests.
422  *
423  *      Wait on the semaphore until we have it, and process the request.
424  */
425 static void *request_handler_thread(void *arg)
426 {
427         RAD_REQUEST_FUNP  fun;
428         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
429 #ifdef HAVE_PTHREAD_SIGMASK
430         sigset_t set;
431
432         /*
433          *      Block SIGHUP handling for the child threads.
434          *
435          *      This ensures that only the main server thread will
436          *      process HUP signals.
437          *
438          *      If we don't have pthread_sigmask, then it shouldn't be
439          *      a problem, either, as the sig_hup handler should check
440          *      for this condition.
441          */
442         sigemptyset(&set);
443         sigaddset(&set, SIGHUP);
444         sigaddset(&set, SIGINT);
445         sigaddset(&set, SIGQUIT);
446         sigaddset(&set, SIGTERM);
447         pthread_sigmask(SIG_BLOCK, &set, NULL);
448 #endif
449
450         /*
451          *      Loop forever, until told to exit.
452          */
453         do {
454                 /*
455                  *      Wait to be signalled.
456                  */
457                 DEBUG2("Thread %d waiting to be assigned a request",
458                        self->thread_num);
459         re_wait:
460                 if (sem_wait(&thread_pool.semaphore) != 0) {
461                         /*
462                          *      Interrupted system call.  Go back to
463                          *      waiting, but DON'T print out any more
464                          *      text.
465                          */
466                         if (errno == EINTR) {
467                                 DEBUG2("Re-wait %d", self->thread_num);
468                                 goto re_wait;
469                         }
470                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
471                                self->thread_num, strerror(errno));
472                         break;
473                 }
474
475                 DEBUG2("Thread %d got semaphore", self->thread_num);
476
477                 /*
478                  *      Try to grab a request from the queue.
479                  *
480                  *      It may be empty, in which case we fail
481                  *      gracefully.
482                  */
483                 if (!request_dequeue(&self->request, &fun)) continue;
484
485                 self->request->child_pid = self->pthread_id;
486                 self->request_count++;
487
488                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
489                        self->thread_num, self->request->number,
490                        self->request_count);
491
492                 radius_handle_request(self->request, fun);

                /*
                 *      The request is done.  Clear the pointer so that
                 *      thread_pool_clean() (which checks for
                 *      handle->request == NULL) sees this thread as
                 *      idle again.
                 */
                self->request = NULL;

494                 /*
495                  *      Update the active threads.
496                  */
497                 pthread_mutex_lock(&thread_pool.queue_mutex);
498                 rad_assert(thread_pool.active_threads > 0);
499                 thread_pool.active_threads--;
500                 pthread_mutex_unlock(&thread_pool.queue_mutex);
501         } while (self->status != THREAD_CANCELLED);
502
503         DEBUG2("Thread %d exiting...", self->thread_num);
504
505 #ifdef HAVE_OPENSSL_ERR_H
506         /*
507          *      If we linked with OpenSSL, the application
508          *      must remove the thread's error queue before
509          *      exiting to prevent memory leaks.
510          */
511         ERR_remove_state(0);
512 #endif
513
514         /*
515          *  Do this as the LAST thing before exiting.
516          */
517         self->status = THREAD_EXITED;
518
519         return NULL;
520 }
521
522 /*
523  *      Take a THREAD_HANDLE, delete it from the thread pool and
524  *      free its resources.
525  *
526  *      This function is called ONLY from the main server thread,
527  *      ONLY after the thread has exited.
528  */
529 static void delete_thread(THREAD_HANDLE *handle)
530 {
531         THREAD_HANDLE *prev;
532         THREAD_HANDLE *next;
533
534         rad_assert(handle->request == NULL);
535
536         DEBUG2("Deleting thread %d", handle->thread_num);
537
538         prev = handle->prev;
539         next = handle->next;
540         rad_assert(thread_pool.total_threads > 0);
541         thread_pool.total_threads--;
542
543         /*
544          *      Remove the handle from the list.
545          */
546         if (prev == NULL) {
547                 rad_assert(thread_pool.head == handle);
548                 thread_pool.head = next;
549         } else {
550                 prev->next = next;
551         }
552
553         if (next == NULL) {
554                 rad_assert(thread_pool.tail == handle);
555                 thread_pool.tail = prev;
556         } else {
557                 next->prev = prev;
558         }
559
560         /*
561          *      Free the handle, now that it's no longer referenced.
562          */
563         free(handle);
564 }
565
566
567 /*
568  *      Spawn a new thread, and place it in the thread pool.
569  *
570  *      The thread is started initially in the blocked state, waiting
571  *      for the semaphore.
572  */
573 static THREAD_HANDLE *spawn_thread(time_t now)
574 {
575         int rcode;
576         THREAD_HANDLE *handle;
577         pthread_attr_t attr;
578
579         /*
580          *      Ensure that we don't spawn too many threads.
581          */
582         if (thread_pool.total_threads >= thread_pool.max_threads) {
583                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
584                 return NULL;
585         }
586
587         /*
588          *      Allocate a new thread handle.
589          */
590         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
591         memset(handle, 0, sizeof(THREAD_HANDLE));
592         handle->prev = NULL;
593         handle->next = NULL;
594         handle->pthread_id = NO_SUCH_CHILD_PID;
595         handle->thread_num = thread_pool.max_thread_num++;
596         handle->request_count = 0;
597         handle->status = THREAD_RUNNING;
598         handle->timestamp = time(NULL);
599
600         /*
601          *      Initialize the thread's attributes to detached.
602          *
603          *      We could call pthread_detach() later, but if the thread
604          *      exits between the create & detach calls, it will need to
605          *      be joined, which will never happen.
606          */
607         pthread_attr_init(&attr);
608         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
609
610         /*
611          *      Create the thread detached, so that it cleans up its
612          *      own memory when it exits.
613          *
614          *      Note that the function returns non-zero on error, NOT
615          *      -1.  The return code is the error, and errno isn't set.
616          */
617         rcode = pthread_create(&handle->pthread_id, &attr,
618                         request_handler_thread, handle);
619         if (rcode != 0) {
620                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
621                        strerror(rcode));
622                 exit(1);
623         }
624         pthread_attr_destroy(&attr);
625
626         /*
627          *      One more thread to go into the list.
628          */
629         thread_pool.total_threads++;
630         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
631                         handle->thread_num, thread_pool.total_threads);
632
633         /*
634          *      Add the thread handle to the tail of the thread pool list.
635          */
636         if (thread_pool.tail) {
637                 thread_pool.tail->next = handle;
638                 handle->prev = thread_pool.tail;
639                 thread_pool.tail = handle;
640         } else {
641                 rad_assert(thread_pool.head == NULL);
642                 thread_pool.head = thread_pool.tail = handle;
643         }
644
645         /*
646          *      Update the time we last spawned a thread.
647          */
648         thread_pool.time_last_spawned = now;
649
650         /*
651          *      And return the new handle to the caller.
652          */
653         return handle;
654 }
655
656 /*
657  *      Temporary function to prevent the server from acting on a SIGHUP
658  *      until all threads are finished handling requests.  This returns
659  *      the number of active threads to 'radiusd.c'.
660  */
661 int total_active_threads(void)
662 {
663         /*
664          *      We don't acquire the mutex, so this is just an estimate.
665          *      We can't return with the lock held, so there's no point
666          *      in getting the guaranteed correct value; by the time
667          *      the caller sees it, it can be wrong again.
668          */
669         return thread_pool.active_threads;
670 }
671
672
673 static uint32_t pid_hash(const void *data)
674 {
675         const thread_fork_t *tf = data;
676
677         return lrad_hash(&tf->pid, sizeof(tf->pid));
678 }
679
680 static int pid_cmp(const void *one, const void *two)
681 {
682         const thread_fork_t *a = one;
683         const thread_fork_t *b = two;
684
685         return (a->pid - b->pid);
686 }
687
688 /*
689  *      Allocate the thread pool, and seed it with an initial number
690  *      of threads.
691  *
692  *      FIXME: What to do on a SIGHUP???
693  */
694 int thread_pool_init(int spawn_flag)
695 {
696         int             i, rcode;
697         CONF_SECTION    *pool_cf;
698         time_t          now;
699
700         DEBUG("Initializing the thread pool...");
701         now = time(NULL);
702
703         /*
704          *      If we're not spawning new threads, there's nothing
705          *      to do.
706          */
707         if (!spawn_flag) return 0;
708
709         /*
710          *      After a SIGHUP, we don't over-write the previous values.
711          */
712         if (!pool_initialized) {
713                 /*
714                  *      Initialize the thread pool to some reasonable values.
715                  */
716                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
717                 thread_pool.head = NULL;
718                 thread_pool.tail = NULL;
719                 thread_pool.total_threads = 0;
720                 thread_pool.max_thread_num = 1;
721                 thread_pool.cleanup_delay = 5;
722                 thread_pool.spawn_flag = spawn_flag;
723
724                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
725                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
726                                strerror(errno));
727                         exit(1);
728                 }               
729                 
730                 /*
731                  *      Create the hash table of child PID's
732                  */
733                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
734                                                              pid_cmp,
735                                                              free);
736                 if (!thread_pool.waiters) {
737                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
738                         exit(1);
739                 }
740         }
741
742         pool_cf = cf_section_find("thread");
743         if (pool_cf != NULL) {
744                 /*
745                  *      FIXME: Check for errors?
746                  */
747                 cf_section_parse(pool_cf, NULL, thread_config);
748         }
749
750         /*
751          *      The pool has already been initialized.  Don't spawn
752          *      new threads, and don't forget about the forked children.
753          */
754         if (pool_initialized) {
755                 return 0;
756         }
757
758         /*
759          *      Initialize the queue of requests.
760          */
761         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
762         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
763         if (rcode != 0) {
764                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
765                        strerror(errno));
766                 exit(1);
767         }
768
769         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
770         if (rcode != 0) {
771                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
772                        strerror(errno));
773                 exit(1);
774         }
775
776         /*
777          *      Allocate multiple fifos.
778          */
779         for (i = 0; i < NUM_FIFOS; i++) {
780                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
781                 if (!thread_pool.fifo[i]) {
782                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
783                         exit(1);
784                 }
785         }
786
787 #ifdef HAVE_OPENSSL_CRYPTO_H
788         /*
789          *      If we're linking with OpenSSL too, then we need
790          *      to set up the mutexes and enable the thread callbacks.
791          */
792         if (!setup_ssl_mutexes()) {
793                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
794                 exit(1);
795         }
796 #endif
797
798
799         /*
800          *      Create a number of waiting threads.
801          *
802          *      If we fail while creating them, do something intelligent.
803          */
804         for (i = 0; i < thread_pool.start_threads; i++) {
805                 if (spawn_thread(now) == NULL) {
806                         return -1;
807                 }
808         }
809
810         DEBUG2("Thread pool initialized");
811         pool_initialized = TRUE;
812         return 0;
813 }
814
815
816 /*
817  *      Assign a new request to a free thread.
818  *
819  *      If there isn't a free thread, then try to create a new one,
820  *      up to the configured limits.
821  */
822 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
823 {
824         /*
825          *      We've been told not to spawn threads, so don't.
826          */
827         if (!thread_pool.spawn_flag) {
828                 radius_handle_request(request, fun);
829                 
830                 /*
831                  *      Requests that care about child process exit
832                  *      codes have already either called
833                  *      rad_waitpid(), or they've given up.
834                  */
835                 wait(NULL);
836                 return 1;
837         }
838
839         /*
840          *      Add the new request to the queue.
841          */
842         if (!request_enqueue(request, fun)) return 0;
843
844         /*
845          *      If the thread pool is busy handling requests, then
846          *      try to spawn another one.  We don't acquire the mutex
847          *      before reading active_threads, so our thread count is
848          *      just an estimate.  It's fine to go ahead and spawn an
849          *      extra thread in that case.
850          *      NOTE: the log message may be in error since active_threads
851          *      is an estimate, but it's only in error about the thread
852          *      count, not about the fact that we can't create a new one.
853          */
854         if (thread_pool.active_threads == thread_pool.total_threads) {
855                 if (spawn_thread(request->timestamp) == NULL) {
856                         radlog(L_INFO,
857                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
858                                thread_pool.max_threads);
859                         return 1;
860                 }
861         }
862
863         return 1;
864 }
865
866 /*
867  *      Check the min_spare_threads and max_spare_threads.
868  *
869  *      If there are too many or too few threads waiting, then we
870  *      either create some more, or delete some.
871  */
872 int thread_pool_clean(time_t now)
873 {
874         int spare;
875         int i, total;
876         THREAD_HANDLE *handle, *next;
877         int active_threads;
878         static time_t last_cleaned = 0;
879
880         /*
881          *      Loop over the thread pool deleting exited threads.
882          */
883         for (handle = thread_pool.head; handle; handle = next) {
884                 next = handle->next;
885
886                 /*
887                  *      Maybe we've asked the thread to exit, and it
888                  *      has agreed.
889                  */
890                 if (handle->status == THREAD_EXITED) {
891                         delete_thread(handle);
892                 }
893         }
894
895         /*
896          *      We don't need a mutex lock here, as we're reading
897          *      active_threads, and not modifying it.  We want a close
898          *      approximation of the number of active threads, and this
899          *      is good enough.
900          */
901         active_threads = thread_pool.active_threads;
902         spare = thread_pool.total_threads - active_threads;
903         if (debug_flag) {
904                 static int old_total = -1;
905                 static int old_active = -1;
906
907                 if ((old_total != thread_pool.total_threads) ||
908                                 (old_active != active_threads)) {
909                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
910                                         thread_pool.total_threads, active_threads, spare);
911                         old_total = thread_pool.total_threads;
912                         old_active = active_threads;
913                 }
914         }
915
916         /*
917          *      If there are too few spare threads, create some more.
918          */
919         if (spare < thread_pool.min_spare_threads) {
920                 total = thread_pool.min_spare_threads - spare;
921
922                 DEBUG2("Threads: Spawning %d spares", total);
923                 /*
924                  *      Create a number of spare threads.
925                  */
926                 for (i = 0; i < total; i++) {
927                         handle = spawn_thread(now);
928                         if (handle == NULL) {
929                                 return -1;
930                         }
931                 }
932
933                 /*
934                  *      And exit, as there can't be too many spare threads.
935                  */
936                 return 0;
937         }
938
939         /*
940          *      Only delete spare threads if we haven't already done
941          *      so this second.
942          */
943         if (now == last_cleaned) {
944                 return 0;
945         }
946         last_cleaned = now;
947
948         /*
949          *      Only delete the spare threads if sufficient time has
950          *      passed since we last created one.  This helps to minimize
951          *      the amount of create/delete cycles.
952          */
953         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
954                 return 0;
955         }
956
957         /*
958          *      If there are too many spare threads, delete one.
959          *
960          *      Note that we only delete ONE at a time, instead of
961          *      wiping out many.  This allows the excess servers to
962          *      be slowly reaped, just in case the load spike comes again.
963          */
964         if (spare > thread_pool.max_spare_threads) {
965
966                 spare -= thread_pool.max_spare_threads;
967
968                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
969
970                 /*
971                  *      Walk through the thread pool, deleting the
972                  *      first idle thread we come across.
973                  */
974                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
975                         next = handle->next;
976
977                         /*
978                          *      If the thread is not handling a
979                          *      request, but still live, then tell it
980                          *      to exit.
981                          *
982                          *      It will eventually wake up, and realize
983                          *      it's been told to commit suicide.
984                          */
985                         if ((handle->request == NULL) &&
986                             (handle->status == THREAD_RUNNING)) {
987                                 handle->status = THREAD_CANCELLED;
988                                 /*
989                                  *      Post an extra semaphore, as a
990                                  *      signal to wake up, and exit.
991                                  */
992                                 sem_post(&thread_pool.semaphore);
993                                 spare--;
994                                 break;
995                         }
996                 }
997         }
998
999         /*
1000          *      If the thread has handled too many requests, then make it
1001          *      exit.
1002          */
1003         if (thread_pool.max_requests_per_thread > 0) {
1004                 for (handle = thread_pool.head; handle; handle = next) {
1005                         next = handle->next;
1006
1007                         /*
1008                          *      If it's not handling a request, but is
1009                          *      otherwise alive, we can kill it.
1010                          */
1011                         if ((handle->request == NULL) &&
1012                             (handle->status == THREAD_RUNNING) &&
1013                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1014                                 handle->status = THREAD_CANCELLED;
1015                                 sem_post(&thread_pool.semaphore);
1016                         }
1017                 }
1018         }
1019
1020         /*
1021          *      Otherwise everything's kosher.  There are not too few,
1022          *      or too many spare threads.  Exit happily.
1023          */
1024         return 0;
1025 }
1026
1027
1028 /*
1029  *      Thread wrapper for fork().
1030  */
1031 pid_t rad_fork(void)
1032 {
1033         pid_t child_pid;
1034
1035         if (!pool_initialized) return fork();
1036
1037         reap_children();        /* be nice to non-wait thingies */
1038
1039         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1040                 return -1;
1041         }
1042
1043         /*
1044          *      Fork & save the PID for later reaping.
1045          */
1046         child_pid = fork();
1047         if (child_pid > 0) {
1048                 int rcode;
1049                 thread_fork_t *tf;
1050
1051                 tf = rad_malloc(sizeof(*tf));
1052                 memset(tf, 0, sizeof(*tf));
1053                 
1054                 tf->pid = child_pid;
1055
1056                 pthread_mutex_lock(&thread_pool.wait_mutex);
1057                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1058                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1059
1060                 if (!rcode) {
1061                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1062                                (int) child_pid);
1063                 }
1064         }
1065
1066         /*
1067          *      Return whatever we were told.
1068          */
1069         return child_pid;
1070 }
1071
1072
1073 /*
1074  *      Wait 10 seconds at most for a child to exit, then give up.
1075  */
1076 pid_t rad_waitpid(pid_t pid, int *status)
1077 {
1078         int i;
1079         thread_fork_t mytf, *tf;
1080
1081         if (!pool_initialized) return waitpid(pid, status, 0);
1082
1083         if (pid <= 0) return -1;
1084
1085         mytf.pid = pid;
1086
1087         pthread_mutex_lock(&thread_pool.wait_mutex);
1088         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1089         pthread_mutex_unlock(&thread_pool.wait_mutex);
1090
1091         if (!tf) return -1;
1092         
1093         for (i = 0; i < 100; i++) {
1094                 reap_children();
1095                 
1096                 if (tf->exited) {
1097                         *status = tf->status;
1098
1099                         pthread_mutex_lock(&thread_pool.wait_mutex);
1100                         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1101                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1102                         return pid;
1103                 }
1104                 usleep(100000); /* sleep for 1/10 of a second */
1105         }
1106         
1107         /*
1108          *      10 seconds have passed, give up on the child.
1109          */
1110         pthread_mutex_lock(&thread_pool.wait_mutex);
1111         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1112         pthread_mutex_unlock(&thread_pool.wait_mutex);
1113
1114         return 0;
1115 }
1116
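/*
 *      A sketch of the intended calling pattern for these wrappers
 *      (hypothetical caller code, not part of this file; "path" and
 *      "argv" stand in for whatever the module wants to run).  The
 *      child exec()s a helper program, and the parent collects its exit
 *      status through the pool's reaper instead of calling waitpid()
 *      itself:
 *
 *              pid_t pid = rad_fork();
 *              if (pid == 0) {
 *                      execvp(path, argv);
 *                      _exit(127);             (only reached if exec fails)
 *              }
 *              if (pid > 0) {
 *                      int status;
 *                      if (rad_waitpid(pid, &status) == pid) {
 *                              ... use WEXITSTATUS(status) ...
 *                      }
 *              }
 */
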
1117 #else /* HAVE_PTHREAD_H */
1118 /*
1119  *      "thread" code when we don't have threads.
1120  */
1121 int thread_pool_init(int spawn_flag)
1122 {
1123         return 0;
1124 }
1125
1126 /*
1127  *      Call rad_respond() directly.
1128  */
1129 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1130 {
1131         rad_respond(request, fun);
1132         return 1;
1133 }
1134
1135 #endif /* HAVE_PTHREAD_H */