-/*
- * Copyright 2001-2005 Internet2
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the University Corporation for Advanced Internet
+ * Development, Inc. (UCAID) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.
+ *
+ * UCAID licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the
+ * License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
*/
/**
* SocketListener.cpp
- *
- * Berkeley Socket-based ListenerService implementation
+ *
+ * Berkeley Socket-based ListenerService implementation.
*/
#include "internal.h"
#include "exceptions.h"
+#include "ServiceProvider.h"
+#include "SPConfig.h"
#include "remoting/impl/SocketListener.h"
#include <errno.h>
#include <stack>
#include <sstream>
-#include <shibsp/SPConfig.h>
+#include <xercesc/util/XMLUniDefs.hpp>
+
#include <xmltooling/util/NDC.h>
+#include <xmltooling/util/XMLHelper.h>
+
+#ifndef WIN32
+# include <netinet/in.h>
+#endif
#ifdef HAVE_UNISTD_H
# include <unistd.h>
using namespace shibsp;
using namespace xmltooling;
-using namespace log4cpp;
using namespace std;
using xercesc::DOMElement;
namespace shibsp {
-
+
// Manages the pool of connections
class SocketPool
{
~SocketPool();
SocketListener::ShibSocket get();
void put(SocketListener::ShibSocket s);
-
+
private:
SocketListener::ShibSocket connect();
-
- const SocketListener* m_listener;
+
Category& m_log;
- auto_ptr<Mutex> m_lock;
+ const SocketListener* m_listener;
+ boost::scoped_ptr<Mutex> m_lock;
stack<SocketListener::ShibSocket> m_pool;
};
-
+
// Worker threads in server
class ServerThread {
public:
ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id);
~ServerThread();
void run();
- bool job();
+ int job(); // Return -1 on error, 1 for closed, 0 for success
private:
SocketListener::ShibSocket m_sock;
connected = true;
break;
}
-
+
m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : ""));
if (i) {
void SocketPool::put(SocketListener::ShibSocket s)
{
- m_lock->lock();
+ Lock lock(m_lock);
m_pool.push(s);
- m_lock->unlock();
}
-SocketListener::SocketListener(const DOMElement* e) : log(&Category::getInstance(SHIBSP_LOGCAT".Listener")),
- m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_socketpool(NULL), m_socket((ShibSocket)0)
+SocketListener::SocketListener(const DOMElement* e)
+ : m_catchAll(false), log(&Category::getInstance(SHIBSP_LOGCAT ".Listener")),
+ m_shutdown(nullptr), m_stackSize(0), m_socket((ShibSocket)0)
{
// Are we a client?
if (SPConfig::getConfig().isEnabled(SPConfig::InProcess)) {
- m_socketpool=new SocketPool(*log,this);
+ m_socketpool.reset(new SocketPool(*log,this));
}
// Are we a server?
if (SPConfig::getConfig().isEnabled(SPConfig::OutOfProcess)) {
- m_child_lock = Mutex::create();
- m_child_wait = CondWait::create();
+ m_child_lock.reset(Mutex::create());
+ m_child_wait.reset(CondWait::create());
+
+ static const XMLCh stackSize[] = UNICODE_LITERAL_9(s,t,a,c,k,S,i,z,e);
+ m_stackSize = XMLHelper::getAttrInt(e, 0, stackSize) * 1024;
}
}
SocketListener::~SocketListener()
{
- delete m_socketpool;
- delete m_child_wait;
- delete m_child_lock;
}
-bool SocketListener::run(bool* shutdown)
+bool SocketListener::init(bool force)
{
#ifdef _DEBUG
- NDC ndc("run");
+ NDC ndc("init");
#endif
-
- // Save flag to monitor for shutdown request.
- m_shutdown=shutdown;
- unsigned long count = 0;
+ log->info("listener service starting");
+
+ ServiceProvider* sp = SPConfig::getConfig().getServiceProvider();
+ sp->lock();
+ const PropertySet* props = sp->getPropertySet("OutOfProcess");
+ if (props) {
+ pair<bool,bool> flag = props->getBool("catchAll");
+ m_catchAll = flag.first && flag.second;
+ }
+ sp->unlock();
if (!create(m_socket)) {
log->crit("failed to create socket");
return false;
}
- if (!bind(m_socket,true)) {
+ if (!bind(m_socket, force)) {
this->close(m_socket);
log->crit("failed to bind to socket.");
return false;
}
+ return true;
+}
+
+bool SocketListener::run(bool* shutdown)
+{
+#ifdef _DEBUG
+ NDC ndc("run");
+#endif
+ // Save flag to monitor for shutdown request.
+ m_shutdown = shutdown;
+ unsigned long count = 0;
+
while (!*m_shutdown) {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(m_socket, &readfds);
struct timeval tv = { 0, 0 };
tv.tv_sec = 5;
-
+
switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
#ifdef WIN32
case SOCKET_ERROR:
if (errno == EINTR) continue;
log_error();
log->error("select() on main listener socket failed");
- return false;
-
+ *m_shutdown = true;
+ break;
+
case 0:
continue;
-
+
default:
{
// Accept the connection.
SocketListener::ShibSocket newsock;
- if (!accept(m_socket, newsock))
+ if (!accept(m_socket, newsock)) {
log->crit("failed to accept incoming socket connection");
+ continue;
+ }
// We throw away the result because the children manage themselves...
try {
- new ServerThread(newsock,this,++count);
+ new ServerThread(newsock, this, ++count);
+ }
+ catch (exception& ex) {
+ log->crit("exception starting new server thread to service incoming request: %s", ex.what());
}
catch (...) {
- log->crit("error starting new server thread to service incoming request");
+ log->crit("unknown error starting new server thread to service incoming request");
+ if (!m_catchAll)
+ *m_shutdown = true;
}
}
}
// Wait for all children to exit.
m_child_lock->lock();
while (!m_children.empty())
- m_child_wait->wait(m_child_lock);
+ m_child_wait->wait(m_child_lock.get());
m_child_lock->unlock();
+ return true;
+}
+
+void SocketListener::term()
+{
this->close(m_socket);
m_socket=(ShibSocket)0;
- return true;
}
DDF SocketListener::send(const DDF& in)
NDC ndc("send");
#endif
- log->debug("sending message: %s", in.name());
+ log->debug("sending message (%s)", in.name() ? in.name() : "unnamed");
// Serialize data for transmission.
ostringstream os;
SocketListener::ShibSocket sock;
while (retry >= 0) {
sock = m_socketpool->get();
-
+
int outlen = ostr.length();
len = htonl(outlen);
if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) {
log->debug("send completed, reading response message");
// Read the message.
- if (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
+ while (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
+ if (errno == EINTR) continue; // Apparently this happens when a signal interrupts the blocking call.
log->error("error reading size of output message");
this->close(sock);
throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
}
len = ntohl(len);
-
+
char buf[16384];
int size_read;
stringstream is;
- while (len && (size_read = recv(sock, buf, sizeof(buf))) > 0) {
- is.write(buf, size_read);
- len -= size_read;
+ while (len) {
+ size_read = recv(sock, buf, sizeof(buf));
+ if (size_read > 0) {
+ is.write(buf, size_read);
+ len -= size_read;
+ }
+ else if (errno != EINTR) {
+ break;
+ }
}
-
+
if (len) {
log->error("error reading output message from socket");
this->close(sock);
throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
}
-
+
m_socketpool->put(sock);
// Unmarshall data.
DDF out;
is >> out;
-
+
// Check for exception to unmarshall and throw, otherwise return.
if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
// Reconstitute exception object.
DDFJanitor jout(out);
- XMLToolingException* except=NULL;
- try {
+ XMLToolingException* except=nullptr;
+ try {
except=XMLToolingException::fromString(out.string());
+ log->error("remoted message returned an error: %s", except->what());
}
catch (XMLToolingException& e) {
log->error("caught XMLToolingException while building the XMLToolingException: %s", e.what());
throw ListenerException("Remote call failed with an unparsable exception.");
}
- auto_ptr<XMLToolingException> wrapper(except);
+ boost::scoped_ptr<XMLToolingException> wrapper(except);
wrapper->raise();
}
return out;
}
-bool SocketListener::log_error() const
+bool SocketListener::log_error(const char* fn) const
{
+ if (!fn)
+ fn = "unknown";
#ifdef WIN32
int rc=WSAGetLastError();
#else
char buf[256];
memset(buf,0,sizeof(buf));
strerror_r(rc,buf,sizeof(buf));
- log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
+ log->error("socket call (%s) resulted in error (%d): %s",fn, rc, isprint(*buf) ? buf : "no message");
#else
const char* buf=strerror(rc);
- log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
+ log->error("socket call (%s) resulted in error (%d): %s", fn, rc, isprint(*buf) ? buf : "no message");
#endif
return false;
}
// Now we can clean up and exit the thread.
delete child;
- return NULL;
+ return nullptr;
}
ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id)
- : m_sock(s), m_child(NULL), m_listener(listener)
+ : m_sock(s), m_child(nullptr), m_listener(listener)
{
ostringstream buf;
m_id = buf.str();
// Create the child thread
- m_child = Thread::create(server_thread_fn, (void*)this);
+ m_child = Thread::create(server_thread_fn, (void*)this, m_listener->m_stackSize);
m_child->detach();
}
m_listener->m_children.erase(m_sock);
m_listener->m_child_lock->unlock();
m_listener->m_child_wait->signal();
-
+
delete m_child;
}
// Before starting up, make sure we fully "own" this socket.
m_listener->m_child_lock->lock();
- while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end())
- m_listener->m_child_wait->wait(m_listener->m_child_lock);
+ while (m_listener->m_children.find(m_sock) != m_listener->m_children.end())
+ m_listener->m_child_wait->wait(m_listener->m_child_lock.get());
m_listener->m_children[m_sock] = m_child;
m_listener->m_child_lock->unlock();
-
+
+ int result;
fd_set readfds;
struct timeval tv = { 0, 0 };
break;
default:
- if (!job()) {
- m_listener->log_error();
- m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
+ result = job();
+ if (result) {
+ if (result < 0) {
+ m_listener->log_error();
+ m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
+ }
m_listener->close(m_sock);
return;
}
}
}
-bool ServerThread::job()
+int ServerThread::job()
{
- Category& log = Category::getInstance("shibd.Listener");
+ Category& log = Category::getInstance(SHIBSP_LOGCAT ".Listener");
- DDF out;
- DDFJanitor jout(out);
+ bool incomingError = true; // set false once incoming message is received
+ ostringstream sink;
#ifdef WIN32
u_long len;
#else
try {
// Read the message.
- if (m_listener->recv(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
+ int readlength = m_listener->recv(m_sock,(char*)&len,sizeof(len));
+ if (readlength == 0) {
+ log.info("detected socket closure, shutting down worker thread");
+ return 1;
+ }
+ else if (readlength != sizeof(len)) {
log.error("error reading size of input message");
- return false;
+ return -1;
}
len = ntohl(len);
-
+
int size_read;
stringstream is;
while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) {
is.write(m_buf, size_read);
len -= size_read;
}
-
+
if (len) {
log.error("error reading input message from socket");
- return false;
+ return -1;
}
-
+
// Unmarshall the message.
DDF in;
DDFJanitor jin(in);
is >> in;
+ log.debug("dispatching message (%s)", in.name() ? in.name() : "unnamed");
+
+ incomingError = false;
+
// Dispatch the message.
- out=m_listener->receive(in);
+ m_listener->receive(in, sink);
}
catch (XMLToolingException& e) {
- log.error("error processing incoming message: %s", e.what());
- out=DDF("exception").string(e.toString().c_str());
+ if (incomingError)
+ log.error("error processing incoming message: %s", e.what());
+ DDF out=DDF("exception").string(e.toString().c_str());
+ DDFJanitor jout(out);
+ sink << out;
}
catch (exception& e) {
- log.error("error processing incoming message: %s", e.what());
+ if (incomingError)
+ log.error("error processing incoming message: %s", e.what());
ListenerException ex(e.what());
- out=DDF("exception").string(ex.toString().c_str());
+ DDF out=DDF("exception").string(ex.toString().c_str());
+ DDFJanitor jout(out);
+ sink << out;
}
-#ifndef _DEBUG
catch (...) {
- log.error("unexpected error processing incoming message");
+ if (incomingError)
+ log.error("unexpected error processing incoming message");
+ if (!m_listener->m_catchAll)
+ throw;
ListenerException ex("An unexpected error occurred while processing an incoming message.");
- out=DDF("exception").string(ex.toString().c_str());
+ DDF out=DDF("exception").string(ex.toString().c_str());
+ DDFJanitor jout(out);
+ sink << out;
}
-#endif
-
+
// Return whatever's available.
- ostringstream xmlout;
- xmlout << out;
- string response(xmlout.str());
+ string response(sink.str());
int outlen = response.length();
len = htonl(outlen);
if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
log.error("error sending output message size");
- return false;
+ return -1;
}
if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) {
log.error("error sending output message");
- return false;
+ return -1;
}
-
- return true;
+
+ return 0;
}