Eliminate deadlock in trpc message handling
authorJennifer Richards <jennifer@painless-security.com>
Tue, 1 May 2018 17:49:24 +0000 (13:49 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Tue, 1 May 2018 17:49:24 +0000 (13:49 -0400)
  * Remove notify_cb for the trpc thread's TR_MQ
  * Use trpc_mq_pop() directly in the tr_trpc_thread() instead of
    trying to empty the queue every time
  * Eliminate the complicated thread shutdown protocol needed to avoid
    invalid accesses to data allocated in tr_trpc_thread()

This eliminates a deadlock that was possible due to misuse of the
callback mutex in conjunction with the TR_MQ mutex.

common/tr_mq.c
include/tr_trp.h
include/trp_internal.h
tr/tr_trp.c
trp/trpc.c

index 07080b6..6cf309f 100644 (file)
@@ -289,13 +289,17 @@ int tr_mq_pop_timeout(time_t seconds, struct timespec *ts)
   return 0;
 }
 
-/* Caller must free msg via tr_mq_msg_free, waiting until absolute
+/* Retrieves a message from the queue, waiting until absolute
  * time ts_abort before giving up (using CLOCK_MONOTONIC). If ts_abort
  * has passed, returns an existing message but will not wait if one is
  * not already available. If ts_abort is null, no blocking.  Not
  * guaranteed to wait if an error occurs - immediately returns without
  * a message. Use tr_mq_pop_timeout() to get an absolute time that
- * is guaranteed compatible with this function. */
+ * is guaranteed compatible with this function.
+ *
+ * Caller should free msg via tr_mq_msg_free when done with it. It stays
+ * in the TR_MQ's context, though, so use talloc_steal() if you want to do
+ * something clever with it. */
 TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, struct timespec *ts_abort)
 {
   TR_MQ_MSG *popped=NULL;
index 42b586e..dea20e4 100644 (file)
@@ -68,7 +68,6 @@ struct tr_instance {
 /* messages between threads */
 #define TR_MQMSG_MSG_RECEIVED "msg received"
 #define TR_MQMSG_TRPC_DISCONNECTED "trpc disconnected"
-#define TR_MQMSG_TRPC_EXIT_OK "trpc exit ok"
 #define TR_MQMSG_TRPC_CONNECTED "trpc connected"
 #define TR_MQMSG_TRPS_DISCONNECTED "trps disconnected"
 #define TR_MQMSG_TRPS_CONNECTED "trps connected"
index 3fa4034..a52edb8 100644 (file)
@@ -209,7 +209,7 @@ TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc);
 TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc);
 void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq);
 void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
-TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc);
+TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc, struct timespec *ts_abort);
 void trpc_mq_clear(TRPC_INSTANCE *trpc);
 void trpc_master_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
 TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc);
index f6e60c5..8304300 100644 (file)
@@ -221,17 +221,6 @@ static void tr_trps_cleanup_conn(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
 
 static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
 {
-  TR_MQ_MSG *msg;
-
-  /* tell the trpc thread to exit */
-  msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK, TR_MQ_PRIO_NORMAL);
-  if (msg) {
-    tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit");
-    trpc_mq_add(trpc, msg);
-  } else {
-    tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate");
-  }
-
   pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL);
   trps_remove_trpc(trps, trpc);
   trpc_free(trpc);
@@ -292,7 +281,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
         tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp);
       else {
         trp_peer_set_incoming_status(peer, PEER_CONNECTED);
-        tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp);
+        tr_notice("tr_trps_process_mq: incoming connection from %s established.", tmp);
       }
       free(tmp);
     }
@@ -306,7 +295,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
       } else {
         trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
         tr_trps_cleanup_conn(trps, conn);
-        tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp);
+        tr_notice("tr_trps_process_mq: incoming connection from %s lost.", tmp);
       }
       free(tmp);
     }
@@ -318,13 +307,11 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
         tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp);
       else {
         trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
-        tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp);
+        tr_notice("tr_trps_process_mq: outgoing connection to %s established.", tmp);
       }
       free(tmp);
     }
     else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
-      /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol.
-       * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */
       TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
       TR_NAME *svcname=trpc_get_gssname(trpc);
       peer=trps_get_peer_by_servicename(trps, svcname);
@@ -333,7 +320,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
         tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp);
       else {
         trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
-        tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
+        tr_notice("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
         tr_trps_cleanup_trpc(trps, trpc);
       }
       free(tmp);
@@ -570,24 +557,6 @@ cleanup:
   return retval;
 }
 
-
-struct trpc_notify_cb_data {
-  int msg_ready;
-  pthread_cond_t cond;
-  pthread_mutex_t mutex;
-};
-
-static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
-{
-  struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
-  pthread_mutex_lock(&(cb_data->mutex));
-  if (!cb_data->msg_ready) {
-    cb_data->msg_ready=1;
-    pthread_cond_signal(&(cb_data->cond));
-  }
-  pthread_mutex_unlock(&(cb_data->mutex));
-}
-
 /* data passed to thread */
 struct trpc_thread_data {
   TRPC_INSTANCE *trpc;
@@ -595,46 +564,15 @@ struct trpc_thread_data {
 };
 
 /**
- * Returns when a message is ready
- */
-static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb)
-{
-  cb->msg_ready = 0;
-  do
-    pthread_cond_wait(&(cb->cond), &(cb->mutex));
-  while (!cb->msg_ready);
-}
-
-/**
  * Thread for handling TRPC (outgoing) connections
  *
- * Locking protocol:
- *
- * This thread uses a mutex and condition variable to wait for incoming messages
- * on its queue. The main thread interacts with these through the tr_trpc_mq_cb()
- * callback. This callback is called by the TR_MQ whenever the queue goes from empty
- * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag
- * signals the condition variable, then unlocks the mutex.
- *
- * This thread waits for the condition variable to be signaled, then checks that the
- * msg_ready flag has actually been set (pthreads does not guarantee against false
- * wakes). If so, it holds the mutex lock while reading any messages in its queue and
- * sending responses over the GSS connection. If it receives an ABORT message from
- * the main thread (not currently used), or if sending the GSS message fails (which
- * usually indicates that the connection has been lost), the thread begins the shutdown
- * process. It still holds the mutex lock at the start of this process.
- *
- * This begins by setting trpc->shutting_down = 1, then sending a message to the
- * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex
- * and waits for more messages, again using the mutex and the condition variable. It
- * ignores any except for an EXIT_OK message. When that is received, it exits, terminating
- * the thread.
- *
- * When the main thread receives the DISCONNECTED message, it must stop sending messages
- * to this thread's message queue. When it is certain that it will not queue any more messages,
- * it sends the EXIT_OK message as its last interaction with this thread. Once that message
- * has been queued, queueing additional messages may result in deadlock or segfaults.
- * (It is ok if there are other messages in the queue, but it must not queue more.)
+ * Opens a connection to a peer. If successful, notifies the trps thread by
+ * posting a TR_MQMSG_TRPC_CONNECTED message to the trps message queue.
+ * It then waits for messages on trpc->mq. Normally these will be TR_MQMSG_TRPC_SEND
+ * messages, which this thread forwards to the peer. If its connection is lost or
+ * a TR_MQMSG_ABORT message is received on trpc->mq, the thread sends a
+ * TR_MQMSG_TRPC_DISCONNECTED message to the trps thread, then cleans up and
+ * terminates.
  */
 static void *tr_trpc_thread(void *arg)
 {
@@ -647,36 +585,27 @@ static void *tr_trpc_thread(void *arg)
   const char *msg_type=NULL;
   char *encoded_msg=NULL;
   TR_NAME *peer_gssname=NULL;
-  int n_sent = 0;
-  int n_popped = 0;
+  struct timespec wait_until = {0};
   int exit_loop=0;
 
-  struct trpc_notify_cb_data cb_data={0,
-                                      PTHREAD_COND_INITIALIZER,
-                                      PTHREAD_MUTEX_INITIALIZER};
-
   tr_debug("tr_trpc_thread: started");
 
-  /* set up the mq for receiving */
-  pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
-
-  tr_mq_lock(trpc->mq);
-  tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
-  tr_mq_unlock(trpc->mq);
-
+  /* Try to make the outgoing connection */
   rc=trpc_connect(trpc);
   if (rc!=TRP_SUCCESS) {
     tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
               trpc_get_server(trpc),
               trpc_get_port(trpc));
   } else {
+    /* Retrieve the GSS name used by the peer for authentication */
     peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc));
     if (peer_gssname==NULL) {
       tr_err("tr_trpc_thread: could not duplicate peer_gssname.");
       talloc_free(tmp_ctx);
       return NULL;
     }
-    tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf);
+    tr_debug("tr_trpc_thread: connected to peer %.*s",
+             peer_gssname->len, peer_gssname->buf);
 
     msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH);
     tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper);
@@ -688,19 +617,22 @@ static void *tr_trpc_thread(void *arg)
     trps_mq_add(trps, msg); /* steals msg context */
     msg=NULL;
 
+    /* Loop until we get an abort message or until the connection is lost. */
     while(!exit_loop) {
-      trpc_thread_wait_for_message(&cb_data); /* handles locking */
-
-      n_popped = 0; /* have not popped any messages from the queue */
-      n_sent = 0; /* have not sent any messages yet */
-      for (msg = trpc_mq_pop(trpc);
-           msg != NULL;
-           msg = trpc_mq_pop(trpc)) {
-        n_popped++;
+      /* Wait up to 10 minutes for a message to be queued to send to the peer.
+       * Log a warning if we go longer than that, but don't give up. */
+      if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) {
+        tr_err("tr_trpc_thread: unable to set abort timeout");
+        break; /* immediately exit the loop, don't go through cleanup */
+      }
+
+      /* Pop a message from the queue. */
+      msg = trpc_mq_pop(trpc, &wait_until);
+      if (msg) {
         msg_type = tr_mq_msg_get_message(msg);
         if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+          tr_debug("tr_trpc_thread: received abort message from main thread.");
           exit_loop = 1;
-          break;
         } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
           encoded_msg = tr_mq_msg_get_payload(msg);
           if (encoded_msg == NULL)
@@ -708,60 +640,37 @@ static void *tr_trpc_thread(void *arg)
           else {
             rc = trpc_send_msg(trpc, encoded_msg);
             if (rc == TRP_SUCCESS) {
-              n_sent++;
+              tr_debug("tr_trpc_thread: sent message.");
             } else {
               tr_notice("tr_trpc_thread: trpc_send_msg failed.");
               /* Assume this means we lost the connection. */
               exit_loop = 1;
-              break;
             }
           }
         } else
           tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
 
         tr_mq_msg_free(msg);
+      } else {
+        tr_warning("tr_trpc_thread: no outgoing messages to %.*s for 10 minutes",
+                   peer_gssname->len, peer_gssname->buf);
       }
-
-      /* if n_popped == 0, then n_sent must be zero (it's only set after at
-       * least one msg is popped) */
-      if (n_popped==0)
-        tr_err("tr_trpc_thread: notified of message, but queue empty");
-      else
-        tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
     }
   }
 
-  tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread.");
-  trpc->shutting_down = 1;
-
   /* Send a DISCONNECTED message to the main thread */
+  tr_debug("tr_trpc_thread: notifying main thread of disconnection.");
   msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL);
   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
   if (msg==NULL) {
-    /* can't notify main thread of exit - just do it and hope for the best */
+    /* can't notify main thread */
     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
   } else {
     trps_mq_add(trps, msg);
-
-    /* DISCONNECTED sent, now wait for an acknowledgement before exiting */
-    exit_loop = 0;
-    while (!exit_loop) {
-      trpc_thread_wait_for_message(&cb_data);
-
-      while (NULL != (msg = trpc_mq_pop(trpc))) {
-        msg_type = tr_mq_msg_get_message(msg);
-
-        /* ignore anything except an EXIT_OK */
-        if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) {
-          exit_loop = 1;
-          break; /* skip any further messages */
-        }
-      }
-    }
   }
 
-  tr_debug("tr_trpc_thread: thread terminating.");
   talloc_free(tmp_ctx);
+  tr_debug("tr_trpc_thread: thread terminating.");
   return NULL;
 }
 
index 6c1a4bc..5985980 100644 (file)
@@ -190,9 +190,9 @@ void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
   tr_mq_add(trpc->mq, msg);
 }
 
-TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc)
+TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc, struct timespec *ts_abort)
 {
-  return tr_mq_pop(trpc->mq, 0);
+  return tr_mq_pop(trpc->mq, ts_abort);
 }
 
 void trpc_mq_clear(TRPC_INSTANCE *trpc)