/*
 * threads.c    request threading support
 *
 * Version:     $Id$
 *
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation; either version 2 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program; if not, write to the Free Software
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 *
 * Copyright 2000,2006  The FreeRADIUS server project
 * Copyright 2000  Alan DeKok <aland@ox.org>
 */

RCSID("$Id$")
USES_APPLE_DEPRECATED_API       /* OpenSSL API has been deprecated by Apple */

#include <freeradius-devel/radiusd.h>
#include <freeradius-devel/process.h>
#include <freeradius-devel/rad_assert.h>

/*
 *      Other OS's have sem_init, OS X doesn't.
 */
#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif

#ifdef __APPLE__
#ifdef WITH_GCD
#include <dispatch/dispatch.h>
#endif
#include <mach/task.h>
#include <mach/mach_init.h>
#include <mach/semaphore.h>

#ifndef WITH_GCD
#undef sem_t
#define sem_t semaphore_t
#undef sem_init
#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
#undef sem_wait
#define sem_wait(s) semaphore_wait(*s)
#undef sem_post
#define sem_post(s) semaphore_signal(*s)
#endif  /* WITH_GCD */
#endif  /* __APPLE__ */
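
/*
 *      i.e. on OS X without GCD, the sem_* calls used in this file map
 *      onto their Mach equivalents:
 *
 *              sem_init(&sem, 0, 0)  ->  semaphore_create(mach_task_self(), &sem, SYNC_POLICY_FIFO, 0)
 *              sem_wait(&sem)        ->  semaphore_wait(sem)
 *              sem_post(&sem)        ->  semaphore_signal(sem)
 */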

#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif

#ifdef HAVE_PTHREAD_H

#ifdef HAVE_OPENSSL_CRYPTO_H
#include <openssl/crypto.h>
#endif
#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
#ifdef HAVE_OPENSSL_EVP_H
#include <openssl/evp.h>
#endif

#ifndef WITH_GCD
#define SEMAPHORE_LOCKED        (0)

#define THREAD_RUNNING          (1)
#define THREAD_CANCELLED        (2)
#define THREAD_EXITED           (3)

#define NUM_FIFOS              RAD_LISTEN_MAX

/*
 *  A data structure which contains the information about
 *  the current thread.
 */
typedef struct THREAD_HANDLE {
        struct THREAD_HANDLE    *prev;          //!< Previous thread handle (in the linked list).
        struct THREAD_HANDLE    *next;          //!< Next thread handle (in the linked list).
        pthread_t               pthread_id;     //!< pthread_id.
        int                     thread_num;     //!< Server thread number, 1...number of threads.
        int                     status;         //!< Is the thread running or exited?
        unsigned int            request_count;  //!< The number of requests that this thread has handled.
        time_t                  timestamp;      //!< When the thread started executing.
        REQUEST                 *request;
} THREAD_HANDLE;

#endif  /* WITH_GCD */

#ifdef WNOHANG
typedef struct thread_fork_t {
        pid_t           pid;
        int             status;
        int             exited;
} thread_fork_t;
#endif


#ifdef WITH_STATS
typedef struct fr_pps_t {
        uint32_t        pps_old;
        uint32_t        pps_now;
        uint32_t        pps;
        time_t          time_old;
} fr_pps_t;
#endif


/*
 *      A data structure to manage the thread pool.  There's no real
 *      need for a data structure, but it makes things conceptually
 *      easier.
 */
typedef struct THREAD_POOL {
#ifndef WITH_GCD
        THREAD_HANDLE   *head;
        THREAD_HANDLE   *tail;

        uint32_t        active_threads; /* protected by queue_mutex */
        uint32_t        total_threads;

        uint32_t        exited_threads;
        uint32_t        max_thread_num;
        uint32_t        start_threads;
        uint32_t        max_threads;
        uint32_t        min_spare_threads;
        uint32_t        max_spare_threads;
        uint32_t        max_requests_per_thread;
        uint32_t        request_count;
        time_t          time_last_spawned;
        uint32_t        cleanup_delay;
        bool            stop_flag;
#endif  /* WITH_GCD */
        bool            spawn_flag;

#ifdef WNOHANG
        pthread_mutex_t wait_mutex;
        fr_hash_table_t *waiters;
#endif

#ifdef WITH_GCD
        dispatch_queue_t        queue;
#else

#ifdef WITH_STATS
        fr_pps_t        pps_in, pps_out;
#ifdef WITH_ACCOUNTING
        bool            auto_limit_acct;
#endif
#endif

        /*
         *      All threads wait on this semaphore, for requests
         *      to enter the queue.
         */
        sem_t           semaphore;

        /*
         *      To ensure only one thread at a time touches the queue.
         */
        pthread_mutex_t queue_mutex;

        uint32_t        max_queue_size;
        uint32_t        num_queued;
        fr_fifo_t       *fifo[NUM_FIFOS];
#endif  /* WITH_GCD */
} THREAD_POOL;

static THREAD_POOL thread_pool;
static bool pool_initialized = false;

#ifndef WITH_GCD
static time_t last_cleaned = 0;

static void thread_pool_manage(time_t now);
#endif

#ifndef WITH_GCD
/*
 *      A mapping of configuration file names to internal integers
 */
static const CONF_PARSER thread_config[] = {
        { "start_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.start_threads), "5" },
        { "max_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_threads), "32" },
        { "min_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.min_spare_threads), "3" },
        { "max_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_spare_threads), "10" },
        { "max_requests_per_server", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_requests_per_thread), "0" },
        { "cleanup_delay", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.cleanup_delay), "5" },
        { "max_queue_size", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_queue_size), "65536" },
#ifdef WITH_STATS
#ifdef WITH_ACCOUNTING
        { "auto_limit_acct", FR_CONF_POINTER(PW_TYPE_BOOLEAN, &thread_pool.auto_limit_acct), NULL },
#endif
#endif
        CONF_PARSER_TERMINATOR
};
#endif

#ifdef HAVE_OPENSSL_CRYPTO_H

/*
 *      If we're linking against OpenSSL, then it is the
 *      duty of the application, if it is multithreaded,
 *      to provide OpenSSL with appropriate thread id
 *      and mutex locking functions
 *
 *      Note: this only implements static callbacks.
 *      OpenSSL does not use dynamic locking callbacks
 *      right now, but may in the future, so we will have
 *      to add them at some point.
 */
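
/*
 *      Note also that these "static lock" callbacks are only needed
 *      for OpenSSL versions before 1.1.0; from 1.1.0 onwards the
 *      library does its own locking, and the CRYPTO_set_*_callback
 *      calls below compile to no-ops.
 */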

static pthread_mutex_t *ssl_mutexes = NULL;

static unsigned long ssl_id_function(void)
{
        unsigned long ret;
        pthread_t thread = pthread_self();

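        /*
         *      pthread_t is an opaque type which may be wider or
         *      narrower than unsigned long, so copy only as many
         *      bytes as actually fit into the return value.
         */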
        if (sizeof(ret) >= sizeof(thread)) {
                memcpy(&ret, &thread, sizeof(thread));
        } else {
                memcpy(&ret, &thread, sizeof(ret));
        }

        return ret;
}

static void ssl_locking_function(int mode, int n, UNUSED char const *file, UNUSED int line)
{
        if (mode & CRYPTO_LOCK) {
                pthread_mutex_lock(&(ssl_mutexes[n]));
        } else {
                pthread_mutex_unlock(&(ssl_mutexes[n]));
        }
}

static int setup_ssl_mutexes(void)
{
        int i;

        ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
        if (!ssl_mutexes) {
                ERROR("Error allocating memory for SSL mutexes!");
                return 0;
        }

        for (i = 0; i < CRYPTO_num_locks(); i++) {
                pthread_mutex_init(&(ssl_mutexes[i]), NULL);
        }

        CRYPTO_set_id_callback(ssl_id_function);
        CRYPTO_set_locking_callback(ssl_locking_function);

        return 1;
}
#endif

#ifdef WNOHANG
/*
 *      We don't want to catch SIGCHLD for a host of reasons.
 *
 *      - exec_wait means that someone, somewhere, somewhen, will
 *      call waitpid(), and catch the child.
 *
 *      - SIGCHLD is delivered to a random thread, not the one that
 *      forked.
 *
 *      - if another thread catches the child, we have to coordinate
 *      with the thread doing the waiting.
 *
 *      - if we don't waitpid() for non-wait children, they'll be zombies,
 *      and will hang around forever.
 *
 */
static void reap_children(void)
{
        pid_t pid;
        int status;
        thread_fork_t mytf, *tf;


        pthread_mutex_lock(&thread_pool.wait_mutex);

        do {
        retry:
                pid = waitpid(0, &status, WNOHANG);
                if (pid <= 0) break;

                mytf.pid = pid;
                tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
                if (!tf) goto retry;

                tf->status = status;
                tf->exited = 1;
        } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);

        pthread_mutex_unlock(&thread_pool.wait_mutex);
}
#else
#define reap_children()
#endif /* WNOHANG */

#ifndef WITH_GCD
/*
 *      Add a request to the list of waiting requests.
 *      This function gets called ONLY from the main handler thread...
 *
 *      This function should never fail.
 */
int request_enqueue(REQUEST *request)
{
        rad_assert(pool_initialized == true);

        /*
         *      If we haven't checked the number of child threads
         *      in a while, OR if the thread pool appears to be full,
         *      go manage it.
         */
        if ((last_cleaned < request->timestamp) ||
            (thread_pool.active_threads == thread_pool.total_threads) ||
            (thread_pool.exited_threads > 0)) {
                thread_pool_manage(request->timestamp);
        }


        pthread_mutex_lock(&thread_pool.queue_mutex);

#ifdef WITH_STATS
#ifdef WITH_ACCOUNTING
        if (thread_pool.auto_limit_acct) {
                struct timeval now;

                /*
                 *      Throw away accounting requests if we're too
                 *      busy.  The NAS should retransmit these, and no
                 *      one should notice.
                 *
                 *      In contrast, we always try to process
                 *      authentication requests.  Those are more time
                 *      critical, and it's harder to determine which
                 *      we can throw away, and which we can keep.
                 *
                 *      We allow the queue to get half full before we
                 *      start worrying.  Even then, we still require
                 *      that the rate of input packets is higher than
                 *      the rate of outgoing packets, i.e. that the
                 *      queue is growing.
                 *
                 *      Once that happens, we roll a die to see where
                 *      the barrier is for "keep" versus "toss".  If
                 *      the queue is larger than the barrier, we throw
                 *      the packet away.  Otherwise, we keep it.
                 *
                 *      i.e. the probability of throwing the packet
                 *      away increases from 0 (queue is half full) to
                 *      100 percent (queue is completely full).
                 *
                 *      A probabilistic approach allows us to process
                 *      SOME of the new accounting packets.
                 */
                if ((request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
                    (thread_pool.num_queued > (thread_pool.max_queue_size / 2)) &&
                    (thread_pool.pps_in.pps_now > thread_pool.pps_out.pps_now)) {
                        uint32_t prob;
                        uint32_t keep;

                        /*
                         *      Take a random value of how full we
                         *      want the queue to be.  It's OK to be
                         *      half full, but we get excited over
                         *      anything more than that.
                         */
                        keep = (thread_pool.max_queue_size / 2);
                        prob = fr_rand() & ((1 << 10) - 1);
                        keep *= prob;
                        keep >>= 10;
                        keep += (thread_pool.max_queue_size / 2);
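
                        /*
                         *      e.g. with the default max_queue_size of
                         *      65536, keep is uniform in [32768, 65504]:
                         *      a packet arriving at a 75% full queue is
                         *      dropped with probability ~50%, and at a
                         *      completely full queue with probability
                         *      ~100%.
                         */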

                        /*
                         *      If the queue is larger than our dice
                         *      roll, we throw the packet away.
                         */
                        if (thread_pool.num_queued > keep) {
                                pthread_mutex_unlock(&thread_pool.queue_mutex);
                                return 0;
                        }
                }

                gettimeofday(&now, NULL);

                /*
                 *      Calculate the instantaneous arrival rate into
                 *      the queue.
                 */
                thread_pool.pps_in.pps = rad_pps(&thread_pool.pps_in.pps_old,
                                                 &thread_pool.pps_in.pps_now,
                                                 &thread_pool.pps_in.time_old,
                                                 &now);

                thread_pool.pps_in.pps_now++;
        }
#endif  /* WITH_ACCOUNTING */
#endif

        thread_pool.request_count++;

        if (thread_pool.num_queued >= thread_pool.max_queue_size) {
                pthread_mutex_unlock(&thread_pool.queue_mutex);

                /*
                 *      Mark the request as done.
                 */
                RATE_LIMIT(ERROR("Something is blocking the server.  There are %d packets in the queue, "
                                 "waiting to be processed.  Ignoring the new request.", thread_pool.num_queued));
                return 0;
        }
        request->component = "<core>";
        request->module = "<queue>";
        request->child_state = REQUEST_QUEUED;

        /*
         *      Push the request onto the appropriate fifo for that priority.
         */
        if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
                pthread_mutex_unlock(&thread_pool.queue_mutex);
                ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
                return 0;
        }

        thread_pool.num_queued++;

        pthread_mutex_unlock(&thread_pool.queue_mutex);

        /*
         *      There's one more request in the queue.
         *
         *      Note that we're not touching the queue any more, so
         *      the semaphore post is outside of the mutex.  This also
         *      means that when the thread wakes up and tries to lock
         *      the mutex, it will be unlocked, and there won't be
         *      contention.
         */
        sem_post(&thread_pool.semaphore);

        return 1;
}

/*
 *      Remove a request from the queue.
 */
static int request_dequeue(REQUEST **prequest)
{
        time_t blocked;
        static time_t last_complained = 0;
        static time_t total_blocked = 0;
        int num_blocked = 0;
        RAD_LISTEN_TYPE i, start;
        REQUEST *request = NULL;
        reap_children();

        rad_assert(pool_initialized == true);

        pthread_mutex_lock(&thread_pool.queue_mutex);

#ifdef WITH_STATS
#ifdef WITH_ACCOUNTING
        if (thread_pool.auto_limit_acct) {
                struct timeval now;

                gettimeofday(&now, NULL);

                /*
                 *      Calculate the instantaneous departure rate
                 *      from the queue.
                 */
                thread_pool.pps_out.pps  = rad_pps(&thread_pool.pps_out.pps_old,
                                                   &thread_pool.pps_out.pps_now,
                                                   &thread_pool.pps_out.time_old,
                                                   &now);
                thread_pool.pps_out.pps_now++;
        }
#endif
#endif

        /*
         *      Clear old requests from all queues.
         *
         *      We only do one pass over the queue, in order to
         *      amortize the work across the child threads.  Since we
         *      do N checks for one request de-queued, the old
         *      requests will be quickly cleared.
         */
        for (i = 0; i < RAD_LISTEN_MAX; i++) {
                request = fr_fifo_peek(thread_pool.fifo[i]);
                if (!request) continue;

                VERIFY_REQUEST(request);

                if (request->master_state != REQUEST_STOP_PROCESSING) {
                        continue;
                }

                /*
                 *      This entry was marked to be stopped.  Acknowledge it.
                 */
                request = fr_fifo_pop(thread_pool.fifo[i]);
                rad_assert(request != NULL);
                VERIFY_REQUEST(request);
                request->child_state = REQUEST_DONE;
                thread_pool.num_queued--;
        }

        start = 0;
 retry:
        /*
         *      Pop results from the top of the queue
         */
        for (i = start; i < RAD_LISTEN_MAX; i++) {
                request = fr_fifo_pop(thread_pool.fifo[i]);
                if (request) {
                        VERIFY_REQUEST(request);
                        start = i;
                        break;
                }
        }

        if (!request) {
                pthread_mutex_unlock(&thread_pool.queue_mutex);
                *prequest = NULL;
                return 0;
        }

        rad_assert(thread_pool.num_queued > 0);
        thread_pool.num_queued--;
        *prequest = request;

        rad_assert(*prequest != NULL);
        rad_assert(request->magic == REQUEST_MAGIC);

        request->component = "<core>";
        request->module = "<running>";
        request->child_state = REQUEST_RUNNING;

        /*
         *      If the request has sat in the queue for too long,
         *      kill it.
         *
         *      The main clean-up code can't delete the request from
         *      the queue, and therefore won't clean it up until we
         *      have acknowledged it as "done".
         */
        if (request->master_state == REQUEST_STOP_PROCESSING) {
                request->module = "<done>";
                request->child_state = REQUEST_DONE;
                goto retry;
        }

        /*
         *      The thread is currently processing a request.
         */
        thread_pool.active_threads++;

        blocked = time(NULL);
        if (!request->proxy && (blocked - request->timestamp) > 5) {
                total_blocked++;
                if (last_complained < blocked) {
                        last_complained = blocked;
                        blocked -= request->timestamp;
                        num_blocked = total_blocked;
                } else {
                        blocked = 0;
                }
        } else {
                total_blocked = 0;
                blocked = 0;
        }

        pthread_mutex_unlock(&thread_pool.queue_mutex);

        if (blocked) {
                ERROR("%d requests have been waiting in the processing queue for %d seconds.  Check that all databases are running properly!",
                      num_blocked, (int) blocked);
        }

        return 1;
}


/*
 *      The main thread handler for requests.
 *
 *      Wait on the semaphore until we have it, and process the request.
 */
static void *request_handler_thread(void *arg)
{
        THREAD_HANDLE *self = (THREAD_HANDLE *) arg;

        /*
         *      Loop forever, until told to exit.
         */
        do {
                /*
                 *      Wait to be signalled.
                 */
                DEBUG2("Thread %d waiting to be assigned a request",
                       self->thread_num);
        re_wait:
                if (sem_wait(&thread_pool.semaphore) != 0) {
                        /*
                         *      Interrupted system call.  Go back to
                         *      waiting, but DON'T print out any more
                         *      text.
                         */
                        if (errno == EINTR) {
                                DEBUG2("Re-wait %d", self->thread_num);
                                goto re_wait;
                        }
                        ERROR("Thread %d failed waiting for semaphore: %s: Exiting\n",
                               self->thread_num, fr_syserror(errno));
                        break;
                }

                DEBUG2("Thread %d got semaphore", self->thread_num);

#ifdef HAVE_OPENSSL_ERR_H
                /*
                 *      Clear the error queue for the current thread.
                 */
                ERR_clear_error();
#endif

                /*
                 *      The server is exiting.  Don't dequeue any
                 *      requests.
                 */
                if (thread_pool.stop_flag) break;

                /*
                 *      Try to grab a request from the queue.
                 *
                 *      It may be empty, in which case we fail
                 *      gracefully.
                 */
                if (!request_dequeue(&self->request)) continue;

                self->request->child_pid = self->pthread_id;
                self->request_count++;

                DEBUG2("Thread %d handling request %d, (%d handled so far)",
                       self->thread_num, self->request->number,
                       self->request_count);

#ifdef WITH_ACCOUNTING
                if ((self->request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
                    thread_pool.auto_limit_acct) {
                        VALUE_PAIR *vp;
                        REQUEST *request = self->request;

                        vp = radius_pair_create(request, &request->config,
                                               181, VENDORPEC_FREERADIUS);
                        if (vp) vp->vp_integer = thread_pool.pps_in.pps;

                        vp = radius_pair_create(request, &request->config,
                                               182, VENDORPEC_FREERADIUS);
                        if (vp) vp->vp_integer = thread_pool.pps_out.pps;

                        vp = radius_pair_create(request, &request->config,
                                               183, VENDORPEC_FREERADIUS);
                        if (vp) {
                                vp->vp_integer = thread_pool.max_queue_size - thread_pool.num_queued;
                                vp->vp_integer *= 100;
                                vp->vp_integer /= thread_pool.max_queue_size;
                        }
                }
#endif

                self->request->process(self->request, FR_ACTION_RUN);
                self->request = NULL;

                /*
                 *      Update the active threads.
                 */
                pthread_mutex_lock(&thread_pool.queue_mutex);
                rad_assert(thread_pool.active_threads > 0);
                thread_pool.active_threads--;
                pthread_mutex_unlock(&thread_pool.queue_mutex);

                /*
                 *      If the thread has handled too many requests, then make it
                 *      exit.
                 */
                if ((thread_pool.max_requests_per_thread > 0) &&
                    (self->request_count >= thread_pool.max_requests_per_thread)) {
                        DEBUG2("Thread %d handled too many requests",
                               self->thread_num);
                        break;
                }
        } while (self->status != THREAD_CANCELLED);

        DEBUG2("Thread %d exiting...", self->thread_num);

#ifdef HAVE_OPENSSL_ERR_H
        /*
         *      If we linked with OpenSSL, the application
         *      must remove the thread's error queue before
         *      exiting to prevent memory leaks.
         */
        ERR_remove_state(0);
#endif

        pthread_mutex_lock(&thread_pool.queue_mutex);
        thread_pool.exited_threads++;
        pthread_mutex_unlock(&thread_pool.queue_mutex);

        /*
         *  Do this as the LAST thing before exiting.
         */
        self->request = NULL;
        self->status = THREAD_EXITED;
        exec_trigger(NULL, NULL, "server.thread.stop", true);

        return NULL;
}

/*
 *      Take a THREAD_HANDLE, delete it from the thread pool and
 *      free its resources.
 *
 *      This function is called ONLY from the main server thread,
 *      ONLY after the thread has exited.
 */
static void delete_thread(THREAD_HANDLE *handle)
{
        THREAD_HANDLE *prev;
        THREAD_HANDLE *next;

        rad_assert(handle->request == NULL);

        DEBUG2("Deleting thread %d", handle->thread_num);

        prev = handle->prev;
        next = handle->next;
        rad_assert(thread_pool.total_threads > 0);
        thread_pool.total_threads--;

        /*
         *      Remove the handle from the list.
         */
        if (prev == NULL) {
                rad_assert(thread_pool.head == handle);
                thread_pool.head = next;
        } else {
                prev->next = next;
        }

        if (next == NULL) {
                rad_assert(thread_pool.tail == handle);
                thread_pool.tail = prev;
        } else {
                next->prev = prev;
        }

        /*
         *      Free the handle, now that it's no longer referenceable.
         */
        free(handle);
}


/*
 *      Spawn a new thread, and place it in the thread pool.
 *
 *      The thread is started initially in the blocked state, waiting
 *      for the semaphore.
 */
static THREAD_HANDLE *spawn_thread(time_t now, int do_trigger)
{
        int rcode;
        THREAD_HANDLE *handle;

        /*
         *      Ensure that we don't spawn too many threads.
         */
        if (thread_pool.total_threads >= thread_pool.max_threads) {
                DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
                return NULL;
        }

        /*
         *      Allocate a new thread handle.
         */
        handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
        memset(handle, 0, sizeof(THREAD_HANDLE));
        handle->prev = NULL;
        handle->next = NULL;
        handle->thread_num = thread_pool.max_thread_num++;
        handle->request_count = 0;
        handle->status = THREAD_RUNNING;
        handle->timestamp = time(NULL);

        /*
         *      Create the thread joinable, so that it can be cleaned up
         *      using pthread_join().
         *
         *      Note that the function returns non-zero on error, NOT
         *      -1.  The return code is the error, and errno isn't set.
         */
        rcode = pthread_create(&handle->pthread_id, 0, request_handler_thread, handle);
        if (rcode != 0) {
                free(handle);
                ERROR("Thread create failed: %s",
                       fr_syserror(rcode));
                return NULL;
        }

        /*
         *      One more thread to go into the list.
         */
        thread_pool.total_threads++;
        DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
                        handle->thread_num, thread_pool.total_threads);
        if (do_trigger) exec_trigger(NULL, NULL, "server.thread.start", true);

        /*
         *      Add the thread handle to the tail of the thread pool list.
         */
        if (thread_pool.tail) {
                thread_pool.tail->next = handle;
                handle->prev = thread_pool.tail;
                thread_pool.tail = handle;
        } else {
                rad_assert(thread_pool.head == NULL);
                thread_pool.head = thread_pool.tail = handle;
        }

        /*
         *      Update the time we last spawned a thread.
         */
        thread_pool.time_last_spawned = now;

        /*
         *      Fire trigger if maximum number of threads reached
         */
        if (thread_pool.total_threads >= thread_pool.max_threads)
                exec_trigger(NULL, NULL, "server.thread.max_threads", true);

        /*
         *      And return the new handle to the caller.
         */
        return handle;
}
#endif  /* WITH_GCD */


#ifdef WNOHANG
static uint32_t pid_hash(void const *data)
{
        thread_fork_t const *tf = data;

        return fr_hash(&tf->pid, sizeof(tf->pid));
}

static int pid_cmp(void const *one, void const *two)
{
        thread_fork_t const *a = one;
        thread_fork_t const *b = two;

        return (a->pid - b->pid);
}
#endif

/*
 *      Allocate the thread pool, and seed it with an initial number
 *      of threads.
 *
 *      FIXME: What to do on a SIGHUP???
 */
int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag)
{
#ifndef WITH_GCD
        uint32_t        i;
        int             rcode;
#endif
        CONF_SECTION    *pool_cf;
        time_t          now;

        now = time(NULL);

        rad_assert(spawn_flag != NULL);
        rad_assert(*spawn_flag == true);
        rad_assert(pool_initialized == false); /* not called on HUP */

        pool_cf = cf_subsection_find_next(cs, NULL, "thread");
#ifdef WITH_GCD
        if (pool_cf) WARN("Built with Grand Central Dispatch.  Ignoring 'thread' subsection");
#else
        if (!pool_cf) *spawn_flag = false;
#endif

        /*
         *      Initialize the thread pool to some reasonable values.
         */
        memset(&thread_pool, 0, sizeof(THREAD_POOL));
#ifndef WITH_GCD
        thread_pool.head = NULL;
        thread_pool.tail = NULL;
        thread_pool.total_threads = 0;
        thread_pool.max_thread_num = 1;
        thread_pool.cleanup_delay = 5;
        thread_pool.stop_flag = false;
#endif
        thread_pool.spawn_flag = *spawn_flag;

        /*
         *      Don't bother initializing the mutexes or
         *      creating the hash tables.  They won't be used.
         */
        if (!*spawn_flag) return 0;

#ifdef WNOHANG
        if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
                ERROR("FATAL: Failed to initialize wait mutex: %s",
                       fr_syserror(errno));
                return -1;
        }

        /*
         *      Create the hash table of child PID's
         */
        thread_pool.waiters = fr_hash_table_create(pid_hash,
                                                   pid_cmp,
                                                   free);
        if (!thread_pool.waiters) {
                ERROR("FATAL: Failed to set up wait hash");
                return -1;
        }
#endif

#ifndef WITH_GCD
        if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
                return -1;
        }

        /*
         *      Catch corner cases.
         */
        if (thread_pool.min_spare_threads < 1)
                thread_pool.min_spare_threads = 1;
        if (thread_pool.max_spare_threads < 1)
                thread_pool.max_spare_threads = 1;
        if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
                thread_pool.max_spare_threads = thread_pool.min_spare_threads;
        if (thread_pool.max_threads == 0)
                thread_pool.max_threads = 256;
        if ((thread_pool.max_queue_size < 2) || (thread_pool.max_queue_size > 1024*1024)) {
                ERROR("FATAL: max_queue_size value must be in range 2-1048576");
                return -1;
        }

        if (thread_pool.start_threads > thread_pool.max_threads) {
                ERROR("FATAL: start_servers (%i) must be <= max_servers (%i)",
                      thread_pool.start_threads, thread_pool.max_threads);
                return -1;
        }
#endif  /* WITH_GCD */

        /*
         *      The pool has already been initialized.  Don't spawn
         *      new threads, and don't forget about forked children.
         */
        if (pool_initialized) {
                return 0;
        }

#ifndef WITH_GCD
        /*
         *      Initialize the queue of requests.
         */
        memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
        rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
        if (rcode != 0) {
                ERROR("FATAL: Failed to initialize semaphore: %s",
                       fr_syserror(errno));
                return -1;
        }

        rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
        if (rcode != 0) {
                ERROR("FATAL: Failed to initialize queue mutex: %s",
                       fr_syserror(errno));
                return -1;
        }

        /*
         *      Allocate multiple fifos.
         */
        for (i = 0; i < RAD_LISTEN_MAX; i++) {
                thread_pool.fifo[i] = fr_fifo_create(NULL, thread_pool.max_queue_size, NULL);
                if (!thread_pool.fifo[i]) {
                        ERROR("FATAL: Failed to set up request fifo");
                        return -1;
                }
        }
#endif

#ifdef HAVE_OPENSSL_CRYPTO_H
        /*
         *      If we're linking with OpenSSL too, then we need
         *      to set up the mutexes and enable the thread callbacks.
         */
        if (!setup_ssl_mutexes()) {
                ERROR("FATAL: Failed to set up SSL mutexes");
                return -1;
        }
#endif


#ifndef WITH_GCD
        /*
         *      Create a number of waiting threads.
         *
         *      If we fail while creating them, do something intelligent.
         */
        for (i = 0; i < thread_pool.start_threads; i++) {
                if (spawn_thread(now, 0) == NULL) {
                        return -1;
                }
        }
#else
        thread_pool.queue = dispatch_queue_create("org.freeradius.threads", NULL);
        if (!thread_pool.queue) {
                ERROR("Failed creating dispatch queue: %s", fr_syserror(errno));
                fr_exit(1);
        }
#endif

        DEBUG2("Thread pool initialized");
        pool_initialized = true;
        return 0;
}


/*
 *      Stop all threads in the pool.
 */
void thread_pool_stop(void)
{
#ifndef WITH_GCD
        int i;
        int total_threads;
        THREAD_HANDLE *handle;
        THREAD_HANDLE *next;

        if (!pool_initialized) return;

        /*
         *      Set pool stop flag.
         */
        thread_pool.stop_flag = true;

        /*
         *      Wakeup all threads to make them see stop flag.
         */
        total_threads = thread_pool.total_threads;
        for (i = 0; i != total_threads; i++) {
                sem_post(&thread_pool.semaphore);
        }

        /*
         *      Join and free all threads.
         */
        for (handle = thread_pool.head; handle; handle = next) {
                next = handle->next;
                pthread_join(handle->pthread_id, NULL);
                delete_thread(handle);
        }

        for (i = 0; i < RAD_LISTEN_MAX; i++) {
                fr_fifo_free(thread_pool.fifo[i]);
        }

#ifdef WNOHANG
        fr_hash_table_free(thread_pool.waiters);
#endif

#ifdef HAVE_OPENSSL_CRYPTO_H
        /*
         *      We're no longer threaded.  Remove the mutexes and free
         *      the memory.
         */
        CRYPTO_set_id_callback(NULL);
        CRYPTO_set_locking_callback(NULL);

        free(ssl_mutexes);
#endif

#endif
}


#ifdef WITH_GCD
int request_enqueue(REQUEST *request)
{
        dispatch_block_t block;

        block = ^{
                request->process(request, FR_ACTION_RUN);
        };

        dispatch_async(thread_pool.queue, block);

        return 1;
}
#endif

#ifndef WITH_GCD
/*
 *      Check the min_spare_threads and max_spare_threads.
 *
 *      If there are too many or too few threads waiting, then we
 *      either create some more, or delete some.
 */
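/*
 *      Worked example with the defaults above (start_servers = 5,
 *      max_servers = 32, min_spare_servers = 3, max_spare_servers = 10):
 *      with 5 threads total and 4 of them active, spare = 1 < 3, so two
 *      more threads are spawned; with 20 total and 5 active,
 *      spare = 15 > 10, so ONE idle thread is told to exit per call.
 */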
static void thread_pool_manage(time_t now)
{
        uint32_t spare;
        int i, total;
        THREAD_HANDLE *handle, *next;
        uint32_t active_threads;

        /*
         *      Loop over the thread pool, deleting exited threads.
         */
        for (handle = thread_pool.head; handle; handle = next) {
                next = handle->next;

                /*
                 *      Maybe we've asked the thread to exit, and it
                 *      has agreed.
                 */
                if (handle->status == THREAD_EXITED) {
                        pthread_join(handle->pthread_id, NULL);
                        delete_thread(handle);
                        pthread_mutex_lock(&thread_pool.queue_mutex);
                        thread_pool.exited_threads--;
                        pthread_mutex_unlock(&thread_pool.queue_mutex);
                }
        }

        /*
         *      We don't need a mutex lock here, as we're reading
         *      active_threads, and not modifying it.  We want a close
         *      approximation of the number of active threads, and this
         *      is good enough.
         */
        active_threads = thread_pool.active_threads;
        spare = thread_pool.total_threads - active_threads;
        if (rad_debug_lvl) {
                static uint32_t old_total = 0;
                static uint32_t old_active = 0;

                if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) {
                        DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
                               thread_pool.total_threads, active_threads, spare);
                        old_total = thread_pool.total_threads;
                        old_active = active_threads;
                }
        }

        /*
         *      If there are too few spare threads, go create some more.
         */
        if ((thread_pool.total_threads < thread_pool.max_threads) &&
            (spare < thread_pool.min_spare_threads)) {
                total = thread_pool.min_spare_threads - spare;

                if ((total + thread_pool.total_threads) > thread_pool.max_threads) {
                        total = thread_pool.max_threads - thread_pool.total_threads;
                }

                DEBUG2("Threads: Spawning %d spares", total);

                /*
                 *      Create a number of spare threads.
                 */
                for (i = 0; i < total; i++) {
                        handle = spawn_thread(now, 1);
                        if (handle == NULL) {
                                return;
                        }
                }

                return;         /* there aren't too many spare threads */
        }

        /*
         *      Only delete spare threads if we haven't already done
         *      so this second.
         */
        if (now == last_cleaned) {
                return;
        }
        last_cleaned = now;

        /*
         *      Only delete the spare threads if sufficient time has
         *      passed since we last created one.  This helps to minimize
         *      the amount of create/delete cycles.
         */
        if ((now - thread_pool.time_last_spawned) < (int)thread_pool.cleanup_delay) {
                return;
        }

        /*
         *      If there are too many spare threads, delete one.
         *
         *      Note that we only delete ONE at a time, instead of
         *      wiping out many.  This allows the excess servers to
         *      be slowly reaped, just in case the load spike comes again.
         */
        if (spare > thread_pool.max_spare_threads) {

                spare -= thread_pool.max_spare_threads;

                DEBUG2("Threads: deleting 1 spare out of %d spares", spare);

                /*
                 *      Walk through the thread pool, deleting the
                 *      first idle thread we come across.
                 */
                for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
                        next = handle->next;

                        /*
                         *      If the thread is not handling a
                         *      request, but still live, then tell it
                         *      to exit.
                         *
                         *      It will eventually wake up, and realize
                         *      it's been told to commit suicide.
                         */
                        if ((handle->request == NULL) &&
                            (handle->status == THREAD_RUNNING)) {
                                handle->status = THREAD_CANCELLED;
                                /*
                                 *      Post an extra semaphore, as a
                                 *      signal to wake up, and exit.
                                 */
                                sem_post(&thread_pool.semaphore);
                                spare--;
                                break;
                        }
                }
        }

        /*
         *      Otherwise everything's kosher.  There are not too few,
         *      or too many spare threads.  Exit happily.
         */
        return;
}
#endif  /* WITH_GCD */

#ifdef WNOHANG
/*
 *      Thread wrapper for fork().
 */
pid_t rad_fork(void)
{
        pid_t child_pid;

        if (!pool_initialized) return fork();

        reap_children();        /* be nice to non-wait thingies */

        if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
                return -1;
        }

        /*
         *      Fork & save the PID for later reaping.
         */
        child_pid = fork();
        if (child_pid > 0) {
                int rcode;
                thread_fork_t *tf;

                tf = rad_malloc(sizeof(*tf));
                memset(tf, 0, sizeof(*tf));

                tf->pid = child_pid;

                pthread_mutex_lock(&thread_pool.wait_mutex);
                rcode = fr_hash_table_insert(thread_pool.waiters, tf);
                pthread_mutex_unlock(&thread_pool.wait_mutex);

                if (!rcode) {
                        ERROR("Failed to store PID, creating what will be a zombie process %d",
                               (int) child_pid);
                        free(tf);
                }
        }

        /*
         *      Return whatever we were told.
         */
        return child_pid;
}

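/*
 *      A sketch of the intended pairing (the exec_wait code elsewhere
 *      does the real work):
 *
 *              pid_t pid = rad_fork();
 *              if (pid == 0) {
 *                      ...child: exec the external program...
 *              }
 *              ...
 *              rad_waitpid(pid, &status);      reaped via the waiters table
 */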

/*
 *      Wait 10 seconds at most for a child to exit, then give up.
 */
pid_t rad_waitpid(pid_t pid, int *status)
{
        int i;
        thread_fork_t mytf, *tf;

        if (!pool_initialized) return waitpid(pid, status, 0);

        if (pid <= 0) return -1;

        mytf.pid = pid;

        pthread_mutex_lock(&thread_pool.wait_mutex);
        tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
        pthread_mutex_unlock(&thread_pool.wait_mutex);

        if (!tf) return -1;

        for (i = 0; i < 100; i++) {
                reap_children();

                if (tf->exited) {
                        *status = tf->status;

                        pthread_mutex_lock(&thread_pool.wait_mutex);
                        fr_hash_table_delete(thread_pool.waiters, &mytf);
                        pthread_mutex_unlock(&thread_pool.wait_mutex);
                        return pid;
                }
                usleep(100000); /* sleep for 1/10 of a second */
        }

        /*
         *      10 seconds have passed, give up on the child.
         */
        pthread_mutex_lock(&thread_pool.wait_mutex);
        fr_hash_table_delete(thread_pool.waiters, &mytf);
        pthread_mutex_unlock(&thread_pool.wait_mutex);

        return 0;
}
#else
/*
 *      No rad_fork or rad_waitpid
 */
#endif

void thread_pool_queue_stats(int array[RAD_LISTEN_MAX], int pps[2])
{
        int i;

#ifndef WITH_GCD
        if (pool_initialized) {
                struct timeval now;

                for (i = 0; i < RAD_LISTEN_MAX; i++) {
                        array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
                }

                gettimeofday(&now, NULL);

                pps[0] = rad_pps(&thread_pool.pps_in.pps_old,
                                 &thread_pool.pps_in.pps_now,
                                 &thread_pool.pps_in.time_old,
                                 &now);
                pps[1] = rad_pps(&thread_pool.pps_out.pps_old,
                                 &thread_pool.pps_out.pps_now,
                                 &thread_pool.pps_out.time_old,
                                 &now);

        } else
#endif  /* WITH_GCD */
        {
                for (i = 0; i < RAD_LISTEN_MAX; i++) {
                        array[i] = 0;
                }

                pps[0] = pps[1] = 0;
        }
}
#endif /* HAVE_PTHREAD_H */

static void time_free(void *data)
{
        free(data);
}

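/*
 *      Run a trigger.  For a name like "server.thread.start", the
 *      value is looked up via cf_reference_item(), i.e. in a nested
 *      configuration along these (assumed, illustrative) lines:
 *
 *              trigger {
 *                      server {
 *                              thread {
 *                                      start = "/path/to/script ..."
 *                              }
 *                      }
 *              }
 *
 *      The value found is then executed via radius_exec_program().
 */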
void exec_trigger(REQUEST *request, CONF_SECTION *cs, char const *name, int quench)
{
        CONF_SECTION *subcs;
        CONF_ITEM *ci;
        CONF_PAIR *cp;
        char const *attr;
        char const *value;
        VALUE_PAIR *vp;
        bool alloc = false;

        /*
         *      Use global "trigger" section if no local config is given.
         */
        if (!cs) {
                cs = main_config.config;
                attr = name;
        } else {
                /*
                 *      Try to use pair name, rather than reference.
                 */
                attr = strrchr(name, '.');
                if (attr) {
                        attr++;
                } else {
                        attr = name;
                }
        }

        /*
         *      Find local "trigger" subsection.  If it isn't found,
         *      try using the global "trigger" section, and reset the
         *      reference to the full path, rather than the sub-path.
         */
        subcs = cf_section_sub_find(cs, "trigger");
        if (!subcs && (cs != main_config.config)) {
                subcs = cf_section_sub_find(main_config.config, "trigger");
                attr = name;
        }

        if (!subcs) return;

        ci = cf_reference_item(subcs, main_config.config, attr);
        if (!ci) {
                ERROR("No such item in trigger section: %s", attr);
                return;
        }

        if (!cf_item_is_pair(ci)) {
                ERROR("Trigger is not a configuration variable: %s", attr);
                return;
        }

        cp = cf_item_to_pair(ci);
        if (!cp) return;

        value = cf_pair_value(cp);
        if (!value) {
                ERROR("Trigger has no value: %s", name);
                return;
        }

        /*
         *      May be called for Status-Server packets.
         */
        vp = NULL;
        if (request && request->packet) vp = request->packet->vps;

        /*
         *      Perform periodic quenching.
         */
        if (quench) {
                time_t *last_time;

                last_time = cf_data_find(cs, value);
                if (!last_time) {
                        last_time = rad_malloc(sizeof(*last_time));
                        *last_time = 0;

                        if (cf_data_add(cs, value, last_time, time_free) < 0) {
                                free(last_time);
                                last_time = NULL;
                        }
                }

                /*
                 *      Send the quenched traps at most once per second.
                 */
                if (last_time) {
                        time_t now = time(NULL);
                        if (*last_time == now) return;

                        *last_time = now;
                }
        }

        /*
         *      radius_exec_program always needs a request.
         */
        if (!request) {
                request = request_alloc(NULL);
                alloc = true;
        }

        DEBUG("Trigger %s -> %s", name, value);

        radius_exec_program(request, NULL, 0, NULL, request, value, vp, false, true, EXEC_TIMEOUT);

        if (alloc) talloc_free(request);
}