Threads don't block SIGCHLD.
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include "autoconf.h"
25 #include "libradius.h"
26
27 #if HAVE_PTHREAD_H
28
29 #include <stdlib.h>
30 #include <string.h>
31 #include <semaphore.h>
32 #include <signal.h>
33
34 #include "radiusd.h"
35 #include "rad_assert.h"
36 #include "conffile.h"
37
static const char rcsid[] = "$Id$";

/*
 *	Initial values for a thread's private semaphore.  A new thread
 *	starts LOCKED, so that it blocks in sem_wait() until the server
 *	gives it the semaphore.
 */
enum {
	SEMAPHORE_LOCKED = 0,
	SEMAPHORE_UNLOCKED = 1
};

/*
 *	Values stored in THREAD_HANDLE.status.
 */
enum {
	THREAD_RUNNING = 1,
	THREAD_CANCELLED = 2
};
46
/*
 *	Prototype to shut the compiler up.
 *
 *	rad_spawn_child() is defined further down in this file with
 *	external (non-static) linkage; declaring it here keeps compilers
 *	that warn about definitions without a prior prototype quiet.
 *	NOTE(review): presumably callers get their declaration from a
 *	project header -- confirm.
 */
int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun);
51
52 /*
53  *  A data structure which contains the information about
54  *  the current thread.
55  *
56  *  pthread_id     pthread id
57  *  thread_num     server thread number, 1...number of threads
58  *  semaphore     used to block the thread until a request comes in
59  *  status        is the thread running or exited?
60  *  request_count the number of requests that this thread has handled
61  *  timestamp     when the thread started executing.
62  *  request       the current request that the thread is processing.
63  *  fun           the function which is handling the request.
64  */
65 typedef struct THREAD_HANDLE {
66         struct THREAD_HANDLE *prev;
67         struct THREAD_HANDLE *next;
68         pthread_t pthread_id;
69         int thread_num;
70         sem_t semaphore;
71         int status;
72         int request_count;
73         time_t timestamp;
74         REQUEST *request;
75         RAD_REQUEST_FUNP fun;
76 } THREAD_HANDLE;
77
78 /*
79  *      A data structure to manage the thread pool.  There's no real
80  *      need for a data structure, but it makes things conceptually
81  *      easier.
82  */
83 typedef struct THREAD_POOL {
84         THREAD_HANDLE *head;
85         THREAD_HANDLE *tail;
86         
87         int total_threads;
88         int active_threads;
89         int max_thread_num;
90         int start_threads;
91         int max_threads;
92         int min_spare_threads;
93         int max_spare_threads;
94         int max_requests_per_thread;
95         time_t time_last_spawned;
96         int cleanup_delay;
97 } THREAD_POOL;
98
99 static THREAD_POOL thread_pool;
100
/*
 *	A mapping of configuration file names to internal integers.
 *
 *	Each entry gives the item name in the "thread" section, its
 *	type, the thread_pool field that receives the value, and the
 *	default value written as it would appear in the config file.
 *	NOTE(review): the default strings are presumably applied by
 *	cf_section_parse() when an item is missing -- confirm in
 *	conffile.c.  The all-NULL entry terminates the table.
 */
static const CONF_PARSER thread_config[] = {
	{ "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
	{ "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
	{ "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
	{ "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
	/* "0" disables the per-thread request limit (see thread_pool_clean). */
	{ "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
	{ "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
	{ NULL, -1, 0, NULL, NULL }
};
113
114 /*
115  *      The main thread handler for requests.
116  *
117  *      Wait on the semaphore until we have it, and process the request.
118  */
119 static void *request_handler_thread(void *arg)
120 {
121         THREAD_HANDLE   *self = (THREAD_HANDLE *) arg;
122 #if HAVE_PTHREAD_SIGMASK
123         sigset_t set;
124
125         /*
126          *      Block SIGHUP handling for the child threads.
127          *
128          *      This ensures that only the main server thread will
129          *      process HUP signals.
130          *
131          *      If we don't have sigprocmask, then it shouldn't be
132          *      a problem, either, as the sig_hup handler should check
133          *      for this condition.
134          */
135         sigemptyset(&set);
136         sigaddset(&set, SIGHUP);
137         sigaddset(&set, SIGINT);
138         sigaddset(&set, SIGQUIT);
139         pthread_sigmask(SIG_BLOCK, &set, NULL);
140 #endif
141         
142         /*
143          *      Loop forever, until told to exit.
144          */
145         for (;;) {
146                 /*
147                  *      Wait for the semaphore to be given to us.
148                  */
149                 DEBUG2("Thread %d waiting to be assigned a request",
150                                 self->thread_num);
151                 if (sem_wait(&self->semaphore) < 0) {
152                         break;
153                 }
154
155                 /*
156                  *      If we've been told to kill ourselves,
157                  *      then exit politely.
158                  */
159                 if (self->status == THREAD_CANCELLED) {
160                         DEBUG2("Thread %d exiting on request from parent.",
161                                         self->thread_num);
162                         break;
163                 }
164                 
165                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
166                                 self->thread_num, self->request->number,
167                                 self->request_count);
168                 
169                 rad_respond(self->request, self->fun);
170                 self->request = NULL;
171
172                 /*
173                  *      The semaphore's value is zero, because we've
174                  *      locked it.  We now go back to the top of the loop,
175                  *      where we wait for it's value to become non-zero.
176                  */
177         }
178
179         /*
180          *      This thread is exiting.  Delete any additional resources
181          *      associated with it (semaphore, etc), and free the thread
182          *      handle memory.
183          */
184         sem_destroy(&self->semaphore);
185         free(self);
186         return NULL;
187 }
188
189 /*
190  *      Take a THREAD_HANDLE, and delete it from the thread pool.
191  *
192  *      This function is called ONLY from the main server thread.
193  */
194 static void delete_thread(THREAD_HANDLE *handle)
195 {
196         THREAD_HANDLE *prev;
197         THREAD_HANDLE *next;
198
199         rad_assert(handle->request == NULL);
200
201         prev = handle->prev;
202         next = handle->next;
203         rad_assert(thread_pool.total_threads > 0);
204         thread_pool.total_threads--;
205
206         if (prev == NULL) {
207                 rad_assert(thread_pool.head == handle);
208                 thread_pool.head = next;
209         } else {
210                 prev->next = next;
211         }
212   
213         if (next == NULL) {
214                 rad_assert(thread_pool.tail == handle);
215                 thread_pool.tail = prev;
216         } else {
217                 next->prev = prev;
218         }
219 }
220
221 /*
222  *      Take a THREAD_HANDLE, and move it to the end of the thread pool.
223  *
224  *      This function is called ONLY from the main server thread.
225  *      It's function is to keep the incoming requests rotating among
226  *      the threads in the pool.
227  */
228 static void move2tail(THREAD_HANDLE *handle)
229 {
230         THREAD_HANDLE *prev;
231         THREAD_HANDLE *next;
232
233         /*
234          *      Empty list: add it to the head.
235          */
236         if (thread_pool.head == NULL) {
237                 rad_assert(thread_pool.tail == NULL);
238                 rad_assert(thread_pool.total_threads == 1);
239
240                 handle->prev = NULL;
241                 handle->next = NULL;
242                 thread_pool.head = handle;
243                 thread_pool.tail = handle;
244                 return;
245         }
246
247         rad_assert(thread_pool.total_threads >= 1);
248         prev = handle->prev;
249         next = handle->next;
250   
251         /*
252          *      If the element is in the list, then delete it from where
253          *      it is.
254          */
255         if ((next != NULL) ||
256                         (prev != NULL)) {
257                 /*
258                  *      If it's already at the tail, exit immediately,
259                  *      there's no more work to do.
260                  */
261                 if (next == NULL) {
262                         rad_assert(thread_pool.tail == handle);
263                         return;
264                 }
265
266                 /*
267                  *      Maybe it's at the head of the list?
268                  */
269                 if (prev == NULL) {
270                         rad_assert(thread_pool.head == handle);
271                         thread_pool.head = next;
272                         next->prev = NULL;
273
274                         /*
275                          *      Nope, it's really in the middle.
276                          *      Unlink it, then.
277                          */
278                 } else {
279                         rad_assert(prev != NULL); /* be explicit about it. */
280                         rad_assert(next != NULL); /* be explicit about it. */
281
282                         prev->next = next;
283                         next->prev = prev;
284                 }
285         }
286
287         /*
288          *      Finally, add it to the tail, and update the pointers.
289          */
290         handle->next = NULL;
291         prev = thread_pool.tail;
292         rad_assert(prev->next == NULL);
293
294         thread_pool.tail = handle;
295         handle->prev = prev;
296         prev->next = handle;
297 }
298
299 /*
300  *      Spawn a new thread, and place it in the thread pool.
301  *
302  *      The thread is started initially in the blocked state, waiting
303  *      for the semaphore.
304  */
305 static THREAD_HANDLE *spawn_thread(time_t now)
306 {
307         int rcode;
308         THREAD_HANDLE *handle;
309         pthread_attr_t attr;
310
311         /*
312          *      Ensure that we don't spawn too many threads.
313          */
314         if (thread_pool.total_threads >= thread_pool.max_threads) {
315                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
316                 return NULL;
317         }
318
319         /*
320          *      Allocate a new thread handle.
321          */
322         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
323         memset(handle, 0, sizeof(THREAD_HANDLE));
324         handle->prev = NULL;
325         handle->next = NULL;
326         handle->pthread_id = NO_SUCH_CHILD_PID;
327         handle->thread_num = thread_pool.max_thread_num++;
328         handle->request_count = 0;
329         handle->status = THREAD_RUNNING;
330         handle->timestamp = time(NULL);
331
332         /*
333          *      Initialize the semaphore to be for this process only,
334          *      and to have the thread block until the server gives it
335          *      the semaphore.
336          */
337         rcode = sem_init(&handle->semaphore, 0, SEMAPHORE_LOCKED);
338         if (rcode != 0) {
339                 radlog(L_ERR|L_CONS, "Failed to allocate semaphore: %s",
340                                 strerror(errno));
341                 exit(1);
342         }
343
344         /*
345          *      The thread isn't currently handling a request.
346          */
347         handle->request = NULL;
348         handle->fun = NULL;
349
350         /*
351          *      Initialize the thread's attributes to detached.
352          *
353          *      We could call pthread_detach() later, but if the thread
354          *      exits between the create & detach calls, it will need to
355          *      be joined, which will never happen.
356          */
357         pthread_attr_init(&attr);
358         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
359         
360         /*
361          *      Create the thread detached, so that it cleans up it's
362          *      own memory when it exits.
363          */
364         rcode = pthread_create(&handle->pthread_id, &attr,
365                         request_handler_thread, handle);
366         if (rcode != 0) {
367                 radlog(L_ERR|L_CONS, "Thread create failed: %s", strerror(errno));
368                 exit(1);
369         }
370         pthread_attr_destroy(&attr);
371
372         /*
373          *      One more thread to go into the list.
374          */
375         thread_pool.total_threads++;
376         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
377                         handle->thread_num, thread_pool.total_threads);
378
379         /*
380          *      Move the thread handle to the tail of the thread pool list.
381          */
382         move2tail(handle);
383
384         /*
385          *      Update the time we last spawned a thread.
386          */
387         thread_pool.time_last_spawned = now;
388
389         /*
390          *      And return the new handle to the caller.
391          */
392         return handle;
393 }
394
395 /*
396  *      Allocate the thread pool, and seed it with an initial number
397  *      of threads.
398   */
399 int thread_pool_init(void)
400 {
401         int i;
402         THREAD_HANDLE   *handle;
403         CONF_SECTION    *pool_cf;
404         time_t          now;
405
406         DEBUG("Initializing the thread pool...");
407         now = time(NULL);
408
409         /*
410          *      Initialize the thread pool to some reasonable values.
411          */
412         memset(&thread_pool, 0, sizeof(THREAD_POOL));
413         thread_pool.head = NULL;
414         thread_pool.tail = NULL;
415         thread_pool.total_threads = 0;
416         thread_pool.max_thread_num = 1;
417         thread_pool.cleanup_delay = 5;
418
419         pool_cf = cf_section_find("thread");
420         if (pool_cf != NULL) {
421                 cf_section_parse(pool_cf, NULL, thread_config);
422         }
423
424         /*
425          *      Create a number of waiting threads.
426          *
427          *      If we fail while creating them, do something intelligent.
428          */
429         for (i = 0; i < thread_pool.start_threads; i++) {
430                 handle = spawn_thread(now);
431                 if (handle == NULL) {
432                         return -1;
433                 }
434         }
435
436         return 0;
437 }
438
439 /*
440  *      Assign a new request to a free thread.
441  *
442  *      If there isn't a free thread, then try to create a new one,
443  *      up to the configured limits.
444  */
445 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun)
446 {
447         int active_threads;
448         THREAD_HANDLE *handle;
449         THREAD_HANDLE *found;
450         THREAD_HANDLE *next;
451
452         /*
453          *      Loop over the active thread pool, looking for a
454          *      waiting thread.
455          */
456         found = NULL;
457         active_threads = 0;
458         for (handle = thread_pool.head; handle; handle = next) {
459                 next = handle->next;
460
461                 /*
462                  *      Ignore threads which aren't running.
463                  */
464                 if (handle->status != THREAD_RUNNING) {
465                         continue;
466                 }
467
468                 /*
469                  *      If we haven't found a free thread yet, then
470                  *      check it's semaphore lock.  We don't lock it,
471                  *      so if it's locked, then the thread MUST be the
472                  *      one locking it, waiting for us to unlock it.
473                  */
474                 if (handle->request == NULL) {
475                         if (found == NULL) {
476                                 found = handle;
477                         }
478                 } else {
479                         active_threads++;
480                 }
481         } /* loop over all of the threads */
482
483         /*
484          *      If we haven't found an active thread, then spawn a new one.
485          *
486          *      If we can't spawn a new one, complain, and exit.
487          */
488         if (found == NULL) {
489                 found = spawn_thread(request->timestamp);
490                 if (found == NULL) {
491                         radlog(L_INFO, 
492                                         "The maximum number of threads (%d) are active, cannot spawn new thread to handle request", 
493                                         thread_pool.max_threads);
494                         return -1;
495                 }
496         }
497
498         /*
499          *      OK, now 'handle' points to a waiting thread.  We move
500          *      it to the tail of the thread pool, so that we can
501          *      cycle among the threads.
502          *
503          *      We then give it the request, signal its semaphore, and
504          *      return.  The thread eventually wakes up, and handles
505          *      the request.
506          */
507         DEBUG2("Thread %d assigned request %d", found->thread_num, request->number);
508         move2tail(found);
509         found->request = request;
510         found->fun = fun;
511         found->request_count++;
512         sem_post(&found->semaphore);
513         thread_pool.active_threads = active_threads;
514
515         return 0;
516 }
517
518 /*
519  *      Check the min_spare_threads and max_spare_threads.
520  *
521  *      If there are too many or too few threads waiting, then we
522  *      either create some more, or delete some.
523  */
524 int thread_pool_clean(time_t now)
525 {
526         int spare;
527         int i, total;
528         THREAD_HANDLE *handle, *next;
529         int active_threads;
530
531         /*
532          *      Loop over the thread pool, doing stuff.
533          */
534         active_threads = 0;
535         for (handle = thread_pool.head; handle; handle = handle->next) {
536                 if (handle->request != NULL) {
537                         active_threads++;
538                 }
539         }
540
541         spare = thread_pool.total_threads - active_threads;
542         if (debug_flag) {
543                 static int old_total = -1;
544                 static int old_active = -1;
545
546                 if ((old_total != thread_pool.total_threads) ||
547                                 (old_active != active_threads)) {
548                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
549                                         thread_pool.total_threads, active_threads, spare);
550                         old_total = thread_pool.total_threads;
551                         old_active = active_threads;
552                 }
553         }
554
555         /*
556          *      If there are too few spare threads, create some more.
557          */
558         if (spare < thread_pool.min_spare_threads) {
559                 total = thread_pool.min_spare_threads - spare;
560
561                 DEBUG2("Threads: Spawning %d spares", total);
562                 /*
563                  *      Create a number of spare threads.
564                  */
565                 for (i = 0; i < total; i++) {
566                         handle = spawn_thread(now);
567                         if (handle == NULL) {
568                                 return -1;
569                         }
570                 }
571
572                 /*
573                  *      And exit, as there can't be too many spare threads.
574                  */
575                 return 0;
576         }
577
578         /*
579          *      Only delete the spare threads if sufficient time has
580          *      passed since we last created one.  This helps to minimize
581          *      the amount of create/delete cycles.
582          */
583         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
584                 return 0;
585         }
586
587         /*
588          *      If there are too many spare threads, delete one.
589          *
590          *      Note that we only delete ONE at a time, instead of
591          *      wiping out many.  This allows the excess servers to
592          *      be slowly reaped, just in case the load spike comes again.
593          */
594         if (spare > thread_pool.max_spare_threads) {
595
596                 spare -= thread_pool.max_spare_threads;
597
598                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
599
600                 /*
601                  *      Walk through the thread pool, deleting the
602                  *      first N idle threads we come across.
603                  */
604                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
605                         next = handle->next;
606
607                         /*
608                          *      If the thread is not handling a
609                          *      request, then tell it to exit.
610                          *
611                          *      Note that we delete it from the thread
612                          *      pool BEFORE telling it to kill itself,
613                          *      as the child thread can free the 'handle'
614                          *      structure, without anyone else using it.
615                          */
616                         if (handle->request == NULL) {
617                                 delete_thread(handle);
618                                 handle->status = THREAD_CANCELLED;
619                                 sem_post(&handle->semaphore);
620                                 spare--;
621                                 break;
622                         }
623                 }
624         }
625   
626         /*
627          *      If the thread has handled too many requests, then make it
628          *      exit.
629          */
630         if (thread_pool.max_requests_per_thread > 0) {
631                 for (handle = thread_pool.head; handle; handle = next) {
632                         next = handle->next;
633
634                         /*
635                          *      Not handling a request, we can check it.
636                          */
637                         if ((handle->request == NULL) &&
638                             (handle->request_count > thread_pool.max_requests_per_thread)) {
639                                 delete_thread(handle);
640                                 handle->status = THREAD_CANCELLED;
641                                 sem_post(&handle->semaphore);
642                         }
643                 }
644         }
645   
646         /*
647          *      Otherwise everything's kosher.  There are not too few,
648          *      or too many spare threads.  Exit happily.
649          */
650         return 0;
651 }
652 #endif