2 * Licensed to the University Corporation for Advanced Internet
3 * Development, Inc. (UCAID) under one or more contributor license
4 * agreements. See the NOTICE file distributed with this work for
5 * additional information regarding copyright ownership.
7 * UCAID licenses this file to you under the Apache License,
8 * Version 2.0 (the "License"); you may not use this file except
9 * in compliance with the License. You may obtain a copy of the
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
17 * either express or implied. See the License for the specific
18 * language governing permissions and limitations under the License.
24 * Berkeley Socket-based ListenerService implementation.
28 #include "exceptions.h"
29 #include "ServiceProvider.h"
31 #include "remoting/impl/SocketListener.h"
36 #include <xercesc/util/XMLUniDefs.hpp>
38 #include <xmltooling/util/NDC.h>
39 #include <xmltooling/util/XMLHelper.h>
42 # include <netinet/in.h>
49 using namespace shibsp;
50 using namespace xmltooling;
52 using xercesc::DOMElement;
56 // Manages the pool of connections
60 SocketPool(Category& log, const SocketListener* listener)
61 : m_log(log), m_listener(listener), m_lock(Mutex::create()) {}
63 SocketListener::ShibSocket get();
64 void put(SocketListener::ShibSocket s);
67 SocketListener::ShibSocket connect();
70 const SocketListener* m_listener;
71 boost::scoped_ptr<Mutex> m_lock;
72 stack<SocketListener::ShibSocket> m_pool;
75 // Worker threads in server
78 ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id);
81 int job(); // Return -1 on error, 1 for closed, 0 for success
84 SocketListener::ShibSocket m_sock;
86 SocketListener* m_listener;
92 SocketListener::ShibSocket SocketPool::connect()
98 m_log.debug("trying to connect to listener");
100 SocketListener::ShibSocket sock;
101 if (!m_listener->create(sock)) {
102 m_log.error("cannot create socket");
103 throw ListenerException("Cannot create socket");
106 bool connected = false;
109 for (int i = num_tries-1; i >= 0; i--) {
110 if (m_listener->connect(sock)) {
115 m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : ""));
119 Sleep(2000*(num_tries-i));
121 sleep(2*(num_tries-i));
127 m_log.crit("socket server unavailable, failing");
128 m_listener->close(sock);
129 throw ListenerException("Cannot connect to shibd process, a site adminstrator should be notified.");
132 m_log.debug("socket (%u) connected successfully", sock);
136 SocketPool::~SocketPool()
138 while (!m_pool.empty()) {
140 closesocket(m_pool.top());
142 ::close(m_pool.top());
148 SocketListener::ShibSocket SocketPool::get()
151 if (m_pool.empty()) {
155 SocketListener::ShibSocket ret=m_pool.top();
161 void SocketPool::put(SocketListener::ShibSocket s)
167 SocketListener::SocketListener(const DOMElement* e)
168 : m_catchAll(false), log(&Category::getInstance(SHIBSP_LOGCAT".Listener")),
169 m_shutdown(nullptr), m_stackSize(0), m_socket((ShibSocket)0)
172 if (SPConfig::getConfig().isEnabled(SPConfig::InProcess)) {
173 m_socketpool.reset(new SocketPool(*log,this));
176 if (SPConfig::getConfig().isEnabled(SPConfig::OutOfProcess)) {
177 m_child_lock.reset(Mutex::create());
178 m_child_wait.reset(CondWait::create());
180 static const XMLCh stackSize[] = UNICODE_LITERAL_9(s,t,a,c,k,S,i,z,e);
181 m_stackSize = XMLHelper::getAttrInt(e, 0, stackSize) * 1024;
185 SocketListener::~SocketListener()
189 bool SocketListener::init(bool force)
194 log->info("listener service starting");
196 ServiceProvider* sp = SPConfig::getConfig().getServiceProvider();
198 const PropertySet* props = sp->getPropertySet("OutOfProcess");
200 pair<bool,bool> flag = props->getBool("catchAll");
201 m_catchAll = flag.first && flag.second;
205 if (!create(m_socket)) {
206 log->crit("failed to create socket");
209 if (!bind(m_socket, force)) {
210 this->close(m_socket);
211 log->crit("failed to bind to socket.");
218 bool SocketListener::run(bool* shutdown)
223 // Save flag to monitor for shutdown request.
224 m_shutdown = shutdown;
225 unsigned long count = 0;
227 while (!*m_shutdown) {
230 FD_SET(m_socket, &readfds);
231 struct timeval tv = { 0, 0 };
234 switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
240 if (errno == EINTR) continue;
242 log->error("select() on main listener socket failed");
251 // Accept the connection.
252 SocketListener::ShibSocket newsock;
253 if (!accept(m_socket, newsock)) {
254 log->crit("failed to accept incoming socket connection");
258 // We throw away the result because the children manage themselves...
260 new ServerThread(newsock, this, ++count);
262 catch (exception& ex) {
263 log->crit("exception starting new server thread to service incoming request: %s", ex.what());
266 log->crit("unknown error starting new server thread to service incoming request");
273 log->info("listener service shutting down");
275 // Wait for all children to exit.
276 m_child_lock->lock();
277 while (!m_children.empty())
278 m_child_wait->wait(m_child_lock.get());
279 m_child_lock->unlock();
284 void SocketListener::term()
286 this->close(m_socket);
287 m_socket=(ShibSocket)0;
290 DDF SocketListener::send(const DDF& in)
296 log->debug("sending message (%s)", in.name() ? in.name() : "unnamed");
298 // Serialize data for transmission.
301 string ostr(os.str());
303 // Loop on the RPC in case we lost contact the first time through
310 SocketListener::ShibSocket sock;
312 sock = m_socketpool->get();
314 int outlen = ostr.length();
316 if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) {
322 throw ListenerException("Failure sending remoted message ($1).", params(1,in.name()));
330 log->debug("send completed, reading response message");
333 while (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
334 if (errno == EINTR) continue; // Apparently this happens when a signal interrupts the blocking call.
335 log->error("error reading size of output message");
337 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
345 size_read = recv(sock, buf, sizeof(buf));
347 is.write(buf, size_read);
350 else if (errno != EINTR) {
356 log->error("error reading output message from socket");
358 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
361 m_socketpool->put(sock);
367 // Check for exception to unmarshall and throw, otherwise return.
368 if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
369 // Reconstitute exception object.
370 DDFJanitor jout(out);
371 XMLToolingException* except=nullptr;
373 except=XMLToolingException::fromString(out.string());
374 log->error("remoted message returned an error: %s", except->what());
376 catch (XMLToolingException& e) {
377 log->error("caught XMLToolingException while building the XMLToolingException: %s", e.what());
378 log->error("XML was: %s", out.string());
379 throw ListenerException("Remote call failed with an unparsable exception.");
382 boost::scoped_ptr<XMLToolingException> wrapper(except);
389 bool SocketListener::log_error(const char* fn) const
394 int rc=WSAGetLastError();
398 #ifdef HAVE_STRERROR_R
400 memset(buf,0,sizeof(buf));
401 strerror_r(rc,buf,sizeof(buf));
402 log->error("socket call (%s) resulted in error (%d): %s",fn, rc, isprint(*buf) ? buf : "no message");
404 const char* buf=strerror(rc);
405 log->error("socket call (%s) resulted in error (%d): %s", fn, rc, isprint(*buf) ? buf : "no message");
410 // actual function run in listener on server threads
411 void* server_thread_fn(void* arg)
413 ServerThread* child = (ServerThread*)arg;
416 // First, let's block all signals
417 Thread::mask_all_signals();
420 // Run the child until it exits.
423 // Now we can clean up and exit the thread.
428 ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id)
429 : m_sock(s), m_child(nullptr), m_listener(listener)
433 buf << "[" << id << "]";
436 // Create the child thread
437 m_child = Thread::create(server_thread_fn, (void*)this, m_listener->m_stackSize);
441 ServerThread::~ServerThread()
443 // Then lock the children map, remove this socket/thread, signal waiters, and return
444 m_listener->m_child_lock->lock();
445 m_listener->m_children.erase(m_sock);
446 m_listener->m_child_lock->unlock();
447 m_listener->m_child_wait->signal();
452 void ServerThread::run()
456 // Before starting up, make sure we fully "own" this socket.
457 m_listener->m_child_lock->lock();
458 while (m_listener->m_children.find(m_sock) != m_listener->m_children.end())
459 m_listener->m_child_wait->wait(m_listener->m_child_lock.get());
460 m_listener->m_children[m_sock] = m_child;
461 m_listener->m_child_lock->unlock();
465 struct timeval tv = { 0, 0 };
467 while(!*(m_listener->m_shutdown)) {
469 FD_SET(m_sock, &readfds);
472 switch (select(m_sock+1, &readfds, 0, 0, &tv)) {
478 if (errno == EINTR) continue;
479 m_listener->log_error();
480 m_listener->log->error("select() on incoming request socket (%u) returned error", m_sock);
490 m_listener->log_error();
491 m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
493 m_listener->close(m_sock);
500 int ServerThread::job()
502 Category& log = Category::getInstance(SHIBSP_LOGCAT".Listener");
504 bool incomingError = true; // set false once incoming message is received
514 int readlength = m_listener->recv(m_sock,(char*)&len,sizeof(len));
515 if (readlength == 0) {
516 log.info("detected socket closure, shutting down worker thread");
519 else if (readlength != sizeof(len)) {
520 log.error("error reading size of input message");
527 while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) {
528 is.write(m_buf, size_read);
533 log.error("error reading input message from socket");
537 // Unmarshall the message.
542 log.debug("dispatching message (%s)", in.name() ? in.name() : "unnamed");
544 incomingError = false;
546 // Dispatch the message.
547 m_listener->receive(in, sink);
549 catch (XMLToolingException& e) {
551 log.error("error processing incoming message: %s", e.what());
552 DDF out=DDF("exception").string(e.toString().c_str());
553 DDFJanitor jout(out);
556 catch (exception& e) {
558 log.error("error processing incoming message: %s", e.what());
559 ListenerException ex(e.what());
560 DDF out=DDF("exception").string(ex.toString().c_str());
561 DDFJanitor jout(out);
566 log.error("unexpected error processing incoming message");
567 if (!m_listener->m_catchAll)
569 ListenerException ex("An unexpected error occurred while processing an incoming message.");
570 DDF out=DDF("exception").string(ex.toString().c_str());
571 DDFJanitor jout(out);
575 // Return whatever's available.
576 string response(sink.str());
577 int outlen = response.length();
579 if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
580 log.error("error sending output message size");
583 if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) {
584 log.error("error sending output message");