From d3d9b0eb4b1718cad404236d589fff2f16de2c37 Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Tue, 1 May 2018 13:49:24 -0400 Subject: [PATCH] Eliminate deadlock in trpc message handling * Remove notify_cb for the trpc thread's TR_MQ * Use trpc_mq_pop() directly in the tr_trpc_thread() instead of trying to empty the queue every time * Eliminate the complicated thread shutdown protocol needed to avoid invalid accesses to data allocated in tr_trpc_thread() This eliminates a deadlock that was possible due to misuse of the callback mutex in conjunction with the TR_MQ mutex. --- common/tr_mq.c | 8 ++- include/tr_trp.h | 1 - include/trp_internal.h | 2 +- tr/tr_trp.c | 161 +++++++++++-------------------------------------- trp/trpc.c | 4 +- 5 files changed, 44 insertions(+), 132 deletions(-) diff --git a/common/tr_mq.c b/common/tr_mq.c index 07080b6..6cf309f 100644 --- a/common/tr_mq.c +++ b/common/tr_mq.c @@ -289,13 +289,17 @@ int tr_mq_pop_timeout(time_t seconds, struct timespec *ts) return 0; } -/* Caller must free msg via tr_mq_msg_free, waiting until absolute +/* Retrieves a message from the queue, waiting until absolute * time ts_abort before giving up (using CLOCK_MONOTONIC). If ts_abort * has passed, returns an existing message but will not wait if one is * not already available. If ts_abort is null, no blocking. Not * guaranteed to wait if an error occurs - immediately returns without * a message. Use tr_mq_pop_timeout() to get an absolute time that - * is guaranteed compatible with this function. */ + * is guaranteed compatible with this function. + * + * Caller should free msg via tr_mq_msg_free when done with it. It stays + * in the TR_MQ's context, though, so use talloc_steal() if you want to do + * something clever with it. */ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, struct timespec *ts_abort) { TR_MQ_MSG *popped=NULL; diff --git a/include/tr_trp.h b/include/tr_trp.h index 42b586e..dea20e4 100644 --- a/include/tr_trp.h +++ b/include/tr_trp.h @@ -68,7 +68,6 @@ struct tr_instance { /* messages between threads */ #define TR_MQMSG_MSG_RECEIVED "msg received" #define TR_MQMSG_TRPC_DISCONNECTED "trpc disconnected" -#define TR_MQMSG_TRPC_EXIT_OK "trpc exit ok" #define TR_MQMSG_TRPC_CONNECTED "trpc connected" #define TR_MQMSG_TRPS_DISCONNECTED "trps disconnected" #define TR_MQMSG_TRPS_CONNECTED "trps connected" diff --git a/include/trp_internal.h b/include/trp_internal.h index 3fa4034..a52edb8 100644 --- a/include/trp_internal.h +++ b/include/trp_internal.h @@ -209,7 +209,7 @@ TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc); TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc); void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq); void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); -TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc); +TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc, struct timespec *ts_abort); void trpc_mq_clear(TRPC_INSTANCE *trpc); void trpc_master_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc); diff --git a/tr/tr_trp.c b/tr/tr_trp.c index f6e60c5..8304300 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -221,17 +221,6 @@ static void tr_trps_cleanup_conn(TRPS_INSTANCE *trps, TRP_CONNECTION *conn) static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc) { - TR_MQ_MSG *msg; - - /* tell the trpc thread to exit */ - msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK, TR_MQ_PRIO_NORMAL); - if (msg) { - tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit"); - trpc_mq_add(trpc, msg); - } else { - tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate"); - } - pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL); trps_remove_trpc(trps, trpc); trpc_free(trpc); @@ -292,7 +281,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg) tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp); else { trp_peer_set_incoming_status(peer, PEER_CONNECTED); - tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp); + tr_notice("tr_trps_process_mq: incoming connection from %s established.", tmp); } free(tmp); } @@ -306,7 +295,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg) } else { trp_peer_set_incoming_status(peer, PEER_DISCONNECTED); tr_trps_cleanup_conn(trps, conn); - tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp); + tr_notice("tr_trps_process_mq: incoming connection from %s lost.", tmp); } free(tmp); } @@ -318,13 +307,11 @@ static void tr_trps_process_mq(int socket, short event, void *arg) tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp); else { trp_peer_set_outgoing_status(peer, PEER_CONNECTED); - tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp); + tr_notice("tr_trps_process_mq: outgoing connection to %s established.", tmp); } free(tmp); } else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) { - /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol. - * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */ TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE); TR_NAME *svcname=trpc_get_gssname(trpc); peer=trps_get_peer_by_servicename(trps, svcname); @@ -333,7 +320,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg) tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp); else { trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED); - tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp); + tr_notice("tr_trps_process_mq: outgoing connection to %s lost.", tmp); tr_trps_cleanup_trpc(trps, trpc); } free(tmp); @@ -570,24 +557,6 @@ cleanup: return retval; } - -struct trpc_notify_cb_data { - int msg_ready; - pthread_cond_t cond; - pthread_mutex_t mutex; -}; - -static void tr_trpc_mq_cb(TR_MQ *mq, void *arg) -{ - struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg; - pthread_mutex_lock(&(cb_data->mutex)); - if (!cb_data->msg_ready) { - cb_data->msg_ready=1; - pthread_cond_signal(&(cb_data->cond)); - } - pthread_mutex_unlock(&(cb_data->mutex)); -} - /* data passed to thread */ struct trpc_thread_data { TRPC_INSTANCE *trpc; @@ -595,46 +564,15 @@ struct trpc_thread_data { }; /** - * Returns when a message is ready - */ -static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb) -{ - cb->msg_ready = 0; - do - pthread_cond_wait(&(cb->cond), &(cb->mutex)); - while (!cb->msg_ready); -} - -/** * Thread for handling TRPC (outgoing) connections * - * Locking protocol: - * - * This thread uses a mutex and condition variable to wait for incoming messages - * on its queue. The main thread interacts with these through the tr_trpc_mq_cb() - * callback. This callback is called by the TR_MQ whenever the queue goes from empty - * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag - * signals the condition variable, then unlocks the mutex. - * - * This thread waits for the condition variable to be signaled, then checks that the - * msg_ready flag has actually been set (pthreads does not guarantee against false - * wakes). If so, it holds the mutex lock while reading any messages in its queue and - * sending responses over the GSS connection. If it receives an ABORT message from - * the main thread (not currently used), or if sending the GSS message fails (which - * usually indicates that the connection has been lost), the thread begins the shutdown - * process. It still holds the mutex lock at the start of this process. - * - * This begins by setting trpc->shutting_down = 1, then sending a message to the - * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex - * and waits for more messages, again using the mutex and the condition variable. It - * ignores any except for an EXIT_OK message. When that is received, it exits, terminating - * the thread. - * - * When the main thread receives the DISCONNECTED message, it must stop sending messages - * to this thread's message queue. When it is certain that it will not queue any more messages, - * it sends the EXIT_OK message as its last interaction with this thread. Once that message - * has been queued, queueing additional messages may result in deadlock or segfaults. - * (It is ok if there are other messages in the queue, but it must not queue more.) + * Opens a connection to a peer. If successful, notifies the trps thread by + * posting a TR_MQMSG_TRPC_CONNECTED message to the trps message queue. + * It then waits for messages on trpc->mq. Normally these will be TR_MQMSG_TRPC_SEND + * messages, which this thread forwards to the peer. If its connection is lost or + * a TR_MQMSG_ABORT message is received on trpc->mq, the thread sends a + * TR_MQMSG_TRPC_DISCONNECTED message to the trps thread, then cleans up and + * terminates. */ static void *tr_trpc_thread(void *arg) { @@ -647,36 +585,27 @@ static void *tr_trpc_thread(void *arg) const char *msg_type=NULL; char *encoded_msg=NULL; TR_NAME *peer_gssname=NULL; - int n_sent = 0; - int n_popped = 0; + struct timespec wait_until = {0}; int exit_loop=0; - struct trpc_notify_cb_data cb_data={0, - PTHREAD_COND_INITIALIZER, - PTHREAD_MUTEX_INITIALIZER}; - tr_debug("tr_trpc_thread: started"); - /* set up the mq for receiving */ - pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */ - - tr_mq_lock(trpc->mq); - tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data); - tr_mq_unlock(trpc->mq); - + /* Try to make the outgoing connection */ rc=trpc_connect(trpc); if (rc!=TRP_SUCCESS) { tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.", trpc_get_server(trpc), trpc_get_port(trpc)); } else { + /* Retrieve the GSS name used by the peer for authentication */ peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc)); if (peer_gssname==NULL) { tr_err("tr_trpc_thread: could not duplicate peer_gssname."); talloc_free(tmp_ctx); return NULL; } - tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf); + tr_debug("tr_trpc_thread: connected to peer %.*s", + peer_gssname->len, peer_gssname->buf); msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH); tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper); @@ -688,19 +617,22 @@ static void *tr_trpc_thread(void *arg) trps_mq_add(trps, msg); /* steals msg context */ msg=NULL; + /* Loop until we get an abort message or until the connection is lost. */ while(!exit_loop) { - trpc_thread_wait_for_message(&cb_data); /* handles locking */ - - n_popped = 0; /* have not popped any messages from the queue */ - n_sent = 0; /* have not sent any messages yet */ - for (msg = trpc_mq_pop(trpc); - msg != NULL; - msg = trpc_mq_pop(trpc)) { - n_popped++; + /* Wait up to 10 minutes for a message to be queued to send to the peer. + * Log a warning if we go longer than that, but don't give up. */ + if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) { + tr_err("tr_trpc_thread: unable to set abort timeout"); + break; /* immediately exit the loop, don't go through cleanup */ + } + + /* Pop a message from the queue. */ + msg = trpc_mq_pop(trpc, &wait_until); + if (msg) { msg_type = tr_mq_msg_get_message(msg); if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) { + tr_debug("tr_trpc_thread: received abort message from main thread."); exit_loop = 1; - break; } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { encoded_msg = tr_mq_msg_get_payload(msg); if (encoded_msg == NULL) @@ -708,60 +640,37 @@ static void *tr_trpc_thread(void *arg) else { rc = trpc_send_msg(trpc, encoded_msg); if (rc == TRP_SUCCESS) { - n_sent++; + tr_debug("tr_trpc_thread: sent message."); } else { tr_notice("tr_trpc_thread: trpc_send_msg failed."); /* Assume this means we lost the connection. */ exit_loop = 1; - break; } } } else tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); tr_mq_msg_free(msg); + } else { + tr_warning("tr_trpc_thread: no outgoing messages to %.*s for 10 minutes", + peer_gssname->len, peer_gssname->buf); } - - /* if n_popped == 0, then n_sent must be zero (it's only set after at - * least one msg is popped) */ - if (n_popped==0) - tr_err("tr_trpc_thread: notified of message, but queue empty"); - else - tr_debug("tr_trpc_thread: sent %d messages.", n_sent); } } - tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread."); - trpc->shutting_down = 1; - /* Send a DISCONNECTED message to the main thread */ + tr_debug("tr_trpc_thread: notifying main thread of disconnection."); msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL); tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ if (msg==NULL) { - /* can't notify main thread of exit - just do it and hope for the best */ + /* can't notify main thread */ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); } else { trps_mq_add(trps, msg); - - /* DISCONNECTED sent, now wait for an acknowledgement before exiting */ - exit_loop = 0; - while (!exit_loop) { - trpc_thread_wait_for_message(&cb_data); - - while (NULL != (msg = trpc_mq_pop(trpc))) { - msg_type = tr_mq_msg_get_message(msg); - - /* ignore anything except an EXIT_OK */ - if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) { - exit_loop = 1; - break; /* skip any further messages */ - } - } - } } - tr_debug("tr_trpc_thread: thread terminating."); talloc_free(tmp_ctx); + tr_debug("tr_trpc_thread: thread terminating."); return NULL; } diff --git a/trp/trpc.c b/trp/trpc.c index 6c1a4bc..5985980 100644 --- a/trp/trpc.c +++ b/trp/trpc.c @@ -190,9 +190,9 @@ void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg) tr_mq_add(trpc->mq, msg); } -TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc) +TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc, struct timespec *ts_abort) { - return tr_mq_pop(trpc->mq, 0); + return tr_mq_pop(trpc->mq, ts_abort); } void trpc_mq_clear(TRPC_INSTANCE *trpc) -- 2.1.4