From b8efc0d782acc643583b6b442d33a793bae2a7ed Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Thu, 26 Apr 2018 22:09:24 -0400 Subject: [PATCH] Add some comments, a bit of code clean up --- tr/tr_trp.c | 159 ++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 100 insertions(+), 59 deletions(-) diff --git a/tr/tr_trp.c b/tr/tr_trp.c index ad1084a..f6e60c5 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -184,19 +184,18 @@ static void tr_trps_event_cb(int listener, short event, void *arg) tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event); } else { /* create a thread to handle this connection */ - if (asprintf(&name, "trustrouter@%s", trps->hostname)==-1) { + name = talloc_asprintf(tmp_ctx, "trustrouter@%s", trps->hostname); + if (name == NULL) goto cleanup; - } - gssname=tr_new_name(name); - free(name); name=NULL; + gssname=tr_new_name(name); /* name cleaned up with tmp_ctx */ + conn=trp_connection_accept(tmp_ctx, listener, gssname); if (conn!=NULL) { /* need to monitor this fd and trigger events when read becomes possible */ thread_data=talloc(conn, struct trps_thread_data); if (thread_data==NULL) { tr_err("tr_trps_event_cb: unable to allocate trps_thread_data"); - talloc_free(tmp_ctx); - return; + goto cleanup; } thread_data->conn=conn; thread_data->trps=trps; @@ -324,7 +323,8 @@ static void tr_trps_process_mq(int socket, short event, void *arg) free(tmp); } else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) { - /* trpc connection died */ + /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol. + * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). 
*/ TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE); TR_NAME *svcname=trpc_get_gssname(trpc); peer=trps_get_peer_by_servicename(trps, svcname); @@ -593,6 +593,49 @@ struct trpc_thread_data { TRPC_INSTANCE *trpc; TRPS_INSTANCE *trps; }; + +/** + * Returns when a message is ready + */ +static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb) +{ + cb->msg_ready = 0; + do + pthread_cond_wait(&(cb->cond), &(cb->mutex)); + while (!cb->msg_ready); +} + +/** + * Thread for handling TRPC (outgoing) connections + * + * Locking protocol: + * + * This thread uses a mutex and condition variable to wait for incoming messages + * on its queue. The main thread interacts with these through the tr_trpc_mq_cb() + * callback. This callback is called by the TR_MQ whenever the queue goes from empty + * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag, + * signals the condition variable, then unlocks the mutex. + * + * This thread waits for the condition variable to be signaled, then checks that the + * msg_ready flag has actually been set (pthreads does not guarantee against spurious + * wakeups). If so, it holds the mutex lock while reading any messages in its queue and + * sending responses over the GSS connection. If it receives an ABORT message from + * the main thread (not currently used), or if sending the GSS message fails (which + * usually indicates that the connection has been lost), the thread begins the shutdown + * process. It still holds the mutex lock at the start of this process. + * + * This begins by setting trpc->shutting_down = 1, then sending a message to the + * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex + * and waits for more messages, again using the mutex and the condition variable. It + * ignores any except for an EXIT_OK message. When that is received, it exits, terminating + * the thread. 
+ * + * When the main thread receives the DISCONNECTED message, it must stop sending messages + * to this thread's message queue. When it is certain that it will not queue any more messages, + * it sends the EXIT_OK message as its last interaction with this thread. Once that message + * has been queued, queueing additional messages may result in deadlock or segfaults. + * (It is ok if there are other messages in the queue, but it must not queue more.) + */ static void *tr_trpc_thread(void *arg) { TALLOC_CTX *tmp_ctx=talloc_new(NULL); @@ -646,80 +689,78 @@ static void *tr_trpc_thread(void *arg) msg=NULL; while(!exit_loop) { - cb_data.msg_ready=0; - pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex)); - /* verify the condition - remember, we have the mutex! */ - if (cb_data.msg_ready) { - n_popped = 0; /* have not popped any messages from the queue */ - n_sent = 0; /* have not sent any messages yet */ - for (msg = trpc_mq_pop(trpc); - msg != NULL; - msg = trpc_mq_pop(trpc)) { - n_popped++; - msg_type = tr_mq_msg_get_message(msg); - if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) { - exit_loop = 1; - break; - } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { - encoded_msg = tr_mq_msg_get_payload(msg); - if (encoded_msg == NULL) - tr_notice("tr_trpc_thread: null outgoing TRP message."); - else { - rc = trpc_send_msg(trpc, encoded_msg); - if (rc == TRP_SUCCESS) { - n_sent++; - } else { - tr_notice("tr_trpc_thread: trpc_send_msg failed."); - /* Assume this means we lost the connection. 
*/ - exit_loop = 1; - break; - } + trpc_thread_wait_for_message(&cb_data); /* handles locking */ + + n_popped = 0; /* have not popped any messages from the queue */ + n_sent = 0; /* have not sent any messages yet */ + for (msg = trpc_mq_pop(trpc); + msg != NULL; + msg = trpc_mq_pop(trpc)) { + n_popped++; + msg_type = tr_mq_msg_get_message(msg); + if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) { + exit_loop = 1; + break; + } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { + encoded_msg = tr_mq_msg_get_payload(msg); + if (encoded_msg == NULL) + tr_notice("tr_trpc_thread: null outgoing TRP message."); + else { + rc = trpc_send_msg(trpc, encoded_msg); + if (rc == TRP_SUCCESS) { + n_sent++; + } else { + tr_notice("tr_trpc_thread: trpc_send_msg failed."); + /* Assume this means we lost the connection. */ + exit_loop = 1; + break; } - } else - tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); - - tr_mq_msg_free(msg); - } + } + } else + tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); - /* if n_popped == 0, then n_sent must be zero (it's only set after at - * least one msg is popped) */ - if (n_popped==0) - tr_err("tr_trpc_thread: notified of message, but queue empty"); - else - tr_debug("tr_trpc_thread: sent %d messages.", n_sent); + tr_mq_msg_free(msg); } + + /* if n_popped == 0, then n_sent must be zero (it's only set after at + * least one msg is popped) */ + if (n_popped==0) + tr_err("tr_trpc_thread: notified of message, but queue empty"); + else + tr_debug("tr_trpc_thread: sent %d messages.", n_sent); } } tr_debug("tr_trpc_thread: Disconnected. 
Waiting to terminate thread."); trpc->shutting_down = 1; - // trpc_mq_clear(trpc); /* clear any queued messages */ - - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH); + /* Send a DISCONNECTED message to the main thread */ + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL); tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ if (msg==NULL) { /* can't notify main thread of exit - just do it and hope for the best */ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); } else { trps_mq_add(trps, msg); - /* now wait for an acknowledgement */ + + /* DISCONNECTED sent, now wait for an acknowledgement before exiting */ exit_loop = 0; while (!exit_loop) { - cb_data.msg_ready = 0; - pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex)); - /* verify the condition - remember, we have the mutex! */ - if (cb_data.msg_ready) { - while (NULL != (msg = trpc_mq_pop(trpc))) { - msg_type = tr_mq_msg_get_message(msg); - /* ignore anything except an exit ack */ - if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) - exit_loop = 1; + trpc_thread_wait_for_message(&cb_data); + + while (NULL != (msg = trpc_mq_pop(trpc))) { + msg_type = tr_mq_msg_get_message(msg); + + /* ignore anything except an EXIT_OK */ + if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) { + exit_loop = 1; + break; /* skip any further messages */ } } } } + tr_debug("tr_trpc_thread: thread terminating."); talloc_free(tmp_ctx); return NULL; } -- 2.1.4