*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*
- * Copyright 2000 The FreeRADIUS server project
+ * Copyright 2000,2006 The FreeRADIUS server project
* Copyright 2000 Alan DeKok <aland@ox.org>
*/
-#include "autoconf.h"
+#include <freeradius-devel/ident.h>
+RCSID("$Id$")
-#include <stdlib.h>
-#include <string.h>
+#include <freeradius-devel/radiusd.h>
+#include <freeradius-devel/rad_assert.h>
/*
* Other OS's have sem_init, OS X doesn't.
*/
-#ifndef DARWIN
+#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
-#else
+#endif
+
+#ifdef DARWIN
#include <mach/task.h>
#include <mach/semaphore.h>
#define sem_post(s) semaphore_signal(*s)
#endif
-#include <signal.h>
-
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
-#include "radiusd.h"
-#include "rad_assert.h"
-#include "conffile.h"
-#include "modules.h"
-
-static const char rcsid[] =
-"$Id$";
-
#ifdef HAVE_PTHREAD_H
#ifdef HAVE_OPENSSL_CRYPTO_H
#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
+#ifdef HAVE_OPENSSL_EVP_H
+#include <openssl/evp.h>
+#endif
#define SEMAPHORE_LOCKED (0)
#define SEMAPHORE_UNLOCKED (1)
#define THREAD_CANCELLED (2)
#define THREAD_EXITED (3)
+#define NUM_FIFOS RAD_LISTEN_MAX
+
+
/*
* A data structure which contains the information about
* the current thread.
REQUEST *request;
} THREAD_HANDLE;
-/*
- * For the request queue.
- */
-typedef struct request_queue_t {
- REQUEST *request;
- RAD_REQUEST_FUNP fun;
-} request_queue_t;
-
+typedef struct thread_fork_t {
+ pid_t pid;
+ int status;
+ int exited;
+} thread_fork_t;
-#define MAX_WAITERS (256)
/*
* A data structure to manage the thread pool. There's no real
THREAD_HANDLE *tail;
int total_threads;
- int active_threads;
+ int active_threads; /* protected by queue_mutex */
int max_thread_num;
int start_threads;
int max_threads;
int cleanup_delay;
int spawn_flag;
- int wait_head;
- int wait_tail;
+#ifdef WNOHANG
pthread_mutex_t wait_mutex;
- pid_t wait[MAX_WAITERS];
+ fr_hash_table_t *waiters;
+#endif
/*
* All threads wait on this semaphore, for requests
pthread_mutex_t queue_mutex;
int max_queue_size;
- uint32_t queue_head; /* first filled entry */
- uint32_t queue_tail; /* first empty entry */
- lrad_hash_table_t *queue;
+ int num_queued;
+ fr_fifo_t *fifo[NUM_FIFOS];
} THREAD_POOL;
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
+static time_t last_cleaned = 0;
+static void thread_pool_manage(time_t now);
/*
* A mapping of configuration file names to internal integers
{
int i;
+#ifdef HAVE_OPENSSL_EVP_H
+ /*
+ * Enable all ciphers and digests.
+ */
+ OpenSSL_add_all_algorithms();
+#endif
+
ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
if (!ssl_mutexes) {
radlog(L_ERR, "Error allocating memory for SSL mutexes!");
}
#endif
-
+#ifdef WNOHANG
/*
* We don't want to catch SIGCHLD for a host of reasons.
*
*/
static void reap_children(void)
{
- if (thread_pool.wait_head != thread_pool.wait_tail) {
- int num;
-
- pthread_mutex_lock(&thread_pool.wait_mutex);
- for (num = ((thread_pool.wait_tail + MAX_WAITERS) - thread_pool.wait_head) % MAX_WAITERS;
- num != 0;
- num--) {
- pid_t pid = thread_pool.wait[thread_pool.wait_head];
-
- thread_pool.wait_head++;
- thread_pool.wait_head %= MAX_WAITERS;
-
- /*
- * Child is still alive: move it to the tail.
- */
- if (waitpid(pid, NULL, WNOHANG) == 0) {
- if (((thread_pool.wait_tail + 1) % MAX_WAITERS)
- == thread_pool.wait_head) {
- rad_assert(0 == 1);
- }
-
- thread_pool.wait[thread_pool.wait_tail] = pid;
- thread_pool.wait_tail++;
- thread_pool.wait_tail %= MAX_WAITERS;
- } /* else no child, or was already reaped */
- }
- pthread_mutex_unlock(&thread_pool.wait_mutex);
- }
+ pid_t pid;
+ int status;
+ thread_fork_t mytf, *tf;
+
+
+ pthread_mutex_lock(&thread_pool.wait_mutex);
+
+ do {
+ retry:
+ pid = waitpid(0, &status, WNOHANG);
+ if (pid <= 0) break;
+
+ mytf.pid = pid;
+ tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+ if (!tf) goto retry;
+
+ tf->status = status;
+ tf->exited = 1;
+ } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
+
+ pthread_mutex_unlock(&thread_pool.wait_mutex);
}
+#else
+#define reap_children()
+#endif /* WNOHANG */
/*
* Add a request to the list of waiting requests.
*/
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
- request_queue_t entry;
-
+ rad_assert(request->process == fun);
pthread_mutex_lock(&thread_pool.queue_mutex);
thread_pool.request_count++;
- /*
- * If the queue is empty, re-set the indices to zero,
- * for no particular reason...
- */
- if (lrad_hash_table_num_elements(thread_pool.queue) == 0) {
- thread_pool.queue_head = thread_pool.queue_tail = 0;
- }
-
- /*
- * If the queue has hit it's maximum allowed size, then
- * toss the request, as we're too busy!
- */
- if (lrad_hash_table_num_elements(thread_pool.queue) >= thread_pool.max_queue_size) {
+ if (thread_pool.num_queued >= thread_pool.max_queue_size) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
-
+
/*
* Mark the request as done.
*/
- radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
- request->finished = TRUE;
+ radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
+ request->child_state = REQUEST_DONE;
return 0;
}
/*
- * FIXME: If there are too many outstanding requests,
- * return data to the server core which tells it to slow
- * down on reading from the sockets.
- */
-
- /*
- * Add the data to the queue tail, increment the tail,
- * and signal the semaphore that there's another request
- * in the queue.
- *
- * Hmm... too many malloc's are a problem. Could we put this
- * into some kind of paged memory?
+ * Push the request onto the appropriate fifo for that
*/
- entry.request = request;
- entry.fun = fun;
-
- if (!lrad_hash_table_insert(thread_pool.queue, thread_pool.queue_tail,
- &entry)) {
+ if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
- request->finished = TRUE;
+ request->child_state = REQUEST_DONE;
return 0;
}
- thread_pool.queue_tail++;
+ thread_pool.num_queued++;
pthread_mutex_unlock(&thread_pool.queue_mutex);
/*
* Remove a request from the queue.
*/
-static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
+static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
{
- request_queue_t *entry;
+ RAD_LISTEN_TYPE i, start;
+ REQUEST *request = NULL;
+
reap_children();
pthread_mutex_lock(&thread_pool.queue_mutex);
/*
- * Head & tail are the same. There's nothing in
- * the queue.
+ * Clear old requests from all queues.
+ *
+ * We only do one pass over the queue, in order to
+ * amortize the work across the child threads. Since we
+ * do N checks for one request de-queued, the old
+ * requests will be quickly cleared.
*/
- if (thread_pool.queue_head == thread_pool.queue_tail) {
- pthread_mutex_unlock(&thread_pool.queue_mutex);
- *request = NULL;
- *fun = NULL;
- return;
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ request = fr_fifo_peek(thread_pool.fifo[i]);
+ if (!request ||
+ (request->master_state != REQUEST_STOP_PROCESSING)) {
+ continue;
+}
+ /*
+ * This entry was marked to be stopped. Acknowledge it.
+ */
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ rad_assert(request != NULL);
+ request->child_state = REQUEST_DONE;
+ thread_pool.num_queued--;
}
+ start = 0;
+ retry:
/*
- * Grab the entry from the hash table
- *
- * Note that we *don't* have a "free" function,
- * so "entry" is still valid after the "delete"
+ * Pop results from the top of the queue
*/
- entry = lrad_hash_table_finddata(thread_pool.queue, thread_pool.queue_head);
- if (!entry) {
- lrad_hash_table_delete(thread_pool.queue, thread_pool.queue_head++);
+ for (i = start; i < RAD_LISTEN_MAX; i++) {
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ if (request) {
+ start = i;
+ break;
+ }
+ }
+
+ if (!request) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
- *request = NULL;
+ *prequest = NULL;
*fun = NULL;
- return;
+ return 0;
}
- *request = entry->request;
- *fun = entry->fun;
+ rad_assert(thread_pool.num_queued > 0);
+ thread_pool.num_queued--;
+ *prequest = request;
+ *fun = request->process;
- rad_assert(*request != NULL);
- rad_assert((*request)->magic == REQUEST_MAGIC);
+ rad_assert(request->magic == REQUEST_MAGIC);
rad_assert(*fun != NULL);
- lrad_hash_table_delete(thread_pool.queue, thread_pool.queue_head++);
-
/*
- * FIXME: Check the request timestamp. If it's more than
- * "clean_delay" seconds old, then discard the request,
- * log an error, and try to de-queue another request.
+ * If the request has sat in the queue for too long,
+ * kill it.
*
- * The main clean-up code won't delete the request from
- * the request list, because it's not marked "finished"
+ * The main clean-up code can't delete the request from
+ * the queue, and therefore won't clean it up until we
+ * have acknowledged it as "done".
*/
+ if (request->master_state == REQUEST_STOP_PROCESSING) {
+ request->child_state = REQUEST_DONE;
+ goto retry;
+ }
/*
* The thread is currently processing a request.
pthread_mutex_unlock(&thread_pool.queue_mutex);
- /*
- * If the request is currently being processed, then that
- * MAY be OK, if it's a proxy reply. In that case,
- * sending the packet may result in a reply being
- * received before that thread clears the child_pid.
- *
- * In that case, we busy-wait for the request to be free.
- *
- * We COULD push it onto the queue and try to grab
- * another request, but what if this is the only request?
- * What if there are multiple such packets with race
- * conditions? We don't want to thrash the queue...
- *
- * This busy-wait is less than optimal, but it's simple,
- * fail-safe, and it works.
- */
- if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
- int count, ok;
- struct timeval tv;
-#ifdef HAVE_PTHREAD_SIGMASK
- sigset_t set, old_set;
-
- /*
- * Block a large number of signals which could
- * cause the select to return EINTR
- */
- sigemptyset(&set);
- sigaddset(&set, SIGPIPE);
- sigaddset(&set, SIGCONT);
- sigaddset(&set, SIGSTOP);
- sigaddset(&set, SIGCHLD);
- pthread_sigmask(SIG_BLOCK, &set, &old_set);
-#endif
-
- rad_assert((*request)->proxy_reply != NULL);
-
- ok = FALSE;
-
- /*
- * Sleep for 100 milliseconds. If the other thread
- * doesn't get serviced in this time, to clear
- * the "child_pid" entry, then the server is too
- * busy, so we die.
- */
- for (count = 0; count < 10; count++) {
- tv.tv_sec = 0;
- tv.tv_usec = 10000; /* sleep for 10 milliseconds */
-
- /*
- * Portable sleep that's thread-safe.
- *
- * Don't worry about interrupts, as they're
- * blocked above.
- */
- select(0, NULL, NULL, NULL, &tv);
- if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
- ok = TRUE;
- break;
- }
- }
-
-#ifdef HAVE_PTHREAD_SIGMASK
- /*
- * Restore the original thread signal mask.
- */
- pthread_sigmask(SIG_SETMASK, &old_set, NULL);
-#endif
-
- if (!ok) {
- radlog(L_ERR, "FATAL! Server is too busy to process requests");
- exit(1);
- }
- }
-
- return;
+ return 1;
}
{
RAD_REQUEST_FUNP fun;
THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
-#ifdef HAVE_PTHREAD_SIGMASK
- sigset_t set;
-
- /*
- * Block SIGHUP handling for the child threads.
- *
- * This ensures that only the main server thread will
- * process HUP signals.
- *
- * If we don't have sigprocmask, then it shouldn't be
- * a problem, either, as the sig_hup handler should check
- * for this condition.
- */
- sigemptyset(&set);
- sigaddset(&set, SIGHUP);
- sigaddset(&set, SIGINT);
- sigaddset(&set, SIGQUIT);
- sigaddset(&set, SIGTERM);
- pthread_sigmask(SIG_BLOCK, &set, NULL);
-#endif
/*
* Loop forever, until told to exit.
DEBUG2("Thread %d got semaphore", self->thread_num);
+#ifdef HAVE_OPENSSL_ERR_H
+ /*
+ * Clear the error queue for the current thread.
+ */
+ ERR_clear_error ();
+#endif
+
/*
* Try to grab a request from the queue.
*
* It may be empty, in which case we fail
* gracefully.
*/
- request_dequeue(&self->request, &fun);
- if (!self->request) continue;
+ if (!request_dequeue(&self->request, &fun)) continue;
self->request->child_pid = self->pthread_id;
self->request_count++;
self->thread_num, self->request->number,
self->request_count);
- /*
- * Respond, and reset request->child_pid
- */
- rad_respond(self->request, fun);
- self->request = NULL;
+ radius_handle_request(self->request, fun);
/*
* Update the active threads.
/*
* Do this as the LAST thing before exiting.
*/
+ self->request = NULL;
self->status = THREAD_EXITED;
return NULL;
}
/*
- * Take a THREAD_HANDLE, and delete it from the thread pool.
+ * Take a THREAD_HANDLE, delete it from the thread pool and
+ * free its resources.
*
- * This function is called ONLY from the main server thread.
+ * This function is called ONLY from the main server thread,
+ * ONLY after the thread has exited.
*/
static void delete_thread(THREAD_HANDLE *handle)
{
rad_assert(handle->request == NULL);
+ DEBUG2("Deleting thread %d", handle->thread_num);
+
prev = handle->prev;
next = handle->next;
rad_assert(thread_pool.total_threads > 0);
next->prev = prev;
}
- DEBUG2("Deleting thread %d", handle->thread_num);
-
- /*
- * This thread has exited. Delete any additional
- * resources associated with it.
- */
-
/*
- * Free the memory, now that we're sure the thread
- * exited.
+ * Free the handle, now that it's no longer referencable.
*/
free(handle);
}
memset(handle, 0, sizeof(THREAD_HANDLE));
handle->prev = NULL;
handle->next = NULL;
- handle->pthread_id = NO_SUCH_CHILD_PID;
handle->thread_num = thread_pool.max_thread_num++;
handle->request_count = 0;
handle->status = THREAD_RUNNING;
rcode = pthread_create(&handle->pthread_id, &attr,
request_handler_thread, handle);
if (rcode != 0) {
- radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
+ radlog(L_ERR, "Thread create failed: %s",
strerror(rcode));
- exit(1);
+ return NULL;
}
pthread_attr_destroy(&attr);
*/
int total_active_threads(void)
{
- int rcode = 0;
- THREAD_HANDLE *handle;
+ /*
+ * We don't acquire the mutex, so this is just an estimate.
+ * We can't return with the lock held, so there's no point
+ * in getting the guaranteed correct value; by the time
+ * the caller sees it, it can be wrong again.
+ */
+ return thread_pool.active_threads;
+}
- for (handle = thread_pool.head; handle != NULL; handle = handle->next){
- if (handle->request != NULL) {
- rcode ++;
- }
- }
- return (rcode);
+
+#ifdef WNOHANG
+static uint32_t pid_hash(const void *data)
+{
+ const thread_fork_t *tf = data;
+
+ return fr_hash(&tf->pid, sizeof(tf->pid));
}
+static int pid_cmp(const void *one, const void *two)
+{
+ const thread_fork_t *a = one;
+ const thread_fork_t *b = two;
+
+ return (a->pid - b->pid);
+}
+#endif
/*
* Allocate the thread pool, and seed it with an initial number
*
* FIXME: What to do on a SIGHUP???
*/
-int thread_pool_init(int spawn_flag)
+int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
{
int i, rcode;
CONF_SECTION *pool_cf;
time_t now;
- DEBUG("Initializing the thread pool...");
now = time(NULL);
+ rad_assert(spawn_flag != NULL);
+ rad_assert(*spawn_flag == TRUE);
+ rad_assert(pool_initialized == FALSE); /* not called on HUP */
+
+ pool_cf = cf_subsection_find_next(cs, NULL, "thread");
+ if (!pool_cf) *spawn_flag = FALSE;
+
/*
- * After a SIGHUP, we don't over-write the previous values.
+ * Initialize the thread pool to some reasonable values.
*/
- if (!pool_initialized) {
- /*
- * Initialize the thread pool to some reasonable values.
- */
- memset(&thread_pool, 0, sizeof(THREAD_POOL));
- thread_pool.head = NULL;
- thread_pool.tail = NULL;
- thread_pool.total_threads = 0;
- thread_pool.max_thread_num = 1;
- thread_pool.cleanup_delay = 5;
- thread_pool.spawn_flag = spawn_flag;
-
- thread_pool.wait_head = thread_pool.wait_tail = 0;
- if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
- radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
- strerror(errno));
- exit(1);
- }
+ memset(&thread_pool, 0, sizeof(THREAD_POOL));
+ thread_pool.head = NULL;
+ thread_pool.tail = NULL;
+ thread_pool.total_threads = 0;
+ thread_pool.max_thread_num = 1;
+ thread_pool.cleanup_delay = 5;
+ thread_pool.spawn_flag = *spawn_flag;
+
+ /*
+ * Don't bother initializing the mutexes or
+ * creating the hash tables. They won't be used.
+ */
+ if (!*spawn_flag) return 0;
+
+#ifdef WNOHANG
+ if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
+ radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
+ strerror(errno));
+ return -1;
}
-
+
/*
- * We're not spawning new threads, don't do
- * anything.
+ * Create the hash table of child PID's
*/
- if (!spawn_flag) return 0;
+ thread_pool.waiters = fr_hash_table_create(pid_hash,
+ pid_cmp,
+ free);
+ if (!thread_pool.waiters) {
+ radlog(L_ERR, "FATAL: Failed to set up wait hash");
+ return -1;
+ }
+#endif
- pool_cf = cf_section_find("thread");
- if (pool_cf != NULL) {
- /*
- * FIXME: Check for errors?
- */
- cf_section_parse(pool_cf, NULL, thread_config);
+ if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
+ return -1;
}
/*
+ * Catch corner cases.
+ */
+ if (thread_pool.min_spare_threads < 1)
+ thread_pool.min_spare_threads = 1;
+ if (thread_pool.max_spare_threads < 1)
+ thread_pool.max_spare_threads = 1;
+ if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
+ thread_pool.max_spare_threads = thread_pool.min_spare_threads;
+
+ /*
* The pool has already been initialized. Don't spawn
* new threads, and don't forget about forked children,
*/
memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
if (rcode != 0) {
- radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
+ radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
strerror(errno));
- exit(1);
+ return -1;
}
rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
if (rcode != 0) {
- radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
+ radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
strerror(errno));
- exit(1);
+ return -1;
}
/*
- * Queue head & tail are set to zero by the memset,
- * above.
- *
- * Allocate an initial queue, always as a power of 2.
+ * Allocate multiple fifos.
*/
- thread_pool.queue = lrad_hash_table_create(8, NULL, 0);
- lrad_hash_table_set_data_size(thread_pool.queue,
- sizeof(request_queue_t));
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
+ if (!thread_pool.fifo[i]) {
+ radlog(L_ERR, "FATAL: Failed to set up request fifo");
+ return -1;
+ }
+ }
#ifdef HAVE_OPENSSL_CRYPTO_H
/*
*/
if (!setup_ssl_mutexes()) {
radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
- exit(1);
+ return -1;
}
#endif
*/
int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
{
+ time_t now = request->timestamp;
+
+ request->process = fun;
+
/*
* We've been told not to spawn threads, so don't.
*/
if (!thread_pool.spawn_flag) {
- rad_respond(request, fun);
+ radius_handle_request(request, fun);
+
+#ifdef WNOHANG
+ /*
+ * Requests that care about child process exit
+ * codes have already either called
+ * rad_waitpid(), or they've given up.
+ */
+ wait(NULL);
+#endif
return 1;
}
if (!request_enqueue(request, fun)) return 0;
/*
- * If the thread pool is busy handling requests, then
- * try to spawn another one.
+ * If we haven't checked the number of child threads
+ * in a while, OR if the thread pool appears to be full,
+ * go manage it.
*/
- if (thread_pool.active_threads == thread_pool.total_threads) {
- if (spawn_thread(request->timestamp) == NULL) {
- radlog(L_INFO,
- "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
- thread_pool.max_threads);
- return 1;
- }
+ if ((last_cleaned < now) ||
+ (thread_pool.active_threads == thread_pool.total_threads)) {
+ thread_pool_manage(now);
}
return 1;
* If there are too many or too few threads waiting, then we
* either create some more, or delete some.
*/
-int thread_pool_clean(time_t now)
+static void thread_pool_manage(time_t now)
{
int spare;
int i, total;
THREAD_HANDLE *handle, *next;
int active_threads;
- static time_t last_cleaned = 0;
-
- /*
- * Loop over the thread pool deleting exited threads.
- */
- for (handle = thread_pool.head; handle; handle = next) {
- next = handle->next;
-
- /*
- * Maybe we've asked the thread to exit, and it
- * has agreed.
- */
- if (handle->status == THREAD_EXITED) {
- delete_thread(handle);
- }
- }
/*
* We don't need a mutex lock here, as we're reading
- * the location, and not modifying it. We want a close
+ * active_threads, and not modifying it. We want a close
* approximation of the number of active threads, and this
* is good enough.
*/
}
/*
- * If there are too few spare threads, create some more.
+ * If there are too few spare threads. Go create some more.
*/
if (spare < thread_pool.min_spare_threads) {
total = thread_pool.min_spare_threads - spare;
DEBUG2("Threads: Spawning %d spares", total);
+
/*
* Create a number of spare threads.
*/
for (i = 0; i < total; i++) {
handle = spawn_thread(now);
if (handle == NULL) {
- return -1;
+ return;
}
}
- /*
- * And exit, as there can't be too many spare threads.
- */
- return 0;
+ return; /* there aren't too many spare threads */
}
/*
* so this second.
*/
if (now == last_cleaned) {
- return 0;
+ return;
}
last_cleaned = now;
/*
+ * Loop over the thread pool, deleting exited threads.
+ */
+ for (handle = thread_pool.head; handle; handle = next) {
+ next = handle->next;
+
+ /*
+ * Maybe we've asked the thread to exit, and it
+ * has agreed.
+ */
+ if (handle->status == THREAD_EXITED) {
+ delete_thread(handle);
+ }
+ }
+
+ /*
* Only delete the spare threads if sufficient time has
* passed since we last created one. This helps to minimize
* the amount of create/delete cycles.
*/
if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
- return 0;
+ return;
}
/*
* Otherwise everything's kosher. There are not too few,
* or too many spare threads. Exit happily.
*/
- return 0;
+ return;
}
+#ifdef WNOHANG
/*
* Thread wrapper for fork().
*/
-pid_t rad_fork(int exec_wait)
+pid_t rad_fork(void)
{
pid_t child_pid;
- if (exec_wait) return fork();
+ if (!pool_initialized) return fork();
reap_children(); /* be nice to non-wait thingies */
- /*
- * Lock the mutex.
- */
- pthread_mutex_lock(&thread_pool.wait_mutex);
-
- /*
- * No room to save the PID: die.
- */
- if (((thread_pool.wait_tail + 1) % MAX_WAITERS)
- == thread_pool.wait_head) {
- rad_assert(0 == 1);
+ if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
+ return -1;
}
/*
* Fork & save the PID for later reaping.
*/
child_pid = fork();
- if (child_pid != 0) {
- thread_pool.wait[thread_pool.wait_tail] = child_pid;
- thread_pool.wait_tail++;
- thread_pool.wait_tail %= MAX_WAITERS;
+ if (child_pid > 0) {
+ int rcode;
+ thread_fork_t *tf;
- /*
- * Unlock the mutex.
- */
+ tf = rad_malloc(sizeof(*tf));
+ memset(tf, 0, sizeof(*tf));
+
+ tf->pid = child_pid;
+
+ pthread_mutex_lock(&thread_pool.wait_mutex);
+ rcode = fr_hash_table_insert(thread_pool.waiters, tf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
+
+ if (!rcode) {
+ radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
+ (int) child_pid);
+ free(tf);
+ }
}
/*
return child_pid;
}
+
/*
- * We may not need this any more...
+ * Wait 10 seconds at most for a child to exit, then give up.
*/
-pid_t rad_waitpid(pid_t pid, int *status, int options)
+pid_t rad_waitpid(pid_t pid, int *status)
{
- reap_children(); /* be nice to non-wait thingies */
- return waitpid(pid, status, options);
-
-}
+ int i;
+ thread_fork_t mytf, *tf;
+
+ if (!pool_initialized) return waitpid(pid, status, 0);
+
+ if (pid <= 0) return -1;
+
+ mytf.pid = pid;
+
+ pthread_mutex_lock(&thread_pool.wait_mutex);
+ tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+ pthread_mutex_unlock(&thread_pool.wait_mutex);
-#else /* HAVE_PTHREAD_H */
+ if (!tf) return -1;
+
+ for (i = 0; i < 100; i++) {
+ reap_children();
+
+ if (tf->exited) {
+ *status = tf->status;
+
+ pthread_mutex_lock(&thread_pool.wait_mutex);
+ fr_hash_table_delete(thread_pool.waiters, &mytf);
+ pthread_mutex_unlock(&thread_pool.wait_mutex);
+ return pid;
+ }
+ usleep(100000); /* sleep for 1/10 of a second */
+ }
+
+ /*
+ * 10 seconds have passed, give up on the child.
+ */
+ pthread_mutex_lock(&thread_pool.wait_mutex);
+ fr_hash_table_delete(thread_pool.waiters, &mytf);
+ pthread_mutex_unlock(&thread_pool.wait_mutex);
+
+ return 0;
+}
+#else
/*
- * "thread" code when we don't have threads.
+ * No rad_fork or rad_waitpid
*/
-int thread_pool_init(int spawn_flag)
+#endif
+
+void thread_pool_lock(void)
{
- return 0;
+ pthread_mutex_lock(&thread_pool.queue_mutex);
}
-/*
- * call "radrespond".
- */
-int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
+void thread_pool_unlock(void)
{
- rad_respond(request, fun);
- return 1;
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
}
+void thread_pool_queue_stats(int *array)
+{
+ int i;
+
+ if (pool_initialized) {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
+ }
+ } else {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = 0;
+ }
+ }
+}
#endif /* HAVE_PTHREAD_H */