add new header ident.h
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/autoconf.h>
25
26 #include <stdlib.h>
27 #include <string.h>
28
29 /*
30  *      Other OS's have sem_init, OS X doesn't.
31  */
32 #ifndef DARWIN
33 #include <semaphore.h>
34 #else
35 #include <mach/task.h>
36 #include <mach/semaphore.h>
37
38 #undef sem_t
39 #define sem_t semaphore_t
40 #undef sem_init
41 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
42 #undef sem_wait
43 #define sem_wait(s) semaphore_wait(*s)
44 #undef sem_post
45 #define sem_post(s) semaphore_signal(*s)
46 #endif
47
48 #include <signal.h>
49
50 #ifdef HAVE_SYS_WAIT_H
51 #include <sys/wait.h>
52 #endif
53
54 #include <freeradius-devel/radiusd.h>
55 #include <freeradius-devel/rad_assert.h>
56 #include <freeradius-devel/modules.h>
57
58 static const char rcsid[] =
59 "$Id$";
60
61 #ifdef HAVE_PTHREAD_H
62
63 #ifdef HAVE_OPENSSL_CRYPTO_H
64 #include <openssl/crypto.h>
65 #endif
66 #ifdef HAVE_OPENSSL_ERR_H
67 #include <openssl/err.h>
68 #endif
69
70 #define SEMAPHORE_LOCKED        (0)
71 #define SEMAPHORE_UNLOCKED      (1)
72
73 #define THREAD_RUNNING          (1)
74 #define THREAD_CANCELLED        (2)
75 #define THREAD_EXITED           (3)
76
77 #define NUM_FIFOS               (2)
78
79 /*
80  *     Ordered this way because we prefer proxy replies, then
81  *     new ("start") requests.
82  */
83 #define FIFO_START   (1)
84 #define FIFO_PROXY   (0)
85
/*
 *  A data structure which contains the information about
 *  the current thread.
 *
 *  prev, next    links in the thread pool's doubly-linked list of handles
 *  pthread_id    pthread id
 *  thread_num    server thread number, 1...number of threads
 *  status        THREAD_RUNNING, THREAD_CANCELLED, or THREAD_EXITED
 *  request_count the number of requests that this thread has handled
 *  timestamp     when the handle was created by spawn_thread()
 *  request       the request the thread is currently handling, or
 *                NULL when it is not handling one
 */
typedef struct THREAD_HANDLE {
        struct THREAD_HANDLE *prev;
        struct THREAD_HANDLE *next;
        pthread_t            pthread_id;
        int                  thread_num;
        int                  status;
        unsigned int         request_count;
        time_t               timestamp;
        REQUEST              *request;
} THREAD_HANDLE;
107
/*
 *      For the request queue.
 *
 *      One queued unit of work: a request, paired with the function
 *      that a worker thread hands to rad_respond() along with it.
 */
typedef struct request_queue_t {
        REQUEST           *request;
        RAD_REQUEST_FUNP  fun;
} request_queue_t;
115
/*
 *      Bookkeeping for one forked child process.  Entries live in
 *      thread_pool.waiters and are hashed and compared by pid only
 *      (see pid_hash() and pid_cmp()).
 */
typedef struct thread_fork_t {
        pid_t           pid;            /* child process id; the lookup key */
        int             status;         /* wait status, filled in by reap_children() */
        int             exited;         /* set to 1 by reap_children() once the child is reaped */
} thread_fork_t;
121
122
/*
 *      A data structure to manage the thread pool.  There's no real
 *      need for a data structure, but it makes things conceptually
 *      easier.
 */
typedef struct THREAD_POOL {
        /*
         *      Doubly-linked list of thread handles.  spawn_thread()
         *      appends at the tail, delete_thread() unlinks.
         */
        THREAD_HANDLE *head;
        THREAD_HANDLE *tail;

        int total_threads;      /* number of handles currently in the list */
        int active_threads;     /* protected by queue_mutex */
        int max_thread_num;     /* thread_num given to the next spawned thread */
        int start_threads;      /* "start_servers" configuration value */
        int max_threads;        /* "max_servers": spawn_thread() refuses to exceed it */
        int min_spare_threads;  /* "min_spare_servers" configuration value */
        int max_spare_threads;  /* "max_spare_servers" configuration value */
        unsigned int max_requests_per_thread;   /* "max_requests_per_server" */
        unsigned long request_count;    /* incremented on every request_enqueue() call */
        time_t time_last_spawned;       /* "now" passed to the last successful spawn_thread() */
        int cleanup_delay;      /* "cleanup_delay" configuration value */
        int spawn_flag;         /* zero: thread_pool_addrequest() handles requests inline */

        /*
         *      Held by reap_children() while it collects exited
         *      children and updates their entries in "waiters",
         *      a hash table of thread_fork_t keyed by pid.
         */
        pthread_mutex_t wait_mutex;
        lrad_hash_table_t *waiters;

        /*
         *      All threads wait on this semaphore, for requests
         *      to enter the queue.
         */
        sem_t           semaphore;

        /*
         *      To ensure only one thread at a time touches the queue.
         */
        pthread_mutex_t queue_mutex;

        int             max_queue_size; /* new requests are discarded once num_queued reaches this */
        int             num_queued;     /* entries currently held in the FIFOs */
        int             fifo_state;     /* index of the FIFO request_dequeue() tries first */
        lrad_fifo_t     *fifo[NUM_FIFOS];       /* indexed by FIFO_PROXY / FIFO_START */
} THREAD_POOL;
164
/*
 *      The single thread pool used by every function in this file.
 */
static THREAD_POOL thread_pool;

/*
 *      Set to TRUE by thread_pool_init() once the queue, semaphore
 *      and initial threads have all been set up.
 */
static int pool_initialized = FALSE;
167
168
/*
 *      A mapping of configuration file names to internal integers.
 *
 *      Each entry gives the option name, its type, and the address
 *      of the thread_pool member it is parsed into.  The trailing
 *      string is presumably the default used when the option is not
 *      configured, and the all-NULL entry presumably ends the table
 *      (TODO: confirm both against the CONF_PARSER definition).
 */
static const CONF_PARSER thread_config[] = {
        { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
        { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
        { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
        { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
        { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
        { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
        { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
        { NULL, -1, 0, NULL, NULL }
};
182
183
184 #ifdef HAVE_OPENSSL_CRYPTO_H
185
/*
 *      If we're linking against OpenSSL, then it is the
 *      duty of the application, if it is multithreaded,
 *      to provide OpenSSL with appropriate thread id
 *      and mutex locking functions
 *
 *      Note: this only implements static callbacks.
 *      OpenSSL does not use dynamic locking callbacks
 *      right now, but may in the future, so we will have
 *      to add them at some point.
 */

/*
 *      Array of CRYPTO_num_locks() mutexes, allocated and initialized
 *      by setup_ssl_mutexes() and indexed by the lock number that
 *      OpenSSL passes to ssl_locking_function().
 */
static pthread_mutex_t *ssl_mutexes = NULL;
199
/*
 *      Thread-id callback handed to OpenSSL: reports the calling
 *      thread's pthread id, converted to an unsigned long.
 */
static unsigned long ssl_id_function(void)
{
        pthread_t me = pthread_self();

        return (unsigned long) me;
}
204
205 static void ssl_locking_function(int mode, int n, const char *file, int line)
206 {
207         file = file;            /* -Wunused */
208         line = line;            /* -Wunused */
209
210         if (mode & CRYPTO_LOCK) {
211                 pthread_mutex_lock(&(ssl_mutexes[n]));
212         } else {
213                 pthread_mutex_unlock(&(ssl_mutexes[n]));
214         }
215 }
216
217 static int setup_ssl_mutexes(void)
218 {
219         int i;
220
221         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
222         if (!ssl_mutexes) {
223                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
224                 return 0;
225         }
226
227         for (i = 0; i < CRYPTO_num_locks(); i++) {
228                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
229         }
230
231         CRYPTO_set_id_callback(ssl_id_function);
232         CRYPTO_set_locking_callback(ssl_locking_function);
233
234         return 1;
235 }
236 #endif
237
238
239 /*
240  *      We don't want to catch SIGCHLD for a host of reasons.
241  *
242  *      - exec_wait means that someone, somewhere, somewhen, will
243  *      call waitpid(), and catch the child.
244  *
245  *      - SIGCHLD is delivered to a random thread, not the one that
246  *      forked.
247  *
248  *      - if another thread catches the child, we have to coordinate
249  *      with the thread doing the waiting.
250  *
251  *      - if we don't waitpid() for non-wait children, they'll be zombies,
252  *      and will hang around forever.
253  *
254  */
255 static void reap_children(void)
256 {
257         pid_t pid;
258         int status;
259         thread_fork_t mytf, *tf;
260
261         if (lrad_hash_table_num_elements(thread_pool.waiters) == 0) return;
262
263         pthread_mutex_lock(&thread_pool.wait_mutex);
264
265         while (1) {
266                 pid = waitpid(0, &status, WNOHANG);
267                 if (pid <= 0) break;
268
269                 mytf.pid = pid;
270                 tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
271                 if (!tf) continue;
272                 
273                 tf->status = status;
274                 tf->exited = 1;
275         }
276
277         pthread_mutex_unlock(&thread_pool.wait_mutex);
278 }
279
280 /*
281  *      Add a request to the list of waiting requests.
282  *      This function gets called ONLY from the main handler thread...
283  *
284  *      This function should never fail.
285  */
286 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
287 {
288         int fifo = FIFO_START;
289         request_queue_t *entry;
290
291         pthread_mutex_lock(&thread_pool.queue_mutex);
292
293         thread_pool.request_count++;
294
295         /*
296          *      FIXME: Handle proxy replies separately?
297          */
298         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
299                 pthread_mutex_unlock(&thread_pool.queue_mutex);
300                 
301                 /*
302                  *      Mark the request as done.
303                  */
304                 radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
305                 request->finished = TRUE;
306                 return 0;
307         }
308
309         /*
310          *      Requests get handled in priority.  First, we handle
311          *      replies from a home server, to finish ongoing requests.
312          *
313          *      Then, we handle requests with State, to finish
314          *      multi-packet transactions.
315          *
316          *      Finally, we handle new requests.
317          */
318         if (request->proxy_reply) {
319                 fifo = FIFO_PROXY;
320         } else {
321                 fifo = FIFO_START;
322         }
323
324         entry = rad_malloc(sizeof(*entry));
325         entry->request = request;
326         entry->fun = fun;
327
328         if (!lrad_fifo_push(thread_pool.fifo[fifo], entry)) {
329                 pthread_mutex_unlock(&thread_pool.queue_mutex);
330                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
331                 request->finished = TRUE;
332                 return 0;
333         }
334
335         thread_pool.num_queued++;
336
337         pthread_mutex_unlock(&thread_pool.queue_mutex);
338
339         /*
340          *      There's one more request in the queue.
341          *
342          *      Note that we're not touching the queue any more, so
343          *      the semaphore post is outside of the mutex.  This also
344          *      means that when the thread wakes up and tries to lock
345          *      the mutex, it will be unlocked, and there won't be
346          *      contention.
347          */
348         sem_post(&thread_pool.semaphore);
349
350         return 1;
351 }
352
/*
 *      Remove a request from the queue.
 *
 *      Reaps any exited children first.  Then, holding queue_mutex,
 *      pops an entry, starting with the FIFO that
 *      thread_pool.fifo_state points at and moving to the next FIFO
 *      after every pop attempt.
 *
 *      request   receives the dequeued request, or NULL
 *      fun       receives the function to process it with, or NULL
 *
 *      Returns 1 if a request was dequeued; the calling thread is
 *      then counted in active_threads.  Returns 0 if no entry was
 *      found.
 */
static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
{
        int fifo_state;
        request_queue_t *entry;

        reap_children();

        pthread_mutex_lock(&thread_pool.queue_mutex);

        fifo_state = thread_pool.fifo_state;

 retry:
        /*
         *      Stop as soon as we have an entry, or once the local
         *      fifo_state has come back around to the pool's saved
         *      fifo_state.
         */
        do {
                /*
                 *      Pop an entry from the current queue, and go to
                 *      the next queue.
                 */
                entry = lrad_fifo_pop(thread_pool.fifo[fifo_state]);
                fifo_state++;
                if (fifo_state >= NUM_FIFOS) fifo_state = 0;
        } while ((fifo_state != thread_pool.fifo_state) && !entry);

        if (!entry) {
                pthread_mutex_unlock(&thread_pool.queue_mutex);
                *request = NULL;
                *fun = NULL;
                return 0;
        }

        /*
         *      Copy out the contents, then release the queue entry.
         *      It was allocated by request_enqueue().
         */
        rad_assert(thread_pool.num_queued > 0);
        thread_pool.num_queued--;
        *request = entry->request;
        *fun = entry->fun;
        free(entry);

        rad_assert(*request != NULL);
        rad_assert((*request)->magic == REQUEST_MAGIC);
        rad_assert(*fun != NULL);

        /*
         *      If the request has sat in the queue for too long,
         *      kill it.
         *
         *      The main clean-up code won't delete the request from
         *      the request list, until it's marked "finished"
         *
         *      The retry continues from the advanced local
         *      fifo_state; thread_pool.fifo_state has not been
         *      updated yet.
         */
        if ((*request)->options & RAD_REQUEST_OPTION_STOP_NOW) {
                (*request)->finished = 1;
                goto retry;
        }

        /*
         *      The thread is currently processing a request.
         *      Remember which FIFO the next dequeue should try first.
         */
        thread_pool.active_threads++;
        thread_pool.fifo_state = fifo_state;

        pthread_mutex_unlock(&thread_pool.queue_mutex);

        /*
         *      No other thread may already own this request.
         */
        rad_assert((*request)->child_pid == NO_SUCH_CHILD_PID);

        return 1;
}
419
420
/*
 *      The main thread handler for requests.
 *
 *      Wait on the semaphore until we have it, and process the request.
 *
 *      arg   the THREAD_HANDLE created for this thread by
 *            spawn_thread()
 *
 *      Loops until the handle's status is THREAD_CANCELLED, or until
 *      waiting on the semaphore fails for a reason other than EINTR.
 *      Sets the status to THREAD_EXITED just before returning.
 *      Always returns NULL.
 */
static void *request_handler_thread(void *arg)
{
        RAD_REQUEST_FUNP  fun;
        THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
#ifdef HAVE_PTHREAD_SIGMASK
        sigset_t set;

        /*
         *      Block SIGHUP, SIGINT, SIGQUIT and SIGTERM handling
         *      for the child threads.
         *
         *      This ensures that only the main server thread will
         *      process HUP signals.
         *
         *      If we don't have sigprocmask, then it shouldn't be
         *      a problem, either, as the sig_hup handler should check
         *      for this condition.
         */
        sigemptyset(&set);
        sigaddset(&set, SIGHUP);
        sigaddset(&set, SIGINT);
        sigaddset(&set, SIGQUIT);
        sigaddset(&set, SIGTERM);
        pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

        /*
         *      Loop forever, until told to exit.
         */
        do {
                int finished;

                /*
                 *      Wait to be signalled.
                 */
                DEBUG2("Thread %d waiting to be assigned a request",
                       self->thread_num);
        re_wait:
                if (sem_wait(&thread_pool.semaphore) != 0) {
                        /*
                         *      Interrupted system call.  Go back to
                         *      waiting, but DON'T print out any more
                         *      text.
                         */
                        if (errno == EINTR) {
                                DEBUG2("Re-wait %d", self->thread_num);
                                goto re_wait;
                        }
                        radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
                               self->thread_num, strerror(errno));
                        break;
                }

                DEBUG2("Thread %d got semaphore", self->thread_num);

                /*
                 *      Try to grab a request from the queue.
                 *
                 *      It may be empty, in which case we fail
                 *      gracefully.
                 *
                 *      Note that "continue" in a do/while jumps to
                 *      the loop condition, so the cancellation check
                 *      still runs before we wait again.
                 */
                if (!request_dequeue(&self->request, &fun)) continue;

                /*
                 *      Mark the request as being handled by this
                 *      thread.
                 */
                self->request->child_pid = self->pthread_id;
                self->request_count++;

                DEBUG2("Thread %d handling request %d, (%d handled so far)",
                       self->thread_num, self->request->number,
                       self->request_count);

                /*
                 *      Respond, and reset request->child_pid
                 */
                finished = rad_respond(self->request, fun);

                /*
                 *      Update the active threads.
                 */
                pthread_mutex_lock(&thread_pool.queue_mutex);

                /*
                 *      We haven't replied to the client, but we HAVE
                 *      sent a proxied packet, and we have NOT
                 *      received a proxy response.  In that case, send
                 *      the proxied packet now.  Doing this in the mutex
                 *      avoids race conditions.
                 *
                 *      FIXME: this work should really depend on a
                 *      "state", and "next handler", rather than
                 *      horrid hacks like this.
                 */
                if (!self->request->reply->data &&
                    self->request->proxy && self->request->proxy->data
                    && !self->request->proxy_reply)
                        self->request->proxy_listener->send(self->request->proxy_listener,
                                                            (char *)self->request->proxysecret);

                /*
                 *      Release the request.  "finished" is stored
                 *      only after child_pid has been reset, and the
                 *      handle's pointer is cleared so that
                 *      delete_thread() sees no request attached.
                 */
                self->request->child_pid = NO_SUCH_CHILD_PID;
                self->request->finished = finished;
                self->request = NULL;

                rad_assert(thread_pool.active_threads > 0);
                thread_pool.active_threads--;
                pthread_mutex_unlock(&thread_pool.queue_mutex);
        } while (self->status != THREAD_CANCELLED);

        DEBUG2("Thread %d exiting...", self->thread_num);

#ifdef HAVE_OPENSSL_ERR_H
        /*
         *      If we linked with OpenSSL, the application
         *      must remove the thread's error queue before
         *      exiting to prevent memory leaks.
         */
        ERR_remove_state(0);
#endif

        /*
         *  Do this as the LAST thing before exiting.
         *
         *  thread_pool_clean() deletes the handle once it sees
         *  this status.
         */
        self->status = THREAD_EXITED;

        return NULL;
}
549
550 /*
551  *      Take a THREAD_HANDLE, delete it from the thread pool and
552  *      free its resources.
553  *
554  *      This function is called ONLY from the main server thread,
555  *      ONLY after the thread has exited.
556  */
557 static void delete_thread(THREAD_HANDLE *handle)
558 {
559         THREAD_HANDLE *prev;
560         THREAD_HANDLE *next;
561
562         rad_assert(handle->request == NULL);
563
564         DEBUG2("Deleting thread %d", handle->thread_num);
565
566         prev = handle->prev;
567         next = handle->next;
568         rad_assert(thread_pool.total_threads > 0);
569         thread_pool.total_threads--;
570
571         /*
572          *      Remove the handle from the list.
573          */
574         if (prev == NULL) {
575                 rad_assert(thread_pool.head == handle);
576                 thread_pool.head = next;
577         } else {
578                 prev->next = next;
579         }
580
581         if (next == NULL) {
582                 rad_assert(thread_pool.tail == handle);
583                 thread_pool.tail = prev;
584         } else {
585                 next->prev = prev;
586         }
587
588         /*
589          *      Free the handle, now that it's no longer referencable.
590          */
591         free(handle);
592 }
593
594
595 /*
596  *      Spawn a new thread, and place it in the thread pool.
597  *
598  *      The thread is started initially in the blocked state, waiting
599  *      for the semaphore.
600  */
601 static THREAD_HANDLE *spawn_thread(time_t now)
602 {
603         int rcode;
604         THREAD_HANDLE *handle;
605         pthread_attr_t attr;
606
607         /*
608          *      Ensure that we don't spawn too many threads.
609          */
610         if (thread_pool.total_threads >= thread_pool.max_threads) {
611                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
612                 return NULL;
613         }
614
615         /*
616          *      Allocate a new thread handle.
617          */
618         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
619         memset(handle, 0, sizeof(THREAD_HANDLE));
620         handle->prev = NULL;
621         handle->next = NULL;
622         handle->pthread_id = NO_SUCH_CHILD_PID;
623         handle->thread_num = thread_pool.max_thread_num++;
624         handle->request_count = 0;
625         handle->status = THREAD_RUNNING;
626         handle->timestamp = time(NULL);
627
628         /*
629          *      Initialize the thread's attributes to detached.
630          *
631          *      We could call pthread_detach() later, but if the thread
632          *      exits between the create & detach calls, it will need to
633          *      be joined, which will never happen.
634          */
635         pthread_attr_init(&attr);
636         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
637
638         /*
639          *      Create the thread detached, so that it cleans up it's
640          *      own memory when it exits.
641          *
642          *      Note that the function returns non-zero on error, NOT
643          *      -1.  The return code is the error, and errno isn't set.
644          */
645         rcode = pthread_create(&handle->pthread_id, &attr,
646                         request_handler_thread, handle);
647         if (rcode != 0) {
648                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
649                        strerror(rcode));
650                 exit(1);
651         }
652         pthread_attr_destroy(&attr);
653
654         /*
655          *      One more thread to go into the list.
656          */
657         thread_pool.total_threads++;
658         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
659                         handle->thread_num, thread_pool.total_threads);
660
661         /*
662          *      Add the thread handle to the tail of the thread pool list.
663          */
664         if (thread_pool.tail) {
665                 thread_pool.tail->next = handle;
666                 handle->prev = thread_pool.tail;
667                 thread_pool.tail = handle;
668         } else {
669                 rad_assert(thread_pool.head == NULL);
670                 thread_pool.head = thread_pool.tail = handle;
671         }
672
673         /*
674          *      Update the time we last spawned a thread.
675          */
676         thread_pool.time_last_spawned = now;
677
678         /*
679          *      And return the new handle to the caller.
680          */
681         return handle;
682 }
683
684 /*
685  *      Temporary function to prevent server from executing a SIGHUP
686  *      until all threads are finished handling requests.  This returns
687  *      the number of active threads to 'radiusd.c'.
688  */
689 int total_active_threads(void)
690 {
691         /*
692          *      We don't acquire the mutex, so this is just an estimate.
693          *      We can't return with the lock held, so there's no point
694          *      in getting the guaranteed correct value; by the time
695          *      the caller sees it, it can be wrong again.
696          */
697         return thread_pool.active_threads;
698 }
699
700
701 static uint32_t pid_hash(const void *data)
702 {
703         const thread_fork_t *tf = data;
704
705         return lrad_hash(&tf->pid, sizeof(tf->pid));
706 }
707
708 static int pid_cmp(const void *one, const void *two)
709 {
710         const thread_fork_t *a = one;
711         const thread_fork_t *b = two;
712
713         return (a->pid - b->pid);
714 }
715
716 /*
717  *      Allocate the thread pool, and seed it with an initial number
718  *      of threads.
719  *
720  *      FIXME: What to do on a SIGHUP???
721  */
722 int thread_pool_init(int spawn_flag)
723 {
724         int             i, rcode;
725         CONF_SECTION    *pool_cf;
726         time_t          now;
727
728         DEBUG("Initializing the thread pool...");
729         now = time(NULL);
730
731         /*
732          *      After a SIGHUP, we don't over-write the previous values.
733          */
734         if (!pool_initialized) {
735                 /*
736                  *      Initialize the thread pool to some reasonable values.
737                  */
738                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
739                 thread_pool.head = NULL;
740                 thread_pool.tail = NULL;
741                 thread_pool.total_threads = 0;
742                 thread_pool.max_thread_num = 1;
743                 thread_pool.cleanup_delay = 5;
744                 thread_pool.spawn_flag = spawn_flag;
745
746                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
747                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
748                                strerror(errno));
749                         exit(1);
750                 }               
751                 
752                 /*
753                  *      Create the hash table of child PID's
754                  */
755                 thread_pool.waiters = lrad_hash_table_create(pid_hash,
756                                                              pid_cmp,
757                                                              free);
758                 if (!thread_pool.waiters) {
759                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
760                         exit(1);
761                 }
762         }
763
764         /*
765          *      We're not spawning new threads, don't do
766          *      anything.
767          */
768         if (!spawn_flag) return 0;
769
770         pool_cf = cf_section_find("thread");
771         if (pool_cf != NULL) {
772                 /*
773                  *      FIXME: Check for errors?
774                  */
775                 cf_section_parse(pool_cf, NULL, thread_config);
776         }
777
778         /*
779          *      The pool has already been initialized.  Don't spawn
780          *      new threads, and don't forget about forked children,
781          */
782         if (pool_initialized) {
783                 return 0;
784         }
785
786         /*
787          *      Initialize the queue of requests.
788          */
789         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
790         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
791         if (rcode != 0) {
792                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
793                        strerror(errno));
794                 exit(1);
795         }
796
797         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
798         if (rcode != 0) {
799                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
800                        strerror(errno));
801                 exit(1);
802         }
803
804         /*
805          *      Allocate multiple fifos.
806          */
807         for (i = 0; i < NUM_FIFOS; i++) {
808                 thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
809                 if (!thread_pool.fifo[i]) {
810                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
811                         exit(1);
812                 }
813         }
814
815 #ifdef HAVE_OPENSSL_CRYPTO_H
816         /*
817          *      If we're linking with OpenSSL too, then we need
818          *      to set up the mutexes and enable the thread callbacks.
819          */
820         if (!setup_ssl_mutexes()) {
821                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
822                 exit(1);
823         }
824 #endif
825
826
827         /*
828          *      Create a number of waiting threads.
829          *
830          *      If we fail while creating them, do something intelligent.
831          */
832         for (i = 0; i < thread_pool.start_threads; i++) {
833                 if (spawn_thread(now) == NULL) {
834                         return -1;
835                 }
836         }
837
838         DEBUG2("Thread pool initialized");
839         pool_initialized = TRUE;
840         return 0;
841 }
842
843
844 /*
845  *      Assign a new request to a free thread.
846  *
847  *      If there isn't a free thread, then try to create a new one,
848  *      up to the configured limits.
849  */
850 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
851 {
852         /*
853          *      We've been told not to spawn threads, so don't.
854          */
855         if (!thread_pool.spawn_flag) {
856                 request->finished = rad_respond(request, fun);
857                 return 1;
858         }
859
860         /*
861          *      Add the new request to the queue.
862          */
863         if (!request_enqueue(request, fun)) return 0;
864
865         /*
866          *      If the thread pool is busy handling requests, then
867          *      try to spawn another one.  We don't acquire the mutex
868          *      before reading active_threads, so our thread count is
869          *      just an estimate.  It's fine to go ahead and spawn an
870          *      extra thread in that case.
871          *      NOTE: the log message may be in error since active_threads
872          *      is an estimate, but it's only in error about the thread
873          *      count, not about the fact that we can't create a new one.
874          */
875         if (thread_pool.active_threads == thread_pool.total_threads) {
876                 if (spawn_thread(request->timestamp) == NULL) {
877                         radlog(L_INFO,
878                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
879                                thread_pool.max_threads);
880                         return 1;
881                 }
882         }
883
884         return 1;
885 }
886
887 /*
888  *      Check the min_spare_threads and max_spare_threads.
889  *
890  *      If there are too many or too few threads waiting, then we
891  *      either create some more, or delete some.
892  */
893 int thread_pool_clean(time_t now)
894 {
895         int spare;
896         int i, total;
897         THREAD_HANDLE *handle, *next;
898         int active_threads;
899         static time_t last_cleaned = 0;
900
901         /*
902          *      Loop over the thread pool deleting exited threads.
903          */
904         for (handle = thread_pool.head; handle; handle = next) {
905                 next = handle->next;
906
907                 /*
908                  *      Maybe we've asked the thread to exit, and it
909                  *      has agreed.
910                  */
911                 if (handle->status == THREAD_EXITED) {
912                         delete_thread(handle);
913                 }
914         }
915
916         /*
917          *      We don't need a mutex lock here, as we're reading
918          *      active_threads, and not modifying it.  We want a close
919          *      approximation of the number of active threads, and this
920          *      is good enough.
921          */
922         active_threads = thread_pool.active_threads;
923         spare = thread_pool.total_threads - active_threads;
924         if (debug_flag) {
925                 static int old_total = -1;
926                 static int old_active = -1;
927
928                 if ((old_total != thread_pool.total_threads) ||
929                                 (old_active != active_threads)) {
930                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
931                                         thread_pool.total_threads, active_threads, spare);
932                         old_total = thread_pool.total_threads;
933                         old_active = active_threads;
934                 }
935         }
936
937         /*
938          *      If there are too few spare threads, create some more.
939          */
940         if (spare < thread_pool.min_spare_threads) {
941                 total = thread_pool.min_spare_threads - spare;
942
943                 DEBUG2("Threads: Spawning %d spares", total);
944                 /*
945                  *      Create a number of spare threads.
946                  */
947                 for (i = 0; i < total; i++) {
948                         handle = spawn_thread(now);
949                         if (handle == NULL) {
950                                 return -1;
951                         }
952                 }
953
954                 /*
955                  *      And exit, as there can't be too many spare threads.
956                  */
957                 return 0;
958         }
959
960         /*
961          *      Only delete spare threads if we haven't already done
962          *      so this second.
963          */
964         if (now == last_cleaned) {
965                 return 0;
966         }
967         last_cleaned = now;
968
969         /*
970          *      Only delete the spare threads if sufficient time has
971          *      passed since we last created one.  This helps to minimize
972          *      the amount of create/delete cycles.
973          */
974         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
975                 return 0;
976         }
977
978         /*
979          *      If there are too many spare threads, delete one.
980          *
981          *      Note that we only delete ONE at a time, instead of
982          *      wiping out many.  This allows the excess servers to
983          *      be slowly reaped, just in case the load spike comes again.
984          */
985         if (spare > thread_pool.max_spare_threads) {
986
987                 spare -= thread_pool.max_spare_threads;
988
989                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
990
991                 /*
992                  *      Walk through the thread pool, deleting the
993                  *      first idle thread we come across.
994                  */
995                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
996                         next = handle->next;
997
998                         /*
999                          *      If the thread is not handling a
1000                          *      request, but still live, then tell it
1001                          *      to exit.
1002                          *
1003                          *      It will eventually wake up, and realize
1004                          *      it's been told to commit suicide.
1005                          */
1006                         if ((handle->request == NULL) &&
1007                             (handle->status == THREAD_RUNNING)) {
1008                                 handle->status = THREAD_CANCELLED;
1009                                 /*
1010                                  *      Post an extra semaphore, as a
1011                                  *      signal to wake up, and exit.
1012                                  */
1013                                 sem_post(&thread_pool.semaphore);
1014                                 spare--;
1015                                 break;
1016                         }
1017                 }
1018         }
1019
1020         /*
1021          *      If the thread has handled too many requests, then make it
1022          *      exit.
1023          */
1024         if (thread_pool.max_requests_per_thread > 0) {
1025                 for (handle = thread_pool.head; handle; handle = next) {
1026                         next = handle->next;
1027
1028                         /*
1029                          *      Not handling a request, but otherwise
1030                          *      live, we can kill it.
1031                          */
1032                         if ((handle->request == NULL) &&
1033                             (handle->status == THREAD_RUNNING) &&
1034                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1035                                 handle->status = THREAD_CANCELLED;
1036                                 sem_post(&thread_pool.semaphore);
1037                         }
1038                 }
1039         }
1040
1041         /*
1042          *      Otherwise everything's kosher.  There are not too few,
1043          *      or too many spare threads.  Exit happily.
1044          */
1045         return 0;
1046 }
1047
1048
1049 /*
1050  *      Thread wrapper for fork().
1051  */
1052 pid_t rad_fork(int exec_wait)
1053 {
1054         pid_t child_pid;
1055
1056         if (exec_wait) return fork();
1057
1058         reap_children();        /* be nice to non-wait thingies */
1059
1060         if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1061                 return -1;
1062         }
1063
1064         /*
1065          *      Fork & save the PID for later reaping.
1066          */
1067         child_pid = fork();
1068         if (child_pid > 0) {
1069                 int rcode;
1070                 thread_fork_t *tf;
1071
1072                 tf = rad_malloc(sizeof(*tf));
1073                 memset(tf, 0, sizeof(*tf));
1074                 
1075                 tf->pid = child_pid;
1076
1077                 /*
1078                  *      Lock the mutex.
1079                  */
1080                 pthread_mutex_lock(&thread_pool.wait_mutex);
1081
1082                 rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
1083
1084                 /*
1085                  *      Unlock the mutex.
1086                  */
1087                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1088
1089                 if (!rcode) {
1090                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1091                                (int) child_pid);
1092                 }
1093         }
1094
1095         /*
1096          *      Return whatever we were told.
1097          */
1098         return child_pid;
1099 }
1100
1101 /*
1102  *      We may not need this any more...
1103  */
1104 pid_t rad_waitpid(pid_t pid, int *status, int options)
1105 {
1106         thread_fork_t mytf, *tf;
1107
1108         reap_children();        /* be nice to non-wait thingies */
1109
1110         if (pid <= 0) return -1;
1111
1112         if ((options & WNOHANG) == 0) return -1;
1113
1114         mytf.pid = pid;
1115
1116         pthread_mutex_lock(&thread_pool.wait_mutex);
1117         tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
1118
1119         if (!tf) {              /* not found.  It's a problem... */
1120                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1121                 return waitpid(pid, status, options);
1122         }
1123
1124         if (tf->exited) {
1125                 *status = tf->status;
1126                 lrad_hash_table_delete(thread_pool.waiters, &mytf);
1127                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1128                 return pid;
1129         }
1130         
1131         /*
1132          *      Don't wait, and it hasn't exited.  Return.
1133          */
1134         pthread_mutex_unlock(&thread_pool.wait_mutex);
1135         return 0;
1136 }
1137
1138 #else /* HAVE_PTHREAD_H */
/*
 *	Stand-in for thread_pool_init() in builds where HAVE_PTHREAD_H
 *	is not defined.  There is no pool to set up, so the argument is
 *	ignored and 0 is always returned.
 */
int thread_pool_init(int spawn_flag)
{
	(void) spawn_flag;	/* deliberately unused */

	return 0;
}
1146
1147 /*
1148  *      call "radrespond".
1149  */
1150 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1151 {
1152         rad_respond(request, fun);
1153         return 1;
1154 }
1155
1156 #endif /* HAVE_PTHREAD_H */