Pass a threadsafe ctx into fr_connection_pool create callback
[freeradius.git] / src / modules / rlm_redis / rlm_redis.c
index 2726488..f07d494 100644 (file)
@@ -1,12 +1,7 @@
 /*
- * rlm_redis.c
- *
- * Version:    $Id$
- *
- *   This program is free software; you can redistribute it and/or modify
- *   it under the terms of the GNU General Public License as published by
- *   the Free Software Foundation; either version 2 of the License, or
- *   (at your option) any later version.
+ *   This program is is free software; you can redistribute it and/or modify
+ *   it under the terms of the GNU General Public License, version 2 if the
+ *   License as published by the Free Software Foundation.
  *
  *   This program is distributed in the hope that it will be useful,
  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
  *   You should have received a copy of the GNU General Public License
  *   along with this program; if not, write to the Free Software
  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
- *
- * Copyright 2000,2006  The FreeRADIUS server project
- * Copyright 2011  TekSavvy Solutions <gabe@teksavvy.com>
  */
 
-#include <freeradius-devel/ident.h>
+/**
+ * $Id$
+ * @file rlm_redis.c
+ * @brief Driver for the REDIS noSQL key value stores.
+ *
+ * @copyright 2000,2006  The FreeRADIUS server project
+ * @copyright 2011  TekSavvy Solutions <gabe@teksavvy.com>
+ */
 
 RCSID("$Id$")
 
@@ -31,147 +30,152 @@ RCSID("$Id$")
 #include "rlm_redis.h"
 
 static const CONF_PARSER module_config[] = {
-       { "num_connections", PW_TYPE_INTEGER,
-         offsetof(REDIS_INST, numconnections), NULL, "20"},
-       { "hostname", PW_TYPE_STRING_PTR,
-         offsetof(REDIS_INST, hostname), NULL, "127.0.0.1"},
-       { "port", PW_TYPE_INTEGER,
-         offsetof(REDIS_INST, port), NULL, "6379"},
-       {"connect_failure_retry_delay", PW_TYPE_INTEGER,
-        offsetof(REDIS_INST, connect_failure_retry_delay), NULL, "60"},
-       {"lifetime", PW_TYPE_INTEGER,
-        offsetof(REDIS_INST, lifetime), NULL, "0"},
-       {"max_queries", PW_TYPE_INTEGER,
-        offsetof(REDIS_INST, max_queries), NULL, "0"},
+       { "hostname", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_DEPRECATED, REDIS_INST, hostname), NULL },
+       { "server", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_REQUIRED, REDIS_INST, hostname), NULL },
+       { "port", FR_CONF_OFFSET(PW_TYPE_SHORT, REDIS_INST, port), "6379" },
+       { "database", FR_CONF_OFFSET(PW_TYPE_INTEGER, REDIS_INST, database), "0" },
+       { "password", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_SECRET, REDIS_INST, password), NULL },
 
        { NULL, -1, 0, NULL, NULL} /* end the list */
 };
 
-static int redis_close_socket(REDIS_INST *inst, REDISSOCK *dissocket)
+static int _mod_conn_free(REDISSOCK *dissocket)
 {
-       radlog(L_INFO, "rlm_redis (%s): Closing socket %d",
-              inst->xlat_name, dissocket->id);
+       redisFree(dissocket->conn);
 
-       if (dissocket->state == sockconnected) {
-               redisFree(dissocket->conn);
+       if (dissocket->reply) {
+               freeReplyObject(dissocket->reply);
+               dissocket->reply = NULL;
        }
 
-#ifdef HAVE_PTHREAD_H
-       pthread_mutex_destroy(&dissocket->mutex);
-#endif
-
-       free(dissocket);
-       return 1;
+       return 0;
 }
 
-static int connect_single_socket(REDISSOCK *dissocket, REDIS_INST *inst)
+static void *mod_conn_create(TALLOC_CTX *ctx, void *instance)
 {
-       radlog(L_INFO, "rlm_redis (%s): Attempting to connect #%d",
-              inst->xlat_name, dissocket->id);
+       REDIS_INST *inst = instance;
+       REDISSOCK *dissocket = NULL;
+       redisContext *conn;
+       redisReply *reply = NULL;
+       char buffer[1024];
+
+       conn = redisConnect(inst->hostname, inst->port);
+       if (conn->err) return NULL;
 
-       dissocket->conn = redisConnect((char *) inst->hostname, inst->port);
+       if (inst->password) {
+               snprintf(buffer, sizeof(buffer), "AUTH %s", inst->password);
 
-       if (!dissocket->conn->err) {
-               radlog(L_INFO, "rlm_redis (%s): Connected new DB handle, #%d",
-                      inst->xlat_name, dissocket->id);
+               reply = redisCommand(conn, buffer);
+               if (!reply) {
+                       ERROR("rlm_redis (%s): Failed to run AUTH", inst->xlat_name);
+
+               do_close:
+                       if (reply) freeReplyObject(reply);
+                       redisFree(conn);
+                       return NULL;
+               }
 
-               dissocket->state = sockconnected;
 
-               dissocket->queries = 0;
-               return (0);
+               switch (reply->type) {
+               case REDIS_REPLY_STATUS:
+                       if (strcmp(reply->str, "OK") != 0) {
+                               ERROR("rlm_redis (%s): Failed authentication: reply %s",
+                                      inst->xlat_name, reply->str);
+                               goto do_close;
+                       }
+                       break;  /* else it's OK */
+
+               default:
+                       ERROR("rlm_redis (%s): Unexpected reply to AUTH",
+                              inst->xlat_name);
+                       goto do_close;
+               }
        }
 
-       /*
-        *  Error, or redis is DOWN.
-        */
-       radlog(L_CONS | L_ERR, "rlm_redis (%s): Failed to connect DB handle #%d",
-              inst->xlat_name, dissocket->id);
-       inst->connect_after = time(NULL) + inst->connect_failure_retry_delay;
-       dissocket->state = sockunconnected;
-       return (-1);
-}
+       if (inst->database) {
+               snprintf(buffer, sizeof(buffer), "SELECT %d", inst->database);
 
-static void redis_poolfree(REDIS_INST * inst)
-{
-       REDISSOCK *cur;
-       REDISSOCK *next;
+               reply = redisCommand(conn, buffer);
+               if (!reply) {
+                       ERROR("rlm_redis (%s): Failed to run SELECT",
+                              inst->xlat_name);
+                       goto do_close;
+               }
+
+
+               switch (reply->type) {
+               case REDIS_REPLY_STATUS:
+                       if (strcmp(reply->str, "OK") != 0) {
+                               ERROR("rlm_redis (%s): Failed SELECT %d: reply %s",
+                                      inst->xlat_name, inst->database,
+                                      reply->str);
+                               goto do_close;
+                       }
+                       break;  /* else it's OK */
 
-       for (cur = inst->redispool; cur; cur = next) {
-               next = cur->next;
-               redis_close_socket(inst, cur);
+               default:
+                       ERROR("rlm_redis (%s): Unexpected reply to SELECT",
+                              inst->xlat_name);
+                       goto do_close;
+               }
        }
 
-       inst->redispool = NULL;
+       dissocket = talloc_zero(ctx, REDISSOCK);
+       dissocket->conn = conn;
+       talloc_set_destructor(dissocket, _mod_conn_free);
+
+       return dissocket;
 }
 
-static int redis_xlat(void *instance, REQUEST *request,
-                     char *fmt, char *out, size_t freespace,
-                     RADIUS_ESCAPE_STRING func)
+static ssize_t redis_xlat(void *instance, REQUEST *request, char const *fmt, char *out, size_t freespace)
 {
        REDIS_INST *inst = instance;
        REDISSOCK *dissocket;
        size_t ret = 0;
        char *buffer_ptr;
        char buffer[21];
-       char querystr[MAX_QUERY_LEN];
 
-       if (!radius_xlat(querystr, sizeof(querystr), fmt, request, func)) {
-               radlog(L_ERR, "rlm_redis (%s): xlat failed.",
-                      inst->xlat_name);
-
-               return 0;
-       }
-
-       if ((dissocket = redis_get_socket(inst)) == NULL) {
-               radlog(L_ERR, "rlm_redis (%s): redis_get_socket() failed",
-                      inst->xlat_name);
-        
-               return 0;
-       }
+       dissocket = fr_connection_get(inst->pool);
+       if (!dissocket) return -1;
 
        /* Query failed for some reason, release socket and return */
-       if (rlm_redis_query(dissocket, inst, querystr) < 0) {
-               rlm_redis_finish_query(dissocket);
-               redis_release_socket(inst,dissocket);
-        
-               return 0;
+       if (rlm_redis_query(&dissocket, inst, fmt, request) < 0) {
+               goto release;
        }
 
-        switch (dissocket->reply->type) {
+       switch (dissocket->reply->type) {
        case REDIS_REPLY_INTEGER:
-                buffer_ptr = buffer;
-                snprintf(buffer_ptr, sizeof(buffer), "%lld",
+               buffer_ptr = buffer;
+               snprintf(buffer_ptr, sizeof(buffer), "%lld",
                         dissocket->reply->integer);
 
-                ret = strlen(buffer_ptr);
-                break;
+               ret = strlen(buffer_ptr);
+               break;
 
        case REDIS_REPLY_STATUS:
        case REDIS_REPLY_STRING:
-                buffer_ptr = dissocket->reply->str;
-                ret = dissocket->reply->len;
-                break;
+               buffer_ptr = dissocket->reply->str;
+               ret = dissocket->reply->len;
+               break;
 
        default:
-                buffer_ptr = NULL;
-                break;
-        }
+               buffer_ptr = NULL;
+               break;
+       }
 
-       if ((ret >= freespace) || (buffer_ptr == NULL)) {
+       if ((ret >= freespace) || (!buffer_ptr)) {
                RDEBUG("rlm_redis (%s): Can't write result, insufficient space or unsupported result\n",
                       inst->xlat_name);
-               
-               rlm_redis_finish_query(dissocket);
-               redis_release_socket(inst,dissocket);
-               
-               return 0;
+               ret = -1;
+               goto release;
        }
-       
-       strlcpy(out,buffer_ptr,freespace);
-       
+
+       strlcpy(out, buffer_ptr, freespace);
+
+release:
        rlm_redis_finish_query(dissocket);
-       redis_release_socket(inst,dissocket);
-       
+       fr_connection_release(inst->pool, dissocket);
+
        return ret;
 }
 
@@ -179,152 +183,61 @@ static int redis_xlat(void *instance, REQUEST *request,
  *     Only free memory we allocated.  The strings allocated via
  *     cf_section_parse() do not need to be freed.
  */
-static int redis_detach(void *instance)
+static int mod_detach(void *instance)
 {
        REDIS_INST *inst = instance;
 
-       redis_poolfree(inst);
-
-       if (inst->xlat_name) {
-               xlat_unregister(inst->xlat_name, (RAD_XLAT_FUNC)redis_xlat);
-               free(inst->xlat_name);
-       }
-       free(inst->xlat_name);
-       free(inst);
+       fr_connection_pool_delete(inst->pool);
 
        return 0;
 }
 
-static int redis_init_socketpool(REDIS_INST *inst)
-{
-       int i, rcode;
-       int success = 0;
-       REDISSOCK *dissocket;
-
-       inst->connect_after = 0;
-       inst->redispool = NULL;
-
-       for (i = 0; i < inst->numconnections; i++) {
-               radlog(L_DBG, "rlm_redis (%s): starting %d",
-                      inst->xlat_name, i);
-
-               dissocket = rad_malloc(sizeof (*dissocket));
-               if (dissocket == NULL) {
-                       return -1;
-               }
-               memset(dissocket, 0, sizeof (*dissocket));
-               dissocket->conn = NULL;
-               dissocket->id = i;
-               dissocket->state = sockunconnected;
-
-#ifdef HAVE_PTHREAD_H
-               rcode = pthread_mutex_init(&dissocket->mutex, NULL);
-               if (rcode != 0) {
-                       free(dissocket);
-                       radlog(L_ERR, "rlm_redis: Failed to init lock: %s",
-                              strerror(errno));
-                       return 0;
-               }
-#endif
-
-               if (time(NULL) > inst->connect_after) {
-                       /*
-                        *      This sets the dissocket->state, and
-                        *      possibly also inst->connect_after
-                        */
-                       if (connect_single_socket(dissocket, inst) == 0) {
-                               success = 1;
-                       }
-               }
-
-               /* Add "dis" socket to the list of sockets
-                * pun intended
-                */
-               dissocket->next = inst->redispool;
-               inst->redispool = dissocket;
-       }
-       inst->last_used = NULL;
-
-       if (!success) {
-               radlog(L_DBG, "rlm_redis (%s): Failed to connect to any redis server.",
-                      inst->xlat_name);
-       }
-
-       return 1;
-}
-
 /*
- *     TODO, Actually escape something...
+ *     Query the redis database
  */
-static size_t redis_escape_func(char *out, size_t outlen, const char *in)
+int rlm_redis_query(REDISSOCK **dissocket_p, REDIS_INST *inst,
+                   char const *query, REQUEST *request)
 {
-       size_t len = 0;
-
-       while (in[0]) {
-
-               if (outlen <= 1) {
-                       break;
-               }
-
-               /*
-                *      Allowed character.
-                */
-               *out = *in;
-               out++;
-               in++;
-               outlen--;
-               len++;
-       }
-       *out = '\0';
-       return len;
-}
+       REDISSOCK *dissocket;
+       int argc;
+       char *argv[MAX_REDIS_ARGS];
+       char argv_buf[MAX_QUERY_LEN];
 
-/*
- *     Free the redis database
- */
-int rlm_redis_query(REDISSOCK *dissocket, REDIS_INST *inst, char *query)
-{
-       if (!query || !*query) {
+       if (!query || !*query || !inst || !dissocket_p) {
                return -1;
        }
 
-       DEBUG2("executing query %s", query);
-       dissocket->reply = redisCommand(dissocket->conn, query);
+       argc = rad_expand_xlat(request, query, MAX_REDIS_ARGS, argv, false,
+                               sizeof(argv_buf), argv_buf);
+       if (argc <= 0)
+               return -1;
 
-       if (dissocket->reply == NULL) {
-               radlog(L_ERR, "rlm_redis: (%s) REDIS error: %s",
-                      inst->xlat_name, dissocket->conn->errstr);
+       dissocket = *dissocket_p;
 
-               /* close the socket that failed */
-               if (dissocket->state == sockconnected) {
-                       redis_close_socket(inst, dissocket);
-               }
+       DEBUG2("executing %s ...", argv[0]);
+       dissocket->reply = redisCommandArgv(dissocket->conn, argc, (char const **)(void **)argv, NULL);
+       if (!dissocket->reply) {
+               RERROR("%s", dissocket->conn->errstr);
 
-               /* reconnect the socket */
-               if (connect_single_socket(dissocket, inst) < 0) {
-                       radlog(L_ERR, "rlm_redis (%s): reconnect failed, database down?",
-                              inst->xlat_name);
+               dissocket = fr_connection_reconnect(inst->pool, dissocket);
+               if (!dissocket) {
+               error:
+                       *dissocket_p = NULL;
                        return -1;
                }
 
-               DEBUG2("executing query %s", query);
-               /* retry the query on the newly connected socket */
                dissocket->reply = redisCommand(dissocket->conn, query);
-
-               if (dissocket->reply == NULL) {
-                       radlog(L_ERR, "rlm_redis (%s): failed after re-connect",
-                              inst->xlat_name);
-                       return -1;
+               if (!dissocket->reply) {
+                       RERROR("Failed after re-connect");
+                       fr_connection_del(inst->pool, dissocket);
+                       goto error;
                }
+
+               *dissocket_p = dissocket;
        }
 
        if (dissocket->reply->type == REDIS_REPLY_ERROR) {
-               radlog(L_ERR, "rlm_redis (%s): query failed, %s",
-                      inst->xlat_name, query);
-
-               /* Free the reply just in case */
-               rlm_redis_finish_query(dissocket);
-
+               RERROR("Query failed, %s", query);
                return -1;
        }
 
@@ -332,235 +245,44 @@ int rlm_redis_query(REDISSOCK *dissocket, REDIS_INST *inst, char *query)
 }
 
 /*
- * Clear the redis reply object if any
+ *     Clear the redis reply object if any
  */
 int rlm_redis_finish_query(REDISSOCK *dissocket)
 {
-       if (dissocket == NULL) {
-               return -1;
-       }
-
-       if (dissocket->reply != NULL) {
-               freeReplyObject(dissocket->reply);
-       } else {
+       if (!dissocket || !dissocket->reply) {
                return -1;
        }
 
+       freeReplyObject(dissocket->reply);
+       dissocket->reply = NULL;
        return 0;
 }
 
-static time_t last_logged_failure = 0;
-
-/*************************************************************************
- *
- *     Function: redis_get_socket
- *
- *     Purpose: Return a REDIS socket from the connection pool
- *
- *************************************************************************/
-REDISSOCK *redis_get_socket(REDIS_INST *inst)
+static int mod_instantiate(CONF_SECTION *conf, void *instance)
 {
-       REDISSOCK *cur, *start;
-       int tried_to_connect = 0;
-       int unconnected = 0;
-       time_t now = time(NULL);
-
-       /*
-        *      Start at the last place we left off.
-        */
-       start = inst->last_used;
-       if (!start) start = inst->redispool;
-
-       cur = start;
-
-       while (cur) {
-#ifdef HAVE_PTHREAD_H
-               /*
-                *      If this socket is in use by another thread,
-                *      skip it, and try another socket.
-                *
-                *      If it isn't used, then grab it ourselves.
-                */
-               if (pthread_mutex_trylock(&cur->mutex) != 0) {
-                       goto next;
-               } /* else we now have the lock */
-#endif
-
-               /*
-                *      If the socket has outlived its lifetime, and
-                *      is connected, close it, and mark it as open for
-                *      reconnections.
-                */
-               if (inst->lifetime && (cur->state == sockconnected) &&
-                   ((cur->connected + inst->lifetime) < now)) {
-                       DEBUG2("Closing socket %d as its lifetime has been exceeded", cur->id);
-                       redis_close_socket(inst, cur);
-                       cur->state = sockunconnected;
-                       goto reconnect;
-               }
-
-               /*
-                *      If we have performed too many queries over this
-                *      socket, then close it.
-                */
-               if (inst->max_queries && (cur->state == sockconnected) &&
-                   (cur->queries >= inst->max_queries)) {
-                       DEBUG2("Closing socket %d as its max_queries has been exceeded", cur->id);
-                       redis_close_socket(inst, cur);
-                       cur->state = sockunconnected;
-                       goto reconnect;
-               }
+       static bool version_done;
 
-               /*
-                *      If we happen upon an unconnected socket, and
-                *      this instance's grace period on
-                *      (re)connecting has expired, then try to
-                *      connect it.  This should be really rare.
-                */
-               if ((cur->state == sockunconnected) && (now > inst->connect_after)) {
-               reconnect:
-                       radlog(L_INFO, "rlm_redis (%s): Trying to (re)connect unconnected handle %d..", inst->xlat_name, cur->id);
-                       tried_to_connect++;
-                       connect_single_socket(cur, inst);
-               }
-
-               /* if we still aren't connected, ignore this handle */
-               if (cur->state == sockunconnected) {
-                       DEBUG("rlm_redis (%s): Ignoring unconnected handle %d..", inst->xlat_name, cur->id);
-                       unconnected++;
-#ifdef HAVE_PTHREAD_H
-                       pthread_mutex_unlock(&cur->mutex);
-#endif
-                       goto next;
-               }
-
-               /* should be connected, grab it */
-               DEBUG("rlm_redis (%s): Reserving redis socket id: %d",
-                     inst->xlat_name, cur->id);
-
-               if (unconnected != 0 || tried_to_connect != 0) {
-                       DEBUG("rlm_redis (%s): got socket %d after skipping %d unconnected handles, tried to reconnect %d though",
-                             inst->xlat_name, cur->id, unconnected, tried_to_connect);
-               }
-
-               /*
-                *      The socket is returned in the locked
-                *      state.
-                *
-                *      We also remember where we left off,
-                *      so that the next search can start from
-                *      here.
-                *
-                *      Note that multiple threads MAY over-write
-                *      the 'inst->last_used' variable.  This is OK,
-                *      as it's a pointer only used for reading.
-                */
-               inst->last_used = cur->next;
-               cur->queries++;
-               return cur;
-
-               /* move along the list */
-       next:
-               cur = cur->next;
-
-               /*
-                *      Because we didnt start at the start, once we
-                *      hit the end of the linklist, we should go
-                *      back to the beginning and work toward the
-                *      middle!
-                */
-               if (!cur) {
-                       cur = inst->redispool;
-               }
-
-               /*
-                *      If we're at the socket we started
-                */
-               if (cur == start) {
-                       break;
-               }
-       }
-
-       /*
-        *      Suppress most of the log messages.  We don't want to
-        *      flood the log with this message for EVERY packet.
-        *      Instead, write to the log only once a second or so.
-        *
-        *      This code has race conditions when threaded, but the
-        *      only result is that a few more messages are logged.
-        */
-       if (now <= last_logged_failure) return NULL;
-       last_logged_failure = now;
-
-       /* We get here if every DB handle is unconnected and unconnectABLE */
-       radlog(L_INFO, "rlm_redis (%s): There are no DB handles to use! skipped %d, tried to connect %d",
-              inst->xlat_name, unconnected, tried_to_connect);
-       return NULL;
-}
-
-/*************************************************************************
- *
- *     Function: redis_release_socket
- *
- *     Purpose: Frees a REDIS socket back to the connection pool
- *
- *************************************************************************/
-int redis_release_socket(REDIS_INST *inst, REDISSOCK *dissocket)
-{
-
-#ifdef HAVE_PTHREAD_H
-       pthread_mutex_unlock(&dissocket->mutex);
-#endif
-
-       radlog(L_DBG, "rlm_redis (%s): Released redis socket id: %d",
-              inst->xlat_name, dissocket->id);
+       REDIS_INST *inst = instance;
 
-       return 0;
-}
+       if (!version_done) {
+               version_done = true;
 
-static int redis_instantiate(CONF_SECTION *conf, void **instance)
-{
-       REDIS_INST *inst;
-       const char *xlat_name;
-
-       /*
-        *      Set up a storage area for instance data
-        */
-       inst = rad_malloc(sizeof (REDIS_INST));
-       if (!inst) {
-               return -1;
-       }
-       memset(inst, 0, sizeof (*inst));
-
-       /*
-        *      If the configuration parameters can't be parsed, then
-        *      fail.
-        */
-       if (cf_section_parse(conf, inst, module_config) < 0) {
-               free(inst);
-               return -1;
+               INFO("rlm_redis: libhiredis version: %i.%i.%i", HIREDIS_MAJOR, HIREDIS_MINOR, HIREDIS_PATCH);
        }
 
-       xlat_name = cf_section_name2(conf);
+       inst->xlat_name = cf_section_name2(conf);
 
-       if (!xlat_name)
-               xlat_name = cf_section_name1(conf);
+       if (!inst->xlat_name) inst->xlat_name = cf_section_name1(conf);
 
-       inst->xlat_name = strdup(xlat_name);
-       xlat_register(inst->xlat_name, (RAD_XLAT_FUNC)redis_xlat, inst);
+       xlat_register(inst->xlat_name, redis_xlat, NULL, inst); /* FIXME! */
 
-       if (redis_init_socketpool(inst) < 0) {
-               redis_detach(inst);
+       inst->pool = fr_connection_pool_init(conf, inst, mod_conn_create, NULL, NULL, NULL);
+       if (!inst->pool) {
                return -1;
        }
 
        inst->redis_query = rlm_redis_query;
        inst->redis_finish_query = rlm_redis_finish_query;
-       inst->redis_get_socket = redis_get_socket;
-       inst->redis_release_socket = redis_release_socket;
-       inst->redis_escape_func = redis_escape_func;
-
-       *instance = inst;
 
        return 0;
 }
@@ -569,8 +291,10 @@ module_t rlm_redis = {
        RLM_MODULE_INIT,
        "redis",
        RLM_TYPE_THREAD_SAFE, /* type */
-       redis_instantiate, /* instantiation */
-       redis_detach, /* detach */
+       sizeof(REDIS_INST),     /* yuck */
+       module_config,
+       mod_instantiate, /* instantiation */
+       mod_detach, /* detach */
        {
                NULL, /* authentication */
                NULL, /* authorization */