2 * Copyright 2001-2010 Internet2
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 * Berkeley Socket-based ListenerService implementation.
24 #include "exceptions.h"
25 #include "ServiceProvider.h"
26 #include "remoting/impl/SocketListener.h"
31 #include <shibsp/SPConfig.h>
32 #include <xmltooling/util/NDC.h>
35 # include <netinet/in.h>
42 using namespace shibsp;
43 using namespace xmltooling;
45 using xercesc::DOMElement;
49 // Manages the pool of connections
53 SocketPool(Category& log, const SocketListener* listener)
54 : m_log(log), m_listener(listener), m_lock(Mutex::create()) {}
56 SocketListener::ShibSocket get();
57 void put(SocketListener::ShibSocket s);
60 SocketListener::ShibSocket connect();
63 const SocketListener* m_listener;
64 auto_ptr<Mutex> m_lock;
65 stack<SocketListener::ShibSocket> m_pool;
68 // Worker threads in server
71 ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id);
74 int job(); // Return -1 on error, 1 for closed, 0 for success
77 SocketListener::ShibSocket m_sock;
79 SocketListener* m_listener;
85 SocketListener::ShibSocket SocketPool::connect()
91 m_log.debug("trying to connect to listener");
93 SocketListener::ShibSocket sock;
94 if (!m_listener->create(sock)) {
95 m_log.error("cannot create socket");
96 throw ListenerException("Cannot create socket");
99 bool connected = false;
102 for (int i = num_tries-1; i >= 0; i--) {
103 if (m_listener->connect(sock)) {
108 m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : ""));
112 Sleep(2000*(num_tries-i));
114 sleep(2*(num_tries-i));
120 m_log.crit("socket server unavailable, failing");
121 m_listener->close(sock);
122 throw ListenerException("Cannot connect to shibd process, a site adminstrator should be notified.");
125 m_log.debug("socket (%u) connected successfully", sock);
129 SocketPool::~SocketPool()
131 while (!m_pool.empty()) {
133 closesocket(m_pool.top());
135 ::close(m_pool.top());
141 SocketListener::ShibSocket SocketPool::get()
144 if (m_pool.empty()) {
148 SocketListener::ShibSocket ret=m_pool.top();
154 void SocketPool::put(SocketListener::ShibSocket s)
161 SocketListener::SocketListener(const DOMElement* e)
162 : m_catchAll(false), log(&Category::getInstance(SHIBSP_LOGCAT".Listener")), m_socketpool(NULL),
163 m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_stackSize(0), m_socket((ShibSocket)0)
166 if (SPConfig::getConfig().isEnabled(SPConfig::InProcess)) {
167 m_socketpool=new SocketPool(*log,this);
170 if (SPConfig::getConfig().isEnabled(SPConfig::OutOfProcess)) {
171 m_child_lock = Mutex::create();
172 m_child_wait = CondWait::create();
174 static const XMLCh stackSize[] = UNICODE_LITERAL_9(s,t,a,c,k,S,i,z,e);
175 const XMLCh* attr = e ? e->getAttributeNS(NULL, stackSize) : NULL;
177 m_stackSize = XMLString::parseInt(attr) * 1024;
181 SocketListener::~SocketListener()
188 bool SocketListener::init(bool force)
193 log->info("listener service starting");
195 ServiceProvider* sp = SPConfig::getConfig().getServiceProvider();
197 const PropertySet* props = sp->getPropertySet("OutOfProcess");
199 pair<bool,bool> flag = props->getBool("catchAll");
200 m_catchAll = flag.first && flag.second;
204 if (!create(m_socket)) {
205 log->crit("failed to create socket");
208 if (!bind(m_socket, force)) {
209 this->close(m_socket);
210 log->crit("failed to bind to socket.");
217 bool SocketListener::run(bool* shutdown)
222 // Save flag to monitor for shutdown request.
224 unsigned long count = 0;
226 while (!*m_shutdown) {
229 FD_SET(m_socket, &readfds);
230 struct timeval tv = { 0, 0 };
233 switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
239 if (errno == EINTR) continue;
241 log->error("select() on main listener socket failed");
250 // Accept the connection.
251 SocketListener::ShibSocket newsock;
252 if (!accept(m_socket, newsock)) {
253 log->crit("failed to accept incoming socket connection");
257 // We throw away the result because the children manage themselves...
259 new ServerThread(newsock,this,++count);
261 catch (exception& ex) {
262 log->crit("exception starting new server thread to service incoming request: %s", ex.what());
265 log->crit("unknown error starting new server thread to service incoming request");
272 log->info("listener service shutting down");
274 // Wait for all children to exit.
275 m_child_lock->lock();
276 while (!m_children.empty())
277 m_child_wait->wait(m_child_lock);
278 m_child_lock->unlock();
283 void SocketListener::term()
285 this->close(m_socket);
286 m_socket=(ShibSocket)0;
289 DDF SocketListener::send(const DDF& in)
295 log->debug("sending message (%s)", in.name() ? in.name() : "unnamed");
297 // Serialize data for transmission.
300 string ostr(os.str());
302 // Loop on the RPC in case we lost contact the first time through
309 SocketListener::ShibSocket sock;
311 sock = m_socketpool->get();
313 int outlen = ostr.length();
315 if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) {
321 throw ListenerException("Failure sending remoted message ($1).", params(1,in.name()));
329 log->debug("send completed, reading response message");
332 while (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
333 if (errno == EINTR) continue; // Apparently this happens when a signal interrupts the blocking call.
334 log->error("error reading size of output message");
336 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
344 size_read = recv(sock, buf, sizeof(buf));
346 is.write(buf, size_read);
349 else if (errno != EINTR) {
355 log->error("error reading output message from socket");
357 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
360 m_socketpool->put(sock);
366 // Check for exception to unmarshall and throw, otherwise return.
367 if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
368 // Reconstitute exception object.
369 DDFJanitor jout(out);
370 XMLToolingException* except=NULL;
372 except=XMLToolingException::fromString(out.string());
373 log->error("remoted message returned an error: %s", except->what());
375 catch (XMLToolingException& e) {
376 log->error("caught XMLToolingException while building the XMLToolingException: %s", e.what());
377 log->error("XML was: %s", out.string());
378 throw ListenerException("Remote call failed with an unparsable exception.");
381 auto_ptr<XMLToolingException> wrapper(except);
388 bool SocketListener::log_error() const
391 int rc=WSAGetLastError();
395 #ifdef HAVE_STRERROR_R
397 memset(buf,0,sizeof(buf));
398 strerror_r(rc,buf,sizeof(buf));
399 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
401 const char* buf=strerror(rc);
402 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
407 // actual function run in listener on server threads
408 void* server_thread_fn(void* arg)
410 ServerThread* child = (ServerThread*)arg;
413 // First, let's block all signals
414 Thread::mask_all_signals();
417 // Run the child until it exits.
420 // Now we can clean up and exit the thread.
425 ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id)
426 : m_sock(s), m_child(NULL), m_listener(listener)
430 buf << "[" << id << "]";
433 // Create the child thread
434 m_child = Thread::create(server_thread_fn, (void*)this, m_listener->m_stackSize);
438 ServerThread::~ServerThread()
440 // Then lock the children map, remove this socket/thread, signal waiters, and return
441 m_listener->m_child_lock->lock();
442 m_listener->m_children.erase(m_sock);
443 m_listener->m_child_lock->unlock();
444 m_listener->m_child_wait->signal();
449 void ServerThread::run()
453 // Before starting up, make sure we fully "own" this socket.
454 m_listener->m_child_lock->lock();
455 while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end())
456 m_listener->m_child_wait->wait(m_listener->m_child_lock);
457 m_listener->m_children[m_sock] = m_child;
458 m_listener->m_child_lock->unlock();
462 struct timeval tv = { 0, 0 };
464 while(!*(m_listener->m_shutdown)) {
466 FD_SET(m_sock, &readfds);
469 switch (select(m_sock+1, &readfds, 0, 0, &tv)) {
475 if (errno == EINTR) continue;
476 m_listener->log_error();
477 m_listener->log->error("select() on incoming request socket (%u) returned error", m_sock);
487 m_listener->log_error();
488 m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
490 m_listener->close(m_sock);
497 int ServerThread::job()
499 Category& log = Category::getInstance(SHIBSP_LOGCAT".Listener");
501 bool incomingError = true; // set false once incoming message is received
511 int readlength = m_listener->recv(m_sock,(char*)&len,sizeof(len));
512 if (readlength == 0) {
513 log.info("detected socket closure, shutting down worker thread");
516 else if (readlength != sizeof(len)) {
517 log.error("error reading size of input message");
524 while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) {
525 is.write(m_buf, size_read);
530 log.error("error reading input message from socket");
534 // Unmarshall the message.
539 log.debug("dispatching message (%s)", in.name() ? in.name() : "unnamed");
541 incomingError = false;
543 // Dispatch the message.
544 m_listener->receive(in, sink);
546 catch (XMLToolingException& e) {
548 log.error("error processing incoming message: %s", e.what());
549 DDF out=DDF("exception").string(e.toString().c_str());
550 DDFJanitor jout(out);
553 catch (exception& e) {
555 log.error("error processing incoming message: %s", e.what());
556 ListenerException ex(e.what());
557 DDF out=DDF("exception").string(ex.toString().c_str());
558 DDFJanitor jout(out);
563 log.error("unexpected error processing incoming message");
564 if (!m_listener->m_catchAll)
566 ListenerException ex("An unexpected error occurred while processing an incoming message.");
567 DDF out=DDF("exception").string(ex.toString().c_str());
568 DDFJanitor jout(out);
572 // Return whatever's available.
573 string response(sink.str());
574 int outlen = response.length();
576 if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
577 log.error("error sending output message size");
580 if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) {
581 log.error("error sending output message");