/*
 * threads.c	request threading support
 *
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation; either version 2 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program; if not, write to the Free Software
 *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Copyright 2000  The FreeRADIUS server project
 * Copyright 2000  Alan DeKok <aland@ox.org>
 */
#include "libradius.h"

#ifdef WITH_THREAD_POOL

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
/*
 *	Revision-control identification string for this file.
 *
 *	NOTE(review): the initializer line was lost from the extracted
 *	source; "$Id$" is the conventional RCS keyword -- confirm.
 */
static const char rcsid[] =
"$Id$";

/*
 *	Semaphore values.  SEMAPHORE_LOCKED is the initial value given
 *	to sem_init(), so that a new thread blocks in sem_wait() until
 *	the semaphore is posted.
 */
#define SEMAPHORE_LOCKED	(0)
#define SEMAPHORE_UNLOCKED	(1)

/*
 *	Values stored in the 'status' field of a thread handle.
 */
#define THREAD_RUNNING		(1)
#define THREAD_CANCELLED	(2)
48 * Prototype to shut the compiler up.
50 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun);
53 * A data structure which contains the information about
56 * pthread_id pthread id
57 * thread_num server thread number, 1...number of threads
58 * semaphore used to block the thread until a request comes in
59 * status is the thread running or exited?
60 * request_count the number of requests that this thread has handled
61 * timestamp when the thread started executing.
62 * request the current request that the thread is processing.
63 * fun the function which is handling the request.
65 typedef struct THREAD_HANDLE {
66 struct THREAD_HANDLE *prev;
67 struct THREAD_HANDLE *next;
79 * A data structure to manage the thread pool. There's no real
80 * need for a data structure, but it makes things conceptually
83 typedef struct THREAD_POOL {
92 int min_spare_threads;
93 int max_spare_threads;
94 int max_requests_per_thread;
95 time_t time_last_spawned;
99 static THREAD_POOL thread_pool;
102 * A mapping of configuration file names to internal integers
104 static const CONF_PARSER thread_config[] = {
105 { "start_servers", PW_TYPE_INTEGER, 0, &thread_pool.start_threads, "5" },
106 { "max_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_threads, "32" },
107 { "min_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads, "3" },
108 { "max_spare_servers", PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads, "10" },
109 { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
110 { "cleanup_delay", PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay, "5" },
111 { NULL, -1, 0, NULL, NULL }
115 * The main thread handler for requests.
117 * Wait on the semaphore until we have it, and process the request.
119 static void *request_handler_thread(void *arg)
121 THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
126 * Block SIGHUP handling for the child threads.
128 * This ensures that only the main server thread will
129 * process HUP signals.
131 * If we don't have sigprocmask, then it shouldn't be
132 * a problem, either, as the sig_hup handler should check
133 * for this condition.
136 sigaddset(&set, SIGHUP);
137 sigaddset(&set, SIGCHLD);
138 sigprocmask(SIG_BLOCK, &set, NULL);
142 * Loop forever, until told to exit.
146 * Wait for the semaphore to be given to us.
148 DEBUG2("Thread %d waiting to be assigned a request",
150 sem_wait(&self->semaphore);
153 * If we've been told to kill ourselves,
154 * then exit politely.
156 if (self->status == THREAD_CANCELLED) {
157 DEBUG2("Thread %d exiting on request from parent.",
162 DEBUG2("Thread %d handling request %d, (%d handled so far)",
163 self->thread_num, self->request->number,
164 self->request_count);
166 rad_respond(self->request, self->fun);
167 self->request = NULL;
170 * The semaphore's value is zero, because we've
171 * locked it. We now go back to the top of the loop,
172 * where we wait for it's value to become non-zero.
177 * This thread is exiting. Delete any additional resources
178 * associated with it (semaphore, etc), and free the thread
181 sem_destroy(&self->semaphore);
187 * Take a THREAD_HANDLE, and delete it from the thread pool.
189 * This function is called ONLY from the main server thread.
191 static void delete_thread(THREAD_HANDLE *handle)
196 assert(handle->request == NULL);
200 assert(thread_pool.total_threads > 0);
201 thread_pool.total_threads--;
204 assert(thread_pool.head == handle);
205 thread_pool.head = next;
211 assert(thread_pool.tail == handle);
212 thread_pool.tail = prev;
219 * Take a THREAD_HANDLE, and move it to the end of the thread pool.
221 * This function is called ONLY from the main server thread.
222 * It's function is to keep the incoming requests rotating among
223 * the threads in the pool.
225 static void move2tail(THREAD_HANDLE *handle)
231 * Empty list: add it to the head.
233 if (thread_pool.head == NULL) {
234 assert(thread_pool.tail == NULL);
235 assert(thread_pool.total_threads == 1);
239 thread_pool.head = handle;
240 thread_pool.tail = handle;
244 assert(thread_pool.total_threads >= 1);
249 * If the element is in the list, then delete it from where
252 if ((next != NULL) ||
255 * If it's already at the tail, exit immediately,
256 * there's no more work to do.
259 assert(thread_pool.tail == handle);
264 * Maybe it's at the head of the list?
267 assert(thread_pool.head == handle);
268 thread_pool.head = next;
272 * Nope, it's really in the middle.
276 assert(prev != NULL); /* be explicit about it. */
277 assert(next != NULL); /* be explicit about it. */
285 * Finally, add it to the tail, and update the pointers.
288 prev = thread_pool.tail;
289 assert(prev->next == NULL);
291 thread_pool.tail = handle;
297 * Spawn a new thread, and place it in the thread pool.
299 * The thread is started initially in the blocked state, waiting
302 static THREAD_HANDLE *spawn_thread(time_t now)
305 THREAD_HANDLE *handle;
309 * Ensure that we don't spawn too many threads.
311 if (thread_pool.total_threads >= thread_pool.max_threads) {
312 DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads);
317 * Allocate a new thread handle.
319 handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
320 memset(handle, 0, sizeof(THREAD_HANDLE));
323 handle->pthread_id = NO_SUCH_CHILD_PID;
324 handle->thread_num = thread_pool.max_thread_num++;
325 handle->request_count = 0;
326 handle->status = THREAD_RUNNING;
327 handle->timestamp = time(NULL);
330 * Initialize the semaphore to be for this process only,
331 * and to have the thread block until the server gives it
334 rcode = sem_init(&handle->semaphore, 0, SEMAPHORE_LOCKED);
336 radlog(L_ERR|L_CONS, "Failed to allocate semaphore: %s",
342 * The thread isn't currently handling a request.
344 handle->request = NULL;
348 * Initialize the thread's attributes to detached.
350 * We could call pthread_detach() later, but if the thread
351 * exits between the create & detach calls, it will need to
352 * be joined, which will never happen.
354 pthread_attr_init(&attr);
355 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
358 * Create the thread detached, so that it cleans up it's
359 * own memory when it exits.
361 rcode = pthread_create(&handle->pthread_id, &attr,
362 request_handler_thread, handle);
364 radlog(L_ERR|L_CONS, "Thread create failed: %s", strerror(errno));
367 pthread_attr_destroy(&attr);
370 * One more thread to go into the list.
372 thread_pool.total_threads++;
373 DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
374 handle->thread_num, thread_pool.total_threads);
377 * Move the thread handle to the tail of the thread pool list.
382 * Update the time we last spawned a thread.
384 thread_pool.time_last_spawned = now;
387 * And return the new handle to the caller.
393 * Allocate the thread pool, and seed it with an initial number
396 int thread_pool_init(void)
399 THREAD_HANDLE *handle;
400 CONF_SECTION *pool_cf;
403 DEBUG("Initializing the thread pool...");
407 * Initialize the thread pool to some reasonable values.
409 memset(&thread_pool, 0, sizeof(THREAD_POOL));
410 thread_pool.head = NULL;
411 thread_pool.tail = NULL;
412 thread_pool.total_threads = 0;
413 thread_pool.max_thread_num = 1;
414 thread_pool.cleanup_delay = 5;
416 pool_cf = cf_section_find("thread");
417 if (pool_cf != NULL) {
418 cf_section_parse(pool_cf, NULL, thread_config);
422 * Create a number of waiting threads.
424 * If we fail while creating them, do something intelligent.
426 for (i = 0; i < thread_pool.start_threads; i++) {
427 handle = spawn_thread(now);
428 if (handle == NULL) {
437 * Assign a new request to a free thread.
439 * If there isn't a free thread, then try to create a new one,
440 * up to the configured limits.
442 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun)
445 THREAD_HANDLE *handle;
446 THREAD_HANDLE *found;
450 * Loop over the active thread pool, looking for a
455 for (handle = thread_pool.head; handle; handle = next) {
459 * Ignore threads which aren't running.
461 if (handle->status != THREAD_RUNNING) {
466 * If we haven't found a free thread yet, then
467 * check it's semaphore lock. We don't lock it,
468 * so if it's locked, then the thread MUST be the
469 * one locking it, waiting for us to unlock it.
471 if (handle->request == NULL) {
478 } /* loop over all of the threads */
481 * If we haven't found an active thread, then spawn a new one.
483 * If we can't spawn a new one, complain, and exit.
486 found = spawn_thread(request->timestamp);
489 "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
490 thread_pool.max_threads);
496 * OK, now 'handle' points to a waiting thread. We move
497 * it to the tail of the thread pool, so that we can
498 * cycle among the threads.
500 * We then give it the request, signal its semaphore, and
501 * return. The thread eventually wakes up, and handles
504 DEBUG2("Thread %d assigned request %d", found->thread_num, request->number);
506 found->request = request;
508 found->request_count++;
509 sem_post(&found->semaphore);
510 thread_pool.active_threads = active_threads;
516 * Check the min_spare_threads and max_spare_threads.
518 * If there are too many or too few threads waiting, then we
519 * either create some more, or delete some.
521 int thread_pool_clean(time_t now)
525 THREAD_HANDLE *handle, *next;
529 * Loop over the thread pool, doing stuff.
532 for (handle = thread_pool.head; handle; handle = handle->next) {
533 if (handle->request != NULL) {
538 spare = thread_pool.total_threads - active_threads;
540 static int old_total = -1;
541 static int old_active = -1;
543 if ((old_total != thread_pool.total_threads) ||
544 (old_active != active_threads)) {
545 DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
546 thread_pool.total_threads, active_threads, spare);
547 old_total = thread_pool.total_threads;
548 old_active = active_threads;
553 * If there are too few spare threads, create some more.
555 if (spare < thread_pool.min_spare_threads) {
556 total = thread_pool.min_spare_threads - spare;
558 DEBUG2("Threads: Spawning %d spares", total);
560 * Create a number of spare threads.
562 for (i = 0; i < total; i++) {
563 handle = spawn_thread(now);
564 if (handle == NULL) {
570 * And exit, as there can't be too many spare threads.
576 * Only delete the spare threads if sufficient time has
577 * passed since we last created one. This helps to minimize
578 * the amount of create/delete cycles.
580 if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
585 * If there are too many spare threads, delete one.
587 * Note that we only delete ONE at a time, instead of
588 * wiping out many. This allows the excess servers to
589 * be slowly reaped, just in case the load spike comes again.
591 if (spare > thread_pool.max_spare_threads) {
593 spare -= thread_pool.max_spare_threads;
595 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
598 * Walk through the thread pool, deleting the
599 * first N idle threads we come across.
601 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
605 * If the thread is not handling a
606 * request, then tell it to exit.
608 * Note that we delete it from the thread
609 * pool BEFORE telling it to kill itself,
610 * as the child thread can free the 'handle'
611 * structure, without anyone else using it.
613 if (handle->request == NULL) {
614 delete_thread(handle);
615 handle->status = THREAD_CANCELLED;
616 sem_post(&handle->semaphore);
624 * Otherwise everything's kosher. There are not too few,
625 * or too many spare threads. Exit happily.