Do waitpid() for ALL children, even if there are no threads waiting
[freeradius.git] / src / main / threads.c
index 9c90403..130fe9e 100644 (file)
  *
  *   You should have received a copy of the GNU General Public License
  *   along with this program; if not, write to the Free Software
- *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
  *
- * Copyright 2000  The FreeRADIUS server project
+ * Copyright 2000,2006  The FreeRADIUS server project
  * Copyright 2000  Alan DeKok <aland@ox.org>
  */
 
-#include "libradius.h"
+#include <freeradius-devel/ident.h>
+RCSID("$Id$")
 
-#ifdef HAVE_PTHREAD_H
+#include <freeradius-devel/radiusd.h>
+#include <freeradius-devel/rad_assert.h>
 
-#include <stdlib.h>
-#include <string.h>
+/*
+ *     Other OS's have sem_init, OS X doesn't.
+ */
+#ifdef HAVE_SEMAPHORE_H
 #include <semaphore.h>
-#include <signal.h>
+#endif
+
+#ifdef DARWIN
+#include <mach/task.h>
+#include <mach/semaphore.h>
+
+#undef sem_t
+#define sem_t semaphore_t
+#undef sem_init
+#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
+#undef sem_wait
+#define sem_wait(s) semaphore_wait(*s)
+#undef sem_post
+#define sem_post(s) semaphore_signal(*s)
+#endif
 
 #ifdef HAVE_SYS_WAIT_H
 #include <sys/wait.h>
 #endif
 
-#include "radiusd.h"
-#include "rad_assert.h"
-#include "conffile.h"
+#ifdef HAVE_PTHREAD_H
 
-static const char rcsid[] =
-"$Id$";
+#ifdef HAVE_OPENSSL_CRYPTO_H
+#include <openssl/crypto.h>
+#endif
+#ifdef HAVE_OPENSSL_ERR_H
+#include <openssl/err.h>
+#endif
+#ifdef HAVE_OPENSSL_EVP_H
+#include <openssl/evp.h>
+#endif
 
 #define SEMAPHORE_LOCKED       (0)
 #define SEMAPHORE_UNLOCKED     (1)
@@ -48,6 +71,9 @@ static const char rcsid[] =
 #define THREAD_CANCELLED       (2)
 #define THREAD_EXITED          (3)
 
+#define NUM_FIFOS               RAD_LISTEN_MAX
+
+
 /*
  *  A data structure which contains the information about
  *  the current thread.
@@ -70,13 +96,11 @@ typedef struct THREAD_HANDLE {
        REQUEST              *request;
 } THREAD_HANDLE;
 
-/*
- *     For the request queue.
- */
-typedef struct request_queue_t {
-       REQUEST           *request;
-       RAD_REQUEST_FUNP  fun;
-} request_queue_t;
+typedef struct thread_fork_t {
+       pid_t           pid;
+       int             status;
+       int             exited;
+} thread_fork_t;
 
 
 /*
@@ -89,6 +113,7 @@ typedef struct THREAD_POOL {
        THREAD_HANDLE *tail;
 
        int total_threads;
+       int active_threads;     /* protected by queue_mutex */
        int max_thread_num;
        int start_threads;
        int max_threads;
@@ -98,6 +123,12 @@ typedef struct THREAD_POOL {
        unsigned long request_count;
        time_t time_last_spawned;
        int cleanup_delay;
+       int spawn_flag;
+
+#ifdef WNOHANG
+       pthread_mutex_t wait_mutex;
+       fr_hash_table_t *waiters;
+#endif
 
        /*
         *      All threads wait on this semaphore, for requests
@@ -108,46 +139,18 @@ typedef struct THREAD_POOL {
        /*
         *      To ensure only one thread at a time touches the queue.
         */
-       pthread_mutex_t mutex;
+       pthread_mutex_t queue_mutex;
 
-       int             active_threads;
-       int             queue_head; /* first filled entry */
-       int             queue_tail; /* first empty entry */
-       int             queue_size;
-       request_queue_t *queue;
+       int             max_queue_size;
+       int             num_queued;
+       fr_fifo_t       *fifo[NUM_FIFOS];
 } THREAD_POOL;
 
 static THREAD_POOL thread_pool;
 static int pool_initialized = FALSE;
+static time_t last_cleaned = 0;
 
-/*
- *     Data structure to keep track of which child forked which
- *     request.  If we cared, we'd keep a list of "free" and "active"
- *     entries.
- *
- *     FIXME: Have a time out, so we clean up entries which haven't
- *     been picked up!
- */
-typedef struct rad_fork_t {
-       pthread_t       thread_id;
-       pid_t           child_pid;
-       sem_t           child_done;
-       int             status; /* exit status of the child */
-       time_t          time_forked;
-} rad_fork_t;
-
-/*
- *  This MUST be a power of 2 for it to work properly!
- */
-#define NUM_FORKERS (8192)
-static rad_fork_t forkers[NUM_FORKERS];
-
-/*
- *     This mutex ensures that only one thread is doing certain
- *     kinds of magic to the previous array.
- */
-static pthread_mutex_t fork_mutex;
-
+static void thread_pool_manage(time_t now);
 
 /*
  *     A mapping of configuration file names to internal integers
@@ -159,88 +162,154 @@ static const CONF_PARSER thread_config[] = {
        { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
        { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
        { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
+       { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
        { NULL, -1, 0, NULL, NULL }
 };
 
 
+#ifdef HAVE_OPENSSL_CRYPTO_H
+
 /*
- *     Add a request to the list of waiting requests.
- *     This function gets called ONLY from the main handler thread...
+ *     If we're linking against OpenSSL, then it is the
+ *     duty of the application, if it is multithreaded,
+ *     to provide OpenSSL with appropriate thread id
+ *     and mutex locking functions
  *
- *     This function should never fail.
+ *     Note: this only implements static callbacks.
+ *     OpenSSL does not use dynamic locking callbacks
+ *     right now, but may in the futiure, so we will have
+ *     to add them at some point.
  */
-static void request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
+
+static pthread_mutex_t *ssl_mutexes = NULL;
+
+static unsigned long ssl_id_function(void)
 {
-       int num_entries;
+       return (unsigned long) pthread_self();
+}
 
-       pthread_mutex_lock(&thread_pool.mutex);
+static void ssl_locking_function(int mode, int n, const char *file, int line)
+{
+       file = file;            /* -Wunused */
+       line = line;            /* -Wunused */
 
-       thread_pool.request_count++;
+       if (mode & CRYPTO_LOCK) {
+               pthread_mutex_lock(&(ssl_mutexes[n]));
+       } else {
+               pthread_mutex_unlock(&(ssl_mutexes[n]));
+       }
+}
+
+static int setup_ssl_mutexes(void)
+{
+       int i;
 
+#ifdef HAVE_OPENSSL_EVP_H
        /*
-        *      If the queue is empty, re-set the indices to zero,
-        *      for no particular reason...
+        *      Enable all ciphers and digests.
         */
-       if ((thread_pool.queue_head == thread_pool.queue_tail) &&
-           (thread_pool.queue_head != 0)) {
-               thread_pool.queue_head = thread_pool.queue_tail = 0;
+       OpenSSL_add_all_algorithms();
+#endif
+
+       ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
+       if (!ssl_mutexes) {
+               radlog(L_ERR, "Error allocating memory for SSL mutexes!");
+               return 0;
        }
 
-       /*
-        *      If the queue is full, die.
-        *
-        *      The math is to take into account the fact that it's a
-        *      circular queue.
-        */
-       num_entries = ((thread_pool.queue_tail + thread_pool.queue_size) -
-                      thread_pool.queue_head) % thread_pool.queue_size;
-       if (num_entries == (thread_pool.queue_size - 1)) {
-               request_queue_t *new_queue;
+       for (i = 0; i < CRYPTO_num_locks(); i++) {
+               pthread_mutex_init(&(ssl_mutexes[i]), NULL);
+       }
 
-               /*
-                *      If the queue becomes larger than 65536,
-                *      there's a serious problem.
-                */
-               if (thread_pool.queue_size >= 65536) {
-                       pthread_mutex_unlock(&thread_pool.mutex);
+       CRYPTO_set_id_callback(ssl_id_function);
+       CRYPTO_set_locking_callback(ssl_locking_function);
 
-                       /*
-                        *      Mark the request as done.
-                        */
-                       radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
-                       request->finished = TRUE;
-                       return;
-               }
+       return 1;
+}
+#endif
+
+#ifdef WNOHANG
+/*
+ *     We don't want to catch SIGCHLD for a host of reasons.
+ *
+ *     - exec_wait means that someone, somewhere, somewhen, will
+ *     call waitpid(), and catch the child.
+ *
+ *     - SIGCHLD is delivered to a random thread, not the one that
+ *     forked.
+ *
+ *     - if another thread catches the child, we have to coordinate
+ *     with the thread doing the waiting.
+ *
+ *     - if we don't waitpid() for non-wait children, they'll be zombies,
+ *     and will hang around forever.
+ *
+ */
+static void reap_children(void)
+{
+       pid_t pid;
+       int status;
+       thread_fork_t mytf, *tf;
+
+
+       pthread_mutex_lock(&thread_pool.wait_mutex);
+
+       do {
+       retry:
+               pid = waitpid(0, &status, WNOHANG);
+               if (pid <= 0) break;
+
+               mytf.pid = pid;
+               tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+               if (!tf) goto retry;
+
+               tf->status = status;
+               tf->exited = 1;
+       } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
+
+       pthread_mutex_unlock(&thread_pool.wait_mutex);
+}
+#else
+#define reap_children()
+#endif /* WNOHANG */
+
+/*
+ *     Add a request to the list of waiting requests.
+ *     This function gets called ONLY from the main handler thread...
+ *
+ *     This function should never fail.
+ */
+static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
+{
+       rad_assert(request->process == fun);
+       pthread_mutex_lock(&thread_pool.queue_mutex);
+
+       thread_pool.request_count++;
+
+       if (thread_pool.num_queued >= thread_pool.max_queue_size) {
+               pthread_mutex_unlock(&thread_pool.queue_mutex);
 
                /*
-                *      Malloc a new queue, doubled in size, copy the
-                *      data from the current queue over to it, zero
-                *      out the second half of the queue, free the old
-                *      one, and replace thread_pool.queue with the
-                *      new one.
+                *      Mark the request as done.
                 */
-               new_queue = rad_malloc(sizeof(*new_queue) * thread_pool.queue_size * 2);
-               memcpy(new_queue, thread_pool.queue,
-                      sizeof(*new_queue) * thread_pool.queue_size);
-               memset(new_queue + sizeof(*new_queue) * thread_pool.queue_size,
-                      0, sizeof(*new_queue) * thread_pool.queue_size);
-
-               free(thread_pool.queue);
-               thread_pool.queue = new_queue;
-               thread_pool.queue_size *= 2;
+               radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
+               request->child_state = REQUEST_DONE;
+               return 0;
        }
 
        /*
-        *      Add the data to the queue tail, increment the tail,
-        *      and signal the semaphore that there's another request
-        *      in the queue.
+        *      Push the request onto the appropriate fifo for that
         */
-       thread_pool.queue[thread_pool.queue_tail].request = request;
-       thread_pool.queue[thread_pool.queue_tail].fun = fun;
-       thread_pool.queue_tail++;
-       thread_pool.queue_tail &= (thread_pool.queue_size - 1);
+       if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
+               pthread_mutex_unlock(&thread_pool.queue_mutex);
+               radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
+               request->child_state = REQUEST_DONE;
+               return 0;
+       }
 
-       pthread_mutex_unlock(&thread_pool.mutex);
+       thread_pool.num_queued++;
+
+       pthread_mutex_unlock(&thread_pool.queue_mutex);
 
        /*
         *      There's one more request in the queue.
@@ -251,131 +320,95 @@ static void request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
         *      the mutex, it will be unlocked, and there won't be
         *      contention.
         */
-
        sem_post(&thread_pool.semaphore);
 
-       return;
+       return 1;
 }
 
 /*
  *     Remove a request from the queue.
  */
-static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
+static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
 {
-       pthread_mutex_lock(&thread_pool.mutex);
+       RAD_LISTEN_TYPE i, start;
+       REQUEST *request = NULL;
+
+       reap_children();
 
+       pthread_mutex_lock(&thread_pool.queue_mutex);
+
+       /*
+        *      Clear old requests from all queues.
+        *
+        *      We only do one pass over the queue, in order to
+        *      amortize the work across the child threads.  Since we
+        *      do N checks for one request de-queued, the old
+        *      requests will be quickly cleared.
+        */
+       for (i = 0; i < RAD_LISTEN_MAX; i++) {
+               request = fr_fifo_peek(thread_pool.fifo[i]);
+               if (!request ||
+                   (request->master_state != REQUEST_STOP_PROCESSING)) {
+                       continue;
+}
+               /*
+                *      This entry was marked to be stopped.  Acknowledge it.
+                */
+               request = fr_fifo_pop(thread_pool.fifo[i]);
+               rad_assert(request != NULL);
+               request->child_state = REQUEST_DONE;
+               thread_pool.num_queued--;
+       }
+
+       start = 0;
+ retry:
        /*
-        *      Head & tail are the same.  There's nothing in
-        *      the queue.
+        *      Pop results from the top of the queue
         */
-       if (thread_pool.queue_head == thread_pool.queue_tail) {
-               pthread_mutex_unlock(&thread_pool.mutex);
-               *request = NULL;
+       for (i = start; i < RAD_LISTEN_MAX; i++) {
+               request = fr_fifo_pop(thread_pool.fifo[i]);
+               if (request) {
+                       start = i;
+                       break;
+               }
+       }
+
+       if (!request) {
+               pthread_mutex_unlock(&thread_pool.queue_mutex);
+               *prequest = NULL;
                *fun = NULL;
-               return;
+               return 0;
        }
 
-       *request = thread_pool.queue[thread_pool.queue_head].request;
-       *fun = thread_pool.queue[thread_pool.queue_head].fun;
+       rad_assert(thread_pool.num_queued > 0);
+       thread_pool.num_queued--;
+       *prequest = request;
+       *fun = request->process;
 
-       rad_assert(*request != NULL);
-       rad_assert((*request)->magic == REQUEST_MAGIC);
+       rad_assert(request->magic == REQUEST_MAGIC);
        rad_assert(*fun != NULL);
 
-       thread_pool.queue_head++;
-       thread_pool.queue_head &= (thread_pool.queue_size - 1);
-
        /*
-        *      FIXME: Check the request timestamp.  If it's more than
-        *      "clean_delay" seconds old, then discard the request,
-        *      log an error, and try to de-queue another request.
+        *      If the request has sat in the queue for too long,
+        *      kill it.
         *
-        *      The main clean-up code won't delete the request from
-        *      the request list, because it's not marked "finished"
+        *      The main clean-up code can't delete the request from
+        *      the queue, and therefore won't clean it up until we
+        *      have acknowledged it as "done".
         */
+       if (request->master_state == REQUEST_STOP_PROCESSING) {
+               request->child_state = REQUEST_DONE;
+               goto retry;
+       }
 
        /*
         *      The thread is currently processing a request.
         */
        thread_pool.active_threads++;
 
-       pthread_mutex_unlock(&thread_pool.mutex);
-
-       /*
-        *      If the request is currently being processed, then that
-        *      MAY be OK, if it's a proxy reply.  In that case,
-        *      sending the packet may result in a reply being
-        *      received before that thread clears the child_pid.
-        *
-        *      In that case, we busy-wait for the request to be free.
-        *
-        *      We COULD push it onto the queue and try to grab
-        *      another request, but what if this is the only request?
-        *      What if there are multiple such packets with race
-        *      conditions?  We don't want to thrash the queue...
-        *
-        *      This busy-wait is less than optimal, but it's simple,
-        *      fail-safe, and it works.
-        */
-       if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
-               int count, ok;
-               struct timeval tv;
-#ifdef HAVE_PTHREAD_SIGMASK
-               sigset_t set, old_set;
+       pthread_mutex_unlock(&thread_pool.queue_mutex);
 
-               /*
-                *      Block a large number of signals which could
-                *      cause the select to return EINTR
-                */
-               sigemptyset(&set);
-               sigaddset(&set, SIGPIPE);
-               sigaddset(&set, SIGCONT);
-               sigaddset(&set, SIGSTOP);
-               sigaddset(&set, SIGCHLD);
-               pthread_sigmask(SIG_BLOCK, &set, &old_set);
-#endif
-
-               rad_assert((*request)->proxy_reply != NULL);
-
-               ok = FALSE;
-
-               /*
-                *      Sleep for 100 milliseconds.  If the other thread
-                *      doesn't get serviced in this time, to clear
-                *      the "child_pid" entry, then the server is too
-                *      busy, so we die.
-                */
-               for (count = 0; count < 10; count++) {
-                       tv.tv_sec = 0;
-                       tv.tv_usec = 10000; /* sleep for 10 milliseconds */
-
-                       /*
-                        *      Portable sleep that's thread-safe.
-                        *
-                        *      Don't worry about interrupts, as they're
-                        *      blocked above.
-                        */
-                       select(0, NULL, NULL, NULL, &tv);
-                       if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
-                               ok = TRUE;
-                               break;
-                       }
-               }
-
-#ifdef HAVE_PTHREAD_SIGMASK
-               /*
-                *      Restore the original thread signal mask.
-                */
-               pthread_sigmask(SIG_SETMASK, &old_set, NULL);
-#endif
-
-               if (!ok) {
-                       radlog(L_ERR, "FATAL!  Server is too busy to process requests");
-                       exit(1);
-               }
-       }
-
-       return;
+       return 1;
 }
 
 
@@ -388,26 +421,6 @@ static void *request_handler_thread(void *arg)
 {
        RAD_REQUEST_FUNP  fun;
        THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
-#ifdef HAVE_PTHREAD_SIGMASK
-       sigset_t set;
-
-       /*
-        *      Block SIGHUP handling for the child threads.
-        *
-        *      This ensures that only the main server thread will
-        *      process HUP signals.
-        *
-        *      If we don't have sigprocmask, then it shouldn't be
-        *      a problem, either, as the sig_hup handler should check
-        *      for this condition.
-        */
-       sigemptyset(&set);
-       sigaddset(&set, SIGHUP);
-       sigaddset(&set, SIGINT);
-       sigaddset(&set, SIGQUIT);
-       sigaddset(&set, SIGTERM);
-       pthread_sigmask(SIG_BLOCK, &set, NULL);
-#endif
 
        /*
         *      Loop forever, until told to exit.
@@ -436,14 +449,20 @@ static void *request_handler_thread(void *arg)
 
                DEBUG2("Thread %d got semaphore", self->thread_num);
 
+#ifdef HAVE_OPENSSL_ERR_H
+               /*
+                *      Clear the error queue for the current thread.
+                */
+               ERR_clear_error ();
+#endif
+
                /*
                 *      Try to grab a request from the queue.
                 *
                 *      It may be empty, in which case we fail
                 *      gracefully.
                 */
-               request_dequeue(&self->request, &fun);
-               if (!self->request) continue;
+               if (!request_dequeue(&self->request, &fun)) continue;
 
                self->request->child_pid = self->pthread_id;
                self->request_count++;
@@ -452,35 +471,43 @@ static void *request_handler_thread(void *arg)
                       self->thread_num, self->request->number,
                       self->request_count);
 
-               /*
-                *      Respond, and reset request->child_pid
-                */
-               rad_respond(self->request, fun);
-               self->request = NULL;
+               radius_handle_request(self->request, fun);
 
                /*
                 *      Update the active threads.
                 */
-               pthread_mutex_lock(&thread_pool.mutex);
+               pthread_mutex_lock(&thread_pool.queue_mutex);
                rad_assert(thread_pool.active_threads > 0);
                thread_pool.active_threads--;
-               pthread_mutex_unlock(&thread_pool.mutex);
+               pthread_mutex_unlock(&thread_pool.queue_mutex);
        } while (self->status != THREAD_CANCELLED);
 
        DEBUG2("Thread %d exiting...", self->thread_num);
 
+#ifdef HAVE_OPENSSL_ERR_H
+       /*
+        *      If we linked with OpenSSL, the application
+        *      must remove the thread's error queue before
+        *      exiting to prevent memory leaks.
+        */
+       ERR_remove_state(0);
+#endif
+
        /*
         *  Do this as the LAST thing before exiting.
         */
+       self->request = NULL;
        self->status = THREAD_EXITED;
 
        return NULL;
 }
 
 /*
- *     Take a THREAD_HANDLE, and delete it from the thread pool.
+ *     Take a THREAD_HANDLE, delete it from the thread pool and
+ *     free its resources.
  *
- *     This function is called ONLY from the main server thread.
+ *     This function is called ONLY from the main server thread,
+ *     ONLY after the thread has exited.
  */
 static void delete_thread(THREAD_HANDLE *handle)
 {
@@ -489,6 +516,8 @@ static void delete_thread(THREAD_HANDLE *handle)
 
        rad_assert(handle->request == NULL);
 
+       DEBUG2("Deleting thread %d", handle->thread_num);
+
        prev = handle->prev;
        next = handle->next;
        rad_assert(thread_pool.total_threads > 0);
@@ -511,16 +540,8 @@ static void delete_thread(THREAD_HANDLE *handle)
                next->prev = prev;
        }
 
-       DEBUG2("Deleting thread %d", handle->thread_num);
-
-       /*
-        *      This thread has exited.  Delete any additional
-        *      resources associated with it.
-        */
-
        /*
-        *      Free the memory, now that we're sure the thread
-        *      exited.
+        *      Free the handle, now that it's no longer referencable.
         */
        free(handle);
 }
@@ -553,7 +574,6 @@ static THREAD_HANDLE *spawn_thread(time_t now)
        memset(handle, 0, sizeof(THREAD_HANDLE));
        handle->prev = NULL;
        handle->next = NULL;
-       handle->pthread_id = NO_SUCH_CHILD_PID;
        handle->thread_num = thread_pool.max_thread_num++;
        handle->request_count = 0;
        handle->status = THREAD_RUNNING;
@@ -579,9 +599,9 @@ static THREAD_HANDLE *spawn_thread(time_t now)
        rcode = pthread_create(&handle->pthread_id, &attr,
                        request_handler_thread, handle);
        if (rcode != 0) {
-               radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
+               radlog(L_ERR, "Thread create failed: %s",
                       strerror(rcode));
-               exit(1);
+               return NULL;
        }
        pthread_attr_destroy(&attr);
 
@@ -622,62 +642,103 @@ static THREAD_HANDLE *spawn_thread(time_t now)
  */
 int total_active_threads(void)
 {
-        int rcode = 0;
-       THREAD_HANDLE *handle;
+       /*
+        *      We don't acquire the mutex, so this is just an estimate.
+        *      We can't return with the lock held, so there's no point
+        *      in getting the guaranteed correct value; by the time
+        *      the caller sees it, it can be wrong again.
+        */
+       return thread_pool.active_threads;
+}
 
-       for (handle = thread_pool.head; handle != NULL; handle = handle->next){
-               if (handle->request != NULL) {
-                       rcode ++;
-               }
-       }
-       return (rcode);
+
+#ifdef WNOHANG
+static uint32_t pid_hash(const void *data)
+{
+       const thread_fork_t *tf = data;
+
+       return fr_hash(&tf->pid, sizeof(tf->pid));
 }
 
+static int pid_cmp(const void *one, const void *two)
+{
+       const thread_fork_t *a = one;
+       const thread_fork_t *b = two;
+
+       return (a->pid - b->pid);
+}
+#endif
+
 /*
  *     Allocate the thread pool, and seed it with an initial number
  *     of threads.
  *
  *     FIXME: What to do on a SIGHUP???
  */
-int thread_pool_init(void)
+int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
 {
        int             i, rcode;
        CONF_SECTION    *pool_cf;
        time_t          now;
 
-       DEBUG("Initializing the thread pool...");
        now = time(NULL);
 
+       rad_assert(spawn_flag != NULL);
+       rad_assert(*spawn_flag == TRUE);
+       rad_assert(pool_initialized == FALSE); /* not called on HUP */
+
+       pool_cf = cf_subsection_find_next(cs, NULL, "thread");
+       if (!pool_cf) *spawn_flag = FALSE;
+
        /*
-        *      After a SIGHUP, we don't over-write the previous values.
+        *      Initialize the thread pool to some reasonable values.
         */
-       if (!pool_initialized) {
-               /*
-                *      Initialize the thread pool to some reasonable values.
-                */
-               memset(&thread_pool, 0, sizeof(THREAD_POOL));
-               thread_pool.head = NULL;
-               thread_pool.tail = NULL;
-               thread_pool.total_threads = 0;
-               thread_pool.max_thread_num = 1;
-               thread_pool.cleanup_delay = 5;
+       memset(&thread_pool, 0, sizeof(THREAD_POOL));
+       thread_pool.head = NULL;
+       thread_pool.tail = NULL;
+       thread_pool.total_threads = 0;
+       thread_pool.max_thread_num = 1;
+       thread_pool.cleanup_delay = 5;
+       thread_pool.spawn_flag = *spawn_flag;
+       
+       /*
+        *      Don't bother initializing the mutexes or
+        *      creating the hash tables.  They won't be used.
+        */
+       if (!*spawn_flag) return 0;
+       
+#ifdef WNOHANG
+       if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
+               radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
+                      strerror(errno));
+               return -1;
        }
+       
+       /*
+        *      Create the hash table of child PID's
+        */
+       thread_pool.waiters = fr_hash_table_create(pid_hash,
+                                                  pid_cmp,
+                                                  free);
+       if (!thread_pool.waiters) {
+               radlog(L_ERR, "FATAL: Failed to set up wait hash");
+               return -1;
+       }
+#endif
 
-       pool_cf = cf_section_find("thread");
-       if (pool_cf != NULL) {
-               cf_section_parse(pool_cf, NULL, thread_config);
+       if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
+               return -1;
        }
 
        /*
-        *      Limit the maximum number of threads to the maximum
-        *      number of forks we can do.
-        *
-        *      FIXME: Make this code better...
+        *      Catch corner cases.
         */
-       if (thread_pool.max_threads >= NUM_FORKERS) {
-               thread_pool.max_threads = NUM_FORKERS;
-       }
-
+       if (thread_pool.min_spare_threads < 1)
+               thread_pool.min_spare_threads = 1;
+       if (thread_pool.max_spare_threads < 1)
+               thread_pool.max_spare_threads = 1;
+       if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
+               thread_pool.max_spare_threads = thread_pool.min_spare_threads;
 
        /*
         *      The pool has already been initialized.  Don't spawn
@@ -690,31 +751,43 @@ int thread_pool_init(void)
        /*
         *      Initialize the queue of requests.
         */
+       memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
        rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
        if (rcode != 0) {
-               radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
+               radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
                       strerror(errno));
-               exit(1);
+               return -1;
        }
 
-       rcode = pthread_mutex_init(&thread_pool.mutex,NULL);
+       rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
        if (rcode != 0) {
-               radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
+               radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
                       strerror(errno));
-               exit(1);
+               return -1;
        }
 
        /*
-        *      Queue head & tail are set to zero by the memset,
-        *      above.
-        *
-        *      Allocate an initial queue, always as a power of 2.
+        *      Allocate multiple fifos.
         */
-       thread_pool.queue_size = 256;
-       thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) *
-                                      thread_pool.queue_size);
-       memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) *
-                                     thread_pool.queue_size));
+       for (i = 0; i < RAD_LISTEN_MAX; i++) {
+               thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
+               if (!thread_pool.fifo[i]) {
+                       radlog(L_ERR, "FATAL: Failed to set up request fifo");
+                       return -1;
+               }
+       }
+
+#ifdef HAVE_OPENSSL_CRYPTO_H
+       /*
+        *      If we're linking with OpenSSL too, then we need
+        *      to set up the mutexes and enable the thread callbacks.
+        */
+       if (!setup_ssl_mutexes()) {
+               radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
+               return -1;
+       }
+#endif
+
 
        /*
         *      Create a number of waiting threads.
@@ -741,23 +814,41 @@ int thread_pool_init(void)
  */
 int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
 {
+       time_t now = request->timestamp;
+
+       request->process = fun;
+
        /*
-        *      If the thread pool is busy handling requests, then
-        *      try to spawn another one.
+        *      We've been told not to spawn threads, so don't.
         */
-       if (thread_pool.active_threads == thread_pool.total_threads) {
-               if (spawn_thread(request->timestamp) == NULL) {
-                       radlog(L_INFO,
-                              "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
-                              thread_pool.max_threads);
-                       return 0;
-               }
+       if (!thread_pool.spawn_flag) {
+               radius_handle_request(request, fun);
+
+#ifdef WNOHANG
+               /*
+                *      Requests that care about child process exit
+                *      codes have already either called
+                *      rad_waitpid(), or they've given up.
+                */
+               wait(NULL);
+#endif
+               return 1;
        }
 
        /*
         *      Add the new request to the queue.
         */
-       request_enqueue(request, fun);
+       if (!request_enqueue(request, fun)) return 0;
+
+       /*
+        *      If we haven't checked the number of child threads
+        *      in a while, OR if the thread pool appears to be full,
+        *      go manage it.
+        */
+       if ((last_cleaned < now) ||
+           (thread_pool.active_threads == thread_pool.total_threads)) {
+               thread_pool_manage(now);
+       }
 
        return 1;
 }
@@ -768,32 +859,16 @@ int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
  *     If there are too many or too few threads waiting, then we
  *     either create some more, or delete some.
  */
-int thread_pool_clean(time_t now)
+static void thread_pool_manage(time_t now)
 {
        int spare;
        int i, total;
        THREAD_HANDLE *handle, *next;
        int active_threads;
-       static time_t last_cleaned = 0;
-
-       /*
-        *      Loop over the thread pool deleting exited threads.
-        */
-       for (handle = thread_pool.head; handle; handle = next) {
-               next = handle->next;
-
-               /*
-                *      Maybe we've asked the thread to exit, and it
-                *      has agreed.
-                */
-               if (handle->status == THREAD_EXITED) {
-                       delete_thread(handle);
-               }
-       }
 
        /*
         *      We don't need a mutex lock here, as we're reading
-        *      the location, and not modifying it.  We want a close
+        *      active_threads, and not modifying it.  We want a close
         *      approximation of the number of active threads, and this
         *      is good enough.
         */
@@ -813,26 +888,24 @@ int thread_pool_clean(time_t now)
        }
 
        /*
-        *      If there are too few spare threads, create some more.
+        *      If there are too few spare threads.  Go create some more.
         */
        if (spare < thread_pool.min_spare_threads) {
                total = thread_pool.min_spare_threads - spare;
 
                DEBUG2("Threads: Spawning %d spares", total);
+
                /*
                 *      Create a number of spare threads.
                 */
                for (i = 0; i < total; i++) {
                        handle = spawn_thread(now);
                        if (handle == NULL) {
-                               return -1;
+                               return;
                        }
                }
 
-               /*
-                *      And exit, as there can't be too many spare threads.
-                */
-               return 0;
+               return;         /* there aren't too many spare threads */
        }
 
        /*
@@ -840,17 +913,32 @@ int thread_pool_clean(time_t now)
         *      so this second.
         */
        if (now == last_cleaned) {
-               return 0;
+               return;
        }
        last_cleaned = now;
 
        /*
+        *      Loop over the thread pool, deleting exited threads.
+        */
+       for (handle = thread_pool.head; handle; handle = next) {
+               next = handle->next;
+
+               /*
+                *      Maybe we've asked the thread to exit, and it
+                *      has agreed.
+                */
+               if (handle->status == THREAD_EXITED) {
+                       delete_thread(handle);
+               }
+       }
+
+       /*
         *      Only delete the spare threads if sufficient time has
         *      passed since we last created one.  This helps to minimize
         *      the amount of create/delete cycles.
         */
        if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
-               return 0;
+               return;
        }
 
        /*
@@ -920,301 +1008,128 @@ int thread_pool_clean(time_t now)
         *      Otherwise everything's kosher.  There are not too few,
         *      or too many spare threads.  Exit happily.
         */
-       return 0;
-}
-
-static int exec_initialized = FALSE;
-
-/*
- *     Initialize the stuff for keeping track of child processes.
- */
-void rad_exec_init(void)
-{
-       int i;
-
-       /*
-        *      Initialize the mutex used to remember calls to fork.
-        */
-       pthread_mutex_init(&fork_mutex, NULL);
-
-       /*
-        *      Initialize the data structure where we remember the
-        *      mappings of thread ID && child PID to exit status.
-        */
-       for (i = 0; i < NUM_FORKERS; i++) {
-               forkers[i].thread_id = NO_SUCH_CHILD_PID;
-               forkers[i].child_pid = -1;
-               forkers[i].status = 0;
-       }
-
-       exec_initialized = TRUE;
+       return;
 }
 
-/*
- *     We use the PID number as a base for the array index, so that
- *     we can quickly turn the PID into a free array entry, instead
- *     of rooting blindly through the entire array.
- */
-#define PID_2_ARRAY(pid) (((int) pid ) & (NUM_FORKERS - 1))
 
+#ifdef WNOHANG
 /*
  *     Thread wrapper for fork().
  */
-pid_t rad_fork(int exec_wait)
+pid_t rad_fork(void)
 {
-       sigset_t set;
        pid_t child_pid;
 
-       /*
-        *      The thread is NOT interested in waiting for the exit
-        *      status of the child process, so we don't bother
-        *      updating our kludgy array.
-        *
-        *      Or, there no NO threads, so we can just do the fork
-        *      thing.
-        */
-       if (!exec_wait || !exec_initialized) {
-               return fork();
-       }
+       if (!pool_initialized) return fork();
 
-       /*
-        *      Block SIGCLHD until such time as we've saved the PID.
-        *
-        *      Note that we block SIGCHLD for ALL threads associated
-        *      with this process!  This is to prevent race conditions!
-        */
-       sigemptyset(&set);
-       sigaddset(&set, SIGCHLD);
-       sigprocmask(SIG_BLOCK, &set, NULL);
+       reap_children();        /* be nice to non-wait thingies */
 
-       /*
-        *      Do the fork.
-        */
-       child_pid = fork();
+       if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
+               return -1;
+       }
 
        /*
-        *      We managed to fork.  Let's see if we have a free
-        *      array entry.
+        *      Fork & save the PID for later reaping.
         */
-       if (child_pid > 0) { /* parent */
-               int i;
-               int found;
-               time_t now = time(NULL);
-
-               /*
-                *      We store the information in the array
-                *      indexed by PID.  This means that we have
-                *      on average an O(1) lookup to find the element,
-                *      instead of rooting through the entire array.
-                */
-               i = PID_2_ARRAY(child_pid);
-               found = -1;
+       child_pid = fork();
+       if (child_pid > 0) {
+               int rcode;
+               thread_fork_t *tf;
 
-               /*
-                *      We may have multiple threads trying to find an
-                *      empty position, so we lock the array until
-                *      we've found an entry.
-                */
-               pthread_mutex_lock(&fork_mutex);
-               do {
-                       if (forkers[i].thread_id == NO_SUCH_CHILD_PID) {
-                               found = i;
-                               break;
-                       }
+               tf = rad_malloc(sizeof(*tf));
+               memset(tf, 0, sizeof(*tf));
 
-                       /*
-                        *      Clean up any stale forked sessions.
-                        *
-                        *      This sometimes happens, for crazy reasons.
-                        */
-                       if ((now - forkers[i].time_forked) > 30) {
-                               forkers[i].thread_id = NO_SUCH_CHILD_PID;
+               tf->pid = child_pid;
 
-                               /*
-                                *      Grab the child's exit condition,
-                                *      just in case...
-                                */
-                               waitpid(forkers[i].child_pid,
-                                       &forkers[i].status,
-                                       WNOHANG);
-                               sem_destroy(&forkers[i].child_done);
-                               found = i;
-                               break;
-                       }
+               pthread_mutex_lock(&thread_pool.wait_mutex);
+               rcode = fr_hash_table_insert(thread_pool.waiters, tf);
+               pthread_mutex_unlock(&thread_pool.wait_mutex);
 
-                       /*
-                        *  Increment it, within the array.
-                        */
-                       i++;
-                       i &= (NUM_FORKERS - 1);
-               } while (i != PID_2_ARRAY(child_pid));
-
-               /*
-                *      Arg.  We did a fork, and there was nowhere to
-                *      put the answer.
-                */
-               if (found < 0) {
-                       sigprocmask(SIG_UNBLOCK, &set, NULL);
-                       pthread_mutex_unlock(&fork_mutex);
-                       return (pid_t) -1;
+               if (!rcode) {
+                       radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
+                              (int) child_pid);
+                       free(tf);
                }
-
-               /*
-                *      In the parent, set the status, and create the
-                *      semaphore.
-                */
-               forkers[found].status = -1;
-               forkers[found].child_pid = child_pid;
-               forkers[found].thread_id = pthread_self();
-               forkers[found].time_forked = now;
-               sem_init(&forkers[found].child_done, 0, SEMAPHORE_LOCKED);
-               pthread_mutex_unlock(&fork_mutex);
        }
 
        /*
-        *      Unblock SIGCHLD, now that there's no chance of bad entries
-        *      in the array.
-        */
-       sigprocmask(SIG_UNBLOCK, &set, NULL);
-
-       /*
         *      Return whatever we were told.
         */
        return child_pid;
 }
 
+
 /*
- *     Thread wrapper for waitpid(), so threads can wait for
- *     the PID they forked.
+ *     Wait 10 seconds at most for a child to exit, then give up.
  */
-pid_t rad_waitpid(pid_t pid, int *status, int options)
+pid_t rad_waitpid(pid_t pid, int *status)
 {
-       int i, rcode;
-       int found;
-       pthread_t self = pthread_self();
+       int i;
+       thread_fork_t mytf, *tf;
 
-       /*
-        *      We're only allowed to wait for a SPECIFIC pid.
-        */
-       if (pid <= 0) {
-               return -1;
-       }
+       if (!pool_initialized) return waitpid(pid, status, 0);
 
-       /*
-        *      Find the PID to wait for, starting at an index within
-        *      the array.  This makes the lookups O(1) on average,
-        *      instead of O(n), when the array is filling up.
-        */
-       found = -1;
-       i = PID_2_ARRAY(pid);
-       do {
-               /*
-                *      We were the ones who forked this specific
-                *      child.
-                */
-               if ((forkers[i].thread_id == self) &&
-                   (forkers[i].child_pid == pid)) {
-                       found = i;
-                       break;
-               }
+       if (pid <= 0) return -1;
 
-               i++;
-               i &= (NUM_FORKERS - 1);
-       } while (i != PID_2_ARRAY(pid));
+       mytf.pid = pid;
 
-       /*
-        *      No thread ID found: we're trying to wait for a child
-        *      we've never forked!
-        */
-       if (found < 0) {
-               return -1;
-       }
+       pthread_mutex_lock(&thread_pool.wait_mutex);
+       tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+       pthread_mutex_unlock(&thread_pool.wait_mutex);
 
-       /*
-        *      Wait for the signal that the child's status has been
-        *      returned.
-        */
-       if (options == WNOHANG) {
-               rcode = sem_trywait(&forkers[found].child_done);
-               if (rcode != 0) {
-                       return 0; /* no child available */
-               }
-       } else {                /* wait forever */
-       re_wait:
-               rcode = sem_wait(&forkers[found].child_done);
-               if ((rcode != 0) && (errno == EINTR)) {
-                       goto re_wait;
-               }
-       }
+       if (!tf) return -1;
 
-       /*
-        *      We've got the semaphore.  Now destroy it.
-        *
-        *      FIXME: Maybe we want to set up the semaphores in advance,
-        *      to prevent the creation && deletion of lots of them,
-        *      if creating and deleting them is expensive.
-        */
-       sem_destroy(&forkers[found].child_done);
+       for (i = 0; i < 100; i++) {
+               reap_children();
 
-       /*
-        *      Save the status BEFORE we re-set the thread ID.
-        */
-       *status = forkers[found].status;
+               if (tf->exited) {
+                       *status = tf->status;
+
+                       pthread_mutex_lock(&thread_pool.wait_mutex);
+                       fr_hash_table_delete(thread_pool.waiters, &mytf);
+                       pthread_mutex_unlock(&thread_pool.wait_mutex);
+                       return pid;
+               }
+               usleep(100000); /* sleep for 1/10 of a second */
+       }
 
        /*
-        *      This next line taints the other array entries,
-        *      due to other threads re-using the data structure.
+        *      10 seconds have passed, give up on the child.
         */
-       forkers[found].thread_id = NO_SUCH_CHILD_PID;
+       pthread_mutex_lock(&thread_pool.wait_mutex);
+       fr_hash_table_delete(thread_pool.waiters, &mytf);
+       pthread_mutex_unlock(&thread_pool.wait_mutex);
 
-       return pid;
+       return 0;
 }
-
+#else
 /*
- *     Called by the main signal handler, to save the status of the child
+ *     No rad_fork or rad_waitpid
  */
-int rad_savepid(pid_t pid, int status)
+#endif
+
+void thread_pool_lock(void)
 {
-       int i;
+       pthread_mutex_lock(&thread_pool.queue_mutex);
+}
 
-       /*
-        *      Find the PID to wait for, starting at an index within
-        *      the array.  This makes the lookups O(1) on average,
-        *      instead of O(n), when the array is filling up.
-        */
-       i = PID_2_ARRAY(pid);
+void thread_pool_unlock(void)
+{
+       pthread_mutex_unlock(&thread_pool.queue_mutex);
+}
 
-       /*
-        *      Do NOT lock the array, as nothing else sets the
-        *      status and posts the semaphore.
-        */
-       do {
-               /*
-                *      Any thread can get the sigchild...
-                */
-               if ((forkers[i].thread_id != NO_SUCH_CHILD_PID) &&
-                   (forkers[i].child_pid == pid)) {
-                       /*
-                        *      Save the status, THEN post the
-                        *      semaphore.
-                        */
-                       forkers[i].status = status;
-                       sem_post(&forkers[i].child_done);
+void thread_pool_queue_stats(int *array)
+{
+       int i;
 
-                       /*
-                        *      FIXME: If the child is more than 60
-                        *      seconds out of date, then delete it.
-                        *
-                        *      That is, we've forked, and the forker
-                        *      is waiting nearly forever
-                        */
-                       return 0;
+       if (pool_initialized) {
+               for (i = 0; i < RAD_LISTEN_MAX; i++) {
+                       array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
                }
-
-               i++;
-               i &= (NUM_FORKERS - 1);
-       } while (i != PID_2_ARRAY(pid));
-
-       return -1;
+       } else {
+               for (i = 0; i < RAD_LISTEN_MAX; i++) {
+                       array[i] = 0;
+               }
+       }
 }
-#endif
+#endif /* HAVE_PTHREAD_H */