2 * Copyright 2001-2009 Internet2
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 * Storage Service using memcache (pre memcache tags)
23 #if defined (_MSC_VER) || defined(__BORLANDC__)
24 # include "config_win32.h"
30 # define _CRT_NONSTDC_NO_DEPRECATE 1
31 # define _CRT_SECURE_NO_DEPRECATE 1
32 # define MCEXT_EXPORTS __declspec(dllexport)
34 # define MCEXT_EXPORTS
37 #include <xmltooling/base.h>
41 #include <libmemcached/memcached.h>
42 #include <xercesc/util/XMLUniDefs.hpp>
44 #include <xmltooling/logging.h>
45 #include <xmltooling/unicode.h>
46 #include <xmltooling/XMLToolingConfig.h>
47 #include <xmltooling/util/NDC.h>
48 #include <xmltooling/util/StorageService.h>
49 #include <xmltooling/util/Threads.h>
50 #include <xmltooling/util/XMLHelper.h>
52 using namespace xmltooling::logging;
53 using namespace xmltooling;
54 using namespace xercesc;
57 namespace xmltooling {
58 static const XMLCh Hosts[] = UNICODE_LITERAL_5(H,o,s,t,s);
59 static const XMLCh prefix[] = UNICODE_LITERAL_6(p,r,e,f,i,x);
60 static const XMLCh buildMap[] = UNICODE_LITERAL_8(b,u,i,l,d,M,a,p);
61 static const XMLCh sendTimeout[] = UNICODE_LITERAL_11(s,e,n,d,T,i,m,e,o,u,t);
62 static const XMLCh recvTimeout[] = UNICODE_LITERAL_11(r,e,c,v,T,i,m,e,o,u,t);
63 static const XMLCh pollTimeout[] = UNICODE_LITERAL_11(p,o,l,l,T,i,m,e,o,u,t);
64 static const XMLCh failLimit[] = UNICODE_LITERAL_9(f,a,i,l,L,i,m,i,t);
65 static const XMLCh retryTimeout[] = UNICODE_LITERAL_12(r,e,t,r,y,T,i,m,e,o,u,t);
72 mc_record(string _v, time_t _e) :
73 value(_v), expiration(_e)
79 MemcacheBase(const DOMElement* e);
82 bool addMemcache(const char *key,
86 bool use_prefix = true);
87 bool setMemcache(const char *key,
91 bool use_prefix = true);
92 bool replaceMemcache(const char *key,
96 bool use_prefix = true);
97 bool getMemcache(const char *key,
100 bool use_prefix = true);
101 bool deleteMemcache(const char *key,
103 bool use_prefix = true);
105 void serialize(mc_record &source, string &dest);
106 void serialize(list<string> &source, string &dest);
107 void deserialize(string &source, mc_record &dest);
108 void deserialize(string &source, list<string> &dest);
110 bool addSessionToUser(string &key, string &user);
111 bool addLock(string what, bool use_prefix = true);
112 void deleteLock(string what, bool use_prefix = true);
115 const DOMElement* m_root; // can only use this during initialization
122 class MemcacheStorageService : public StorageService, public MemcacheBase {
125 MemcacheStorageService(const DOMElement* e);
126 ~MemcacheStorageService();
128 bool createString(const char* context, const char* key, const char* value, time_t expiration);
129 int readString(const char* context, const char* key, string* pvalue=NULL, time_t* pexpiration=NULL, int version=0);
130 int updateString(const char* context, const char* key, const char* value=NULL, time_t expiration=0, int version=0);
131 bool deleteString(const char* context, const char* key);
133 bool createText(const char* context, const char* key, const char* value, time_t expiration) {
134 return createString(context, key, value, expiration);
136 int readText(const char* context, const char* key, string* pvalue=NULL, time_t* pexpiration=NULL, int version=0) {
137 return readString(context, key, pvalue, pexpiration, version);
139 int updateText(const char* context, const char* key, const char* value=NULL, time_t expiration=0, int version=0) {
140 return updateString(context, key, value, expiration, version);
142 bool deleteText(const char* context, const char* key) {
143 return deleteString(context, key);
146 void reap(const char* context) {}
148 void updateContext(const char* context, time_t expiration);
149 void deleteContext(const char* context);
159 StorageService* MemcacheStorageServiceFactory(const DOMElement* const & e) {
160 return new MemcacheStorageService(e);
165 bool MemcacheBase::addLock(string what, bool use_prefix) {
166 string lock_name = what + ":LOCK";
167 string set_val = "1";
169 while (!addMemcache(lock_name.c_str(), set_val, 5, 0, use_prefix)) {
171 log.debug("Unable to get lock %s... FAILED.", lock_name.c_str());
174 log.debug("Unable to get lock %s... Retrying.", lock_name.c_str());
180 struct timeval tv = { 0, 100000 };
181 select(0, 0, 0, 0, &tv);
187 void MemcacheBase::deleteLock(string what, bool use_prefix) {
189 string lock_name = what + ":LOCK";
190 deleteMemcache(lock_name.c_str(), 0, use_prefix);
195 void MemcacheBase::deserialize(string &source, mc_record &dest) {
196 istringstream is(source, stringstream::in | stringstream::out);
197 is >> dest.expiration;
198 is.ignore(1); // ignore delimiter
199 dest.value = is.str().c_str() + is.tellg();
202 void MemcacheBase::deserialize(string &source, list<string> &dest) {
203 istringstream is(source, stringstream::in | stringstream::out);
211 void MemcacheBase::serialize(mc_record &source, string &dest) {
212 ostringstream os(stringstream::in | stringstream::out);
213 os << source.expiration;
214 os << "-"; // delimiter
219 void MemcacheBase::serialize(list<string> &source, string &dest) {
220 ostringstream os(stringstream::in | stringstream::out);
221 for(list<string>::iterator iter = source.begin(); iter != source.end(); iter++) {
222 if (iter != source.begin()) {
230 bool MemcacheBase::addSessionToUser(string &key, string &user) {
232 if (! addLock(user, false)) {
238 string sessid = m_prefix + key; // add specific prefix to session
239 string delimiter = ";";
240 string user_key = "UDATA:";
244 bool result = getMemcache(user_key.c_str(), user_val, &flags, false);
247 bool already_there = false;
248 // skip delimiters at beginning.
249 string::size_type lastPos = user_val.find_first_not_of(delimiter, 0);
251 // find first "non-delimiter".
252 string::size_type pos = user_val.find_first_of(delimiter, lastPos);
254 while (string::npos != pos || string::npos != lastPos) {
255 // found a token, add it to the vector.
256 string session = user_val.substr(lastPos, pos - lastPos);
257 if (strcmp(session.c_str(), sessid.c_str()) == 0) {
258 already_there = true;
262 // skip delimiters. Note the "not_of"
263 lastPos = user_val.find_first_not_of(delimiter, pos);
265 // find next "non-delimiter"
266 pos = user_val.find_first_of(delimiter, lastPos);
269 if (!already_there) {
270 user_val += delimiter + sessid;
271 replaceMemcache(user_key.c_str(), user_val, 0, 0, false);
274 addMemcache(user_key.c_str(), sessid, 0, 0, false);
277 deleteLock(user, false);
282 bool MemcacheBase::deleteMemcache(const char *key,
290 final_key = m_prefix + key;
296 rv = memcached_delete(memc, (char *)final_key.c_str(), final_key.length(), timeout);
299 if (rv == MEMCACHED_SUCCESS) {
301 } else if (rv == MEMCACHED_NOTFOUND) {
302 // Key wasn't there... No biggie.
304 } else if (rv == MEMCACHED_ERRNO) {
306 string error = string("Memcache::deleteMemcache() SYSTEM ERROR: ") + string(strerror(memc->cached_errno));
308 throw IOException(error);
310 string error = string("Memcache::deleteMemcache() Problems: ") + memcached_strerror(memc, rv);
312 throw IOException(error);
318 bool MemcacheBase::getMemcache(const char *key,
329 final_key = m_prefix + key;
335 result = memcached_get(memc, (char *)final_key.c_str(), final_key.length(), &len, flags, &rv);
338 if (rv == MEMCACHED_SUCCESS) {
342 } else if (rv == MEMCACHED_NOTFOUND) {
343 log.debug("Key %s not found in memcache...", key);
345 } else if (rv == MEMCACHED_ERRNO) {
347 string error = string("Memcache::getMemcache() SYSTEM ERROR: ") + string(strerror(memc->cached_errno));
349 throw IOException(error);
351 string error = string("Memcache::getMemcache() Problems: ") + memcached_strerror(memc, rv);
353 throw IOException(error);
359 bool MemcacheBase::addMemcache(const char *key,
370 final_key = m_prefix + key;
376 rv = memcached_add(memc, (char *)final_key.c_str(), final_key.length(), (char *)value.c_str(), value.length(), timeout, flags);
379 if (rv == MEMCACHED_SUCCESS) {
381 } else if (rv == MEMCACHED_NOTSTORED) {
384 } else if (rv == MEMCACHED_ERRNO) {
386 string error = string("Memcache::addMemcache() SYSTEM ERROR: ") + string(strerror(memc->cached_errno));
388 throw IOException(error);
390 string error = string("Memcache::addMemcache() Problems: ") + memcached_strerror(memc, rv);
392 throw IOException(error);
398 bool MemcacheBase::setMemcache(const char *key,
409 final_key = m_prefix + key;
415 rv = memcached_set(memc, (char *)final_key.c_str(), final_key.length(), (char *)value.c_str(), value.length(), timeout, flags);
418 if (rv == MEMCACHED_SUCCESS) {
420 } else if (rv == MEMCACHED_ERRNO) {
422 string error = string("Memcache::setMemcache() SYSTEM ERROR: ") + string(strerror(memc->cached_errno));
424 throw IOException(error);
426 string error = string("Memcache::setMemcache() Problems: ") + memcached_strerror(memc, rv);
428 throw IOException(error);
434 bool MemcacheBase::replaceMemcache(const char *key,
445 final_key = m_prefix + key;
451 rv = memcached_replace(memc, (char *)final_key.c_str(), final_key.length(), (char *)value.c_str(), value.length(), timeout, flags);
454 if (rv == MEMCACHED_SUCCESS) {
456 } else if (rv == MEMCACHED_NOTSTORED) {
459 } else if (rv == MEMCACHED_ERRNO) {
461 string error = string("Memcache::replaceMemcache() SYSTEM ERROR: ") + string(strerror(memc->cached_errno));
463 throw IOException(error);
465 string error = string("Memcache::replaceMemcache() Problems: ") + memcached_strerror(memc, rv);
467 throw IOException(error);
473 MemcacheBase::MemcacheBase(const DOMElement* e) : m_root(e), log(Category::getInstance("XMLTooling.MemcacheBase")), m_prefix("") {
475 auto_ptr_char p(e ? e->getAttributeNS(NULL,prefix) : NULL);
476 if (p.get() && *p.get()) {
477 log.debug("INIT: GOT key prefix: %s", p.get());
481 m_lock = Mutex::create();
482 log.debug("Lock created");
484 memc = memcached_create(NULL);
486 throw XMLToolingException("MemcacheBase::Memcache(): memcached_create() failed");
489 log.debug("Memcache created");
491 unsigned int hash = MEMCACHED_HASH_CRC;
492 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_HASH, hash);
493 log.debug("CRC hash set");
495 int32_t send_timeout = 1000000;
496 const XMLCh* tag = e ? e->getAttributeNS(NULL, sendTimeout) : NULL;
498 send_timeout = XMLString::parseInt(tag);
500 log.debug("MEMCACHED_BEHAVIOR_SND_TIMEOUT will be set to %d", send_timeout);
501 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SND_TIMEOUT, send_timeout);
503 int32_t recv_timeout = 1000000;
504 tag = e ? e->getAttributeNS(NULL, sendTimeout) : NULL;
506 recv_timeout = XMLString::parseInt(tag);
508 log.debug("MEMCACHED_BEHAVIOR_RCV_TIMEOUT will be set to %d", recv_timeout);
509 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_RCV_TIMEOUT, recv_timeout);
511 int32_t poll_timeout = 1000;
512 tag = e ? e->getAttributeNS(NULL, pollTimeout) : NULL;
514 poll_timeout = XMLString::parseInt(tag);
516 log.debug("MEMCACHED_BEHAVIOR_POLL_TIMEOUT will be set to %d", poll_timeout);
517 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, poll_timeout);
519 int32_t fail_limit = 5;
520 tag = e ? e->getAttributeNS(NULL, failLimit) : NULL;
522 fail_limit = XMLString::parseInt(tag);
524 log.debug("MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT will be set to %d", fail_limit);
525 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT, fail_limit);
527 int32_t retry_timeout = 30;
528 tag = e ? e->getAttributeNS(NULL, retryTimeout) : NULL;
530 retry_timeout = XMLString::parseInt(tag);
532 log.debug("MEMCACHED_BEHAVIOR_RETRY_TIMEOUT will be set to %d", retry_timeout);
533 memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_RETRY_TIMEOUT, retry_timeout);
535 // Grab hosts from the configuration.
536 e = e ? XMLHelper::getFirstChildElement(e,Hosts) : NULL;
537 if (!e || !e->hasChildNodes()) {
538 throw XMLToolingException("Memcache StorageService requires Hosts element in configuration.");
540 auto_ptr_char h(e->getFirstChild()->getNodeValue());
541 log.debug("INIT: GOT Hosts: %s", h.get());
542 memcached_server_st *servers;
543 servers = memcached_servers_parse(const_cast<char*>(h.get()));
544 log.debug("Got %u hosts.", memcached_server_list_count(servers));
545 if (memcached_server_push(memc, servers) != MEMCACHED_SUCCESS) {
546 throw IOException("MemcacheBase::Memcache(): memcached_server_push() failed");
548 memcached_server_list_free(servers);
550 log.debug("Memcache object initialized");
553 MemcacheBase::~MemcacheBase() {
554 memcached_free(memc);
556 log.debug("Base object destroyed");
559 MemcacheStorageService::MemcacheStorageService(const DOMElement* e)
560 : MemcacheBase(e), m_log(Category::getInstance("XMLTooling.MemcacheStorageService")), m_buildMap(false) {
562 const XMLCh* tag=e ? e->getAttributeNS(NULL,buildMap) : NULL;
563 if (tag && *tag && XMLString::parseInt(tag) != 0) {
565 m_log.debug("Cache built with buildMap ON");
570 MemcacheStorageService::~MemcacheStorageService() {
575 bool MemcacheStorageService::createString(const char* context, const char* key, const char* value, time_t expiration) {
577 log.debug("createString ctx: %s - key: %s", context, key);
579 string final_key = string(context) + ":" + string(key);
581 mc_record rec(value, expiration);
583 serialize(rec, final_value);
585 bool result = addMemcache(final_key.c_str(), final_value, expiration, 1); // the flag will be the version
587 if (result && m_buildMap) {
588 log.debug("Got result, updating map");
590 string map_name = context;
591 // we need to update the context map
592 if (! addLock(map_name)) {
593 log.error("Unable to get lock for context %s!", context);
594 deleteMemcache(final_key.c_str(), 0);
600 bool result = getMemcache(map_name.c_str(), ser_arr, &flags);
602 list<string> contents;
604 log.debug("Match found. Parsing...");
606 deserialize(ser_arr, contents);
608 log.debug("Iterating retrieved session map...");
609 list<string>::iterator iter;
610 for(iter = contents.begin();
611 iter != contents.end();
613 log.debug("value = " + *iter);
617 log.debug("New context: %s", map_name.c_str());
621 contents.push_back(key);
622 serialize(contents, ser_arr);
623 setMemcache(map_name.c_str(), ser_arr, expiration, 0);
625 deleteLock(map_name);
632 int MemcacheStorageService::readString(const char* context, const char* key, string* pvalue, time_t* pexpiration, int version) {
634 log.debug("readString ctx: %s - key: %s", context, key);
636 string final_key = string(context) + ":" + string(key);
637 uint32_t rec_version;
641 log.debug("Checking context");
643 string map_name = context;
646 bool ctx_found = getMemcache(map_name.c_str(), ser_arr, &flags);
653 bool found = getMemcache(final_key.c_str(), value, &rec_version);
658 if (version && rec_version <= (uint32_t)version) {
662 if (pexpiration || pvalue) {
664 deserialize(value, rec);
667 *pexpiration = rec.expiration;
679 int MemcacheStorageService::updateString(const char* context, const char* key, const char* value, time_t expiration, int version) {
681 log.debug("updateString ctx: %s - key: %s", context, key);
683 time_t final_exp = expiration;
684 time_t *want_expiration = NULL;
686 want_expiration = &final_exp;
689 int read_res = readString(context, key, NULL, want_expiration, version);
696 if (version && version != read_res) {
701 // Proceding with update
702 string final_key = string(context) + ":" + string(key);
703 mc_record rec(value, final_exp);
705 serialize(rec, final_value);
707 replaceMemcache(final_key.c_str(), final_value, final_exp, ++version);
712 bool MemcacheStorageService::deleteString(const char* context, const char* key) {
714 log.debug("deleteString ctx: %s - key: %s", context, key);
716 string final_key = string(context) + ":" + string(key);
718 // Not updating context map, if there is one. There is no need.
720 return deleteMemcache(final_key.c_str(), 0);
724 void MemcacheStorageService::updateContext(const char* context, time_t expiration) {
726 log.debug("updateContext ctx: %s", context);
729 log.error("updateContext invoked on a Storage with no context map built!");
733 string map_name = context;
736 bool result = getMemcache(map_name.c_str(), ser_arr, &flags);
738 list<string> contents;
740 log.debug("Match found. Parsing...");
742 deserialize(ser_arr, contents);
744 log.debug("Iterating retrieved session map...");
745 list<string>::iterator iter;
746 for(iter = contents.begin();
747 iter != contents.end();
750 // Update expiration times
752 int read_res = readString(context, iter->c_str(), &value, NULL, 0);
759 updateString(context, iter->c_str(), value.c_str(), expiration, read_res);
761 replaceMemcache(map_name.c_str(), ser_arr, expiration, flags);
766 void MemcacheStorageService::deleteContext(const char* context) {
768 log.debug("deleteContext ctx: %s", context);
771 log.error("deleteContext invoked on a Storage with no context map built!");
775 string map_name = context;
778 bool result = getMemcache(map_name.c_str(), ser_arr, &flags);
780 list<string> contents;
782 log.debug("Match found. Parsing...");
784 deserialize(ser_arr, contents);
786 log.debug("Iterating retrieved session map...");
787 list<string>::iterator iter;
788 for(iter = contents.begin();
789 iter != contents.end();
791 string final_key = map_name + *iter;
792 deleteMemcache(final_key.c_str(), 0);
795 deleteMemcache(map_name.c_str(), 0);
800 extern "C" int MCEXT_EXPORTS xmltooling_extension_init(void*) {
801 // Register this SS type
802 XMLToolingConfig::getConfig().StorageServiceManager.registerFactory("MEMCACHE", MemcacheStorageServiceFactory);
806 extern "C" void MCEXT_EXPORTS xmltooling_extension_term() {
807 XMLToolingConfig::getConfig().StorageServiceManager.deregisterFactory("MEMCACHE");