2 * threads.c request threading support
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 * Copyright 2000 The FreeRADIUS server project
21 * Copyright 2000 Alan DeKok <aland@ox.org>
25 #include "libradius.h"
31 #include <semaphore.h>
35 #include "rad_assert.h"
38 static const char rcsid[] =
41 #define SEMAPHORE_LOCKED (0)
42 #define SEMAPHORE_UNLOCKED (1)
44 #define THREAD_RUNNING (1)
45 #define THREAD_CANCELLED (2)
48 * Prototype to shut the compiler up.
50 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun);
53 * A data structure which contains the information about
56 * pthread_id pthread id
57 * thread_num server thread number, 1...number of threads
58 * semaphore used to block the thread until a request comes in
59 * status is the thread running or exited?
60 * request_count the number of requests that this thread has handled
61 * timestamp when the thread started executing.
62 * request the current request that the thread is processing.
63 * fun the function which is handling the request.
65 typedef struct THREAD_HANDLE {
66 struct THREAD_HANDLE *prev;
67 struct THREAD_HANDLE *next;
79 * A data structure to manage the thread pool. There's no real
80 * need for a data structure, but it makes things conceptually
83 typedef struct THREAD_POOL {
92 int min_spare_threads;
93 int max_spare_threads;
94 int max_requests_per_thread;
95 time_t time_last_spawned;
99 static THREAD_POOL thread_pool;
102 * A mapping of configuration file names to internal integers
104 static const CONF_PARSER thread_config[] = {
105 { "start_servers", PW_TYPE_INTEGER, 0, &thread_pool.start_threads, "5" },
106 { "max_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_threads, "32" },
107 { "min_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads, "3" },
108 { "max_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads, "10" },
109 { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
110 { "cleanup_delay", PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay, "5" },
111 { NULL, -1, 0, NULL, NULL }
115 * The main thread handler for requests.
117 * Wait on the semaphore until we have it, and process the request.
119 static void *request_handler_thread(void *arg)
121 THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
122 #if HAVE_PTHREAD_SIGMASK
126 * Block SIGHUP handling for the child threads.
128 * This ensures that only the main server thread will
129 * process HUP signals.
131 * If we don't have sigprocmask, then it shouldn't be
132 * a problem, either, as the sig_hup handler should check
133 * for this condition.
136 sigaddset(&set, SIGHUP);
137 sigaddset(&set, SIGINT);
138 sigaddset(&set, SIGQUIT);
139 pthread_sigmask(SIG_BLOCK, &set, NULL);
143 * Loop forever, until told to exit.
147 * Wait for the semaphore to be given to us.
149 DEBUG2("Thread %d waiting to be assigned a request",
151 if (sem_wait(&self->semaphore) < 0) {
156 * If we've been told to kill ourselves,
157 * then exit politely.
159 if (self->status == THREAD_CANCELLED) {
160 DEBUG2("Thread %d exiting on request from parent.",
165 DEBUG2("Thread %d handling request %d, (%d handled so far)",
166 self->thread_num, self->request->number,
167 self->request_count);
169 rad_respond(self->request, self->fun);
170 self->request = NULL;
173 * The semaphore's value is zero, because we've
174 * locked it. We now go back to the top of the loop,
175 * where we wait for it's value to become non-zero.
180 * This thread is exiting. Delete any additional resources
181 * associated with it (semaphore, etc), and free the thread
184 sem_destroy(&self->semaphore);
190 * Take a THREAD_HANDLE, and delete it from the thread pool.
192 * This function is called ONLY from the main server thread.
194 static void delete_thread(THREAD_HANDLE *handle)
199 rad_assert(handle->request == NULL);
203 rad_assert(thread_pool.total_threads > 0);
204 thread_pool.total_threads--;
207 rad_assert(thread_pool.head == handle);
208 thread_pool.head = next;
214 rad_assert(thread_pool.tail == handle);
215 thread_pool.tail = prev;
222 * Take a THREAD_HANDLE, and move it to the end of the thread pool.
224 * This function is called ONLY from the main server thread.
225 * It's function is to keep the incoming requests rotating among
226 * the threads in the pool.
228 static void move2tail(THREAD_HANDLE *handle)
234 * Empty list: add it to the head.
236 if (thread_pool.head == NULL) {
237 rad_assert(thread_pool.tail == NULL);
238 rad_assert(thread_pool.total_threads == 1);
242 thread_pool.head = handle;
243 thread_pool.tail = handle;
247 rad_assert(thread_pool.total_threads >= 1);
252 * If the element is in the list, then delete it from where
255 if ((next != NULL) ||
258 * If it's already at the tail, exit immediately,
259 * there's no more work to do.
262 rad_assert(thread_pool.tail == handle);
267 * Maybe it's at the head of the list?
270 rad_assert(thread_pool.head == handle);
271 thread_pool.head = next;
275 * Nope, it's really in the middle.
279 rad_assert(prev != NULL); /* be explicit about it. */
280 rad_assert(next != NULL); /* be explicit about it. */
288 * Finally, add it to the tail, and update the pointers.
291 prev = thread_pool.tail;
292 rad_assert(prev->next == NULL);
294 thread_pool.tail = handle;
300 * Spawn a new thread, and place it in the thread pool.
302 * The thread is started initially in the blocked state, waiting
305 static THREAD_HANDLE *spawn_thread(time_t now)
308 THREAD_HANDLE *handle;
312 * Ensure that we don't spawn too many threads.
314 if (thread_pool.total_threads >= thread_pool.max_threads) {
315 DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads);
320 * Allocate a new thread handle.
322 handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
323 memset(handle, 0, sizeof(THREAD_HANDLE));
326 handle->pthread_id = NO_SUCH_CHILD_PID;
327 handle->thread_num = thread_pool.max_thread_num++;
328 handle->request_count = 0;
329 handle->status = THREAD_RUNNING;
330 handle->timestamp = time(NULL);
333 * Initialize the semaphore to be for this process only,
334 * and to have the thread block until the server gives it
337 rcode = sem_init(&handle->semaphore, 0, SEMAPHORE_LOCKED);
339 radlog(L_ERR|L_CONS, "Failed to allocate semaphore: %s",
345 * The thread isn't currently handling a request.
347 handle->request = NULL;
351 * Initialize the thread's attributes to detached.
353 * We could call pthread_detach() later, but if the thread
354 * exits between the create & detach calls, it will need to
355 * be joined, which will never happen.
357 pthread_attr_init(&attr);
358 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
361 * Create the thread detached, so that it cleans up it's
362 * own memory when it exits.
364 rcode = pthread_create(&handle->pthread_id, &attr,
365 request_handler_thread, handle);
367 radlog(L_ERR|L_CONS, "Thread create failed: %s", strerror(errno));
370 pthread_attr_destroy(&attr);
373 * One more thread to go into the list.
375 thread_pool.total_threads++;
376 DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
377 handle->thread_num, thread_pool.total_threads);
380 * Move the thread handle to the tail of the thread pool list.
385 * Update the time we last spawned a thread.
387 thread_pool.time_last_spawned = now;
390 * And return the new handle to the caller.
396 * Allocate the thread pool, and seed it with an initial number
399 int thread_pool_init(void)
402 THREAD_HANDLE *handle;
403 CONF_SECTION *pool_cf;
406 DEBUG("Initializing the thread pool...");
410 * Initialize the thread pool to some reasonable values.
412 memset(&thread_pool, 0, sizeof(THREAD_POOL));
413 thread_pool.head = NULL;
414 thread_pool.tail = NULL;
415 thread_pool.total_threads = 0;
416 thread_pool.max_thread_num = 1;
417 thread_pool.cleanup_delay = 5;
419 pool_cf = cf_section_find("thread");
420 if (pool_cf != NULL) {
421 cf_section_parse(pool_cf, NULL, thread_config);
425 * Create a number of waiting threads.
427 * If we fail while creating them, do something intelligent.
429 for (i = 0; i < thread_pool.start_threads; i++) {
430 handle = spawn_thread(now);
431 if (handle == NULL) {
440 * Assign a new request to a free thread.
442 * If there isn't a free thread, then try to create a new one,
443 * up to the configured limits.
445 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun)
448 THREAD_HANDLE *handle;
449 THREAD_HANDLE *found;
453 * Loop over the active thread pool, looking for a
458 for (handle = thread_pool.head; handle; handle = next) {
462 * Ignore threads which aren't running.
464 if (handle->status != THREAD_RUNNING) {
469 * If we haven't found a free thread yet, then
470 * check it's semaphore lock. We don't lock it,
471 * so if it's locked, then the thread MUST be the
472 * one locking it, waiting for us to unlock it.
474 if (handle->request == NULL) {
481 } /* loop over all of the threads */
484 * If we haven't found an active thread, then spawn a new one.
486 * If we can't spawn a new one, complain, and exit.
489 found = spawn_thread(request->timestamp);
492 "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
493 thread_pool.max_threads);
499 * OK, now 'handle' points to a waiting thread. We move
500 * it to the tail of the thread pool, so that we can
501 * cycle among the threads.
503 * We then give it the request, signal its semaphore, and
504 * return. The thread eventually wakes up, and handles
507 DEBUG2("Thread %d assigned request %d", found->thread_num, request->number);
509 found->request = request;
511 found->request_count++;
512 sem_post(&found->semaphore);
513 thread_pool.active_threads = active_threads;
519 * Check the min_spare_threads and max_spare_threads.
521 * If there are too many or too few threads waiting, then we
522 * either create some more, or delete some.
524 int thread_pool_clean(time_t now)
528 THREAD_HANDLE *handle, *next;
532 * Loop over the thread pool, doing stuff.
535 for (handle = thread_pool.head; handle; handle = handle->next) {
536 if (handle->request != NULL) {
541 spare = thread_pool.total_threads - active_threads;
543 static int old_total = -1;
544 static int old_active = -1;
546 if ((old_total != thread_pool.total_threads) ||
547 (old_active != active_threads)) {
548 DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
549 thread_pool.total_threads, active_threads, spare);
550 old_total = thread_pool.total_threads;
551 old_active = active_threads;
556 * If there are too few spare threads, create some more.
558 if (spare < thread_pool.min_spare_threads) {
559 total = thread_pool.min_spare_threads - spare;
561 DEBUG2("Threads: Spawning %d spares", total);
563 * Create a number of spare threads.
565 for (i = 0; i < total; i++) {
566 handle = spawn_thread(now);
567 if (handle == NULL) {
573 * And exit, as there can't be too many spare threads.
579 * Only delete the spare threads if sufficient time has
580 * passed since we last created one. This helps to minimize
581 * the amount of create/delete cycles.
583 if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
588 * If there are too many spare threads, delete one.
590 * Note that we only delete ONE at a time, instead of
591 * wiping out many. This allows the excess servers to
592 * be slowly reaped, just in case the load spike comes again.
594 if (spare > thread_pool.max_spare_threads) {
596 spare -= thread_pool.max_spare_threads;
598 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
601 * Walk through the thread pool, deleting the
602 * first N idle threads we come across.
604 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
608 * If the thread is not handling a
609 * request, then tell it to exit.
611 * Note that we delete it from the thread
612 * pool BEFORE telling it to kill itself,
613 * as the child thread can free the 'handle'
614 * structure, without anyone else using it.
616 if (handle->request == NULL) {
617 delete_thread(handle);
618 handle->status = THREAD_CANCELLED;
619 sem_post(&handle->semaphore);
627 * If the thread has handled too many requests, then make it
630 if (thread_pool.max_requests_per_thread > 0) {
631 for (handle = thread_pool.head; handle; handle = next) {
635 * Not handling a request, we can check it.
637 if ((handle->request == NULL) &&
638 (handle->request_count > thread_pool.max_requests_per_thread)) {
639 delete_thread(handle);
640 handle->status = THREAD_CANCELLED;
641 sem_post(&handle->semaphore);
647 * Otherwise everything's kosher. There are not too few,
648 * or too many spare threads. Exit happily.