9ce78cb3715dd87d44e3442edaab0d81d2f4bee4
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/rad_assert.h>
29
30 /*
31  *      Other OS's have sem_init, OS X doesn't.
32  */
33 #ifdef HAVE_SEMAPHORE_H
34 #include <semaphore.h>
35 #endif
36
#ifdef DARWIN
#include <mach/task.h>
#include <mach/semaphore.h>

/*
 *	OS X doesn't have sem_init(), so the POSIX semaphore calls used
 *	in this file are mapped onto Mach semaphores.  The second
 *	("pshared") argument of sem_init() is ignored by the mapping.
 *
 *	Every macro argument is parenthesized, so that any pointer
 *	expression can be passed safely.
 */
#undef sem_t
#define sem_t semaphore_t
#undef sem_init
#define sem_init(s,p,c) semaphore_create(mach_task_self(),(s),SYNC_POLICY_FIFO,(c))
#undef sem_wait
#define sem_wait(s) semaphore_wait(*(s))
#undef sem_post
#define sem_post(s) semaphore_signal(*(s))
#endif
50
51 #ifdef HAVE_SYS_WAIT_H
52 #include <sys/wait.h>
53 #endif
54
55 #ifdef HAVE_PTHREAD_H
56
57 #ifdef HAVE_OPENSSL_CRYPTO_H
58 #include <openssl/crypto.h>
59 #endif
60 #ifdef HAVE_OPENSSL_ERR_H
61 #include <openssl/err.h>
62 #endif
63 #ifdef HAVE_OPENSSL_EVP_H
64 #include <openssl/evp.h>
65 #endif
66
/*
 *	Initial counts that can be handed to sem_init().
 */
#define SEMAPHORE_LOCKED	(0)
#define SEMAPHORE_UNLOCKED	(1)

/*
 *	Values stored in THREAD_HANDLE.status.
 */
#define THREAD_RUNNING		(1)
#define THREAD_CANCELLED	(2)
#define THREAD_EXITED		(3)

/*
 *	Number of request FIFOs in the pool; the FIFO array is
 *	indexed by request->priority.
 */
#define NUM_FIFOS		RAD_LISTEN_MAX
75
76
77 /*
78  *  A data structure which contains the information about
79  *  the current thread.
80  *
81  *  pthread_id     pthread id
82  *  thread_num     server thread number, 1...number of threads
83  *  semaphore     used to block the thread until a request comes in
84  *  status        is the thread running or exited?
85  *  request_count the number of requests that this thread has handled
86  *  timestamp     when the thread started executing.
87  */
88 typedef struct THREAD_HANDLE {
89         struct THREAD_HANDLE *prev;
90         struct THREAD_HANDLE *next;
91         pthread_t            pthread_id;
92         int                  thread_num;
93         int                  status;
94         unsigned int         request_count;
95         time_t               timestamp;
96         REQUEST              *request;
97 } THREAD_HANDLE;
98
99 /*
100  *      For the request queue.
101  */
102 typedef struct request_queue_t {
103         REQUEST           *request;
104         RAD_REQUEST_FUNP  fun;
105 } request_queue_t;
106
/*
 *	Book-keeping for one forked child process.  When the child is
 *	collected by reap_children(), its wait status is stored in
 *	'status' and 'exited' is set to 1.
 */
typedef struct thread_fork_t {
	pid_t	pid;
	int	status;
	int	exited;
} thread_fork_t;
112
113
114 /*
115  *      A data structure to manage the thread pool.  There's no real
116  *      need for a data structure, but it makes things conceptually
117  *      easier.
118  */
119 typedef struct THREAD_POOL {
120         THREAD_HANDLE *head;
121         THREAD_HANDLE *tail;
122
123         int total_threads;
124         int active_threads;     /* protected by queue_mutex */
125         int max_thread_num;
126         int start_threads;
127         int max_threads;
128         int min_spare_threads;
129         int max_spare_threads;
130         unsigned int max_requests_per_thread;
131         unsigned long request_count;
132         time_t time_last_spawned;
133         int cleanup_delay;
134         int spawn_flag;
135
136 #ifdef WNOHANG
137         pthread_mutex_t wait_mutex;
138         fr_hash_table_t *waiters;
139 #endif
140
141         /*
142          *      All threads wait on this semaphore, for requests
143          *      to enter the queue.
144          */
145         sem_t           semaphore;
146
147         /*
148          *      To ensure only one thread at a time touches the queue.
149          */
150         pthread_mutex_t queue_mutex;
151
152         int             max_queue_size;
153         int             num_queued;
154         fr_fifo_t       *fifo[NUM_FIFOS];
155 } THREAD_POOL;
156
157 static THREAD_POOL thread_pool;
158 static int pool_initialized = FALSE;
159 static time_t last_cleaned = 0;
160
161 static void thread_pool_manage(time_t now);
162
163 /*
164  *      A mapping of configuration file names to internal integers
165  */
166 static const CONF_PARSER thread_config[] = {
167         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
168         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
169         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
170         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
171         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
172         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
173         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
174         { NULL, -1, 0, NULL, NULL }
175 };
176
177
178 #ifdef HAVE_OPENSSL_CRYPTO_H
179
180 /*
181  *      If we're linking against OpenSSL, then it is the
182  *      duty of the application, if it is multithreaded,
183  *      to provide OpenSSL with appropriate thread id
184  *      and mutex locking functions
185  *
186  *      Note: this only implements static callbacks.
187  *      OpenSSL does not use dynamic locking callbacks
188  *      right now, but may in the future, so we will have
189  *      to add them at some point.
190  */
191
/* Array of CRYPTO_num_locks() mutexes, allocated by setup_ssl_mutexes(). */
static pthread_mutex_t *ssl_mutexes = NULL;
193
/*
 *	Thread-id callback registered with OpenSSL: returns the calling
 *	thread's pthread id, cast to unsigned long.
 */
static unsigned long ssl_id_function(void)
{
	pthread_t self = pthread_self();

	return (unsigned long) self;
}
198
199 static void ssl_locking_function(int mode, int n, const char *file, int line)
200 {
201         file = file;            /* -Wunused */
202         line = line;            /* -Wunused */
203
204         if (mode & CRYPTO_LOCK) {
205                 pthread_mutex_lock(&(ssl_mutexes[n]));
206         } else {
207                 pthread_mutex_unlock(&(ssl_mutexes[n]));
208         }
209 }
210
211 static int setup_ssl_mutexes(void)
212 {
213         int i;
214
215 #ifdef HAVE_OPENSSL_EVP_H
216         /*
217          *      Enable all ciphers and digests.
218          */
219         OpenSSL_add_all_algorithms();
220 #endif
221
222         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
223         if (!ssl_mutexes) {
224                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
225                 return 0;
226         }
227
228         for (i = 0; i < CRYPTO_num_locks(); i++) {
229                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
230         }
231
232         CRYPTO_set_id_callback(ssl_id_function);
233         CRYPTO_set_locking_callback(ssl_locking_function);
234
235         return 1;
236 }
237 #endif
238
239 #ifdef WNOHANG
240 /*
241  *      We don't want to catch SIGCHLD for a host of reasons.
242  *
243  *      - exec_wait means that someone, somewhere, somewhen, will
244  *      call waitpid(), and catch the child.
245  *
246  *      - SIGCHLD is delivered to a random thread, not the one that
247  *      forked.
248  *
249  *      - if another thread catches the child, we have to coordinate
250  *      with the thread doing the waiting.
251  *
252  *      - if we don't waitpid() for non-wait children, they'll be zombies,
253  *      and will hang around forever.
254  *
255  */
256 static void reap_children(void)
257 {
258         pid_t pid;
259         int status;
260         thread_fork_t mytf, *tf;
261
262
263         pthread_mutex_lock(&thread_pool.wait_mutex);
264
265         do {
266                 pid = waitpid(0, &status, WNOHANG);
267                 if (pid <= 0) break;
268
269                 mytf.pid = pid;
270                 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
271                 if (!tf) continue;
272
273                 tf->status = status;
274                 tf->exited = 1;
275         } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
276
277         pthread_mutex_unlock(&thread_pool.wait_mutex);
278 }
279 #else
280 #define reap_children()
281 #endif /* WNOHANG */
282
283 /*
284  *      Add a request to the list of waiting requests.
285  *      This function gets called ONLY from the main handler thread...
286  *
287  *      This function should never fail.
288  */
289 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
290 {
291         request_queue_t *entry;
292
293         pthread_mutex_lock(&thread_pool.queue_mutex);
294
295         thread_pool.request_count++;
296
297         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
298                 pthread_mutex_unlock(&thread_pool.queue_mutex);
299
300                 /*
301                  *      Mark the request as done.
302                  */
303                 radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
304                 request->child_state = REQUEST_DONE;
305                 return 0;
306         }
307
308         entry = rad_malloc(sizeof(*entry));
309         entry->request = request;
310         entry->fun = fun;
311
312         /*
313          *      Push the request onto the appropriate fifo for that
314          */
315         if (!fr_fifo_push(thread_pool.fifo[request->priority],
316                             entry)) {
317                 pthread_mutex_unlock(&thread_pool.queue_mutex);
318                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
319                 request->child_state = REQUEST_DONE;
320                 return 0;
321         }
322
323         thread_pool.num_queued++;
324
325         pthread_mutex_unlock(&thread_pool.queue_mutex);
326
327         /*
328          *      There's one more request in the queue.
329          *
330          *      Note that we're not touching the queue any more, so
331          *      the semaphore post is outside of the mutex.  This also
332          *      means that when the thread wakes up and tries to lock
333          *      the mutex, it will be unlocked, and there won't be
334          *      contention.
335          */
336         sem_post(&thread_pool.semaphore);
337
338         return 1;
339 }
340
341 /*
342  *      Remove a request from the queue.
343  */
344 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
345 {
346         RAD_LISTEN_TYPE i, start;
347         request_queue_t *entry;
348
349         reap_children();
350
351         pthread_mutex_lock(&thread_pool.queue_mutex);
352
353         /*
354          *      Clear old requests from all queues.
355          *
356          *      We only do one pass over the queue, in order to
357          *      amortize the work across the child threads.  Since we
358          *      do N checks for one request de-queued, the old
359          *      requests will be quickly cleared.
360          */
361         for (i = 0; i < RAD_LISTEN_MAX; i++) {
362                 entry = fr_fifo_peek(thread_pool.fifo[i]);
363                 if (!entry ||
364                     (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
365                         continue;
366 }
367                 /*
368                  *      This entry was marked to be stopped.  Acknowledge it.
369                  */
370                 entry = fr_fifo_pop(thread_pool.fifo[i]);
371                 rad_assert(entry != NULL);
372                 entry->request->child_state = REQUEST_DONE;
373                 thread_pool.num_queued--;
374                 free(entry);
375                 entry = NULL;
376         }
377
378         start = 0;
379  retry:
380         /*
381          *      Pop results from the top of the queue
382          */
383         for (i = start; i < RAD_LISTEN_MAX; i++) {
384                 entry = fr_fifo_pop(thread_pool.fifo[i]);
385                 if (entry) {
386                         start = i;
387                         break;
388                 }
389         }
390
391         if (!entry) {
392                 pthread_mutex_unlock(&thread_pool.queue_mutex);
393                 *request = NULL;
394                 *fun = NULL;
395                 return 0;
396         }
397
398         rad_assert(thread_pool.num_queued > 0);
399         thread_pool.num_queued--;
400         *request = entry->request;
401         *fun = entry->fun;
402         free(entry);
403         entry = NULL;
404
405         rad_assert(*request != NULL);
406         rad_assert((*request)->magic == REQUEST_MAGIC);
407         rad_assert(*fun != NULL);
408
409         /*
410          *      If the request has sat in the queue for too long,
411          *      kill it.
412          *
413          *      The main clean-up code can't delete the request from
414          *      the queue, and therefore won't clean it up until we
415          *      have acknowledged it as "done".
416          */
417         if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
418                 (*request)->child_state = REQUEST_DONE;
419                 goto retry;
420         }
421
422         /*
423          *      The thread is currently processing a request.
424          */
425         thread_pool.active_threads++;
426
427         pthread_mutex_unlock(&thread_pool.queue_mutex);
428
429         return 1;
430 }
431
432
433 /*
434  *      The main thread handler for requests.
435  *
436  *      Wait on the semaphore until we have it, and process the request.
437  */
438 static void *request_handler_thread(void *arg)
439 {
440         RAD_REQUEST_FUNP  fun;
441         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
442
443         /*
444          *      Loop forever, until told to exit.
445          */
446         do {
447                 /*
448                  *      Wait to be signalled.
449                  */
450                 DEBUG2("Thread %d waiting to be assigned a request",
451                        self->thread_num);
452         re_wait:
453                 if (sem_wait(&thread_pool.semaphore) != 0) {
454                         /*
455                          *      Interrupted system call.  Go back to
456                          *      waiting, but DON'T print out any more
457                          *      text.
458                          */
459                         if (errno == EINTR) {
460                                 DEBUG2("Re-wait %d", self->thread_num);
461                                 goto re_wait;
462                         }
463                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
464                                self->thread_num, strerror(errno));
465                         break;
466                 }
467
468                 DEBUG2("Thread %d got semaphore", self->thread_num);
469
470 #ifdef HAVE_OPENSSL_ERR_H
471                 /*
472                  *      Clear the error queue for the current thread.
473                  */
474                 ERR_clear_error ();
475 #endif
476
477                 /*
478                  *      Try to grab a request from the queue.
479                  *
480                  *      It may be empty, in which case we fail
481                  *      gracefully.
482                  */
483                 if (!request_dequeue(&self->request, &fun)) continue;
484
485                 self->request->child_pid = self->pthread_id;
486                 self->request_count++;
487
488                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
489                        self->thread_num, self->request->number,
490                        self->request_count);
491
492                 radius_handle_request(self->request, fun);
493
494                 /*
495                  *      Update the active threads.
496                  */
497                 pthread_mutex_lock(&thread_pool.queue_mutex);
498                 rad_assert(thread_pool.active_threads > 0);
499                 thread_pool.active_threads--;
500                 pthread_mutex_unlock(&thread_pool.queue_mutex);
501         } while (self->status != THREAD_CANCELLED);
502
503         DEBUG2("Thread %d exiting...", self->thread_num);
504
505 #ifdef HAVE_OPENSSL_ERR_H
506         /*
507          *      If we linked with OpenSSL, the application
508          *      must remove the thread's error queue before
509          *      exiting to prevent memory leaks.
510          */
511         ERR_remove_state(0);
512 #endif
513
514         /*
515          *  Do this as the LAST thing before exiting.
516          */
517         self->request = NULL;
518         self->status = THREAD_EXITED;
519
520         return NULL;
521 }
522
523 /*
524  *      Take a THREAD_HANDLE, delete it from the thread pool and
525  *      free its resources.
526  *
527  *      This function is called ONLY from the main server thread,
528  *      ONLY after the thread has exited.
529  */
530 static void delete_thread(THREAD_HANDLE *handle)
531 {
532         THREAD_HANDLE *prev;
533         THREAD_HANDLE *next;
534
535         rad_assert(handle->request == NULL);
536
537         DEBUG2("Deleting thread %d", handle->thread_num);
538
539         prev = handle->prev;
540         next = handle->next;
541         rad_assert(thread_pool.total_threads > 0);
542         thread_pool.total_threads--;
543
544         /*
545          *      Remove the handle from the list.
546          */
547         if (prev == NULL) {
548                 rad_assert(thread_pool.head == handle);
549                 thread_pool.head = next;
550         } else {
551                 prev->next = next;
552         }
553
554         if (next == NULL) {
555                 rad_assert(thread_pool.tail == handle);
556                 thread_pool.tail = prev;
557         } else {
558                 next->prev = prev;
559         }
560
561         /*
562          *      Free the handle, now that it's no longer referencable.
563          */
564         free(handle);
565 }
566
567
568 /*
569  *      Spawn a new thread, and place it in the thread pool.
570  *
571  *      The thread is started initially in the blocked state, waiting
572  *      for the semaphore.
573  */
574 static THREAD_HANDLE *spawn_thread(time_t now)
575 {
576         int rcode;
577         THREAD_HANDLE *handle;
578         pthread_attr_t attr;
579
580         /*
581          *      Ensure that we don't spawn too many threads.
582          */
583         if (thread_pool.total_threads >= thread_pool.max_threads) {
584                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
585                 return NULL;
586         }
587
588         /*
589          *      Allocate a new thread handle.
590          */
591         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
592         memset(handle, 0, sizeof(THREAD_HANDLE));
593         handle->prev = NULL;
594         handle->next = NULL;
595         handle->thread_num = thread_pool.max_thread_num++;
596         handle->request_count = 0;
597         handle->status = THREAD_RUNNING;
598         handle->timestamp = time(NULL);
599
600         /*
601          *      Initialize the thread's attributes to detached.
602          *
603          *      We could call pthread_detach() later, but if the thread
604          *      exits between the create & detach calls, it will need to
605          *      be joined, which will never happen.
606          */
607         pthread_attr_init(&attr);
608         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
609
610         /*
611          *      Create the thread detached, so that it cleans up it's
612          *      own memory when it exits.
613          *
614          *      Note that the function returns non-zero on error, NOT
615          *      -1.  The return code is the error, and errno isn't set.
616          */
617         rcode = pthread_create(&handle->pthread_id, &attr,
618                         request_handler_thread, handle);
619         if (rcode != 0) {
620                 radlog(L_ERR, "Thread create failed: %s",
621                        strerror(rcode));
622                 return NULL;
623         }
624         pthread_attr_destroy(&attr);
625
626         /*
627          *      One more thread to go into the list.
628          */
629         thread_pool.total_threads++;
630         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
631                         handle->thread_num, thread_pool.total_threads);
632
633         /*
634          *      Add the thread handle to the tail of the thread pool list.
635          */
636         if (thread_pool.tail) {
637                 thread_pool.tail->next = handle;
638                 handle->prev = thread_pool.tail;
639                 thread_pool.tail = handle;
640         } else {
641                 rad_assert(thread_pool.head == NULL);
642                 thread_pool.head = thread_pool.tail = handle;
643         }
644
645         /*
646          *      Update the time we last spawned a thread.
647          */
648         thread_pool.time_last_spawned = now;
649
650         /*
651          *      And return the new handle to the caller.
652          */
653         return handle;
654 }
655
656 /*
657  *      Temporary function to prevent server from executing a SIGHUP
658  *      until all threads are finished handling requests.  This returns
659  *      the number of active threads to 'radiusd.c'.
660  */
661 int total_active_threads(void)
662 {
663         /*
664          *      We don't acquire the mutex, so this is just an estimate.
665          *      We can't return with the lock held, so there's no point
666          *      in getting the guaranteed correct value; by the time
667          *      the caller sees it, it can be wrong again.
668          */
669         return thread_pool.active_threads;
670 }
671
672
673 #ifdef WNOHANG
674 static uint32_t pid_hash(const void *data)
675 {
676         const thread_fork_t *tf = data;
677
678         return fr_hash(&tf->pid, sizeof(tf->pid));
679 }
680
681 static int pid_cmp(const void *one, const void *two)
682 {
683         const thread_fork_t *a = one;
684         const thread_fork_t *b = two;
685
686         return (a->pid - b->pid);
687 }
688 #endif
689
690 /*
691  *      Allocate the thread pool, and seed it with an initial number
692  *      of threads.
693  *
694  *      FIXME: What to do on a SIGHUP???
695  */
696 int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
697 {
698         int             i, rcode;
699         CONF_SECTION    *pool_cf;
700         time_t          now;
701
702         now = time(NULL);
703
704         /*
705          *      We're not spawning new threads, don't do
706          *      anything.
707          */
708         if (!spawn_flag) return 0;
709
710         /*
711          *      After a SIGHUP, we don't over-write the previous values.
712          */
713         if (!pool_initialized) {
714                 /*
715                  *      Initialize the thread pool to some reasonable values.
716                  */
717                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
718                 thread_pool.head = NULL;
719                 thread_pool.tail = NULL;
720                 thread_pool.total_threads = 0;
721                 thread_pool.max_thread_num = 1;
722                 thread_pool.cleanup_delay = 5;
723                 thread_pool.spawn_flag = spawn_flag;
724
725 #ifdef WNOHANG
726                 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
727                         radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
728                                strerror(errno));
729                         return -1;
730                 }
731
732                 /*
733                  *      Create the hash table of child PID's
734                  */
735                 thread_pool.waiters = fr_hash_table_create(pid_hash,
736                                                              pid_cmp,
737                                                              free);
738                 if (!thread_pool.waiters) {
739                         radlog(L_ERR, "FATAL: Failed to set up wait hash");
740                         return -1;
741                 }
742 #endif
743         }
744
745         pool_cf = cf_subsection_find_next(cs, NULL, "thread");
746         if (!pool_cf) {
747                 radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
748                 return -1;
749         }
750
751         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
752                 return -1;
753         }
754
755         /*
756          *      Catch corner cases.
757          */
758         if (thread_pool.min_spare_threads < 1)
759                 thread_pool.min_spare_threads = 1;
760         if (thread_pool.max_spare_threads < 1)
761                 thread_pool.max_spare_threads = 1;
762         if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
763                 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
764
765         /*
766          *      The pool has already been initialized.  Don't spawn
767          *      new threads, and don't forget about forked children,
768          */
769         if (pool_initialized) {
770                 return 0;
771         }
772
773         /*
774          *      Initialize the queue of requests.
775          */
776         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
777         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
778         if (rcode != 0) {
779                 radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
780                        strerror(errno));
781                 return -1;
782         }
783
784         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
785         if (rcode != 0) {
786                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
787                        strerror(errno));
788                 return -1;
789         }
790
791         /*
792          *      Allocate multiple fifos.
793          */
794         for (i = 0; i < RAD_LISTEN_MAX; i++) {
795                 thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
796                 if (!thread_pool.fifo[i]) {
797                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
798                         return -1;
799                 }
800         }
801
802 #ifdef HAVE_OPENSSL_CRYPTO_H
803         /*
804          *      If we're linking with OpenSSL too, then we need
805          *      to set up the mutexes and enable the thread callbacks.
806          */
807         if (!setup_ssl_mutexes()) {
808                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
809                 return -1;
810         }
811 #endif
812
813
814         /*
815          *      Create a number of waiting threads.
816          *
817          *      If we fail while creating them, do something intelligent.
818          */
819         for (i = 0; i < thread_pool.start_threads; i++) {
820                 if (spawn_thread(now) == NULL) {
821                         return -1;
822                 }
823         }
824
825         DEBUG2("Thread pool initialized");
826         pool_initialized = TRUE;
827         return 0;
828 }
829
830
831 /*
832  *      Assign a new request to a free thread.
833  *
834  *      If there isn't a free thread, then try to create a new one,
835  *      up to the configured limits.
836  */
837 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
838 {
839         time_t now = request->timestamp;
840
841         /*
842          *      We've been told not to spawn threads, so don't.
843          */
844         if (!thread_pool.spawn_flag) {
845                 radius_handle_request(request, fun);
846
847 #ifdef WNOHANG
848                 /*
849                  *      Requests that care about child process exit
850                  *      codes have already either called
851                  *      rad_waitpid(), or they've given up.
852                  */
853                 wait(NULL);
854 #endif
855                 return 1;
856         }
857
858         /*
859          *      Add the new request to the queue.
860          */
861         if (!request_enqueue(request, fun)) return 0;
862
863         /*
864          *      If we haven't checked the number of child threads
865          *      in a while, OR if the thread pool appears to be full,
866          *      go manage it.
867          */
868         if ((last_cleaned < now) ||
869             (thread_pool.active_threads == thread_pool.total_threads)) {
870                 thread_pool_manage(now);
871         }
872
873         return 1;
874 }
875
876 /*
877  *      Check the min_spare_threads and max_spare_threads.
878  *
879  *      If there are too many or too few threads waiting, then we
880  *      either create some more, or delete some.
881  */
882 static void thread_pool_manage(time_t now)
883 {
884         int spare;
885         int i, total;
886         THREAD_HANDLE *handle, *next;
887         int active_threads;
888
889         /*
890          *      We don't need a mutex lock here, as we're reading
891          *      active_threads, and not modifying it.  We want a close
892          *      approximation of the number of active threads, and this
893          *      is good enough.
894          */
895         active_threads = thread_pool.active_threads;
896         spare = thread_pool.total_threads - active_threads;
897         if (debug_flag) {
898                 static int old_total = -1;
899                 static int old_active = -1;
900
901                 if ((old_total != thread_pool.total_threads) ||
902                                 (old_active != active_threads)) {
903                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
904                                         thread_pool.total_threads, active_threads, spare);
905                         old_total = thread_pool.total_threads;
906                         old_active = active_threads;
907                 }
908         }
909
910         /*
911          *      If there are too few spare threads.  Go create some more.
912          */
913         if (spare < thread_pool.min_spare_threads) {
914                 total = thread_pool.min_spare_threads - spare;
915
916                 DEBUG2("Threads: Spawning %d spares", total);
917
918                 /*
919                  *      Create a number of spare threads.
920                  */
921                 for (i = 0; i < total; i++) {
922                         handle = spawn_thread(now);
923                         if (handle == NULL) {
924                                 return;
925                         }
926                 }
927
928                 return;         /* there aren't too many spare threads */
929         }
930
931         /*
932          *      Only delete spare threads if we haven't already done
933          *      so this second.
934          */
935         if (now == last_cleaned) {
936                 return;
937         }
938         last_cleaned = now;
939
940         /*
941          *      Loop over the thread pool, deleting exited threads.
942          */
943         for (handle = thread_pool.head; handle; handle = next) {
944                 next = handle->next;
945
946                 /*
947                  *      Maybe we've asked the thread to exit, and it
948                  *      has agreed.
949                  */
950                 if (handle->status == THREAD_EXITED) {
951                         delete_thread(handle);
952                 }
953         }
954
955         /*
956          *      Only delete the spare threads if sufficient time has
957          *      passed since we last created one.  This helps to minimize
958          *      the amount of create/delete cycles.
959          */
960         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
961                 return;
962         }
963
964         /*
965          *      If there are too many spare threads, delete one.
966          *
967          *      Note that we only delete ONE at a time, instead of
968          *      wiping out many.  This allows the excess servers to
969          *      be slowly reaped, just in case the load spike comes again.
970          */
971         if (spare > thread_pool.max_spare_threads) {
972
973                 spare -= thread_pool.max_spare_threads;
974
975                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
976
977                 /*
978                  *      Walk through the thread pool, deleting the
979                  *      first idle thread we come across.
980                  */
981                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
982                         next = handle->next;
983
984                         /*
985                          *      If the thread is not handling a
986                          *      request, but still live, then tell it
987                          *      to exit.
988                          *
989                          *      It will eventually wake up, and realize
990                          *      it's been told to commit suicide.
991                          */
992                         if ((handle->request == NULL) &&
993                             (handle->status == THREAD_RUNNING)) {
994                                 handle->status = THREAD_CANCELLED;
995                                 /*
996                                  *      Post an extra semaphore, as a
997                                  *      signal to wake up, and exit.
998                                  */
999                                 sem_post(&thread_pool.semaphore);
1000                                 spare--;
1001                                 break;
1002                         }
1003                 }
1004         }
1005
1006         /*
1007          *      If the thread has handled too many requests, then make it
1008          *      exit.
1009          */
1010         if (thread_pool.max_requests_per_thread > 0) {
1011                 for (handle = thread_pool.head; handle; handle = next) {
1012                         next = handle->next;
1013
1014                         /*
1015                          *      Not handling a request, but otherwise
1016                          *      live, we can kill it.
1017                          */
1018                         if ((handle->request == NULL) &&
1019                             (handle->status == THREAD_RUNNING) &&
1020                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1021                                 handle->status = THREAD_CANCELLED;
1022                                 sem_post(&thread_pool.semaphore);
1023                         }
1024                 }
1025         }
1026
1027         /*
1028          *      Otherwise everything's kosher.  There are not too few,
1029          *      or too many spare threads.  Exit happily.
1030          */
1031         return;
1032 }
1033
1034
1035 #ifdef WNOHANG
1036 /*
1037  *      Thread wrapper for fork().
1038  */
1039 pid_t rad_fork(void)
1040 {
1041         pid_t child_pid;
1042
1043         if (!pool_initialized) return fork();
1044
1045         reap_children();        /* be nice to non-wait thingies */
1046
1047         if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1048                 return -1;
1049         }
1050
1051         /*
1052          *      Fork & save the PID for later reaping.
1053          */
1054         child_pid = fork();
1055         if (child_pid > 0) {
1056                 int rcode;
1057                 thread_fork_t *tf;
1058
1059                 tf = rad_malloc(sizeof(*tf));
1060                 memset(tf, 0, sizeof(*tf));
1061
1062                 tf->pid = child_pid;
1063
1064                 pthread_mutex_lock(&thread_pool.wait_mutex);
1065                 rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1066                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1067
1068                 if (!rcode) {
1069                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1070                                (int) child_pid);
1071                 }
1072         }
1073
1074         /*
1075          *      Return whatever we were told.
1076          */
1077         return child_pid;
1078 }
1079
1080
1081 /*
1082  *      Wait 10 seconds at most for a child to exit, then give up.
1083  */
1084 pid_t rad_waitpid(pid_t pid, int *status)
1085 {
1086         int i;
1087         thread_fork_t mytf, *tf;
1088
1089         if (!pool_initialized) return waitpid(pid, status, 0);
1090
1091         if (pid <= 0) return -1;
1092
1093         mytf.pid = pid;
1094
1095         pthread_mutex_lock(&thread_pool.wait_mutex);
1096         tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1097         pthread_mutex_unlock(&thread_pool.wait_mutex);
1098
1099         if (!tf) return -1;
1100
1101         for (i = 0; i < 100; i++) {
1102                 reap_children();
1103
1104                 if (tf->exited) {
1105                         *status = tf->status;
1106
1107                         pthread_mutex_lock(&thread_pool.wait_mutex);
1108                         fr_hash_table_delete(thread_pool.waiters, &mytf);
1109                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1110                         return pid;
1111                 }
1112                 usleep(100000); /* sleep for 1/10 of a second */
1113         }
1114
1115         /*
1116          *      10 seconds have passed, give up on the child.
1117          */
1118         pthread_mutex_lock(&thread_pool.wait_mutex);
1119         fr_hash_table_delete(thread_pool.waiters, &mytf);
1120         pthread_mutex_unlock(&thread_pool.wait_mutex);
1121
1122         return 0;
1123 }
1124 #else
1125 /*
1126  *      No rad_fork or rad_waitpid
1127  */
1128 #endif
1129
1130 void thread_pool_lock(void)
1131 {
1132         pthread_mutex_lock(&thread_pool.queue_mutex);
1133 }
1134
1135 void thread_pool_unlock(void)
1136 {
1137         pthread_mutex_unlock(&thread_pool.queue_mutex);
1138 }
1139
1140 void thread_pool_queue_stats(int *array)
1141 {
1142         int i;
1143
1144         if (pool_initialized) {
1145                 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1146                         array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1147                 }
1148         } else {
1149                 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1150                         array[i] = 0;
1151                 }
1152         }
1153 }
1154 #endif /* HAVE_PTHREAD_H */