Catch corner cases
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/rad_assert.h>
29
30 /*
31  *      Other OS's have sem_init, OS X doesn't.
32  */
33 #ifdef HAVE_SEMAPHORE_H
34 #include <semaphore.h>
35 #endif
36
37 #ifdef DARWIN
38 #include <mach/task.h>
39 #include <mach/semaphore.h>
40
41 #undef sem_t
42 #define sem_t semaphore_t
43 #undef sem_init
44 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45 #undef sem_wait
46 #define sem_wait(s) semaphore_wait(*s)
47 #undef sem_post
48 #define sem_post(s) semaphore_signal(*s)
49 #endif
50
51 #ifdef HAVE_SYS_WAIT_H
52 #include <sys/wait.h>
53 #endif
54
55 #ifdef HAVE_PTHREAD_H
56
57 #ifdef HAVE_OPENSSL_CRYPTO_H
58 #include <openssl/crypto.h>
59 #endif
60 #ifdef HAVE_OPENSSL_ERR_H
61 #include <openssl/err.h>
62 #endif
63
/*
 *	Initial counts for the pool semaphore.  thread_pool_init()
 *	creates it with SEMAPHORE_LOCKED (0), so worker threads block
 *	in sem_wait() until request_enqueue() posts to it.
 */
#define SEMAPHORE_LOCKED        (0)
#define SEMAPHORE_UNLOCKED      (1)

/*
 *	Values for THREAD_HANDLE.status.
 *	  THREAD_RUNNING   - set by spawn_thread() when the handle is created.
 *	  THREAD_CANCELLED - request_handler_thread() leaves its loop once
 *	                     it sees this value.
 *	  THREAD_EXITED    - set by the worker as its last action; the
 *	                     main thread then frees the handle in
 *	                     thread_pool_clean() via delete_thread().
 */
#define THREAD_RUNNING          (1)
#define THREAD_CANCELLED        (2)
#define THREAD_EXITED           (3)

/*
 *	Number of request FIFOs: one per value below RAD_LISTEN_MAX,
 *	indexed by request->priority.
 */
#define NUM_FIFOS               RAD_LISTEN_MAX
72
73
/*
 *  A data structure which contains the information about
 *  one worker thread.
 *
 *  prev, next     links in the pool's doubly-linked list
 *                 (thread_pool.head ... thread_pool.tail)
 *  pthread_id     pthread id, stored by pthread_create() in spawn_thread()
 *  thread_num     server thread number, taken from
 *                 thread_pool.max_thread_num (starts at 1, never reused)
 *  status         THREAD_RUNNING, THREAD_CANCELLED or THREAD_EXITED
 *  request_count  the number of requests that this thread has handled
 *  timestamp      when the handle was created (time(NULL) in spawn_thread())
 *  request        the request this thread most recently took off the
 *                 queue; request_dequeue() sets it to NULL when the
 *                 queue was empty
 *
 *  Note: there is no per-thread semaphore; all workers block on the
 *  shared thread_pool.semaphore.
 */
typedef struct THREAD_HANDLE {
        struct THREAD_HANDLE *prev;
        struct THREAD_HANDLE *next;
        pthread_t            pthread_id;
        int                  thread_num;
        int                  status;
        unsigned int         request_count;
        time_t               timestamp;
        REQUEST              *request;
} THREAD_HANDLE;
95
/*
 *      For the request queue.
 *
 *      One entry in a per-priority FIFO: the request plus the function
 *      the worker will hand to radius_handle_request().  Entries are
 *      allocated in request_enqueue() and freed in request_dequeue().
 */
typedef struct request_queue_t {
        REQUEST           *request;
        RAD_REQUEST_FUNP  fun;
} request_queue_t;

/*
 *      An entry in thread_pool.waiters, keyed by pid (see pid_hash()
 *      and pid_cmp()).  reap_children() stores the waitpid() status
 *      in 'status' and sets 'exited' to 1 when the child is reaped.
 *      NOTE(review): entries are presumably inserted/removed by a
 *      fork/waitpid helper outside this view — confirm.
 */
typedef struct thread_fork_t {
        pid_t           pid;
        int             status;
        int             exited;
} thread_fork_t;
109
110
/*
 *      A data structure to manage the thread pool.  There's no real
 *      need for a data structure, but it makes things conceptually
 *      easier.
 *
 *      head, tail          doubly-linked list of THREAD_HANDLEs, changed
 *                          by spawn_thread() and delete_thread()
 *      total_threads       number of handles in that list
 *      max_thread_num      next thread number to hand out (starts at 1)
 *      start_threads ... max_requests_per_thread, cleanup_delay,
 *      max_queue_size      values read from the "thread" configuration
 *                          section through thread_config[]
 *      request_count       requests passed to request_enqueue(),
 *                          including ones it rejected
 *      time_last_spawned   'now' argument of the latest spawn_thread()
 *      spawn_flag          when FALSE, thread_pool_addrequest() handles
 *                          requests inline instead of queueing them
 */
typedef struct THREAD_POOL {
        THREAD_HANDLE *head;
        THREAD_HANDLE *tail;

        int total_threads;
        int active_threads;     /* protected by queue_mutex */
        int max_thread_num;
        int start_threads;
        int max_threads;
        int min_spare_threads;
        int max_spare_threads;
        unsigned int max_requests_per_thread;
        unsigned long request_count;
        time_t time_last_spawned;
        int cleanup_delay;
        int spawn_flag;

        /*
         *      wait_mutex protects 'waiters', the hash table of
         *      thread_fork_t entries for forked children.
         */
        pthread_mutex_t wait_mutex;
        lrad_hash_table_t *waiters;

        /*
         *      All threads wait on this semaphore, for requests
         *      to enter the queue.
         */
        sem_t           semaphore;

        /*
         *      To ensure only one thread at a time touches the queue.
         *      Also guards active_threads, num_queued, can_read_detail
         *      and request_count.
         */
        pthread_mutex_t queue_mutex;

        int             max_queue_size;
        int             num_queued;     /* entries in all fifos together */
        int             can_read_detail;
        lrad_fifo_t     *fifo[NUM_FIFOS]; /* indexed by request->priority */
} THREAD_POOL;

/*
 *      pool_initialized becomes TRUE at the end of the first successful
 *      thread_pool_init(); later calls then only re-read configuration.
 */
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
155
156
/*
 *      A mapping of configuration file names to internal integers
 *
 *      thread_pool_init() parses the "thread" section of the
 *      configuration with this table via cf_section_parse().  The last
 *      column is the textual default (presumably applied when the key
 *      is absent — see cf_section_parse()).
 */
static const CONF_PARSER thread_config[] = {
        { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
        { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
        { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
        { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
        { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
        { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
        { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
        { NULL, -1, 0, NULL, NULL }
};
170
171
172 #ifdef HAVE_OPENSSL_CRYPTO_H
173
/*
 *      If we're linking against OpenSSL, then it is the
 *      duty of the application, if it is multithreaded,
 *      to provide OpenSSL with appropriate thread id
 *      and mutex locking functions.
 *
 *      Note: this only implements static callbacks.
 *      OpenSSL does not use dynamic locking callbacks
 *      right now, but may in the future, so we will have
 *      to add them at some point.
 */
185
/*
 *      Array of CRYPTO_num_locks() mutexes, allocated by
 *      setup_ssl_mutexes() and indexed by the lock number that
 *      OpenSSL passes to ssl_locking_function().
 */
static pthread_mutex_t *ssl_mutexes = NULL;
187
/*
 *	OpenSSL thread-id callback: identifies the calling thread by
 *	its pthread id, converted to an unsigned long.
 */
static unsigned long ssl_id_function(void)
{
	pthread_t me = pthread_self();

	return (unsigned long) me;
}
192
193 static void ssl_locking_function(int mode, int n, const char *file, int line)
194 {
195         file = file;            /* -Wunused */
196         line = line;            /* -Wunused */
197
198         if (mode & CRYPTO_LOCK) {
199                 pthread_mutex_lock(&(ssl_mutexes[n]));
200         } else {
201                 pthread_mutex_unlock(&(ssl_mutexes[n]));
202         }
203 }
204
205 static int setup_ssl_mutexes(void)
206 {
207         int i;
208
209         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
210         if (!ssl_mutexes) {
211                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
212                 return 0;
213         }
214
215         for (i = 0; i < CRYPTO_num_locks(); i++) {
216                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
217         }
218
219         CRYPTO_set_id_callback(ssl_id_function);
220         CRYPTO_set_locking_callback(ssl_locking_function);
221
222         return 1;
223 }
224 #endif
225
226
227 /*
228  *      We don't want to catch SIGCHLD for a host of reasons.
229  *
230  *      - exec_wait means that someone, somewhere, somewhen, will
231  *      call waitpid(), and catch the child.
232  *
233  *      - SIGCHLD is delivered to a random thread, not the one that
234  *      forked.
235  *
236  *      - if another thread catches the child, we have to coordinate
237  *      with the thread doing the waiting.
238  *
239  *      - if we don't waitpid() for non-wait children, they'll be zombies,
240  *      and will hang around forever.
241  *
242  */
243 static void reap_children(void)
244 {
245         pid_t pid;
246         int status;
247         thread_fork_t mytf, *tf;
248
249
250         pthread_mutex_lock(&thread_pool.wait_mutex);
251
252         do {
253                 pid = waitpid(0, &status, WNOHANG);
254                 if (pid <= 0) break;
255
256                 mytf.pid = pid;
257                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
258                 if (!tf) continue;
259
260                 tf->status = status;
261                 tf->exited = 1;
262         } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
263
264         pthread_mutex_unlock(&thread_pool.wait_mutex);
265 }
266
267 /*
268  *      Add a request to the list of waiting requests.
269  *      This function gets called ONLY from the main handler thread...
270  *
271  *      This function should never fail.
272  */
273 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
274 {
275         request_queue_t *entry;
276
277         pthread_mutex_lock(&thread_pool.queue_mutex);
278
279         thread_pool.request_count++;
280
281         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
282                 pthread_mutex_unlock(&thread_pool.queue_mutex);
283
284                 /*
285                  *      Mark the request as done.
286                  */
287                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
288                 request->child_state = REQUEST_DONE;
289                 return 0;
290         }
291
292         entry = rad_malloc(sizeof(*entry));
293         entry->request = request;
294         entry->fun = fun;
295
296         /*
297          *      Push the request onto the appropriate fifo for that
298          */
299         if (!lrad_fifo_push(thread_pool.fifo[request->priority],
300                             entry)) {
301                 pthread_mutex_unlock(&thread_pool.queue_mutex);
302                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
303                 request->child_state = REQUEST_DONE;
304                 return 0;
305         }
306
307         /*
308          *      We've added an entry that didn't come from the detail
309          *      file.  Note that the child thread should signal the
310          *      main worker thread again when the queue becomes empty.
311          */
312         if (request->listener->type != RAD_LISTEN_DETAIL) {
313                 thread_pool.can_read_detail = FALSE;
314         }
315
316         thread_pool.num_queued++;
317
318         pthread_mutex_unlock(&thread_pool.queue_mutex);
319
320         /*
321          *      There's one more request in the queue.
322          *
323          *      Note that we're not touching the queue any more, so
324          *      the semaphore post is outside of the mutex.  This also
325          *      means that when the thread wakes up and tries to lock
326          *      the mutex, it will be unlocked, and there won't be
327          *      contention.
328          */
329         sem_post(&thread_pool.semaphore);
330
331         return 1;
332 }
333
334 /*
335  *      Remove a request from the queue.
336  */
337 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
338 {
339         RAD_LISTEN_TYPE i, start;
340         request_queue_t *entry;
341
342         reap_children();
343
344         pthread_mutex_lock(&thread_pool.queue_mutex);
345
346         /*
347          *      Clear old requests from all queues.
348          *
349          *      We only do one pass over the queue, in order to
350          *      amortize the work across the child threads.  Since we
351          *      do N checks for one request de-queued, the old
352          *      requests will be quickly cleared.
353          */
354         for (i = 0; i < RAD_LISTEN_MAX; i++) {
355                 entry = lrad_fifo_peek(thread_pool.fifo[i]);
356                 if (!entry ||
357                     (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
358                         continue;
359 }
360                 /*
361                  *      This entry was marked to be stopped.  Acknowledge it.
362                  */
363                 entry = lrad_fifo_pop(thread_pool.fifo[i]);
364                 rad_assert(entry != NULL);
365                 entry->request->child_state = REQUEST_DONE;
366         }
367
368         start = 0;
369  retry:
370         /*
371          *      Pop results from the top of the queue
372          */
373         for (i = start; i < RAD_LISTEN_MAX; i++) {
374                 entry = lrad_fifo_pop(thread_pool.fifo[i]);
375                 if (entry) {
376                         start = i;
377                         break;
378                 }
379         }
380
381         if (!entry) {
382                 pthread_mutex_unlock(&thread_pool.queue_mutex);
383                 *request = NULL;
384                 *fun = NULL;
385                 return 0;
386         }
387
388         rad_assert(thread_pool.num_queued > 0);
389         thread_pool.num_queued--;
390         *request = entry->request;
391         *fun = entry->fun;
392         free(entry);
393
394         rad_assert(*request != NULL);
395         rad_assert((*request)->magic == REQUEST_MAGIC);
396         rad_assert(*fun != NULL);
397
398         /*
399          *      If the request has sat in the queue for too long,
400          *      kill it.
401          *
402          *      The main clean-up code can't delete the request from
403          *      the queue, and therefore won't clean it up until we
404          *      have acknowledged it as "done".
405          */
406         if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
407                 (*request)->child_state = REQUEST_DONE;
408                 goto retry;
409         }
410
411         /*
412          *      The thread is currently processing a request.
413          */
414         thread_pool.active_threads++;
415
416         pthread_mutex_unlock(&thread_pool.queue_mutex);
417
418         return 1;
419 }
420
421
422 /*
423  *      The main thread handler for requests.
424  *
425  *      Wait on the semaphore until we have it, and process the request.
426  */
427 static void *request_handler_thread(void *arg)
428 {
429         RAD_REQUEST_FUNP  fun;
430         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
431
432         /*
433          *      Loop forever, until told to exit.
434          */
435         do {
436                 int can_read_detail;
437
438                 /*
439                  *      Wait to be signalled.
440                  */
441                 DEBUG2("Thread %d waiting to be assigned a request",
442                        self->thread_num);
443         re_wait:
444                 if (sem_wait(&thread_pool.semaphore) != 0) {
445                         /*
446                          *      Interrupted system call.  Go back to
447                          *      waiting, but DON'T print out any more
448                          *      text.
449                          */
450                         if (errno == EINTR) {
451                                 DEBUG2("Re-wait %d", self->thread_num);
452                                 goto re_wait;
453                         }
454                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
455                                self->thread_num, strerror(errno));
456                         break;
457                 }
458
459                 DEBUG2("Thread %d got semaphore", self->thread_num);
460
461                 /*
462                  *      Try to grab a request from the queue.
463                  *
464                  *      It may be empty, in which case we fail
465                  *      gracefully.
466                  */
467                 if (!request_dequeue(&self->request, &fun)) continue;
468
469                 self->request->child_pid = self->pthread_id;
470                 self->request_count++;
471
472                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
473                        self->thread_num, self->request->number,
474                        self->request_count);
475
476                 radius_handle_request(self->request, fun);
477
478                 /*
479                  *      Update the active threads.
480                  */
481                 pthread_mutex_lock(&thread_pool.queue_mutex);
482                 rad_assert(thread_pool.active_threads > 0);
483                 thread_pool.active_threads--;
484
485                 /*
486                  *      If we're not currently allowed to read the
487                  *      detail file, AND there are no requests queued,
488                  *      THEN signal the main worker thread that
489                  *      there's at least one waiting thread (us) who
490                  *      can accept a packet from the detail file.
491                  */
492                 can_read_detail = FALSE;
493                 if (!thread_pool.can_read_detail &&
494                     (thread_pool.num_queued == 0)) {
495                         can_read_detail = TRUE;
496                 }
497
498                 pthread_mutex_unlock(&thread_pool.queue_mutex);
499
500                 /*
501                  *      Do this out of the lock to be nice to everyone.
502                  */
503                 if (can_read_detail) {
504                         radius_signal_self(RADIUS_SIGNAL_SELF_DETAIL);
505                 }
506
507         } while (self->status != THREAD_CANCELLED);
508
509         DEBUG2("Thread %d exiting...", self->thread_num);
510
511 #ifdef HAVE_OPENSSL_ERR_H
512         /*
513          *      If we linked with OpenSSL, the application
514          *      must remove the thread's error queue before
515          *      exiting to prevent memory leaks.
516          */
517         ERR_remove_state(0);
518 #endif
519
520         /*
521          *  Do this as the LAST thing before exiting.
522          */
523         self->status = THREAD_EXITED;
524
525         return NULL;
526 }
527
528 /*
529  *      Take a THREAD_HANDLE, delete it from the thread pool and
530  *      free its resources.
531  *
532  *      This function is called ONLY from the main server thread,
533  *      ONLY after the thread has exited.
534  */
535 static void delete_thread(THREAD_HANDLE *handle)
536 {
537         THREAD_HANDLE *prev;
538         THREAD_HANDLE *next;
539
540         rad_assert(handle->request == NULL);
541
542         DEBUG2("Deleting thread %d", handle->thread_num);
543
544         prev = handle->prev;
545         next = handle->next;
546         rad_assert(thread_pool.total_threads > 0);
547         thread_pool.total_threads--;
548
549         /*
550          *      Remove the handle from the list.
551          */
552         if (prev == NULL) {
553                 rad_assert(thread_pool.head == handle);
554                 thread_pool.head = next;
555         } else {
556                 prev->next = next;
557         }
558
559         if (next == NULL) {
560                 rad_assert(thread_pool.tail == handle);
561                 thread_pool.tail = prev;
562         } else {
563                 next->prev = prev;
564         }
565
566         /*
567          *      Free the handle, now that it's no longer referencable.
568          */
569         free(handle);
570 }
571
572
573 /*
574  *      Spawn a new thread, and place it in the thread pool.
575  *
576  *      The thread is started initially in the blocked state, waiting
577  *      for the semaphore.
578  */
579 static THREAD_HANDLE *spawn_thread(time_t now)
580 {
581         int rcode;
582         THREAD_HANDLE *handle;
583         pthread_attr_t attr;
584
585         /*
586          *      Ensure that we don't spawn too many threads.
587          */
588         if (thread_pool.total_threads >= thread_pool.max_threads) {
589                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
590                 return NULL;
591         }
592
593         /*
594          *      Allocate a new thread handle.
595          */
596         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
597         memset(handle, 0, sizeof(THREAD_HANDLE));
598         handle->prev = NULL;
599         handle->next = NULL;
600         handle->pthread_id = NO_SUCH_CHILD_PID;
601         handle->thread_num = thread_pool.max_thread_num++;
602         handle->request_count = 0;
603         handle->status = THREAD_RUNNING;
604         handle->timestamp = time(NULL);
605
606         /*
607          *      Initialize the thread's attributes to detached.
608          *
609          *      We could call pthread_detach() later, but if the thread
610          *      exits between the create & detach calls, it will need to
611          *      be joined, which will never happen.
612          */
613         pthread_attr_init(&attr);
614         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
615
616         /*
617          *      Create the thread detached, so that it cleans up it's
618          *      own memory when it exits.
619          *
620          *      Note that the function returns non-zero on error, NOT
621          *      -1.  The return code is the error, and errno isn't set.
622          */
623         rcode = pthread_create(&handle->pthread_id, &attr,
624                         request_handler_thread, handle);
625         if (rcode != 0) {
626                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
627                        strerror(rcode));
628                 exit(1);
629         }
630         pthread_attr_destroy(&attr);
631
632         /*
633          *      One more thread to go into the list.
634          */
635         thread_pool.total_threads++;
636         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
637                         handle->thread_num, thread_pool.total_threads);
638
639         /*
640          *      Add the thread handle to the tail of the thread pool list.
641          */
642         if (thread_pool.tail) {
643                 thread_pool.tail->next = handle;
644                 handle->prev = thread_pool.tail;
645                 thread_pool.tail = handle;
646         } else {
647                 rad_assert(thread_pool.head == NULL);
648                 thread_pool.head = thread_pool.tail = handle;
649         }
650
651         /*
652          *      Update the time we last spawned a thread.
653          */
654         thread_pool.time_last_spawned = now;
655
656         /*
657          *      And return the new handle to the caller.
658          */
659         return handle;
660 }
661
662 /*
663  *      Temporary function to prevent server from executing a SIGHUP
664  *      until all threads are finished handling requests.  This returns
665  *      the number of active threads to 'radiusd.c'.
666  */
667 int total_active_threads(void)
668 {
669         /*
670          *      We don't acquire the mutex, so this is just an estimate.
671          *      We can't return with the lock held, so there's no point
672          *      in getting the guaranteed correct value; by the time
673          *      the caller sees it, it can be wrong again.
674          */
675         return thread_pool.active_threads;
676 }
677
678
679 static uint32_t pid_hash(const void *data)
680 {
681         const thread_fork_t *tf = data;
682
683         return lrad_hash(&tf->pid, sizeof(tf->pid));
684 }
685
686 static int pid_cmp(const void *one, const void *two)
687 {
688         const thread_fork_t *a = one;
689         const thread_fork_t *b = two;
690
691         return (a->pid - b->pid);
692 }
693
694 /*
695  *      Allocate the thread pool, and seed it with an initial number
696  *      of threads.
697  *
698  *      FIXME: What to do on a SIGHUP???
699  */
700 int thread_pool_init(int spawn_flag)
701 {
702         int             i, rcode;
703         CONF_SECTION    *pool_cf;
704         time_t          now;
705
706         DEBUG("Initializing the thread pool...");
707         now = time(NULL);
708
709         /*
710          *      We're not spawning new threads, don't do
711          *      anything.
712          */
713         if (!spawn_flag) return 0;
714
715         /*
716          *      After a SIGHUP, we don't over-write the previous values.
717          */
718         if (!pool_initialized) {
719                 /*
720                  *      Initialize the thread pool to some reasonable values.
721                  */
722                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
723                 thread_pool.head = NULL;
724                 thread_pool.tail = NULL;
725                 thread_pool.total_threads = 0;
726                 thread_pool.max_thread_num = 1;
727                 thread_pool.cleanup_delay = 5;
728                 thread_pool.spawn_flag = spawn_flag;
729
730                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
731                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
732                                strerror(errno));
733                         exit(1);
734                 }
735
736                 /*
737                  *      Create the hash table of child PID's
738                  */
739                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
740                                                              pid_cmp,
741                                                              free);
742                 if (!thread_pool.waiters) {
743                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
744                         exit(1);
745                 }
746         }
747
748         pool_cf = cf_section_find("thread");
749         if (!pool_cf) {
750                 radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
751                 exit(1);
752         }
753
754         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
755                 exit(1);
756         }
757
758         /*
759          *      Catch corner cases.
760          */
761         if (thread_pool.min_spare_threads < 1)
762                 thread_pool.min_spare_threads = 1;
763         if (thread_pool.max_spare_threads < 1)
764                 thread_pool.max_spare_threads = 1;
765         if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
766                 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
767
768         /*
769          *      The pool has already been initialized.  Don't spawn
770          *      new threads, and don't forget about forked children,
771          */
772         if (pool_initialized) {
773                 return 0;
774         }
775
776         /*
777          *      Initialize the queue of requests.
778          */
779         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
780         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
781         if (rcode != 0) {
782                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
783                        strerror(errno));
784                 exit(1);
785         }
786
787         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
788         if (rcode != 0) {
789                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
790                        strerror(errno));
791                 exit(1);
792         }
793
794         /*
795          *      Allocate multiple fifos.
796          */
797         for (i = 0; i < RAD_LISTEN_MAX; i++) {
798                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
799                 if (!thread_pool.fifo[i]) {
800                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
801                         exit(1);
802                 }
803         }
804
805 #ifdef HAVE_OPENSSL_CRYPTO_H
806         /*
807          *      If we're linking with OpenSSL too, then we need
808          *      to set up the mutexes and enable the thread callbacks.
809          */
810         if (!setup_ssl_mutexes()) {
811                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
812                 exit(1);
813         }
814 #endif
815
816
817         /*
818          *      Create a number of waiting threads.
819          *
820          *      If we fail while creating them, do something intelligent.
821          */
822         for (i = 0; i < thread_pool.start_threads; i++) {
823                 if (spawn_thread(now) == NULL) {
824                         return -1;
825                 }
826         }
827
828         DEBUG2("Thread pool initialized");
829         pool_initialized = TRUE;
830         return 0;
831 }
832
833
834 /*
835  *      Assign a new request to a free thread.
836  *
837  *      If there isn't a free thread, then try to create a new one,
838  *      up to the configured limits.
839  */
840 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
841 {
842         /*
843          *      We've been told not to spawn threads, so don't.
844          */
845         if (!thread_pool.spawn_flag) {
846                 radius_handle_request(request, fun);
847
848                 /*
849                  *      Requests that care about child process exit
850                  *      codes have already either called
851                  *      rad_waitpid(), or they've given up.
852                  */
853                 wait(NULL);
854                 return 1;
855         }
856
857         /*
858          *      Add the new request to the queue.
859          */
860         if (!request_enqueue(request, fun)) return 0;
861
862         /*
863          *      If the thread pool is busy handling requests, then
864          *      try to spawn another one.  We don't acquire the mutex
865          *      before reading active_threads, so our thread count is
866          *      just an estimate.  It's fine to go ahead and spawn an
867          *      extra thread in that case.
868          *      NOTE: the log message may be in error since active_threads
869          *      is an estimate, but it's only in error about the thread
870          *      count, not about the fact that we can't create a new one.
871          */
872         if (thread_pool.active_threads == thread_pool.total_threads) {
873                 if (spawn_thread(request->timestamp) == NULL) {
874                         radlog(L_INFO,
875                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
876                                thread_pool.max_threads);
877                         return 1;
878                 }
879         }
880
881         return 1;
882 }
883
884 /*
885  *      Check the min_spare_threads and max_spare_threads.
886  *
887  *      If there are too many or too few threads waiting, then we
888  *      either create some more, or delete some.
889  */
890 int thread_pool_clean(time_t now)
891 {
892         int spare;
893         int i, total;
894         THREAD_HANDLE *handle, *next;
895         int active_threads;
896         static time_t last_cleaned = 0;
897
898         /*
899          *      Loop over the thread pool deleting exited threads.
900          */
901         for (handle = thread_pool.head; handle; handle = next) {
902                 next = handle->next;
903
904                 /*
905                  *      Maybe we've asked the thread to exit, and it
906                  *      has agreed.
907                  */
908                 if (handle->status == THREAD_EXITED) {
909                         delete_thread(handle);
910                 }
911         }
912
913         /*
914          *      We don't need a mutex lock here, as we're reading
915          *      active_threads, and not modifying it.  We want a close
916          *      approximation of the number of active threads, and this
917          *      is good enough.
918          */
919         active_threads = thread_pool.active_threads;
920         spare = thread_pool.total_threads - active_threads;
921         if (debug_flag) {
922                 static int old_total = -1;
923                 static int old_active = -1;
924
925                 if ((old_total != thread_pool.total_threads) ||
926                                 (old_active != active_threads)) {
927                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
928                                         thread_pool.total_threads, active_threads, spare);
929                         old_total = thread_pool.total_threads;
930                         old_active = active_threads;
931                 }
932         }
933
934         /*
935          *      If there are too few spare threads, create some more.
936          */
937         if (spare < thread_pool.min_spare_threads) {
938                 total = thread_pool.min_spare_threads - spare;
939
940                 DEBUG2("Threads: Spawning %d spares", total);
941                 /*
942                  *      Create a number of spare threads.
943                  */
944                 for (i = 0; i < total; i++) {
945                         handle = spawn_thread(now);
946                         if (handle == NULL) {
947                                 return -1;
948                         }
949                 }
950
951                 /*
952                  *      And exit, as there can't be too many spare threads.
953                  */
954                 return 0;
955         }
956
957         /*
958          *      Only delete spare threads if we haven't already done
959          *      so this second.
960          */
961         if (now == last_cleaned) {
962                 return 0;
963         }
964         last_cleaned = now;
965
966         /*
967          *      Only delete the spare threads if sufficient time has
968          *      passed since we last created one.  This helps to minimize
969          *      the amount of create/delete cycles.
970          */
971         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
972                 return 0;
973         }
974
975         /*
976          *      If there are too many spare threads, delete one.
977          *
978          *      Note that we only delete ONE at a time, instead of
979          *      wiping out many.  This allows the excess servers to
980          *      be slowly reaped, just in case the load spike comes again.
981          */
982         if (spare > thread_pool.max_spare_threads) {
983
984                 spare -= thread_pool.max_spare_threads;
985
986                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
987
988                 /*
989                  *      Walk through the thread pool, deleting the
990                  *      first idle thread we come across.
991                  */
992                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
993                         next = handle->next;
994
995                         /*
996                          *      If the thread is not handling a
997                          *      request, but still live, then tell it
998                          *      to exit.
999                          *
1000                          *      It will eventually wake up, and realize
1001                          *      it's been told to commit suicide.
1002                          */
1003                         if ((handle->request == NULL) &&
1004                             (handle->status == THREAD_RUNNING)) {
1005                                 handle->status = THREAD_CANCELLED;
1006                                 /*
1007                                  *      Post an extra semaphore, as a
1008                                  *      signal to wake up, and exit.
1009                                  */
1010                                 sem_post(&thread_pool.semaphore);
1011                                 spare--;
1012                                 break;
1013                         }
1014                 }
1015         }
1016
1017         /*
1018          *      If the thread has handled too many requests, then make it
1019          *      exit.
1020          */
1021         if (thread_pool.max_requests_per_thread > 0) {
1022                 for (handle = thread_pool.head; handle; handle = next) {
1023                         next = handle->next;
1024
1025                         /*
1026                          *      Not handling a request, but otherwise
1027                          *      live, we can kill it.
1028                          */
1029                         if ((handle->request == NULL) &&
1030                             (handle->status == THREAD_RUNNING) &&
1031                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1032                                 handle->status = THREAD_CANCELLED;
1033                                 sem_post(&thread_pool.semaphore);
1034                         }
1035                 }
1036         }
1037
1038         /*
1039          *      Otherwise everything's kosher.  There are not too few,
1040          *      or too many spare threads.  Exit happily.
1041          */
1042         return 0;
1043 }
1044
1045
1046 /*
1047  *      Thread wrapper for fork().
1048  */
1049 pid_t rad_fork(void)
1050 {
1051         pid_t child_pid;
1052
1053         if (!pool_initialized) return fork();
1054
1055         reap_children();        /* be nice to non-wait thingies */
1056
1057         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1058                 return -1;
1059         }
1060
1061         /*
1062          *      Fork & save the PID for later reaping.
1063          */
1064         child_pid = fork();
1065         if (child_pid > 0) {
1066                 int rcode;
1067                 thread_fork_t *tf;
1068
1069                 tf = rad_malloc(sizeof(*tf));
1070                 memset(tf, 0, sizeof(*tf));
1071
1072                 tf->pid = child_pid;
1073
1074                 pthread_mutex_lock(&thread_pool.wait_mutex);
1075                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1076                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1077
1078                 if (!rcode) {
1079                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1080                                (int) child_pid);
1081                 }
1082         }
1083
1084         /*
1085          *      Return whatever we were told.
1086          */
1087         return child_pid;
1088 }
1089
1090
1091 /*
1092  *      Wait 10 seconds at most for a child to exit, then give up.
1093  */
1094 pid_t rad_waitpid(pid_t pid, int *status)
1095 {
1096         int i;
1097         thread_fork_t mytf, *tf;
1098
1099         if (!pool_initialized) return waitpid(pid, status, 0);
1100
1101         if (pid <= 0) return -1;
1102
1103         mytf.pid = pid;
1104
1105         pthread_mutex_lock(&thread_pool.wait_mutex);
1106         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1107         pthread_mutex_unlock(&thread_pool.wait_mutex);
1108
1109         if (!tf) return -1;
1110
1111         for (i = 0; i < 100; i++) {
1112                 reap_children();
1113
1114                 if (tf->exited) {
1115                         *status = tf->status;
1116
1117                         pthread_mutex_lock(&thread_pool.wait_mutex);
1118                         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1119                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1120                         return pid;
1121                 }
1122                 usleep(100000); /* sleep for 1/10 of a second */
1123         }
1124
1125         /*
1126          *      10 seconds have passed, give up on the child.
1127          */
1128         pthread_mutex_lock(&thread_pool.wait_mutex);
1129         lrad_hash_table_delete(thread_pool.waiters, &mytf);
1130         pthread_mutex_unlock(&thread_pool.wait_mutex);
1131
1132         return 0;
1133 }
1134
1135 #else /* HAVE_PTHREAD_H */
/*
 *	"thread" code when we don't have threads.
 */

/*
 *	Non-threaded build: there is no pool to create, so this is a
 *	no-op.  "spawn_flag" is accepted only to keep the same
 *	signature as the threaded version, and is ignored.
 *
 *	Always returns 0.
 */
int thread_pool_init(int spawn_flag)
{
	(void) spawn_flag;	/* unused without pthreads */

	return 0;
}
1143
1144 /*
1145  *      call "radrespond".
1146  */
1147 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1148 {
1149         radius_handle_request(request, fun);
1150         return 1;
1151 }
1152
1153 #endif /* HAVE_PTHREAD_H */