Move socket listening code to plugin modules.
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/rad_assert.h>
29
30 /*
31  *      Other OS's have sem_init, OS X doesn't.
32  */
33 #ifdef HAVE_SEMAPHORE_H
34 #include <semaphore.h>
35 #endif
36
37 #ifdef DARWIN
38 #include <mach/task.h>
39 #include <mach/semaphore.h>
40
41 #undef sem_t
42 #define sem_t semaphore_t
43 #undef sem_init
44 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45 #undef sem_wait
46 #define sem_wait(s) semaphore_wait(*s)
47 #undef sem_post
48 #define sem_post(s) semaphore_signal(*s)
49 #endif
50
51 #ifdef HAVE_SYS_WAIT_H
52 #include <sys/wait.h>
53 #endif
54
55 #ifdef HAVE_PTHREAD_H
56
57 #ifdef HAVE_OPENSSL_CRYPTO_H
58 #include <openssl/crypto.h>
59 #endif
60 #ifdef HAVE_OPENSSL_ERR_H
61 #include <openssl/err.h>
62 #endif
63 #ifdef HAVE_OPENSSL_EVP_H
64 #include <openssl/evp.h>
65 #endif
66
67 #define SEMAPHORE_LOCKED        (0)
68 #define SEMAPHORE_UNLOCKED      (1)
69
70 #define THREAD_RUNNING          (1)
71 #define THREAD_CANCELLED        (2)
72 #define THREAD_EXITED           (3)
73
74 #define NUM_FIFOS               RAD_LISTEN_MAX
75
76
77 /*
78  *  A data structure which contains the information about
79  *  the current thread.
80  *
81  *  pthread_id     pthread id
82  *  thread_num     server thread number, 1...number of threads
83  *  semaphore     used to block the thread until a request comes in
84  *  status        is the thread running or exited?
85  *  request_count the number of requests that this thread has handled
86  *  timestamp     when the thread started executing.
87  */
88 typedef struct THREAD_HANDLE {
89         struct THREAD_HANDLE *prev;
90         struct THREAD_HANDLE *next;
91         pthread_t            pthread_id;
92         int                  thread_num;
93         int                  status;
94         unsigned int         request_count;
95         time_t               timestamp;
96         REQUEST              *request;
97 } THREAD_HANDLE;
98
99 typedef struct thread_fork_t {
100         pid_t           pid;
101         int             status;
102         int             exited;
103 } thread_fork_t;
104
105
106 /*
107  *      A data structure to manage the thread pool.  There's no real
108  *      need for a data structure, but it makes things conceptually
109  *      easier.
110  */
111 typedef struct THREAD_POOL {
112         THREAD_HANDLE *head;
113         THREAD_HANDLE *tail;
114
115         int total_threads;
116         int active_threads;     /* protected by queue_mutex */
117         int max_thread_num;
118         int start_threads;
119         int max_threads;
120         int min_spare_threads;
121         int max_spare_threads;
122         unsigned int max_requests_per_thread;
123         unsigned long request_count;
124         time_t time_last_spawned;
125         int cleanup_delay;
126         int spawn_flag;
127
128 #ifdef WNOHANG
129         pthread_mutex_t wait_mutex;
130         fr_hash_table_t *waiters;
131 #endif
132
133         /*
134          *      All threads wait on this semaphore, for requests
135          *      to enter the queue.
136          */
137         sem_t           semaphore;
138
139         /*
140          *      To ensure only one thread at a time touches the queue.
141          */
142         pthread_mutex_t queue_mutex;
143
144         int             max_queue_size;
145         int             num_queued;
146         fr_fifo_t       *fifo[NUM_FIFOS];
147 } THREAD_POOL;
148
149 static THREAD_POOL thread_pool;
150 static int pool_initialized = FALSE;
151 static time_t last_cleaned = 0;
152
153 static void thread_pool_manage(time_t now);
154
155 /*
156  *      A mapping of configuration file names to internal integers
157  */
158 static const CONF_PARSER thread_config[] = {
159         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
160         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
161         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
162         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
163         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
164         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
165         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
166         { NULL, -1, 0, NULL, NULL }
167 };
168
169
170 #ifdef HAVE_OPENSSL_CRYPTO_H
171
172 /*
173  *      If we're linking against OpenSSL, then it is the
174  *      duty of the application, if it is multithreaded,
175  *      to provide OpenSSL with appropriate thread id
176  *      and mutex locking functions
177  *
178  *      Note: this only implements static callbacks.
179  *      OpenSSL does not use dynamic locking callbacks
180  *      right now, but may in the futiure, so we will have
181  *      to add them at some point.
182  */
183
184 static pthread_mutex_t *ssl_mutexes = NULL;
185
186 static unsigned long ssl_id_function(void)
187 {
188         return (unsigned long) pthread_self();
189 }
190
191 static void ssl_locking_function(int mode, int n, const char *file, int line)
192 {
193         file = file;            /* -Wunused */
194         line = line;            /* -Wunused */
195
196         if (mode & CRYPTO_LOCK) {
197                 pthread_mutex_lock(&(ssl_mutexes[n]));
198         } else {
199                 pthread_mutex_unlock(&(ssl_mutexes[n]));
200         }
201 }
202
203 static int setup_ssl_mutexes(void)
204 {
205         int i;
206
207 #ifdef HAVE_OPENSSL_EVP_H
208         /*
209          *      Enable all ciphers and digests.
210          */
211         OpenSSL_add_all_algorithms();
212 #endif
213
214         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
215         if (!ssl_mutexes) {
216                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
217                 return 0;
218         }
219
220         for (i = 0; i < CRYPTO_num_locks(); i++) {
221                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
222         }
223
224         CRYPTO_set_id_callback(ssl_id_function);
225         CRYPTO_set_locking_callback(ssl_locking_function);
226
227         return 1;
228 }
229 #endif
230
231 #ifdef WNOHANG
232 /*
233  *      We don't want to catch SIGCHLD for a host of reasons.
234  *
235  *      - exec_wait means that someone, somewhere, somewhen, will
236  *      call waitpid(), and catch the child.
237  *
238  *      - SIGCHLD is delivered to a random thread, not the one that
239  *      forked.
240  *
241  *      - if another thread catches the child, we have to coordinate
242  *      with the thread doing the waiting.
243  *
244  *      - if we don't waitpid() for non-wait children, they'll be zombies,
245  *      and will hang around forever.
246  *
247  */
248 static void reap_children(void)
249 {
250         pid_t pid;
251         int status;
252         thread_fork_t mytf, *tf;
253
254
255         pthread_mutex_lock(&thread_pool.wait_mutex);
256
257         do {
258                 pid = waitpid(0, &status, WNOHANG);
259                 if (pid <= 0) break;
260
261                 mytf.pid = pid;
262                 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
263                 if (!tf) continue;
264
265                 tf->status = status;
266                 tf->exited = 1;
267         } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
268
269         pthread_mutex_unlock(&thread_pool.wait_mutex);
270 }
271 #else
272 #define reap_children()
273 #endif /* WNOHANG */
274
275 /*
276  *      Add a request to the list of waiting requests.
277  *      This function gets called ONLY from the main handler thread...
278  *
279  *      This function should never fail.
280  */
281 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
282 {
283         rad_assert(request->process == fun);
284         pthread_mutex_lock(&thread_pool.queue_mutex);
285
286         thread_pool.request_count++;
287
288         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
289                 pthread_mutex_unlock(&thread_pool.queue_mutex);
290
291                 /*
292                  *      Mark the request as done.
293                  */
294                 radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
295                 request->child_state = REQUEST_DONE;
296                 return 0;
297         }
298
299         /*
300          *      Push the request onto the appropriate fifo for that
301          */
302         if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
303                 pthread_mutex_unlock(&thread_pool.queue_mutex);
304                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
305                 request->child_state = REQUEST_DONE;
306                 return 0;
307         }
308
309         thread_pool.num_queued++;
310
311         pthread_mutex_unlock(&thread_pool.queue_mutex);
312
313         /*
314          *      There's one more request in the queue.
315          *
316          *      Note that we're not touching the queue any more, so
317          *      the semaphore post is outside of the mutex.  This also
318          *      means that when the thread wakes up and tries to lock
319          *      the mutex, it will be unlocked, and there won't be
320          *      contention.
321          */
322         sem_post(&thread_pool.semaphore);
323
324         return 1;
325 }
326
327 /*
328  *      Remove a request from the queue.
329  */
330 static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
331 {
332         RAD_LISTEN_TYPE i, start;
333         REQUEST *request = NULL;
334
335         reap_children();
336
337         pthread_mutex_lock(&thread_pool.queue_mutex);
338
339         /*
340          *      Clear old requests from all queues.
341          *
342          *      We only do one pass over the queue, in order to
343          *      amortize the work across the child threads.  Since we
344          *      do N checks for one request de-queued, the old
345          *      requests will be quickly cleared.
346          */
347         for (i = 0; i < RAD_LISTEN_MAX; i++) {
348                 request = fr_fifo_peek(thread_pool.fifo[i]);
349                 if (!request ||
350                     (request->master_state != REQUEST_STOP_PROCESSING)) {
351                         continue;
352 }
353                 /*
354                  *      This entry was marked to be stopped.  Acknowledge it.
355                  */
356                 request = fr_fifo_pop(thread_pool.fifo[i]);
357                 rad_assert(request != NULL);
358                 request->child_state = REQUEST_DONE;
359                 thread_pool.num_queued--;
360         }
361
362         start = 0;
363  retry:
364         /*
365          *      Pop results from the top of the queue
366          */
367         for (i = start; i < RAD_LISTEN_MAX; i++) {
368                 request = fr_fifo_pop(thread_pool.fifo[i]);
369                 if (request) {
370                         start = i;
371                         break;
372                 }
373         }
374
375         if (!request) {
376                 pthread_mutex_unlock(&thread_pool.queue_mutex);
377                 *prequest = NULL;
378                 *fun = NULL;
379                 return 0;
380         }
381
382         rad_assert(thread_pool.num_queued > 0);
383         thread_pool.num_queued--;
384         *prequest = request;
385         *fun = request->process;
386
387         rad_assert(request->magic == REQUEST_MAGIC);
388         rad_assert(*fun != NULL);
389
390         /*
391          *      If the request has sat in the queue for too long,
392          *      kill it.
393          *
394          *      The main clean-up code can't delete the request from
395          *      the queue, and therefore won't clean it up until we
396          *      have acknowledged it as "done".
397          */
398         if (request->master_state == REQUEST_STOP_PROCESSING) {
399                 request->child_state = REQUEST_DONE;
400                 goto retry;
401         }
402
403         /*
404          *      The thread is currently processing a request.
405          */
406         thread_pool.active_threads++;
407
408         pthread_mutex_unlock(&thread_pool.queue_mutex);
409
410         return 1;
411 }
412
413
414 /*
415  *      The main thread handler for requests.
416  *
417  *      Wait on the semaphore until we have it, and process the request.
418  */
419 static void *request_handler_thread(void *arg)
420 {
421         RAD_REQUEST_FUNP  fun;
422         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
423
424         /*
425          *      Loop forever, until told to exit.
426          */
427         do {
428                 /*
429                  *      Wait to be signalled.
430                  */
431                 DEBUG2("Thread %d waiting to be assigned a request",
432                        self->thread_num);
433         re_wait:
434                 if (sem_wait(&thread_pool.semaphore) != 0) {
435                         /*
436                          *      Interrupted system call.  Go back to
437                          *      waiting, but DON'T print out any more
438                          *      text.
439                          */
440                         if (errno == EINTR) {
441                                 DEBUG2("Re-wait %d", self->thread_num);
442                                 goto re_wait;
443                         }
444                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
445                                self->thread_num, strerror(errno));
446                         break;
447                 }
448
449                 DEBUG2("Thread %d got semaphore", self->thread_num);
450
451 #ifdef HAVE_OPENSSL_ERR_H
452                 /*
453                  *      Clear the error queue for the current thread.
454                  */
455                 ERR_clear_error ();
456 #endif
457
458                 /*
459                  *      Try to grab a request from the queue.
460                  *
461                  *      It may be empty, in which case we fail
462                  *      gracefully.
463                  */
464                 if (!request_dequeue(&self->request, &fun)) continue;
465
466                 self->request->child_pid = self->pthread_id;
467                 self->request_count++;
468
469                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
470                        self->thread_num, self->request->number,
471                        self->request_count);
472
473                 radius_handle_request(self->request, fun);
474
475                 /*
476                  *      Update the active threads.
477                  */
478                 pthread_mutex_lock(&thread_pool.queue_mutex);
479                 rad_assert(thread_pool.active_threads > 0);
480                 thread_pool.active_threads--;
481                 pthread_mutex_unlock(&thread_pool.queue_mutex);
482         } while (self->status != THREAD_CANCELLED);
483
484         DEBUG2("Thread %d exiting...", self->thread_num);
485
486 #ifdef HAVE_OPENSSL_ERR_H
487         /*
488          *      If we linked with OpenSSL, the application
489          *      must remove the thread's error queue before
490          *      exiting to prevent memory leaks.
491          */
492         ERR_remove_state(0);
493 #endif
494
495         /*
496          *  Do this as the LAST thing before exiting.
497          */
498         self->request = NULL;
499         self->status = THREAD_EXITED;
500
501         return NULL;
502 }
503
504 /*
505  *      Take a THREAD_HANDLE, delete it from the thread pool and
506  *      free its resources.
507  *
508  *      This function is called ONLY from the main server thread,
509  *      ONLY after the thread has exited.
510  */
511 static void delete_thread(THREAD_HANDLE *handle)
512 {
513         THREAD_HANDLE *prev;
514         THREAD_HANDLE *next;
515
516         rad_assert(handle->request == NULL);
517
518         DEBUG2("Deleting thread %d", handle->thread_num);
519
520         prev = handle->prev;
521         next = handle->next;
522         rad_assert(thread_pool.total_threads > 0);
523         thread_pool.total_threads--;
524
525         /*
526          *      Remove the handle from the list.
527          */
528         if (prev == NULL) {
529                 rad_assert(thread_pool.head == handle);
530                 thread_pool.head = next;
531         } else {
532                 prev->next = next;
533         }
534
535         if (next == NULL) {
536                 rad_assert(thread_pool.tail == handle);
537                 thread_pool.tail = prev;
538         } else {
539                 next->prev = prev;
540         }
541
542         /*
543          *      Free the handle, now that it's no longer referencable.
544          */
545         free(handle);
546 }
547
548
549 /*
550  *      Spawn a new thread, and place it in the thread pool.
551  *
552  *      The thread is started initially in the blocked state, waiting
553  *      for the semaphore.
554  */
555 static THREAD_HANDLE *spawn_thread(time_t now)
556 {
557         int rcode;
558         THREAD_HANDLE *handle;
559         pthread_attr_t attr;
560
561         /*
562          *      Ensure that we don't spawn too many threads.
563          */
564         if (thread_pool.total_threads >= thread_pool.max_threads) {
565                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
566                 return NULL;
567         }
568
569         /*
570          *      Allocate a new thread handle.
571          */
572         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
573         memset(handle, 0, sizeof(THREAD_HANDLE));
574         handle->prev = NULL;
575         handle->next = NULL;
576         handle->thread_num = thread_pool.max_thread_num++;
577         handle->request_count = 0;
578         handle->status = THREAD_RUNNING;
579         handle->timestamp = time(NULL);
580
581         /*
582          *      Initialize the thread's attributes to detached.
583          *
584          *      We could call pthread_detach() later, but if the thread
585          *      exits between the create & detach calls, it will need to
586          *      be joined, which will never happen.
587          */
588         pthread_attr_init(&attr);
589         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
590
591         /*
592          *      Create the thread detached, so that it cleans up it's
593          *      own memory when it exits.
594          *
595          *      Note that the function returns non-zero on error, NOT
596          *      -1.  The return code is the error, and errno isn't set.
597          */
598         rcode = pthread_create(&handle->pthread_id, &attr,
599                         request_handler_thread, handle);
600         if (rcode != 0) {
601                 radlog(L_ERR, "Thread create failed: %s",
602                        strerror(rcode));
603                 return NULL;
604         }
605         pthread_attr_destroy(&attr);
606
607         /*
608          *      One more thread to go into the list.
609          */
610         thread_pool.total_threads++;
611         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
612                         handle->thread_num, thread_pool.total_threads);
613
614         /*
615          *      Add the thread handle to the tail of the thread pool list.
616          */
617         if (thread_pool.tail) {
618                 thread_pool.tail->next = handle;
619                 handle->prev = thread_pool.tail;
620                 thread_pool.tail = handle;
621         } else {
622                 rad_assert(thread_pool.head == NULL);
623                 thread_pool.head = thread_pool.tail = handle;
624         }
625
626         /*
627          *      Update the time we last spawned a thread.
628          */
629         thread_pool.time_last_spawned = now;
630
631         /*
632          *      And return the new handle to the caller.
633          */
634         return handle;
635 }
636
637 /*
638  *      Temporary function to prevent server from executing a SIGHUP
639  *      until all threads are finished handling requests.  This returns
640  *      the number of active threads to 'radiusd.c'.
641  */
642 int total_active_threads(void)
643 {
644         /*
645          *      We don't acquire the mutex, so this is just an estimate.
646          *      We can't return with the lock held, so there's no point
647          *      in getting the guaranteed correct value; by the time
648          *      the caller sees it, it can be wrong again.
649          */
650         return thread_pool.active_threads;
651 }
652
653
654 #ifdef WNOHANG
655 static uint32_t pid_hash(const void *data)
656 {
657         const thread_fork_t *tf = data;
658
659         return fr_hash(&tf->pid, sizeof(tf->pid));
660 }
661
662 static int pid_cmp(const void *one, const void *two)
663 {
664         const thread_fork_t *a = one;
665         const thread_fork_t *b = two;
666
667         return (a->pid - b->pid);
668 }
669 #endif
670
671 /*
672  *      Allocate the thread pool, and seed it with an initial number
673  *      of threads.
674  *
675  *      FIXME: What to do on a SIGHUP???
676  */
677 int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
678 {
679         int             i, rcode;
680         CONF_SECTION    *pool_cf;
681         time_t          now;
682
683         now = time(NULL);
684
685         /*
686          *      We're not spawning new threads, don't do
687          *      anything.
688          */
689         if (!spawn_flag) return 0;
690
691         /*
692          *      After a SIGHUP, we don't over-write the previous values.
693          */
694         if (!pool_initialized) {
695                 /*
696                  *      Initialize the thread pool to some reasonable values.
697                  */
698                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
699                 thread_pool.head = NULL;
700                 thread_pool.tail = NULL;
701                 thread_pool.total_threads = 0;
702                 thread_pool.max_thread_num = 1;
703                 thread_pool.cleanup_delay = 5;
704                 thread_pool.spawn_flag = spawn_flag;
705
706 #ifdef WNOHANG
707                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
708                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
709                                strerror(errno));
710                         return -1;
711                 }
712
713                 /*
714                  *      Create the hash table of child PID's
715                  */
716                 thread_pool.waiters = fr_hash_table_create(pid_hash,
717                                                              pid_cmp,
718                                                              free);
719                 if (!thread_pool.waiters) {
720                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
721                         return -1;
722                 }
723 #endif
724         }
725
726         pool_cf = cf_subsection_find_next(cs, NULL, "thread");
727         if (!pool_cf) {
728                 radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
729                 return -1;
730         }
731
732         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
733                 return -1;
734         }
735
736         /*
737          *      Catch corner cases.
738          */
739         if (thread_pool.min_spare_threads < 1)
740                 thread_pool.min_spare_threads = 1;
741         if (thread_pool.max_spare_threads < 1)
742                 thread_pool.max_spare_threads = 1;
743         if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
744                 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
745
746         /*
747          *      The pool has already been initialized.  Don't spawn
748          *      new threads, and don't forget about forked children,
749          */
750         if (pool_initialized) {
751                 return 0;
752         }
753
754         /*
755          *      Initialize the queue of requests.
756          */
757         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
758         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
759         if (rcode != 0) {
760                 radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
761                        strerror(errno));
762                 return -1;
763         }
764
765         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
766         if (rcode != 0) {
767                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
768                        strerror(errno));
769                 return -1;
770         }
771
772         /*
773          *      Allocate multiple fifos.
774          */
775         for (i = 0; i < RAD_LISTEN_MAX; i++) {
776                 thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
777                 if (!thread_pool.fifo[i]) {
778                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
779                         return -1;
780                 }
781         }
782
783 #ifdef HAVE_OPENSSL_CRYPTO_H
784         /*
785          *      If we're linking with OpenSSL too, then we need
786          *      to set up the mutexes and enable the thread callbacks.
787          */
788         if (!setup_ssl_mutexes()) {
789                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
790                 return -1;
791         }
792 #endif
793
794
795         /*
796          *      Create a number of waiting threads.
797          *
798          *      If we fail while creating them, do something intelligent.
799          */
800         for (i = 0; i < thread_pool.start_threads; i++) {
801                 if (spawn_thread(now) == NULL) {
802                         return -1;
803                 }
804         }
805
806         DEBUG2("Thread pool initialized");
807         pool_initialized = TRUE;
808         return 0;
809 }
810
811
812 /*
813  *      Assign a new request to a free thread.
814  *
815  *      If there isn't a free thread, then try to create a new one,
816  *      up to the configured limits.
817  */
818 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
819 {
820         time_t now = request->timestamp;
821
822         request->process = fun;
823
824         /*
825          *      We've been told not to spawn threads, so don't.
826          */
827         if (!thread_pool.spawn_flag) {
828                 radius_handle_request(request, fun);
829
830 #ifdef WNOHANG
831                 /*
832                  *      Requests that care about child process exit
833                  *      codes have already either called
834                  *      rad_waitpid(), or they've given up.
835                  */
836                 wait(NULL);
837 #endif
838                 return 1;
839         }
840
841         /*
842          *      Add the new request to the queue.
843          */
844         if (!request_enqueue(request, fun)) return 0;
845
846         /*
847          *      If we haven't checked the number of child threads
848          *      in a while, OR if the thread pool appears to be full,
849          *      go manage it.
850          */
851         if ((last_cleaned < now) ||
852             (thread_pool.active_threads == thread_pool.total_threads)) {
853                 thread_pool_manage(now);
854         }
855
856         return 1;
857 }
858
859 /*
860  *      Check the min_spare_threads and max_spare_threads.
861  *
862  *      If there are too many or too few threads waiting, then we
863  *      either create some more, or delete some.
864  */
865 static void thread_pool_manage(time_t now)
866 {
867         int spare;
868         int i, total;
869         THREAD_HANDLE *handle, *next;
870         int active_threads;
871
872         /*
873          *      We don't need a mutex lock here, as we're reading
874          *      active_threads, and not modifying it.  We want a close
875          *      approximation of the number of active threads, and this
876          *      is good enough.
877          */
878         active_threads = thread_pool.active_threads;
879         spare = thread_pool.total_threads - active_threads;
880         if (debug_flag) {
881                 static int old_total = -1;
882                 static int old_active = -1;
883
884                 if ((old_total != thread_pool.total_threads) ||
885                                 (old_active != active_threads)) {
886                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
887                                         thread_pool.total_threads, active_threads, spare);
888                         old_total = thread_pool.total_threads;
889                         old_active = active_threads;
890                 }
891         }
892
893         /*
894          *      If there are too few spare threads.  Go create some more.
895          */
896         if (spare < thread_pool.min_spare_threads) {
897                 total = thread_pool.min_spare_threads - spare;
898
899                 DEBUG2("Threads: Spawning %d spares", total);
900
901                 /*
902                  *      Create a number of spare threads.
903                  */
904                 for (i = 0; i < total; i++) {
905                         handle = spawn_thread(now);
906                         if (handle == NULL) {
907                                 return;
908                         }
909                 }
910
911                 return;         /* there aren't too many spare threads */
912         }
913
914         /*
915          *      Only delete spare threads if we haven't already done
916          *      so this second.
917          */
918         if (now == last_cleaned) {
919                 return;
920         }
921         last_cleaned = now;
922
923         /*
924          *      Loop over the thread pool, deleting exited threads.
925          */
926         for (handle = thread_pool.head; handle; handle = next) {
927                 next = handle->next;
928
929                 /*
930                  *      Maybe we've asked the thread to exit, and it
931                  *      has agreed.
932                  */
933                 if (handle->status == THREAD_EXITED) {
934                         delete_thread(handle);
935                 }
936         }
937
938         /*
939          *      Only delete the spare threads if sufficient time has
940          *      passed since we last created one.  This helps to minimize
941          *      the amount of create/delete cycles.
942          */
943         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
944                 return;
945         }
946
947         /*
948          *      If there are too many spare threads, delete one.
949          *
950          *      Note that we only delete ONE at a time, instead of
951          *      wiping out many.  This allows the excess servers to
952          *      be slowly reaped, just in case the load spike comes again.
953          */
954         if (spare > thread_pool.max_spare_threads) {
955
956                 spare -= thread_pool.max_spare_threads;
957
958                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
959
960                 /*
961                  *      Walk through the thread pool, deleting the
962                  *      first idle thread we come across.
963                  */
964                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
965                         next = handle->next;
966
967                         /*
968                          *      If the thread is not handling a
969                          *      request, but still live, then tell it
970                          *      to exit.
971                          *
972                          *      It will eventually wake up, and realize
973                          *      it's been told to commit suicide.
974                          */
975                         if ((handle->request == NULL) &&
976                             (handle->status == THREAD_RUNNING)) {
977                                 handle->status = THREAD_CANCELLED;
978                                 /*
979                                  *      Post an extra semaphore, as a
980                                  *      signal to wake up, and exit.
981                                  */
982                                 sem_post(&thread_pool.semaphore);
983                                 spare--;
984                                 break;
985                         }
986                 }
987         }
988
989         /*
990          *      If the thread has handled too many requests, then make it
991          *      exit.
992          */
993         if (thread_pool.max_requests_per_thread > 0) {
994                 for (handle = thread_pool.head; handle; handle = next) {
995                         next = handle->next;
996
997                         /*
998                          *      Not handling a request, but otherwise
999                          *      live, we can kill it.
1000                          */
1001                         if ((handle->request == NULL) &&
1002                             (handle->status == THREAD_RUNNING) &&
1003                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1004                                 handle->status = THREAD_CANCELLED;
1005                                 sem_post(&thread_pool.semaphore);
1006                         }
1007                 }
1008         }
1009
1010         /*
1011          *      Otherwise everything's kosher.  There are not too few,
1012          *      or too many spare threads.  Exit happily.
1013          */
1014         return;
1015 }
1016
1017
1018 #ifdef WNOHANG
1019 /*
1020  *      Thread wrapper for fork().
1021  */
1022 pid_t rad_fork(void)
1023 {
1024         pid_t child_pid;
1025
1026         if (!pool_initialized) return fork();
1027
1028         reap_children();        /* be nice to non-wait thingies */
1029
1030         if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1031                 return -1;
1032         }
1033
1034         /*
1035          *      Fork & save the PID for later reaping.
1036          */
1037         child_pid = fork();
1038         if (child_pid > 0) {
1039                 int rcode;
1040                 thread_fork_t *tf;
1041
1042                 tf = rad_malloc(sizeof(*tf));
1043                 memset(tf, 0, sizeof(*tf));
1044
1045                 tf->pid = child_pid;
1046
1047                 pthread_mutex_lock(&thread_pool.wait_mutex);
1048                 rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1049                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1050
1051                 if (!rcode) {
1052                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1053                                (int) child_pid);
1054                 }
1055         }
1056
1057         /*
1058          *      Return whatever we were told.
1059          */
1060         return child_pid;
1061 }
1062
1063
1064 /*
1065  *      Wait 10 seconds at most for a child to exit, then give up.
1066  */
1067 pid_t rad_waitpid(pid_t pid, int *status)
1068 {
1069         int i;
1070         thread_fork_t mytf, *tf;
1071
1072         if (!pool_initialized) return waitpid(pid, status, 0);
1073
1074         if (pid <= 0) return -1;
1075
1076         mytf.pid = pid;
1077
1078         pthread_mutex_lock(&thread_pool.wait_mutex);
1079         tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1080         pthread_mutex_unlock(&thread_pool.wait_mutex);
1081
1082         if (!tf) return -1;
1083
1084         for (i = 0; i < 100; i++) {
1085                 reap_children();
1086
1087                 if (tf->exited) {
1088                         *status = tf->status;
1089
1090                         pthread_mutex_lock(&thread_pool.wait_mutex);
1091                         fr_hash_table_delete(thread_pool.waiters, &mytf);
1092                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1093                         return pid;
1094                 }
1095                 usleep(100000); /* sleep for 1/10 of a second */
1096         }
1097
1098         /*
1099          *      10 seconds have passed, give up on the child.
1100          */
1101         pthread_mutex_lock(&thread_pool.wait_mutex);
1102         fr_hash_table_delete(thread_pool.waiters, &mytf);
1103         pthread_mutex_unlock(&thread_pool.wait_mutex);
1104
1105         return 0;
1106 }
1107 #else
1108 /*
1109  *      No rad_fork or rad_waitpid
1110  */
1111 #endif
1112
1113 void thread_pool_lock(void)
1114 {
1115         pthread_mutex_lock(&thread_pool.queue_mutex);
1116 }
1117
1118 void thread_pool_unlock(void)
1119 {
1120         pthread_mutex_unlock(&thread_pool.queue_mutex);
1121 }
1122
1123 void thread_pool_queue_stats(int *array)
1124 {
1125         int i;
1126
1127         if (pool_initialized) {
1128                 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1129                         array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1130                 }
1131         } else {
1132                 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1133                         array[i] = 0;
1134                 }
1135         }
1136 }
1137 #endif /* HAVE_PTHREAD_H */