2 * Copyright 2001-2005 Internet2
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 * Berkeley Socket-based Listener implementation
23 #include "SocketListener.h"
33 using namespace log4cpp;
35 using namespace shibboleth;
36 using namespace shibtarget;
38 namespace shibtarget {
40 // Manages the pool of connections
44 SocketPool(Category& log, const SocketListener* listener)
45 : m_log(log), m_listener(listener), m_lock(shibboleth::Mutex::create()) {}
47 SocketListener::ShibSocket get();
48 void put(SocketListener::ShibSocket s);
51 SocketListener::ShibSocket connect();
53 const SocketListener* m_listener;
55 auto_ptr<Mutex> m_lock;
56 stack<SocketListener::ShibSocket> m_pool;
59 // Worker threads in server
62 ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id);
68 SocketListener::ShibSocket m_sock;
70 SocketListener* m_listener;
76 SocketListener::ShibSocket SocketPool::connect()
79 saml::NDC ndc("connect");
82 m_log.debug("trying to connect to listener");
84 SocketListener::ShibSocket sock;
85 if (!m_listener->create(sock)) {
86 m_log.error("cannot create socket");
87 throw ListenerException("Cannot create socket");
90 bool connected = false;
93 for (int i = num_tries-1; i >= 0; i--) {
94 if (m_listener->connect(sock)) {
99 m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : ""));
103 Sleep(2000*(num_tries-i));
105 sleep(2*(num_tries-i));
111 m_log.crit("socket server unavailable, failing");
112 m_listener->close(sock);
113 throw ListenerException("Cannot connect to listener process, a site adminstrator should be notified.");
116 m_log.debug("socket (%u) connected successfully", sock);
120 SocketPool::~SocketPool()
122 while (!m_pool.empty()) {
124 closesocket(m_pool.top());
126 ::close(m_pool.top());
132 SocketListener::ShibSocket SocketPool::get()
135 if (m_pool.empty()) {
139 SocketListener::ShibSocket ret=m_pool.top();
145 void SocketPool::put(SocketListener::ShibSocket s)
152 SocketListener::SocketListener(const DOMElement* e) : log(&Category::getInstance(SHIBT_LOGCAT".Listener")),
153 m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_socketpool(NULL), m_socket((ShibSocket)0)
156 if (ShibTargetConfig::getConfig().isEnabled(ShibTargetConfig::InProcess)) {
157 m_socketpool=new SocketPool(*log,this);
160 if (ShibTargetConfig::getConfig().isEnabled(ShibTargetConfig::OutOfProcess)) {
161 m_child_lock = Mutex::create();
162 m_child_wait = CondWait::create();
166 SocketListener::~SocketListener()
173 bool SocketListener::run(bool* shutdown)
176 saml::NDC ndc("run");
179 // Save flag to monitor for shutdown request.
181 unsigned long count = 0;
183 if (!create(m_socket)) {
184 log->crit("failed to create socket");
187 if (!bind(m_socket,true)) {
188 this->close(m_socket);
189 log->crit("failed to bind to socket.");
193 while (!*m_shutdown) {
196 FD_SET(m_socket, &readfds);
197 struct timeval tv = { 0, 0 };
200 switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
206 if (errno == EINTR) continue;
208 log->error("select() on main listener socket failed");
216 // Accept the connection.
217 SocketListener::ShibSocket newsock;
218 if (!accept(m_socket, newsock))
219 log->crit("failed to accept incoming socket connection");
221 // We throw away the result because the children manage themselves...
223 new ServerThread(newsock,this,++count);
226 log->crit("error starting new server thread to service incoming request");
231 log->info("listener service shutting down");
233 // Wait for all children to exit.
234 m_child_lock->lock();
235 while (!m_children.empty())
236 m_child_wait->wait(m_child_lock);
237 m_child_lock->unlock();
239 this->close(m_socket);
240 m_socket=(ShibSocket)0;
244 DDF SocketListener::send(const DDF& in)
247 saml::NDC ndc("send");
250 log->debug("sending message: %s", in.name());
252 // Serialize data for transmission.
255 string ostr(os.str());
257 // Loop on the RPC in case we lost contact the first time through
264 SocketListener::ShibSocket sock;
266 sock = m_socketpool->get();
268 int outlen = ostr.length();
270 if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) {
276 throw ListenerException("Failure sending remoted message ($1).", params(1,in.name()));
284 log->debug("send completed, reading response message");
287 if (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
288 log->error("error reading size of output message");
290 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
297 while (len && (size_read = recv(sock, buf, sizeof(buf))) > 0) {
298 is.write(buf, size_read);
303 log->error("error reading output message from socket");
305 throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
308 m_socketpool->put(sock);
314 // Check for exception to unmarshall and throw, otherwise return.
315 if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
316 // Reconstitute exception object.
317 DDFJanitor jout(out);
318 SAMLException* except=NULL;
320 istringstream es(out.string());
321 except=SAMLException::getInstance(es);
323 catch (SAMLException& e) {
324 log->error("caught SAML Exception while building the SAMLException: %s", e.what());
325 log->error("XML was: %s", out.string());
326 throw ListenerException("Remote call failed with an unparsable exception.");
329 auto_ptr<SAMLException> wrapper(except);
336 bool SocketListener::log_error() const
339 int rc=WSAGetLastError();
343 #ifdef HAVE_STRERROR_R
345 memset(buf,0,sizeof(buf));
346 strerror_r(rc,buf,sizeof(buf));
347 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
349 const char* buf=strerror(rc);
350 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
355 // actual function run in listener on server threads
356 void* server_thread_fn(void* arg)
358 ServerThread* child = (ServerThread*)arg;
360 // First, let's block all signals
361 Thread::mask_all_signals();
363 // Run the child until it exits.
366 // Now we can clean up and exit the thread.
371 ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id)
372 : m_sock(s), m_child(NULL), m_listener(listener)
376 buf << "[" << id << "]";
379 // Create the child thread
380 m_child = Thread::create(server_thread_fn, (void*)this);
384 ServerThread::~ServerThread()
386 // Then lock the children map, remove this socket/thread, signal waiters, and return
387 m_listener->m_child_lock->lock();
388 m_listener->m_children.erase(m_sock);
389 m_listener->m_child_lock->unlock();
390 m_listener->m_child_wait->signal();
395 void ServerThread::run()
399 // Before starting up, make sure we fully "own" this socket.
400 m_listener->m_child_lock->lock();
401 while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end())
402 m_listener->m_child_wait->wait(m_listener->m_child_lock);
403 m_listener->m_children[m_sock] = m_child;
404 m_listener->m_child_lock->unlock();
407 struct timeval tv = { 0, 0 };
409 while(!*(m_listener->m_shutdown)) {
411 FD_SET(m_sock, &readfds);
414 switch (select(m_sock+1, &readfds, 0, 0, &tv)) {
420 if (errno == EINTR) continue;
421 m_listener->log_error();
422 m_listener->log->error("select() on incoming request socket (%u) returned error", m_sock);
430 m_listener->log_error();
431 m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
432 m_listener->close(m_sock);
439 bool ServerThread::job()
441 Category& log = Category::getInstance("shibd.Listener");
444 DDFJanitor jout(out);
452 // Lock the configuration.
453 IConfig* conf=ShibTargetConfig::getConfig().getINI();
457 if (m_listener->recv(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
458 log.error("error reading size of input message");
465 while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) {
466 is.write(m_buf, size_read);
471 log.error("error reading input message from socket");
475 // Unmarshall the message.
480 // Dispatch the message.
481 out=m_listener->receive(in);
483 catch (SAMLException &e) {
484 log.error("error processing incoming message: %s", e.what());
487 out=DDF("exception").string(os.str().c_str());
491 log.error("unexpected error processing incoming message");
492 ListenerException ex("An unexpected error occurred while processing an incoming message.");
495 out=DDF("exception").string(os.str().c_str());
499 // Return whatever's available.
500 ostringstream xmlout;
502 string response(xmlout.str());
503 int outlen = response.length();
505 if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
506 log.error("error sending output message size");
509 if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) {
510 log.error("error sending output message");