X-Git-Url: http://www.project-moonshot.org/gitweb/?a=blobdiff_plain;f=shibsp%2Fremoting%2Fimpl%2FSocketListener.cpp;h=8951175ae44ecce02588773db281bedc9307a2bb;hb=c51bfd77603cf0ddb0b5e374c35586a8435895d6;hp=66ebeb8d9754d25b03b655c7ca84f1352af8eb93;hpb=aebe94d53162cb5e0a50da51c7fa434e866072b6;p=shibboleth%2Fcpp-sp.git diff --git a/shibsp/remoting/impl/SocketListener.cpp b/shibsp/remoting/impl/SocketListener.cpp index 66ebeb8..8951175 100644 --- a/shibsp/remoting/impl/SocketListener.cpp +++ b/shibsp/remoting/impl/SocketListener.cpp @@ -1,46 +1,58 @@ -/* - * Copyright 2001-2007 Internet2 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/** + * Licensed to the University Corporation for Advanced Internet + * Development, Inc. (UCAID) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. + * + * UCAID licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. */ /** * SocketListener.cpp - * - * Berkeley Socket-based ListenerService implementation + * + * Berkeley Socket-based ListenerService implementation. */ #include "internal.h" #include "exceptions.h" +#include "ServiceProvider.h" +#include "SPConfig.h" #include "remoting/impl/SocketListener.h" #include #include #include -#include +#include + #include +#include #ifndef WIN32 # include #endif +#ifdef HAVE_UNISTD_H +# include +#endif + using namespace shibsp; using namespace xmltooling; using namespace std; using xercesc::DOMElement; namespace shibsp { - + // Manages the pool of connections class SocketPool { @@ -50,23 +62,23 @@ namespace shibsp { ~SocketPool(); SocketListener::ShibSocket get(); void put(SocketListener::ShibSocket s); - + private: SocketListener::ShibSocket connect(); - - Category& m_log; + + Category& m_log; const SocketListener* m_listener; - auto_ptr m_lock; + boost::scoped_ptr m_lock; stack m_pool; }; - + // Worker threads in server class ServerThread { public: ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id); ~ServerThread(); void run(); - bool job(); + int job(); // Return -1 on error, 1 for closed, 0 for success private: SocketListener::ShibSocket m_sock; @@ -99,7 +111,7 @@ SocketListener::ShibSocket SocketPool::connect() connected = true; break; } - + m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : "")); if (i) { @@ -148,60 +160,77 @@ SocketListener::ShibSocket SocketPool::get() void SocketPool::put(SocketListener::ShibSocket s) { - m_lock->lock(); + Lock lock(m_lock); m_pool.push(s); - m_lock->unlock(); } -SocketListener::SocketListener(const DOMElement* e) : log(&Category::getInstance(SHIBSP_LOGCAT".Listener")), - m_socketpool(NULL), m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_socket((ShibSocket)0) +SocketListener::SocketListener(const DOMElement* e) + : m_catchAll(false), log(&Category::getInstance(SHIBSP_LOGCAT ".Listener")), + m_shutdown(nullptr), m_stackSize(0), m_socket((ShibSocket)0) { // Are we a client? if (SPConfig::getConfig().isEnabled(SPConfig::InProcess)) { - m_socketpool=new SocketPool(*log,this); + m_socketpool.reset(new SocketPool(*log,this)); } // Are we a server? if (SPConfig::getConfig().isEnabled(SPConfig::OutOfProcess)) { - m_child_lock = Mutex::create(); - m_child_wait = CondWait::create(); + m_child_lock.reset(Mutex::create()); + m_child_wait.reset(CondWait::create()); + + static const XMLCh stackSize[] = UNICODE_LITERAL_9(s,t,a,c,k,S,i,z,e); + m_stackSize = XMLHelper::getAttrInt(e, 0, stackSize) * 1024; } } SocketListener::~SocketListener() { - delete m_socketpool; - delete m_child_wait; - delete m_child_lock; } -bool SocketListener::run(bool* shutdown) +bool SocketListener::init(bool force) { #ifdef _DEBUG - NDC ndc("run"); + NDC ndc("init"); #endif log->info("listener service starting"); - // Save flag to monitor for shutdown request. - m_shutdown=shutdown; - unsigned long count = 0; + ServiceProvider* sp = SPConfig::getConfig().getServiceProvider(); + sp->lock(); + const PropertySet* props = sp->getPropertySet("OutOfProcess"); + if (props) { + pair flag = props->getBool("catchAll"); + m_catchAll = flag.first && flag.second; + } + sp->unlock(); if (!create(m_socket)) { log->crit("failed to create socket"); return false; } - if (!bind(m_socket,true)) { + if (!bind(m_socket, force)) { this->close(m_socket); log->crit("failed to bind to socket."); return false; } + return true; +} + +bool SocketListener::run(bool* shutdown) +{ +#ifdef _DEBUG + NDC ndc("run"); +#endif + // Save flag to monitor for shutdown request. + m_shutdown = shutdown; + unsigned long count = 0; + while (!*m_shutdown) { fd_set readfds; FD_ZERO(&readfds); FD_SET(m_socket, &readfds); struct timeval tv = { 0, 0 }; tv.tv_sec = 5; - + switch (select(m_socket + 1, &readfds, 0, 0, &tv)) { #ifdef WIN32 case SOCKET_ERROR: @@ -211,24 +240,32 @@ bool SocketListener::run(bool* shutdown) if (errno == EINTR) continue; log_error(); log->error("select() on main listener socket failed"); - return false; - + *m_shutdown = true; + break; + case 0: continue; - + default: { // Accept the connection. SocketListener::ShibSocket newsock; - if (!accept(m_socket, newsock)) + if (!accept(m_socket, newsock)) { log->crit("failed to accept incoming socket connection"); + continue; + } // We throw away the result because the children manage themselves... try { - new ServerThread(newsock,this,++count); + new ServerThread(newsock, this, ++count); + } + catch (exception& ex) { + log->crit("exception starting new server thread to service incoming request: %s", ex.what()); } catch (...) { - log->crit("error starting new server thread to service incoming request"); + log->crit("unknown error starting new server thread to service incoming request"); + if (!m_catchAll) + *m_shutdown = true; } } } @@ -238,12 +275,16 @@ bool SocketListener::run(bool* shutdown) // Wait for all children to exit. m_child_lock->lock(); while (!m_children.empty()) - m_child_wait->wait(m_child_lock); + m_child_wait->wait(m_child_lock.get()); m_child_lock->unlock(); + return true; +} + +void SocketListener::term() +{ this->close(m_socket); m_socket=(ShibSocket)0; - return true; } DDF SocketListener::send(const DDF& in) @@ -269,7 +310,7 @@ DDF SocketListener::send(const DDF& in) SocketListener::ShibSocket sock; while (retry >= 0) { sock = m_socketpool->get(); - + int outlen = ostr.length(); len = htonl(outlen); if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) { @@ -289,39 +330,46 @@ DDF SocketListener::send(const DDF& in) log->debug("send completed, reading response message"); // Read the message. - if (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) { + while (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) { + if (errno == EINTR) continue; // Apparently this happens when a signal interrupts the blocking call. log->error("error reading size of output message"); this->close(sock); throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name())); } len = ntohl(len); - + char buf[16384]; int size_read; stringstream is; - while (len && (size_read = recv(sock, buf, sizeof(buf))) > 0) { - is.write(buf, size_read); - len -= size_read; + while (len) { + size_read = recv(sock, buf, sizeof(buf)); + if (size_read > 0) { + is.write(buf, size_read); + len -= size_read; + } + else if (errno != EINTR) { + break; + } } - + if (len) { log->error("error reading output message from socket"); this->close(sock); throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name())); } - + m_socketpool->put(sock); // Unmarshall data. DDF out; is >> out; - + // Check for exception to unmarshall and throw, otherwise return. if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) { // Reconstitute exception object. DDFJanitor jout(out); - XMLToolingException* except=NULL; - try { + XMLToolingException* except=nullptr; + try { except=XMLToolingException::fromString(out.string()); log->error("remoted message returned an error: %s", except->what()); } @@ -331,15 +379,17 @@ DDF SocketListener::send(const DDF& in) throw ListenerException("Remote call failed with an unparsable exception."); } - auto_ptr wrapper(except); + boost::scoped_ptr wrapper(except); wrapper->raise(); } return out; } -bool SocketListener::log_error() const +bool SocketListener::log_error(const char* fn) const { + if (!fn) + fn = "unknown"; #ifdef WIN32 int rc=WSAGetLastError(); #else @@ -349,10 +399,10 @@ bool SocketListener::log_error() const char buf[256]; memset(buf,0,sizeof(buf)); strerror_r(rc,buf,sizeof(buf)); - log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message"); + log->error("socket call (%s) resulted in error (%d): %s",fn, rc, isprint(*buf) ? buf : "no message"); #else const char* buf=strerror(rc); - log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message"); + log->error("socket call (%s) resulted in error (%d): %s", fn, rc, isprint(*buf) ? buf : "no message"); #endif return false; } @@ -372,11 +422,11 @@ void* server_thread_fn(void* arg) // Now we can clean up and exit the thread. delete child; - return NULL; + return nullptr; } ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id) - : m_sock(s), m_child(NULL), m_listener(listener) + : m_sock(s), m_child(nullptr), m_listener(listener) { ostringstream buf; @@ -384,7 +434,7 @@ ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listen m_id = buf.str(); // Create the child thread - m_child = Thread::create(server_thread_fn, (void*)this); + m_child = Thread::create(server_thread_fn, (void*)this, m_listener->m_stackSize); m_child->detach(); } @@ -395,7 +445,7 @@ ServerThread::~ServerThread() m_listener->m_children.erase(m_sock); m_listener->m_child_lock->unlock(); m_listener->m_child_wait->signal(); - + delete m_child; } @@ -405,11 +455,12 @@ void ServerThread::run() // Before starting up, make sure we fully "own" this socket. m_listener->m_child_lock->lock(); - while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end()) - m_listener->m_child_wait->wait(m_listener->m_child_lock); + while (m_listener->m_children.find(m_sock) != m_listener->m_children.end()) + m_listener->m_child_wait->wait(m_listener->m_child_lock.get()); m_listener->m_children[m_sock] = m_child; m_listener->m_child_lock->unlock(); - + + int result; fd_set readfds; struct timeval tv = { 0, 0 }; @@ -433,9 +484,12 @@ void ServerThread::run() break; default: - if (!job()) { - m_listener->log_error(); - m_listener->log->error("I/O failure processing request on socket (%u)", m_sock); + result = job(); + if (result) { + if (result < 0) { + m_listener->log_error(); + m_listener->log->error("I/O failure processing request on socket (%u)", m_sock); + } m_listener->close(m_sock); return; } @@ -443,9 +497,9 @@ void ServerThread::run() } } -bool ServerThread::job() +int ServerThread::job() { - Category& log = Category::getInstance("shibd.Listener"); + Category& log = Category::getInstance(SHIBSP_LOGCAT ".Listener"); bool incomingError = true; // set false once incoming message is received ostringstream sink; @@ -457,24 +511,29 @@ bool ServerThread::job() try { // Read the message. - if (m_listener->recv(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) { + int readlength = m_listener->recv(m_sock,(char*)&len,sizeof(len)); + if (readlength == 0) { + log.info("detected socket closure, shutting down worker thread"); + return 1; + } + else if (readlength != sizeof(len)) { log.error("error reading size of input message"); - return false; + return -1; } len = ntohl(len); - + int size_read; stringstream is; while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) { is.write(m_buf, size_read); len -= size_read; } - + if (len) { log.error("error reading input message from socket"); - return false; + return -1; } - + // Unmarshall the message. DDF in; DDFJanitor jin(in); @@ -502,29 +561,29 @@ bool ServerThread::job() DDFJanitor jout(out); sink << out; } -#ifndef _DEBUG catch (...) { if (incomingError) log.error("unexpected error processing incoming message"); + if (!m_listener->m_catchAll) + throw; ListenerException ex("An unexpected error occurred while processing an incoming message."); DDF out=DDF("exception").string(ex.toString().c_str()); DDFJanitor jout(out); sink << out; } -#endif - + // Return whatever's available. string response(sink.str()); int outlen = response.length(); len = htonl(outlen); if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) { log.error("error sending output message size"); - return false; + return -1; } if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) { log.error("error sending output message"); - return false; + return -1; } - - return true; + + return 0; }