Add retry feature for insert and update operations.
[shibboleth/sp.git] / odbc-store / odbc-store.cpp
index 2fd13c4..838cc4b 100644 (file)
@@ -92,7 +92,9 @@ CREATE TABLE texts (
 
 namespace {
     static const XMLCh cleanupInterval[] =  UNICODE_LITERAL_15(c,l,e,a,n,u,p,I,n,t,e,r,v,a,l);
+    static const XMLCh isolationLevel[] =   UNICODE_LITERAL_14(i,s,o,l,a,t,i,o,n,L,e,v,e,l);
     static const XMLCh ConnectionString[] = UNICODE_LITERAL_16(C,o,n,n,e,c,t,i,o,n,S,t,r,i,n,g);
+    static const XMLCh RetryOnError[] =     UNICODE_LITERAL_12(R,e,t,r,y,O,n,E,r,r,o,r);
 
     // RAII for ODBC handles
     struct ODBCConn {
@@ -172,7 +174,7 @@ namespace {
         SQLHDBC getHDBC();
         SQLHSTMT getHSTMT(SQLHDBC);
         pair<int,int> getVersion(SQLHDBC);
-        bool log_error(SQLHANDLE handle, SQLSMALLINT htype, const char* checkfor=NULL);
+        pair<bool,bool> log_error(SQLHANDLE handle, SQLSMALLINT htype, const char* checkfor=NULL);
 
         static void* cleanup_fn(void*); 
         void cleanup();
@@ -185,6 +187,8 @@ namespace {
 
         SQLHENV m_henv;
         string m_connstring;
+        long m_isolation;
+        vector<SQLINTEGER> m_retries;
     };
 
     StorageService* ODBCStorageServiceFactory(const DOMElement* const & e)
@@ -253,7 +257,7 @@ namespace {
 };
 
 ODBCStorageService::ODBCStorageService(const DOMElement* e) : m_log(Category::getInstance("XMLTooling.StorageService")),
-   m_cleanupInterval(900), shutdown_wait(NULL), cleanup_thread(NULL), shutdown(false), m_henv(SQL_NULL_HANDLE)
+   m_cleanupInterval(900), shutdown_wait(NULL), cleanup_thread(NULL), shutdown(false), m_henv(SQL_NULL_HANDLE), m_isolation(SQL_TXN_SERIALIZABLE)
 {
 #ifdef _DEBUG
     xmltooling::NDC ndc("ODBCStorageService");
@@ -265,6 +269,20 @@ ODBCStorageService::ODBCStorageService(const DOMElement* e) : m_log(Category::ge
     if (!m_cleanupInterval)
         m_cleanupInterval = 900;
 
+    auto_ptr_char iso(e ? e->getAttributeNS(NULL,isolationLevel) : NULL);
+    if (iso.get() && *iso.get()) {
+        if (!strcmp(iso.get(),"SERIALIZABLE"))
+            m_isolation = SQL_TXN_SERIALIZABLE;
+        else if (!strcmp(iso.get(),"REPEATABLE_READ"))
+            m_isolation = SQL_TXN_REPEATABLE_READ;
+        else if (!strcmp(iso.get(),"READ_COMMITTED"))
+            m_isolation = SQL_TXN_READ_COMMITTED;
+        else if (!strcmp(iso.get(),"READ_UNCOMMITTED"))
+            m_isolation = SQL_TXN_READ_UNCOMMITTED;
+        else
+            throw XMLToolingException("Unknown transaction isolationLevel property.");
+    }
+
     if (m_henv == SQL_NULL_HANDLE) {
         // Enable connection pooling.
         SQLSetEnvAttr(SQL_NULL_HANDLE, SQL_ATTR_CONNECTION_POOLING, (void*)SQL_CP_ONE_PER_HENV, 0);
@@ -299,6 +317,16 @@ ODBCStorageService::ODBCStorageService(const DOMElement* e) : m_log(Category::ge
         throw XMLToolingException("Unknown database version for ODBC StorageService.");
     }
 
+    // Load any retry errors to check.
+    e = XMLHelper::getNextSiblingElement(e,RetryOnError);
+    while (e) {
+        if (e->hasChildNodes()) {
+            m_retries.push_back(XMLString::parseInt(e->getFirstChild()->getNodeValue()));
+            m_log.info("will retry operations when native ODBC error (%ld) is returned", m_retries.back());
+        }
+        e = XMLHelper::getNextSiblingElement(e,RetryOnError);
+    }
+
     // Initialize the cleanup thread
     shutdown_wait = CondWait::create();
     cleanup_thread = Thread::create(&cleanup_fn, (void*)this);
@@ -314,7 +342,7 @@ ODBCStorageService::~ODBCStorageService()
         SQLFreeHandle(SQL_HANDLE_ENV, m_henv);
 }
 
-bool ODBCStorageService::log_error(SQLHANDLE handle, SQLSMALLINT htype, const char* checkfor)
+pair<bool,bool> ODBCStorageService::log_error(SQLHANDLE handle, SQLSMALLINT htype, const char* checkfor)
 {
     SQLSMALLINT         i = 0;
     SQLINTEGER  native;
@@ -323,13 +351,15 @@ bool ODBCStorageService::log_error(SQLHANDLE handle, SQLSMALLINT htype, const ch
     SQLSMALLINT         len;
     SQLRETURN   ret;
 
-    bool res = false;
+    pair<bool,bool> res = make_pair(false,false);
     do {
         ret = SQLGetDiagRec(htype, handle, ++i, state, &native, text, sizeof(text), &len);
         if (SQL_SUCCEEDED(ret)) {
             m_log.error("ODBC Error: %s:%ld:%ld:%s", state, i, native, text);
+            for (vector<SQLINTEGER>::const_iterator n = m_retries.begin(); !res.first && n != m_retries.end(); ++n)
+                res.first = (*n == native);
             if (checkfor && !strcmp(checkfor, (const char*)state))
-                res = true;
+                res.second = true;
         }
     } while(SQL_SUCCEEDED(ret));
     return res;
@@ -357,9 +387,9 @@ SQLHDBC ODBCStorageService::getHDBC()
         throw IOException("ODBC StorageService failed to connect to database.");
     }
 
-    sr = SQLSetConnectAttr(handle, SQL_ATTR_TXN_ISOLATION, (SQLPOINTER)SQL_TXN_SERIALIZABLE, NULL);
+    sr = SQLSetConnectAttr(handle, SQL_ATTR_TXN_ISOLATION, (SQLPOINTER)m_isolation, NULL);
     if (!SQL_SUCCEEDED(sr))
-        throw IOException("ODBC StorageService failed to enable transaction isolation.");
+        throw IOException("ODBC StorageService failed to set transaction isolation level.");
 
     return handle;
 }
@@ -460,16 +490,23 @@ bool ODBCStorageService::createRow(const char* table, const char* context, const
     //freeSafeSQL(svalue, value);
     //m_log.debug("SQL: %s", q.c_str());
 
-    sr=SQLExecute(stmt);
-    if (!SQL_SUCCEEDED(sr)) {
+    int attempts = 3;
+    pair<bool,bool> logres;
+    do {
+        logres = make_pair(false,false);
+        attempts--;
+        sr=SQLExecute(stmt);
+        if (SQL_SUCCEEDED(sr)) {
+            m_log.debug("SQLExecute of insert succeeded");
+            return true;
+        }
         m_log.error("insert record failed (t=%s, c=%s, k=%s)", table, context, key);
-        if (log_error(stmt, SQL_HANDLE_STMT, "23000"))
+        logres = log_error(stmt, SQL_HANDLE_STMT, "23000");
+        if (logres.second)
             return false;   // supposedly integrity violation?
-        throw IOException("ODBC StorageService failed to insert record.");
-    }
+    } while (attempts && logres.first);
 
-    m_log.debug("SQLExecute of insert succeeded");
-    return true;
+    throw IOException("ODBC StorageService failed to insert record.");
 }
 
 int ODBCStorageService::readRow(
@@ -563,7 +600,7 @@ int ODBCStorageService::updateRow(const char *table, const char* context, const
     char *scontext = makeSafeSQL(context);
     char *skey = makeSafeSQL(key);
     string q("SELECT version FROM ");
-    q = q + table + " WHERE context='" + scontext + "' AND id='" + key + "' AND expires > " + timebuf;
+    q = q + table + " WHERE context='" + scontext + "' AND id='" + skey + "' AND expires > " + timebuf;
 
     m_log.debug("SQL: %s", q.c_str());
 
@@ -607,7 +644,7 @@ int ODBCStorageService::updateRow(const char *table, const char* context, const
         q = q + "expires = " + timebuf;
     }
 
-    q = q + " WHERE context='" + scontext + "' AND id='" + key + "'";
+    q = q + " WHERE context='" + scontext + "' AND id='" + skey + "'";
     freeSafeSQL(scontext, context);
     freeSafeSQL(skey, key);
 
@@ -633,17 +670,24 @@ int ODBCStorageService::updateRow(const char *table, const char* context, const
         m_log.debug("SQLBindParam succeded (context = %s)", context);
     }
 
-    sr=SQLExecute(stmt);
-    if (sr==SQL_NO_DATA)
-        return 0;   // went missing?
-    else if (!SQL_SUCCEEDED(sr)) {
+    int attempts = 3;
+    pair<bool,bool> logres;
+    do {
+        logres = make_pair(false,false);
+        attempts--;
+        sr=SQLExecute(stmt);
+        if (sr==SQL_NO_DATA)
+            return 0;   // went missing?
+        else if (SQL_SUCCEEDED(sr)) {
+            m_log.debug("SQLExecute of update succeeded");
+            return ver + 1;
+        }
+
         m_log.error("update of record failed (t=%s, c=%s, k=%s", table, context, key);
-        log_error(stmt, SQL_HANDLE_STMT);
-        throw IOException("ODBC StorageService failed to update record.");
-    }
+        logres = log_error(stmt, SQL_HANDLE_STMT);
+    } while (attempts && logres.first);
 
-    m_log.debug("SQLExecute of update succeeded");
-    return ver + 1;
+    throw IOException("ODBC StorageService failed to update record.");
 }
 
 bool ODBCStorageService::deleteRow(const char *table, const char *context, const char* key)