*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*
- * Copyright 2000 The FreeRADIUS server project
+ * Copyright 2000,2006 The FreeRADIUS server project
* Copyright 2000 Alan DeKok <aland@ox.org>
*/
-#include "libradius.h"
+#include <freeradius-devel/ident.h>
+RCSID("$Id$")
-#ifdef HAVE_PTHREAD_H
-
-#include <stdlib.h>
-#include <string.h>
+#include <freeradius-devel/radiusd.h>
+#include <freeradius-devel/rad_assert.h>
/*
* Other OS's have sem_init, OS X doesn't.
*/
-#ifndef DARWIN
+#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
-#else
+#endif
+
+#ifdef DARWIN
#include <mach/task.h>
#include <mach/semaphore.h>
#define sem_post(s) semaphore_signal(*s)
#endif
-#include <signal.h>
-
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
-#include "radiusd.h"
-#include "rad_assert.h"
-#include "conffile.h"
+#ifdef HAVE_PTHREAD_H
-static const char rcsid[] =
-"$Id$";
+#ifdef HAVE_OPENSSL_CRYPTO_H
+#include <openssl/crypto.h>
+#endif
+#ifdef HAVE_OPENSSL_ERR_H
+#include <openssl/err.h>
+#endif
+#ifdef HAVE_OPENSSL_EVP_H
+#include <openssl/evp.h>
+#endif
#define SEMAPHORE_LOCKED (0)
#define SEMAPHORE_UNLOCKED (1)
#define THREAD_CANCELLED (2)
#define THREAD_EXITED (3)
+#define NUM_FIFOS RAD_LISTEN_MAX
+
+
/*
* A data structure which contains the information about
* the current thread.
REQUEST *request;
} THREAD_HANDLE;
-/*
- * For the request queue.
- */
-typedef struct request_queue_t {
- REQUEST *request;
- RAD_REQUEST_FUNP fun;
-} request_queue_t;
-
-
typedef struct thread_fork_t {
pid_t pid;
int status;
THREAD_HANDLE *tail;
int total_threads;
+ int active_threads; /* protected by queue_mutex */
int max_thread_num;
int start_threads;
int max_threads;
unsigned long request_count;
time_t time_last_spawned;
int cleanup_delay;
+ int spawn_flag;
- /*
- * If threaded, we have to pay more attention to
- * child PID's when we fork...
- */
+#ifdef WNOHANG
pthread_mutex_t wait_mutex;
- lrad_hash_table_t *waiters;
+ fr_hash_table_t *waiters;
+#endif
/*
* All threads wait on this semaphore, for requests
/*
* To ensure only one thread at a time touches the queue.
*/
- pthread_mutex_t mutex;
+ pthread_mutex_t queue_mutex;
- int active_threads;
- int queue_head; /* first filled entry */
- int queue_tail; /* first empty entry */
- int queue_size;
- request_queue_t *queue;
+ int max_queue_size;
+ int num_queued;
+ fr_fifo_t *fifo[NUM_FIFOS];
} THREAD_POOL;
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
+static time_t last_cleaned = 0;
+static void thread_pool_manage(time_t now);
/*
* A mapping of configuration file names to internal integers
{ "max_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads, "10" },
{ "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
{ "cleanup_delay", PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay, "5" },
+ { "max_queue_size", PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size, "65536" },
{ NULL, -1, 0, NULL, NULL }
};
+#ifdef HAVE_OPENSSL_CRYPTO_H
+
+/*
+ * If we're linking against OpenSSL, then it is the
+ * duty of the application, if it is multithreaded,
+ * to provide OpenSSL with appropriate thread id
+ * and mutex locking functions
+ *
+ * Note: this only implements static callbacks.
+ * OpenSSL does not use dynamic locking callbacks
+ * right now, but may in the future, so we will have
+ * to add them at some point.
+ */
+
+static pthread_mutex_t *ssl_mutexes = NULL;
+
+static unsigned long ssl_id_function(void)
+{
+ return (unsigned long) pthread_self();
+}
+
+static void ssl_locking_function(int mode, int n, const char *file, int line)
+{
+ file = file; /* -Wunused */
+ line = line; /* -Wunused */
+
+ if (mode & CRYPTO_LOCK) {
+ pthread_mutex_lock(&(ssl_mutexes[n]));
+ } else {
+ pthread_mutex_unlock(&(ssl_mutexes[n]));
+ }
+}
+
+static int setup_ssl_mutexes(void)
+{
+ int i;
+
+#ifdef HAVE_OPENSSL_EVP_H
+ /*
+ * Enable all ciphers and digests.
+ */
+ OpenSSL_add_all_algorithms();
+#endif
+
+ ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
+ if (!ssl_mutexes) {
+ radlog(L_ERR, "Error allocating memory for SSL mutexes!");
+ return 0;
+ }
+
+ for (i = 0; i < CRYPTO_num_locks(); i++) {
+ pthread_mutex_init(&(ssl_mutexes[i]), NULL);
+ }
+
+ CRYPTO_set_id_callback(ssl_id_function);
+ CRYPTO_set_locking_callback(ssl_locking_function);
+
+ return 1;
+}
+#endif
+
+#ifdef WNOHANG
/*
* We don't want to catch SIGCHLD for a host of reasons.
*
int status;
thread_fork_t mytf, *tf;
+
pthread_mutex_lock(&thread_pool.wait_mutex);
do {
+ retry:
pid = waitpid(0, &status, WNOHANG);
if (pid <= 0) break;
mytf.pid = pid;
- tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
- if (!tf) continue;
-
+ tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+ if (!tf) goto retry;
+
tf->status = status;
tf->exited = 1;
- } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
+ } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
pthread_mutex_unlock(&thread_pool.wait_mutex);
}
-
+#else
+#define reap_children()
+#endif /* WNOHANG */
/*
* Add a request to the list of waiting requests.
*/
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
- int num_entries;
-
- pthread_mutex_lock(&thread_pool.mutex);
+ rad_assert(request->process == fun);
+ pthread_mutex_lock(&thread_pool.queue_mutex);
thread_pool.request_count++;
- /*
- * If the queue is empty, re-set the indices to zero,
- * for no particular reason...
- */
- if ((thread_pool.queue_head == thread_pool.queue_tail) &&
- (thread_pool.queue_head != 0)) {
- thread_pool.queue_head = thread_pool.queue_tail = 0;
- }
-
- /*
- * If the queue is full, die.
- *
- * The math is to take into account the fact that it's a
- * circular queue.
- */
- num_entries = ((thread_pool.queue_tail + thread_pool.queue_size) -
- thread_pool.queue_head) % thread_pool.queue_size;
- if (num_entries == (thread_pool.queue_size - 1)) {
- int i;
- request_queue_t *new_queue;
+ if (thread_pool.num_queued >= thread_pool.max_queue_size) {
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
/*
- * If the queue becomes larger than 65536,
- * there's a serious problem.
+ * Mark the request as done.
*/
- if (thread_pool.queue_size >= 65536) {
- pthread_mutex_unlock(&thread_pool.mutex);
-
- /*
- * Mark the request as done.
- */
- radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
- request->finished = TRUE;
- return 0;
- }
-
- /*
- * Malloc a new queue, doubled in size, copy the
- * data from the current queue over to it, zero
- * out the second half of the queue, free the old
- * one, and replace thread_pool.queue with the
- * new one.
- */
- new_queue = rad_malloc(sizeof(*new_queue) * thread_pool.queue_size * 2);
- /*
- * Copy the queue element by element
- */
- for (i = 0; i < thread_pool.queue_size; i++) {
- new_queue[i] = thread_pool.queue[(i + thread_pool.queue_head) % thread_pool.queue_size];
- }
- memset(new_queue + thread_pool.queue_size,
- 0, sizeof(*new_queue) * thread_pool.queue_size);
-
- free(thread_pool.queue);
- thread_pool.queue = new_queue;
- thread_pool.queue_tail = ((thread_pool.queue_tail + thread_pool.queue_size) - thread_pool.queue_head) % thread_pool.queue_size;
- thread_pool.queue_head = 0;
- thread_pool.queue_size *= 2;
+ radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
+ request->child_state = REQUEST_DONE;
+ return 0;
}
/*
- * Add the data to the queue tail, increment the tail,
- * and signal the semaphore that there's another request
- * in the queue.
+ * Push the request onto the appropriate fifo for that priority.
*/
- thread_pool.queue[thread_pool.queue_tail].request = request;
- thread_pool.queue[thread_pool.queue_tail].fun = fun;
- thread_pool.queue_tail++;
- thread_pool.queue_tail &= (thread_pool.queue_size - 1);
+ if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
+ radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
+ request->child_state = REQUEST_DONE;
+ return 0;
+ }
- pthread_mutex_unlock(&thread_pool.mutex);
+ thread_pool.num_queued++;
+
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
/*
* There's one more request in the queue.
* the mutex, it will be unlocked, and there won't be
* contention.
*/
-
sem_post(&thread_pool.semaphore);
return 1;
/*
* Remove a request from the queue.
*/
-static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
+static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
{
+ RAD_LISTEN_TYPE i, start;
+ REQUEST *request = NULL;
+
reap_children();
- pthread_mutex_lock(&thread_pool.mutex);
+ pthread_mutex_lock(&thread_pool.queue_mutex);
+
+ /*
+ * Clear old requests from all queues.
+ *
+ * We only do one pass over the queue, in order to
+ * amortize the work across the child threads. Since we
+ * do N checks for one request de-queued, the old
+ * requests will be quickly cleared.
+ */
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ request = fr_fifo_peek(thread_pool.fifo[i]);
+ if (!request ||
+ (request->master_state != REQUEST_STOP_PROCESSING)) {
+ continue;
+}
+ /*
+ * This entry was marked to be stopped. Acknowledge it.
+ */
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ rad_assert(request != NULL);
+ request->child_state = REQUEST_DONE;
+ thread_pool.num_queued--;
+ }
+ start = 0;
+ retry:
/*
- * Head & tail are the same. There's nothing in
- * the queue.
+ * Pop results from the top of the queue
*/
- if (thread_pool.queue_head == thread_pool.queue_tail) {
- pthread_mutex_unlock(&thread_pool.mutex);
- *request = NULL;
+ for (i = start; i < RAD_LISTEN_MAX; i++) {
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ if (request) {
+ start = i;
+ break;
+ }
+ }
+
+ if (!request) {
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
+ *prequest = NULL;
*fun = NULL;
- return;
+ return 0;
}
- *request = thread_pool.queue[thread_pool.queue_head].request;
- *fun = thread_pool.queue[thread_pool.queue_head].fun;
+ rad_assert(thread_pool.num_queued > 0);
+ thread_pool.num_queued--;
+ *prequest = request;
+ *fun = request->process;
- rad_assert(*request != NULL);
- rad_assert((*request)->magic == REQUEST_MAGIC);
+ rad_assert(request->magic == REQUEST_MAGIC);
rad_assert(*fun != NULL);
- thread_pool.queue_head++;
- thread_pool.queue_head &= (thread_pool.queue_size - 1);
-
/*
- * FIXME: Check the request timestamp. If it's more than
- * "clean_delay" seconds old, then discard the request,
- * log an error, and try to de-queue another request.
+ * If the request has sat in the queue for too long,
+ * kill it.
*
- * The main clean-up code won't delete the request from
- * the request list, because it's not marked "finished"
+ * The main clean-up code can't delete the request from
+ * the queue, and therefore won't clean it up until we
+ * have acknowledged it as "done".
*/
+ if (request->master_state == REQUEST_STOP_PROCESSING) {
+ request->child_state = REQUEST_DONE;
+ goto retry;
+ }
/*
* The thread is currently processing a request.
*/
thread_pool.active_threads++;
- pthread_mutex_unlock(&thread_pool.mutex);
-
- /*
- * If the request is currently being processed, then that
- * MAY be OK, if it's a proxy reply. In that case,
- * sending the packet may result in a reply being
- * received before that thread clears the child_pid.
- *
- * In that case, we busy-wait for the request to be free.
- *
- * We COULD push it onto the queue and try to grab
- * another request, but what if this is the only request?
- * What if there are multiple such packets with race
- * conditions? We don't want to thrash the queue...
- *
- * This busy-wait is less than optimal, but it's simple,
- * fail-safe, and it works.
- */
- if ((*request)->child_pid != NO_SUCH_CHILD_PID) {
- int count, ok;
- struct timeval tv;
-#ifdef HAVE_PTHREAD_SIGMASK
- sigset_t set, old_set;
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
- /*
- * Block a large number of signals which could
- * cause the select to return EINTR
- */
- sigemptyset(&set);
- sigaddset(&set, SIGPIPE);
- sigaddset(&set, SIGCONT);
- sigaddset(&set, SIGSTOP);
- sigaddset(&set, SIGCHLD);
- pthread_sigmask(SIG_BLOCK, &set, &old_set);
-#endif
-
- rad_assert((*request)->proxy_reply != NULL);
-
- ok = FALSE;
-
- /*
- * Sleep for 100 milliseconds. If the other thread
- * doesn't get serviced in this time, to clear
- * the "child_pid" entry, then the server is too
- * busy, so we die.
- */
- for (count = 0; count < 10; count++) {
- tv.tv_sec = 0;
- tv.tv_usec = 10000; /* sleep for 10 milliseconds */
-
- /*
- * Portable sleep that's thread-safe.
- *
- * Don't worry about interrupts, as they're
- * blocked above.
- */
- select(0, NULL, NULL, NULL, &tv);
- if ((*request)->child_pid == NO_SUCH_CHILD_PID) {
- ok = TRUE;
- break;
- }
- }
-
-#ifdef HAVE_PTHREAD_SIGMASK
- /*
- * Restore the original thread signal mask.
- */
- pthread_sigmask(SIG_SETMASK, &old_set, NULL);
-#endif
-
- if (!ok) {
- radlog(L_ERR, "FATAL! Server is too busy to process requests");
- exit(1);
- }
- }
-
- return;
+ return 1;
}
{
RAD_REQUEST_FUNP fun;
THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
-#ifdef HAVE_PTHREAD_SIGMASK
- sigset_t set;
-
- /*
- * Block SIGHUP handling for the child threads.
- *
- * This ensures that only the main server thread will
- * process HUP signals.
- *
- * If we don't have sigprocmask, then it shouldn't be
- * a problem, either, as the sig_hup handler should check
- * for this condition.
- */
- sigemptyset(&set);
- sigaddset(&set, SIGHUP);
- sigaddset(&set, SIGINT);
- sigaddset(&set, SIGQUIT);
- sigaddset(&set, SIGTERM);
- pthread_sigmask(SIG_BLOCK, &set, NULL);
-#endif
/*
* Loop forever, until told to exit.
DEBUG2("Thread %d got semaphore", self->thread_num);
+#ifdef HAVE_OPENSSL_ERR_H
+ /*
+ * Clear the error queue for the current thread.
+ */
+ ERR_clear_error ();
+#endif
+
/*
* Try to grab a request from the queue.
*
* It may be empty, in which case we fail
* gracefully.
*/
- request_dequeue(&self->request, &fun);
- if (!self->request) continue;
+ if (!request_dequeue(&self->request, &fun)) continue;
self->request->child_pid = self->pthread_id;
self->request_count++;
self->thread_num, self->request->number,
self->request_count);
- /*
- * Respond, and reset request->child_pid
- */
- rad_respond(self->request, fun);
- self->request = NULL;
+ radius_handle_request(self->request, fun);
/*
* Update the active threads.
*/
- pthread_mutex_lock(&thread_pool.mutex);
+ pthread_mutex_lock(&thread_pool.queue_mutex);
rad_assert(thread_pool.active_threads > 0);
thread_pool.active_threads--;
- pthread_mutex_unlock(&thread_pool.mutex);
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
} while (self->status != THREAD_CANCELLED);
DEBUG2("Thread %d exiting...", self->thread_num);
+#ifdef HAVE_OPENSSL_ERR_H
+ /*
+ * If we linked with OpenSSL, the application
+ * must remove the thread's error queue before
+ * exiting to prevent memory leaks.
+ */
+ ERR_remove_state(0);
+#endif
+
/*
* Do this as the LAST thing before exiting.
*/
+ self->request = NULL;
self->status = THREAD_EXITED;
return NULL;
}
/*
- * Take a THREAD_HANDLE, and delete it from the thread pool.
+ * Take a THREAD_HANDLE, delete it from the thread pool and
+ * free its resources.
*
- * This function is called ONLY from the main server thread.
+ * This function is called ONLY from the main server thread,
+ * ONLY after the thread has exited.
*/
static void delete_thread(THREAD_HANDLE *handle)
{
rad_assert(handle->request == NULL);
+ DEBUG2("Deleting thread %d", handle->thread_num);
+
prev = handle->prev;
next = handle->next;
rad_assert(thread_pool.total_threads > 0);
next->prev = prev;
}
- DEBUG2("Deleting thread %d", handle->thread_num);
-
- /*
- * This thread has exited. Delete any additional
- * resources associated with it.
- */
-
/*
- * Free the memory, now that we're sure the thread
- * exited.
+ * Free the handle, now that it's no longer referencable.
*/
free(handle);
}
memset(handle, 0, sizeof(THREAD_HANDLE));
handle->prev = NULL;
handle->next = NULL;
- handle->pthread_id = NO_SUCH_CHILD_PID;
handle->thread_num = thread_pool.max_thread_num++;
handle->request_count = 0;
handle->status = THREAD_RUNNING;
rcode = pthread_create(&handle->pthread_id, &attr,
request_handler_thread, handle);
if (rcode != 0) {
- radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
+ radlog(L_ERR, "Thread create failed: %s",
strerror(rcode));
- exit(1);
+ return NULL;
}
pthread_attr_destroy(&attr);
*/
int total_active_threads(void)
{
- int rcode = 0;
- THREAD_HANDLE *handle;
-
- for (handle = thread_pool.head; handle != NULL; handle = handle->next){
- if (handle->request != NULL) {
- rcode ++;
- }
- }
- return (rcode);
+ /*
+ * We don't acquire the mutex, so this is just an estimate.
+ * We can't return with the lock held, so there's no point
+ * in getting the guaranteed correct value; by the time
+ * the caller sees it, it can be wrong again.
+ */
+ return thread_pool.active_threads;
}
+#ifdef WNOHANG
static uint32_t pid_hash(const void *data)
{
const thread_fork_t *tf = data;
- return lrad_hash(&tf->pid, sizeof(tf->pid));
+ return fr_hash(&tf->pid, sizeof(tf->pid));
}
static int pid_cmp(const void *one, const void *two)
return (a->pid - b->pid);
}
+#endif
/*
* Allocate the thread pool, and seed it with an initial number
*
* FIXME: What to do on a SIGHUP???
*/
-int thread_pool_init(void)
+int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
{
int i, rcode;
CONF_SECTION *pool_cf;
time_t now;
- DEBUG("Initializing the thread pool...");
now = time(NULL);
+ rad_assert(spawn_flag != NULL);
+ rad_assert(*spawn_flag == TRUE);
+ rad_assert(pool_initialized == FALSE); /* not called on HUP */
+
+ pool_cf = cf_subsection_find_next(cs, NULL, "thread");
+ if (!pool_cf) *spawn_flag = FALSE;
+
/*
- * After a SIGHUP, we don't over-write the previous values.
+ * Initialize the thread pool to some reasonable values.
*/
- if (!pool_initialized) {
- /*
- * Initialize the thread pool to some reasonable values.
- */
- memset(&thread_pool, 0, sizeof(THREAD_POOL));
- thread_pool.head = NULL;
- thread_pool.tail = NULL;
- thread_pool.total_threads = 0;
- thread_pool.max_thread_num = 1;
- thread_pool.cleanup_delay = 5;
-
- if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
- radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
- strerror(errno));
- exit(1);
- }
-
- /*
- * Create the hash table of child PID's
- */
- thread_pool.waiters = lrad_hash_table_create(pid_hash,
- pid_cmp,
- free);
- if (!thread_pool.waiters) {
- radlog(L_ERR, "FATAL: Failed to set up wait hash");
- exit(1);
- }
+ memset(&thread_pool, 0, sizeof(THREAD_POOL));
+ thread_pool.head = NULL;
+ thread_pool.tail = NULL;
+ thread_pool.total_threads = 0;
+ thread_pool.max_thread_num = 1;
+ thread_pool.cleanup_delay = 5;
+ thread_pool.spawn_flag = *spawn_flag;
+
+ /*
+ * Don't bother initializing the mutexes or
+ * creating the hash tables. They won't be used.
+ */
+ if (!*spawn_flag) return 0;
+
+#ifdef WNOHANG
+ if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
+ radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
+ strerror(errno));
+ return -1;
}
+
+ /*
+ * Create the hash table of child PID's
+ */
+ thread_pool.waiters = fr_hash_table_create(pid_hash,
+ pid_cmp,
+ free);
+ if (!thread_pool.waiters) {
+ radlog(L_ERR, "FATAL: Failed to set up wait hash");
+ return -1;
+ }
+#endif
- pool_cf = cf_section_find("thread");
- if (pool_cf != NULL) {
- cf_section_parse(pool_cf, NULL, thread_config);
+ if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
+ return -1;
}
/*
+ * Catch corner cases.
+ */
+ if (thread_pool.min_spare_threads < 1)
+ thread_pool.min_spare_threads = 1;
+ if (thread_pool.max_spare_threads < 1)
+ thread_pool.max_spare_threads = 1;
+ if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
+ thread_pool.max_spare_threads = thread_pool.min_spare_threads;
+
+ /*
* The pool has already been initialized. Don't spawn
* new threads, and don't forget about forked children,
*/
/*
* Initialize the queue of requests.
*/
+ memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
if (rcode != 0) {
- radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
+ radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
strerror(errno));
- exit(1);
+ return -1;
}
- rcode = pthread_mutex_init(&thread_pool.mutex,NULL);
+ rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
if (rcode != 0) {
- radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
+ radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
strerror(errno));
- exit(1);
+ return -1;
}
/*
- * Queue head & tail are set to zero by the memset,
- * above.
- *
- * Allocate an initial queue, always as a power of 2.
+ * Allocate multiple fifos.
+ */
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
+ if (!thread_pool.fifo[i]) {
+ radlog(L_ERR, "FATAL: Failed to set up request fifo");
+ return -1;
+ }
+ }
+
+#ifdef HAVE_OPENSSL_CRYPTO_H
+ /*
+ * If we're linking with OpenSSL too, then we need
+ * to set up the mutexes and enable the thread callbacks.
*/
- thread_pool.queue_size = 256;
- thread_pool.queue = rad_malloc(sizeof(*thread_pool.queue) *
- thread_pool.queue_size);
- memset(thread_pool.queue, 0, (sizeof(*thread_pool.queue) *
- thread_pool.queue_size));
+ if (!setup_ssl_mutexes()) {
+ radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
+ return -1;
+ }
+#endif
+
/*
* Create a number of waiting threads.
*/
int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
{
+ time_t now = request->timestamp;
+
+ request->process = fun;
+
+ /*
+ * We've been told not to spawn threads, so don't.
+ */
+ if (!thread_pool.spawn_flag) {
+ radius_handle_request(request, fun);
+
+#ifdef WNOHANG
+ /*
+ * Requests that care about child process exit
+ * codes have already either called
+ * rad_waitpid(), or they've given up.
+ */
+ wait(NULL);
+#endif
+ return 1;
+ }
+
/*
* Add the new request to the queue.
*/
if (!request_enqueue(request, fun)) return 0;
/*
- * If the thread pool is busy handling requests, then
- * try to spawn another one.
+ * If we haven't checked the number of child threads
+ * in a while, OR if the thread pool appears to be full,
+ * go manage it.
*/
- if (thread_pool.active_threads == thread_pool.total_threads) {
- if (spawn_thread(request->timestamp) == NULL) {
- radlog(L_INFO,
- "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
- thread_pool.max_threads);
- return 1;
- }
+ if ((last_cleaned < now) ||
+ (thread_pool.active_threads == thread_pool.total_threads)) {
+ thread_pool_manage(now);
}
return 1;
* If there are too many or too few threads waiting, then we
* either create some more, or delete some.
*/
-int thread_pool_clean(time_t now)
+static void thread_pool_manage(time_t now)
{
int spare;
int i, total;
THREAD_HANDLE *handle, *next;
int active_threads;
- static time_t last_cleaned = 0;
-
- /*
- * Loop over the thread pool deleting exited threads.
- */
- for (handle = thread_pool.head; handle; handle = next) {
- next = handle->next;
-
- /*
- * Maybe we've asked the thread to exit, and it
- * has agreed.
- */
- if (handle->status == THREAD_EXITED) {
- delete_thread(handle);
- }
- }
/*
* We don't need a mutex lock here, as we're reading
- * the location, and not modifying it. We want a close
+ * active_threads, and not modifying it. We want a close
* approximation of the number of active threads, and this
* is good enough.
*/
}
/*
- * If there are too few spare threads, create some more.
+ * If there are too few spare threads, go create some more.
*/
if (spare < thread_pool.min_spare_threads) {
total = thread_pool.min_spare_threads - spare;
DEBUG2("Threads: Spawning %d spares", total);
+
/*
* Create a number of spare threads.
*/
for (i = 0; i < total; i++) {
handle = spawn_thread(now);
if (handle == NULL) {
- return -1;
+ return;
}
}
- /*
- * And exit, as there can't be too many spare threads.
- */
- return 0;
+ return; /* there aren't too many spare threads */
}
/*
* so this second.
*/
if (now == last_cleaned) {
- return 0;
+ return;
}
last_cleaned = now;
/*
+ * Loop over the thread pool, deleting exited threads.
+ */
+ for (handle = thread_pool.head; handle; handle = next) {
+ next = handle->next;
+
+ /*
+ * Maybe we've asked the thread to exit, and it
+ * has agreed.
+ */
+ if (handle->status == THREAD_EXITED) {
+ delete_thread(handle);
+ }
+ }
+
+ /*
* Only delete the spare threads if sufficient time has
* passed since we last created one. This helps to minimize
* the amount of create/delete cycles.
*/
if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
- return 0;
+ return;
}
/*
* Otherwise everything's kosher. There are not too few,
* or too many spare threads. Exit happily.
*/
- return 0;
+ return;
}
+#ifdef WNOHANG
/*
* Thread wrapper for fork().
*/
reap_children(); /* be nice to non-wait thingies */
- if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
+ if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
return -1;
}
tf = rad_malloc(sizeof(*tf));
memset(tf, 0, sizeof(*tf));
-
+
tf->pid = child_pid;
pthread_mutex_lock(&thread_pool.wait_mutex);
- rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
+ rcode = fr_hash_table_insert(thread_pool.waiters, tf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
if (!rcode) {
radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
(int) child_pid);
+ free(tf);
}
}
return child_pid;
}
+
/*
* Wait 10 seconds at most for a child to exit, then give up.
*/
mytf.pid = pid;
pthread_mutex_lock(&thread_pool.wait_mutex);
- tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
+ tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
if (!tf) return -1;
-
+
for (i = 0; i < 100; i++) {
reap_children();
-
+
if (tf->exited) {
*status = tf->status;
pthread_mutex_lock(&thread_pool.wait_mutex);
- lrad_hash_table_delete(thread_pool.waiters, &mytf);
+ fr_hash_table_delete(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
return pid;
}
- usleep(100000);
+ usleep(100000); /* sleep for 1/10 of a second */
}
-
+
/*
* 10 seconds have passed, give up on the child.
*/
pthread_mutex_lock(&thread_pool.wait_mutex);
- lrad_hash_table_delete(thread_pool.waiters, &mytf);
+ fr_hash_table_delete(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
return 0;
}
-
+#else
+/*
+ * No rad_fork or rad_waitpid
+ */
#endif
+
+void thread_pool_lock(void)
+{
+ pthread_mutex_lock(&thread_pool.queue_mutex);
+}
+
+void thread_pool_unlock(void)
+{
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
+}
+
+void thread_pool_queue_stats(int *array)
+{
+ int i;
+
+ if (pool_initialized) {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
+ }
+ } else {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = 0;
+ }
+ }
+}
+#endif /* HAVE_PTHREAD_H */