2 * threads.c request threading support
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20 * Copyright 2000,2006 The FreeRADIUS server project
21 * Copyright 2000 Alan DeKok <aland@ox.org>
25 USES_APPLE_DEPRECATED_API /* OpenSSL API has been deprecated by Apple */
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/process.h>
29 #include <freeradius-devel/rad_assert.h>
32 * Other OS's have sem_init, OS X doesn't.
34 #ifdef HAVE_SEMAPHORE_H
35 #include <semaphore.h>
40 #include <dispatch/dispatch.h>
42 #include <mach/task.h>
43 #include <mach/mach_init.h>
44 #include <mach/semaphore.h>
48 #define sem_t semaphore_t
50 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
52 #define sem_wait(s) semaphore_wait(*s)
54 #define sem_post(s) semaphore_signal(*s)
56 #endif /* __APPLE__ */
58 #ifdef HAVE_SYS_WAIT_H
64 #ifdef HAVE_OPENSSL_CRYPTO_H
65 #include <openssl/crypto.h>
67 #ifdef HAVE_OPENSSL_ERR_H
68 #include <openssl/err.h>
70 #ifdef HAVE_OPENSSL_EVP_H
71 #include <openssl/evp.h>
/* Initial count handed to sem_init() for the request semaphore. */
75 #define SEMAPHORE_LOCKED (0)
/* Values stored in THREAD_HANDLE.status. */
77 #define THREAD_RUNNING (1)
78 #define THREAD_CANCELLED (2)
79 #define THREAD_EXITED (3)
/* Number of request fifos in thread_pool.fifo[]; fifos are indexed by request->priority. */
81 #define NUM_FIFOS RAD_LISTEN_MAX
84 * A data structure which contains the information about
/*
 * Bookkeeping record for one worker thread.  The prev/next pointers
 * chain the handles into the doubly-linked list rooted at
 * thread_pool.head / thread_pool.tail.
 */
87 typedef struct THREAD_HANDLE {
88 struct THREAD_HANDLE *prev; //!< Previous thread handle (in the linked list).
89 struct THREAD_HANDLE *next; //!< Next thread handle (in the linked list).
90 pthread_t pthread_id; //!< Filled in by pthread_create(); later passed to pthread_join().
91 int thread_num; //!< Server thread number, 1...number of threads.
92 int status; //!< One of THREAD_RUNNING, THREAD_CANCELLED or THREAD_EXITED.
93 unsigned int request_count; //!< The number of requests that this thread has handled.
94 time_t timestamp; //!< When the thread started executing.
101 typedef struct thread_fork_t {
110 typedef struct fr_pps_t {
120 * A data structure to manage the thread pool. There's no real
121 * need for a data structure, but it makes things conceptually
/*
 * Global state for the worker pool: thread counters, the limits parsed
 * from the "thread" configuration section, the table of forked child
 * PIDs, packet-rate trackers, and the request fifos with their lock.
 */
124 typedef struct THREAD_POOL {
129 uint32_t active_threads; /* protected by queue_mutex */
130 uint32_t total_threads; /* handles in the list; changed by spawn_thread() and delete_thread() */
132 uint32_t exited_threads; /* bumped by an exiting worker, dropped again once it has been joined */
133 uint32_t max_thread_num; /* next thread_num to hand out in spawn_thread() */
134 uint32_t start_threads; /* config: start_servers */
135 uint32_t max_threads; /* config: max_servers */
136 uint32_t min_spare_threads; /* config: min_spare_servers */
137 uint32_t max_spare_threads; /* config: max_spare_servers */
138 uint32_t max_requests_per_thread; /* config: max_requests_per_server; 0 disables the check */
139 uint32_t request_count; /* incremented for every request_enqueue() that reaches the queue-full test */
140 time_t time_last_spawned; /* `now` of the most recent successful spawn_thread() */
141 uint32_t cleanup_delay; /* config: cleanup_delay; seconds to wait after a spawn before trimming spares */
143 #endif /* WITH_GCD */
147 pthread_mutex_t wait_mutex; /* held around every access to `waiters` in rad_fork()/rad_waitpid()/reap_children() */
148 fr_hash_table_t *waiters; /* thread_fork_t entries, hashed and compared by pid */
152 dispatch_queue_t queue; /* target of dispatch_async() in the dispatch-based request_enqueue() */
156 fr_pps_t pps_in, pps_out; /* arrival / departure rate state fed to rad_pps() */
157 #ifdef WITH_ACCOUNTING
158 bool auto_limit_acct; /* config: auto_limit_acct; enables accounting load shedding */
163 * All threads wait on this semaphore, for requests
164 * to enter the queue.
169 * To ensure only one thread at a time touches the queue.
171 pthread_mutex_t queue_mutex;
173 uint32_t max_queue_size; /* config: max_queue_size; validated to 2..1048576 in thread_pool_init() */
175 fr_fifo_t *fifo[NUM_FIFOS]; /* one fifo per priority, created in thread_pool_init() */
176 #endif /* WITH_GCD */
/* The single pool instance used by every function in this file. */
179 static THREAD_POOL thread_pool;
/* Set to true at the end of thread_pool_init(); the public entry points test it first. */
180 static bool pool_initialized = false;
/* Compared with request->timestamp in request_enqueue() to decide when to call thread_pool_manage(). */
183 static time_t last_cleaned = 0;
/* Forward declaration: request_enqueue() calls this before its definition appears. */
185 static void thread_pool_manage(time_t now);
190 * A mapping of configuration file names to internal integers
/*
 * Parse table handed to cf_section_parse() for the "thread" section.
 * Each row names a configuration item and the thread_pool field that
 * receives it.  The trailing string is presumably the default applied
 * when the item is absent -- confirm against the CONF_PARSER definition.
 */
192 static const CONF_PARSER thread_config[] = {
193 { "start_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.start_threads), "5" },
194 { "max_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_threads), "32" },
195 { "min_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.min_spare_threads), "3" },
196 { "max_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_spare_threads), "10" },
197 { "max_requests_per_server", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_requests_per_thread), "0" },
198 { "cleanup_delay", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.cleanup_delay), "5" },
199 { "max_queue_size", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_queue_size), "65536" },
/* Only present in accounting-enabled builds; note the NULL in the last column. */
201 #ifdef WITH_ACCOUNTING
202 { "auto_limit_acct", FR_CONF_POINTER(PW_TYPE_BOOLEAN, &thread_pool.auto_limit_acct), NULL },
205 CONF_PARSER_TERMINATOR
209 #ifdef HAVE_OPENSSL_CRYPTO_H
212 * If we're linking against OpenSSL, then it is the
213 * duty of the application, if it is multithreaded,
214 * to provide OpenSSL with appropriate thread id
215 * and mutex locking functions
217 * Note: this only implements static callbacks.
218 * OpenSSL does not use dynamic locking callbacks
219 * right now, but may in the future, so we will have
220 * to add them at some point.
/* Array of CRYPTO_num_locks() mutexes, allocated by setup_ssl_mutexes() and indexed by the locking callback. */
223 static pthread_mutex_t *ssl_mutexes = NULL;
/*
 * Thread-id callback registered with CRYPTO_set_id_callback().
 * Copies the raw bytes of pthread_self() into `ret`, using whichever
 * of the two object sizes is smaller so that neither object is
 * read or written past its end.
 */
225 static unsigned long ssl_id_function(void)
228 pthread_t thread = pthread_self();
230 if (sizeof(ret) >= sizeof(thread)) {
231 memcpy(&ret, &thread, sizeof(thread));
233 memcpy(&ret, &thread, sizeof(ret));
/*
 * Locking callback registered with CRYPTO_set_locking_callback().
 * Locks ssl_mutexes[n] when `mode` has CRYPTO_LOCK set; the unlock
 * call is presumably the else branch.  `file` and `line` are unused.
 * NOTE(review): `n` indexes ssl_mutexes without a bounds check and the
 * pthread return codes are discarded.
 */
239 static void ssl_locking_function(int mode, int n, UNUSED char const *file, UNUSED int line)
241 if (mode & CRYPTO_LOCK) {
242 pthread_mutex_lock(&(ssl_mutexes[n]));
244 pthread_mutex_unlock(&(ssl_mutexes[n]));
/*
 * Allocate and initialise one mutex per OpenSSL lock, then register
 * the thread-id and locking callbacks.  thread_pool_init() treats a
 * zero return from this function as failure.
 * NOTE(review): CRYPTO_num_locks() is evaluated for the allocation and
 * again on every loop test, and the result of pthread_mutex_init() is
 * not checked.
 */
248 static int setup_ssl_mutexes(void)
252 ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
254 ERROR("Error allocating memory for SSL mutexes!");
258 for (i = 0; i < CRYPTO_num_locks(); i++) {
259 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
262 CRYPTO_set_id_callback(ssl_id_function);
263 CRYPTO_set_locking_callback(ssl_locking_function);
271 * We don't want to catch SIGCHLD for a host of reasons.
273 * - exec_wait means that someone, somewhere, somewhen, will
274 * call waitpid(), and catch the child.
276 * - SIGCHLD is delivered to a random thread, not the one that
279 * - if another thread catches the child, we have to coordinate
280 * with the thread doing the waiting.
282 * - if we don't waitpid() for non-wait children, they'll be zombies,
283 * and will hang around forever.
/*
 * Collect exited children without blocking.  With wait_mutex held it
 * calls waitpid() and looks the result up in thread_pool.waiters; the
 * loop condition re-tests for as long as that table is non-empty.
 */
286 static void reap_children(void)
290 thread_fork_t mytf, *tf;
293 pthread_mutex_lock(&thread_pool.wait_mutex);
/* pid 0 selects any child in the caller's process group; WNOHANG makes the call return immediately. */
297 pid = waitpid(0, &status, WNOHANG);
/* mytf is a stack-allocated lookup key for the hash table. */
301 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
306 } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
308 pthread_mutex_unlock(&thread_pool.wait_mutex);
/*
 * Empty-macro variant, so callers such as rad_fork() compile unchanged.
 * NOTE(review): presumably selected when waitpid() support is not
 * available -- confirm against the guarding preprocessor condition.
 */
311 #define reap_children()
316 * Add a request to the list of waiting requests.
317 * This function gets called ONLY from the main handler thread...
319 * This function should never fail.
/*
 * Queue `request` for a worker thread (semaphore/fifo implementation).
 *
 * Steps, in order: run thread_pool_manage() when housekeeping looks
 * due; take queue_mutex; optionally shed Accounting-Requests while the
 * queue is more than half full; refuse the request if the queue is at
 * max_queue_size; push it onto the fifo selected by request->priority;
 * release the mutex; post the semaphore to wake one worker.
 */
321 int request_enqueue(REQUEST *request)
323 rad_assert(pool_initialized == true);
326 * If we haven't checked the number of child threads
327 * in a while, OR if the thread pool appears to be full,
/*
 * NOTE(review): these counters are read before queue_mutex is taken,
 * although the struct marks active_threads as protected by it.  The
 * test is therefore an approximate, unlocked check.
 */
330 if ((last_cleaned < request->timestamp) ||
331 (thread_pool.active_threads == thread_pool.total_threads) ||
332 (thread_pool.exited_threads > 0)) {
333 thread_pool_manage(request->timestamp);
337 pthread_mutex_lock(&thread_pool.queue_mutex);
340 #ifdef WITH_ACCOUNTING
341 if (thread_pool.auto_limit_acct) {
345 * Throw away accounting requests if we're too
346 * busy. The NAS should retransmit these, and no
349 * In contrast, we always try to process
350 * authentication requests. Those are more time
351 * critical, and it's harder to determine which
352 * we can throw away, and which we can keep.
354 * We allow the queue to get half full before we
355 * start worrying. Even then, we still require
356 * that the rate of input packets is higher than
357 * the rate of outgoing packets. i.e. the queue
360 * Once that happens, we roll a dice to see where
361 * the barrier is for "keep" versus "toss". If
362 * the queue is smaller than the barrier, we
363 * allow it. If the queue is larger than the
364 * barrier, we throw the packet away. Otherwise,
367 * i.e. the probability of throwing the packet
368 * away increases from 0 (queue is half full), to
369 * 100 percent (queue is completely full).
371 * A probabilistic approach allows us to process
372 * SOME of the new accounting packets.
/* All three must hold: accounting packet, queue over half full, arrivals outpacing departures. */
374 if ((request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
375 (thread_pool.num_queued > (thread_pool.max_queue_size / 2)) &&
376 (thread_pool.pps_in.pps_now > thread_pool.pps_out.pps_now)) {
381 * Take a random value of how full we
382 * want the queue to be. It's OK to be
383 * half full, but we get excited over
384 * anything more than that.
386 keep = (thread_pool.max_queue_size / 2);
/* The mask keeps the low 10 bits, so prob is in 0..1023. */
387 prob = fr_rand() & ((1 << 10) - 1);
390 keep += (thread_pool.max_queue_size / 2);
393 * If the queue is larger than our dice
394 * roll, we throw the packet away.
/* Shed path: the lock is released before leaving. */
396 if (thread_pool.num_queued > keep) {
397 pthread_mutex_unlock(&thread_pool.queue_mutex);
402 gettimeofday(&now, NULL);
405 * Calculate the instantaneous arrival rate into
408 thread_pool.pps_in.pps = rad_pps(&thread_pool.pps_in.pps_old,
409 &thread_pool.pps_in.pps_now,
410 &thread_pool.pps_in.time_old,
413 thread_pool.pps_in.pps_now++;
415 #endif /* WITH_ACCOUNTING */
418 thread_pool.request_count++;
/* Hard limit: the queue is full, so the request is refused. */
420 if (thread_pool.num_queued >= thread_pool.max_queue_size) {
421 pthread_mutex_unlock(&thread_pool.queue_mutex);
424 * Mark the request as done.
/* NOTE(review): num_queued is read for the log message after queue_mutex was released, so the printed value may be stale. */
426 RATE_LIMIT(ERROR("Something is blocking the server. There are %d packets in the queue, "
427 "waiting to be processed. Ignoring the new request.", thread_pool.num_queued));
/* Label the request so diagnostics show it is sitting in the queue. */
430 request->component = "<core>";
431 request->module = "<queue>";
432 request->child_state = REQUEST_QUEUED;
435 * Push the request onto the appropriate fifo for that
437 if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
438 pthread_mutex_unlock(&thread_pool.queue_mutex);
439 ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
/* Counted only after a successful push, still under queue_mutex. */
443 thread_pool.num_queued++;
445 pthread_mutex_unlock(&thread_pool.queue_mutex);
448 * There's one more request in the queue.
450 * Note that we're not touching the queue any more, so
451 * the semaphore post is outside of the mutex. This also
452 * means that when the thread wakes up and tries to lock
453 * the mutex, it will be unlocked, and there won't be
456 sem_post(&thread_pool.semaphore);
462 * Remove a request from the queue.
/*
 * Take the next request off the fifos on behalf of a worker thread.
 *
 * Under queue_mutex it updates the departure-rate tracker (when
 * auto_limit_acct is set), makes one pass over the fifo heads to
 * acknowledge a request that was told to stop, then pops from the
 * fifos in index order.  The request handed back is marked
 * REQUEST_RUNNING and active_threads is incremented.
 * request_handler_thread() treats a zero return as "nothing to do".
 */
464 static int request_dequeue(REQUEST **prequest)
/* Function-local statics: state for the "requests have been waiting" complaint below. */
467 static time_t last_complained = 0;
468 static time_t total_blocked = 0;
470 RAD_LISTEN_TYPE i, start;
471 REQUEST *request = NULL;
474 rad_assert(pool_initialized == true);
476 pthread_mutex_lock(&thread_pool.queue_mutex);
479 #ifdef WITH_ACCOUNTING
480 if (thread_pool.auto_limit_acct) {
483 gettimeofday(&now, NULL);
486 * Calculate the instantaneous departure rate
489 thread_pool.pps_out.pps = rad_pps(&thread_pool.pps_out.pps_old,
490 &thread_pool.pps_out.pps_now,
491 &thread_pool.pps_out.time_old,
493 thread_pool.pps_out.pps_now++;
499 * Clear old requests from all queues.
501 * We only do one pass over the queue, in order to
502 * amortize the work across the child threads. Since we
503 * do N checks for one request de-queued, the old
504 * requests will be quickly cleared.
/* Only the head of each fifo is inspected (peek), not the whole fifo. */
506 for (i = 0; i < RAD_LISTEN_MAX; i++) {
507 request = fr_fifo_peek(thread_pool.fifo[i]);
508 if (!request) continue;
510 VERIFY_REQUEST(request);
512 if (request->master_state != REQUEST_STOP_PROCESSING) {
517 * This entry was marked to be stopped. Acknowledge it.
519 request = fr_fifo_pop(thread_pool.fifo[i]);
520 rad_assert(request != NULL);
521 VERIFY_REQUEST(request);
522 request->child_state = REQUEST_DONE;
523 thread_pool.num_queued--;
529 * Pop results from the top of the queue
/* Begins at `start`, presumably recorded by the scan above -- confirm it is always assigned before this loop. */
531 for (i = start; i < RAD_LISTEN_MAX; i++) {
532 request = fr_fifo_pop(thread_pool.fifo[i]);
534 VERIFY_REQUEST(request);
/* Presumably the "every fifo was empty" exit: the lock is dropped before leaving. */
541 pthread_mutex_unlock(&thread_pool.queue_mutex);
546 rad_assert(thread_pool.num_queued > 0);
547 thread_pool.num_queued--;
550 rad_assert(*prequest != NULL);
551 rad_assert(request->magic == REQUEST_MAGIC);
553 request->component = "<core>";
554 request->module = "<running>";
555 request->child_state = REQUEST_RUNNING;
558 * If the request has sat in the queue for too long,
561 * The main clean-up code can't delete the request from
562 * the queue, and therefore won't clean it up until we
563 * have acknowledged it as "done".
565 if (request->master_state == REQUEST_STOP_PROCESSING) {
566 request->module = "<done>";
567 request->child_state = REQUEST_DONE;
572 * The thread is currently processing a request.
/* Still under queue_mutex, matching the "protected by queue_mutex" note on the field. */
574 thread_pool.active_threads++;
/* Complain when a non-proxied request waited more than 5 seconds, at most once per second of wall-clock time. */
576 blocked = time(NULL);
577 if (!request->proxy && (blocked - request->timestamp) > 5) {
579 if (last_complained < blocked) {
580 last_complained = blocked;
/* `blocked` is reused from here on to hold the wait duration in seconds. */
581 blocked -= request->timestamp;
582 num_blocked = total_blocked;
591 pthread_mutex_unlock(&thread_pool.queue_mutex);
/* Logged after the unlock, using the local copies taken above. */
594 ERROR("%d requests have been waiting in the processing queue for %d seconds. Check that all databases are running properly!",
595 num_blocked, (int) blocked);
603 * The main thread handler for requests.
605 * Wait on the semaphore until we have it, and process the request.
/*
 * Entry point of every worker thread; `arg` is the thread's own
 * THREAD_HANDLE.
 *
 * Each iteration waits on the pool semaphore, tries to dequeue a
 * request, runs request->process(request, FR_ACTION_RUN) and then
 * decrements active_threads under queue_mutex.  The loop ends when the
 * handle's status becomes THREAD_CANCELLED; a set stop_flag also
 * breaks out.  On the way out the thread increments exited_threads,
 * marks itself THREAD_EXITED and fires the "server.thread.stop"
 * trigger.
 */
607 static void *request_handler_thread(void *arg)
609 THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
612 * Loop forever, until told to exit.
616 * Wait to be signalled.
618 DEBUG2("Thread %d waiting to be assigned a request",
621 if (sem_wait(&thread_pool.semaphore) != 0) {
623 * Interrupted system call. Go back to
624 * waiting, but DON'T print out any more
627 if (errno == EINTR) {
628 DEBUG2("Re-wait %d", self->thread_num);
631 ERROR("Thread %d failed waiting for semaphore: %s: Exiting\n",
632 self->thread_num, fr_syserror(errno));
636 DEBUG2("Thread %d got semaphore", self->thread_num);
638 #ifdef HAVE_OPENSSL_ERR_H
640 * Clear the error queue for the current thread.
646 * The server is exiting. Don't dequeue any
/* thread_pool_stop() sets stop_flag and then posts the semaphore once per thread. */
649 if (thread_pool.stop_flag) break;
652 * Try to grab a request from the queue.
654 * It may be empty, in which case we fail
657 if (!request_dequeue(&self->request)) continue;
659 self->request->child_pid = self->pthread_id;
660 self->request_count++;
662 DEBUG2("Thread %d handling request %d, (%d handled so far)",
663 self->thread_num, self->request->number,
664 self->request_count);
/* With auto_limit_acct on, queue statistics are attached to accounting requests as config attributes 181-183. */
666 #ifdef WITH_ACCOUNTING
667 if ((self->request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
668 thread_pool.auto_limit_acct) {
670 REQUEST *request = self->request;
672 vp = radius_pair_create(request, &request->config,
673 181, VENDORPEC_FREERADIUS);
674 if (vp) vp->vp_integer = thread_pool.pps_in.pps;
/*
 * NOTE(review): attribute 182 is filled from pps_in.pps, the same
 * source as attribute 181.  pps_out.pps is maintained by
 * request_dequeue() but is not exported anywhere in this function.
 * This looks like a copy/paste slip -- confirm whether 182 should
 * carry thread_pool.pps_out.pps.
 */
676 vp = radius_pair_create(request, &request->config,
677 182, VENDORPEC_FREERADIUS);
678 if (vp) vp->vp_integer = thread_pool.pps_in.pps;
680 vp = radius_pair_create(request, &request->config,
681 183, VENDORPEC_FREERADIUS);
/* Percentage of queue slots still free: (max - queued) * 100 / max.  num_queued is read here without queue_mutex. */
683 vp->vp_integer = thread_pool.max_queue_size - thread_pool.num_queued;
684 vp->vp_integer *= 100;
685 vp->vp_integer /= thread_pool.max_queue_size;
/* Run the request, then clear the handle so thread_pool_manage() sees this thread as idle. */
690 self->request->process(self->request, FR_ACTION_RUN);
691 self->request = NULL;
694 * Update the active threads.
696 pthread_mutex_lock(&thread_pool.queue_mutex);
697 rad_assert(thread_pool.active_threads > 0);
698 thread_pool.active_threads--;
699 pthread_mutex_unlock(&thread_pool.queue_mutex);
702 * If the thread has handled too many requests, then make it
/* A max_requests_per_thread of 0 disables this recycling check. */
705 if ((thread_pool.max_requests_per_thread > 0) &&
706 (self->request_count >= thread_pool.max_requests_per_thread)) {
707 DEBUG2("Thread %d handled too many requests",
/* thread_pool_manage() sets THREAD_CANCELLED from the main thread to retire an idle worker. */
711 } while (self->status != THREAD_CANCELLED);
713 DEBUG2("Thread %d exiting...", self->thread_num);
715 #ifdef HAVE_OPENSSL_ERR_H
717 * If we linked with OpenSSL, the application
718 * must remove the thread's error queue before
719 * exiting to prevent memory leaks.
/* exited_threads > 0 makes the next request_enqueue() call thread_pool_manage(), which joins this thread. */
724 pthread_mutex_lock(&thread_pool.queue_mutex);
725 thread_pool.exited_threads++;
726 pthread_mutex_unlock(&thread_pool.queue_mutex);
729 * Do this as the LAST thing before exiting.
731 self->request = NULL;
732 self->status = THREAD_EXITED;
/* NOTE(review): the trigger runs after status is set to THREAD_EXITED, despite the "LAST thing" remark above -- confirm the ordering is intended. */
733 exec_trigger(NULL, NULL, "server.thread.stop", true);
739 * Take a THREAD_HANDLE, delete it from the thread pool and
740 * free its resources.
742 * This function is called ONLY from the main server thread,
743 * ONLY after the thread has exited.
/*
 * Unlink `handle` from the pool's doubly-linked list and decrement
 * total_threads.  The handle must no longer own a request.  Callers in
 * this file (thread_pool_stop(), thread_pool_manage()) call
 * pthread_join() on the thread first.
 */
745 static void delete_thread(THREAD_HANDLE *handle)
750 rad_assert(handle->request == NULL);
752 DEBUG2("Deleting thread %d", handle->thread_num);
756 rad_assert(thread_pool.total_threads > 0);
757 thread_pool.total_threads--;
760 * Remove the handle from the list.
/* Head case: the handle being removed must be the current list head. */
763 rad_assert(thread_pool.head == handle);
764 thread_pool.head = next;
/* Tail case: the handle being removed must be the current list tail. */
770 rad_assert(thread_pool.tail == handle);
771 thread_pool.tail = prev;
777 * Free the handle, now that it's no longer referencable.
784 * Spawn a new thread, and place it in the thread pool.
786 * The thread is started initially in the blocked state, waiting
/*
 * Create one worker thread and append its handle to the pool list.
 *
 * now        - stored in thread_pool.time_last_spawned on success.
 * do_trigger - when non-zero, fire the "server.thread.start" trigger.
 *
 * Refuses to spawn once total_threads has reached max_threads.
 * thread_pool_init() and thread_pool_manage() treat a NULL return as
 * failure.
 */
789 static THREAD_HANDLE *spawn_thread(time_t now, int do_trigger)
792 THREAD_HANDLE *handle;
795 * Ensure that we don't spawn too many threads.
797 if (thread_pool.total_threads >= thread_pool.max_threads) {
798 DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads);
803 * Allocate a new thread handle.
805 handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
806 memset(handle, 0, sizeof(THREAD_HANDLE));
/* Thread numbers come from a counter that only grows; thread_pool_init() starts it at 1. */
809 handle->thread_num = thread_pool.max_thread_num++;
810 handle->request_count = 0;
811 handle->status = THREAD_RUNNING;
812 handle->timestamp = time(NULL);
815 * Create the thread joinable, so that it can be cleaned up
816 * using pthread_join().
818 * Note that the function returns non-zero on error, NOT
819 * -1. The return code is the error, and errno isn't set.
/* The attribute argument is 0 (null), i.e. default thread attributes. */
821 rcode = pthread_create(&handle->pthread_id, 0, request_handler_thread, handle);
824 ERROR("Thread create failed: %s",
830 * One more thread to go into the list.
832 thread_pool.total_threads++;
833 DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
834 handle->thread_num, thread_pool.total_threads);
835 if (do_trigger) exec_trigger(NULL, NULL, "server.thread.start", true);
838 * Add the thread handle to the tail of the thread pool list.
840 if (thread_pool.tail) {
841 thread_pool.tail->next = handle;
842 handle->prev = thread_pool.tail;
843 thread_pool.tail = handle;
/* Empty list: the new handle becomes both head and tail. */
845 rad_assert(thread_pool.head == NULL);
846 thread_pool.head = thread_pool.tail = handle;
850 * Update the time we last spawned a thread.
852 thread_pool.time_last_spawned = now;
855 * Fire trigger if maximum number of threads reached
/* This trigger fires regardless of do_trigger. */
857 if (thread_pool.total_threads >= thread_pool.max_threads)
858 exec_trigger(NULL, NULL, "server.thread.max_threads", true);
861 * And return the new handle to the caller.
865 #endif /* WITH_GCD */
/*
 * Hash callback for thread_pool.waiters: `data` is a thread_fork_t and
 * the hash is computed over the bytes of its pid field only.
 */
869 static uint32_t pid_hash(void const *data)
871 thread_fork_t const *tf = data;
873 return fr_hash(&tf->pid, sizeof(tf->pid));
/*
 * Comparison callback for thread_pool.waiters: two thread_fork_t
 * entries compare equal (0) exactly when their pids are equal; the
 * sign otherwise follows the pid difference.
 */
876 static int pid_cmp(void const *one, void const *two)
878 thread_fork_t const *a = one;
879 thread_fork_t const *b = two;
881 return (a->pid - b->pid);
886 * Allocate the thread pool, and seed it with an initial number
889 * FIXME: What to do on a SIGHUP???
/*
 * One-time setup of the thread pool.
 *
 * cs         - configuration section searched for a "thread" subsection.
 * spawn_flag - must point to true on entry; set to false when no
 *              "thread" subsection exists.
 *
 * Zeroes thread_pool, and returns 0 early when threading ends up
 * disabled.  Otherwise it creates the wait mutex and child-PID table,
 * parses and sanity-checks the configuration, creates the semaphore,
 * queue mutex and fifos, sets up OpenSSL locking, spawns start_threads
 * workers and finally sets pool_initialized.
 */
891 int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag)
897 CONF_SECTION *pool_cf;
902 rad_assert(spawn_flag != NULL);
903 rad_assert(*spawn_flag == true);
904 rad_assert(pool_initialized == false); /* not called on HUP */
906 pool_cf = cf_subsection_find_next(cs, NULL, "thread");
908 if (pool_cf) WARN("Built with Grand Central Dispatch. Ignoring 'thread' subsection");
/* No "thread" subsection: tell the caller that threading is off. */
910 if (!pool_cf) *spawn_flag = false;
914 * Initialize the thread pool to some reasonable values.
916 memset(&thread_pool, 0, sizeof(THREAD_POOL));
918 thread_pool.head = NULL;
919 thread_pool.tail = NULL;
920 thread_pool.total_threads = 0;
/* Thread numbering starts at 1. */
921 thread_pool.max_thread_num = 1;
922 thread_pool.cleanup_delay = 5;
923 thread_pool.stop_flag = false;
925 thread_pool.spawn_flag = *spawn_flag;
928 * Don't bother initializing the mutexes or
929 * creating the hash tables. They won't be used.
/* Note that pool_initialized stays false on this path. */
931 if (!*spawn_flag) return 0;
934 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
935 ERROR("FATAL: Failed to initialize wait mutex: %s",
941 * Create the hash table of child PID's
943 thread_pool.waiters = fr_hash_table_create(pid_hash,
946 if (!thread_pool.waiters) {
947 ERROR("FATAL: Failed to set up wait hash");
/* Fill the thread_pool fields listed in thread_config from the "thread" subsection. */
953 if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
958 * Catch corner cases.
/* Out-of-range spare/max values are silently corrected; only the two checks further down are fatal. */
960 if (thread_pool.min_spare_threads < 1)
961 thread_pool.min_spare_threads = 1;
962 if (thread_pool.max_spare_threads < 1)
963 thread_pool.max_spare_threads = 1;
964 if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
965 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
966 if (thread_pool.max_threads == 0)
967 thread_pool.max_threads = 256;
968 if ((thread_pool.max_queue_size < 2) || (thread_pool.max_queue_size > 1024*1024)) {
969 ERROR("FATAL: max_queue_size value must be in range 2-1048576");
973 if (thread_pool.start_threads > thread_pool.max_threads) {
974 ERROR("FATAL: start_servers (%i) must be <= max_servers (%i)",
975 thread_pool.start_threads, thread_pool.max_threads);
978 #endif /* WITH_GCD */
981 * The pool has already been initialized. Don't spawn
982 * new threads, and don't forget about forked children.
/*
 * NOTE(review): the assertion near the top of this function requires
 * pool_initialized to be false, so this branch can only be taken when
 * assertions are compiled out -- confirm whether it is still needed.
 */
984 if (pool_initialized) {
990 * Initialize the queue of requests.
992 memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
993 rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
995 ERROR("FATAL: Failed to initialize semaphore: %s",
/*
 * NOTE(review): pthread_mutex_init() reports failure through its
 * return value and does not set errno (spawn_thread() makes the same
 * point about pthread_create()).  The message below prints
 * fr_syserror(errno), which may describe an unrelated error; rcode is
 * presumably what should be reported.
 */
1000 rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
1002 ERROR("FATAL: Failed to initialize queue mutex: %s",
1003 fr_syserror(errno));
1008 * Allocate multiple fifos.
/* Every fifo is created with capacity max_queue_size. */
1010 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1011 thread_pool.fifo[i] = fr_fifo_create(NULL, thread_pool.max_queue_size, NULL);
1012 if (!thread_pool.fifo[i]) {
1013 ERROR("FATAL: Failed to set up request fifo");
1019 #ifdef HAVE_OPENSSL_CRYPTO_H
1021 * If we're linking with OpenSSL too, then we need
1022 * to set up the mutexes and enable the thread callbacks.
1024 if (!setup_ssl_mutexes()) {
1025 ERROR("FATAL: Failed to set up SSL mutexes");
1033 * Create a number of waiting threads.
1035 * If we fail while creating them, do something intelligent.
/* do_trigger is 0 here, so the initial workers do not fire "server.thread.start". */
1037 for (i = 0; i < thread_pool.start_threads; i++) {
1038 if (spawn_thread(now, 0) == NULL) {
/* Dispatch-based variant: a single named queue replaces the worker threads. */
1043 thread_pool.queue = dispatch_queue_create("org.freeradius.threads", NULL);
1044 if (!thread_pool.queue) {
1045 ERROR("Failed creating dispatch queue: %s", fr_syserror(errno));
1050 DEBUG2("Thread pool initialized");
1051 pool_initialized = true;
1057 * Stop all threads in the pool.
/*
 * Shut the pool down: set stop_flag, post the semaphore once per
 * thread so every worker wakes and sees the flag, join and delete each
 * handle, free the fifos and the child-PID table, and unregister the
 * OpenSSL threading callbacks.  Does nothing if the pool was never
 * initialised.
 */
1059 void thread_pool_stop(void)
1064 THREAD_HANDLE *handle;
1065 THREAD_HANDLE *next;
1067 if (!pool_initialized) return;
1070 * Set pool stop flag.
1072 thread_pool.stop_flag = true;
1075 * Wakeup all threads to make them see stop flag.
/* The count is copied first because delete_thread() decrements total_threads further down. */
1077 total_threads = thread_pool.total_threads;
1078 for (i = 0; i != total_threads; i++) {
1079 sem_post(&thread_pool.semaphore);
1083 * Join and free all threads.
/* `next` is saved before delete_thread() unlinks the handle. */
1085 for (handle = thread_pool.head; handle; handle = next) {
1086 next = handle->next;
1087 pthread_join(handle->pthread_id, NULL);
1088 delete_thread(handle);
1091 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1092 fr_fifo_free(thread_pool.fifo[i]);
1096 fr_hash_table_free(thread_pool.waiters);
1099 #ifdef HAVE_OPENSSL_CRYPTO_H
1101 * We're no longer threaded. Remove the mutexes and free
1104 CRYPTO_set_id_callback(NULL);
1105 CRYPTO_set_locking_callback(NULL);
/*
 * Dispatch-queue variant of request_enqueue(): the request's process
 * callback is wrapped in a block and submitted asynchronously to
 * thread_pool.queue.
 * NOTE(review): presumably compiled only when WITH_GCD is defined --
 * confirm against the surrounding preprocessor guards.
 */
1115 int request_enqueue(REQUEST *request)
1117 dispatch_block_t block;
1120 request->process(request, FR_ACTION_RUN);
1123 dispatch_async(thread_pool.queue, block);
1131 * Check the min_spare_threads and max_spare_threads.
1133 * If there are too many or too few threads waiting, then we
1134 * either create some more, or delete some.
/*
 * Pool housekeeping, called from request_enqueue() with the request's
 * timestamp as `now`.
 *
 * 1. Join and delete every thread whose status is THREAD_EXITED.
 * 2. If spare threads (total - active) are below min_spare_threads and
 *    the pool is below max_threads, spawn the shortfall and return.
 * 3. Otherwise, if enough time has passed since the last spawn and
 *    there are more than max_spare_threads spares, mark an idle
 *    running thread THREAD_CANCELLED and post the semaphore so it
 *    wakes up and exits.
 */
1136 static void thread_pool_manage(time_t now)
1140 THREAD_HANDLE *handle, *next;
1141 uint32_t active_threads;
1144 * Loop over the thread pool, deleting exited threads.
1146 for (handle = thread_pool.head; handle; handle = next) {
1147 next = handle->next;
1150 * Maybe we've asked the thread to exit, and it
1153 if (handle->status == THREAD_EXITED) {
1154 pthread_join(handle->pthread_id, NULL);
1155 delete_thread(handle);
/* Mirrors the locked increment done by the exiting worker. */
1156 pthread_mutex_lock(&thread_pool.queue_mutex);
1157 thread_pool.exited_threads--;
1158 pthread_mutex_unlock(&thread_pool.queue_mutex);
1163 * We don't need a mutex lock here, as we're reading
1164 * active_threads, and not modifying it. We want a close
1165 * approximation of the number of active threads, and this
/* One snapshot of active_threads is used for all of the arithmetic below. */
1168 active_threads = thread_pool.active_threads;
1169 spare = thread_pool.total_threads - active_threads;
1170 if (rad_debug_lvl) {
/* Function-local statics: print the summary only when the numbers change. */
1171 static uint32_t old_total = 0;
1172 static uint32_t old_active = 0;
1174 if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) {
1175 DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
1176 thread_pool.total_threads, active_threads, spare);
1177 old_total = thread_pool.total_threads;
1178 old_active = active_threads;
1183 * If there are too few spare threads. Go create some more.
1185 if ((thread_pool.total_threads < thread_pool.max_threads) &&
1186 (spare < thread_pool.min_spare_threads)) {
1187 total = thread_pool.min_spare_threads - spare;
/* Clamp so the pool never grows past max_threads. */
1189 if ((total + thread_pool.total_threads) > thread_pool.max_threads) {
1190 total = thread_pool.max_threads - thread_pool.total_threads;
1193 DEBUG2("Threads: Spawning %d spares", total);
1196 * Create a number of spare threads.
/* do_trigger is 1 here, so each spawn fires "server.thread.start". */
1198 for (i = 0; i < total; i++) {
1199 handle = spawn_thread(now, 1);
1200 if (handle == NULL) {
1205 return; /* there aren't too many spare threads */
1209 * Only delete spare threads if we haven't already done
1212 if (now == last_cleaned) {
1218 * Only delete the spare threads if sufficient time has
1219 * passed since we last created one. This helps to minimize
1220 * the amount of create/delete cycles.
/* cleanup_delay is unsigned; the cast keeps this comparison signed. */
1222 if ((now - thread_pool.time_last_spawned) < (int)thread_pool.cleanup_delay) {
1227 * If there are too many spare threads, delete one.
1229 * Note that we only delete ONE at a time, instead of
1230 * wiping out many. This allows the excess servers to
1231 * be slowly reaped, just in case the load spike comes again.
1233 if (spare > thread_pool.max_spare_threads) {
/* From here on `spare` holds the excess over max_spare_threads. */
1235 spare -= thread_pool.max_spare_threads;
1237 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1240 * Walk through the thread pool, deleting the
1241 * first idle thread we come across.
1243 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1244 next = handle->next;
1247 * If the thread is not handling a
1248 * request, but still live, then tell it
1251 * It will eventually wake up, and realize
1252 * it's been told to commit suicide.
/* handle->request and handle->status are read and written here without a lock; the worker owns both fields as well. */
1254 if ((handle->request == NULL) &&
1255 (handle->status == THREAD_RUNNING)) {
1256 handle->status = THREAD_CANCELLED;
1258 * Post an extra semaphore, as a
1259 * signal to wake up, and exit.
1261 sem_post(&thread_pool.semaphore);
1269 * Otherwise everything's kosher. There are not too few,
1270 * or too many spare threads. Exit happily.
1274 #endif /* WITH_GCD */
1278 * Thread wrapper for fork().
/*
 * fork() wrapper that records the child so it can be reaped later.
 *
 * Without an initialised pool this is a plain fork().  Otherwise it
 * first reaps finished children, refuses to continue once 1024
 * children are already tracked, and on a successful fork inserts a
 * zeroed thread_fork_t carrying the child's pid into
 * thread_pool.waiters.
 */
1280 pid_t rad_fork(void)
1284 if (!pool_initialized) return fork();
1286 reap_children(); /* be nice to non-wait thingies */
/* NOTE(review): the element count is read here without wait_mutex, unlike the other accesses to `waiters`. */
1288 if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1293 * Fork & save the PID for later reaping.
/* Parent side only: child_pid > 0. */
1296 if (child_pid > 0) {
1300 tf = rad_malloc(sizeof(*tf));
1301 memset(tf, 0, sizeof(*tf));
1303 tf->pid = child_pid;
1305 pthread_mutex_lock(&thread_pool.wait_mutex);
1306 rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1307 pthread_mutex_unlock(&thread_pool.wait_mutex);
/* Failing to record the pid is logged but does not undo the fork. */
1310 ERROR("Failed to store PID, creating what will be a zombie process %d",
1317 * Return whatever we were told.
1324 * Wait 10 seconds at most for a child to exit, then give up.
/*
 * waitpid() replacement for children created with rad_fork().
 *
 * Without an initialised pool this is a blocking waitpid(pid, status,
 * 0).  Otherwise it looks the pid up in thread_pool.waiters and polls
 * up to 100 times at 100 ms intervals (about 10 seconds) for the
 * child's exit status, removing the table entry both when the status
 * is collected and when the wait gives up.  Non-positive pids are
 * rejected with -1.
 */
1326 pid_t rad_waitpid(pid_t pid, int *status)
1329 thread_fork_t mytf, *tf;
1331 if (!pool_initialized) return waitpid(pid, status, 0);
1333 if (pid <= 0) return -1;
1337 pthread_mutex_lock(&thread_pool.wait_mutex);
1338 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1339 pthread_mutex_unlock(&thread_pool.wait_mutex);
1343 for (i = 0; i < 100; i++) {
/*
 * NOTE(review): tf was obtained under wait_mutex, but the mutex was
 * released before this read and is only re-taken for the delete below.
 * Confirm that reading tf's fields here is safe against a concurrent
 * reap_children().
 */
1347 *status = tf->status;
1349 pthread_mutex_lock(&thread_pool.wait_mutex);
1350 fr_hash_table_delete(thread_pool.waiters, &mytf);
1351 pthread_mutex_unlock(&thread_pool.wait_mutex);
1354 usleep(100000); /* sleep for 1/10 of a second */
1358 * 10 seconds have passed, give up on the child.
1360 pthread_mutex_lock(&thread_pool.wait_mutex);
1361 fr_hash_table_delete(thread_pool.waiters, &mytf);
1362 pthread_mutex_unlock(&thread_pool.wait_mutex);
1368 * No rad_fork or rad_waitpid
/*
 * Report queue depth and packet rates.
 *
 * array - receives the element count of each of the RAD_LISTEN_MAX fifos.
 * pps   - pps[0] is the inbound rate and pps[1] the outbound rate, as
 *         computed by rad_pps().
 *
 * A second path zeroes pps[] instead; it is presumably taken when the
 * pool is not initialised or no fifos exist in this build.
 * NOTE(review): no queue_mutex lock/unlock is shown around the fifo
 * and rad_pps() accesses, whereas request_enqueue()/request_dequeue()
 * update the same state under the mutex -- confirm this is intended.
 */
1372 void thread_pool_queue_stats(int array[RAD_LISTEN_MAX], int pps[2])
1377 if (pool_initialized) {
1380 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1381 array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1384 gettimeofday(&now, NULL);
1386 pps[0] = rad_pps(&thread_pool.pps_in.pps_old,
1387 &thread_pool.pps_in.pps_now,
1388 &thread_pool.pps_in.time_old,
1390 pps[1] = rad_pps(&thread_pool.pps_out.pps_old,
1391 &thread_pool.pps_out.pps_now,
1392 &thread_pool.pps_out.time_old,
1396 #endif /* WITH_GCD */
1398 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1402 pps[0] = pps[1] = 0;
1405 #endif /* HAVE_PTHREAD_H */
/* Free callback passed to cf_data_add() by exec_trigger() for the per-trigger time_t it allocates. */
1407 static void time_free(void *data)
/*
 * Run the program configured for a named trigger.
 *
 * request - may be NULL; a temporary request is allocated for the
 *           duration of the call in that case.
 * cs      - section to search for a "trigger" subsection; the global
 *           configuration is used when none is given.
 * name    - dotted trigger name, e.g. "server.thread.start".
 * quench  - when set, the trigger is suppressed if it already ran
 *           during the current second.
 *
 * Lookup failures are logged with ERROR().
 */
1412 void exec_trigger(REQUEST *request, CONF_SECTION *cs, char const *name, int quench)
1414 CONF_SECTION *subcs;
1423 * Use global "trigger" section if no local config is given.
1426 cs = main_config.config;
1430 * Try to use pair name, rather than reference.
/* strrchr() locates the last '.' in the name. */
1432 attr = strrchr(name, '.');
1441 * Find local "trigger" subsection. If it isn't found,
1442 * try using the global "trigger" section, and reset the
1443 * reference to the full path, rather than the sub-path.
1445 subcs = cf_section_sub_find(cs, "trigger");
1446 if (!subcs && (cs != main_config.config)) {
1447 subcs = cf_section_sub_find(main_config.config, "trigger");
1453 ci = cf_reference_item(subcs, main_config.config, attr);
1455 ERROR("No such item in trigger section: %s", attr);
/* The referenced item must be a name = value pair. */
1459 if (!cf_item_is_pair(ci)) {
1460 ERROR("Trigger is not a configuration variable: %s", attr);
1464 cp = cf_item_to_pair(ci);
/* The pair's value is what gets handed to radius_exec_program() below. */
1467 value = cf_pair_value(cp);
1469 ERROR("Trigger has no value: %s", name);
1474 * May be called for Status-Server packets.
1477 if (request && request->packet) vp = request->packet->vps;
1480 * Perform periodic quenching.
/* The last-run time is stored as config data on `cs`, keyed by the trigger's value string. */
1485 last_time = cf_data_find(cs, value);
1487 last_time = rad_malloc(sizeof(*last_time));
1490 if (cf_data_add(cs, value, last_time, time_free) < 0) {
1497 * Send the quenched traps at most once per second.
1500 time_t now = time(NULL);
1501 if (*last_time == now) return;
1508 * radius_exec_program always needs a request.
1511 request = request_alloc(NULL);
1515 DEBUG("Trigger %s -> %s", name, value);
1517 radius_exec_program(request, NULL, 0, NULL, request, value, vp, false, true, EXEC_TIMEOUT);
/* Only a request allocated inside this function is freed. */
1519 if (alloc) talloc_free(request);