tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
} else {
/* create a thread to handle this connection */
- if (asprintf(&name, "trustrouter@%s", trps->hostname)==-1) {
+ name = talloc_asprintf(tmp_ctx, "trustrouter@%s", trps->hostname);
+ if (name == NULL)
goto cleanup;
- }
- gssname=tr_new_name(name);
- free(name); name=NULL;
+ gssname=tr_new_name(name); /* name cleaned up with tmp_ctx */
+
conn=trp_connection_accept(tmp_ctx, listener, gssname);
if (conn!=NULL) {
/* need to monitor this fd and trigger events when read becomes possible */
thread_data=talloc(conn, struct trps_thread_data);
if (thread_data==NULL) {
tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
- talloc_free(tmp_ctx);
- return;
+ goto cleanup;
}
thread_data->conn=conn;
thread_data->trps=trps;
free(tmp);
}
else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
- /* trpc connection died */
+ /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol.
+ * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */
TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
TR_NAME *svcname=trpc_get_gssname(trpc);
peer=trps_get_peer_by_servicename(trps, svcname);
TRPC_INSTANCE *trpc;
TRPS_INSTANCE *trps;
};
+
+/**
+ * Returns when a message is ready
+ *
+ * Clears cb->msg_ready, then waits on cb->cond (paired with cb->mutex)
+ * repeatedly until msg_ready is nonzero, so a wakeup that arrives without
+ * the flag being set does not cause a return.
+ *
+ * NOTE(review): pthread_cond_wait() is called without locking cb->mutex
+ * here, so the caller presumably already holds that mutex - confirm.
+ *
+ * @param cb notification data providing the mutex, condition variable and
+ *           msg_ready flag used for the wait
+ */
+static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb)
+{
+ cb->msg_ready = 0; /* reset the flag before waiting */
+ do
+ pthread_cond_wait(&(cb->cond), &(cb->mutex));
+ while (!cb->msg_ready); /* wait again if woken without the flag set */
+}
+
+/**
+ * Thread for handling TRPC (outgoing) connections
+ *
+ * Locking protocol:
+ *
+ * This thread uses a mutex and condition variable to wait for incoming messages
+ * on its queue. The main thread interacts with these through the tr_trpc_mq_cb()
+ * callback. This callback is called by the TR_MQ whenever the queue goes from empty
+ * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag,
+ * signals the condition variable, and finally unlocks the mutex.
+ *
+ * This thread waits for the condition variable to be signaled, then checks that the
+ * msg_ready flag has actually been set (pthreads does not guarantee against spurious
+ * wakeups). If so, it holds the mutex lock while reading any messages in its queue and
+ * sending responses over the GSS connection. If it receives an ABORT message from
+ * the main thread (not currently used), or if sending the GSS message fails (which
+ * usually indicates that the connection has been lost), the thread begins the shutdown
+ * process. It still holds the mutex lock at the start of this process.
+ *
+ * This begins by setting trpc->shutting_down = 1, then sending a message to the
+ * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex
+ * and waits for more messages, again using the mutex and the condition variable. It
+ * ignores any except for an EXIT_OK message. When that is received, it exits, terminating
+ * the thread.
+ *
+ * When the main thread receives the DISCONNECTED message, it must stop sending messages
+ * to this thread's message queue. When it is certain that it will not queue any more messages,
+ * it sends the EXIT_OK message as its last interaction with this thread. Once that message
+ * has been queued, queueing additional messages may result in deadlock or segfaults.
+ * (It is ok if there are other messages in the queue, but it must not queue more.)
+ */
static void *tr_trpc_thread(void *arg)
{
TALLOC_CTX *tmp_ctx=talloc_new(NULL);
msg=NULL;
while(!exit_loop) {
- cb_data.msg_ready=0;
- pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
- /* verify the condition - remember, we have the mutex! */
- if (cb_data.msg_ready) {
- n_popped = 0; /* have not popped any messages from the queue */
- n_sent = 0; /* have not sent any messages yet */
- for (msg = trpc_mq_pop(trpc);
- msg != NULL;
- msg = trpc_mq_pop(trpc)) {
- n_popped++;
- msg_type = tr_mq_msg_get_message(msg);
- if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
- exit_loop = 1;
- break;
- } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
- encoded_msg = tr_mq_msg_get_payload(msg);
- if (encoded_msg == NULL)
- tr_notice("tr_trpc_thread: null outgoing TRP message.");
- else {
- rc = trpc_send_msg(trpc, encoded_msg);
- if (rc == TRP_SUCCESS) {
- n_sent++;
- } else {
- tr_notice("tr_trpc_thread: trpc_send_msg failed.");
- /* Assume this means we lost the connection. */
- exit_loop = 1;
- break;
- }
+ trpc_thread_wait_for_message(&cb_data); /* handles locking */
+
+ n_popped = 0; /* have not popped any messages from the queue */
+ n_sent = 0; /* have not sent any messages yet */
+ for (msg = trpc_mq_pop(trpc);
+ msg != NULL;
+ msg = trpc_mq_pop(trpc)) {
+ n_popped++;
+ msg_type = tr_mq_msg_get_message(msg);
+ if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+ exit_loop = 1;
+ break;
+ } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+ encoded_msg = tr_mq_msg_get_payload(msg);
+ if (encoded_msg == NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc == TRP_SUCCESS) {
+ n_sent++;
+ } else {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ /* Assume this means we lost the connection. */
+ exit_loop = 1;
+ break;
}
- } else
- tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
-
- tr_mq_msg_free(msg);
- }
+ }
+ } else
+ tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
- /* if n_popped == 0, then n_sent must be zero (it's only set after at
- * least one msg is popped) */
- if (n_popped==0)
- tr_err("tr_trpc_thread: notified of message, but queue empty");
- else
- tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
+ tr_mq_msg_free(msg);
}
+
+ /* if n_popped == 0, then n_sent must be zero (it's only set after at
+ * least one msg is popped) */
+ if (n_popped==0)
+ tr_err("tr_trpc_thread: notified of message, but queue empty");
+ else
+ tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
}
}
tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread.");
trpc->shutting_down = 1;
- // trpc_mq_clear(trpc); /* clear any queued messages */
-
- msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH);
+ /* Send a DISCONNECTED message to the main thread */
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL);
tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
if (msg==NULL) {
/* can't notify main thread of exit - just do it and hope for the best */
tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
} else {
trps_mq_add(trps, msg);
- /* now wait for an acknowledgement */
+
+ /* DISCONNECTED sent, now wait for an acknowledgement before exiting */
exit_loop = 0;
while (!exit_loop) {
- cb_data.msg_ready = 0;
- pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
- /* verify the condition - remember, we have the mutex! */
- if (cb_data.msg_ready) {
- while (NULL != (msg = trpc_mq_pop(trpc))) {
- msg_type = tr_mq_msg_get_message(msg);
- /* ignore anything except an exit ack */
- if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK))
- exit_loop = 1;
+ trpc_thread_wait_for_message(&cb_data);
+
+ while (NULL != (msg = trpc_mq_pop(trpc))) {
+ msg_type = tr_mq_msg_get_message(msg);
+
+ /* ignore anything except an EXIT_OK */
+ if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) {
+ exit_loop = 1;
+ break; /* skip any further messages */
}
}
}
}
+ tr_debug("tr_trpc_thread: thread terminating.");
talloc_free(tmp_ctx);
return NULL;
}