2 * Copyright 2001-2007 Internet2
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 * Berkeley Socket-based ListenerService implementation
24 #include "exceptions.h"
25 #include "remoting/impl/SocketListener.h"
30 #include <shibsp/SPConfig.h>
31 #include <xmltooling/util/NDC.h>
37 using namespace shibsp;
38 using namespace xmltooling;
39 using namespace log4cpp;
41 using xercesc::DOMElement;
45 // Manages the pool of connections
49 SocketPool(Category& log, const SocketListener* listener)
50 : m_log(log), m_listener(listener), m_lock(Mutex::create()) {}
52 SocketListener::ShibSocket get();
53 void put(SocketListener::ShibSocket s);
56 SocketListener::ShibSocket connect();
58 const SocketListener* m_listener;
60 auto_ptr<Mutex> m_lock;
61 stack<SocketListener::ShibSocket> m_pool;
64 // Worker threads in server
67 ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id);
73 SocketListener::ShibSocket m_sock;
75 SocketListener* m_listener;
81 SocketListener::ShibSocket SocketPool::connect()
87 m_log.debug("trying to connect to listener");
89 SocketListener::ShibSocket sock;
90 if (!m_listener->create(sock)) {
91 m_log.error("cannot create socket");
92 throw ListenerException("Cannot create socket");
95 bool connected = false;
98 for (int i = num_tries-1; i >= 0; i--) {
99 if (m_listener->connect(sock)) {
104 m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : ""));
108 Sleep(2000*(num_tries-i));
110 sleep(2*(num_tries-i));
116 m_log.crit("socket server unavailable, failing");
117 m_listener->close(sock);
118 throw ListenerException("Cannot connect to shibd process, a site adminstrator should be notified.");
121 m_log.debug("socket (%u) connected successfully", sock);
125 SocketPool::~SocketPool()
127 while (!m_pool.empty()) {
129 closesocket(m_pool.top());
131 ::close(m_pool.top());
137 SocketListener::ShibSocket SocketPool::get()
140 if (m_pool.empty()) {
144 SocketListener::ShibSocket ret=m_pool.top();
150 void SocketPool::put(SocketListener::ShibSocket s)
157 SocketListener::SocketListener(const DOMElement* e) : log(&Category::getInstance(SHIBSP_LOGCAT".Listener")),
158 m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_socketpool(NULL), m_socket((ShibSocket)0)
161 if (SPConfig::getConfig().isEnabled(SPConfig::InProcess)) {
162 m_socketpool=new SocketPool(*log,this);
165 if (SPConfig::getConfig().isEnabled(SPConfig::OutOfProcess)) {
166 m_child_lock = Mutex::create();
167 m_child_wait = CondWait::create();
171 SocketListener::~SocketListener()
178 bool SocketListener::run(bool* shutdown)
183 log->info("listener service starting");
185 // Save flag to monitor for shutdown request.
187 unsigned long count = 0;
189 if (!create(m_socket)) {
190 log->crit("failed to create socket");
193 if (!bind(m_socket,true)) {
194 this->close(m_socket);
195 log->crit("failed to bind to socket.");
199 while (!*m_shutdown) {
202 FD_SET(m_socket, &readfds);
203 struct timeval tv = { 0, 0 };
206 switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
212 if (errno == EINTR) continue;
214 log->error("select() on main listener socket failed");
222 // Accept the connection.
223 SocketListener::ShibSocket newsock;
224 if (!accept(m_socket, newsock))
225 log->crit("failed to accept incoming socket connection");
227 // We throw away the result because the children manage themselves...
229 new ServerThread(newsock,this,++count);
232 log->crit("error starting new server thread to service incoming request");
237 log->info("listener service shutting down");
239 // Wait for all children to exit.
240 m_child_lock->lock();
241 while (!m_children.empty())
242 m_child_wait->wait(m_child_lock);
243 m_child_lock->unlock();
245 this->close(m_socket);
246 m_socket=(ShibSocket)0;
250 DDF SocketListener::send(const DDF& in)
256 log->debug("sending message: %s", in.name());
258 // Serialize data for transmission.
261 string ostr(os.str());
263 // Loop on the RPC in case we lost contact the first time through
270 SocketListener::ShibSocket sock;
272 sock = m_socketpool->get();
274 int outlen = ostr.length();
276 if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) {
282 throw ListenerException("Failure sending remoted message ($1).", params(1,in.name()));
290 log->debug("send completed, reading response message");
293 if (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
294 log->error("error reading size of output message");
296 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
303 while (len && (size_read = recv(sock, buf, sizeof(buf))) > 0) {
304 is.write(buf, size_read);
309 log->error("error reading output message from socket");
311 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
314 m_socketpool->put(sock);
320 // Check for exception to unmarshall and throw, otherwise return.
321 if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
322 // Reconstitute exception object.
323 DDFJanitor jout(out);
324 XMLToolingException* except=NULL;
326 except=XMLToolingException::fromString(out.string());
328 catch (XMLToolingException& e) {
329 log->error("caught XMLToolingException while building the XMLToolingException: %s", e.what());
330 log->error("XML was: %s", out.string());
331 throw ListenerException("Remote call failed with an unparsable exception.");
334 auto_ptr<XMLToolingException> wrapper(except);
341 bool SocketListener::log_error() const
344 int rc=WSAGetLastError();
348 #ifdef HAVE_STRERROR_R
350 memset(buf,0,sizeof(buf));
351 strerror_r(rc,buf,sizeof(buf));
352 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
354 const char* buf=strerror(rc);
355 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
360 // actual function run in listener on server threads
361 void* server_thread_fn(void* arg)
363 ServerThread* child = (ServerThread*)arg;
366 // First, let's block all signals
367 Thread::mask_all_signals();
370 // Run the child until it exits.
373 // Now we can clean up and exit the thread.
378 ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id)
379 : m_sock(s), m_child(NULL), m_listener(listener)
383 buf << "[" << id << "]";
386 // Create the child thread
387 m_child = Thread::create(server_thread_fn, (void*)this);
391 ServerThread::~ServerThread()
393 // Then lock the children map, remove this socket/thread, signal waiters, and return
394 m_listener->m_child_lock->lock();
395 m_listener->m_children.erase(m_sock);
396 m_listener->m_child_lock->unlock();
397 m_listener->m_child_wait->signal();
402 void ServerThread::run()
406 // Before starting up, make sure we fully "own" this socket.
407 m_listener->m_child_lock->lock();
408 while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end())
409 m_listener->m_child_wait->wait(m_listener->m_child_lock);
410 m_listener->m_children[m_sock] = m_child;
411 m_listener->m_child_lock->unlock();
414 struct timeval tv = { 0, 0 };
416 while(!*(m_listener->m_shutdown)) {
418 FD_SET(m_sock, &readfds);
421 switch (select(m_sock+1, &readfds, 0, 0, &tv)) {
427 if (errno == EINTR) continue;
428 m_listener->log_error();
429 m_listener->log->error("select() on incoming request socket (%u) returned error", m_sock);
437 m_listener->log_error();
438 m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
439 m_listener->close(m_sock);
446 bool ServerThread::job()
448 Category& log = Category::getInstance("shibd.Listener");
459 if (m_listener->recv(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
460 log.error("error reading size of input message");
467 while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) {
468 is.write(m_buf, size_read);
473 log.error("error reading input message from socket");
477 // Unmarshall the message.
482 // Dispatch the message.
483 m_listener->receive(in, sink);
485 catch (XMLToolingException& e) {
486 log.error("error processing incoming message: %s", e.what());
487 DDF out=DDF("exception").string(e.toString().c_str());
488 DDFJanitor jout(out);
491 catch (exception& e) {
492 log.error("error processing incoming message: %s", e.what());
493 ListenerException ex(e.what());
494 DDF out=DDF("exception").string(ex.toString().c_str());
495 DDFJanitor jout(out);
500 log.error("unexpected error processing incoming message");
501 ListenerException ex("An unexpected error occurred while processing an incoming message.");
502 DDF out=DDF("exception").string(ex.toString().c_str());
503 DDFJanitor jout(out);
508 // Return whatever's available.
509 string response(sink.str());
510 int outlen = response.length();
512 if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
513 log.error("error sending output message size");
516 if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) {
517 log.error("error sending output message");