Better logging for remoted errors.
[shibboleth/cpp-sp.git] / shibsp / remoting / impl / SocketListener.cpp
1 /*
2  *  Copyright 2001-2007 Internet2
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 /**
18  * SocketListener.cpp
19  * 
20  * Berkeley Socket-based ListenerService implementation
21  */
22
23 #include "internal.h"
24 #include "exceptions.h"
25 #include "remoting/impl/SocketListener.h"
26
27 #include <errno.h>
28 #include <stack>
29 #include <sstream>
30 #include <shibsp/SPConfig.h>
31 #include <xmltooling/util/NDC.h>
32
33 #ifdef HAVE_UNISTD_H
34 # include <unistd.h>
35 #endif
36
37 using namespace shibsp;
38 using namespace xmltooling;
39 using namespace log4cpp;
40 using namespace std;
41 using xercesc::DOMElement;
42
43 namespace shibsp {
44   
45     // Manages the pool of connections
46     class SocketPool
47     {
48     public:
49         SocketPool(Category& log, const SocketListener* listener)
50             : m_log(log), m_listener(listener), m_lock(Mutex::create()) {}
51         ~SocketPool();
52         SocketListener::ShibSocket get();
53         void put(SocketListener::ShibSocket s);
54   
55     private:
56         SocketListener::ShibSocket connect();
57         
58         const SocketListener* m_listener;
59         Category& m_log;
60         auto_ptr<Mutex> m_lock;
61         stack<SocketListener::ShibSocket> m_pool;
62     };
63   
64     // Worker threads in server
65     class ServerThread {
66     public:
67         ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id);
68         ~ServerThread();
69         void run();
70         bool job();
71
72     private:
73         SocketListener::ShibSocket m_sock;
74         Thread* m_child;
75         SocketListener* m_listener;
76         string m_id;
77         char m_buf[16384];
78     };
79 }
80
81 SocketListener::ShibSocket SocketPool::connect()
82 {
83 #ifdef _DEBUG
84     NDC ndc("connect");
85 #endif
86
87     m_log.debug("trying to connect to listener");
88
89     SocketListener::ShibSocket sock;
90     if (!m_listener->create(sock)) {
91         m_log.error("cannot create socket");
92         throw ListenerException("Cannot create socket");
93     }
94
95     bool connected = false;
96     int num_tries = 3;
97
98     for (int i = num_tries-1; i >= 0; i--) {
99         if (m_listener->connect(sock)) {
100             connected = true;
101             break;
102         }
103     
104         m_log.warn("cannot connect socket (%u)...%s", sock, (i > 0 ? "retrying" : ""));
105
106         if (i) {
107 #ifdef WIN32
108             Sleep(2000*(num_tries-i));
109 #else
110             sleep(2*(num_tries-i));
111 #endif
112         }
113     }
114
115     if (!connected) {
116         m_log.crit("socket server unavailable, failing");
117         m_listener->close(sock);
118         throw ListenerException("Cannot connect to shibd process, a site adminstrator should be notified.");
119     }
120
121     m_log.debug("socket (%u) connected successfully", sock);
122     return sock;
123 }
124
125 SocketPool::~SocketPool()
126 {
127     while (!m_pool.empty()) {
128 #ifdef WIN32
129         closesocket(m_pool.top());
130 #else
131         ::close(m_pool.top());
132 #endif
133         m_pool.pop();
134     }
135 }
136
137 SocketListener::ShibSocket SocketPool::get()
138 {
139     m_lock->lock();
140     if (m_pool.empty()) {
141         m_lock->unlock();
142         return connect();
143     }
144     SocketListener::ShibSocket ret=m_pool.top();
145     m_pool.pop();
146     m_lock->unlock();
147     return ret;
148 }
149
150 void SocketPool::put(SocketListener::ShibSocket s)
151 {
152     m_lock->lock();
153     m_pool.push(s);
154     m_lock->unlock();
155 }
156
157 SocketListener::SocketListener(const DOMElement* e) : log(&Category::getInstance(SHIBSP_LOGCAT".Listener")),
158     m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_socketpool(NULL), m_socket((ShibSocket)0)
159 {
160     // Are we a client?
161     if (SPConfig::getConfig().isEnabled(SPConfig::InProcess)) {
162         m_socketpool=new SocketPool(*log,this);
163     }
164     // Are we a server?
165     if (SPConfig::getConfig().isEnabled(SPConfig::OutOfProcess)) {
166         m_child_lock = Mutex::create();
167         m_child_wait = CondWait::create();
168     }
169 }
170
171 SocketListener::~SocketListener()
172 {
173     delete m_socketpool;
174     delete m_child_wait;
175     delete m_child_lock;
176 }
177
178 bool SocketListener::run(bool* shutdown)
179 {
180 #ifdef _DEBUG
181     NDC ndc("run");
182 #endif
183     log->info("listener service starting");
184
185     // Save flag to monitor for shutdown request.
186     m_shutdown=shutdown;
187     unsigned long count = 0;
188
189     if (!create(m_socket)) {
190         log->crit("failed to create socket");
191         return false;
192     }
193     if (!bind(m_socket,true)) {
194         this->close(m_socket);
195         log->crit("failed to bind to socket.");
196         return false;
197     }
198
199     while (!*m_shutdown) {
200         fd_set readfds;
201         FD_ZERO(&readfds);
202         FD_SET(m_socket, &readfds);
203         struct timeval tv = { 0, 0 };
204         tv.tv_sec = 5;
205     
206         switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
207 #ifdef WIN32
208             case SOCKET_ERROR:
209 #else
210             case -1:
211 #endif
212                 if (errno == EINTR) continue;
213                 log_error();
214                 log->error("select() on main listener socket failed");
215                 return false;
216         
217             case 0:
218                 continue;
219         
220             default:
221             {
222                 // Accept the connection.
223                 SocketListener::ShibSocket newsock;
224                 if (!accept(m_socket, newsock))
225                     log->crit("failed to accept incoming socket connection");
226
227                 // We throw away the result because the children manage themselves...
228                 try {
229                     new ServerThread(newsock,this,++count);
230                 }
231                 catch (...) {
232                     log->crit("error starting new server thread to service incoming request");
233                 }
234             }
235         }
236     }
237     log->info("listener service shutting down");
238
239     // Wait for all children to exit.
240     m_child_lock->lock();
241     while (!m_children.empty())
242         m_child_wait->wait(m_child_lock);
243     m_child_lock->unlock();
244
245     this->close(m_socket);
246     m_socket=(ShibSocket)0;
247     return true;
248 }
249
250 DDF SocketListener::send(const DDF& in)
251 {
252 #ifdef _DEBUG
253     NDC ndc("send");
254 #endif
255
256     log->debug("sending message (%s)", in.name() ? in.name() : "unnamed");
257
258     // Serialize data for transmission.
259     ostringstream os;
260     os << in;
261     string ostr(os.str());
262
263     // Loop on the RPC in case we lost contact the first time through
264 #ifdef WIN32
265     u_long len;
266 #else
267     uint32_t len;
268 #endif
269     int retry = 1;
270     SocketListener::ShibSocket sock;
271     while (retry >= 0) {
272         sock = m_socketpool->get();
273         
274         int outlen = ostr.length();
275         len = htonl(outlen);
276         if (send(sock,(char*)&len,sizeof(len)) != sizeof(len) || send(sock,ostr.c_str(),outlen) != outlen) {
277             log_error();
278             this->close(sock);
279             if (retry)
280                 retry--;
281             else
282                 throw ListenerException("Failure sending remoted message ($1).", params(1,in.name()));
283         }
284         else {
285             // SUCCESS.
286             retry = -1;
287         }
288     }
289
290     log->debug("send completed, reading response message");
291
292     // Read the message.
293     if (recv(sock,(char*)&len,sizeof(len)) != sizeof(len)) {
294         log->error("error reading size of output message");
295         this->close(sock);
296         throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
297     }
298     len = ntohl(len);
299     
300     char buf[16384];
301     int size_read;
302     stringstream is;
303     while (len && (size_read = recv(sock, buf, sizeof(buf))) > 0) {
304         is.write(buf, size_read);
305         len -= size_read;
306     }
307     
308     if (len) {
309         log->error("error reading output message from socket");
310         this->close(sock);
311         throw ListenerException("Failure receiving response to remoted message ($1).", params(1,in.name()));
312     }
313     
314     m_socketpool->put(sock);
315
316     // Unmarshall data.
317     DDF out;
318     is >> out;
319     
320     // Check for exception to unmarshall and throw, otherwise return.
321     if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
322         // Reconstitute exception object.
323         DDFJanitor jout(out);
324         XMLToolingException* except=NULL;
325         try { 
326             except=XMLToolingException::fromString(out.string());
327             log->error("remoted message returned an error: %s", except->what());
328         }
329         catch (XMLToolingException& e) {
330             log->error("caught XMLToolingException while building the XMLToolingException: %s", e.what());
331             log->error("XML was: %s", out.string());
332             throw ListenerException("Remote call failed with an unparsable exception.");
333         }
334
335         auto_ptr<XMLToolingException> wrapper(except);
336         wrapper->raise();
337     }
338
339     return out;
340 }
341
342 bool SocketListener::log_error() const
343 {
344 #ifdef WIN32
345     int rc=WSAGetLastError();
346 #else
347     int rc=errno;
348 #endif
349 #ifdef HAVE_STRERROR_R
350     char buf[256];
351     memset(buf,0,sizeof(buf));
352     strerror_r(rc,buf,sizeof(buf));
353     log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
354 #else
355     const char* buf=strerror(rc);
356     log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
357 #endif
358     return false;
359 }
360
361 // actual function run in listener on server threads
362 void* server_thread_fn(void* arg)
363 {
364     ServerThread* child = (ServerThread*)arg;
365
366 #ifndef WIN32
367     // First, let's block all signals
368     Thread::mask_all_signals();
369 #endif
370
371     // Run the child until it exits.
372     child->run();
373
374     // Now we can clean up and exit the thread.
375     delete child;
376     return NULL;
377 }
378
379 ServerThread::ServerThread(SocketListener::ShibSocket& s, SocketListener* listener, unsigned long id)
380     : m_sock(s), m_child(NULL), m_listener(listener)
381 {
382
383     ostringstream buf;
384     buf << "[" << id << "]";
385     m_id = buf.str();
386
387     // Create the child thread
388     m_child = Thread::create(server_thread_fn, (void*)this);
389     m_child->detach();
390 }
391
392 ServerThread::~ServerThread()
393 {
394     // Then lock the children map, remove this socket/thread, signal waiters, and return
395     m_listener->m_child_lock->lock();
396     m_listener->m_children.erase(m_sock);
397     m_listener->m_child_lock->unlock();
398     m_listener->m_child_wait->signal();
399   
400     delete m_child;
401 }
402
403 void ServerThread::run()
404 {
405     NDC ndc(m_id);
406
407     // Before starting up, make sure we fully "own" this socket.
408     m_listener->m_child_lock->lock();
409     while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end())
410         m_listener->m_child_wait->wait(m_listener->m_child_lock);
411     m_listener->m_children[m_sock] = m_child;
412     m_listener->m_child_lock->unlock();
413     
414     fd_set readfds;
415     struct timeval tv = { 0, 0 };
416
417     while(!*(m_listener->m_shutdown)) {
418         FD_ZERO(&readfds);
419         FD_SET(m_sock, &readfds);
420         tv.tv_sec = 1;
421
422         switch (select(m_sock+1, &readfds, 0, 0, &tv)) {
423 #ifdef WIN32
424         case SOCKET_ERROR:
425 #else
426         case -1:
427 #endif
428             if (errno == EINTR) continue;
429             m_listener->log_error();
430             m_listener->log->error("select() on incoming request socket (%u) returned error", m_sock);
431             return;
432
433         case 0:
434             break;
435
436         default:
437             if (!job()) {
438                 m_listener->log_error();
439                 m_listener->log->error("I/O failure processing request on socket (%u)", m_sock);
440                 m_listener->close(m_sock);
441                 return;
442             }
443         }
444     }
445 }
446
447 bool ServerThread::job()
448 {
449     Category& log = Category::getInstance("shibd.Listener");
450
451     bool incomingError = true;  // set false once incoming message is received
452     ostringstream sink;
453 #ifdef WIN32
454     u_long len;
455 #else
456     uint32_t len;
457 #endif
458
459     try {
460         // Read the message.
461         if (m_listener->recv(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
462             log.error("error reading size of input message");
463             return false;
464         }
465         len = ntohl(len);
466         
467         int size_read;
468         stringstream is;
469         while (len && (size_read = m_listener->recv(m_sock, m_buf, sizeof(m_buf))) > 0) {
470             is.write(m_buf, size_read);
471             len -= size_read;
472         }
473         
474         if (len) {
475             log.error("error reading input message from socket");
476             return false;
477         }
478         
479         // Unmarshall the message.
480         DDF in;
481         DDFJanitor jin(in);
482         is >> in;
483
484         log.debug("dispatching message (%s)", in.name() ? in.name() : "unnamed");
485
486         incomingError = false;
487
488         // Dispatch the message.
489         m_listener->receive(in, sink);
490     }
491     catch (XMLToolingException& e) {
492         if (incomingError)
493             log.error("error processing incoming message: %s", e.what());
494         DDF out=DDF("exception").string(e.toString().c_str());
495         DDFJanitor jout(out);
496         sink << out;
497     }
498     catch (exception& e) {
499         if (incomingError)
500             log.error("error processing incoming message: %s", e.what());
501         ListenerException ex(e.what());
502         DDF out=DDF("exception").string(ex.toString().c_str());
503         DDFJanitor jout(out);
504         sink << out;
505     }
506 #ifndef _DEBUG
507     catch (...) {
508         if (incomingError)
509             log.error("unexpected error processing incoming message");
510         ListenerException ex("An unexpected error occurred while processing an incoming message.");
511         DDF out=DDF("exception").string(ex.toString().c_str());
512         DDFJanitor jout(out);
513         sink << out;
514     }
515 #endif
516     
517     // Return whatever's available.
518     string response(sink.str());
519     int outlen = response.length();
520     len = htonl(outlen);
521     if (m_listener->send(m_sock,(char*)&len,sizeof(len)) != sizeof(len)) {
522         log.error("error sending output message size");
523         return false;
524     }
525     if (m_listener->send(m_sock,response.c_str(),outlen) != outlen) {
526         log.error("error sending output message");
527         return false;
528     }
529     
530     return true;
531 }