Code "cleanups." I confess that I sometimes went beyond the TODO
[freeradius.git] / src / main / threads.c
1 /*
2  * threads.c    request threading support
3  *
4  * Version:     $Id$
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  * Copyright 2000  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23
24 #include "autoconf.h"
25 #include "libradius.h"
26
27 #ifdef WITH_THREAD_POOL
28
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
34
35 #include "radiusd.h"
36 #include "conffile.h"
37
38 static const char rcsid[] =
39 "$Id$";
40
41 #define SEMAPHORE_LOCKED        (0)
42 #define SEMAPHORE_UNLOCKED      (1)
43
44 #define THREAD_RUNNING          (1)
45 #define THREAD_CANCELLED        (2)
46
47 /*
48  *      Prototype to shut the compiler up.
49  */
50 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun);
51
52 /*
53  *  A data structure which contains the information about
54  *  the current thread.
55  *
56  *  pthread_id     pthread id
57  *  thread_num     server thread number, 1...number of threads
58  *  semaphore     used to block the thread until a request comes in
59  *  status        is the thread running or exited?
60  *  request_count the number of requests that this thread has handled
61  *  timestamp     when the thread started executing.
62  *  request       the current request that the thread is processing.
63  *  fun           the function which is handling the request.
64  */
65 typedef struct THREAD_HANDLE {
66         struct THREAD_HANDLE *prev;
67         struct THREAD_HANDLE *next;
68         pthread_t pthread_id;
69         int thread_num;
70         sem_t semaphore;
71         int status;
72         int request_count;
73         time_t timestamp;
74         REQUEST *request;
75         RAD_REQUEST_FUNP fun;
76 } THREAD_HANDLE;
77
78 /*
79  *      A data structure to manage the thread pool.  There's no real
80  *      need for a data structure, but it makes things conceptually
81  *      easier.
82  */
83 typedef struct THREAD_POOL {
84         THREAD_HANDLE *head;
85         THREAD_HANDLE *tail;
86         
87         int total_threads;
88         int active_threads;
89         int max_thread_num;
90         int start_threads;
91         int max_threads;
92         int min_spare_threads;
93         int max_spare_threads;
94         int max_requests_per_thread;
95         time_t time_last_spawned;
96         int cleanup_delay;
97 } THREAD_POOL;
98
99 static THREAD_POOL thread_pool;
100
101 /*
102  *      A mapping of configuration file names to internal integers
103  */
104 static const CONF_PARSER thread_config[] = {
105         { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" }, 
106         { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" }, 
107         { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" }, 
108         { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" }, 
109         { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" }, 
110         { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" }, 
111         { NULL, -1, 0, NULL, NULL }
112 };
113
114 /*
115  *      The main thread handler for requests.
116  *
117  *      Wait on the semaphore until we have it, and process the request.
118  */
119 static void *request_handler_thread(void *arg)
120 {
121         THREAD_HANDLE   *self = (THREAD_HANDLE *) arg;
122 #if HAVE_SIGPROCMASK
123         sigset_t set;
124
125         /*
126          *      Block SIGHUP handling for the child threads.
127          *
128          *      This ensures that only the main server thread will
129          *      process HUP signals.
130          *
131          *      If we don't have sigprocmask, then it shouldn't be
132          *      a problem, either, as the sig_hup handler should check
133          *      for this condition.
134          */
135         sigemptyset(&set);
136         sigaddset(&set, SIGHUP);
137         sigaddset(&set, SIGCHLD);
138         sigprocmask(SIG_BLOCK, &set, NULL);
139 #endif
140         
141         /*
142          *      Loop forever, until told to exit.
143          */
144         for (;;) {
145                 /*
146                  *      Wait for the semaphore to be given to us.
147                  */
148                 DEBUG2("Thread %d waiting to be assigned a request",
149                                 self->thread_num);
150                 sem_wait(&self->semaphore);
151
152                 /*
153                  *      If we've been told to kill ourselves,
154                  *      then exit politely.
155                  */
156                 if (self->status == THREAD_CANCELLED) {
157                         DEBUG2("Thread %d exiting on request from parent.",
158                                         self->thread_num);
159                         break;
160                 }
161                 
162                 DEBUG2("Thread %d handling request %d, (%d handled so far)",
163                                 self->thread_num, self->request->number,
164                                 self->request_count);
165                 
166                 rad_respond(self->request, self->fun);
167                 self->request = NULL;
168
169                 /*
170                  *      The semaphore's value is zero, because we've
171                  *      locked it.  We now go back to the top of the loop,
172                  *      where we wait for it's value to become non-zero.
173                  */
174         }
175
176         /*
177          *      This thread is exiting.  Delete any additional resources
178          *      associated with it (semaphore, etc), and free the thread
179          *      handle memory.
180          */
181         sem_destroy(&self->semaphore);
182         free(self);
183         return NULL;
184 }
185
186 /*
187  *      Take a THREAD_HANDLE, and delete it from the thread pool.
188  *
189  *      This function is called ONLY from the main server thread.
190  */
191 static void delete_thread(THREAD_HANDLE *handle)
192 {
193         THREAD_HANDLE *prev;
194         THREAD_HANDLE *next;
195
196         assert(handle->request == NULL);
197
198         prev = handle->prev;
199         next = handle->next;
200         assert(thread_pool.total_threads > 0);
201         thread_pool.total_threads--;
202
203         if (prev == NULL) {
204                 assert(thread_pool.head == handle);
205                 thread_pool.head = next;
206         } else {
207                 prev->next = next;
208         }
209   
210         if (next == NULL) {
211                 assert(thread_pool.tail == handle);
212                 thread_pool.tail = prev;
213         } else {
214                 next->prev = prev;
215         }
216 }
217
218 /*
219  *      Take a THREAD_HANDLE, and move it to the end of the thread pool.
220  *
221  *      This function is called ONLY from the main server thread.
222  *      It's function is to keep the incoming requests rotating among
223  *      the threads in the pool.
224  */
225 static void move2tail(THREAD_HANDLE *handle)
226 {
227         THREAD_HANDLE *prev;
228         THREAD_HANDLE *next;
229
230         /*
231          *      Empty list: add it to the head.
232          */
233         if (thread_pool.head == NULL) {
234                 assert(thread_pool.tail == NULL);
235                 assert(thread_pool.total_threads == 1);
236
237                 handle->prev = NULL;
238                 handle->next = NULL;
239                 thread_pool.head = handle;
240                 thread_pool.tail = handle;
241                 return;
242         }
243
244         assert(thread_pool.total_threads >= 1);
245         prev = handle->prev;
246         next = handle->next;
247   
248         /*
249          *      If the element is in the list, then delete it from where
250          *      it is.
251          */
252         if ((next != NULL) ||
253                         (prev != NULL)) {
254                 /*
255                  *      If it's already at the tail, exit immediately,
256                  *      there's no more work to do.
257                  */
258                 if (next == NULL) {
259                         assert(thread_pool.tail == handle);
260                         return;
261                 }
262
263                 /*
264                  *      Maybe it's at the head of the list?
265                  */
266                 if (prev == NULL) {
267                         assert(thread_pool.head == handle);
268                         thread_pool.head = next;
269                         next->prev = NULL;
270
271                         /*
272                          *      Nope, it's really in the middle.
273                          *      Unlink it, then.
274                          */
275                 } else {
276                         assert(prev != NULL); /* be explicit about it. */
277                         assert(next != NULL); /* be explicit about it. */
278
279                         prev->next = next;
280                         next->prev = prev;
281                 }
282         }
283
284         /*
285          *      Finally, add it to the tail, and update the pointers.
286          */
287         handle->next = NULL;
288         prev = thread_pool.tail;
289         assert(prev->next == NULL);
290
291         thread_pool.tail = handle;
292         handle->prev = prev;
293         prev->next = handle;
294 }
295
296 /*
297  *      Spawn a new thread, and place it in the thread pool.
298  *
299  *      The thread is started initially in the blocked state, waiting
300  *      for the semaphore.
301  */
302 static THREAD_HANDLE *spawn_thread(time_t now)
303 {
304         int rcode;
305         THREAD_HANDLE *handle;
306         pthread_attr_t attr;
307
308         /*
309          *      Ensure that we don't spawn too many threads.
310          */
311         if (thread_pool.total_threads >= thread_pool.max_threads) {
312                 DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
313                 return NULL;
314         }
315
316         /*
317          *      Allocate a new thread handle.
318          */
319         handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
320         memset(handle, 0, sizeof(THREAD_HANDLE));
321         handle->prev = NULL;
322         handle->next = NULL;
323         handle->pthread_id = NO_SUCH_CHILD_PID;
324         handle->thread_num = thread_pool.max_thread_num++;
325         handle->request_count = 0;
326         handle->status = THREAD_RUNNING;
327         handle->timestamp = time(NULL);
328
329         /*
330          *      Initialize the semaphore to be for this process only,
331          *      and to have the thread block until the server gives it
332          *      the semaphore.
333          */
334         rcode = sem_init(&handle->semaphore, 0, SEMAPHORE_LOCKED);
335         if (rcode != 0) {
336                 radlog(L_ERR|L_CONS, "Failed to allocate semaphore: %s",
337                                 strerror(errno));
338                 exit(1);
339         }
340
341         /*
342          *      The thread isn't currently handling a request.
343          */
344         handle->request = NULL;
345         handle->fun = NULL;
346
347         /*
348          *      Initialize the thread's attributes to detached.
349          *
350          *      We could call pthread_detach() later, but if the thread
351          *      exits between the create & detach calls, it will need to
352          *      be joined, which will never happen.
353          */
354         pthread_attr_init(&attr);
355         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
356         
357         /*
358          *      Create the thread detached, so that it cleans up it's
359          *      own memory when it exits.
360          */
361         rcode = pthread_create(&handle->pthread_id, &attr,
362                         request_handler_thread, handle);
363         if (rcode != 0) {
364                 radlog(L_ERR|L_CONS, "Thread create failed: %s", strerror(errno));
365                 exit(1);
366         }
367         pthread_attr_destroy(&attr);
368
369         /*
370          *      One more thread to go into the list.
371          */
372         thread_pool.total_threads++;
373         DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
374                         handle->thread_num, thread_pool.total_threads);
375
376         /*
377          *      Move the thread handle to the tail of the thread pool list.
378          */
379         move2tail(handle);
380
381         /*
382          *      Update the time we last spawned a thread.
383          */
384         thread_pool.time_last_spawned = now;
385
386         /*
387          *      And return the new handle to the caller.
388          */
389         return handle;
390 }
391
392 /*
393  *      Allocate the thread pool, and seed it with an initial number
394  *      of threads.
395   */
396 int thread_pool_init(void)
397 {
398         int i;
399         THREAD_HANDLE   *handle;
400         CONF_SECTION    *pool_cf;
401         time_t          now;
402
403         DEBUG("Initializing the thread pool...");
404         now = time(NULL);
405
406         /*
407          *      Initialize the thread pool to some reasonable values.
408          */
409         memset(&thread_pool, 0, sizeof(THREAD_POOL));
410         thread_pool.head = NULL;
411         thread_pool.tail = NULL;
412         thread_pool.total_threads = 0;
413         thread_pool.max_thread_num = 1;
414         thread_pool.cleanup_delay = 5;
415
416         pool_cf = cf_section_find("thread");
417         if (pool_cf != NULL) {
418                 cf_section_parse(pool_cf, NULL, thread_config);
419         }
420
421         /*
422          *      Create a number of waiting threads.
423          *
424          *      If we fail while creating them, do something intelligent.
425          */
426         for (i = 0; i < thread_pool.start_threads; i++) {
427                 handle = spawn_thread(now);
428                 if (handle == NULL) {
429                         return -1;
430                 }
431         }
432
433         return 0;
434 }
435
436 /*
437  *      Assign a new request to a free thread.
438  *
439  *      If there isn't a free thread, then try to create a new one,
440  *      up to the configured limits.
441  */
442 int rad_spawn_child(REQUEST *request, RAD_REQUEST_FUNP fun)
443 {
444         int active_threads;
445         THREAD_HANDLE *handle;
446         THREAD_HANDLE *found;
447         THREAD_HANDLE *next;
448
449         /*
450          *      Loop over the active thread pool, looking for a
451          *      waiting thread.
452          */
453         found = NULL;
454         active_threads = 0;
455         for (handle = thread_pool.head; handle; handle = next) {
456                 next = handle->next;
457
458                 /*
459                  *      Ignore threads which aren't running.
460                  */
461                 if (handle->status != THREAD_RUNNING) {
462                         continue;
463                 }
464
465                 /*
466                  *      If we haven't found a free thread yet, then
467                  *      check it's semaphore lock.  We don't lock it,
468                  *      so if it's locked, then the thread MUST be the
469                  *      one locking it, waiting for us to unlock it.
470                  */
471                 if (handle->request == NULL) {
472                         if (found == NULL) {
473                                 found = handle;
474                         }
475                 } else {
476                         active_threads++;
477                 }
478         } /* loop over all of the threads */
479
480         /*
481          *      If we haven't found an active thread, then spawn a new one.
482          *
483          *      If we can't spawn a new one, complain, and exit.
484          */
485         if (found == NULL) {
486                 found = spawn_thread(request->timestamp);
487                 if (found == NULL) {
488                         radlog(L_INFO, 
489                                         "The maximum number of threads (%d) are active, cannot spawn new thread to handle request", 
490                                         thread_pool.max_threads);
491                         return -1;
492                 }
493         }
494
495         /*
496          *      OK, now 'handle' points to a waiting thread.  We move
497          *      it to the tail of the thread pool, so that we can
498          *      cycle among the threads.
499          *
500          *      We then give it the request, signal its semaphore, and
501          *      return.  The thread eventually wakes up, and handles
502          *      the request.
503          */
504         DEBUG2("Thread %d assigned request %d", found->thread_num, request->number);
505         move2tail(found);
506         found->request = request;
507         found->fun = fun;
508         found->request_count++;
509         sem_post(&found->semaphore);
510         thread_pool.active_threads = active_threads;
511
512         return 0;
513 }
514
515 /*
516  *      Check the min_spare_threads and max_spare_threads.
517  *
518  *      If there are too many or too few threads waiting, then we
519  *      either create some more, or delete some.
520  */
521 int thread_pool_clean(time_t now)
522 {
523         int spare;
524         int i, total;
525         THREAD_HANDLE *handle, *next;
526         int active_threads;
527
528         /*
529          *      Loop over the thread pool, doing stuff.
530          */
531         active_threads = 0;
532         for (handle = thread_pool.head; handle; handle = handle->next) {
533                 if (handle->request != NULL) {
534                         active_threads++;
535                 }
536         }
537
538         spare = thread_pool.total_threads - active_threads;
539         if (debug_flag) {
540                 static int old_total = -1;
541                 static int old_active = -1;
542
543                 if ((old_total != thread_pool.total_threads) ||
544                                 (old_active != active_threads)) {
545                         DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
546                                         thread_pool.total_threads, active_threads, spare);
547                         old_total = thread_pool.total_threads;
548                         old_active = active_threads;
549                 }
550         }
551
552         /*
553          *      If there are too few spare threads, create some more.
554          */
555         if (spare < thread_pool.min_spare_threads) {
556                 total = thread_pool.min_spare_threads - spare;
557
558                 DEBUG2("Threads: Spawning %d spares", total);
559                 /*
560                  *      Create a number of spare threads.
561                  */
562                 for (i = 0; i < total; i++) {
563                         handle = spawn_thread(now);
564                         if (handle == NULL) {
565                                 return -1;
566                         }
567                 }
568
569                 /*
570                  *      And exit, as there can't be too many spare threads.
571                  */
572                 return 0;
573         }
574
575         /*
576          *      Only delete the spare threads if sufficient time has
577          *      passed since we last created one.  This helps to minimize
578          *      the amount of create/delete cycles.
579          */
580         if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
581                 return 0;
582         }
583
584         /*
585          *      If there are too many spare threads, delete one.
586          *
587          *      Note that we only delete ONE at a time, instead of
588          *      wiping out many.  This allows the excess servers to
589          *      be slowly reaped, just in case the load spike comes again.
590          */
591         if (spare > thread_pool.max_spare_threads) {
592
593                 spare -= thread_pool.max_spare_threads;
594
595                 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
596
597                 /*
598                  *      Walk through the thread pool, deleting the
599                  *      first N idle threads we come across.
600                  */
601                 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
602                         next = handle->next;
603
604                         /*
605                          *      If the thread is not handling a
606                          *      request, then tell it to exit.
607                          *
608                          *      Note that we delete it from the thread
609                          *      pool BEFORE telling it to kill itself,
610                          *      as the child thread can free the 'handle'
611                          *      structure, without anyone else using it.
612                          */
613                         if (handle->request == NULL) {
614                                 delete_thread(handle);
615                                 handle->status = THREAD_CANCELLED;
616                                 sem_post(&handle->semaphore);
617                                 spare--;
618                                 return 0;
619                         }
620                 }
621         }
622   
623         /*
624          *      Otherwise everything's kosher.  There are not too few,
625          *      or too many spare threads.  Exit happily.
626          */
627         return 0;
628 }
629 #endif