Do waitpid() for ALL children, even if there are no threads waiting
[freeradius.git] / src / main / threads.c
index b35709d..130fe9e 100644 (file)
@@ -96,14 +96,6 @@ typedef struct THREAD_HANDLE {
        REQUEST              *request;
 } THREAD_HANDLE;
 
-/*
- *     For the request queue.
- */
-typedef struct request_queue_t {
-       REQUEST           *request;
-       RAD_REQUEST_FUNP  fun;
-} request_queue_t;
-
 typedef struct thread_fork_t {
        pid_t           pid;
        int             status;
@@ -133,8 +125,10 @@ typedef struct THREAD_POOL {
        int cleanup_delay;
        int spawn_flag;
 
+#ifdef WNOHANG
        pthread_mutex_t wait_mutex;
-       lrad_hash_table_t *waiters;
+       fr_hash_table_t *waiters;
+#endif
 
        /*
         *      All threads wait on this semaphore, for requests
@@ -149,8 +143,7 @@ typedef struct THREAD_POOL {
 
        int             max_queue_size;
        int             num_queued;
-       int             can_read_detail;
-       lrad_fifo_t     *fifo[NUM_FIFOS];
+       fr_fifo_t       *fifo[NUM_FIFOS];
 } THREAD_POOL;
 
 static THREAD_POOL thread_pool;
@@ -235,7 +228,7 @@ static int setup_ssl_mutexes(void)
 }
 #endif
 
-
+#ifdef WNOHANG
 /*
  *     We don't want to catch SIGCHLD for a host of reasons.
  *
@@ -262,19 +255,23 @@ static void reap_children(void)
        pthread_mutex_lock(&thread_pool.wait_mutex);
 
        do {
+       retry:
                pid = waitpid(0, &status, WNOHANG);
                if (pid <= 0) break;
 
                mytf.pid = pid;
-               tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
-               if (!tf) continue;
+               tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+               if (!tf) goto retry;
 
                tf->status = status;
                tf->exited = 1;
-       } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
+       } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
 
        pthread_mutex_unlock(&thread_pool.wait_mutex);
 }
+#else
+#define reap_children()
+#endif /* WNOHANG */
 
 /*
  *     Add a request to the list of waiting requests.
@@ -284,8 +281,7 @@ static void reap_children(void)
  */
 static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
 {
-       request_queue_t *entry;
-
+       rad_assert(request->process == fun);
        pthread_mutex_lock(&thread_pool.queue_mutex);
 
        thread_pool.request_count++;
@@ -301,30 +297,16 @@ static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
                return 0;
        }
 
-       entry = rad_malloc(sizeof(*entry));
-       entry->request = request;
-       entry->fun = fun;
-
        /*
         *      Push the request onto the appropriate fifo for that
         */
-       if (!lrad_fifo_push(thread_pool.fifo[request->priority],
-                           entry)) {
+       if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
                pthread_mutex_unlock(&thread_pool.queue_mutex);
                radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
                request->child_state = REQUEST_DONE;
                return 0;
        }
 
-       /*
-        *      We've added an entry that didn't come from the detail
-        *      file.  Note that the child thread should signal the
-        *      main worker thread again when the queue becomes empty.
-        */
-       if (request->listener->type != RAD_LISTEN_DETAIL) {
-               thread_pool.can_read_detail = FALSE;
-       }
-
        thread_pool.num_queued++;
 
        pthread_mutex_unlock(&thread_pool.queue_mutex);
@@ -346,10 +328,10 @@ static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
 /*
  *     Remove a request from the queue.
  */
-static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
+static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
 {
        RAD_LISTEN_TYPE i, start;
-       request_queue_t *entry;
+       REQUEST *request = NULL;
 
        reap_children();
 
@@ -364,17 +346,18 @@ static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
         *      requests will be quickly cleared.
         */
        for (i = 0; i < RAD_LISTEN_MAX; i++) {
-               entry = lrad_fifo_peek(thread_pool.fifo[i]);
-               if (!entry ||
-                   (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
+               request = fr_fifo_peek(thread_pool.fifo[i]);
+               if (!request ||
+                   (request->master_state != REQUEST_STOP_PROCESSING)) {
                        continue;
 }
                /*
                 *      This entry was marked to be stopped.  Acknowledge it.
                 */
-               entry = lrad_fifo_pop(thread_pool.fifo[i]);
-               rad_assert(entry != NULL);
-               entry->request->child_state = REQUEST_DONE;
+               request = fr_fifo_pop(thread_pool.fifo[i]);
+               rad_assert(request != NULL);
+               request->child_state = REQUEST_DONE;
+               thread_pool.num_queued--;
        }
 
        start = 0;
@@ -383,28 +366,26 @@ static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
         *      Pop results from the top of the queue
         */
        for (i = start; i < RAD_LISTEN_MAX; i++) {
-               entry = lrad_fifo_pop(thread_pool.fifo[i]);
-               if (entry) {
+               request = fr_fifo_pop(thread_pool.fifo[i]);
+               if (request) {
                        start = i;
                        break;
                }
        }
 
-       if (!entry) {
+       if (!request) {
                pthread_mutex_unlock(&thread_pool.queue_mutex);
-               *request = NULL;
+               *prequest = NULL;
                *fun = NULL;
                return 0;
        }
 
        rad_assert(thread_pool.num_queued > 0);
        thread_pool.num_queued--;
-       *request = entry->request;
-       *fun = entry->fun;
-       free(entry);
+       *prequest = request;
+       *fun = request->process;
 
-       rad_assert(*request != NULL);
-       rad_assert((*request)->magic == REQUEST_MAGIC);
+       rad_assert(request->magic == REQUEST_MAGIC);
        rad_assert(*fun != NULL);
 
        /*
@@ -415,8 +396,8 @@ static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
         *      the queue, and therefore won't clean it up until we
         *      have acknowledged it as "done".
         */
-       if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
-               (*request)->child_state = REQUEST_DONE;
+       if (request->master_state == REQUEST_STOP_PROCESSING) {
+               request->child_state = REQUEST_DONE;
                goto retry;
        }
 
@@ -445,8 +426,6 @@ static void *request_handler_thread(void *arg)
         *      Loop forever, until told to exit.
         */
        do {
-               int can_read_detail;
-
                /*
                 *      Wait to be signalled.
                 */
@@ -470,6 +449,13 @@ static void *request_handler_thread(void *arg)
 
                DEBUG2("Thread %d got semaphore", self->thread_num);
 
+#ifdef HAVE_OPENSSL_ERR_H
+               /*
+                *      Clear the error queue for the current thread.
+                */
+               ERR_clear_error ();
+#endif
+
                /*
                 *      Try to grab a request from the queue.
                 *
@@ -493,29 +479,7 @@ static void *request_handler_thread(void *arg)
                pthread_mutex_lock(&thread_pool.queue_mutex);
                rad_assert(thread_pool.active_threads > 0);
                thread_pool.active_threads--;
-
-               /*
-                *      If we're not currently allowed to read the
-                *      detail file, AND there are no requests queued,
-                *      THEN signal the main worker thread that
-                *      there's at least one waiting thread (us) who
-                *      can accept a packet from the detail file.
-                */
-               can_read_detail = FALSE;
-               if (!thread_pool.can_read_detail &&
-                   (thread_pool.num_queued == 0)) {
-                       can_read_detail = TRUE;
-               }
-
                pthread_mutex_unlock(&thread_pool.queue_mutex);
-
-               /*
-                *      Do this out of the lock to be nice to everyone.
-                */
-               if (can_read_detail) {
-                       radius_signal_self(RADIUS_SIGNAL_SELF_DETAIL);
-               }
-
        } while (self->status != THREAD_CANCELLED);
 
        DEBUG2("Thread %d exiting...", self->thread_num);
@@ -610,7 +574,6 @@ static THREAD_HANDLE *spawn_thread(time_t now)
        memset(handle, 0, sizeof(THREAD_HANDLE));
        handle->prev = NULL;
        handle->next = NULL;
-       handle->pthread_id = NO_SUCH_CHILD_PID;
        handle->thread_num = thread_pool.max_thread_num++;
        handle->request_count = 0;
        handle->status = THREAD_RUNNING;
@@ -689,11 +652,12 @@ int total_active_threads(void)
 }
 
 
+#ifdef WNOHANG
 static uint32_t pid_hash(const void *data)
 {
        const thread_fork_t *tf = data;
 
-       return lrad_hash(&tf->pid, sizeof(tf->pid));
+       return fr_hash(&tf->pid, sizeof(tf->pid));
 }
 
 static int pid_cmp(const void *one, const void *two)
@@ -703,6 +667,7 @@ static int pid_cmp(const void *one, const void *two)
 
        return (a->pid - b->pid);
 }
+#endif
 
 /*
  *     Allocate the thread pool, and seed it with an initial number
@@ -710,7 +675,7 @@ static int pid_cmp(const void *one, const void *two)
  *
  *     FIXME: What to do on a SIGHUP???
  */
-int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
+int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
 {
        int             i, rcode;
        CONF_SECTION    *pool_cf;
@@ -718,50 +683,48 @@ int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
 
        now = time(NULL);
 
+       rad_assert(spawn_flag != NULL);
+       rad_assert(*spawn_flag == TRUE);
+       rad_assert(pool_initialized == FALSE); /* not called on HUP */
+
+       pool_cf = cf_subsection_find_next(cs, NULL, "thread");
+       if (!pool_cf) *spawn_flag = FALSE;
+
        /*
-        *      We're not spawning new threads, don't do
-        *      anything.
+        *      Initialize the thread pool to some reasonable values.
         */
-       if (!spawn_flag) return 0;
-
+       memset(&thread_pool, 0, sizeof(THREAD_POOL));
+       thread_pool.head = NULL;
+       thread_pool.tail = NULL;
+       thread_pool.total_threads = 0;
+       thread_pool.max_thread_num = 1;
+       thread_pool.cleanup_delay = 5;
+       thread_pool.spawn_flag = *spawn_flag;
+       
        /*
-        *      After a SIGHUP, we don't over-write the previous values.
+        *      Don't bother initializing the mutexes or
+        *      creating the hash tables.  They won't be used.
         */
-       if (!pool_initialized) {
-               /*
-                *      Initialize the thread pool to some reasonable values.
-                */
-               memset(&thread_pool, 0, sizeof(THREAD_POOL));
-               thread_pool.head = NULL;
-               thread_pool.tail = NULL;
-               thread_pool.total_threads = 0;
-               thread_pool.max_thread_num = 1;
-               thread_pool.cleanup_delay = 5;
-               thread_pool.spawn_flag = spawn_flag;
-
-               if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
-                       radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
-                              strerror(errno));
-                       return -1;
-               }
-
-               /*
-                *      Create the hash table of child PID's
-                */
-               thread_pool.waiters = lrad_hash_table_create(pid_hash,
-                                                            pid_cmp,
-                                                            free);
-               if (!thread_pool.waiters) {
-                       radlog(L_ERR, "FATAL: Failed to set up wait hash");
-                       return -1;
-               }
+       if (!*spawn_flag) return 0;
+       
+#ifdef WNOHANG
+       if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
+               radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
+                      strerror(errno));
+               return -1;
        }
-
-       pool_cf = cf_subsection_find_next(cs, NULL, "thread");
-       if (!pool_cf) {
-               radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
+       
+       /*
+        *      Create the hash table of child PID's
+        */
+       thread_pool.waiters = fr_hash_table_create(pid_hash,
+                                                  pid_cmp,
+                                                  free);
+       if (!thread_pool.waiters) {
+               radlog(L_ERR, "FATAL: Failed to set up wait hash");
                return -1;
        }
+#endif
 
        if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
                return -1;
@@ -807,7 +770,7 @@ int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
         *      Allocate multiple fifos.
         */
        for (i = 0; i < RAD_LISTEN_MAX; i++) {
-               thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
+               thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
                if (!thread_pool.fifo[i]) {
                        radlog(L_ERR, "FATAL: Failed to set up request fifo");
                        return -1;
@@ -853,18 +816,22 @@ int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
 {
        time_t now = request->timestamp;
 
+       request->process = fun;
+
        /*
         *      We've been told not to spawn threads, so don't.
         */
        if (!thread_pool.spawn_flag) {
                radius_handle_request(request, fun);
 
+#ifdef WNOHANG
                /*
                 *      Requests that care about child process exit
                 *      codes have already either called
                 *      rad_waitpid(), or they've given up.
                 */
                wait(NULL);
+#endif
                return 1;
        }
 
@@ -1045,6 +1012,7 @@ static void thread_pool_manage(time_t now)
 }
 
 
+#ifdef WNOHANG
 /*
  *     Thread wrapper for fork().
  */
@@ -1056,7 +1024,7 @@ pid_t rad_fork(void)
 
        reap_children();        /* be nice to non-wait thingies */
 
-       if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
+       if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
                return -1;
        }
 
@@ -1074,12 +1042,13 @@ pid_t rad_fork(void)
                tf->pid = child_pid;
 
                pthread_mutex_lock(&thread_pool.wait_mutex);
-               rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
+               rcode = fr_hash_table_insert(thread_pool.waiters, tf);
                pthread_mutex_unlock(&thread_pool.wait_mutex);
 
                if (!rcode) {
                        radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
                               (int) child_pid);
+                       free(tf);
                }
        }
 
@@ -1105,7 +1074,7 @@ pid_t rad_waitpid(pid_t pid, int *status)
        mytf.pid = pid;
 
        pthread_mutex_lock(&thread_pool.wait_mutex);
-       tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
+       tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
        pthread_mutex_unlock(&thread_pool.wait_mutex);
 
        if (!tf) return -1;
@@ -1117,7 +1086,7 @@ pid_t rad_waitpid(pid_t pid, int *status)
                        *status = tf->status;
 
                        pthread_mutex_lock(&thread_pool.wait_mutex);
-                       lrad_hash_table_delete(thread_pool.waiters, &mytf);
+                       fr_hash_table_delete(thread_pool.waiters, &mytf);
                        pthread_mutex_unlock(&thread_pool.wait_mutex);
                        return pid;
                }
@@ -1128,11 +1097,16 @@ pid_t rad_waitpid(pid_t pid, int *status)
         *      10 seconds have passed, give up on the child.
         */
        pthread_mutex_lock(&thread_pool.wait_mutex);
-       lrad_hash_table_delete(thread_pool.waiters, &mytf);
+       fr_hash_table_delete(thread_pool.waiters, &mytf);
        pthread_mutex_unlock(&thread_pool.wait_mutex);
 
        return 0;
 }
+#else
+/*
+ *     No rad_fork or rad_waitpid
+ */
+#endif
 
 void thread_pool_lock(void)
 {
@@ -1143,4 +1117,19 @@ void thread_pool_unlock(void)
 {
        pthread_mutex_unlock(&thread_pool.queue_mutex);
 }
+
+void thread_pool_queue_stats(int *array)
+{
+       int i;
+
+       if (pool_initialized) {
+               for (i = 0; i < RAD_LISTEN_MAX; i++) {
+                       array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
+               }
+       } else {
+               for (i = 0; i < RAD_LISTEN_MAX; i++) {
+                       array[i] = 0;
+               }
+       }
+}
 #endif /* HAVE_PTHREAD_H */