REQUEST *request;
} THREAD_HANDLE;
-/*
- * For the request queue.
- */
-typedef struct request_queue_t {
- REQUEST *request;
- RAD_REQUEST_FUNP fun;
-} request_queue_t;
-
typedef struct thread_fork_t {
pid_t pid;
int status;
int cleanup_delay;
int spawn_flag;
+#ifdef WNOHANG
pthread_mutex_t wait_mutex;
fr_hash_table_t *waiters;
+#endif
/*
* All threads wait on this semaphore, for requests
}
#endif
-
+#ifdef WNOHANG
/*
* We don't want to catch SIGCHLD for a host of reasons.
*
pthread_mutex_lock(&thread_pool.wait_mutex);
do {
+ retry:
pid = waitpid(0, &status, WNOHANG);
if (pid <= 0) break;
mytf.pid = pid;
tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
- if (!tf) continue;
+ if (!tf) goto retry;
tf->status = status;
tf->exited = 1;
pthread_mutex_unlock(&thread_pool.wait_mutex);
}
+#else
+#define reap_children()
+#endif /* WNOHANG */
/*
* Add a request to the list of waiting requests.
*/
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
{
- request_queue_t *entry;
-
+ rad_assert(request->process == fun);
pthread_mutex_lock(&thread_pool.queue_mutex);
thread_pool.request_count++;
return 0;
}
- entry = rad_malloc(sizeof(*entry));
- entry->request = request;
- entry->fun = fun;
-
/*
* Push the request onto the appropriate fifo for that
*/
- if (!fr_fifo_push(thread_pool.fifo[request->priority],
- entry)) {
+ if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
request->child_state = REQUEST_DONE;
/*
* Remove a request from the queue.
*/
-static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
+static int request_dequeue(REQUEST **prequest, RAD_REQUEST_FUNP *fun)
{
RAD_LISTEN_TYPE i, start;
- request_queue_t *entry;
+ REQUEST *request = NULL;
reap_children();
* requests will be quickly cleared.
*/
for (i = 0; i < RAD_LISTEN_MAX; i++) {
- entry = fr_fifo_peek(thread_pool.fifo[i]);
- if (!entry ||
- (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
+ request = fr_fifo_peek(thread_pool.fifo[i]);
+ if (!request ||
+ (request->master_state != REQUEST_STOP_PROCESSING)) {
continue;
}
/*
* This entry was marked to be stopped. Acknowledge it.
*/
- entry = fr_fifo_pop(thread_pool.fifo[i]);
- rad_assert(entry != NULL);
- entry->request->child_state = REQUEST_DONE;
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ rad_assert(request != NULL);
+ request->child_state = REQUEST_DONE;
+ thread_pool.num_queued--;
}
start = 0;
* Pop results from the top of the queue
*/
for (i = start; i < RAD_LISTEN_MAX; i++) {
- entry = fr_fifo_pop(thread_pool.fifo[i]);
- if (entry) {
+ request = fr_fifo_pop(thread_pool.fifo[i]);
+ if (request) {
start = i;
break;
}
}
- if (!entry) {
+ if (!request) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
- *request = NULL;
+ *prequest = NULL;
*fun = NULL;
return 0;
}
rad_assert(thread_pool.num_queued > 0);
thread_pool.num_queued--;
- *request = entry->request;
- *fun = entry->fun;
- free(entry);
+ *prequest = request;
+ *fun = request->process;
- rad_assert(*request != NULL);
- rad_assert((*request)->magic == REQUEST_MAGIC);
+ rad_assert(request->magic == REQUEST_MAGIC);
rad_assert(*fun != NULL);
/*
* the queue, and therefore won't clean it up until we
* have acknowledged it as "done".
*/
- if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
- (*request)->child_state = REQUEST_DONE;
+ if (request->master_state == REQUEST_STOP_PROCESSING) {
+ request->child_state = REQUEST_DONE;
goto retry;
}
DEBUG2("Thread %d got semaphore", self->thread_num);
+#ifdef HAVE_OPENSSL_ERR_H
+ /*
+ * Clear the error queue for the current thread.
+ */
+ ERR_clear_error ();
+#endif
+
/*
* Try to grab a request from the queue.
*
memset(handle, 0, sizeof(THREAD_HANDLE));
handle->prev = NULL;
handle->next = NULL;
- handle->pthread_id = NO_SUCH_CHILD_PID;
handle->thread_num = thread_pool.max_thread_num++;
handle->request_count = 0;
handle->status = THREAD_RUNNING;
}
+#ifdef WNOHANG
static uint32_t pid_hash(const void *data)
{
const thread_fork_t *tf = data;
return (a->pid - b->pid);
}
+#endif
/*
* Allocate the thread pool, and seed it with an initial number
*
* FIXME: What to do on a SIGHUP???
*/
-int thread_pool_init(CONF_SECTION *cs, int spawn_flag)
+int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
{
int i, rcode;
CONF_SECTION *pool_cf;
now = time(NULL);
+ rad_assert(spawn_flag != NULL);
+ rad_assert(*spawn_flag == TRUE);
+ rad_assert(pool_initialized == FALSE); /* not called on HUP */
+
+ pool_cf = cf_subsection_find_next(cs, NULL, "thread");
+ if (!pool_cf) *spawn_flag = FALSE;
+
/*
- * We're not spawning new threads, don't do
- * anything.
+ * Initialize the thread pool to some reasonable values.
*/
- if (!spawn_flag) return 0;
-
+ memset(&thread_pool, 0, sizeof(THREAD_POOL));
+ thread_pool.head = NULL;
+ thread_pool.tail = NULL;
+ thread_pool.total_threads = 0;
+ thread_pool.max_thread_num = 1;
+ thread_pool.cleanup_delay = 5;
+ thread_pool.spawn_flag = *spawn_flag;
+
/*
- * After a SIGHUP, we don't over-write the previous values.
+ * Don't bother initializing the mutexes or
+ * creating the hash tables. They won't be used.
*/
- if (!pool_initialized) {
- /*
- * Initialize the thread pool to some reasonable values.
- */
- memset(&thread_pool, 0, sizeof(THREAD_POOL));
- thread_pool.head = NULL;
- thread_pool.tail = NULL;
- thread_pool.total_threads = 0;
- thread_pool.max_thread_num = 1;
- thread_pool.cleanup_delay = 5;
- thread_pool.spawn_flag = spawn_flag;
-
- if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
- radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
- strerror(errno));
- return -1;
- }
-
- /*
- * Create the hash table of child PID's
- */
- thread_pool.waiters = fr_hash_table_create(pid_hash,
- pid_cmp,
- free);
- if (!thread_pool.waiters) {
- radlog(L_ERR, "FATAL: Failed to set up wait hash");
- return -1;
- }
+ if (!*spawn_flag) return 0;
+
+#ifdef WNOHANG
+ if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
+ radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
+ strerror(errno));
+ return -1;
}
-
- pool_cf = cf_subsection_find_next(cs, NULL, "thread");
- if (!pool_cf) {
- radlog(L_ERR, "FATAL: Attempting to start in multi-threaded mode with no thread configuration in radiusd.conf");
+
+ /*
+ * Create the hash table of child PID's
+ */
+ thread_pool.waiters = fr_hash_table_create(pid_hash,
+ pid_cmp,
+ free);
+ if (!thread_pool.waiters) {
+ radlog(L_ERR, "FATAL: Failed to set up wait hash");
return -1;
}
+#endif
if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
return -1;
{
time_t now = request->timestamp;
+ request->process = fun;
+
/*
* We've been told not to spawn threads, so don't.
*/
if (!thread_pool.spawn_flag) {
radius_handle_request(request, fun);
+#ifdef WNOHANG
/*
* Requests that care about child process exit
* codes have already either called
* rad_waitpid(), or they've given up.
*/
wait(NULL);
+#endif
return 1;
}
}
+#ifdef WNOHANG
/*
* Thread wrapper for fork().
*/
if (!rcode) {
radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
(int) child_pid);
+ free(tf);
}
}
return 0;
}
+#else
+/*
+ * No rad_fork or rad_waitpid
+ */
+#endif
void thread_pool_lock(void)
{
{
pthread_mutex_unlock(&thread_pool.queue_mutex);
}
-#else
-int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
+
+void thread_pool_queue_stats(int *array)
{
- radius_handle_request(request, fun);
- return 1;
+ int i;
+
+ if (pool_initialized) {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
+ }
+ } else {
+ for (i = 0; i < RAD_LISTEN_MAX; i++) {
+ array[i] = 0;
+ }
+ }
}
#endif /* HAVE_PTHREAD_H */