} request_queue_t;
+#define MAX_WAITERS (256)
/*
* A data structure to manage the thread pool. There's no real
* need for a data structure, but it makes things conceptually
int cleanup_delay;
int spawn_flag;
+ int wait_head;
+ int wait_tail;
+ pthread_mutex_t wait_mutex;
+ pid_t wait[MAX_WAITERS];
+
/*
* All threads wait on this semaphore, for requests
* to enter the queue.
static THREAD_POOL thread_pool;
static int pool_initialized = FALSE;
-static void rad_exec_init(void);
-
-
-/*
- * Data structure to keep track of which child forked which
- * request. If we cared, we'd keep a list of "free" and "active"
- * entries.
- *
- * FIXME: Have a time out, so we clean up entries which haven't
- * been picked up!
- */
-typedef struct rad_fork_t {
- pthread_t thread_id;
- pid_t child_pid;
- sem_t child_done;
- int status; /* exit status of the child */
- time_t time_forked;
-} rad_fork_t;
-
-/*
- * This MUST be a power of 2 for it to work properly!
- */
-#define NUM_FORKERS (8192)
-static rad_fork_t forkers[NUM_FORKERS];
-
-/*
- * This mutex ensures that only one thread is doing certain
- * kinds of magic to the previous array.
- */
-static pthread_mutex_t fork_mutex;
/*
/*
+ * We don't want to catch SIGCHLD for a host of reasons.
+ *
+ * - exec_wait means that someone, somewhere, somewhen, will
+ * call waitpid(), and catch the child.
+ *
+ * - SIGCHLD is delivered to a random thread, not the one that
+ * forked.
+ *
+ * - if another thread catches the child, we have to coordinate
+ * with the thread doing the waiting.
+ *
+ * - if we don't waitpid() for non-wait children, they'll be zombies,
+ * and will hang around forever.
+ *
+ */
+static void reap_children(void)
+{
+ if (thread_pool.wait_head != thread_pool.wait_tail) {
+ int num;
+
+ pthread_mutex_lock(&thread_pool.wait_mutex);
+ for (num = ((thread_pool.wait_tail + MAX_WAITERS) - thread_pool.wait_head) % MAX_WAITERS;
+ num != 0;
+ num--) {
+ pid_t pid = thread_pool.wait[thread_pool.wait_head];
+
+ thread_pool.wait_head++;
+ thread_pool.wait_head %= MAX_WAITERS;
+
+ /*
+ * Child is still alive: move it to the tail.
+ */
+ if (waitpid(pid, NULL, WNOHANG) == 0) {
+ if (((thread_pool.wait_tail + 1) % MAX_WAITERS)
+ == thread_pool.wait_head) {
+ rad_assert(0 == 1);
+ }
+
+ thread_pool.wait[thread_pool.wait_tail] = pid;
+ thread_pool.wait_tail++;
+ thread_pool.wait_tail %= MAX_WAITERS;
+ } /* else no child, or was already reaped */
+ }
+ pthread_mutex_unlock(&thread_pool.wait_mutex);
+ }
+}
+
+/*
* Add a request to the list of waiting requests.
* This function gets called ONLY from the main handler thread...
*
{
int num_entries;
+ reap_children();
+
pthread_mutex_lock(&thread_pool.mutex);
thread_pool.request_count++;
thread_pool.cleanup_delay = 5;
thread_pool.spawn_flag = spawn_flag;
- /*
- * Initialize the forking stuff ONLY if we're
- * using threads.
- */
- if (spawn_flag) rad_exec_init();
+ thread_pool.wait_head = thread_pool.wait_tail = 0;
+ if ((pthread_mutex_init(&thread_pool.mutex,NULL) != 0)) {
+ radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
+ strerror(errno));
+ exit(1);
+ }
}
/*
}
/*
- * Limit the maximum number of threads to the maximum
- * number of forks we can do.
- *
- * FIXME: Make this code better...
- */
- if (thread_pool.max_threads >= NUM_FORKERS) {
- thread_pool.max_threads = NUM_FORKERS;
- }
-
-
- /*
* The pool has already been initialized. Don't spawn
* new threads, and don't forget about forked children,
*/
}
-static int exec_initialized = FALSE;
-
-/*
- * We use the PID number as a base for the array index, so that
- * we can quickly turn the PID into a free array entry, instead
- * of rooting blindly through the entire array.
- */
-#define PID_2_ARRAY(pid) (((int) pid ) & (NUM_FORKERS - 1))
-
-
-/*
- * Called by the main signal handler, to save the status of the child
- */
-static int rad_savepid(pid_t pid, int status)
-{
- int i;
-
- if (!exec_initialized) return 0;
-
- /*
- * Find the PID to wait for, starting at an index within
- * the array. This makes the lookups O(1) on average,
- * instead of O(n), when the array is filling up.
- */
- i = PID_2_ARRAY(pid);
-
- /*
- * Do NOT lock the array, as nothing else sets the
- * status and posts the semaphore.
- */
- do {
- /*
- * Any thread can get the sigchild...
- */
- if ((forkers[i].thread_id != NO_SUCH_CHILD_PID) &&
- (forkers[i].child_pid == pid)) {
- /*
- * Save the status, THEN post the
- * semaphore.
- */
- forkers[i].status = status;
- sem_post(&forkers[i].child_done);
-
- /*
- * FIXME: If the child is more than 60
- * seconds out of date, then delete it.
- *
- * That is, we've forked, and the forker
- * is waiting nearly forever
- */
- return 0;
- }
-
- i++;
- i &= (NUM_FORKERS - 1);
- } while (i != PID_2_ARRAY(pid));
-
- return -1;
-}
-
-
-/*
- * Handle child signals.
- */
-static void sig_child(int sig)
-{
- int status;
- pid_t pid;
-
- sig = sig; /* -Wunused */
-
- /*
- * Reset the signal handler, if required.
- */
- reset_signal(SIGCHLD, sig_child);
-
- /*
- * Wait for the child, without hanging.
- */
- for (;;) {
- pid = waitpid((pid_t)-1, &status, WNOHANG);
- if (pid <= 0)
- return;
-
- /*
- * If we have pthreads, then the only children
- * are from Exec-Program. We don't care about them,
- * so once we've grabbed their PID's, we're done.
- */
- rad_savepid(pid, status);
- }
-}
-
-
-/*
- * Initialize the stuff for keeping track of child processes.
- */
-static void rad_exec_init(void)
-{
- int i;
-#ifdef HAVE_SIGACTION
- struct sigaction act;
-#endif
-
- /*
- * Initialize the mutex used to remember calls to fork.
- */
- pthread_mutex_init(&fork_mutex, NULL);
-
- /*
- * Initialize the data structure where we remember the
- * mappings of thread ID && child PID to exit status.
- */
- for (i = 0; i < NUM_FORKERS; i++) {
- forkers[i].thread_id = NO_SUCH_CHILD_PID;
- forkers[i].child_pid = -1;
- forkers[i].status = 0;
- }
-
- /*
- * If we have pthreads, then the child threads block
- * SIGCHLD, and the main server thread catches it.
- *
- * That way, the SIGCHLD handler can grab the exit status,
- * and save it for the child thread.
- *
- * If we don't have pthreads, then each child process
- * will do a waitpid(), and we ignore SIGCHLD.
- *
- * Once we have multiple child processes to handle
- * requests, and shared memory, then we've got to
- * re-enable SIGCHLD catching.
- */
-#ifdef HAVE_SIGACTION
- act.sa_handler = sig_child;
- sigaction(SIGCHLD, &act, NULL);
-#else
- signal(SIGCHLD, sig_child);
-#endif
-
- exec_initialized = TRUE;
-}
-
/*
* Thread wrapper for fork().
*/
pid_t rad_fork(int exec_wait)
{
- sigset_t set;
pid_t child_pid;
- /*
- * The thread is NOT interested in waiting for the exit
- * status of the child process, so we don't bother
- * updating our kludgy array.
- *
- * Or, there no NO threads, so we can just do the fork
- * thing.
- */
- if (!exec_wait || !exec_initialized) {
- return fork();
- }
+ if (exec_wait) return fork();
/*
- * Block SIGCLHD until such time as we've saved the PID.
- *
- * Note that we block SIGCHLD for ALL threads associated
- * with this process! This is to prevent race conditions!
+ * Lock the mutex.
*/
- sigemptyset(&set);
- sigaddset(&set, SIGCHLD);
- sigprocmask(SIG_BLOCK, &set, NULL);
+ pthread_mutex_lock(&thread_pool.wait_mutex);
/*
- * Do the fork.
+ * No room to save the PID: die.
*/
- child_pid = fork();
+ if (((thread_pool.wait_tail + 1) % MAX_WAITERS)
+ == thread_pool.wait_head) {
+ rad_assert(0 == 1);
+ }
/*
- * We managed to fork. Let's see if we have a free
- * array entry.
+ * Fork & save the PID for later reaping.
*/
- if (child_pid > 0) { /* parent */
- int i;
- int found;
- time_t now = time(NULL);
-
- /*
- * We store the information in the array
- * indexed by PID. This means that we have
- * on average an O(1) lookup to find the element,
- * instead of rooting through the entire array.
- */
- i = PID_2_ARRAY(child_pid);
- found = -1;
-
- /*
- * We may have multiple threads trying to find an
- * empty position, so we lock the array until
- * we've found an entry.
- */
- pthread_mutex_lock(&fork_mutex);
- do {
- if (forkers[i].thread_id == NO_SUCH_CHILD_PID) {
- found = i;
- break;
- }
-
- /*
- * Clean up any stale forked sessions.
- *
- * This sometimes happens, for crazy reasons.
- */
- if ((now - forkers[i].time_forked) > 30) {
- forkers[i].thread_id = NO_SUCH_CHILD_PID;
-
- /*
- * Grab the child's exit condition,
- * just in case...
- */
- waitpid(forkers[i].child_pid,
- &forkers[i].status,
- WNOHANG);
- sem_destroy(&forkers[i].child_done);
- found = i;
- break;
- }
-
- /*
- * Increment it, within the array.
- */
- i++;
- i &= (NUM_FORKERS - 1);
- } while (i != PID_2_ARRAY(child_pid));
-
- /*
- * Arg. We did a fork, and there was nowhere to
- * put the answer.
- */
- if (found < 0) {
- sigprocmask(SIG_UNBLOCK, &set, NULL);
- pthread_mutex_unlock(&fork_mutex);
- return (pid_t) -1;
- }
+ child_pid = fork();
+ if (child_pid != 0) {
+ thread_pool.wait[thread_pool.wait_tail] = child_pid;
+ thread_pool.wait_tail++;
+ thread_pool.wait_tail %= MAX_WAITERS;
/*
- * In the parent, set the status, and create the
- * semaphore.
+ * Unlock the mutex.
*/
- forkers[found].status = -1;
- forkers[found].child_pid = child_pid;
- forkers[found].thread_id = pthread_self();
- forkers[found].time_forked = now;
- sem_init(&forkers[found].child_done, 0, SEMAPHORE_LOCKED);
- pthread_mutex_unlock(&fork_mutex);
+ pthread_mutex_unlock(&thread_pool.wait_mutex);
}
/*
- * Unblock SIGCHLD, now that there's no chance of bad entries
- * in the array.
- */
- sigprocmask(SIG_UNBLOCK, &set, NULL);
-
- /*
* Return whatever we were told.
*/
return child_pid;
}
/*
- * Thread wrapper for waitpid(), so threads can wait for
- * the PID they forked.
+ * We may not need this any more...
*/
pid_t rad_waitpid(pid_t pid, int *status, int options)
{
- int i, rcode;
- int found;
- pthread_t self;
-
- if (!exec_initialized) return waitpid(pid, status, options);
-
- /*
- * We're only allowed to wait for a SPECIFIC pid.
- */
- if (pid <= 0) {
- return -1;
- }
-
- self = pthread_self();
-
- /*
- * Find the PID to wait for, starting at an index within
- * the array. This makes the lookups O(1) on average,
- * instead of O(n), when the array is filling up.
- */
- found = -1;
- i = PID_2_ARRAY(pid);
- do {
- /*
- * We were the ones who forked this specific
- * child.
- */
- if ((forkers[i].thread_id == self) &&
- (forkers[i].child_pid == pid)) {
- found = i;
- break;
- }
-
- i++;
- i &= (NUM_FORKERS - 1);
- } while (i != PID_2_ARRAY(pid));
-
- /*
- * No thread ID found: we're trying to wait for a child
- * we've never forked!
- */
- if (found < 0) {
- return -1;
- }
-
- /*
- * Wait for the signal that the child's status has been
- * returned.
- */
- if (options == WNOHANG) {
- rcode = sem_trywait(&forkers[found].child_done);
- if (rcode != 0) {
- return 0; /* no child available */
- }
- } else { /* wait forever */
- re_wait:
- rcode = sem_wait(&forkers[found].child_done);
- if ((rcode != 0) && (errno == EINTR)) {
- goto re_wait;
- }
- }
-
- /*
- * We've got the semaphore. Now destroy it.
- *
- * FIXME: Maybe we want to set up the semaphores in advance,
- * to prevent the creation && deletion of lots of them,
- * if creating and deleting them is expensive.
- */
- sem_destroy(&forkers[found].child_done);
-
- /*
- * Save the status BEFORE we re-set the thread ID.
- */
- *status = forkers[found].status;
-
- /*
- * This next line taints the other array entries,
- * due to other threads re-using the data structure.
- */
- forkers[found].thread_id = NO_SUCH_CHILD_PID;
-
- return pid;
+ reap_children(); /* be nice to non-wait thingies */
+ return waitpid(pid, status, options);
+
}
#else /* HAVE_PTHREAD_H */