Add some comments, a bit of code clean up
authorJennifer Richards <jennifer@painless-security.com>
Fri, 27 Apr 2018 02:09:24 +0000 (22:09 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Fri, 27 Apr 2018 02:26:16 +0000 (22:26 -0400)
tr/tr_trp.c

index ad1084a..f6e60c5 100644 (file)
@@ -184,19 +184,18 @@ static void tr_trps_event_cb(int listener, short event, void *arg)
     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
   } else {
     /* create a thread to handle this connection */
-    if (asprintf(&name, "trustrouter@%s", trps->hostname)==-1) {
+    name = talloc_asprintf(tmp_ctx, "trustrouter@%s", trps->hostname);
+    if (name == NULL)
       goto cleanup;
-    }
-    gssname=tr_new_name(name);
-    free(name); name=NULL;
+    gssname=tr_new_name(name); /* name cleaned up with tmp_ctx */
+
     conn=trp_connection_accept(tmp_ctx, listener, gssname);
     if (conn!=NULL) {
       /* need to monitor this fd and trigger events when read becomes possible */
       thread_data=talloc(conn, struct trps_thread_data);
       if (thread_data==NULL) {
         tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
-        talloc_free(tmp_ctx);
-        return;
+        goto cleanup;
       }
       thread_data->conn=conn;
       thread_data->trps=trps;
@@ -324,7 +323,8 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
       free(tmp);
     }
     else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
-      /* trpc connection died */
+      /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol.
+       * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */
       TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
       TR_NAME *svcname=trpc_get_gssname(trpc);
       peer=trps_get_peer_by_servicename(trps, svcname);
@@ -593,6 +593,49 @@ struct trpc_thread_data {
   TRPC_INSTANCE *trpc;
   TRPS_INSTANCE *trps;
 };
+
+/**
+ * Returns when a message is ready
+ */
+static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb)
+{
+  cb->msg_ready = 0;
+  do
+    pthread_cond_wait(&(cb->cond), &(cb->mutex));
+  while (!cb->msg_ready);
+}
+
+/**
+ * Thread for handling TRPC (outgoing) connections
+ *
+ * Locking protocol:
+ *
+ * This thread uses a mutex and condition variable to wait for incoming messages
+ * on its queue. The main thread interacts with these through the tr_trpc_mq_cb()
+ * callback. This callback is called by the TR_MQ whenever the queue goes from empty
+ * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag
+ * signals the condition variable, then unlocks the mutex.
+ *
+ * This thread waits for the condition variable to be signaled, then checks that the
+ * msg_ready flag has actually been set (pthreads does not guarantee against false
+ * wakes). If so, it holds the mutex lock while reading any messages in its queue and
+ * sending responses over the GSS connection. If it receives an ABORT message from
+ * the main thread (not currently used), or if sending the GSS message fails (which
+ * usually indicates that the connection has been lost), the thread begins the shutdown
+ * process. It still holds the mutex lock at the start of this process.
+ *
+ * This begins by setting trpc->shutting_down = 1, then sending a message to the
+ * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex
+ * and waits for more messages, again using the mutex and the condition variable. It
+ * ignores any except for an EXIT_OK message. When that is received, it exits, terminating
+ * the thread.
+ *
+ * When the main thread receives the DISCONNECTED message, it must stop sending messages
+ * to this thread's message queue. When it is certain that it will not queue any more messages,
+ * it sends the EXIT_OK message as its last interaction with this thread. Once that message
+ * has been queued, queueing additional messages may result in deadlock or segfaults.
+ * (It is ok if there are other messages in the queue, but it must not queue more.)
+ */
 static void *tr_trpc_thread(void *arg)
 {
   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
@@ -646,80 +689,78 @@ static void *tr_trpc_thread(void *arg)
     msg=NULL;
 
     while(!exit_loop) {
-      cb_data.msg_ready=0;
-      pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
-      /* verify the condition - remember, we have the mutex! */
-      if (cb_data.msg_ready) {
-        n_popped = 0; /* have not popped any messages from the queue */
-        n_sent = 0; /* have not sent any messages yet */
-        for (msg = trpc_mq_pop(trpc);
-             msg != NULL;
-             msg = trpc_mq_pop(trpc)) {
-          n_popped++;
-          msg_type = tr_mq_msg_get_message(msg);
-          if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
-            exit_loop = 1;
-            break;
-          } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
-            encoded_msg = tr_mq_msg_get_payload(msg);
-            if (encoded_msg == NULL)
-              tr_notice("tr_trpc_thread: null outgoing TRP message.");
-            else {
-              rc = trpc_send_msg(trpc, encoded_msg);
-              if (rc == TRP_SUCCESS) {
-                n_sent++;
-              } else {
-                tr_notice("tr_trpc_thread: trpc_send_msg failed.");
-                /* Assume this means we lost the connection. */
-                exit_loop = 1;
-                break;
-              }
+      trpc_thread_wait_for_message(&cb_data); /* handles locking */
+
+      n_popped = 0; /* have not popped any messages from the queue */
+      n_sent = 0; /* have not sent any messages yet */
+      for (msg = trpc_mq_pop(trpc);
+           msg != NULL;
+           msg = trpc_mq_pop(trpc)) {
+        n_popped++;
+        msg_type = tr_mq_msg_get_message(msg);
+        if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+          exit_loop = 1;
+          break;
+        } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+          encoded_msg = tr_mq_msg_get_payload(msg);
+          if (encoded_msg == NULL)
+            tr_notice("tr_trpc_thread: null outgoing TRP message.");
+          else {
+            rc = trpc_send_msg(trpc, encoded_msg);
+            if (rc == TRP_SUCCESS) {
+              n_sent++;
+            } else {
+              tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+              /* Assume this means we lost the connection. */
+              exit_loop = 1;
+              break;
             }
-          } else
-            tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
-
-          tr_mq_msg_free(msg);
-        }
+          }
+        } else
+          tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
 
-        /* if n_popped == 0, then n_sent must be zero (it's only set after at
-         * least one msg is popped) */
-        if (n_popped==0)
-          tr_err("tr_trpc_thread: notified of message, but queue empty");
-        else 
-          tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
+        tr_mq_msg_free(msg);
       }
+
+      /* if n_popped == 0, then n_sent must be zero (it's only set after at
+       * least one msg is popped) */
+      if (n_popped==0)
+        tr_err("tr_trpc_thread: notified of message, but queue empty");
+      else
+        tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
     }
   }
 
   tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread.");
   trpc->shutting_down = 1;
 
-  // trpc_mq_clear(trpc); /* clear any queued messages */
-
-  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH);
+  /* Send a DISCONNECTED message to the main thread */
+  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL);
   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
   if (msg==NULL) {
     /* can't notify main thread of exit - just do it and hope for the best */
     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
   } else {
     trps_mq_add(trps, msg);
-    /* now wait for an acknowledgement */
+
+    /* DISCONNECTED sent, now wait for an acknowledgement before exiting */
     exit_loop = 0;
     while (!exit_loop) {
-      cb_data.msg_ready = 0;
-      pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
-      /* verify the condition - remember, we have the mutex! */
-      if (cb_data.msg_ready) {
-        while (NULL != (msg = trpc_mq_pop(trpc))) {
-          msg_type = tr_mq_msg_get_message(msg);
-          /* ignore anything except an exit ack */
-          if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK))
-            exit_loop = 1;
+      trpc_thread_wait_for_message(&cb_data);
+
+      while (NULL != (msg = trpc_mq_pop(trpc))) {
+        msg_type = tr_mq_msg_get_message(msg);
+
+        /* ignore anything except an EXIT_OK */
+        if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) {
+          exit_loop = 1;
+          break; /* skip any further messages */
         }
       }
     }
   }
 
+  tr_debug("tr_trpc_thread: thread terminating.");
   talloc_free(tmp_ctx);
   return NULL;
 }