Set "component" and "module" to more descriptive values
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include <freeradius-devel/ident.h>
25 RCSID("$Id$")
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/rad_assert.h>
29
30 /*
31  *      Other OS's have sem_init, OS X doesn't.
32  */
33 #ifdef HAVE_SEMAPHORE_H
34 #include <semaphore.h>
35 #endif
36
37 #ifdef DARWIN
38 #include <mach/task.h>
39 #include <mach/semaphore.h>
40
41 #undef sem_t
42 #define sem_t semaphore_t
43 #undef sem_init
44 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45 #undef sem_wait
46 #define sem_wait(s) semaphore_wait(*s)
47 #undef sem_post
48 #define sem_post(s) semaphore_signal(*s)
49 #endif
50
51 #ifdef HAVE_SYS_WAIT_H
52 #include <sys/wait.h>
53 #endif
54
55 #ifdef HAVE_PTHREAD_H
56
57 #ifdef HAVE_OPENSSL_CRYPTO_H
58 #include <openssl/crypto.h>
59 #endif
60 #ifdef HAVE_OPENSSL_ERR_H
61 #include <openssl/err.h>
62 #endif
63 #ifdef HAVE_OPENSSL_EVP_H
64 #include <openssl/evp.h>
65 #endif
66
67 #define SEMAPHORE_LOCKED        (0)
68 #define SEMAPHORE_UNLOCKED      (1)
69
70 #define THREAD_RUNNING          (1)
71 #define THREAD_CANCELLED        (2)
72 #define THREAD_EXITED           (3)
73
74 #define NUM_FIFOS               RAD_LISTEN_MAX
75
76
77 /*
78  *  A data structure which contains the information about
79  *  the current thread.
80  *
81  *  pthread_id     pthread id
82  *  thread_num     server thread number, 1...number of threads
83  *  semaphore     used to block the thread until a request comes in
84  *  status        is the thread running or exited?
85  *  request_count the number of requests that this thread has handled
86  *  timestamp     when the thread started executing.
87  */
88 typedef struct THREAD_HANDLE {
89         struct THREAD_HANDLE *prev;
90         struct THREAD_HANDLE *next;
91         pthread_t            pthread_id;
92         int                  thread_num;
93         int                  status;
94         unsigned int         request_count;
95         time_t               timestamp;
96         REQUEST              *request;
97 } THREAD_HANDLE;
98
99 /*
100  *      For the request queue.
101  */
102 typedef struct request_queue_t {
103         REQUEST           *request;
104         RAD_REQUEST_FUNP  fun;
105 } request_queue_t;
106
107 typedef struct thread_fork_t {
108         pid_t           pid;
109         int             status;
110         int             exited;
111 } thread_fork_t;
112
113
114 /*
115  *      A data structure to manage the thread pool.  There's no real
116  *      need for a data structure, but it makes things conceptually
117  *      easier.
118  */
119 typedef struct THREAD_POOL {
120         THREAD_HANDLE *head;
121         THREAD_HANDLE *tail;
122
123         int total_threads;
124         int active_threads;     /* protected by queue_mutex */
125         int max_thread_num;
126         int start_threads;
127         int max_threads;
128         int min_spare_threads;
129         int max_spare_threads;
130         unsigned int max_requests_per_thread;
131         unsigned long request_count;
132         time_t time_last_spawned;
133         int cleanup_delay;
134         int spawn_flag;
135
136 #ifdef WNOHANG
137         pthread_mutex_t wait_mutex;
138         fr_hash_table_t *waiters;
139 #endif
140
141         /*
142          *      All threads wait on this semaphore, for requests
143          *      to enter the queue.
144          */
145         sem_t           semaphore;
146
147         /*
148          *      To ensure only one thread at a time touches the queue.
149          */
150         pthread_mutex_t queue_mutex;
151
152         int             max_queue_size;
153         int             num_queued;
154         fr_fifo_t       *fifo[NUM_FIFOS];
155 } THREAD_POOL;
156
157 static THREAD_POOL thread_pool;
158 static int pool_initialized = FALSE;
159 static time_t last_cleaned = 0;
160 static time_t almost_now = 0;
161
162 static void thread_pool_manage(time_t now);
163
164 /*
165  *      A mapping of configuration file names to internal integers
166  */
167 static const CONF_PARSER thread_config[] = {
168         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
169         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
170         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
171         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
172         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
173         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
174         { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
175         { NULL, -1, 0, NULL, NULL }
176 };
177
178
179 #ifdef HAVE_OPENSSL_CRYPTO_H
180
181 /*
182  *      If we're linking against OpenSSL, then it is the
183  *      duty of the application, if it is multithreaded,
184  *      to provide OpenSSL with appropriate thread id
185  *      and mutex locking functions
186  *
187  *      Note: this only implements static callbacks.
188  *      OpenSSL does not use dynamic locking callbacks
189  *      right now, but may in the futiure, so we will have
190  *      to add them at some point.
191  */
192
193 static pthread_mutex_t *ssl_mutexes = NULL;
194
195 static unsigned long ssl_id_function(void)
196 {
197         return (unsigned long) pthread_self();
198 }
199
200 static void ssl_locking_function(int mode, int n, const char *file, int line)
201 {
202         file = file;            /* -Wunused */
203         line = line;            /* -Wunused */
204
205         if (mode & CRYPTO_LOCK) {
206                 pthread_mutex_lock(&(ssl_mutexes[n]));
207         } else {
208                 pthread_mutex_unlock(&(ssl_mutexes[n]));
209         }
210 }
211
212 static int setup_ssl_mutexes(void)
213 {
214         int i;
215
216 #ifdef HAVE_OPENSSL_EVP_H
217         /*
218          *      Enable all ciphers and digests.
219          */
220         OpenSSL_add_all_algorithms();
221 #endif
222
223         ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
224         if (!ssl_mutexes) {
225                 radlog(L_ERR, "Error allocating memory for SSL mutexes!");
226                 return 0;
227         }
228
229         for (i = 0; i < CRYPTO_num_locks(); i++) {
230                 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
231         }
232
233         CRYPTO_set_id_callback(ssl_id_function);
234         CRYPTO_set_locking_callback(ssl_locking_function);
235
236         return 1;
237 }
238 #endif
239
240 #ifdef WNOHANG
241 /*
242  *      We don't want to catch SIGCHLD for a host of reasons.
243  *
244  *      - exec_wait means that someone, somewhere, somewhen, will
245  *      call waitpid(), and catch the child.
246  *
247  *      - SIGCHLD is delivered to a random thread, not the one that
248  *      forked.
249  *
250  *      - if another thread catches the child, we have to coordinate
251  *      with the thread doing the waiting.
252  *
253  *      - if we don't waitpid() for non-wait children, they'll be zombies,
254  *      and will hang around forever.
255  *
256  */
257 static void reap_children(void)
258 {
259         pid_t pid;
260         int status;
261         thread_fork_t mytf, *tf;
262
263
264         pthread_mutex_lock(&thread_pool.wait_mutex);
265
266         do {
267         retry:
268                 pid = waitpid(0, &status, WNOHANG);
269                 if (pid <= 0) break;
270
271                 mytf.pid = pid;
272                 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
273                 if (!tf) goto retry;
274
275                 tf->status = status;
276                 tf->exited = 1;
277         } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
278
279         pthread_mutex_unlock(&thread_pool.wait_mutex);
280 }
281 #else
282 #define reap_children()
283 #endif /* WNOHANG */
284
285 /*
286  *      Add a request to the list of waiting requests.
287  *      This function gets called ONLY from the main handler thread...
288  *
289  *      This function should never fail.
290  */
291 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
292 {
293         request_queue_t *entry;
294
295         pthread_mutex_lock(&thread_pool.queue_mutex);
296
297         thread_pool.request_count++;
298
299         if (thread_pool.num_queued >= thread_pool.max_queue_size) {
300                 pthread_mutex_unlock(&thread_pool.queue_mutex);
301
302                 /*
303                  *      Mark the request as done.
304                  */
305                 radlog(L_ERR, "Something is blocking the server.  There are %d packets in the queue, waiting to be processed.  Ignoring the new request.", thread_pool.max_queue_size);
306                 request->child_state = REQUEST_DONE;
307                 return 0;
308         }
309         request->child_state = REQUEST_QUEUED;
310         request->component = "<core>";
311         request->module = "<queue>";
312
313         entry = rad_malloc(sizeof(*entry));
314         entry->request = request;
315         entry->fun = fun;
316
317         /*
318          *      Push the request onto the appropriate fifo for that
319          */
320         if (!fr_fifo_push(thread_pool.fifo[request->priority],
321                             entry)) {
322                 pthread_mutex_unlock(&thread_pool.queue_mutex);
323                 radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
324                 request->child_state = REQUEST_DONE;
325                 return 0;
326         }
327
328         thread_pool.num_queued++;
329
330         pthread_mutex_unlock(&thread_pool.queue_mutex);
331
332         /*
333          *      There's one more request in the queue.
334          *
335          *      Note that we're not touching the queue any more, so
336          *      the semaphore post is outside of the mutex.  This also
337          *      means that when the thread wakes up and tries to lock
338          *      the mutex, it will be unlocked, and there won't be
339          *      contention.
340          */
341         sem_post(&thread_pool.semaphore);
342
343         return 1;
344 }
345
346 /*
347  *      Remove a request from the queue.
348  */
349 static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
350 {
351         int blocked;
352         RAD_LISTEN_TYPE i, start;
353         request_queue_t *entry;
354
355         reap_children();
356
357         pthread_mutex_lock(&thread_pool.queue_mutex);
358
359         /*
360          *      Clear old requests from all queues.
361          *
362          *      We only do one pass over the queue, in order to
363          *      amortize the work across the child threads.  Since we
364          *      do N checks for one request de-queued, the old
365          *      requests will be quickly cleared.
366          */
367         for (i = 0; i < RAD_LISTEN_MAX; i++) {
368                 entry = fr_fifo_peek(thread_pool.fifo[i]);
369                 if (!entry ||
370                     (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
371                         continue;
372 }
373                 /*
374                  *      This entry was marked to be stopped.  Acknowledge it.
375                  */
376                 entry = fr_fifo_pop(thread_pool.fifo[i]);
377                 rad_assert(entry != NULL);
378                 entry->request->child_state = REQUEST_DONE;
379                 thread_pool.num_queued--;
380                 free(entry);
381                 entry = NULL;
382         }
383
384         start = 0;
385  retry:
386         /*
387          *      Pop results from the top of the queue
388          */
389         for (i = start; i < RAD_LISTEN_MAX; i++) {
390                 entry = fr_fifo_pop(thread_pool.fifo[i]);
391                 if (entry) {
392                         start = i;
393                         break;
394                 }
395         }
396
397         if (!entry) {
398                 pthread_mutex_unlock(&thread_pool.queue_mutex);
399                 *request = NULL;
400                 *fun = NULL;
401                 return 0;
402         }
403
404         rad_assert(thread_pool.num_queued > 0);
405         thread_pool.num_queued--;
406         *request = entry->request;
407         *fun = entry->fun;
408         free(entry);
409         entry = NULL;
410
411         rad_assert(*request != NULL);
412         rad_assert((*request)->magic == REQUEST_MAGIC);
413         rad_assert(*fun != NULL);
414
415         (*request)->component = "<core>";
416         (*request)->module = "<thread>";
417
418         /*
419          *      If the request has sat in the queue for too long,
420          *      kill it.
421          *
422          *      The main clean-up code can't delete the request from
423          *      the queue, and therefore won't clean it up until we
424          *      have acknowledged it as "done".
425          */
426         if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
427                 (*request)->module = "<done>";
428                 (*request)->child_state = REQUEST_DONE;
429                 goto retry;
430         }
431
432         /*
433          *      Produce messages for people who have 10 million rows
434          *      in a database, without indexes.
435          */
436         rad_assert(almost_now != 0);
437         blocked = almost_now - (*request)->timestamp;
438         if (blocked < 5) {
439                 blocked = 0;
440         } else {
441                 static time_t last_complained = 0;
442                 
443                 if (last_complained != almost_now) {
444                         last_complained = almost_now;
445                 } else {
446                         blocked = 0;
447                 }
448         }
449
450         /*
451          *      The thread is currently processing a request.
452          */
453         thread_pool.active_threads++;
454
455         pthread_mutex_unlock(&thread_pool.queue_mutex);
456
457         if (blocked) {
458                 radlog(L_ERR, "Request %u has been waiting in the processing queue for %d seconds.  Check that all databases are running properly!",
459                        (*request)->number, blocked);
460         }
461
462         return 1;
463 }
464
465
466 /*
467  *      The main thread handler for requests.
468  *
469  *      Wait on the semaphore until we have it, and process the request.
470  */
471 static void *request_handler_thread(void *arg)
472 {
473         RAD_REQUEST_FUNP  fun;
474         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
475
476         /*
477          *      Loop forever, until told to exit.
478          */
479         do {
480                 /*
481                  *      Wait to be signalled.
482                  */
483                 DEBUG2("Thread %d waiting to be assigned a request",
484                        self->thread_num);
485         re_wait:
486                 if (sem_wait(&thread_pool.semaphore) != 0) {
487                         /*
488                          *      Interrupted system call.  Go back to
489                          *      waiting, but DON'T print out any more
490                          *      text.
491                          */
492                         if (errno == EINTR) {
493                                 DEBUG2("Re-wait %d", self->thread_num);
494                                 goto re_wait;
495                         }
496                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
497                                self->thread_num, strerror(errno));
498                         break;
499                 }
500
501                 DEBUG2("Thread %d got semaphore", self->thread_num);
502
503 #ifdef HAVE_OPENSSL_ERR_H
504                 /*
505                  *      Clear the error queue for the current thread.
506                  */
507                 ERR_clear_error ();
508 #endif
509
510                 /*
511                  *      Try to grab a request from the queue.
512                  *
513                  *      It may be empty, in which case we fail
514                  *      gracefully.
515                  */
516                 if (!request_dequeue(&self->request, &fun)) continue;
517
518                 self->request->child_pid = self->pthread_id;
519                 self->request_count++;
520
521                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
522                        self->thread_num, self->request->number,
523                        self->request_count);
524
525                 radius_handle_request(self->request, fun);
526
527                 /*
528                  *      Update the active threads.
529                  */
530                 pthread_mutex_lock(&thread_pool.queue_mutex);
531                 rad_assert(thread_pool.active_threads > 0);
532                 thread_pool.active_threads--;
533                 pthread_mutex_unlock(&thread_pool.queue_mutex);
534         } while (self->status != THREAD_CANCELLED);
535
536         DEBUG2("Thread %d exiting...", self->thread_num);
537
538 #ifdef HAVE_OPENSSL_ERR_H
539         /*
540          *      If we linked with OpenSSL, the application
541          *      must remove the thread's error queue before
542          *      exiting to prevent memory leaks.
543          */
544         ERR_remove_state(0);
545 #endif
546
547         /*
548          *  Do this as the LAST thing before exiting.
549          */
550         self->request = NULL;
551         self->status = THREAD_EXITED;
552
553         return NULL;
554 }
555
556 /*
557  *      Take a THREAD_HANDLE, delete it from the thread pool and
558  *      free its resources.
559  *
560  *      This function is called ONLY from the main server thread,
561  *      ONLY after the thread has exited.
562  */
563 static void delete_thread(THREAD_HANDLE *handle)
564 {
565         THREAD_HANDLE *prev;
566         THREAD_HANDLE *next;
567
568         rad_assert(handle->request == NULL);
569
570         DEBUG2("Deleting thread %d", handle->thread_num);
571
572         prev = handle->prev;
573         next = handle->next;
574         rad_assert(thread_pool.total_threads > 0);
575         thread_pool.total_threads--;
576
577         /*
578          *      Remove the handle from the list.
579          */
580         if (prev == NULL) {
581                 rad_assert(thread_pool.head == handle);
582                 thread_pool.head = next;
583         } else {
584                 prev->next = next;
585         }
586
587         if (next == NULL) {
588                 rad_assert(thread_pool.tail == handle);
589                 thread_pool.tail = prev;
590         } else {
591                 next->prev = prev;
592         }
593
594         /*
595          *      Free the handle, now that it's no longer referencable.
596          */
597         free(handle);
598 }
599
600
601 /*
602  *      Spawn a new thread, and place it in the thread pool.
603  *
604  *      The thread is started initially in the blocked state, waiting
605  *      for the semaphore.
606  */
607 static THREAD_HANDLE *spawn_thread(time_t now)
608 {
609         int rcode;
610         THREAD_HANDLE *handle;
611         pthread_attr_t attr;
612
613         /*
614          *      Ensure that we don't spawn too many threads.
615          */
616         if (thread_pool.total_threads >= thread_pool.max_threads) {
617                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
618                 return NULL;
619         }
620
621         /*
622          *      Allocate a new thread handle.
623          */
624         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
625         memset(handle, 0, sizeof(THREAD_HANDLE));
626         handle->prev = NULL;
627         handle->next = NULL;
628         handle->thread_num = thread_pool.max_thread_num++;
629         handle->request_count = 0;
630         handle->status = THREAD_RUNNING;
631         handle->timestamp = time(NULL);
632
633         /*
634          *      Initialize the thread's attributes to detached.
635          *
636          *      We could call pthread_detach() later, but if the thread
637          *      exits between the create & detach calls, it will need to
638          *      be joined, which will never happen.
639          */
640         pthread_attr_init(&attr);
641         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
642
643         /*
644          *      Create the thread detached, so that it cleans up it's
645          *      own memory when it exits.
646          *
647          *      Note that the function returns non-zero on error, NOT
648          *      -1.  The return code is the error, and errno isn't set.
649          */
650         rcode = pthread_create(&handle->pthread_id, &attr,
651                         request_handler_thread, handle);
652         if (rcode != 0) {
653                 radlog(L_ERR, "Thread create failed: %s",
654                        strerror(rcode));
655                 return NULL;
656         }
657         pthread_attr_destroy(&attr);
658
659         /*
660          *      One more thread to go into the list.
661          */
662         thread_pool.total_threads++;
663         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
664                         handle->thread_num, thread_pool.total_threads);
665
666         /*
667          *      Add the thread handle to the tail of the thread pool list.
668          */
669         if (thread_pool.tail) {
670                 thread_pool.tail->next = handle;
671                 handle->prev = thread_pool.tail;
672                 thread_pool.tail = handle;
673         } else {
674                 rad_assert(thread_pool.head == NULL);
675                 thread_pool.head = thread_pool.tail = handle;
676         }
677
678         /*
679          *      Update the time we last spawned a thread.
680          */
681         thread_pool.time_last_spawned = now;
682
683         /*
684          *      And return the new handle to the caller.
685          */
686         return handle;
687 }
688
689 /*
690  *      Temporary function to prevent server from executing a SIGHUP
691  *      until all threads are finished handling requests.  This returns
692  *      the number of active threads to 'radiusd.c'.
693  */
694 int total_active_threads(void)
695 {
696         /*
697          *      We don't acquire the mutex, so this is just an estimate.
698          *      We can't return with the lock held, so there's no point
699          *      in getting the guaranteed correct value; by the time
700          *      the caller sees it, it can be wrong again.
701          */
702         return thread_pool.active_threads;
703 }
704
705
706 #ifdef WNOHANG
707 static uint32_t pid_hash(const void *data)
708 {
709         const thread_fork_t *tf = data;
710
711         return fr_hash(&tf->pid, sizeof(tf->pid));
712 }
713
714 static int pid_cmp(const void *one, const void *two)
715 {
716         const thread_fork_t *a = one;
717         const thread_fork_t *b = two;
718
719         return (a->pid - b->pid);
720 }
721 #endif
722
723 /*
724  *      Allocate the thread pool, and seed it with an initial number
725  *      of threads.
726  *
727  *      FIXME: What to do on a SIGHUP???
728  */
729 int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
730 {
731         int             i, rcode;
732         CONF_SECTION    *pool_cf;
733         time_t          now;
734
735         now = time(NULL);
736
737         rad_assert(spawn_flag != NULL);
738         rad_assert(*spawn_flag == TRUE);
739         rad_assert(pool_initialized == FALSE); /* not called on HUP */
740
741         pool_cf = cf_subsection_find_next(cs, NULL, "thread");
742         if (!pool_cf) *spawn_flag = FALSE;
743
744         /*
745          *      Initialize the thread pool to some reasonable values.
746          */
747         memset(&thread_pool, 0, sizeof(THREAD_POOL));
748         thread_pool.head = NULL;
749         thread_pool.tail = NULL;
750         thread_pool.total_threads = 0;
751         thread_pool.max_thread_num = 1;
752         thread_pool.cleanup_delay = 5;
753         thread_pool.spawn_flag = *spawn_flag;
754         
755         /*
756          *      Don't bother initializing the mutexes or
757          *      creating the hash tables.  They won't be used.
758          */
759         if (!*spawn_flag) return 0;
760         
761 #ifdef WNOHANG
762         if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
763                 radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
764                        strerror(errno));
765                 return -1;
766         }
767         
768         /*
769          *      Create the hash table of child PID's
770          */
771         thread_pool.waiters = fr_hash_table_create(pid_hash,
772                                                    pid_cmp,
773                                                    free);
774         if (!thread_pool.waiters) {
775                 radlog(L_ERR, "FATAL: Failed to set up wait hash");
776                 return -1;
777         }
778 #endif
779
780         if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
781                 return -1;
782         }
783
784         /*
785          *      Catch corner cases.
786          */
787         if (thread_pool.min_spare_threads < 1)
788                 thread_pool.min_spare_threads = 1;
789         if (thread_pool.max_spare_threads < 1)
790                 thread_pool.max_spare_threads = 1;
791         if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
792                 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
793
794         /*
795          *      The pool has already been initialized.  Don't spawn
796          *      new threads, and don't forget about forked children,
797          */
798         if (pool_initialized) {
799                 return 0;
800         }
801
802         /*
803          *      Initialize the queue of requests.
804          */
805         memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
806         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
807         if (rcode != 0) {
808                 radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
809                        strerror(errno));
810                 return -1;
811         }
812
813         rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
814         if (rcode != 0) {
815                 radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
816                        strerror(errno));
817                 return -1;
818         }
819
820         /*
821          *      Allocate multiple fifos.
822          */
823         for (i = 0; i < RAD_LISTEN_MAX; i++) {
824                 thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
825                 if (!thread_pool.fifo[i]) {
826                         radlog(L_ERR, "FATAL: Failed to set up request fifo");
827                         return -1;
828                 }
829         }
830
831 #ifdef HAVE_OPENSSL_CRYPTO_H
832         /*
833          *      If we're linking with OpenSSL too, then we need
834          *      to set up the mutexes and enable the thread callbacks.
835          */
836         if (!setup_ssl_mutexes()) {
837                 radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
838                 return -1;
839         }
840 #endif
841
842
843         /*
844          *      Create a number of waiting threads.
845          *
846          *      If we fail while creating them, do something intelligent.
847          */
848         for (i = 0; i < thread_pool.start_threads; i++) {
849                 if (spawn_thread(now) == NULL) {
850                         return -1;
851                 }
852         }
853
854         DEBUG2("Thread pool initialized");
855         pool_initialized = TRUE;
856         return 0;
857 }
858
859
860 /*
861  *      Assign a new request to a free thread.
862  *
863  *      If there isn't a free thread, then try to create a new one,
864  *      up to the configured limits.
865  */
866 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
867 {
868         almost_now = request->timestamp;
869
870         /*
871          *      We've been told not to spawn threads, so don't.
872          */
873         if (!thread_pool.spawn_flag) {
874                 radius_handle_request(request, fun);
875
876 #ifdef WNOHANG
877                 /*
878                  *      Requests that care about child process exit
879                  *      codes have already either called
880                  *      rad_waitpid(), or they've given up.
881                  */
882                 wait(NULL);
883 #endif
884                 return 1;
885         }
886
887         /*
888          *      Add the new request to the queue.
889          */
890         if (!request_enqueue(request, fun)) return 0;
891
892         /*
893          *      If we haven't checked the number of child threads
894          *      in a while, OR if the thread pool appears to be full,
895          *      go manage it.
896          */
897         if ((last_cleaned < almost_now) ||
898             (thread_pool.active_threads == thread_pool.total_threads)) {
899                 thread_pool_manage(almost_now);
900         }
901
902         return 1;
903 }
904
905 /*
906  *      Check the min_spare_threads and max_spare_threads.
907  *
908  *      If there are too many or too few threads waiting, then we
909  *      either create some more, or delete some.
910  */
911 static void thread_pool_manage(time_t now)
912 {
913         int spare;
914         int i, total;
915         THREAD_HANDLE *handle, *next;
916         int active_threads;
917
918         /*
919          *      We don't need a mutex lock here, as we're reading
920          *      active_threads, and not modifying it.  We want a close
921          *      approximation of the number of active threads, and this
922          *      is good enough.
923          */
924         active_threads = thread_pool.active_threads;
925         spare = thread_pool.total_threads - active_threads;
926         if (debug_flag) {
927                 static int old_total = -1;
928                 static int old_active = -1;
929
930                 if ((old_total != thread_pool.total_threads) ||
931                                 (old_active != active_threads)) {
932                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
933                                         thread_pool.total_threads, active_threads, spare);
934                         old_total = thread_pool.total_threads;
935                         old_active = active_threads;
936                 }
937         }
938
939         /*
940          *      If there are too few spare threads.  Go create some more.
941          */
942         if (spare < thread_pool.min_spare_threads) {
943                 total = thread_pool.min_spare_threads - spare;
944
945                 DEBUG2("Threads: Spawning %d spares", total);
946
947                 /*
948                  *      Create a number of spare threads.
949                  */
950                 for (i = 0; i < total; i++) {
951                         handle = spawn_thread(now);
952                         if (handle == NULL) {
953                                 return;
954                         }
955                 }
956
957                 return;         /* there aren't too many spare threads */
958         }
959
960         /*
961          *      Only delete spare threads if we haven't already done
962          *      so this second.
963          */
964         if (now == last_cleaned) {
965                 return;
966         }
967         last_cleaned = now;
968
969         /*
970          *      Loop over the thread pool, deleting exited threads.
971          */
972         for (handle = thread_pool.head; handle; handle = next) {
973                 next = handle->next;
974
975                 /*
976                  *      Maybe we've asked the thread to exit, and it
977                  *      has agreed.
978                  */
979                 if (handle->status == THREAD_EXITED) {
980                         delete_thread(handle);
981                 }
982         }
983
984         /*
985          *      Only delete the spare threads if sufficient time has
986          *      passed since we last created one.  This helps to minimize
987          *      the amount of create/delete cycles.
988          */
989         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
990                 return;
991         }
992
993         /*
994          *      If there are too many spare threads, delete one.
995          *
996          *      Note that we only delete ONE at a time, instead of
997          *      wiping out many.  This allows the excess servers to
998          *      be slowly reaped, just in case the load spike comes again.
999          */
1000         if (spare > thread_pool.max_spare_threads) {
1001
1002                 spare -= thread_pool.max_spare_threads;
1003
1004                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1005
1006                 /*
1007                  *      Walk through the thread pool, deleting the
1008                  *      first idle thread we come across.
1009                  */
1010                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1011                         next = handle->next;
1012
1013                         /*
1014                          *      If the thread is not handling a
1015                          *      request, but still live, then tell it
1016                          *      to exit.
1017                          *
1018                          *      It will eventually wake up, and realize
1019                          *      it's been told to commit suicide.
1020                          */
1021                         if ((handle->request == NULL) &&
1022                             (handle->status == THREAD_RUNNING)) {
1023                                 handle->status = THREAD_CANCELLED;
1024                                 /*
1025                                  *      Post an extra semaphore, as a
1026                                  *      signal to wake up, and exit.
1027                                  */
1028                                 sem_post(&thread_pool.semaphore);
1029                                 spare--;
1030                                 break;
1031                         }
1032                 }
1033         }
1034
1035         /*
1036          *      If the thread has handled too many requests, then make it
1037          *      exit.
1038          */
1039         if (thread_pool.max_requests_per_thread > 0) {
1040                 for (handle = thread_pool.head; handle; handle = next) {
1041                         next = handle->next;
1042
1043                         /*
1044                          *      Not handling a request, but otherwise
1045                          *      live, we can kill it.
1046                          */
1047                         if ((handle->request == NULL) &&
1048                             (handle->status == THREAD_RUNNING) &&
1049                             (handle->request_count > thread_pool.max_requests_per_thread)) {
1050                                 handle->status = THREAD_CANCELLED;
1051                                 sem_post(&thread_pool.semaphore);
1052                         }
1053                 }
1054         }
1055
1056         /*
1057          *      Otherwise everything's kosher.  There are not too few,
1058          *      or too many spare threads.  Exit happily.
1059          */
1060         return;
1061 }
1062
1063
1064 #ifdef WNOHANG
1065 /*
1066  *      Thread wrapper for fork().
1067  */
1068 pid_t rad_fork(void)
1069 {
1070         pid_t child_pid;
1071
1072         if (!pool_initialized) return fork();
1073
1074         reap_children();        /* be nice to non-wait thingies */
1075
1076         if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1077                 return -1;
1078         }
1079
1080         /*
1081          *      Fork & save the PID for later reaping.
1082          */
1083         child_pid = fork();
1084         if (child_pid > 0) {
1085                 int rcode;
1086                 thread_fork_t *tf;
1087
1088                 tf = rad_malloc(sizeof(*tf));
1089                 memset(tf, 0, sizeof(*tf));
1090
1091                 tf->pid = child_pid;
1092
1093                 pthread_mutex_lock(&thread_pool.wait_mutex);
1094                 rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1095                 pthread_mutex_unlock(&thread_pool.wait_mutex);
1096
1097                 if (!rcode) {
1098                         radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1099                                (int) child_pid);
1100                         free(tf);
1101                 }
1102         }
1103
1104         /*
1105          *      Return whatever we were told.
1106          */
1107         return child_pid;
1108 }
1109
1110
1111 /*
1112  *      Wait 10 seconds at most for a child to exit, then give up.
1113  */
1114 pid_t rad_waitpid(pid_t pid, int *status)
1115 {
1116         int i;
1117         thread_fork_t mytf, *tf;
1118
1119         if (!pool_initialized) return waitpid(pid, status, 0);
1120
1121         if (pid <= 0) return -1;
1122
1123         mytf.pid = pid;
1124
1125         pthread_mutex_lock(&thread_pool.wait_mutex);
1126         tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1127         pthread_mutex_unlock(&thread_pool.wait_mutex);
1128
1129         if (!tf) return -1;
1130
1131         for (i = 0; i < 100; i++) {
1132                 reap_children();
1133
1134                 if (tf->exited) {
1135                         *status = tf->status;
1136
1137                         pthread_mutex_lock(&thread_pool.wait_mutex);
1138                         fr_hash_table_delete(thread_pool.waiters, &mytf);
1139                         pthread_mutex_unlock(&thread_pool.wait_mutex);
1140                         return pid;
1141                 }
1142                 usleep(100000); /* sleep for 1/10 of a second */
1143         }
1144
1145         /*
1146          *      10 seconds have passed, give up on the child.
1147          */
1148         pthread_mutex_lock(&thread_pool.wait_mutex);
1149         fr_hash_table_delete(thread_pool.waiters, &mytf);
1150         pthread_mutex_unlock(&thread_pool.wait_mutex);
1151
1152         return 0;
1153 }
1154 #else
1155 /*
1156  *      No rad_fork or rad_waitpid
1157  */
1158 #endif
1159
1160 void thread_pool_lock(void)
1161 {
1162         pthread_mutex_lock(&thread_pool.queue_mutex);
1163 }
1164
1165 void thread_pool_unlock(void)
1166 {
1167         pthread_mutex_unlock(&thread_pool.queue_mutex);
1168 }
1169
1170 void thread_pool_queue_stats(int *array)
1171 {
1172         int i;
1173
1174         if (pool_initialized) {
1175                 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1176                         array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1177                 }
1178         } else {
1179                 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1180                         array[i] = 0;
1181                 }
1182         }
1183 }
1184 #else
1185 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
1186 {
1187         radius_handle_request(request, fun);
1188         
1189 #ifdef WNOHANG
1190         /*
1191          *      Requests that care about child process exit
1192          *      codes have already either called
1193          *      rad_waitpid(), or they've given up.
1194          */
1195         wait(NULL);
1196 #endif
1197         return 1;
1198 }
1199
1200 #endif /* HAVE_PTHREAD_H */