using namespace std;
#define PLUGIN_VER_MAJOR 1
-#define PLUGIN_VER_MINOR 0
+#define PLUGIN_VER_MINOR 1
#define LONGDATA_BUFLEN 16384
context varchar(255) not null,
id varchar(255) not null,
expires datetime not null,
- version smallint not null,
+ version int not null,
value varchar(255) not null,
PRIMARY KEY (context, id)
)
context varchar(255) not null,
id varchar(255) not null,
expires datetime not null,
- version smallint not null,
+ version int not null,
value text not null,
PRIMARY KEY (context, id)
)
SQLHENV m_henv;
string m_connstring;
long m_isolation;
+ bool m_wideVersion;
vector<SQLINTEGER> m_retries;
};
ODBCStorageService::ODBCStorageService(const DOMElement* e) : m_log(Category::getInstance("XMLTooling.StorageService")),
m_caps(XMLHelper::getAttrInt(e, 255, contextSize), XMLHelper::getAttrInt(e, 255, keySize), XMLHelper::getAttrInt(e, 255, stringSize)),
m_cleanupInterval(XMLHelper::getAttrInt(e, 900, cleanupInterval)),
- cleanup_thread(nullptr), shutdown(false), m_henv(SQL_NULL_HENV), m_isolation(SQL_TXN_SERIALIZABLE)
+ cleanup_thread(nullptr), shutdown(false), m_henv(SQL_NULL_HENV), m_isolation(SQL_TXN_SERIALIZABLE), m_wideVersion(false)
{
#ifdef _DEBUG
xmltooling::NDC ndc("ODBCStorageService");
m_log.crit("unknown database version: %d.%d", v.first, v.second);
throw XMLToolingException("Unknown database version for ODBC StorageService.");
}
+
+ if (v.first > 1 || v.second > 0) {
+ m_log.info("using 32-bit int type for version fields in tables");
+ m_wideVersion = true;
+ }
// Load any retry errors to check.
e = XMLHelper::getNextSiblingElement(e, RetryOnError);
while (e) {
if (e->hasChildNodes()) {
- m_retries.push_back(XMLString::parseInt(e->getTextContent()));
- m_log.info("will retry operations when native ODBC error (%ld) is returned", m_retries.back());
+ try {
+ int code = XMLString::parseInt(e->getTextContent());
+ m_retries.push_back(code);
+ m_log.info("will retry operations when native ODBC error (%d) is returned", code);
+ }
+ catch (XMLException&) {
+ m_log.error("skipping non-numeric ODBC retry code");
+ }
}
e = XMLHelper::getNextSiblingElement(e, RetryOnError);
}
- // Initialize the cleanup thread
- shutdown_wait.reset(CondWait::create());
- cleanup_thread = Thread::create(&cleanup_fn, (void*)this);
+ if (m_cleanupInterval > 0) {
+ // Initialize the cleanup thread
+ shutdown_wait.reset(CondWait::create());
+ cleanup_thread = Thread::create(&cleanup_fn, (void*)this);
+ }
+ else {
+ m_log.info("no cleanup interval configured, no cleanup thread will be started");
+ }
}
ODBCStorageService::~ODBCStorageService()
{
shutdown = true;
- shutdown_wait->signal();
- cleanup_thread->join(nullptr);
- if (m_henv != SQL_NULL_HANDLE)
+ if (shutdown_wait.get()) {
+ shutdown_wait->signal();
+ }
+ if (cleanup_thread) {
+ cleanup_thread->join(nullptr);
+ }
+ if (m_henv != SQL_NULL_HANDLE) {
SQLFreeHandle(SQL_HANDLE_ENV, m_henv);
+ }
}
pair<bool,bool> ODBCStorageService::log_error(SQLHANDLE handle, SQLSMALLINT htype, const char* checkfor)
}
m_log.error("insert record failed (t=%s, c=%s, k=%s)", table, context, key);
logres = log_error(stmt, SQL_HANDLE_STMT, "23000");
- if (logres.second)
- return false; // supposedly integrity violation?
+ if (logres.second) {
+ // Supposedly integrity violation.
+ // Try and delete any expired record still hanging around until the final attempt.
+ if (attempts > 0) {
+ reap(table, context);
+ logres.first = true; // force it to treat as a retryable error
+ continue;
+ }
+ return false;
+ }
} while (attempts && logres.first);
throw IOException("ODBC StorageService failed to insert record.");
}
SQLSMALLINT ver;
+ SQLINTEGER widever;
SQL_TIMESTAMP_STRUCT expiration;
- SQLBindCol(stmt, 1, SQL_C_SSHORT, &ver, 0, nullptr);
+ if (m_wideVersion)
+ SQLBindCol(stmt, 1, SQL_C_SLONG, &widever, 0, nullptr);
+ else
+ SQLBindCol(stmt, 1, SQL_C_SSHORT, &ver, 0, nullptr);
if (pexpiration)
SQLBindCol(stmt, 2, SQL_C_TYPE_TIMESTAMP, &expiration, 0, nullptr);
- if ((sr = SQLFetch(stmt)) == SQL_NO_DATA)
+ if ((sr = SQLFetch(stmt)) == SQL_NO_DATA) {
+ if (m_log.isDebugEnabled())
+ m_log.debug("search returned no data (t=%s, c=%s, k=%s)", table, context, key);
return 0;
+ }
if (pexpiration)
*pexpiration = timeFromTimestamp(expiration);
- if (version == ver)
+ if (version == (m_wideVersion ? widever : ver)) {
+ if (m_log.isDebugEnabled())
+ m_log.debug("versioned search detected no change (t=%s, c=%s, k=%s)", table, context, key);
return version; // nothing's changed, so just echo back the version
+ }
if (pvalue) {
SQLLEN len;
}
}
- return ver;
+ return (m_wideVersion ? widever : ver);
}
int ODBCStorageService::updateRow(const char *table, const char* context, const char* key, const char* value, time_t expiration, int version)
}
SQLSMALLINT ver;
- SQLBindCol(stmt, 1, SQL_C_SSHORT, &ver, 0, nullptr);
+ SQLINTEGER widever;
+ if (m_wideVersion)
+ SQLBindCol(stmt, 1, SQL_C_SLONG, &widever, 0, nullptr);
+ else
+ SQLBindCol(stmt, 1, SQL_C_SSHORT, &ver, 0, nullptr);
if ((sr = SQLFetch(stmt)) == SQL_NO_DATA) {
return 0;
}
// Check version?
- if (version > 0 && version != ver) {
+ if (version > 0 && version != (m_wideVersion ? widever : ver)) {
return -1;
}
+ else if ((m_wideVersion && widever == INT_MAX) || (!m_wideVersion && ver == 32767)) {
+ m_log.error("record version overflow (t=%s, c=%s, k=%s)", table, context, key);
+ throw IOException("Version overflow, record in ODBC StorageService could not be updated.");
+ }
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
stmt = getHSTMT(conn);
else
sr = SQLBindParam(stmt, 1, SQL_C_CHAR, SQL_VARCHAR, 255, 0, const_cast<char*>(value), &b_ind);
if (!SQL_SUCCEEDED(sr)) {
- m_log.error("SQLBindParam failed (context = %s)", context);
+ m_log.error("SQLBindParam failed (value = %s)", value);
log_error(stmt, SQL_HANDLE_STMT);
throw IOException("ODBC StorageService failed to update record.");
}
- m_log.debug("SQLBindParam succeeded (context = %s)", context);
+ m_log.debug("SQLBindParam succeeded (value = %s)", value);
}
int attempts = 3;
return 0; // went missing?
else if (SQL_SUCCEEDED(sr)) {
m_log.debug("SQLExecute of update succeeded");
- return ver + 1;
+ return (m_wideVersion ? widever : ver) + 1;
}
- m_log.error("update of record failed (t=%s, c=%s, k=%s", table, context, key);
+ m_log.error("update of record failed (t=%s, c=%s, k=%s)", table, context, key);
logres = log_error(stmt, SQL_HANDLE_STMT);
} while (attempts && logres.first);