RCSID("$Id$")
#include <freeradius-devel/radiusd.h>
-
-#include <freeradius-devel/connection.h>
-
#include <freeradius-devel/rad_assert.h>
typedef struct fr_connection fr_connection_t;
static int fr_connection_pool_check(fr_connection_pool_t *pool);
+extern bool check_config;
+
+#ifndef NDEBUG
+#ifdef HAVE_PTHREAD_H
+/* #define PTHREAD_DEBUG (1) */
+#endif
+#endif
+
/** An individual connection within the connection pool
*
* Defines connection counters, timestamps, and holds a pointer to the
struct fr_connection {
fr_connection_t *prev; //!< Previous connection in list.
fr_connection_t *next; //!< Next connection in list.
-
- time_t start; //!< Time connection was created.
+
+ time_t created; //!< Time connection was created.
time_t last_used; //!< Last time the connection was
//!< reserved.
-
- int num_uses; //!< Number of times the connection
+
+ uint32_t num_uses; //!< Number of times the connection
//!< has been reserved.
- int in_use; //!< Whether the connection is currently
- //!< reserved.
- int number; //!< Unique ID assigned when the
+ uint64_t number; //!< Unique ID assigned when the
//!< connection is created, these will
//!< monotonically increase over the
//!< lifetime of the connection pool.
void *connection; //!< Pointer to whatever the module
//!< uses for a connection handle.
+ bool in_use; //!< Whether the connection is currently
+ //!< reserved.
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id; //!< When 'in_use == true'
+#endif
};
/** A connection pool
*
* @see fr_connection
*/
-struct fr_connection_pool_t {
- int start; //!< Number of initial connections
- int min; //!< Minimum number of concurrent
+struct fr_connection_pool_t {
+ uint32_t start; //!< Number of initial connections
+ uint32_t min; //!< Minimum number of concurrent
//!< connections to keep open.
- int max; //!< Maximum number of concurrent
+ uint32_t max; //!< Maximum number of concurrent
//!< connections to allow.
- int spare; //!< Number of spare connections to try
- //!< and maintain.
- int cleanup_delay; //!< How long a connection can go unused
- //!< for before it's closed
- //!< (0 is infinite).
- int max_uses; //!< Maximum number of times a
+ uint32_t spare; //!< Number of spare connections to try
+ uint32_t retry_delay; //!< seconds to delay re-open
+ //!< after a failed open.
+ uint32_t cleanup_interval; //!< Initial timer for how
+ //!< often we sweep the pool
+ //!< for free connections.
+ //!< (0 is infinite).
+ int delay_interval; //!< When we next do a
+ //!< cleanup. Initialized to
+ //!< cleanup_interval, and increase
+ //!< from there based on the delay.
+ int next_delay; //!< The next delay time.
+ //!< cleanup. Initialized to
+ //!< cleanup_interval, and decays
+ //!< from there.
+ uint64_t max_uses; //!< Maximum number of times a
//!< connection can be used before being
//!< closed.
- int lifetime; //!< How long a connection can be open
+ uint32_t lifetime; //!< How long a connection can be open
//!< before being closed (irrespective
//!< of whether it's idle or not).
- int idle_timeout; //!< How long a connection can be idle
+ uint32_t idle_timeout; //!< How long a connection can be idle
//!< before being closed.
-
- int trigger; //!< If TRUE execute connection triggers
+
+ bool trigger; //!< If true execute connection triggers
//!< associated with the connection
//!< pool.
+ bool spread; //!< If true requests will be spread
+ //!< across all connections, instead of
+ //!< re-using the most recently used
+ //!< connections first.
+
time_t last_checked; //!< Last time we pruned the connection
//!< pool.
time_t last_spawned; //!< Last time we spawned a connection.
time_t last_failed; //!< Last time we tried to spawn a
//!< a connection but failed.
- time_t last_complained;//!< Last time we complained about
- //!< configuration parameters.
time_t last_throttled; //!< Last time we refused to spawn a
//!< connection because the last
//!< connection failed, or we were
//!< already spawning a connection.
time_t last_at_max; //!< Last time we hit the maximum number
//!< of allowed connections.
-
- unsigned int count; //!< Number of connections spawned over
+
+ uint64_t count; //!< Number of connections spawned over
//!< the lifetime of the pool.
- int num; //!< Number of connections in the pool.
+ uint32_t num; //!< Number of connections in the pool.
int active; //!< Number of currently reserved
//!< connections.
fr_connection_t *head; //!< Start of the connection list.
fr_connection_t *tail; //!< End of the connection list.
-
- int spawning; //!< Whether we are currently attempting
+
+ bool spawning; //!< Whether we are currently attempting
//!< to spawn a new connection.
#ifdef HAVE_PTHREAD_H
//!< that relates to this pool.
void *ctx; //!< Pointer to context data that will
//!< be passed to callbacks.
-
+
char *log_prefix; //!< Log prefix to prepend to all log
//!< messages created by the connection
//!< pool code.
#endif
static const CONF_PARSER connection_config[] = {
- { "start", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, start),
- 0, "5" },
- { "min", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, min),
- 0, "5" },
- { "max", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, max),
- 0, "10" },
- { "spare", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, spare),
- 0, "3" },
- { "uses", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, max_uses),
- 0, "0" },
- { "lifetime", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, lifetime),
- 0, "0" },
- { "cleanup_delay", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, cleanup_delay),
- 0, "5" },
- { "idle_timeout", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, idle_timeout),
- 0, "60" },
+ { "start", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, start), "5" },
+ { "min", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, min), "5" },
+ { "max", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max), "10" },
+ { "spare", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, spare), "3" },
+ { "uses", FR_CONF_OFFSET(PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), "0" },
+ { "lifetime", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), "0" },
+ { "cleanup_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), NULL},
+ { "cleanup_interval", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), "30" },
+ { "idle_timeout", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), "60" },
+ { "retry_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), "1" },
+ { "spread", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), "no" },
{ NULL, -1, 0, NULL, NULL }
};
this->prev = this->next = NULL;
}
-/** Adds a connection to the connection list
+/** Adds a connection to the head of the connection list
*
* @note Must be called with the mutex held.
*
* @param[in,out] pool to modify.
* @param[in] this Connection to add.
*/
-static void fr_connection_link(fr_connection_pool_t *pool,
- fr_connection_t *this)
+static void fr_connection_link_head(fr_connection_pool_t *pool,
+ fr_connection_t *this)
{
rad_assert(pool != NULL);
rad_assert(this != NULL);
rad_assert(pool->head != this);
rad_assert(pool->tail != this);
- if (pool->head) pool->head->prev = this;
+ if (pool->head) {
+ pool->head->prev = this;
+ }
+
this->next = pool->head;
this->prev = NULL;
pool->head = this;
}
}
+/** Adds a connection to the tail of the connection list
+ *
+ * @note Must be called with the mutex held.
+ *
+ * @param[in,out] pool to modify.
+ * @param[in] this Connection to add.
+ */
+static void fr_connection_link_tail(fr_connection_pool_t *pool,
+ fr_connection_t *this)
+{
+ rad_assert(pool != NULL);
+ rad_assert(this != NULL);
+ rad_assert(pool->head != this);
+ rad_assert(pool->tail != this);
+
+ if (pool->tail) {
+ pool->tail->next = this;
+ }
+ this->prev = pool->tail;
+ this->next = NULL;
+ pool->tail = this;
+ if (!pool->head) {
+ rad_assert(this->prev == NULL);
+ pool->head = this;
+ } else {
+ rad_assert(this->prev != NULL);
+ }
+}
+
/** Spawns a new connection
*
* @note Will call the 'open' trigger.
* @note Must be called with the mutex free.
*
- * @param[in] pool
+ * @param[in] pool to modify.
* @param[in] now Current time.
+ * @param[in] in_use whether the new connection should be "in_use" or not
* @return the new connection struct or NULL on error.
*/
static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool,
- time_t now)
+ time_t now, bool in_use)
{
fr_connection_t *this;
void *conn;
-
+
rad_assert(pool != NULL);
/*
pthread_mutex_lock(&pool->mutex);
rad_assert(pool->num <= pool->max);
- if ((pool->last_failed == now) || pool->spawning) {
- int complain = FALSE;
-
+ /*
+ * Don't spawn multiple connections at the same time.
+ */
+ if (pool->spawning) {
+ pthread_mutex_unlock(&pool->mutex);
+
+ ERROR("%s: Cannot open new connection, "
+ "connection spawning already in "
+ "progress", pool->log_prefix);
+ return NULL;
+ }
+
+ /*
+ * If the last attempt failed, wait a bit before
+ * retrying.
+ */
+ if (pool->last_failed && ((pool->last_failed + pool->retry_delay) > now)) {
+ bool complain = false;
+
if (pool->last_throttled != now) {
- complain = TRUE;
-
+ complain = true;
+
pool->last_throttled = now;
}
pthread_mutex_unlock(&pool->mutex);
-
- if (complain) {
- if (pool->spawning) {
- radlog(L_ERR, "%s: Cannot open new connection, "
- "connection spawning already in "
- "progress", pool->log_prefix);
- } else {
- radlog(L_ERR, "%s: Last connection failed, "
- "throttling connection spawn",
- pool->log_prefix);
- }
+
+ if (!RATE_LIMIT_ENABLED || complain) {
+ ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
+ pool->log_prefix, pool->retry_delay);
}
-
+
return NULL;
}
- pool->spawning = TRUE;
+ pool->spawning = true;
/*
* Unlock the mutex while we try to open a new
*/
pthread_mutex_unlock(&pool->mutex);
- radlog(L_INFO, "%s: Opening additional connection (%i)",
- pool->log_prefix, pool->count);
-
- this = rad_malloc(sizeof(*this));
- memset(this, 0, sizeof(*this));
+ INFO("%s: Opening additional connection (%" PRIu64 ")", pool->log_prefix, pool->count);
/*
* This may take a long time, which prevents other
*/
conn = pool->create(pool->ctx);
if (!conn) {
- radlog(L_ERR, "%s: Opening connection failed (%i)",
- pool->log_prefix, pool->count);
-
+ ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, pool->count);
+
pool->last_failed = now;
- free(this);
- pool->spawning = FALSE; /* atomic, so no lock is needed */
+ pool->spawning = false; /* atomic, so no lock is needed */
return NULL;
}
-
- this->start = now;
- this->connection = conn;
/*
* And lock the mutex again while we link the new
*/
pthread_mutex_lock(&pool->mutex);
- this->number = pool->count++;
- this->last_used = now;
- fr_connection_link(pool, this);
- pool->num++;
- pool->spawning = FALSE;
- pool->last_spawned = time(NULL);
-
- pthread_mutex_unlock(&pool->mutex);
-
- if (pool->trigger) exec_trigger(NULL, pool->cs, "open", TRUE);
-
- return this;
-}
-
-/** Add a new connection to the pool
- *
- * If conn is not NULL will attempt to add that connection handle to the pool.
- * If conn is NULL will attempt to spawn a new connection using the create
- * callback.
- *
- * @note Will call the 'open' trigger.
- *
- * @param[in,out] pool to add connection to.
- * @param[in] conn to add.
- * @return 0 if the connection wasn't added else 1.
- */
-int fr_connection_add(fr_connection_pool_t *pool, void *conn)
-{
- fr_connection_t *this;
-
- if (!pool) return 0;
-
- pthread_mutex_lock(&pool->mutex);
-
- if (!conn) {
- conn = pool->create(pool->ctx);
- if (!conn) {
- pthread_mutex_unlock(&pool->mutex);
- return 0;
- }
-
- radlog(L_INFO, "%s: Opening connection successful (%i)",
- pool->log_prefix, pool->count);
- }
-
- /*
- * Too many connections: can't add it.
- */
- if (pool->num >= pool->max) {
+ this = talloc_zero(pool, fr_connection_t);
+ if (!this) {
pthread_mutex_unlock(&pool->mutex);
- return 0;
+ return NULL;
}
- this = rad_malloc(sizeof(*this));
- memset(this, 0, sizeof(*this));
-
- this->start = time(NULL);
+ this->created = now;
this->connection = conn;
+ this->in_use = in_use;
this->number = pool->count++;
- this->last_used = time(NULL);
- fr_connection_link(pool, this);
+ this->last_used = now;
+ fr_connection_link_head(pool, this);
pool->num++;
+ pool->spawning = false;
+ pool->last_spawned = time(NULL);
+ pool->delay_interval = pool->cleanup_interval;
+ pool->next_delay = pool->cleanup_interval;
+ pool->last_failed = 0;
pthread_mutex_unlock(&pool->mutex);
- if (pool->trigger) exec_trigger(NULL, pool->cs, "open", TRUE);
+ if (pool->trigger) exec_trigger(NULL, pool->cs, "open", true);
- return 1;
+ return this;
}
/** Close an existing connection.
static void fr_connection_close(fr_connection_pool_t *pool,
fr_connection_t *this)
{
- if (pool->trigger) exec_trigger(NULL, pool->cs, "close", TRUE);
+ /*
+ * If it's in use, release it.
+ */
+ if (this->in_use) {
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id = pthread_self();
+ rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
+#endif
+ this->in_use = false;
+
+ rad_assert(pool->active > 0);
+ pool->active--;
+ }
- rad_assert(this->in_use == FALSE);
+ if (pool->trigger) exec_trigger(NULL, pool->cs, "close", true);
fr_connection_unlink(pool, this);
pool->delete(pool->ctx, this->connection);
rad_assert(pool->num > 0);
pool->num--;
- free(this);
+ talloc_free(this);
}
/** Find a connection handle in the connection list
* order to find top of the parent structure.
*/
for (this = pool->head; this != NULL; this = this->next) {
- if (this->connection == conn) return this;
+ if (this->connection == conn) {
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id;
+
+ pthread_id = pthread_self();
+ rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
+#endif
+
+ rad_assert(this->in_use == true);
+ return this;
+ }
}
pthread_mutex_unlock(&pool->mutex);
this = fr_connection_find(pool, conn);
if (!this) return 0;
- /*
- * If it's in use, release it.
- */
- if (this->in_use) {
- rad_assert(this->in_use == TRUE);
- this->in_use = FALSE;
-
- rad_assert(pool->active > 0);
- pool->active--;
- }
-
- radlog(L_INFO, "%s: Deleting connection (%i)", pool->log_prefix,
- this->number);
+ INFO("%s: Deleting connection (%" PRIu64 ")", pool->log_prefix, this->number);
fr_connection_close(pool, this);
fr_connection_pool_check(pool);
for (this = pool->head; this != NULL; this = next) {
next = this->next;
-
- radlog(L_INFO, "%s: Closing connection (%i)", pool->log_prefix,
- this->number);
-
+
+ INFO("%s: Closing connection (%" PRIu64 ")", pool->log_prefix, this->number);
+
fr_connection_close(pool, this);
}
- if (pool->trigger) exec_trigger(NULL, pool->cs, "stop", TRUE);
+ if (pool->trigger) exec_trigger(NULL, pool->cs, "stop", true);
rad_assert(pool->head == NULL);
rad_assert(pool->tail == NULL);
rad_assert(pool->num == 0);
-
- free(pool->log_prefix);
- free(pool);
+
+ talloc_free(pool);
}
/** Create a new connection pool
* @param[in] c Callback to create new connections.
* @param[in] a Callback to check the status of connections.
* @param[in] d Callback to delete connections.
+ * @param[in] prefix to prepend to all log message, if NULL will create prefix
+ * from parent conf section names.
* @return A new connection pool or NULL on error.
*/
fr_connection_pool_t *fr_connection_pool_init(CONF_SECTION *parent,
void *ctx,
fr_connection_create_t c,
fr_connection_alive_t a,
- fr_connection_delete_t d)
+ fr_connection_delete_t d,
+ char const *prefix)
{
- int i, lp_len;
+ uint32_t i;
fr_connection_pool_t *pool;
fr_connection_t *this;
CONF_SECTION *modules;
CONF_SECTION *cs;
- const char *cs_name1, *cs_name2;
+ char const *cs_name1, *cs_name2;
time_t now = time(NULL);
if (!parent || !ctx || !c || !d) return NULL;
cs = cf_section_sub_find(parent, "pool");
if (!cs) cs = cf_section_sub_find(parent, "limit");
- pool = rad_malloc(sizeof(*pool));
- memset(pool, 0, sizeof(*pool));
+ /*
+ * Pool is allocated in the NULL context as
+ * threads are likely to allocate memory
+ * beneath the pool.
+ */
+ pool = talloc_zero(NULL, fr_connection_pool_t);
+ if (!pool) return NULL;
+
+ /*
+ * Ensure the pool is freed at the same time
+ * as its parent.
+ */
+ if (fr_link_talloc_ctx_free(cs ? cs : parent, pool) < 0) {
+ talloc_free(pool);
+ return NULL;
+ }
pool->cs = cs;
pool->ctx = ctx;
pthread_mutex_init(&pool->mutex, NULL);
#endif
- modules = cf_item_parent(cf_sectiontoitem(parent));
- if (modules) {
- cs_name1 = cf_section_name1(modules);
- if (cs_name1 && (strcmp(cs_name1, "modules") == 0)) {
- cs_name1 = cf_section_name1(parent);
- cs_name2 = cf_section_name2(parent);
- if (!cs_name2) {
- cs_name2 = cs_name1;
+ if (!prefix) {
+ modules = cf_item_parent(cf_sectiontoitem(parent));
+ if (modules) {
+ cs_name1 = cf_section_name1(modules);
+ if (cs_name1 && (strcmp(cs_name1, "modules") == 0)) {
+ cs_name1 = cf_section_name1(parent);
+ cs_name2 = cf_section_name2(parent);
+ if (!cs_name2) {
+ cs_name2 = cs_name1;
+ }
+
+ pool->log_prefix = talloc_typed_asprintf(pool, LOG_PREFIX, cs_name1,
+ cs_name2);
}
+ } else { /* not a module configuration */
+ cs_name1 = cf_section_name1(parent);
- lp_len = (sizeof(LOG_PREFIX) - 4) + strlen(cs_name1) + strlen(cs_name2);
- pool->log_prefix = rad_malloc(lp_len);
- snprintf(pool->log_prefix, lp_len, LOG_PREFIX, cs_name1,
- cs_name2);
+ pool->log_prefix = talloc_typed_strdup(pool, cs_name1);
}
- } else { /* not a module configuration */
- cs_name1 = cf_section_name1(parent);
-
- pool->log_prefix = strdup(cs_name1);
+ } else {
+ pool->log_prefix = talloc_typed_strdup(pool, prefix);
}
-
+
DEBUG("%s: Initialising connection pool", pool->log_prefix);
if (cs) {
goto error;
}
- if (cf_section_sub_find(cs, "trigger")) pool->trigger = TRUE;
+ if (cf_section_sub_find(cs, "trigger")) pool->trigger = true;
} else {
pool->start = 5;
pool->min = 5;
pool->max = 10;
pool->spare = 3;
- pool->cleanup_delay = 5;
+ pool->cleanup_interval = 30;
pool->idle_timeout = 60;
}
/*
* Some simple limits
*/
+ if (pool->max == 0) {
+ cf_log_err_cs(cs, "Cannot set 'max' to zero");
+ goto error;
+ }
+ if (pool->min > pool->max) {
+ cf_log_err_cs(cs, "Cannot set 'min' to more than 'max'");
+ goto error;
+ }
+
if (pool->max > 1024) pool->max = 1024;
if (pool->start > pool->max) pool->start = pool->max;
if (pool->spare > (pool->max - pool->min)) {
pool->idle_timeout = 0;
}
+ if ((pool->idle_timeout > 0) && (pool->cleanup_interval > pool->idle_timeout)) {
+ pool->cleanup_interval = pool->idle_timeout;
+ }
+
+ /*
+ * Don't open any connections. Instead, force the limits
+ * to only 1 connection.
+ *
+ */
+ if (check_config) {
+ pool->start = pool->min = pool->max = 1;
+ return pool;
+ }
+
/*
* Create all of the connections, unless the admin says
* not to.
*/
for (i = 0; i < pool->start; i++) {
- this = fr_connection_spawn(pool, now);
+ this = fr_connection_spawn(pool, now, false);
if (!this) {
error:
fr_connection_pool_delete(pool);
}
}
- if (pool->trigger) exec_trigger(NULL, pool->cs, "start", TRUE);
+ if (pool->trigger) exec_trigger(NULL, pool->cs, "start", true);
return pool;
}
* @note Will only close connections not in use.
* @note Must be called with the mutex held.
*
- * @param[in,out] pool
+ * @param[in,out] pool to modify.
* @param[in,out] this Connection to manage.
* @param[in] now Current time.
* @return 0 if the connection was closed, otherwise 1.
{
rad_assert(pool != NULL);
rad_assert(this != NULL);
-
+
/*
* Don't terminated in-use connections
*/
if ((pool->max_uses > 0) &&
(this->num_uses >= pool->max_uses)) {
- DEBUG("%s: Closing expired connection (%i): Hit max_uses limit",
- pool->log_prefix, this->number);
+ DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit max_uses limit", pool->log_prefix,
+ this->number);
do_delete:
- if ((pool->num <= pool->min) &&
- (pool->last_complained < now)) {
- radlog(L_INFO, "%s: WARNING: You probably need to "
- "lower \"min\"", pool->log_prefix);
-
- pool->last_complained = now;
+ if (pool->num <= pool->min) {
+ RATE_LIMIT(WARN("%s: You probably need to lower \"min\"", pool->log_prefix));
}
fr_connection_close(pool, this);
return 0;
}
if ((pool->lifetime > 0) &&
- ((this->start + pool->lifetime) < now)) {
- DEBUG("%s: Closing expired connection (%i) ", pool->log_prefix,
- this->number);
+ ((this->created + pool->lifetime) < now)) {
+ DEBUG("%s: Closing expired connection (%" PRIu64 ")", pool->log_prefix, this->number);
goto do_delete;
}
if ((pool->idle_timeout > 0) &&
((this->last_used + pool->idle_timeout) < now)) {
- radlog(L_INFO, "%s: Closing connection (%i): Hit idle_timeout, "
- "was idle for %u seconds", pool->log_prefix, this->number,
- (int) (now - this->last_used));
+ INFO("%s: Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %u seconds",
+ pool->log_prefix, this->number, (int) (now - this->last_used));
goto do_delete;
}
-
+
return 1;
}
*/
static int fr_connection_pool_check(fr_connection_pool_t *pool)
{
- int spare, spawn;
+ uint32_t spawn, idle, extra;
time_t now = time(NULL);
fr_connection_t *this, *next;
return 1;
}
- spare = pool->num - pool->active;
+ /*
+ * Some idle connections are OK, if they're within the
+ * configured "spare" range. Any extra connections
+ * outside of that range can be closed.
+ */
+ idle = pool->num - pool->active;
+ if (idle <= pool->spare) {
+ extra = 0;
+ } else {
+ extra = idle - pool->spare;
+ }
- spawn = 0;
- if ((pool->num < pool->max) && (spare < pool->spare)) {
- spawn = pool->spare - spare;
- if ((spawn + pool->num) > pool->max) {
- spawn = pool->max - pool->num;
+ /*
+ * The other end can close connections. If so, we'll
+ * have fewer than "min". When that happens, open more
+ * connections to enforce "min".
+ */
+ if (pool->num <= pool->min) {
+ if (pool->spawning) {
+ spawn = 0;
+ } else {
+ spawn = pool->min - pool->num;
}
- if (pool->spawning) spawn = 0;
+ extra = 0;
- if (spawn) {
- pthread_mutex_unlock(&pool->mutex);
- fr_connection_spawn(pool, now); /* ignore return code */
- pthread_mutex_lock(&pool->mutex);
+ } else if (pool->num >= pool->max) {
+ /*
+ * Ensure we don't spawn more connections. If
+ * there are extra idle connections, we can
+ * delete all of them.
+ */
+ spawn = 0;
+ /* leave extra alone from above */
+
+ } else if (idle <= pool->spare) {
+ /*
+ * Not enough spare connections. Spawn a few.
+ * But cap the pool size at "max"
+ */
+ spawn = pool->spare - idle;
+ extra = 0;
+
+ if ((pool->num + spawn) > pool->max) {
+ spawn = pool->max - pool->num;
}
+
+ } else if ((pool->min + extra) >= pool->num) {
+ /*
+ * If closing the extra connections would take us
+ * below "min", then don't do that. Cap the
+ * spare connections at the ones which will take
+ * us exactly to "min".
+ */
+ spawn = 0;
+ extra = pool->num - pool->min;
+
+ } else {
+ /*
+ * Closing the "extra" connections won't take us
+ * below "min". It's therefore safe to close
+ * them all.
+ */
+ spawn = 0;
+ /* leave extra alone from above */
+ }
+
+ if (spawn) {
+ pthread_mutex_unlock(&pool->mutex);
+ fr_connection_spawn(pool, now, false); /* ignore return code */
+ pthread_mutex_lock(&pool->mutex);
}
/*
* We haven't spawned connections in a while, and there
* are too many spare ones. Close the one which has been
- * idle for the longest.
+ * unused for the longest.
*/
- if ((now >= (pool->last_spawned + pool->cleanup_delay)) &&
- (spare > pool->spare)) {
- fr_connection_t *idle;
+ if (extra && (now >= (pool->last_spawned + pool->delay_interval))) {
+ fr_connection_t *found;
- idle = NULL;
+ found = NULL;
for (this = pool->tail; this != NULL; this = this->prev) {
if (this->in_use) continue;
- if (!idle ||
- (this->last_used < idle->last_used)) {
- idle = this;
+ if (!found ||
+ (this->last_used < found->last_used)) {
+ found = this;
}
}
- rad_assert(idle != NULL);
-
- radlog(L_INFO, "%s: Closing connection (%i): Too many "
- "free connections (%d > %d)", pool->log_prefix,
- idle->number, spare, pool->spare);
- fr_connection_close(pool, idle);
+ rad_assert(found != NULL);
+
+ INFO("%s: Closing connection (%" PRIu64 "), from %d unused connections", pool->log_prefix,
+ found->number, extra);
+ fr_connection_close(pool, found);
+
+ /*
+ * Decrease the delay for the next time we clean
+ * up.
+ */
+ pool->next_delay >>= 1;
+ if (pool->next_delay == 0) pool->next_delay = 1;
+ pool->delay_interval += pool->next_delay;
}
/*
return 1;
}
-/** Trigger connection check for a given connection or all connections
- *
- * If conn is not NULL then we call fr_connection_manage on the connection.
- * If conn is NULL we call fr_connection_pool_check on the pool.
- *
- * @note Only connections that are not in use will be closed.
- *
- * @see fr_connection_manage
- * @see fr_connection_pool_check
- * @param[in,out] pool to manage.
- * @param[in,out] conn to check.
- * @return 0 if the connection was closed, else 1.
- */
-int fr_connection_check(fr_connection_pool_t *pool, void *conn)
-{
- fr_connection_t *this;
- time_t now;
- int ret = 1;
-
- if (!pool) return 1;
-
- now = time(NULL);
- pthread_mutex_lock(&pool->mutex);
-
- if (!conn) return fr_connection_pool_check(pool);
-
- for (this = pool->head; this != NULL; this = this->next) {
- if (this->connection == conn) {
- ret = fr_connection_manage(pool, conn, now);
- break;
- }
- }
-
- pthread_mutex_unlock(&pool->mutex);
-
- return ret;
-}
-
-/** Reserve a connection in the connection pool
- *
- * Will attempt to find an unused connection in the connection pool, if one is
- * found, will mark it as in in use increment the number of active connections
- * and return the connection handle.
- *
- * If no free connections are found will attempt to spawn a new one, conditional
- * on a connection spawning not already being in progress, and not being at the
- * 'max' connection limit.
+/** Get a connection from the connection pool
*
- * @note fr_connection_release must be called once the caller has finished
- * using the connection.
- *
- * @see fr_connection_release
* @param[in,out] pool to reserve the connection from.
+ * @param[in] spawn whether to spawn a new connection
* @return a pointer to the connection handle, or NULL on error.
*/
-void *fr_connection_get(fr_connection_pool_t *pool)
+static void *fr_connection_get_internal(fr_connection_pool_t *pool, int spawn)
{
time_t now;
fr_connection_t *this, *next;
}
if (pool->num == pool->max) {
- int complain = FALSE;
+ bool complain = false;
/*
* Rate-limit complaints.
*/
if (pool->last_at_max != now) {
- complain = TRUE;
+ complain = true;
pool->last_at_max = now;
}
-
+
pthread_mutex_unlock(&pool->mutex);
-
- if (complain) {
- radlog(L_ERR, "%s: No connections available and at max "
- "connection limit", pool->log_prefix);
+
+ if (!RATE_LIMIT_ENABLED || complain) {
+ ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
}
-
+
return NULL;
}
pthread_mutex_unlock(&pool->mutex);
- this = fr_connection_spawn(pool, now);
+
+ if (!spawn) return NULL;
+
+ this = fr_connection_spawn(pool, now, true); /* MY connection! */
if (!this) return NULL;
pthread_mutex_lock(&pool->mutex);
pool->active++;
this->num_uses++;
this->last_used = now;
- this->in_use = TRUE;
+ this->in_use = true;
+#ifdef PTHREAD_DEBUG
+ this->pthread_id = pthread_self();
+#endif
pthread_mutex_unlock(&pool->mutex);
-
- DEBUG("%s: Reserved connection (%i)", pool->log_prefix, this->number);
-
+
+ DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number);
+
return this->connection;
}
+
+/** Reserve a connection in the connection pool
+ *
+ * Will attempt to find an unused connection in the connection pool, if one is
+ * found, will mark it as in in use increment the number of active connections
+ * and return the connection handle.
+ *
+ * If no free connections are found will attempt to spawn a new one, conditional
+ * on a connection spawning not already being in progress, and not being at the
+ * 'max' connection limit.
+ *
+ * @note fr_connection_release must be called once the caller has finished
+ * using the connection.
+ *
+ * @see fr_connection_release
+ * @param[in,out] pool to reserve the connection from.
+ * @return a pointer to the connection handle, or NULL on error.
+ */
+void *fr_connection_get(fr_connection_pool_t *pool)
+{
+ return fr_connection_get_internal(pool, true);
+}
+
+/** Get the number of connections currently in the pool
+ *
+ * @param pool to count connections for.
+ * @return the number of connections in the pool
+ */
+int fr_connection_get_num(fr_connection_pool_t *pool)
+{
+ return pool->num;
+}
+
/** Release a connection
*
* Will mark a connection as unused and decrement the number of active
this = fr_connection_find(pool, conn);
if (!this) return;
- rad_assert(this->in_use == TRUE);
- this->in_use = FALSE;
-
+ this->in_use = false;
+
/*
- * Put it at the head of the list, so
- * that it will get re-used quickly.
+ * Determines whether the last used connection gets
+ * re-used first.
*/
- if (this != pool->head) {
- fr_connection_unlink(pool, this);
- fr_connection_link(pool, this);
+ if (pool->spread) {
+ /*
+ * Put it at the tail of the list, so
+ * that it will get re-used last.
+ */
+ if (this != pool->tail) {
+ fr_connection_unlink(pool, this);
+ fr_connection_link_tail(pool, this);
+ }
+ } else {
+ /*
+ * Put it at the head of the list, so
+ * that it will get re-used quickly.
+ */
+ if (this != pool->head) {
+ fr_connection_unlink(pool, this);
+ fr_connection_link_head(pool, this);
+ }
}
+
rad_assert(pool->active > 0);
pool->active--;
- DEBUG("%s: Released connection (%i)", pool->log_prefix, this->number);
+ DEBUG("%s: Released connection (%" PRIu64 ")", pool->log_prefix, this->number);
/*
* We mirror the "spawn on get" functionality by having
* @see fr_connection_get
* @param[in,out] pool to reconnect the connection in.
* @param[in,out] conn to reconnect.
- * @return ew connection handle if successful else NULL.
+ * @return new connection handle if successful else NULL.
*/
void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn)
{
void *new_conn;
fr_connection_t *this;
- int conn_number;
+ uint64_t conn_number;
if (!pool || !conn) return NULL;
this = fr_connection_find(pool, conn);
if (!this) return NULL;
-
+
+ /*
+ * The pool is now locked.
+ */
conn_number = this->number;
- rad_assert(this->in_use == TRUE);
-
- DEBUG("%s: Reconnecting (%i)", pool->log_prefix, conn_number);
-
+ DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number);
+
new_conn = pool->create(pool->ctx);
if (!new_conn) {
- time_t now = time(NULL);
-
- if (pool->last_complained == now) {
- now = 0;
- } else {
- pool->last_complained = now;
- }
-
- this->in_use = FALSE;
-
- rad_assert(pool->active > 0);
- pool->active--;
-
+ /*
+ * We can't create a new connection, so close
+ * this one.
+ */
fr_connection_close(pool, this);
- pthread_mutex_unlock(&pool->mutex);
-
+
/*
- * Can't create a new socket.
- * Try grabbing a pre-existing one.
+ * Maybe there's a connection which is unused and
+ * available. If so, return it.
*/
- new_conn = fr_connection_get(pool);
+ pthread_mutex_unlock(&pool->mutex);
+ new_conn = fr_connection_get_internal(pool, false);
if (new_conn) return new_conn;
-
- if (!now) return NULL;
-
- radlog(L_ERR, "%s: Failed to reconnect (%i), and no other "
- "connections available", pool->log_prefix, conn_number);
-
+
+ RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available", pool->log_prefix,
+ conn_number));
return NULL;
}
-
+
+ if (pool->trigger) exec_trigger(NULL, pool->cs, "close", true);
pool->delete(pool->ctx, conn);
this->connection = new_conn;
pthread_mutex_unlock(&pool->mutex);