* limitations under the License.
*/
-/*
- * $Id$
+/**
+ * xmltooling/util/CurlURLInputStream.cpp
+ *
+ * Asynchronous use of curl to fetch data from a URL.
*/
#include "internal.h"
-#include <curl/curl.h>
+#include <xmltooling/util/CurlURLInputStream.h>
+#include <xmltooling/util/XMLHelper.h>
#include <xercesc/util/XercesDefs.hpp>
#include <xercesc/util/XMLNetAccessor.hpp>
#include <xercesc/util/TranscodingException.hpp>
#include <xercesc/util/PlatformUtils.hpp>
-#include <xmltooling/util/CurlURLInputStream.hpp>
-
using namespace xmltooling;
using namespace xercesc;
-CurlURLInputStream::CurlURLInputStream(const XMLURL& urlSource, const XMLNetHTTPInfo* httpInfo/*=0*/)
- : fMemoryManager(urlSource.getMemoryManager())
- , fURLSource(urlSource)
- , fURL(0)
- , fInputStream(NULL)
- , m_log(logging::Category::getInstance(XMLTOOLING_LOGCAT".libcurl.NetAccessor"))
+namespace {
+ static const XMLCh _CURL[] = UNICODE_LITERAL_4(C,U,R,L);
+ static const XMLCh _option[] = UNICODE_LITERAL_6(o,p,t,i,o,n);
+ static const XMLCh _provider[] = UNICODE_LITERAL_8(p,r,o,v,i,d,e,r);
+ static const XMLCh TransportOption[] = UNICODE_LITERAL_15(T,r,a,n,s,p,o,r,t,O,p,t,i,o,n);
+ static const XMLCh uri[] = UNICODE_LITERAL_3(u,r,i);
+ static const XMLCh url[] = UNICODE_LITERAL_3(u,r,l);
+ static const XMLCh verifyHost[] = UNICODE_LITERAL_10(v,e,r,i,f,y,H,o,s,t);
+}
+
+CurlURLInputStream::CurlURLInputStream(const char* url)
+ : fLog(logging::Category::getInstance(XMLTOOLING_LOGCAT".libcurl.InputStream"))
+ , fURL(url)
+ , fMulti(0)
+ , fEasy(0)
+ , fTotalBytesRead(0)
+ , fWritePtr(0)
+ , fBytesRead(0)
+ , fBytesToRead(0)
+ , fDataAvailable(false)
+ , fBufferHeadPtr(fBuffer)
+ , fBufferTailPtr(fBuffer)
+ , fContentType(0)
{
- // Get the text of the URL we're going to use
- fURL.reset(XMLString::transcode(fURLSource.getURLText(), fMemoryManager), fMemoryManager);
+ init();
}
+CurlURLInputStream::CurlURLInputStream(const XMLCh* url)
+ : fLog(logging::Category::getInstance(XMLTOOLING_LOGCAT".libcurl.InputStream"))
+ , fMulti(0)
+ , fEasy(0)
+ , fTotalBytesRead(0)
+ , fWritePtr(0)
+ , fBytesRead(0)
+ , fBytesToRead(0)
+ , fDataAvailable(false)
+ , fBufferHeadPtr(fBuffer)
+ , fBufferTailPtr(fBuffer)
+ , fContentType(0)
+{
+ auto_ptr_char temp(url);
+ fURL = temp.get();
+ init();
+}
+
+CurlURLInputStream::CurlURLInputStream(const DOMElement* e)
+ : fLog(logging::Category::getInstance(XMLTOOLING_LOGCAT".libcurl.InputStream"))
+ , fMulti(0)
+ , fEasy(0)
+ , fTotalBytesRead(0)
+ , fWritePtr(0)
+ , fBytesRead(0)
+ , fBytesToRead(0)
+ , fDataAvailable(false)
+ , fBufferHeadPtr(fBuffer)
+ , fBufferTailPtr(fBuffer)
+ , fContentType(0)
+{
+ const XMLCh* attr = e->getAttributeNS(NULL, url);
+ if (!attr || !*attr) {
+ attr = e->getAttributeNS(NULL, uri);
+ if (!attr || !*attr)
+ throw IOException("No URL supplied via DOM to CurlURLInputStream constructor.");
+ }
+
+ auto_ptr_char temp(attr);
+ fURL = temp.get();
+ init(e);
+}
CurlURLInputStream::~CurlURLInputStream()
{
- delete fInputStream;
+ if (fEasy) {
+ // Remove the easy handle from the multi stack
+ curl_multi_remove_handle(fMulti, fEasy);
+
+ // Cleanup the easy handle
+ curl_easy_cleanup(fEasy);
+ }
+
+ if (fMulti) {
+ // Cleanup the multi handle
+ curl_multi_cleanup(fMulti);
+ }
+
+ XMLString::release(&fContentType);
}
+void CurlURLInputStream::init(const DOMElement* e)
+{
+ // Allocate the curl multi handle
+ fMulti = curl_multi_init();
+
+ // Allocate the curl easy handle
+ fEasy = curl_easy_init();
+
+ if (!fMulti || !fEasy)
+ throw IOException("Failed to allocate libcurl handles.");
+
+ curl_easy_setopt(fEasy, CURLOPT_URL, fURL.c_str());
+
+ // Set up a way to recieve the data
+ curl_easy_setopt(fEasy, CURLOPT_WRITEDATA, this); // Pass this pointer to write function
+ curl_easy_setopt(fEasy, CURLOPT_WRITEFUNCTION, staticWriteCallback); // Our static write function
+
+ // Do redirects
+ curl_easy_setopt(fEasy, CURLOPT_FOLLOWLOCATION, 1);
+ curl_easy_setopt(fEasy, CURLOPT_MAXREDIRS, 6);
+
+ // Default settings.
+ curl_easy_setopt(fEasy, CURLOPT_CONNECTTIMEOUT,15);
+ curl_easy_setopt(fEasy, CURLOPT_TIMEOUT,30);
+ curl_easy_setopt(fEasy, CURLOPT_HTTPAUTH,0);
+ curl_easy_setopt(fEasy, CURLOPT_USERPWD,NULL);
+ curl_easy_setopt(fEasy, CURLOPT_SSL_VERIFYHOST, 2);
+ curl_easy_setopt(fEasy, CURLOPT_SSL_VERIFYPEER, 0);
+ curl_easy_setopt(fEasy, CURLOPT_SSL_CIPHER_LIST, "ALL:!aNULL:!LOW:!EXPORT:!SSLv2");
+ curl_easy_setopt(fEasy, CURLOPT_NOPROGRESS, 1);
+ curl_easy_setopt(fEasy, CURLOPT_NOSIGNAL, 1);
+ curl_easy_setopt(fEasy, CURLOPT_FAILONERROR, 1);
+
+ fError[0] = 0;
+ curl_easy_setopt(fEasy, CURLOPT_ERRORBUFFER, fError);
+
+ if (e) {
+ const XMLCh* flag = e->getAttributeNS(NULL, verifyHost);
+ if (flag && (*flag == chLatin_f || *flag == chDigit_0))
+ curl_easy_setopt(fEasy, CURLOPT_SSL_VERIFYHOST, 2);
+
+ // Process TransportOption elements.
+ bool success;
+ DOMElement* child = XMLHelper::getLastChildElement(e, TransportOption);
+ while (child) {
+ if (child->hasChildNodes() && XMLString::equals(child->getAttributeNS(NULL,_provider), _CURL)) {
+ auto_ptr_char option(child->getAttributeNS(NULL,_option));
+ auto_ptr_char value(child->getFirstChild()->getNodeValue());
+ if (option.get() && *option.get() && value.get() && *value.get()) {
+ // For libcurl, the option is an enum and the value type depends on the option.
+ CURLoption opt = static_cast<CURLoption>(strtol(option.get(), NULL, 10));
+ if (opt < CURLOPTTYPE_OBJECTPOINT)
+ success = (curl_easy_setopt(fEasy, opt, strtol(value.get(), NULL, 10)) == CURLE_OK);
+#ifdef CURLOPTTYPE_OFF_T
+ else if (opt < CURLOPTTYPE_OFF_T)
+ success = (curl_easy_setopt(fEasy, opt, value.get()) == CURLE_OK);
+ else if (sizeof(curl_off_t) == sizeof(long))
+ success = (curl_easy_setopt(fEasy, opt, strtol(value.get(), NULL, 10)) == CURLE_OK);
+ else
+ success = false;
+#else
+ else
+ success = (curl_easy_setopt(fEasy, opt, value.get()) == CURLE_OK);
+#endif
+ if (!success)
+ fLog.error("failed to set transport option (%s)", option.get());
+ }
+ }
+ child = XMLHelper::getPreviousSiblingElement(child, TransportOption);
+ }
+ }
+
+ // Add easy handle to the multi stack
+ curl_multi_add_handle(fMulti, fEasy);
+
+ fLog.debug("libcurl trying to fetch %s", fURL.c_str());
+
+ // Start reading, to get the content type
+ while(fBufferHeadPtr == fBuffer) {
+ int runningHandles = 0;
+ try {
+ readMore(&runningHandles);
+ }
+ catch (XMLException& ex) {
+ curl_multi_remove_handle(fMulti, fEasy);
+ curl_easy_cleanup(fEasy);
+ fEasy = NULL;
+ curl_multi_cleanup(fMulti);
+ fMulti = NULL;
+ auto_ptr_char msg(ex.getMessage());
+ throw IOException(msg.get());
+ }
+ if(runningHandles == 0) break;
+ }
-size_t CurlURLInputStream::staticWriteCallback(void* ptr, size_t size, size_t nmemb, void* stream)
+ // Find the content type
+ char* contentType8 = NULL;
+ curl_easy_getinfo(fEasy, CURLINFO_CONTENT_TYPE, &contentType8);
+ if(contentType8)
+ fContentType = XMLString::transcode(contentType8);
+}
+
+
+size_t CurlURLInputStream::staticWriteCallback(char* buffer, size_t size, size_t nitems, void* outstream)
{
- size_t len = size*nmemb;
- reinterpret_cast<std::stringstream*>(stream)->write(reinterpret_cast<const char*>(ptr),len);
- return len;
+ return ((CurlURLInputStream*)outstream)->writeCallback(buffer, size, nitems);
}
+size_t CurlURLInputStream::writeCallback(char* buffer, size_t size, size_t nitems)
+{
+ size_t cnt = size * nitems;
+ size_t totalConsumed = 0;
-xsecsize_t CurlURLInputStream::readBytes(XMLByte* const toFill, const xsecsize_t maxToRead)
+ // Consume as many bytes as possible immediately into the buffer
+ size_t consume = (cnt > fBytesToRead) ? fBytesToRead : cnt;
+ memcpy(fWritePtr, buffer, consume);
+ fWritePtr += consume;
+ fBytesRead += consume;
+ fTotalBytesRead += consume;
+ fBytesToRead -= consume;
+
+ //fLog.debug("write callback consuming %d bytes", consume);
+
+ // If bytes remain, rebuffer as many as possible into our holding buffer
+ buffer += consume;
+ totalConsumed += consume;
+ cnt -= consume;
+ if (cnt > 0)
+ {
+ size_t bufAvail = sizeof(fBuffer) - (fBufferHeadPtr - fBuffer);
+ consume = (cnt > bufAvail) ? bufAvail : cnt;
+ memcpy(fBufferHeadPtr, buffer, consume);
+ fBufferHeadPtr += consume;
+ buffer += consume;
+ totalConsumed += consume;
+ //fLog.debug("write callback rebuffering %d bytes", consume);
+ }
+
+ // Return the total amount we've consumed. If we don't consume all the bytes
+ // then an error will be generated. Since our buffer size is equal to the
+ // maximum size that curl will write, this should never happen unless there
+ // is a logic error somewhere here.
+ return totalConsumed;
+}
+
+bool CurlURLInputStream::readMore(int* runningHandles)
{
- if (!fInputStream) {
- // Allocate the curl easy handle.
- CURL* fEasy = curl_easy_init();
- if (!fEasy)
- ThrowXMLwithMemMgr1(NetAccessorException, XMLExcepts::NetAcc_InternalError, "unable to allocate libcurl handle", fMemoryManager);
-
- m_log.debug("libcurl trying to fetch %s", fURL.get());
-
- // Set URL option
- curl_easy_setopt(fEasy, CURLOPT_URL, fURL.get());
- curl_easy_setopt(fEasy, CURLOPT_WRITEDATA, &fUnderlyingStream);
- curl_easy_setopt(fEasy, CURLOPT_WRITEFUNCTION, staticWriteCallback);
- curl_easy_setopt(fEasy, CURLOPT_CONNECTTIMEOUT, 30);
- curl_easy_setopt(fEasy, CURLOPT_TIMEOUT, 60);
- curl_easy_setopt(fEasy, CURLOPT_SSL_VERIFYHOST, 0);
- curl_easy_setopt(fEasy, CURLOPT_SSL_VERIFYPEER, 0);
- curl_easy_setopt(fEasy, CURLOPT_NOPROGRESS, 1);
- curl_easy_setopt(fEasy, CURLOPT_NOSIGNAL, 1);
- curl_easy_setopt(fEasy, CURLOPT_FAILONERROR, 1);
-
- char curl_errorbuf[CURL_ERROR_SIZE];
- curl_errorbuf[0]=0;
- curl_easy_setopt(fEasy,CURLOPT_ERRORBUFFER,curl_errorbuf);
-
- // Fetch the data.
- if (curl_easy_perform(fEasy) != CURLE_OK) {
- curl_easy_cleanup(fEasy);
- ThrowXMLwithMemMgr1(NetAccessorException, XMLExcepts::NetAcc_InternalError, curl_errorbuf, fMemoryManager);
- }
+ // Ask the curl to do some work
+ CURLMcode curlResult = curl_multi_perform(fMulti, runningHandles);
- curl_easy_cleanup(fEasy);
+ // Process messages from curl
+ int msgsInQueue = 0;
+ for (CURLMsg* msg = NULL; (msg = curl_multi_info_read(fMulti, &msgsInQueue)) != NULL; )
+ {
+ //fLog.debug("msg %d, %d from curl", msg->msg, msg->data.result);
+
+ if (msg->msg != CURLMSG_DONE)
+ return true;
- /*
switch (msg->data.result)
{
case CURLE_OK:
break;
case CURLE_UNSUPPORTED_PROTOCOL:
- ThrowXMLwithMemMgr(MalformedURLException, XMLExcepts::URL_UnsupportedProto, fMemoryManager);
+ ThrowXML(MalformedURLException, XMLExcepts::URL_UnsupportedProto);
break;
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
- ThrowXMLwithMemMgr1(NetAccessorException, XMLExcepts::NetAcc_TargetResolution, fURLSource.getHost(), fMemoryManager);
+ ThrowXML1(NetAccessorException, XMLExcepts::NetAcc_TargetResolution, fURL.c_str());
break;
case CURLE_COULDNT_CONNECT:
- ThrowXMLwithMemMgr1(NetAccessorException, XMLExcepts::NetAcc_ConnSocket, fURLSource.getURLText(), fMemoryManager);
+ ThrowXML1(NetAccessorException, XMLExcepts::NetAcc_ConnSocket, fURL.c_str());
+ break;
case CURLE_RECV_ERROR:
- ThrowXMLwithMemMgr1(NetAccessorException, XMLExcepts::NetAcc_ReadSocket, fURLSource.getURLText(), fMemoryManager);
+ ThrowXML1(NetAccessorException, XMLExcepts::NetAcc_ReadSocket, fURL.c_str());
break;
default:
- m_log.error("curl NetAccessor encountered error from libcurl (%d)", msg->data.result);
- ThrowXMLwithMemMgr1(NetAccessorException, XMLExcepts::NetAcc_InternalError, fURLSource.getURLText(), fMemoryManager);
+ fLog.error("error while fetching %s: (%d) %s", fURL.c_str(), msg->data.result, fError);
+ ThrowXML1(NetAccessorException, XMLExcepts::NetAcc_InternalError, fURL.c_str());
break;
}
- */
+ }
+
+ // If nothing is running any longer, bail out
+ if(*runningHandles == 0)
+ return false;
+
+ // If there is no further data to read, and we haven't
+ // read any yet on this invocation, call select to wait for data
+ if (curlResult != CURLM_CALL_MULTI_PERFORM && fBytesRead == 0)
+ {
+ fd_set readSet;
+ fd_set writeSet;
+ fd_set exceptSet;
+ int fdcnt=0;
+
+ FD_ZERO(&readSet);
+ FD_ZERO(&writeSet);
+ FD_ZERO(&exceptSet);
+
+ // Ask curl for the file descriptors to wait on
+ curl_multi_fdset(fMulti, &readSet, &writeSet, &exceptSet, &fdcnt);
+
+ // Wait on the file descriptors
+ timeval tv;
+ tv.tv_sec = 2;
+ tv.tv_usec = 0;
+ select(fdcnt+1, &readSet, &writeSet, &exceptSet, &tv);
+ }
+
+ return curlResult == CURLM_CALL_MULTI_PERFORM;
+}
+
+xsecsize_t CurlURLInputStream::readBytes(XMLByte* const toFill, const xsecsize_t maxToRead)
+{
+ fBytesRead = 0;
+ fBytesToRead = maxToRead;
+ fWritePtr = toFill;
+
+ for (bool tryAgain = true; fBytesToRead > 0 && (tryAgain || fBytesRead == 0); )
+ {
+ // First, any buffered data we have available
+ size_t bufCnt = fBufferHeadPtr - fBufferTailPtr;
+ bufCnt = (bufCnt > fBytesToRead) ? fBytesToRead : bufCnt;
+ if (bufCnt > 0)
+ {
+ memcpy(fWritePtr, fBufferTailPtr, bufCnt);
+ fWritePtr += bufCnt;
+ fBytesRead += bufCnt;
+ fTotalBytesRead += bufCnt;
+ fBytesToRead -= bufCnt;
+
+ fBufferTailPtr += bufCnt;
+ if (fBufferTailPtr == fBufferHeadPtr)
+ fBufferHeadPtr = fBufferTailPtr = fBuffer;
+
+ //fLog.debug("consuming %d buffered bytes", bufCnt);
+
+ tryAgain = true;
+ continue;
+ }
- fInputStream = new (fMemoryManager) StreamInputSource::StreamBinInputStream(fUnderlyingStream);
+ // Ask the curl to do some work
+ int runningHandles = 0;
+ tryAgain = readMore(&runningHandles);
+
+ // If nothing is running any longer, bail out
+ if (runningHandles == 0)
+ break;
}
- // Defer to the stream wrapper.
- return fInputStream->readBytes(toFill, maxToRead);
+ return fBytesRead;
}