2 * Copyright 2001-2005 Internet2
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 * RPCListener.cpp -- Handles marshalling and connection mgmt for ONC-remoted IListeners
25 #include "RPCListener.h"
27 // Deal with inadequate Sun RPC libraries
29 #if !HAVE_DECL_SVCFD_CREATE
30 extern "C" SVCXPRT* svcfd_create(int, u_int, u_int);
33 #ifndef HAVE_WORKING_SVC_DESTROY
34 struct tcp_conn { /* kept in xprt->xp_p1 */
35 enum xprt_stat strm_stat;
38 char verf_body[MAX_AUTH_BYTES];
42 extern "C" void shibrpc_prog_3(struct svc_req* rqstp, register SVCXPRT* transp);
52 using namespace log4cpp;
54 using namespace shibboleth;
55 using namespace shibtarget;
57 namespace shibtarget {
58 // Wraps the actual RPC connection
62 RPCHandle(Category& log);
65 CLIENT* connect(const RPCListener* listener); // connects and returns the CLIENT handle
66 void disconnect(const RPCListener* listener=NULL); // disconnects, should not return disconnected handles to pool!
71 RPCListener::ShibSocket m_sock;
74 // Manages the pool of connections
78 RPCHandlePool(Category& log, const RPCListener* listener)
79 : m_log(log), m_listener(listener), m_lock(shibboleth::Mutex::create()) {}
85 const RPCListener* m_listener;
87 auto_ptr<Mutex> m_lock;
88 stack<RPCHandle*> m_pool;
91 // Cleans up after use
95 RPC(RPCHandlePool& pool);
96 ~RPC() {delete m_handle;}
97 RPCHandle* operator->() {return m_handle;}
98 void pool() {if (m_handle) m_pool.put(m_handle); m_handle=NULL;}
102 RPCHandlePool& m_pool;
105 // Worker threads in server
108 ServerThread(RPCListener::ShibSocket& s, RPCListener* listener);
114 RPCListener::ShibSocket m_sock;
116 RPCListener* m_listener;
121 RPCListener::RPCListener(const DOMElement* e) : log(&Category::getInstance(SHIBT_LOGCAT".Listener")),
122 m_shutdown(NULL), m_child_lock(NULL), m_child_wait(NULL), m_rpcpool(NULL), m_socket((ShibSocket)0)
125 if (ShibTargetConfig::getConfig().isEnabled(ShibTargetConfig::InProcess)) {
126 m_rpcpool=new RPCHandlePool(*log,this);
129 if (ShibTargetConfig::getConfig().isEnabled(ShibTargetConfig::OutOfProcess)) {
130 m_child_lock = Mutex::create();
131 m_child_wait = CondWait::create();
135 RPCListener::~RPCListener()
142 bool RPCListener::run(bool* shutdown)
145 saml::NDC ndc("run");
148 // Save flag to monitor for shutdown request.
151 if (!create(m_socket)) {
152 log->crit("failed to create socket");
155 if (!bind(m_socket,true)) {
156 this->close(m_socket);
157 log->crit("failed to bind to socket.");
161 while (!*m_shutdown) {
164 FD_SET(m_socket, &readfds);
165 struct timeval tv = { 0, 0 };
168 switch (select(m_socket + 1, &readfds, 0, 0, &tv)) {
174 if (errno == EINTR) continue;
176 log->error("select() on main listener socket failed");
184 // Accept the connection.
185 RPCListener::ShibSocket newsock;
186 if (!accept(m_socket, newsock))
187 log->crit("failed to accept incoming socket connection");
189 // We throw away the result because the children manage themselves...
191 new ServerThread(newsock,this);
194 log->crit("error starting new server thread to service incoming request");
199 log->info("listener service shutting down");
201 // Wait for all children to exit.
202 m_child_lock->lock();
203 while (!m_children.empty())
204 m_child_wait->wait(m_child_lock);
205 m_child_lock->unlock();
207 this->close(m_socket);
208 m_socket=(ShibSocket)0;
212 DDF RPCListener::send(const DDF& in)
215 saml::NDC ndc("send");
218 // Serialize data for transmission.
222 string ostr(os.str());
223 arg.xml = const_cast<char*>(ostr.c_str());
225 log->debug("sending message: %s", in.name());
228 memset(&ret, 0, sizeof(ret));
230 // Loop on the RPC in case we lost contact the first time through
235 clnt = rpc->connect(this);
236 clnt_stat status = shibrpc_call_3(&arg, &ret, clnt);
237 if (status != RPC_SUCCESS) {
238 // FAILED. Release, disconnect, and retry
239 log->error("RPC Failure: (CLIENT: %p) (%d): %s", clnt, status, clnt_spcreateerror("shibrpc_call_3"));
240 rpc->disconnect(this);
244 throw ListenerException("Failure sending remoted message ($1).",params(1,in.name()));
247 // SUCCESS. Pool and continue
252 log->debug("call completed, unmarshalling response message");
257 istringstream is(ret.xml);
259 clnt_freeres(clnt, (xdrproc_t)xdr_shibrpc_ret_3, (caddr_t)&ret);
263 log->error("caught exception while unmarshalling response message");
264 clnt_freeres(clnt, (xdrproc_t)xdr_shibrpc_ret_3, (caddr_t)&ret);
269 // Check for exception to unmarshall and throw, otherwise return.
270 if (out.isstring() && out.name() && !strcmp(out.name(),"exception")) {
271 // Reconstitute exception object.
272 DDFJanitor jout(out);
273 SAMLException* except=NULL;
275 istringstream es(out.string());
276 except=SAMLException::getInstance(es);
278 catch (SAMLException& e) {
279 log->error("caught SAML Exception while building the SAMLException: %s", e.what());
280 log->error("XML was: %s", out.string());
281 throw ListenerException("Remote call failed with an unparsable exception.");
285 log->error("caught unknown exception building SAMLException");
286 log->error("XML was: %s", out.string());
290 auto_ptr<SAMLException> wrapper(except);
297 bool RPCListener::log_error() const
300 int rc=WSAGetLastError();
304 #ifdef HAVE_STRERROR_R
306 memset(buf,0,sizeof(buf));
307 strerror_r(rc,buf,sizeof(buf));
308 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
310 const char* buf=strerror(rc);
311 log->error("socket call resulted in error (%d): %s",rc,isprint(*buf) ? buf : "no message");
316 RPCHandle::RPCHandle(Category& log) : m_clnt(NULL), m_sock((RPCListener::ShibSocket)0), m_log(log)
318 m_log.debug("new RPCHandle created: %p", this);
321 RPCHandle::~RPCHandle()
323 m_log.debug("destroying RPC Handle: %p", this);
327 void RPCHandle::disconnect(const RPCListener* listener)
330 clnt_destroy(m_clnt);
333 listener->close(m_sock);
334 m_sock=(RPCListener::ShibSocket)0;
338 ::closesocket(m_sock);
342 m_sock=(RPCListener::ShibSocket)0;
347 CLIENT* RPCHandle::connect(const RPCListener* listener)
350 saml::NDC ndc("connect");
353 m_log.debug("returning existing connection: %p -> %p", this, m_clnt);
357 m_log.debug("trying to connect to socket");
359 RPCListener::ShibSocket sock;
360 if (!listener->create(sock)) {
361 m_log.error("cannot create socket");
362 throw ListenerException("Cannot create socket");
365 bool connected = false;
368 for (int i = num_tries-1; i >= 0; i--) {
369 if (listener->connect(sock)) {
374 m_log.warn("cannot connect %p to socket...%s", this, (i > 0 ? "retrying" : ""));
378 Sleep(2000*(num_tries-i));
380 sleep(2*(num_tries-i));
386 m_log.crit("socket server unavailable, failing");
387 listener->close(sock);
388 throw ListenerException("Cannot connect to listener process, a site adminstrator should be notified.");
391 CLIENT* clnt = (CLIENT*)listener->getClientHandle(sock, SHIBRPC_PROG, SHIBRPC_VERS_3);
393 const char* rpcerror = clnt_spcreateerror("RPCHandle::connect");
394 m_log.crit("RPC failed for %p: %s", this, rpcerror);
395 listener->close(sock);
396 throw ListenerException(rpcerror);
399 // Set the RPC timeout to a fairly high value...
401 tv.tv_sec = 300; /* change timeout to 5 minutes */
402 tv.tv_usec = 0; /* this should always be set */
403 clnt_control(clnt, CLSET_TIMEOUT, (char*)&tv);
408 m_log.debug("success: %p -> %p", this, m_clnt);
412 RPCHandlePool::~RPCHandlePool()
414 while (!m_pool.empty()) {
420 RPCHandle* RPCHandlePool::get()
423 if (m_pool.empty()) {
425 return new RPCHandle(m_log);
427 RPCHandle* ret=m_pool.top();
433 void RPCHandlePool::put(RPCHandle* handle)
440 RPC::RPC(RPCHandlePool& pool) : m_pool(pool)
442 m_handle=m_pool.get();
445 // actual function run in listener on server threads
446 void* server_thread_fn(void* arg)
448 ServerThread* child = (ServerThread*)arg;
450 // First, let's block all signals
451 Thread::mask_all_signals();
453 // Run the child until it exits.
456 // Now we can clean up and exit the thread.
461 ServerThread::ServerThread(RPCListener::ShibSocket& s, RPCListener* listener)
462 : m_sock(s), m_child(NULL), m_listener(listener)
464 // Create the child thread
465 m_child = Thread::create(server_thread_fn, (void*)this);
469 ServerThread::~ServerThread()
471 // Then lock the children map, remove this socket/thread, signal waiters, and return
472 m_listener->m_child_lock->lock();
473 m_listener->m_children.erase(m_sock);
474 m_listener->m_child_lock->unlock();
475 m_listener->m_child_wait->signal();
480 void ServerThread::run()
482 // Before starting up, make sure we fully "own" this socket.
483 m_listener->m_child_lock->lock();
484 while (m_listener->m_children.find(m_sock)!=m_listener->m_children.end())
485 m_listener->m_child_wait->wait(m_listener->m_child_lock);
486 m_listener->m_children[m_sock] = m_child;
487 m_listener->m_child_lock->unlock();
493 struct timeval tv = { 0, 0 };
495 while(!*(m_listener->m_shutdown) && FD_ISSET(m_sock, &svc_fdset)) {
497 FD_SET(m_sock, &readfds);
500 switch (select(m_sock+1, &readfds, 0, 0, &tv)) {
506 if (errno == EINTR) continue;
507 m_listener->log_error();
508 m_listener->log->error("select() on incoming request socket (%u) returned error", m_sock);
515 svc_getreqset(&readfds);
520 bool ServerThread::svc_create()
522 /* Wrap an RPC Service around the new connection socket. */
523 SVCXPRT* transp = svcfd_create(m_sock, 0, 0);
526 NDC ndc("svc_create");
528 m_listener->log->error("failed to wrap RPC service around socket");
532 /* Register the SHIBRPC RPC Program */
533 if (!svc_register (transp, SHIBRPC_PROG, SHIBRPC_VERS_3, shibrpc_prog_3, 0)) {
534 #ifdef HAVE_WORKING_SVC_DESTROY
537 /* we have to inline svc_destroy because we can't pass in the xprt variable */
538 struct tcp_conn *cd = (struct tcp_conn *)transp->xp_p1;
539 xprt_unregister(transp);
540 close(transp->xp_sock);
541 if (transp->xp_port != 0) {
542 /* a rendezvouser socket */
546 /* an actual connection socket */
547 XDR_DESTROY(&(cd->xdrs));
549 mem_free((caddr_t)cd, sizeof(struct tcp_conn));
550 mem_free((caddr_t)transp, sizeof(SVCXPRT));
553 NDC ndc("svc_create");
555 m_listener->log->error("failed to register RPC program");
562 static string get_threadid()
564 static u_long counter = 0;
566 buf << "[" << counter++ << "]";
570 extern "C" bool_t shibrpc_call_3_svc(
571 shibrpc_args_3 *argp,
572 shibrpc_ret_3 *result,
573 struct svc_req *rqstp
576 string ctx=get_threadid();
578 Category& log = Category::getInstance("shibd.Listener");
580 if (!argp || !result) {
581 log.error("RPC Argument Error");
585 memset(result, 0, sizeof (*result));
588 DDFJanitor jout(out);
591 // Lock the configuration.
592 IConfig* conf=ShibTargetConfig::getConfig().getINI();
595 // Get listener interface.
596 IListener* listener=conf->getListener();
598 throw ListenerException("No listener implementation found to process incoming message.");
600 // Unmarshal the message.
603 istringstream is(argp->xml);
606 // Dispatch the message.
607 out=listener->receive(in);
609 catch (SAMLException &e) {
610 log.error("error processing incoming message: %s", e.what());
613 out=DDF("exception").string(os.str().c_str());
617 log.error("unexpected error processing incoming message");
618 ListenerException ex("An unexpected error occurred while processing an incoming message.");
621 out=DDF("exception").string(os.str().c_str());
625 // Return whatever's available.
626 ostringstream xmlout;
628 result->xml=strdup(xmlout.str().c_str());
633 shibrpc_prog_3_freeresult (SVCXPRT *transp, xdrproc_t xdr_result, caddr_t result)
635 xdr_free (xdr_result, result);
638 * Insert additional freeing code here, if needed