#ifndef NDEBUG
#ifdef HAVE_PTHREAD_H
-#define PTHREAD_DEBUG (1)
+/* #define PTHREAD_DEBUG (1) */
#endif
#endif
time_t last_used; //!< Last time the connection was
//!< reserved.
- uint64_t num_uses; //!< Number of times the connection
+ uint32_t num_uses; //!< Number of times the connection
//!< has been reserved.
- int in_use; //!< Whether the connection is currently
- //!< reserved.
uint64_t number; //!< Unique ID assigned when the
//!< connection is created, these will
//!< monotonically increase over the
//!< lifetime of the connection pool.
void *connection; //!< Pointer to whatever the module
//!< uses for a connection handle.
+ bool in_use; //!< Whether the connection is currently
+ //!< reserved.
#ifdef PTHREAD_DEBUG
pthread_t pthread_id; //!< When 'in_use == true'
#endif
* @see fr_connection
*/
struct fr_connection_pool_t {
- int start; //!< Number of initial connections
- int min; //!< Minimum number of concurrent
+ uint32_t start; //!< Number of initial connections
+ uint32_t min; //!< Minimum number of concurrent
//!< connections to keep open.
- int max; //!< Maximum number of concurrent
+ uint32_t max; //!< Maximum number of concurrent
//!< connections to allow.
- int spare; //!< Number of spare connections to try
- int retry_delay; //!< seconds to delay re-open
+ uint32_t spare; //!< Number of spare connections to try
+ uint32_t retry_delay; //!< seconds to delay re-open
//!< after a failed open.
- int cleanup_interval; //!< Initial timer for how
+ uint32_t cleanup_interval; //!< Initial timer for how
//!< often we sweep the pool
//!< for free connections.
//!< (0 is infinite).
uint64_t max_uses; //!< Maximum number of times a
//!< connection can be used before being
//!< closed.
- int lifetime; //!< How long a connection can be open
+ uint32_t lifetime; //!< How long a connection can be open
//!< before being closed (irrespective
//!< of whether it's idle or not).
- int idle_timeout; //!< How long a connection can be idle
+ uint32_t idle_timeout; //!< How long a connection can be idle
//!< before being closed.
bool trigger; //!< If true execute connection triggers
time_t last_spawned; //!< Last time we spawned a connection.
time_t last_failed; //!< Last time we tried to spawn a
//!< a connection but failed.
- time_t last_complained;//!< Last time we complained about
- //!< configuration parameters.
time_t last_throttled; //!< Last time we refused to spawn a
//!< connection because the last
//!< connection failed, or we were
uint64_t count; //!< Number of connections spawned over
//!< the lifetime of the pool.
- int num; //!< Number of connections in the pool.
+ uint32_t num; //!< Number of connections in the pool.
int active; //!< Number of currently reserved
//!< connections.
CONF_SECTION *cs; //!< Configuration section holding
//!< the section of parsed config file
//!< that relates to this pool.
- void *ctx; //!< Pointer to context data that will
+ void *opaque; //!< Pointer to context data that will
//!< be passed to callbacks.
char *log_prefix; //!< Log prefix to prepend to all log
#endif
static const CONF_PARSER connection_config[] = {
- { "start", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, start),
- 0, "5" },
- { "min", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, min),
- 0, "5" },
- { "max", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, max),
- 0, "10" },
- { "spare", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, spare),
- 0, "3" },
- { "uses", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, max_uses),
- 0, "0" },
- { "lifetime", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, lifetime),
- 0, "0" },
- { "cleanup_delay", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, cleanup_interval),
- 0, NULL},
- { "cleanup_interval", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, cleanup_interval),
- 0, "30" },
- { "idle_timeout", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, idle_timeout),
- 0, "60" },
- { "retry_delay", PW_TYPE_INTEGER, offsetof(fr_connection_pool_t, retry_delay),
- 0, "1" },
- { "spread", PW_TYPE_BOOLEAN, offsetof(fr_connection_pool_t, spread),
- 0, "no" },
+ { "start", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, start), "5" },
+ { "min", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, min), "5" },
+ { "max", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max), "10" },
+ { "spare", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, spare), "3" },
+ { "uses", FR_CONF_OFFSET(PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), "0" },
+ { "lifetime", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), "0" },
+ { "cleanup_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), NULL},
+ { "cleanup_interval", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), "30" },
+ { "idle_timeout", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), "60" },
+ { "retry_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), "1" },
+ { "spread", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), "no" },
{ NULL, -1, 0, NULL, NULL }
};
*
* @param[in] pool to modify.
* @param[in] now Current time.
+ * @param[in] in_use whether the new connection should be "in_use" or not
* @return the new connection struct or NULL on error.
*/
static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool,
- time_t now)
+ time_t now, bool in_use)
{
+ TALLOC_CTX *ctx;
+
fr_connection_t *this;
void *conn;
pthread_mutex_unlock(&pool->mutex);
- if (complain) {
- ERROR("%s: Last connection attempt failed, "
- "waiting %d seconds before retrying",
+ if (!RATE_LIMIT_ENABLED || complain) {
+ ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
pool->log_prefix, pool->retry_delay);
}
INFO("%s: Opening additional connection (%" PRIu64 ")", pool->log_prefix, pool->count);
- this = rad_malloc(sizeof(*this));
- memset(this, 0, sizeof(*this));
+ /*
+ * Allocate a new top level ctx for the create callback
+ * to hang its memory off of.
+ */
+ ctx = talloc_init("fr_connection_ctx");
+ if (!ctx) return NULL;
/*
* This may take a long time, which prevents other
* about other threads opening new connections, as we
* already have no free connections.
*/
- conn = pool->create(pool->ctx);
+ conn = pool->create(ctx, pool->opaque);
if (!conn) {
ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, pool->count);
pool->last_failed = now;
- free(this);
pool->spawning = false; /* atomic, so no lock is needed */
return NULL;
}
- this->created = now;
- this->connection = conn;
-
/*
* And lock the mutex again while we link the new
* connection back into the pool.
*/
pthread_mutex_lock(&pool->mutex);
+ this = talloc_zero(pool, fr_connection_t);
+ if (!this) {
+ pthread_mutex_unlock(&pool->mutex);
+ return NULL;
+ }
+ fr_link_talloc_ctx_free(this, ctx);
+
+ this->created = now;
+ this->connection = conn;
+ this->in_use = in_use;
+
this->number = pool->count++;
this->last_used = now;
fr_connection_link_head(pool, this);
if (pool->trigger) exec_trigger(NULL, pool->cs, "close", true);
fr_connection_unlink(pool, this);
- pool->delete(pool->ctx, this->connection);
+ if (pool->delete) pool->delete(pool->opaque, this->connection);
rad_assert(pool->num > 0);
pool->num--;
- free(this);
+ talloc_free(this);
}
/** Find a connection handle in the connection list
* order to find top of the parent structure.
*/
for (this = pool->head; this != NULL; this = this->next) {
- if (this->connection == conn) return this;
+ if (this->connection == conn) {
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id;
+
+ pthread_id = pthread_self();
+ rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
+#endif
+
+ rad_assert(this->in_use == true);
+ return this;
+ }
}
pthread_mutex_unlock(&pool->mutex);
rad_assert(pool->tail == NULL);
rad_assert(pool->num == 0);
- free(pool->log_prefix);
- free(pool);
+ talloc_free(pool);
}
/** Create a new connection pool
* @note Will call the 'start' trigger.
*
* @param[in] parent configuration section containing a 'pool' subsection.
- * @param[in] ctx pointer to pass to callbacks.
+ * @param[in] opaque data pointer to pass to callbacks.
* @param[in] c Callback to create new connections.
* @param[in] a Callback to check the status of connections.
* @param[in] d Callback to delete connections.
* @return A new connection pool or NULL on error.
*/
fr_connection_pool_t *fr_connection_pool_init(CONF_SECTION *parent,
- void *ctx,
+ void *opaque,
fr_connection_create_t c,
fr_connection_alive_t a,
fr_connection_delete_t d,
- char *prefix)
+ char const *prefix)
{
- int i, lp_len;
+ uint32_t i;
fr_connection_pool_t *pool;
fr_connection_t *this;
CONF_SECTION *modules;
char const *cs_name1, *cs_name2;
time_t now = time(NULL);
- if (!parent || !ctx || !c || !d) return NULL;
+ if (!parent || !opaque || !c) return NULL;
cs = cf_section_sub_find(parent, "pool");
if (!cs) cs = cf_section_sub_find(parent, "limit");
- pool = rad_malloc(sizeof(*pool));
- memset(pool, 0, sizeof(*pool));
+ /*
+ * Pool is allocated in the NULL context as
+ * threads are likely to allocate memory
+ * beneath the pool.
+ */
+ pool = talloc_zero(NULL, fr_connection_pool_t);
+ if (!pool) return NULL;
+
+ /*
+ * Ensure the pool is freed at the same time
+ * as its parent.
+ */
+ if (fr_link_talloc_ctx_free(cs ? cs : parent, pool) < 0) {
+ talloc_free(pool);
+ return NULL;
+ }
pool->cs = cs;
- pool->ctx = ctx;
+ pool->opaque = opaque;
pool->create = c;
pool->alive = a;
pool->delete = d;
cs_name2 = cs_name1;
}
- lp_len = (sizeof(LOG_PREFIX) - 4) + strlen(cs_name1) + strlen(cs_name2);
- pool->log_prefix = rad_malloc(lp_len);
- snprintf(pool->log_prefix, lp_len, LOG_PREFIX, cs_name1,
- cs_name2);
+ pool->log_prefix = talloc_typed_asprintf(pool, LOG_PREFIX, cs_name1,
+ cs_name2);
}
} else { /* not a module configuration */
cs_name1 = cf_section_name1(parent);
- pool->log_prefix = strdup(cs_name1);
+ pool->log_prefix = talloc_typed_strdup(pool, cs_name1);
}
} else {
- pool->log_prefix = strdup(prefix);
+ pool->log_prefix = talloc_typed_strdup(pool, prefix);
}
DEBUG("%s: Initialising connection pool", pool->log_prefix);
pool->idle_timeout = 0;
}
- if (pool->cleanup_interval < 0) {
- pool->cleanup_interval = 30;
- }
-
if ((pool->idle_timeout > 0) && (pool->cleanup_interval > pool->idle_timeout)) {
pool->cleanup_interval = pool->idle_timeout;
}
* not to.
*/
for (i = 0; i < pool->start; i++) {
- this = fr_connection_spawn(pool, now);
+ this = fr_connection_spawn(pool, now, false);
if (!this) {
error:
fr_connection_pool_delete(pool);
DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit max_uses limit", pool->log_prefix,
this->number);
do_delete:
- if ((pool->num <= pool->min) &&
- (pool->last_complained < now)) {
- WARN("%s: You probably need to lower \"min\"", pool->log_prefix);
-
- pool->last_complained = now;
+ if (pool->num <= pool->min) {
+ RATE_LIMIT(WARN("%s: You probably need to lower \"min\"", pool->log_prefix));
}
fr_connection_close(pool, this);
return 0;
*/
static int fr_connection_pool_check(fr_connection_pool_t *pool)
{
- int spawn, idle, extra;
+ uint32_t spawn, idle, extra;
time_t now = time(NULL);
fr_connection_t *this, *next;
if (spawn) {
pthread_mutex_unlock(&pool->mutex);
- fr_connection_spawn(pool, now); /* ignore return code */
+ fr_connection_spawn(pool, now, false); /* ignore return code */
pthread_mutex_lock(&pool->mutex);
}
pthread_mutex_unlock(&pool->mutex);
- if (complain) {
+ if (!RATE_LIMIT_ENABLED || complain) {
ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
}
if (!spawn) return NULL;
- this = fr_connection_spawn(pool, now);
+ this = fr_connection_spawn(pool, now, true); /* MY connection! */
if (!this) return NULL;
pthread_mutex_lock(&pool->mutex);
return fr_connection_get_internal(pool, true);
}
+/** Get the number of connections currently in the pool
+ *
+ * @param pool to count connections for.
+ * @return the number of connections in the pool
+ */
+int fr_connection_get_num(fr_connection_pool_t *pool)
+{
+ return pool->num;
+}
/** Release a connection
*
void fr_connection_release(fr_connection_pool_t *pool, void *conn)
{
fr_connection_t *this;
-#ifdef PTHREAD_DEBUG
- pthread_t pthread_id;
-#endif
this = fr_connection_find(pool, conn);
if (!this) return;
-#ifdef PTHREAD_DEBUG
- /*
- * The thread which grabbed the connection must be the
- * thread which releases it.
- */
- pthread_id = pthread_self();
- rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
-#endif
-
- rad_assert(this->in_use == true);
this->in_use = false;
/*
void *new_conn;
fr_connection_t *this;
uint64_t conn_number;
-#ifdef PTHREAD_DEBUG
- pthread_t pthread_id;
-#endif
+ TALLOC_CTX *ctx;
if (!pool || !conn) return NULL;
+ /*
+ * If fr_connection_find is successful the pool is now locked
+ */
this = fr_connection_find(pool, conn);
if (!this) return NULL;
-#ifdef PTHREAD_DEBUG
- /*
- * The thread which grabbed the connection must be the
- * thread which releases it.
- */
- pthread_id = pthread_self();
- rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
-#endif
- rad_assert(this->in_use == true);
+ conn_number = this->number;
/*
- * The pool is now locked.
+ * Destroy any handles associated with the fr_connection_t
*/
- conn_number = this->number;
+ talloc_free_children(this);
DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number);
- new_conn = pool->create(pool->ctx);
- if (!new_conn) {
- time_t now = time(NULL);
-
- if (pool->last_complained == now) {
- now = 0;
- } else {
- pool->last_complained = now;
- }
+ /*
+ * Allocate a new top level ctx for the create callback
+ * to hang its memory off of.
+ */
+ ctx = talloc_init("fr_connection_ctx");
+ if (!ctx) return NULL;
+ fr_link_talloc_ctx_free(this, ctx);
+ new_conn = pool->create(ctx, pool->opaque);
+ if (!new_conn) {
/*
* We can't create a new connection, so close
* this one.
new_conn = fr_connection_get_internal(pool, false);
if (new_conn) return new_conn;
- if (!now) return NULL;
-
- ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available", pool->log_prefix,
- conn_number);
+ RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available",
+ pool->log_prefix, conn_number));
return NULL;
}
if (pool->trigger) exec_trigger(NULL, pool->cs, "close", true);
- pool->delete(pool->ctx, conn);
+ pool->delete(pool->opaque, conn);
this->connection = new_conn;
pthread_mutex_unlock(&pool->mutex);
return new_conn;