import from HEAD:
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include "libradius.h"
25
26 #ifdef HAVE_PTHREAD_H
27
28 #include <stdlib.h>
29 #include <string.h>
30 #include <semaphore.h>
31 #include <signal.h>
32
33 #ifdef HAVE_SYS_WAIT_H
34 #include <sys/wait.h>
35 #endif
36
37 #include "radiusd.h"
38 #include "rad_assert.h"
39 #include "conffile.h"
40
static const char rcsid[] =
"$Id$";

/*
 *	Initial counts used when creating semaphores with sem_init().
 */
enum {
	SEMAPHORE_LOCKED	= 0,
	SEMAPHORE_UNLOCKED	= 1
};

/*
 *	Values stored in THREAD_HANDLE.status.
 */
enum {
	THREAD_RUNNING		= 1,	/* worker loop is active */
	THREAD_CANCELLED	= 2,	/* main thread asked the worker to exit */
	THREAD_EXITED		= 3	/* worker has left its loop; handle may be freed */
};
50
51 /*
52  *  A data structure which contains the information about
53  *  the current thread.
54  *
55  *  pthread_id     pthread id
56  *  thread_num     server thread number, 1...number of threads
57  *  semaphore     used to block the thread until a request comes in
58  *  status        is the thread running or exited?
59  *  request_count the number of requests that this thread has handled
60  *  timestamp     when the thread started executing.
61  */
62 typedef struct THREAD_HANDLE {
63         struct THREAD_HANDLE *prev;
64         struct THREAD_HANDLE *next;
65         pthread_t            pthread_id;
66         int                  thread_num;
67         int                  status;
68         unsigned int         request_count;
69         time_t               timestamp;
70         REQUEST              *request;
71 } THREAD_HANDLE;
72
73 /*
74  *      For the request queue.
75  */
76 typedef struct request_queue_t {
77         REQUEST           *request;
78         RAD_REQUEST_FUNP  fun;
79 } request_queue_t;
80
81
82 /*
83  *      A data structure to manage the thread pool.  There's no real
84  *      need for a data structure, but it makes things conceptually
85  *      easier.
86  */
87 typedef struct THREAD_POOL {
88         THREAD_HANDLE *head;
89         THREAD_HANDLE *tail;
90
91         int total_threads;
92         int max_thread_num;
93         int start_threads;
94         int max_threads;
95         int min_spare_threads;
96         int max_spare_threads;
97         unsigned int max_requests_per_thread;
98         unsigned long request_count;
99         time_t time_last_spawned;
100         int cleanup_delay;
101
102         /*
103          *      All threads wait on this semaphore, for requests
104          *      to enter the queue.
105          */
106         sem_t           semaphore;
107
108         /*
109          *      To ensure only one thread at a time touches the queue.
110          */
111         pthread_mutex_t mutex;
112
113         int             active_threads;
114         int             queue_head; /* first filled entry */
115         int             queue_tail; /* first empty entry */
116         int             queue_size;
117         request_queue_t *queue;
118 } THREAD_POOL;
119
120 static THREAD_POOL thread_pool;
121 static int pool_initialized = FALSE;
122
/*
 *	Record of a child process forked on behalf of a thread, so the
 *	forking thread can later collect the child's exit status.  The
 *	table is not split into "free" and "active" lists.
 *
 *	FIXME: there is no time-out, so entries that are never picked
 *	up are never cleaned up.
 */
typedef struct rad_fork_t {
	pthread_t	thread_id;	/* thread that did the fork */
	pid_t		child_pid;	/* process id of the child */
	sem_t		child_done;	/* presumably posted once the child exits -- verify */
	int		status;		/* exit status of the child */
	time_t		time_forked;	/* when the fork happened */
} rad_fork_t;

/*
 *	Number of slots in forkers[].  This MUST be a power of 2; the
 *	code using it presumably masks indices with (NUM_FORKERS - 1).
 */
#define NUM_FORKERS (8192)
static rad_fork_t forkers[NUM_FORKERS];

/*
 *	Held by any thread performing certain updates on forkers[].
 */
static pthread_mutex_t fork_mutex;
150
151
152 /*
153  *      A mapping of configuration file names to internal integers
154  */
155 static const CONF_PARSER thread_config[] = {
156         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
157         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
158         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
159         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
160         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
161         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
162         { NULL, -1, 0, NULL, NULL }
163 };
164
165
166 /*
167  *      Add a request to the list of waiting requests.
168  *      This function gets called ONLY from the main handler thread...
169  *
170  *      This function should never fail.
171  */
172 static void request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
173 {
174         int num_entries;
175
176         pthread_mutex_lock(&thread_pool.mutex);
177
178         thread_pool.request_count++;
179
180         /*
181          *      If the queue is empty, re-set the indices to zero,
182          *      for no particular reason...
183          */
184         if ((thread_pool.queue_head == thread_pool.queue_tail) &&
185             (thread_pool.queue_head != 0)) {
186                 thread_pool.queue_head = thread_pool.queue_tail = 0;
187         }
188
189         /*
190          *      If the queue is full, die.
191          *
192          *      The math is to take into account the fact that it's a
193          *      circular queue.
194          */
195         num_entries = ((thread_pool.queue_tail + thread_pool.queue_size) -
196                        thread_pool.queue_head) % thread_pool.queue_size;
197         if (num_entries == (thread_pool.queue_size - 1)) {
198                 request_queue_t *new_queue;
199
200                 /*
201                  *      If the queue becomes larger than 65536,
202                  *      there's a serious problem.
203                  */
204                 if (thread_pool.queue_size >= 65536) {
205                         pthread_mutex_unlock(&thread_pool.mutex);
206
207                         /*
208                          *      Mark the request as done.
209                          */
210                         radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
211                         request->finished = TRUE;
212                         return;
213                 }
214
215                 /*
216                  *      Malloc a new queue, doubled in size, copy the
217                  *      data from the current queue over to it, zero
218                  *      out the second half of the queue, free the old
219                  *      one, and replace thread_pool.queue with the
220                  *      new one.
221                  */
222                 new_queue = rad_malloc(sizeof(*new_queue) * thread_pool.queue_size * 2);
223                 memcpy(new_queue, thread_pool.queue,
224                        sizeof(*new_queue) * thread_pool.queue_size);
225                 memset(new_queue + sizeof(*new_queue) * thread_pool.queue_size,
226                        0, sizeof(*new_queue) * thread_pool.queue_size);
227
228                 free(thread_pool.queue);
229                 thread_pool.queue = new_queue;
230                 thread_pool.queue_size *= 2;
231         }
232
233         /*
234          *      Add the data to the queue tail, increment the tail,
235          *      and signal the semaphore that there's another request
236          *      in the queue.
237          */
238         thread_pool.queue[thread_pool.queue_tail].request = request;
239         thread_pool.queue[thread_pool.queue_tail].fun = fun;
240         thread_pool.queue_tail++;
241         thread_pool.queue_tail &= (thread_pool.queue_size - 1);
242
243         pthread_mutex_unlock(&thread_pool.mutex);
244
245         /*
246          *      There's one more request in the queue.
247          *
248          *      Note that we're not touching the queue any more, so
249          *      the semaphore post is outside of the mutex.  This also
250          *      means that when the thread wakes up and tries to lock
251          *      the mutex, it will be unlocked, and there won't be
252          *      contention.
253          */
254
255         sem_post(&thread_pool.semaphore);
256
257         return;
258 }
259
260 /*
261  *      Remove a request from the queue.
262  */
263 static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
264 {
265         pthread_mutex_lock(&thread_pool.mutex);
266
267         /*
268          *      Head & tail are the same.  There's nothing in
269          *      the queue.
270          */
271         if (thread_pool.queue_head == thread_pool.queue_tail) {
272                 pthread_mutex_unlock(&thread_pool.mutex);
273                 *request = NULL;
274                 *fun = NULL;
275                 return;
276         }
277
278         *request = thread_pool.queue[thread_pool.queue_head].request;
279         *fun = thread_pool.queue[thread_pool.queue_head].fun;
280
281         rad_assert(*request != NULL);
282         rad_assert((*request)->magic == REQUEST_MAGIC);
283         rad_assert(*fun != NULL);
284
285         thread_pool.queue_head++;
286         thread_pool.queue_head &= (thread_pool.queue_size - 1);
287
288         /*
289          *      FIXME: Check the request timestamp.  If it's more than
290          *      "clean_delay" seconds old, then discard the request,
291          *      log an error, and try to de-queue another request.
292          *
293          *      The main clean-up code won't delete the request from
294          *      the request list, because it's not marked "finished"
295          */
296
297         /*
298          *      The thread is currently processing a request.
299          */
300         thread_pool.active_threads++;
301
302         pthread_mutex_unlock(&thread_pool.mutex);
303
304         /*
305          *      If the request is currently being processed, then that
306          *      MAY be OK, if it's a proxy reply.  In that case,
307          *      sending the packet may result in a reply being
308          *      received before that thread clears the child_pid.
309          *
310          *      In that case, we busy-wait for the request to be free.
311          *
312          *      We COULD push it onto the queue and try to grab
313          *      another request, but what if this is the only request?
314          *      What if there are multiple such packets with race
315          *      conditions?  We don't want to thrash the queue...
316          *
317          *      This busy-wait is less than optimal, but it's simple,
318          *      fail-safe, and it works.
319          */
320         if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
321                 int count, ok;
322                 struct timeval tv;
323 #ifdef HAVE_PTHREAD_SIGMASK
324                 sigset_t set, old_set;
325
326                 /*
327                  *      Block a large number of signals which could
328                  *      cause the select to return EINTR
329                  */
330                 sigemptyset(&set);
331                 sigaddset(&set, SIGPIPE);
332                 sigaddset(&set, SIGCONT);
333                 sigaddset(&set, SIGSTOP);
334                 sigaddset(&set, SIGCHLD);
335                 pthread_sigmask(SIG_BLOCK, &set, &old_set);
336 #endif
337
338                 rad_assert((*request)->proxy_reply != NULL);
339
340                 ok = FALSE;
341
342                 /*
343                  *      Sleep for 100 milliseconds.  If the other thread
344                  *      doesn't get serviced in this time, to clear
345                  *      the "child_pid" entry, then the server is too
346                  *      busy, so we die.
347                  */
348                 for (count = 0; count < 10; count++) {
349                         tv.tv_sec = 0;
350                         tv.tv_usec = 10000; /* sleep for 10 milliseconds */
351
352                         /*
353                          *      Portable sleep that's thread-safe.
354                          *
355                          *      Don't worry about interrupts, as they're
356                          *      blocked above.
357                          */
358                         select(0, NULL, NULL, NULL, &tv);
359                         if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
360                                 ok = TRUE;
361                                 break;
362                         }
363                 }
364
365 #ifdef HAVE_PTHREAD_SIGMASK
366                 /*
367                  *      Restore the original thread signal mask.
368                  */
369                 pthread_sigmask(SIG_SETMASK, &old_set, NULL);
370 #endif
371
372                 if (!ok) {
373                         radlog(L_ERR, "FATAL!  Server is too busy to process requests");
374                         exit(1);
375                 }
376         }
377
378         return;
379 }
380
381
382 /*
383  *      The main thread handler for requests.
384  *
385  *      Wait on the semaphore until we have it, and process the request.
386  */
387 static void *request_handler_thread(void *arg)
388 {
389         RAD_REQUEST_FUNP  fun;
390         THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
391 #ifdef HAVE_PTHREAD_SIGMASK
392         sigset_t set;
393
394         /*
395          *      Block SIGHUP handling for the child threads.
396          *
397          *      This ensures that only the main server thread will
398          *      process HUP signals.
399          *
400          *      If we don't have sigprocmask, then it shouldn't be
401          *      a problem, either, as the sig_hup handler should check
402          *      for this condition.
403          */
404         sigemptyset(&set);
405         sigaddset(&set, SIGHUP);
406         sigaddset(&set, SIGINT);
407         sigaddset(&set, SIGQUIT);
408         sigaddset(&set, SIGTERM);
409         pthread_sigmask(SIG_BLOCK, &set, NULL);
410 #endif
411
412         /*
413          *      Loop forever, until told to exit.
414          */
415         do {
416                 /*
417                  *      Wait to be signalled.
418                  */
419                 DEBUG2("Thread %d waiting to be assigned a request",
420                        self->thread_num);
421         re_wait:
422                 if (sem_wait(&thread_pool.semaphore) != 0) {
423                         /*
424                          *      Interrupted system call.  Go back to
425                          *      waiting, but DON'T print out any more
426                          *      text.
427                          */
428                         if (errno == EINTR) {
429                                 DEBUG2("Re-wait %d", self->thread_num);
430                                 goto re_wait;
431                         }
432                         radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
433                                self->thread_num, strerror(errno));
434                         break;
435                 }
436
437                 DEBUG2("Thread %d got semaphore", self->thread_num);
438
439                 /*
440                  *      Try to grab a request from the queue.
441                  *
442                  *      It may be empty, in which case we fail
443                  *      gracefully.
444                  */
445                 request_dequeue(&self->request, &fun);
446                 if (!self->request) continue;
447
448                 self->request->child_pid = self->pthread_id;
449                 self->request_count++;
450
451                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
452                        self->thread_num, self->request->number,
453                        self->request_count);
454
455                 /*
456                  *      Respond, and reset request->child_pid
457                  */
458                 rad_respond(self->request, fun);
459                 self->request = NULL;
460
461                 /*
462                  *      Update the active threads.
463                  */
464                 pthread_mutex_lock(&thread_pool.mutex);
465                 rad_assert(thread_pool.active_threads > 0);
466                 thread_pool.active_threads--;
467                 pthread_mutex_unlock(&thread_pool.mutex);
468         } while (self->status != THREAD_CANCELLED);
469
470         DEBUG2("Thread %d exiting...", self->thread_num);
471
472         /*
473          *  Do this as the LAST thing before exiting.
474          */
475         self->status = THREAD_EXITED;
476
477         return NULL;
478 }
479
480 /*
481  *      Take a THREAD_HANDLE, and delete it from the thread pool.
482  *
483  *      This function is called ONLY from the main server thread.
484  */
485 static void delete_thread(THREAD_HANDLE *handle)
486 {
487         THREAD_HANDLE *prev;
488         THREAD_HANDLE *next;
489
490         rad_assert(handle->request == NULL);
491
492         prev = handle->prev;
493         next = handle->next;
494         rad_assert(thread_pool.total_threads > 0);
495         thread_pool.total_threads--;
496
497         /*
498          *      Remove the handle from the list.
499          */
500         if (prev == NULL) {
501                 rad_assert(thread_pool.head == handle);
502                 thread_pool.head = next;
503         } else {
504                 prev->next = next;
505         }
506
507         if (next == NULL) {
508                 rad_assert(thread_pool.tail == handle);
509                 thread_pool.tail = prev;
510         } else {
511                 next->prev = prev;
512         }
513
514         DEBUG2("Deleting thread %d", handle->thread_num);
515
516         /*
517          *      This thread has exited.  Delete any additional
518          *      resources associated with it.
519          */
520
521         /*
522          *      Free the memory, now that we're sure the thread
523          *      exited.
524          */
525         free(handle);
526 }
527
528
529 /*
530  *      Spawn a new thread, and place it in the thread pool.
531  *
532  *      The thread is started initially in the blocked state, waiting
533  *      for the semaphore.
534  */
535 static THREAD_HANDLE *spawn_thread(time_t now)
536 {
537         int rcode;
538         THREAD_HANDLE *handle;
539         pthread_attr_t attr;
540
541         /*
542          *      Ensure that we don't spawn too many threads.
543          */
544         if (thread_pool.total_threads >= thread_pool.max_threads) {
545                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
546                 return NULL;
547         }
548
549         /*
550          *      Allocate a new thread handle.
551          */
552         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
553         memset(handle, 0, sizeof(THREAD_HANDLE));
554         handle->prev = NULL;
555         handle->next = NULL;
556         handle->pthread_id = NO_SUCH_CHILD_PID;
557         handle->thread_num = thread_pool.max_thread_num++;
558         handle->request_count = 0;
559         handle->status = THREAD_RUNNING;
560         handle->timestamp = time(NULL);
561
562         /*
563          *      Initialize the thread's attributes to detached.
564          *
565          *      We could call pthread_detach() later, but if the thread
566          *      exits between the create & detach calls, it will need to
567          *      be joined, which will never happen.
568          */
569         pthread_attr_init(&attr);
570         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
571
572         /*
573          *      Create the thread detached, so that it cleans up it's
574          *      own memory when it exits.
575          *
576          *      Note that the function returns non-zero on error, NOT
577          *      -1.  The return code is the error, and errno isn't set.
578          */
579         rcode = pthread_create(&handle->pthread_id, &attr,
580                         request_handler_thread, handle);
581         if (rcode != 0) {
582                 radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
583                        strerror(rcode));
584                 exit(1);
585         }
586         pthread_attr_destroy(&attr);
587
588         /*
589          *      One more thread to go into the list.
590          */
591         thread_pool.total_threads++;
592         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
593                         handle->thread_num, thread_pool.total_threads);
594
595         /*
596          *      Add the thread handle to the tail of the thread pool list.
597          */
598         if (thread_pool.tail) {
599                 thread_pool.tail->next = handle;
600                 handle->prev = thread_pool.tail;
601                 thread_pool.tail = handle;
602         } else {
603                 rad_assert(thread_pool.head == NULL);
604                 thread_pool.head = thread_pool.tail = handle;
605         }
606
607         /*
608          *      Update the time we last spawned a thread.
609          */
610         thread_pool.time_last_spawned = now;
611
612         /*
613          *      And return the new handle to the caller.
614          */
615         return handle;
616 }
617
618 /*
619  *      Temporary function to prevent server from executing a SIGHUP
620  *      until all threads are finished handling requests.  This returns
621  *      the number of active threads to 'radiusd.c'.
622  */
623 int total_active_threads(void)
624 {
625         int rcode = 0;
626         THREAD_HANDLE *handle;
627
628         for (handle = thread_pool.head; handle != NULL; handle = handle->next){
629                 if (handle->request != NULL) {
630                         rcode ++;
631                 }
632         }
633         return (rcode);
634 }
635
636 /*
637  *      Allocate the thread pool, and seed it with an initial number
638  *      of threads.
639  *
640  *      FIXME: What to do on a SIGHUP???
641  */
642 int thread_pool_init(void)
643 {
644         int             i, rcode;
645         CONF_SECTION    *pool_cf;
646         time_t          now;
647
648         DEBUG("Initializing the thread pool...");
649         now = time(NULL);
650
651         /*
652          *      After a SIGHUP, we don't over-write the previous values.
653          */
654         if (!pool_initialized) {
655                 /*
656                  *      Initialize the thread pool to some reasonable values.
657                  */
658                 memset(&thread_pool, 0, sizeof(THREAD_POOL));
659                 thread_pool.head = NULL;
660                 thread_pool.tail = NULL;
661                 thread_pool.total_threads = 0;
662                 thread_pool.max_thread_num = 1;
663                 thread_pool.cleanup_delay = 5;
664         }
665
666         pool_cf = cf_section_find("thread");
667         if (pool_cf != NULL) {
668                 cf_section_parse(pool_cf, NULL, thread_config);
669         }
670
671         /*
672          *      Limit the maximum number of threads to the maximum
673          *      number of forks we can do.
674          *
675          *      FIXME: Make this code better...
676          */
677         if (thread_pool.max_threads >= NUM_FORKERS) {
678                 thread_pool.max_threads = NUM_FORKERS;
679         }
680
681
682         /*
683          *      The pool has already been initialized.  Don't spawn
684          *      new threads, and don't forget about forked children,
685          */
686         if (pool_initialized) {
687                 return 0;
688         }
689
690         /*
691          *      Initialize the queue of requests.
692          */
693         rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
694         if (rcode != 0) {
695                 radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
696                        strerror(errno));
697                 exit(1);
698         }
699
700         rcode = pthread_mutex_init(&thread_pool.mutex,NULL);
701         if (rcode != 0) {
702                 radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
703                        strerror(errno));
704                 exit(1);
705         }
706
707         /*
708          *      Queue head & tail are set to zero by the memset,
709          *      above.
710          *
711          *      Allocate an initial queue, always as a power of 2.
712          */
713         thread_pool.queue_size = 256;
714         thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) *
715                                        thread_pool.queue_size);
716         memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) *
717                                       thread_pool.queue_size));
718
719         /*
720          *      Create a number of waiting threads.
721          *
722          *      If we fail while creating them, do something intelligent.
723          */
724         for (i = 0; i < thread_pool.start_threads; i++) {
725                 if (spawn_thread(now) == NULL) {
726                         return -1;
727                 }
728         }
729
730         DEBUG2("Thread pool initialized");
731         pool_initialized = TRUE;
732         return 0;
733 }
734
735
736 /*
737  *      Assign a new request to a free thread.
738  *
739  *      If there isn't a free thread, then try to create a new one,
740  *      up to the configured limits.
741  */
742 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
743 {
744         /*
745          *      If the thread pool is busy handling requests, then
746          *      try to spawn another one.
747          */
748         if (thread_pool.active_threads == thread_pool.total_threads) {
749                 if (spawn_thread(request->timestamp) == NULL) {
750                         radlog(L_INFO,
751                                "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
752                                thread_pool.max_threads);
753                         return 0;
754                 }
755         }
756
757         /*
758          *      Add the new request to the queue.
759          */
760         request_enqueue(request, fun);
761
762         return 1;
763 }
764
765 /*
766  *      Check the min_spare_threads and max_spare_threads.
767  *
768  *      If there are too many or too few threads waiting, then we
769  *      either create some more, or delete some.
770  */
771 int thread_pool_clean(time_t now)
772 {
773         int spare;
774         int i, total;
775         THREAD_HANDLE *handle, *next;
776         int active_threads;
777         static time_t last_cleaned = 0;
778
779         /*
780          *      Loop over the thread pool deleting exited threads.
781          */
782         for (handle = thread_pool.head; handle; handle = next) {
783                 next = handle->next;
784
785                 /*
786                  *      Maybe we've asked the thread to exit, and it
787                  *      has agreed.
788                  */
789                 if (handle->status == THREAD_EXITED) {
790                         delete_thread(handle);
791                 }
792         }
793
794         /*
795          *      We don't need a mutex lock here, as we're reading
796          *      the location, and not modifying it.  We want a close
797          *      approximation of the number of active threads, and this
798          *      is good enough.
799          */
800         active_threads = thread_pool.active_threads;
801         spare = thread_pool.total_threads - active_threads;
802         if (debug_flag) {
803                 static int old_total = -1;
804                 static int old_active = -1;
805
806                 if ((old_total != thread_pool.total_threads) ||
807                                 (old_active != active_threads)) {
808                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
809                                         thread_pool.total_threads, active_threads, spare);
810                         old_total = thread_pool.total_threads;
811                         old_active = active_threads;
812                 }
813         }
814
815         /*
816          *      If there are too few spare threads, create some more.
817          */
818         if (spare < thread_pool.min_spare_threads) {
819                 total = thread_pool.min_spare_threads - spare;
820
821                 DEBUG2("Threads: Spawning %d spares", total);
822                 /*
823                  *      Create a number of spare threads.
824                  */
825                 for (i = 0; i < total; i++) {
826                         handle = spawn_thread(now);
827                         if (handle == NULL) {
828                                 return -1;
829                         }
830                 }
831
832                 /*
833                  *      And exit, as there can't be too many spare threads.
834                  */
835                 return 0;
836         }
837
838         /*
839          *      Only delete spare threads if we haven't already done
840          *      so this second.
841          */
842         if (now == last_cleaned) {
843                 return 0;
844         }
845         last_cleaned = now;
846
847         /*
848          *      Only delete the spare threads if sufficient time has
849          *      passed since we last created one.  This helps to minimize
850          *      the amount of create/delete cycles.
851          */
852         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
853                 return 0;
854         }
855
856         /*
857          *      If there are too many spare threads, delete one.
858          *
859          *      Note that we only delete ONE at a time, instead of
860          *      wiping out many.  This allows the excess servers to
861          *      be slowly reaped, just in case the load spike comes again.
862          */
863         if (spare > thread_pool.max_spare_threads) {
864
865                 spare -= thread_pool.max_spare_threads;
866
867                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
868
869                 /*
870                  *      Walk through the thread pool, deleting the
871                  *      first idle thread we come across.
872                  */
873                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
874                         next = handle->next;
875
876                         /*
877                          *      If the thread is not handling a
878                          *      request, but still live, then tell it
879                          *      to exit.
880                          *
881                          *      It will eventually wake up, and realize
882                          *      it's been told to commit suicide.
883                          */
884                         if ((handle->request == NULL) &&
885                             (handle->status == THREAD_RUNNING)) {
886                                 handle->status = THREAD_CANCELLED;
887                                 /*
888                                  *      Post an extra semaphore, as a
889                                  *      signal to wake up, and exit.
890                                  */
891                                 sem_post(&thread_pool.semaphore);
892                                 spare--;
893                                 break;
894                         }
895                 }
896         }
897
898         /*
899          *      If the thread has handled too many requests, then make it
900          *      exit.
901          */
902         if (thread_pool.max_requests_per_thread > 0) {
903                 for (handle = thread_pool.head; handle; handle = next) {
904                         next = handle->next;
905
906                         /*
907                          *      Not handling a request, but otherwise
908                          *      live, we can kill it.
909                          */
910                         if ((handle->request == NULL) &&
911                             (handle->status == THREAD_RUNNING) &&
912                             (handle->request_count > thread_pool.max_requests_per_thread)) {
913                                 handle->status = THREAD_CANCELLED;
914                                 sem_post(&thread_pool.semaphore);
915                         }
916                 }
917         }
918
919         /*
920          *      Otherwise everything's kosher.  There are not too few,
921          *      or too many spare threads.  Exit happily.
922          */
923         return 0;
924 }
925
926 static int exec_initialized = FALSE;
927
928 /*
929  *      Initialize the stuff for keeping track of child processes.
930  */
931 void rad_exec_init(void)
932 {
933         int i;
934
935         /*
936          *      Initialize the mutex used to remember calls to fork.
937          */
938         pthread_mutex_init(&fork_mutex, NULL);
939
940         /*
941          *      Initialize the data structure where we remember the
942          *      mappings of thread ID && child PID to exit status.
943          */
944         for (i = 0; i < NUM_FORKERS; i++) {
945                 forkers[i].thread_id = NO_SUCH_CHILD_PID;
946                 forkers[i].child_pid = -1;
947                 forkers[i].status = 0;
948         }
949
950         exec_initialized = TRUE;
951 }
952
/*
 *	We use the PID number as a base for the array index, so that
 *	we can quickly turn the PID into a free array entry, instead
 *	of rooting blindly through the entire array.
 *
 *	Masking with (NUM_FORKERS - 1) only maps PIDs onto every slot
 *	of forkers[] (and the wrap-around searches over forkers[] only
 *	visit every slot) when NUM_FORKERS is a power of two, so check
 *	that at compile time when NUM_FORKERS is a preprocessor macro.
 */
#if defined(NUM_FORKERS) && (((NUM_FORKERS) & ((NUM_FORKERS) - 1)) != 0)
#error "NUM_FORKERS must be a power of two"
#endif

/*
 *	The argument is parenthesized so that the cast applies to the
 *	whole expression passed in, not just its first operand.
 */
#define PID_2_ARRAY(pid) (((int) (pid)) & (NUM_FORKERS - 1))
959
960 /*
961  *      Thread wrapper for fork().
962  */
963 pid_t rad_fork(int exec_wait)
964 {
965         sigset_t set;
966         pid_t child_pid;
967
968         /*
969          *      The thread is NOT interested in waiting for the exit
970          *      status of the child process, so we don't bother
971          *      updating our kludgy array.
972          *
973          *      Or, there no NO threads, so we can just do the fork
974          *      thing.
975          */
976         if (!exec_wait || !exec_initialized) {
977                 return fork();
978         }
979
980         /*
981          *      Block SIGCLHD until such time as we've saved the PID.
982          *
983          *      Note that we block SIGCHLD for ALL threads associated
984          *      with this process!  This is to prevent race conditions!
985          */
986         sigemptyset(&set);
987         sigaddset(&set, SIGCHLD);
988         sigprocmask(SIG_BLOCK, &set, NULL);
989
990         /*
991          *      Do the fork.
992          */
993         child_pid = fork();
994
995         /*
996          *      We managed to fork.  Let's see if we have a free
997          *      array entry.
998          */
999         if (child_pid > 0) { /* parent */
1000                 int i;
1001                 int found;
1002                 time_t now = time(NULL);
1003
1004                 /*
1005                  *      We store the information in the array
1006                  *      indexed by PID.  This means that we have
1007                  *      on average an O(1) lookup to find the element,
1008                  *      instead of rooting through the entire array.
1009                  */
1010                 i = PID_2_ARRAY(child_pid);
1011                 found = -1;
1012
1013                 /*
1014                  *      We may have multiple threads trying to find an
1015                  *      empty position, so we lock the array until
1016                  *      we've found an entry.
1017                  */
1018                 pthread_mutex_lock(&fork_mutex);
1019                 do {
1020                         if (forkers[i].thread_id == NO_SUCH_CHILD_PID) {
1021                                 found = i;
1022                                 break;
1023                         }
1024
1025                         /*
1026                          *      Clean up any stale forked sessions.
1027                          *
1028                          *      This sometimes happens, for crazy reasons.
1029                          */
1030                         if ((now - forkers[i].time_forked) > 30) {
1031                                 forkers[i].thread_id = NO_SUCH_CHILD_PID;
1032
1033                                 /*
1034                                  *      Grab the child's exit condition,
1035                                  *      just in case...
1036                                  */
1037                                 waitpid(forkers[i].child_pid,
1038                                         &forkers[i].status,
1039                                         WNOHANG);
1040                                 sem_destroy(&forkers[i].child_done);
1041                                 found = i;
1042                                 break;
1043                         }
1044
1045                         /*
1046                          *  Increment it, within the array.
1047                          */
1048                         i++;
1049                         i &= (NUM_FORKERS - 1);
1050                 } while (i != PID_2_ARRAY(child_pid));
1051
1052                 /*
1053                  *      Arg.  We did a fork, and there was nowhere to
1054                  *      put the answer.
1055                  */
1056                 if (found < 0) {
1057                         sigprocmask(SIG_UNBLOCK, &set, NULL);
1058                         pthread_mutex_unlock(&fork_mutex);
1059                         return (pid_t) -1;
1060                 }
1061
1062                 /*
1063                  *      In the parent, set the status, and create the
1064                  *      semaphore.
1065                  */
1066                 forkers[found].status = -1;
1067                 forkers[found].child_pid = child_pid;
1068                 forkers[found].thread_id = pthread_self();
1069                 forkers[found].time_forked = now;
1070                 sem_init(&forkers[found].child_done, 0, SEMAPHORE_LOCKED);
1071                 pthread_mutex_unlock(&fork_mutex);
1072         }
1073
1074         /*
1075          *      Unblock SIGCHLD, now that there's no chance of bad entries
1076          *      in the array.
1077          */
1078         sigprocmask(SIG_UNBLOCK, &set, NULL);
1079
1080         /*
1081          *      Return whatever we were told.
1082          */
1083         return child_pid;
1084 }
1085
1086 /*
1087  *      Thread wrapper for waitpid(), so threads can wait for
1088  *      the PID they forked.
1089  */
1090 pid_t rad_waitpid(pid_t pid, int *status, int options)
1091 {
1092         int i, rcode;
1093         int found;
1094         pthread_t self = pthread_self();
1095
1096         /*
1097          *      We're only allowed to wait for a SPECIFIC pid.
1098          */
1099         if (pid <= 0) {
1100                 return -1;
1101         }
1102
1103         /*
1104          *      Find the PID to wait for, starting at an index within
1105          *      the array.  This makes the lookups O(1) on average,
1106          *      instead of O(n), when the array is filling up.
1107          */
1108         found = -1;
1109         i = PID_2_ARRAY(pid);
1110         do {
1111                 /*
1112                  *      We were the ones who forked this specific
1113                  *      child.
1114                  */
1115                 if ((forkers[i].thread_id == self) &&
1116                     (forkers[i].child_pid == pid)) {
1117                         found = i;
1118                         break;
1119                 }
1120
1121                 i++;
1122                 i &= (NUM_FORKERS - 1);
1123         } while (i != PID_2_ARRAY(pid));
1124
1125         /*
1126          *      No thread ID found: we're trying to wait for a child
1127          *      we've never forked!
1128          */
1129         if (found < 0) {
1130                 return -1;
1131         }
1132
1133         /*
1134          *      Wait for the signal that the child's status has been
1135          *      returned.
1136          */
1137         if (options == WNOHANG) {
1138                 rcode = sem_trywait(&forkers[found].child_done);
1139                 if (rcode != 0) {
1140                         return 0; /* no child available */
1141                 }
1142         } else {                /* wait forever */
1143         re_wait:
1144                 rcode = sem_wait(&forkers[found].child_done);
1145                 if ((rcode != 0) && (errno == EINTR)) {
1146                         goto re_wait;
1147                 }
1148         }
1149
1150         /*
1151          *      We've got the semaphore.  Now destroy it.
1152          *
1153          *      FIXME: Maybe we want to set up the semaphores in advance,
1154          *      to prevent the creation && deletion of lots of them,
1155          *      if creating and deleting them is expensive.
1156          */
1157         sem_destroy(&forkers[found].child_done);
1158
1159         /*
1160          *      Save the status BEFORE we re-set the thread ID.
1161          */
1162         *status = forkers[found].status;
1163
1164         /*
1165          *      This next line taints the other array entries,
1166          *      due to other threads re-using the data structure.
1167          */
1168         forkers[found].thread_id = NO_SUCH_CHILD_PID;
1169
1170         return pid;
1171 }
1172
1173 /*
1174  *      Called by the main signal handler, to save the status of the child
1175  */
1176 int rad_savepid(pid_t pid, int status)
1177 {
1178         int i;
1179
1180         /*
1181          *      Find the PID to wait for, starting at an index within
1182          *      the array.  This makes the lookups O(1) on average,
1183          *      instead of O(n), when the array is filling up.
1184          */
1185         i = PID_2_ARRAY(pid);
1186
1187         /*
1188          *      Do NOT lock the array, as nothing else sets the
1189          *      status and posts the semaphore.
1190          */
1191         do {
1192                 /*
1193                  *      Any thread can get the sigchild...
1194                  */
1195                 if ((forkers[i].thread_id != NO_SUCH_CHILD_PID) &&
1196                     (forkers[i].child_pid == pid)) {
1197                         /*
1198                          *      Save the status, THEN post the
1199                          *      semaphore.
1200                          */
1201                         forkers[i].status = status;
1202                         sem_post(&forkers[i].child_done);
1203
1204                         /*
1205                          *      FIXME: If the child is more than 60
1206                          *      seconds out of date, then delete it.
1207                          *
1208                          *      That is, we've forked, and the forker
1209                          *      is waiting nearly forever
1210                          */
1211                         return 0;
1212                 }
1213
1214                 i++;
1215                 i &= (NUM_FORKERS - 1);
1216         } while (i != PID_2_ARRAY(pid));
1217
1218         return -1;
1219 }
1220 #endif