#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
+#ifdef HAVE_OPENSSL_EVP_H
+#include <openssl/evp.h>
+#endif
#define SEMAPHORE_LOCKED (0)
#define SEMAPHORE_UNLOCKED (1)
REQUEST *request;
} THREAD_HANDLE;
-/*
- * For the request queue.
- */
-typedef struct request_queue_t {
- REQUEST *request;
- RAD_REQUEST_FUNP fun;
-} request_queue_t;
-
typedef struct thread_fork_t {
pid_t pid;
int status;
int cleanup_delay;
int spawn_flag;
+#ifdef WNOHANG
pthread_mutex_t wait_mutex;
- lrad_hash_table_t *waiters;
+ fr_hash_table_t *waiters;
+#endif
/*
* All threads wait on this semaphore, for requests
int max_queue_size;
int num_queued;
- int can_read_detail;
- lrad_fifo_t *fifo[NUM_FIFOS];
+ fr_fifo_t *fifo[NUM_FIFOS];
} THREAD_POOL;
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
+static time_t last_cleaned = 0;
+static void thread_pool_manage(time_t now);
/*
* A mapping of configuration file names to internal integers
{
int i;
+#ifdef HAVE_OPENSSL_EVP_H
+ /*
+ * Enable all ciphers and digests.
+ */
+ OpenSSL_add_all_algorithms();
+#endif
+
ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
if (!ssl_mutexes) {
radlog(L_ERR, "Error allocating memory for SSL mutexes!");
}
#endif
-
+#ifdef WNOHANG
/*
* We don't want to catch SIGCHLD for a host of reasons.
*
pthread_mutex_lock(&thread_pool.wait_mutex);
do {
+ retry:
pid = waitpid(0, &status, WNOHANG);
if (pid <= 0) break;
mytf.pid = pid;
- tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
- if (!tf) continue;
+ tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
+ if (!tf) goto retry;
tf->status = status;
tf->exited = 1;
- } while (lrad_hash_table_num_elements(thread_pool.waiters) > 0);
+ } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
pthread_mutex_unlock(&thread_pool.wait_mutex);
}
+#else
+#define reap_children()
+#endif /* WNOHANG */
/*
* Add a request to the list of waiting requests.
*/
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
- request_queue_t *entry;
-
+ rad_assert(request->process == fun);
pthread_mutex_lock(&thread_pool.queue_mutex);
thread_pool.request_count++;
/*
* Mark the request as done.
*/
- radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
+ radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
request->child_state = REQUEST_DONE;
return 0;
}
- entry = rad_malloc(sizeof(*entry));
- entry->request = request;
- entry->fun = fun;
-
/*
* Push the request onto the appropriate fifo for that
*/
- if (!lrad_fifo_push(thread_pool.fifo[request->priority],
- entry)) {
+ if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
request->child_state = REQUEST_DONE;
return 0;
}
- /*
- * We've added an entry that didn't come from the detail
- * file. Note that the child thread should signal the
- * main worker thread again when the queue becomes empty.
- */
- if (request->listener->type != RAD_LISTEN_DETAIL) {
- thread_pool.can_read_detail = FALSE;
- }
-
thread_pool.num_queued++;
pthread_mutex_unlock(&thread_pool.queue_mutex);
/*
* Remove a request from the queue.
*/
-static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
+static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
{
RAD_LISTEN_TYPE i, start;
- request_queue_t *entry;
+ REQUEST *request = NULL;
reap_children();
* requests will be quickly cleared.
*/
for (i = 0; i < RAD_LISTEN_MAX; i++) {
- entry = lrad_fifo_peek(thread_pool.fifo[i]);
- if (!entry ||
- (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
+ request = fr_fifo_peek(thread_pool.fifo[i]);
+ if (!request ||
+ (request->master_state != REQUEST_STOP_PROCESSING)) {
continue;
}
/*
* This entry was marked to be stopped. Acknowledge it.
*/
- entry = lrad_fifo_pop(thread_pool.fifo[i]);
- rad_assert(entry != NULL);
- entry->request->child_state = REQUEST_DONE;
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ rad_assert(request != NULL);
+ request->child_state = REQUEST_DONE;
+ thread_pool.num_queued--;
}
start = 0;
* Pop results from the top of the queue
*/
for (i = start; i < RAD_LISTEN_MAX; i++) {
- entry = lrad_fifo_pop(thread_pool.fifo[i]);
- if (entry) {
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ if (request) {
start = i;
break;
}
}
- if (!entry) {
+ if (!request) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
- *request = NULL;
+ *prequest = NULL;
*fun = NULL;
return 0;
}
rad_assert(thread_pool.num_queued > 0);
thread_pool.num_queued--;
- *request = entry->request;
- *fun = entry->fun;
- free(entry);
+ *prequest = request;
+ *fun = request->process;
- rad_assert(*request != NULL);
- rad_assert((*request)->magic == REQUEST_MAGIC);
+ rad_assert(request->magic == REQUEST_MAGIC);
rad_assert(*fun != NULL);
/*
* the queue, and therefore won't clean it up until we
* have acknowledged it as "done".
*/
- if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
- (*request)->child_state = REQUEST_DONE;
+ if (request->master_state == REQUEST_STOP_PROCESSING) {
+ request->child_state = REQUEST_DONE;
goto retry;
}
* Loop forever, until told to exit.
*/
do {
- int can_read_detail;
-
/*
* Wait to be signalled.
*/
DEBUG2("Thread %d got semaphore", self->thread_num);
+#ifdef HAVE_OPENSSL_ERR_H
+ /*
+ * Clear the error queue for the current thread.
+ */
+ ERR_clear_error ();
+#endif
+
/*
* Try to grab a request from the queue.
*
pthread_mutex_lock(&thread_pool.queue_mutex);
rad_assert(thread_pool.active_threads > 0);
thread_pool.active_threads--;
-
- /*
- * If we're not currently allowed to read the
- * detail file, AND there are no requests queued,
- * THEN signal the main worker thread that
- * there's at least one waiting thread (us) who
- * can accept a packet from the detail file.
- */
- can_read_detail = FALSE;
- if (!thread_pool.can_read_detail &&
- (thread_pool.num_queued == 0)) {
- can_read_detail = TRUE;
- }
-
pthread_mutex_unlock(&thread_pool.queue_mutex);
-
- /*
- * Do this out of the lock to be nice to everyone.
- */
- if (can_read_detail) {
- radius_signal_self(RADIUS_SIGNAL_SELF_DETAIL);
- }
-
} while (self->status != THREAD_CANCELLED);
DEBUG2("Thread %d exiting...", self->thread_num);
memset(handle, 0, sizeof(THREAD_HANDLE));
handle->prev = NULL;
handle->next = NULL;
- handle->pthread_id = NO_SUCH_CHILD_PID;
handle->thread_num = thread_pool.max_thread_num++;
handle->request_count = 0;
handle->status = THREAD_RUNNING;
rcode = pthread_create(&handle->pthread_id, &attr,
request_handler_thread, handle);
if (rcode != 0) {
- radlog(L_ERR|L_CONS, "FATAL: Thread create failed: %s",
+ radlog(L_ERR, "Thread create failed: %s",
strerror(rcode));
- exit(1);
+ return NULL;
}
pthread_attr_destroy(&attr);
}
+#ifdef WNOHANG
static uint32_t pid_hash(const void *data)
{
const thread_fork_t *tf = data;
- return lrad_hash(&tf->pid, sizeof(tf->pid));
+ return fr_hash(&tf->pid, sizeof(tf->pid));
}
static int pid_cmp(const void *one, const void *two)
return (a->pid - b->pid);
}
+#endif
/*
* Allocate the thread pool, and seed it with an initial number
*
* FIXME: What to do on a SIGHUP???
*/
-int thread_pool_init(int spawn_flag)
+int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
{
int i, rcode;
CONF_SECTION *pool_cf;
time_t now;
- DEBUG("Initializing the thread pool...");
now = time(NULL);
+ rad_assert(spawn_flag != NULL);
+ rad_assert(*spawn_flag == TRUE);
+ rad_assert(pool_initialized == FALSE); /* not called on HUP */
+
+ pool_cf = cf_subsection_find_next(cs, NULL, "thread");
+ if (!pool_cf) *spawn_flag = FALSE;
+
/*
- * We're not spawning new threads, don't do
- * anything.
+ * Initialize the thread pool to some reasonable values.
*/
- if (!spawn_flag) return 0;
-
+ memset(&thread_pool, 0, sizeof(THREAD_POOL));
+ thread_pool.head = NULL;
+ thread_pool.tail = NULL;
+ thread_pool.total_threads = 0;
+ thread_pool.max_thread_num = 1;
+ thread_pool.cleanup_delay = 5;
+ thread_pool.spawn_flag = *spawn_flag;
+
/*
- * After a SIGHUP, we don't over-write the previous values.
+ * Don't bother initializing the mutexes or
+ * creating the hash tables. They won't be used.
*/
- if (!pool_initialized) {
- /*
- * Initialize the thread pool to some reasonable values.
- */
- memset(&thread_pool, 0, sizeof(THREAD_POOL));
- thread_pool.head = NULL;
- thread_pool.tail = NULL;
- thread_pool.total_threads = 0;
- thread_pool.max_thread_num = 1;
- thread_pool.cleanup_delay = 5;
- thread_pool.spawn_flag = spawn_flag;
-
- if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
- radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
- strerror(errno));
- exit(1);
- }
-
- /*
- * Create the hash table of child PID's
- */
- thread_pool.waiters = lrad_hash_table_create(pid_hash,
- pid_cmp,
- free);
- if (!thread_pool.waiters) {
- radlog(L_ERR, "FATAL: Failed to set up wait hash");
- exit(1);
- }
+ if (!*spawn_flag) return 0;
+
+#ifdef WNOHANG
+ if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
+ radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
+ strerror(errno));
+ return -1;
}
-
- pool_cf = cf_section_find("thread");
- if (!pool_cf) {
- radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
- exit(1);
+
+ /*
+ * Create the hash table of child PID's
+ */
+ thread_pool.waiters = fr_hash_table_create(pid_hash,
+ pid_cmp,
+ free);
+ if (!thread_pool.waiters) {
+ radlog(L_ERR, "FATAL: Failed to set up wait hash");
+ return -1;
}
+#endif
if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
- exit(1);
+ return -1;
}
/*
memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
if (rcode != 0) {
- radlog(L_ERR|L_CONS, "FATAL: Failed to initialize semaphore: %s",
+ radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
strerror(errno));
- exit(1);
+ return -1;
}
rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
if (rcode != 0) {
radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
strerror(errno));
- exit(1);
+ return -1;
}
/*
* Allocate multiple fifos.
*/
for (i = 0; i < RAD_LISTEN_MAX; i++) {
- thread_pool.fifo[i] = lrad_fifo_create(65536, NULL);
+ thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
if (!thread_pool.fifo[i]) {
radlog(L_ERR, "FATAL: Failed to set up request fifo");
- exit(1);
+ return -1;
}
}
*/
if (!setup_ssl_mutexes()) {
radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
- exit(1);
+ return -1;
}
#endif
*/
int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
{
+ time_t now = request->timestamp;
+
+ request->process = fun;
+
/*
* We've been told not to spawn threads, so don't.
*/
if (!thread_pool.spawn_flag) {
radius_handle_request(request, fun);
+#ifdef WNOHANG
/*
* Requests that care about child process exit
* codes have already either called
* rad_waitpid(), or they've given up.
*/
wait(NULL);
+#endif
return 1;
}
if (!request_enqueue(request, fun)) return 0;
/*
- * If the thread pool is busy handling requests, then
- * try to spawn another one. We don't acquire the mutex
- * before reading active_threads, so our thread count is
- * just an estimate. It's fine to go ahead and spawn an
- * extra thread in that case.
- * NOTE: the log message may be in error since active_threads
- * is an estimate, but it's only in error about the thread
- * count, not about the fact that we can't create a new one.
+ * If we haven't checked the number of child threads
+ * in a while, OR if the thread pool appears to be full,
+ * go manage it.
*/
- if (thread_pool.active_threads == thread_pool.total_threads) {
- if (spawn_thread(request->timestamp) == NULL) {
- radlog(L_INFO,
- "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
- thread_pool.max_threads);
- return 1;
- }
+ if ((last_cleaned < now) ||
+ (thread_pool.active_threads == thread_pool.total_threads)) {
+ thread_pool_manage(now);
}
return 1;
* If there are too many or too few threads waiting, then we
* either create some more, or delete some.
*/
-int thread_pool_clean(time_t now)
+static void thread_pool_manage(time_t now)
{
int spare;
int i, total;
THREAD_HANDLE *handle, *next;
int active_threads;
- static time_t last_cleaned = 0;
-
- /*
- * Loop over the thread pool deleting exited threads.
- */
- for (handle = thread_pool.head; handle; handle = next) {
- next = handle->next;
-
- /*
- * Maybe we've asked the thread to exit, and it
- * has agreed.
- */
- if (handle->status == THREAD_EXITED) {
- delete_thread(handle);
- }
- }
/*
* We don't need a mutex lock here, as we're reading
}
/*
- * If there are too few spare threads, create some more.
+ * If there are too few spare threads. Go create some more.
*/
if (spare < thread_pool.min_spare_threads) {
total = thread_pool.min_spare_threads - spare;
DEBUG2("Threads: Spawning %d spares", total);
+
/*
* Create a number of spare threads.
*/
for (i = 0; i < total; i++) {
handle = spawn_thread(now);
if (handle == NULL) {
- return -1;
+ return;
}
}
- /*
- * And exit, as there can't be too many spare threads.
- */
- return 0;
+ return; /* there aren't too many spare threads */
}
/*
* so this second.
*/
if (now == last_cleaned) {
- return 0;
+ return;
}
last_cleaned = now;
/*
+ * Loop over the thread pool, deleting exited threads.
+ */
+ for (handle = thread_pool.head; handle; handle = next) {
+ next = handle->next;
+
+ /*
+ * Maybe we've asked the thread to exit, and it
+ * has agreed.
+ */
+ if (handle->status == THREAD_EXITED) {
+ delete_thread(handle);
+ }
+ }
+
+ /*
* Only delete the spare threads if sufficient time has
* passed since we last created one. This helps to minimize
* the amount of create/delete cycles.
*/
if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
- return 0;
+ return;
}
/*
* Otherwise everything's kosher. There are not too few,
* or too many spare threads. Exit happily.
*/
- return 0;
+ return;
}
+#ifdef WNOHANG
/*
* Thread wrapper for fork().
*/
reap_children(); /* be nice to non-wait thingies */
- if (lrad_hash_table_num_elements(thread_pool.waiters) >= 1024) {
+ if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
return -1;
}
tf->pid = child_pid;
pthread_mutex_lock(&thread_pool.wait_mutex);
- rcode = lrad_hash_table_insert(thread_pool.waiters, tf);
+ rcode = fr_hash_table_insert(thread_pool.waiters, tf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
if (!rcode) {
radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
(int) child_pid);
+ free(tf);
}
}
mytf.pid = pid;
pthread_mutex_lock(&thread_pool.wait_mutex);
- tf = lrad_hash_table_finddata(thread_pool.waiters, &mytf);
+ tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
if (!tf) return -1;
*status = tf->status;
pthread_mutex_lock(&thread_pool.wait_mutex);
- lrad_hash_table_delete(thread_pool.waiters, &mytf);
+ fr_hash_table_delete(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
return pid;
}
* 10 seconds have passed, give up on the child.
*/
pthread_mutex_lock(&thread_pool.wait_mutex);
- lrad_hash_table_delete(thread_pool.waiters, &mytf);
+ fr_hash_table_delete(thread_pool.waiters, &mytf);
pthread_mutex_unlock(&thread_pool.wait_mutex);
return 0;
}
-
-#else /* HAVE_PTHREAD_H */
+#else
/*
- * "thread" code when we don't have threads.
+ * No rad_fork or rad_waitpid
*/
-int thread_pool_init(int spawn_flag)
+#endif
+
+void thread_pool_lock(void)
{
- return 0;
+ pthread_mutex_lock(&thread_pool.queue_mutex);
}
-/*
- * call "radrespond".
- */
-int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
+void thread_pool_unlock(void)
{
- radius_handle_request(request, fun);
- return 1;
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
}
+void thread_pool_queue_stats(int *array)
+{
+ int i;
+
+ if (pool_initialized) {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
+ }
+ } else {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = 0;
+ }
+ }
+}
#endif /* HAVE_PTHREAD_H */