2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 * @brief Handle pools of connections (threads, sockets, etc.)
20 * @note This API must be used by all modules in the public distribution that
21 * maintain pools of connections.
23 * @copyright 2012 The FreeRADIUS server project
24 * @copyright 2012 Alan DeKok <aland@deployingradius.com>
28 #include <freeradius-devel/radiusd.h>
29 #include <freeradius-devel/heap.h>
30 #include <freeradius-devel/modpriv.h>
31 #include <freeradius-devel/rad_assert.h>
33 typedef struct fr_connection fr_connection_t;
/* Prototype only: the definition of fr_connection_pool_check() appears later in this file. */
35 static int fr_connection_pool_check(fr_connection_pool_t *pool);
39 /* #define PTHREAD_DEBUG (1) */
43 /** An individual connection within the connection pool
45 * Defines connection counters, timestamps, and holds a pointer to the
46 * connection handle itself.
48 * @see fr_connection_pool_t
50 struct fr_connection {
51 fr_connection_t *prev; //!< Previous connection in list.
52 fr_connection_t *next; //!< Next connection in list.
54 time_t created; //!< Time connection was created (checked against the pool 'lifetime').
55 struct timeval last_reserved; //!< Last time the connection was reserved.
57 struct timeval last_released; //!< Last time the connection was released (checked against 'idle_timeout').
59 uint32_t num_uses; //!< Number of times the connection has been reserved.
60 uint64_t number; //!< Unique ID assigned when the connection is created,
61 //!< these will monotonically increase over the
62 //!< lifetime of the connection pool.
63 void *connection; //!< Pointer to whatever the module uses for a connection
65 bool in_use; //!< Whether the connection is currently reserved.
67 int heap; //!< Heap bookkeeping slot; its offset is handed to fr_heap_create().
70 pthread_t pthread_id; //!< Thread that reserved the connection. Meaningful when 'in_use == true'.
76 * Defines the configuration of the connection pool, all the counters and
77 * timestamps related to the connection pool, the mutex that stops multiple
78 * threads leaving the pool in an inconsistent state, and the callbacks
79 * required to open, close and check the status of connections within the pool.
83 struct fr_connection_pool_t {
84 int ref; //!< Reference counter to prevent connection
85 //!< pool being freed multiple times.
86 uint32_t start; //!< Number of initial connections.
87 uint32_t min; //!< Minimum number of concurrent connections to keep open.
88 uint32_t max; //!< Maximum number of concurrent connections to allow.
89 uint32_t spare; //!< Number of spare connections to try.
90 uint32_t pending; //!< Number of pending open connections.
91 uint32_t retry_delay; //!< seconds to delay re-open after a failed open.
92 uint32_t cleanup_interval; //!< Initial timer for how often we sweep the pool
93 //!< for free connections. (0 is infinite).
94 int delay_interval; //!< When we next do a cleanup. Initialized to
95 //!< cleanup_interval, and increase from there based
97 int next_delay; //!< Amount added to delay_interval after each cleanup. Initialized to
98 //!< cleanup_interval, and decays from there.
99 uint64_t max_uses; //!< Maximum number of times a connection can be used
100 //!< before being closed.
101 uint32_t lifetime; //!< How long a connection can be open before being
102 //!< closed (irrespective of whether it's idle or not).
103 uint32_t idle_timeout; //!< How long a connection can be idle before
106 bool spread; //!< If true we spread requests over the connections,
107 //!< using the connection released longest ago, first.
109 time_t last_checked; //!< Last time we pruned the connection pool.
110 time_t last_spawned; //!< Last time we spawned a connection.
111 time_t last_failed; //!< Last time we tried to spawn a connection but failed.
112 time_t last_throttled; //!< Last time we refused to spawn a connection because
113 //!< the last connection failed, or we were already spawning
115 time_t last_at_max; //!< Last time we hit the maximum number of allowed
118 uint32_t max_pending; //!< Max number of connection opens allowed to be pending at once.
120 uint64_t count; //!< Number of connections spawned over the lifetime
122 uint32_t num; //!< Number of connections in the pool.
123 uint32_t active; //!< Number of currently reserved connections.
125 fr_heap_t *heap; //!< Heap of idle connections, ordered by the comparator chosen from 'spread'.
127 fr_connection_t *head; //!< Start of the connection list.
128 fr_connection_t *tail; //!< End of the connection list.
130 #ifdef HAVE_PTHREAD_H
131 pthread_mutex_t mutex; //!< Mutex used to keep consistent state when making
132 //!< modifications in threaded mode.
135 CONF_SECTION *cs; //!< Configuration section holding the section of parsed
136 //!< config file that relates to this pool.
137 void *opaque; //!< Pointer to context data that will be passed to callbacks.
139 char const *log_prefix; //!< Log prefix to prepend to all log messages created
140 //!< by the connection pool code.
142 char const *trigger_prefix; //!< Prefix to prepend to names of all triggers
143 //!< fired by the connection pool code.
145 fr_connection_create_t create; //!< Function used to create new connections.
146 fr_connection_alive_t alive; //!< Function used to check status of connections.
149 #ifndef HAVE_PTHREAD_H
/* No pthread support: the lock/unlock calls used throughout this file expand to nothing. */
150 # define pthread_mutex_lock(_x)
151 # define pthread_mutex_unlock(_x)
154 static const CONF_PARSER connection_config[] = {
/*
 * Maps each pool config item name onto a member of fr_connection_pool_t.
 * The trailing string (or NULL) is presumably the default value - confirm
 * against the CONF_PARSER definition.
 */
155 { "start", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, start), "5" },
156 { "min", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, min), "5" },
157 { "max", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max), "10" },
158 { "spare", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, spare), "3" },
159 { "uses", FR_CONF_OFFSET(PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), "0" },
160 { "lifetime", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), "0" },
/*
 * "cleanup_delay" and "cleanup_interval" both write the cleanup_interval
 * member; only "cleanup_interval" carries a non-NULL trailing value.
 * NOTE(review): "cleanup_delay" looks like a legacy alias - confirm.
 */
161 { "cleanup_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), NULL},
162 { "cleanup_interval", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), "30" },
163 { "idle_timeout", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), "60" },
164 { "retry_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), "1" },
165 { "spread", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), "no" },
166 CONF_PARSER_TERMINATOR
169 /** Order connections by reserved most recently
171 static int last_reserved_cmp(void const *one, void const *two)
/*
 * Comparator over two fr_connection_t entries, keyed on last_reserved.
 * Seconds are compared first and microseconds break ties. Yields -1 when
 * 'one' was reserved earlier than 'two' and +1 when it was reserved later.
 */
173 fr_connection_t const *a = one;
174 fr_connection_t const *b = two;
176 if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1;
177 if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1;
179 if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1;
180 if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1;
185 /** Order connections by released longest ago
187 static int last_released_cmp(void const *one, void const *two)
/*
 * Comparator over two fr_connection_t entries, keyed on last_released.
 * The operands are swapped relative to last_reserved_cmp(): the result is
 * -1 when 'two' was released earlier than 'one', and +1 when 'two' was
 * released later. Seconds are compared first, then microseconds.
 */
189 fr_connection_t const *a = one;
190 fr_connection_t const *b = two;
192 if (b->last_released.tv_sec < a->last_released.tv_sec) return -1;
193 if (b->last_released.tv_sec > a->last_released.tv_sec) return +1;
195 if (b->last_released.tv_usec < a->last_released.tv_usec) return -1;
196 if (b->last_released.tv_usec > a->last_released.tv_usec) return +1;
201 /** Removes a connection from the connection list
203 * @note Must be called with the mutex held.
205 * @param[in,out] pool to modify.
206 * @param[in] this Connection to delete.
208 static void fr_connection_unlink(fr_connection_pool_t *pool, fr_connection_t *this)
/*
 * Detaches 'this' from the pool's doubly linked list. The neighbours'
 * next/prev pointers, or the pool's head/tail when 'this' sits at an end,
 * are redirected around it. The asserts cross-check head/tail against the
 * position of 'this', and its own prev/next are cleared at the end.
 */
211 rad_assert(pool->head != this);
212 this->prev->next = this->next;
214 rad_assert(pool->head == this);
215 pool->head = this->next;
218 rad_assert(pool->tail != this);
219 this->next->prev = this->prev;
221 rad_assert(pool->tail == this);
222 pool->tail = this->prev;
/* Leave the removed entry with no dangling list pointers. */
225 this->prev = this->next = NULL;
228 /** Adds a connection to the head of the connection list
230 * @note Must be called with the mutex held.
232 * @param[in,out] pool to modify.
233 * @param[in] this Connection to add.
235 static void fr_connection_link_head(fr_connection_pool_t *pool, fr_connection_t *this)
/*
 * Pushes 'this' onto the front of the pool's list. The asserts require
 * that 'this' is not already the head or the tail of the list.
 */
237 rad_assert(pool != NULL);
238 rad_assert(this != NULL);
239 rad_assert(pool->head != this);
240 rad_assert(pool->tail != this);
/* The old head's back pointer is aimed at the new entry. */
243 pool->head->prev = this;
/* The new entry's forward pointer takes the old head. */
246 this->next = pool->head;
250 rad_assert(this->next == NULL);
253 rad_assert(this->next != NULL);
257 /** Send a connection pool trigger.
259 * @param[in] pool to send trigger for.
260 * @param[in] name_suffix trigger name suffix.
262 static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *name_suffix)
/*
 * Builds the trigger name as pool->trigger_prefix followed by name_suffix
 * (bounded by sizeof(name)) and hands it to exec_trigger() together with
 * the pool's config section.
 */
265 rad_assert(pool != NULL);
266 rad_assert(name_suffix != NULL);
267 snprintf(name, sizeof(name), "%s%s", pool->trigger_prefix, name_suffix);
268 exec_trigger(NULL, pool->cs, name, true);
271 /** Find a connection handle in the connection list
273 * Walks over the list of connections searching for a specified connection
274 * handle and returns the first connection that contains that pointer.
276 * @note Will lock mutex and only release mutex if connection handle
277 * is not found, so will usually return with mutex held.
278 * @note Must be called with the mutex free.
280 * @param[in] pool to search in.
281 * @param[in] conn handle to search for.
283 * - Connection containing the specified handle.
284 * - NULL if no connection was found.
286 static fr_connection_t *fr_connection_find(fr_connection_pool_t *pool, void *conn)
/*
 * Linear search of the pool list for the entry whose 'connection' member
 * equals 'conn'. NULL arguments return NULL before the mutex is touched.
 * On a match the asserts expect the entry to be in_use and to carry the
 * calling thread's id. The unlock below the loop is the not-found path
 * described in the function's header comment.
 */
288 fr_connection_t *this;
290 if (!pool || !conn) return NULL;
292 pthread_mutex_lock(&pool->mutex);
295 * FIXME: This loop could be avoided if we passed a 'void
296 * **connection' instead. We could use "offsetof" in
297 * order to find top of the parent structure.
299 for (this = pool->head; this != NULL; this = this->next) {
300 if (this->connection == conn) {
302 pthread_t pthread_id;
304 pthread_id = pthread_self();
305 rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
308 rad_assert(this->in_use == true);
313 pthread_mutex_unlock(&pool->mutex);
317 /** Spawns a new connection
319 * Spawns a new connection using the create callback, and returns it for
320 * adding to the connection list.
322 * @note Will call the 'open' trigger.
323 * @note Must be called with the mutex free.
325 * @param[in] pool to modify.
326 * @param[in] now Current time.
327 * @param[in] in_use whether the new connection should be "in_use" or not
329 * - New connection struct.
332 static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool, time_t now, bool in_use)
/*
 * Opens one new connection through pool->create() and links it into the
 * pool. Visible phases:
 *   1. under the mutex, refuse when at 'max', when still inside retry_delay
 *      after a failure, or when too many opens are pending;
 *   2. with the mutex released, call the create callback on a fresh talloc ctx;
 *   3. under the mutex again, allocate the fr_connection_t, fill it in,
 *      link it at the head of the list and reset the cleanup timers.
 * The "open" trigger is fired after the mutex is released.
 */
335 uint32_t max_pending;
338 fr_connection_t *this;
341 rad_assert(pool != NULL);
344 * If we have NO connections, and we've previously failed
345 * opening connections, don't open multiple connections until
346 * we successfully open at least one.
/* NOTE(review): these pool members are read before the mutex is taken on the following statement. */
348 if ((pool->num == 0) && pool->pending && pool->last_failed) return NULL;
350 pthread_mutex_lock(&pool->mutex);
351 rad_assert(pool->num <= pool->max);
354 * Don't spawn too many connections at the same time.
356 if ((pool->num + pool->pending) >= pool->max) {
357 pthread_mutex_unlock(&pool->mutex);
359 ERROR("%s: Cannot open new connection, already at max", pool->log_prefix);
364 * If the last attempt failed, wait a bit before
367 if (pool->last_failed && ((pool->last_failed + pool->retry_delay) > now)) {
368 bool complain = false;
/* last_throttled records the second in which we last refused, so the refusal is noted once per second. */
370 if (pool->last_throttled != now) {
373 pool->last_throttled = now;
376 pthread_mutex_unlock(&pool->mutex);
378 if (!RATE_LIMIT_ENABLED || complain) {
379 ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
380 pool->log_prefix, pool->retry_delay);
387 * We limit the rate of new connections after a failed attempt.
389 if (pool->pending > pool->max_pending) {
390 pthread_mutex_unlock(&pool->mutex);
391 RATE_LIMIT(WARN("%s: Cannot open a new connection due to rate limit after failure",
/* The connection's ID is taken from the pool-wide counter while the mutex is still held. */
397 number = pool->count++;
400 * Unlock the mutex while we try to open a new
401 * connection. If there are issues with the back-end,
402 * opening a new connection may take a LONG time. In
403 * that case, we want the other connections to continue
406 pthread_mutex_unlock(&pool->mutex);
409 * The true value for max_pending is the smaller of
410 * free connection slots, or pool->max_pending.
/* Computed after the unlock above; in the visible code the local value only feeds the log message. */
412 max_pending = (pool->max - pool->num);
413 if (pool->max_pending < max_pending) max_pending = pool->max_pending;
414 INFO("%s: Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
415 pool->log_prefix, number, pool->pending, max_pending);
418 * Allocate a new top level ctx for the create callback
419 * to hang its memory off of.
421 ctx = talloc_init("fr_connection_ctx");
422 if (!ctx) return NULL;
425 * This may take a long time, which prevents other
426 * threads from releasing connections. We don't care
427 * about other threads opening new connections, as we
428 * already have no free connections.
430 conn = pool->create(ctx, pool->opaque);
432 ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, number);
/* NOTE(review): last_failed is written here before the mutex is re-acquired on the next line. */
434 pool->last_failed = now;
435 pthread_mutex_lock(&pool->mutex);
/* After a failed open, allow only one pending open at a time. */
436 pool->max_pending = 1;
438 pthread_mutex_unlock(&pool->mutex);
446 * And lock the mutex again while we link the new
447 * connection back into the pool.
449 pthread_mutex_lock(&pool->mutex);
451 this = talloc_zero(pool, fr_connection_t);
453 pthread_mutex_unlock(&pool->mutex);
/* Ties the create callback's ctx to the fr_connection_t; presumably ctx is freed along with 'this' - confirm against fr_link_talloc_ctx_free(). */
458 fr_link_talloc_ctx_free(this, ctx);
461 this->connection = conn;
462 this->in_use = in_use;
464 this->number = number;
465 gettimeofday(&this->last_reserved, NULL);
466 this->last_released = this->last_reserved;
469 * The connection pool is starting up. Insert the
470 * connection into the heap.
472 if (!in_use) fr_heap_insert(pool->heap, this);
474 fr_connection_link_head(pool, this);
477 * Do NOT insert the connection into the heap. That's
478 * done when the connection is released.
483 rad_assert(pool->pending > 0);
487 * We've successfully opened one more connection. Allow
488 * more connections to open in parallel.
490 if (pool->max_pending < pool->max) pool->max_pending++;
/* A successful open restarts the cleanup back-off and clears the failure marker. */
492 pool->last_spawned = time(NULL);
493 pool->delay_interval = pool->cleanup_interval;
494 pool->next_delay = pool->cleanup_interval;
495 pool->last_failed = 0;
497 pthread_mutex_unlock(&pool->mutex);
499 fr_connection_exec_trigger(pool, "open");
504 /** Close an existing connection.
506 * Removes the connection from the list, calls the delete callback to close
507 * the connection, then frees memory allocated to the connection.
509 * @note Will call the 'close' trigger.
510 * @note Must be called with the mutex held.
512 * @param[in,out] pool to modify.
513 * @param[in,out] this Connection to delete.
515 static void fr_connection_close_internal(fr_connection_pool_t *pool, fr_connection_t *this)
/*
 * Tears down one connection. A reserved connection has in_use cleared,
 * and an idle one is pulled out of the heap. The "close" trigger is then
 * fired and the entry is unlinked from the pool list. The asserts expect
 * pool->active and pool->num to be non-zero where they are checked.
 */
518 * If it's in use, release it.
522 pthread_t pthread_id = pthread_self();
523 rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
526 this->in_use = false;
528 rad_assert(pool->active != 0);
533 * Connection isn't used, remove it from the heap.
535 fr_heap_extract(pool->heap, this);
538 fr_connection_exec_trigger(pool, "close");
540 fr_connection_unlink(pool, this);
542 rad_assert(pool->num > 0);
547 /** Check whether a connection needs to be removed from the pool
549 * Will verify that the connection is within idle_timeout, max_uses, and
550 * lifetime values. If it is not, the connection will be closed.
552 * @note Will only close connections not in use.
553 * @note Must be called with the mutex held.
555 * @param[in,out] pool to modify.
556 * @param[in,out] this Connection to manage.
557 * @param[in] now Current time.
559 * - 0 if connection was closed.
560 * - 1 if connection handle was left open.
562 static int fr_connection_manage(fr_connection_pool_t *pool,
563 fr_connection_t *this,
/*
 * Applies the per-connection limits in this order: max_uses, lifetime,
 * idle_timeout. Each limit is skipped when its pool setting is 0.
 * Connections that are in_use are never examined and report 1 at once.
 */
566 rad_assert(pool != NULL);
567 rad_assert(this != NULL);
570 * Don't terminated in-use connections
572 if (this->in_use) return 1;
574 if ((pool->max_uses > 0) &&
575 (this->num_uses >= pool->max_uses)) {
576 DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit max_uses limit", pool->log_prefix,
/* Closing at or below 'min' only produces the hint below; the connection is still closed. */
579 if (pool->num <= pool->min) {
580 DEBUG("%s: You probably need to lower \"min\"", pool->log_prefix);
582 fr_connection_close_internal(pool, this);
586 if ((pool->lifetime > 0) &&
587 ((this->created + pool->lifetime) < now)) {
588 DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit lifetime limit",
589 pool->log_prefix, this->number);
593 if ((pool->idle_timeout > 0) &&
594 ((this->last_released.tv_sec + pool->idle_timeout) < now)) {
/* NOTE(review): the format uses %u while the matching argument is cast to (int). */
595 INFO("%s: Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %u seconds",
596 pool->log_prefix, this->number, (int) (now - this->last_released.tv_sec));
604 /** Check whether any connections need to be removed from the pool
606 * Maintains the number of connections in the pool as per the configuration
607 * parameters for the connection pool.
609 * @note Will only run checks the first time it's called in a given second,
610 * to throttle connection spawning/closing.
611 * @note Will only close connections not in use.
612 * @note Must be called with the mutex held, will release mutex before
615 * @param[in,out] pool to manage.
618 static int fr_connection_pool_check(fr_connection_pool_t *pool)
/*
 * Housekeeping pass over the pool. Visible steps:
 *   - bail out (unlocking) if a pass already ran during this second;
 *   - work out how many idle connections are surplus ('extra') or
 *     missing ('spawn') relative to min / max / spare;
 *   - possibly spawn one connection, or close one idle connection;
 *   - run fr_connection_manage() over every connection;
 *   - record last_checked and release the mutex.
 */
620 uint32_t spawn, idle, extra;
621 time_t now = time(NULL);
622 fr_connection_t *this, *next;
624 if (pool->last_checked == now) {
625 pthread_mutex_unlock(&pool->mutex);
630 * Some idle connections are OK, if they're within the
631 * configured "spare" range. Any extra connections
632 * outside of that range can be closed.
/* idle = connections in the pool that are not currently reserved. */
634 idle = pool->num - pool->active;
635 if (idle <= pool->spare) {
638 extra = idle - pool->spare;
642 * The other end can close connections. If so, we'll
643 * have fewer than "min". When that happens, open more
644 * connections to enforce "min".
646 if ((pool->num + pool->pending) <= pool->min) {
647 spawn = pool->min - (pool->num + pool->pending);
651 * If we're about to create more than "max",
654 } else if ((pool->num + pool->pending) >= pool->max) {
656 * Ensure we don't spawn more connections. If
657 * there are extra idle connections, we can
658 * delete all of them.
661 /* leave extra alone from above */
666 * AND we don't have enough idle connections.
669 } else if (idle <= pool->spare) {
671 * Not enough spare connections. Spawn a few.
672 * But cap the pool size at "max"
674 spawn = pool->spare - idle;
677 if ((pool->num + pool->pending + spawn) > pool->max) {
678 spawn = pool->max - (pool->num + pool->pending);
684 * We have more than enough idle connections, AND
685 * some are pending. Don't open or close any.
687 } else if (pool->pending) {
692 * We have too many idle connections, but closing
693 * some would take us below "min", so we only
694 * close enough to take us to "min".
696 } else if ((pool->min + extra) >= pool->num) {
698 extra = pool->num - pool->min;
702 * Closing the "extra" connections won't take us
703 * below "min". It's therefore safe to close
707 /* leave extra alone from above */
711 * Only try to open spares if we're not already attempting to open
712 * a connection. Avoids spurious log messages.
715 INFO("%s: Need %i more connections to reach %i spares",
716 pool->log_prefix, spawn, pool->spare);
/* fr_connection_spawn() is documented as requiring the mutex to be free, so it is dropped around the call. */
717 pthread_mutex_unlock(&pool->mutex);
718 fr_connection_spawn(pool, now, false); /* ignore return code */
719 pthread_mutex_lock(&pool->mutex);
723 * We haven't spawned connections in a while, and there
724 * are too many spare ones. Close the one which has been
725 * unused for the longest.
727 if (extra && (now >= (pool->last_spawned + pool->delay_interval))) {
728 fr_connection_t *found;
/* Walk from the tail, skipping reserved entries, tracking the idle one with the oldest last_reserved. */
731 for (this = pool->tail; this != NULL; this = this->prev) {
732 if (this->in_use) continue;
735 timercmp(&this->last_reserved, &found->last_reserved, <)) {
740 rad_assert(found != NULL);
742 INFO("%s: Closing connection (%" PRIu64 "), from %d unused connections", pool->log_prefix,
743 found->number, extra);
744 fr_connection_close_internal(pool, found);
747 * Decrease the delay for the next time we clean
/* Halve next_delay (never below 1) and push delay_interval out by that amount. */
750 pool->next_delay >>= 1;
751 if (pool->next_delay == 0) pool->next_delay = 1;
752 pool->delay_interval += pool->next_delay;
756 * Pass over all of the connections in the pool, limiting
757 * lifetime, idle time, max requests, etc.
759 for (this = pool->head; this != NULL; this = next) {
761 fr_connection_manage(pool, this, now);
764 pool->last_checked = now;
765 pthread_mutex_unlock(&pool->mutex);
770 /** Get a connection from the connection pool
772 * @note Must be called with the mutex free.
774 * @param[in,out] pool to reserve the connection from.
775 * @param[in] spawn whether to spawn a new connection
777 * - A pointer to the connection handle.
780 static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn)
/*
 * Reserves an idle connection from the heap or, when 'spawn' is true,
 * tries to open a new one unless the pool is already at 'max'.
 * The mutex is only taken and released here when 'spawn' is true. The
 * spawn == false caller, fr_connection_reconnect_internal(), is documented
 * as already holding it. Returns the module's handle (this->connection),
 * not the fr_connection_t wrapper.
 */
783 fr_connection_t *this;
785 if (!pool) return NULL;
788 * Allow CTRL-C to kill the server in debugging mode.
790 if (main_config.exiting) return NULL;
792 #ifdef HAVE_PTHREAD_H
793 if (spawn) pthread_mutex_lock(&pool->mutex);
799 * Grab the link with the lowest latency, and check it
800 * for limits. If "connection manage" says the link is
801 * no longer usable, go grab another one.
/* fr_connection_manage() returns 0 after closing a connection, which makes this loop peek again. */
804 this = fr_heap_peek(pool->heap);
806 } while (!fr_connection_manage(pool, this, now));
809 * We have a working connection. Extract it from the
813 fr_heap_extract(pool->heap, this);
818 * We were asked to avoid spawning a new connection, by
819 * fr_connection_reconnect_internal(). So we just return
822 if (!spawn) return NULL;
825 * We don't have a connection. Try to open a new one.
827 rad_assert(pool->active == pool->num);
829 if (pool->num == pool->max) {
830 bool complain = false;
833 * Rate-limit complaints.
/* last_at_max records the second in which the pool was last found at 'max'. */
835 if (pool->last_at_max != now) {
837 pool->last_at_max = now;
840 pthread_mutex_unlock(&pool->mutex);
842 if (!RATE_LIMIT_ENABLED || complain) {
843 ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
/* fr_connection_spawn() is documented as requiring the mutex to be free, so it is released first and re-taken afterwards. */
849 pthread_mutex_unlock(&pool->mutex);
851 DEBUG("%s: %i of %u connections in use. You may need to increase \"spare\"", pool->log_prefix,
852 pool->active, pool->num);
853 this = fr_connection_spawn(pool, now, true); /* MY connection! */
854 if (!this) return NULL;
856 pthread_mutex_lock(&pool->mutex);
/* Stamp the reservation time and the reserving thread on the connection. */
861 gettimeofday(&this->last_reserved, NULL);
865 this->pthread_id = pthread_self();
868 #ifdef HAVE_PTHREAD_H
869 if (spawn) pthread_mutex_unlock(&pool->mutex);
872 DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number);
874 return this->connection;
877 /** Reconnect a suspected inviable connection
879 * @note Must be called with the mutex held, will not release mutex.
881 * @see fr_connection_get
882 * @param[in,out] pool to reconnect the connection in.
883 * @param[in,out] conn to reconnect.
884 * @return new connection handle if successful else NULL.
886 static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn)
/*
 * Replaces the handle held by 'conn' with a freshly created one. The old
 * handle's memory is dropped by freeing conn's talloc children, then
 * pool->create() is called on a new ctx linked to 'conn'. If creation
 * fails, 'conn' is closed and an already idle connection is tried instead
 * (without spawning). On success the "close" trigger is fired for the old
 * handle and conn->connection is updated.
 *
 * NOTE(review): the fallback value comes from fr_connection_get_internal(),
 * which returns the module's handle (void *), while this function is
 * declared to return fr_connection_t * - confirm that callers treat the
 * result as a handle.
 */
889 uint64_t conn_number;
/* Saved up front because 'conn' may be closed before the number is logged. */
892 conn_number = conn->number;
895 * Destroy any handles associated with the fr_connection_t
897 talloc_free_children(conn);
899 DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number);
902 * Allocate a new top level ctx for the create callback
903 * to hang its memory off of.
905 ctx = talloc_init("fr_connection_ctx");
906 if (!ctx) return NULL;
907 fr_link_talloc_ctx_free(conn, ctx);
909 new_conn = pool->create(ctx, pool->opaque);
912 * We can't create a new connection, so close the current one.
914 fr_connection_close_internal(pool, conn);
917 * Maybe there's a connection which is unused and
918 * available. If so, return it.
/* spawn == false: the mutex is already held by our caller and no new connection may be opened here. */
920 new_conn = fr_connection_get_internal(pool, false);
921 if (new_conn) return new_conn;
923 RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available",
924 pool->log_prefix, conn_number));
929 fr_connection_exec_trigger(pool, "close");
930 conn->connection = new_conn;
935 /** Create a new connection pool
937 * Allocates structures used by the connection pool, initialises the various
938 * configuration options and counters, and sets the callback functions.
940 * Will also spawn the number of connections specified by the 'start'
941 * configuration options.
943 * @note Will call the 'start' trigger.
945 * @param[in] ctx Context to link pool's destruction to.
946 * @param[in] cs pool section.
947 * @param[in] opaque data pointer to pass to callbacks.
948 * @param[in] c Callback to create new connections.
949 * @param[in] a Callback to check the status of connections.
950 * @param[in] log_prefix prefix to prepend to all log messages.
951 * @param[in] trigger_prefix prefix to prepend to all trigger names.
953 * - New connection pool.
956 fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx,
959 fr_connection_create_t c,
960 fr_connection_alive_t a,
961 char const *log_prefix,
962 char const *trigger_prefix)
/*
 * Builds a pool: allocates it in the NULL talloc context, links its
 * destruction to 'ctx', creates the heap, copies the prefixes, parses the
 * pool config section, sanity-checks the limits, then opens 'start'
 * connections and fires the "start" trigger.
 * A NULL cs, opaque or create callback is rejected up front. The alive
 * callback 'a' is not part of that check.
 */
965 fr_connection_pool_t *pool;
966 fr_connection_t *this;
969 if (!cs || !opaque || !c) return NULL;
974 * Pool is allocated in the NULL context as
975 * threads are likely to allocate memory
978 pool = talloc_zero(NULL, fr_connection_pool_t);
979 if (!pool) return NULL;
982 * Ensure the pool is freed at the same time
985 if (fr_link_talloc_ctx_free(ctx, pool) < 0) {
992 pool->opaque = opaque;
996 pool->head = pool->tail = NULL;
999 * We keep a heap of connections, sorted by the last time
1000 * we STARTED using them. Newly opened connections
1001 * aren't in the heap. They're only inserted in the list
1002 * once they're released.
1004 * We do "most recently started" instead of "most
1005 * recently used", because MRU is done as most recently
1006 * *released*. We want to order connections by
1007 * responsiveness, and MRU prioritizes high latency
1010 * We want most recently *started*, which gives
1011 * preference to low latency links, and pushes high
1012 * latency links down in the priority heap.
1014 * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
/*
 * NOTE(review): the pool was zero-allocated above and cf_section_parse()
 * only runs further down, so pool->spread appears to still be false when
 * it is tested here, whatever the configuration says - confirm, and
 * consider creating the heap after the config has been parsed.
 */
1016 if (!pool->spread) {
1017 pool->heap = fr_heap_create(last_reserved_cmp, offsetof(fr_connection_t, heap));
1019 * For some types of connections we need to used a different
1020 * algorithm, because load balancing benefits are secondary
1021 * to maintaining a cache of open connections.
1023 * With libcurl's multihandle, connections can only be reused
1024 * if all handles that make up the multhandle are done processing
1027 * We can't tell when that's happened using libcurl, and even
1028 * if we could, blocking until all servers had responded
1029 * would have huge cost.
1031 * The solution is to order the heap so that the connection that
1032 * was released longest ago is at the top.
1034 * That way we maximise time between connection use.
1037 pool->heap = fr_heap_create(last_released_cmp, offsetof(fr_connection_t, heap));
/* Prefixes are duplicated into the pool's talloc context; the fallbacks are the literals "core" and "". */
1044 pool->log_prefix = log_prefix ? talloc_typed_strdup(pool, log_prefix) : "core";
1045 pool->trigger_prefix = trigger_prefix ? talloc_typed_strdup(pool, trigger_prefix) : "";
1047 #ifdef HAVE_PTHREAD_H
1048 pthread_mutex_init(&pool->mutex, NULL);
1051 DEBUG("%s: Initialising connection pool", pool->log_prefix);
/* Fills the pool's configurable members from 'cs' using the connection_config table. */
1053 if (cf_section_parse(cs, pool, connection_config) < 0) goto error;
1056 * Some simple limits
1058 if (pool->max == 0) {
1059 cf_log_err_cs(cs, "Cannot set 'max' to zero");
1062 pool->max_pending = pool->max; /* can open all connections now */
1064 if (pool->min > pool->max) {
1065 cf_log_err_cs(cs, "Cannot set 'min' to more than 'max'");
/* Bounds visible here: max <= 1024, start <= max, spare <= max - min. */
1069 FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
1070 FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
1071 FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
1073 if (pool->lifetime > 0) {
1074 FR_INTEGER_COND_CHECK("idle_timeout", pool->idle_timeout, (pool->idle_timeout <= pool->lifetime), 0);
1077 if (pool->idle_timeout > 0) {
1078 FR_INTEGER_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
1082 * Don't open any connections. Instead, force the limits
1083 * to only 1 connection.
1087 pool->start = pool->min = pool->max = 1;
1092 * Create all of the connections, unless the admin says
/* Initial connections are spawned idle (in_use == false), which places them in the heap. */
1095 for (i = 0; i < pool->start; i++) {
1096 this = fr_connection_spawn(pool, now, false);
1099 fr_connection_pool_free(pool);
1104 fr_connection_exec_trigger(pool, "start");
1109 /** Initialise a module specific connection pool
1111 * @see fr_connection_pool_init
1113 * @param[in] module section.
1114 * @param[in] opaque data pointer to pass to callbacks.
1115 * @param[in] c Callback to create new connections.
1116 * @param[in] a Callback to check the status of connections.
1117 * @param[in] log_prefix override, if NULL will be set automatically from the module CONF_SECTION.
1119 * - New connection pool.
1122 fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module,
1124 fr_connection_create_t c,
1125 fr_connection_alive_t a,
1126 char const *log_prefix)
/*
 * Module-facing wrapper around fr_connection_pool_init(). It derives the
 * trigger prefix ("modules.<name1>.") and a log prefix
 * ("rlm_<name1> (<name2>)") from the module's config section. It then
 * picks the "pool" section to use, either a sibling's or the module's
 * own. The pool is created on first use, or an existing pool stored on
 * that section under CONNECTION_POOL_CF_KEY is reused.
 */
1128 CONF_SECTION *cs, *mycs;
1130 char trigger_prefix[64];
1132 fr_connection_pool_t *pool;
1133 char const *cs_name1, *cs_name2;
1137 #define CONNECTION_POOL_CF_KEY "connection_pool"
1138 #define parent_name(_x) cf_section_name(cf_item_parent(cf_section_to_item(_x)))
1140 cs_name1 = cf_section_name1(module);
1141 cs_name2 = cf_section_name2(module);
/* A module section without a second name falls back to its first name. */
1142 if (!cs_name2) cs_name2 = cs_name1;
1144 snprintf(trigger_prefix, sizeof(trigger_prefix), "modules.%s.", cs_name1);
1147 snprintf(buff, sizeof(buff), "rlm_%s (%s)", cs_name1, cs_name2);
1152 * Get sibling's pool config section
1154 ret = find_module_sibling_section(&cs, module, "pool");
1160 DEBUG4("%s: Using pool section from \"%s\"", log_prefix, parent_name(cs));
1164 DEBUG4("%s: Using local pool section", log_prefix);
1169 * Get our pool config section
1171 mycs = cf_section_sub_find(module, "pool");
1173 DEBUG4("%s: Adding pool section to config item \"%s\" to store pool references", log_prefix,
1174 cf_section_name(module));
1176 mycs = cf_section_alloc(module, "pool", NULL);
1177 cf_section_add(module, mycs);
1181 * Sibling didn't have a pool config section
1182 * Use our own local pool.
1185 DEBUG4("%s: \"%s.pool\" section not found, using \"%s.pool\"", log_prefix,
1186 parent_name(cs), parent_name(mycs));
1191 * If fr_connection_pool_init has already been called
1192 * for this config section, reuse the previous instance.
1194 * This allows modules to pass in the config sections
1195 * they would like to use the connection pool from.
1197 pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY);
1199 DEBUG4("%s: No pool reference found for config item \"%s.pool\"", log_prefix, parent_name(cs));
/* 'cs' is passed both as the talloc context and as the config section to parse. */
1200 pool = fr_connection_pool_init(cs, cs, opaque, c, a, log_prefix, trigger_prefix);
1201 if (!pool) return NULL;
1203 DEBUG4("%s: Adding pool reference %p to config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
1204 cf_data_add(cs, CONNECTION_POOL_CF_KEY, pool, NULL);
1209 DEBUG4("%s: Found pool reference %p in config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
1212 * We're reusing pool data add it to our local config
1213 * section. This allows other modules to transitively
1214 * re-use a pool through this module.
1217 DEBUG4("%s: Copying pool reference %p from config item \"%s.pool\" to config item \"%s.pool\"",
1218 log_prefix, pool, parent_name(cs), parent_name(mycs));
1219 cf_data_add(mycs, CONNECTION_POOL_CF_KEY, pool, NULL);
1225 /** Allocate a new pool using an existing one as a template
1227 * @param ctx to allocate new pool in.
1228 * @param pool to copy.
1229 * @param opaque data to pass to connection function.
1231 * - New connection pool.
1234 fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque)
1236 return fr_connection_pool_init(ctx, pool->cs, opaque, pool->create,
1237 pool->alive, pool->log_prefix, pool->trigger_prefix);
1240 /** Get the number of connections currently in the pool
1242 * @param pool to count connections for.
1243 * @return the number of connections in the pool
1245 int fr_connection_pool_get_num(fr_connection_pool_t *pool)
1251 /** Delete a connection pool
1253 * Closes, unlinks and frees all connections in the connection pool, then frees
1254 * all memory used by the connection pool.
1256 * @note Will call the 'stop' trigger.
1257 * @note Must be called with the mutex free.
1259 * @param[in,out] pool to delete.
1261 void fr_connection_pool_free(fr_connection_pool_t *pool)
1263 fr_connection_t *this;
1268 * More modules hold a reference to this pool, don't free
1271 if (pool->ref > 0) {
1276 DEBUG("%s: Removing connection pool", pool->log_prefix);
1278 pthread_mutex_lock(&pool->mutex);
1281 * Don't loop over the list. Just keep removing the head
1282 * until they're all gone.
1284 while ((this = pool->head) != NULL) {
1285 INFO("%s: Closing connection (%" PRIu64 ")", pool->log_prefix, this->number);
1287 fr_connection_close_internal(pool, this);
1290 fr_heap_delete(pool->heap);
1292 fr_connection_exec_trigger(pool, "stop");
1294 rad_assert(pool->head == NULL);
1295 rad_assert(pool->tail == NULL);
1296 rad_assert(pool->num == 0);
1298 #ifdef HAVE_PTHREAD_H
1299 pthread_mutex_destroy(&pool->mutex);
1305 /** Reserve a connection in the connection pool
1307 * Will attempt to find an unused connection in the connection pool, if one is
1308 * found, will mark it as in in use increment the number of active connections
1309 * and return the connection handle.
1311 * If no free connections are found will attempt to spawn a new one, conditional
1312 * on a connection spawning not already being in progress, and not being at the
1313 * 'max' connection limit.
1315 * @note fr_connection_release must be called once the caller has finished
1316 * using the connection.
1318 * @see fr_connection_release
1319 * @param[in,out] pool to reserve the connection from.
1321 * - A pointer to the connection handle.
1324 void *fr_connection_get(fr_connection_pool_t *pool)
1326 return fr_connection_get_internal(pool, true);
1329 /** Release a connection
1331 * Will mark a connection as unused and decrement the number of active
1334 * @see fr_connection_get
1335 * @param[in,out] pool to release the connection in.
1336 * @param[in,out] conn to release.
1338 void fr_connection_release(fr_connection_pool_t *pool, void *conn)
1340 fr_connection_t *this;
1342 this = fr_connection_find(pool, conn);
1345 this->in_use = false;
1348 * Record when the connection was last released
1350 gettimeofday(&this->last_released, NULL);
1353 * Insert the connection in the heap.
1355 * This will either be based on when we *started* using it
1356 * (allowing fast links to be re-used, and slow links to be
1357 * gradually expired), or when we released it (allowing
1358 * the maximum amount of time between connection use).
1360 fr_heap_insert(pool->heap, this);
1362 rad_assert(pool->active != 0);
1365 DEBUG("%s: Released connection (%" PRIu64 ")", pool->log_prefix, this->number);
1368 * We mirror the "spawn on get" functionality by having
1369 * "delete on release". If there are too many spare
1370 * connections, go manage the pool && clean some up.
1372 fr_connection_pool_check(pool);
1375 /** Reconnect a suspected inviable connection
1377 * This should be called by the module if it suspects that a connection is
1378 * not viable (e.g. the server has closed it).
1380 * Will attempt to create a new connection handle using the create callback,
1381 * and if this is successful the new handle will be assigned to the existing
1384 * If this is not successful, the connection will be removed from the pool.
1386 * When implementing a module that uses the connection pool API, it is advisable
1387 * to pass a pointer to the pointer to the handle (void **conn)
1388 * to all functions which may call reconnect. This is so that if a new handle
1389 * is created and returned, the handle pointer can be updated up the callstack,
1390 * and a function higher up the stack doesn't attempt to use a now invalid
1391 * connection handle.
1393 * @note Will free any talloced memory hung off the context of the connection,
1394 * being reconnected.
1396 * @warning After calling reconnect the caller *MUST NOT* attempt to use
1397 * the old handle in any other operations, as its memory will have been
1400 * @see fr_connection_get
1401 * @param[in,out] pool to reconnect the connection in.
1402 * @param[in,out] conn to reconnect.
1403 * @return new connection handle if successful else NULL.
1405 void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn)
1408 fr_connection_t *this;
1410 if (!pool || !conn) return NULL;
1413 * Don't allow opening of new connections if we're trying
1416 if (main_config.exiting) {
1417 fr_connection_release(pool, conn);
1422 * If fr_connection_find is successful the pool is now locked
1424 this = fr_connection_find(pool, conn);
1425 if (!this) return NULL;
1427 new_conn = fr_connection_reconnect_internal(pool, this);
1428 pthread_mutex_unlock(&pool->mutex);
1433 /** Delete a connection from the connection pool.
1435 * Resolves the connection handle to a connection, then (if found)
1436 * closes, unlinks and frees that connection.
1438 * @note Must be called with the mutex free.
1440 * @param[in,out] pool Connection pool to modify.
1441 * @param[in] conn to delete.
1443 * - 0 If the connection could not be found.
1444 * - 1 if the connection was deleted.
1446 int fr_connection_close(fr_connection_pool_t *pool, void *conn)
1448 fr_connection_t *this;
1450 this = fr_connection_find(pool, conn);
1451 if (!this) return 0;
1453 INFO("%s: Deleting connection (%" PRIu64 ")", pool->log_prefix, this->number);
1455 fr_connection_close_internal(pool, this);
1456 fr_connection_pool_check(pool);