Merge branch 'milestone/monitoring' into jennifer/no_mq_priorities
authorJennifer Richards <jennifer@painless-security.com>
Mon, 7 May 2018 16:05:23 +0000 (12:05 -0400)
committerGitHub <noreply@github.com>
Mon, 7 May 2018 16:05:23 +0000 (12:05 -0400)
1  2 
common/tr_mq.c
tr/tr_tid.c
tr/tr_trp.c

diff --combined common/tr_mq.c
@@@ -49,11 -49,12 +49,11 @@@ static int tr_mq_msg_destructor(void *o
    return 0;
  }
  
 -TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message, TR_MQ_PRIORITY prio)
 +TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message)
  {
    TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG);
    if (msg!=NULL) {
      msg->next=NULL;
 -    msg->prio=prio;
      msg->message=talloc_strdup(msg, message);
      if (msg->message==NULL) {
        talloc_free(msg);
@@@ -71,6 -72,11 +71,6 @@@ void tr_mq_msg_free(TR_MQ_MSG *msg
      talloc_free(msg);
  }
  
 -TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg)
 -{
 -  return msg->prio;
 -}
 -
  const char *tr_mq_msg_get_message(TR_MQ_MSG *msg)
  {
    return msg->message;
@@@ -115,6 -121,7 +115,6 @@@ TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx
  
      mq->head=NULL;
      mq->tail=NULL;
 -    mq->last_hi_prio=NULL;
  
      mq->notify_cb=NULL;
      mq->notify_cb_arg=NULL;
@@@ -201,6 -208,22 +201,6 @@@ static void tr_mq_append(TR_MQ *mq, TR_
    talloc_steal(mq, msg);
  }
  
 -static void tr_mq_append_high_prio(TR_MQ *mq, TR_MQ_MSG *new)
 -{
 -  if (tr_mq_get_head(mq)==NULL) {
 -    tr_mq_set_head(mq, new);
 -    tr_mq_set_tail(mq, new);
 -  } else if (mq->last_hi_prio==NULL) {
 -    tr_mq_msg_set_next(new, tr_mq_get_head(mq)); /* add to front of list */
 -    tr_mq_set_head(mq, new); /* update head of list */
 -  } else {
 -    tr_mq_msg_set_next(new, tr_mq_msg_get_next(mq->last_hi_prio));
 -    tr_mq_msg_set_next(mq->last_hi_prio, new); /* add to end of hi prio msgs */
 -  }
 -  mq->last_hi_prio=new; /* in any case, this is now the last high priority msg */
 -  talloc_steal(mq,new);
 -}
 -
  #define DEBUG_TR_MQ 0
  #if DEBUG_TR_MQ
  static void tr_mq_print(TR_MQ *mq)
    tr_debug("tr_mq_print: mq contents:");
    while(m!=NULL) {
      ii++;
 -    tr_debug("tr_mq_print: Entry %02d: %-15s (prio %d)",
 -             ii, tr_mq_msg_get_message(m), tr_mq_msg_get_prio(m));
 +    tr_debug("tr_mq_print: Entry %02d: %-15s",
 +             ii, tr_mq_msg_get_message(m));
      m=tr_mq_msg_get_next(m);
    }
  }
@@@ -226,8 -249,14 +226,8 @@@ void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *ms
    tr_mq_lock(mq);
    
    was_empty=tr_mq_empty(mq);
 -  switch (tr_mq_msg_get_prio(msg)) {
 -  case TR_MQ_PRIO_HIGH:
 -    tr_mq_append_high_prio(mq, msg);
 -    break;
 -  default:
 -    tr_mq_append(mq, msg);
 -    break;
 -  }
 +  tr_mq_append(mq, msg);
 +
    /* before releasing the mutex, get notify_cb data out of mq */
    notify_cb=mq->notify_cb;
    notify_cb_arg=mq->notify_cb_arg;
@@@ -260,13 -289,17 +260,17 @@@ int tr_mq_pop_timeout(time_t seconds, s
    return 0;
  }
  
- /* Caller must free msg via tr_mq_msg_free, waiting until absolute
+ /* Retrieves a message from the queue, waiting until absolute
   * time ts_abort before giving up (using CLOCK_MONOTONIC). If ts_abort
   * has passed, returns an existing message but will not wait if one is
   * not already available. If ts_abort is null, no blocking.  Not
   * guaranteed to wait if an error occurs - immediately returns without
   * a message. Use tr_mq_pop_timeout() to get an absolute time that
-  * is guaranteed compatible with this function. */
+  * is guaranteed compatible with this function.
+  *
+  * Caller should free msg via tr_mq_msg_free when done with it. It stays
+  * in the TR_MQ's context, though, so use talloc_steal() if you want to do
+  * something clever with it. */
  TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, struct timespec *ts_abort)
  {
    TR_MQ_MSG *popped=NULL;
      popped=tr_mq_get_head(mq);
      tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
  
 -    if (popped==mq->last_hi_prio)
 -      mq->last_hi_prio=NULL;
 -
      if (tr_mq_get_head(mq)==NULL)
        tr_mq_set_tail(mq, NULL); /* just popped the last element */
    }
diff --combined tr/tr_tid.c
@@@ -196,9 -196,9 +196,9 @@@ cleanup
      /* mq is still valid, so we can queue our response */
      tr_debug("tr_tids_req_fwd_thread: thread %d using valid msg queue.", cookie->thread_id);
      if (success)
 -      msg=tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_SUCCESS, TR_MQ_PRIO_NORMAL);
 +      msg= tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_SUCCESS);
      else
 -      msg=tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_FAILURE, TR_MQ_PRIO_NORMAL);
 +      msg= tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_FAILURE);
  
      if (msg==NULL)
        tr_notice("tr_tids_req_fwd_thread: thread %d unable to allocate response msg.", cookie->thread_id);
@@@ -346,6 -346,12 +346,12 @@@ static int tr_tids_req_handler(TIDS_INS
  
    tr_debug("tr_tids_req_handler: Request received (conn = %d)! Realm = %s, Comm = %s", orig_req->conn, 
             orig_req->realm->buf, orig_req->comm->buf);
+   if (orig_req->request_id)
+     tr_debug("tr_tids_req_handler: TID request ID: %.*s", orig_req->request_id->len, orig_req->request_id->buf);
+   else
+     tr_debug("tr_tids_req_handler: TID request ID: none");
+   tids->req_count++;
  
    /* Duplicate the request, so we can modify and forward it */
    if (NULL == (fwd_req=tid_dup_req(orig_req))) {
diff --combined tr/tr_trp.c
@@@ -93,7 -93,7 +93,7 @@@ static TRP_RC tr_trps_msg_handler(TRPS_
    /* n.b., conn is available here, but do not hold onto the reference
     * because it may be cleaned up if the originating connection goes
     * down before the message is processed */
 -  mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED, TR_MQ_PRIO_NORMAL);
 +  mq_msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED);
    if (mq_msg==NULL) {
      return TRP_NOMEM;
    }
@@@ -114,7 -114,7 +114,7 @@@ static int tr_trps_gss_handler(gss_name
  
    tr_debug("tr_trps_gss_handler()");
  
-   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
+   if ((!client_name) || (!trps) || (!cfg_mgr)) {
      tr_debug("tr_trps_gss_handler: Bad parameters.");
      return -1;
    }
@@@ -147,7 -147,7 +147,7 @@@ static void *tr_trps_thread(void *arg
    if (trps_authorize_connection(trps, conn)!=TRP_SUCCESS)
      goto cleanup;
  
 -  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED, TR_MQ_PRIO_HIGH);
 +  msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED);
    tr_mq_msg_set_payload(msg, (void *)tr_dup_name(trp_connection_get_peer(conn)), tr_free_name_helper);
    if (msg==NULL) {
      tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
    trps_handle_connection(trps, conn);
  
  cleanup:
 -  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED, TR_MQ_PRIO_HIGH);
 +  msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED);
    tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
    if (msg==NULL)
      tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
@@@ -223,17 -223,6 +223,6 @@@ static void tr_trps_cleanup_conn(TRPS_I
  
  static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
  {
-   TR_MQ_MSG *msg;
-   /* tell the trpc thread to exit */
-   msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK);
-   if (msg) {
-     tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit");
-     trpc_mq_add(trpc, msg);
-   } else {
-     tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate");
-   }
    pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL);
    trps_remove_trpc(trps, trpc);
    trpc_free(trpc);
@@@ -294,7 -283,7 +283,7 @@@ static void tr_trps_process_mq(int sock
          tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp);
        else {
          trp_peer_set_incoming_status(peer, PEER_CONNECTED);
-         tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp);
+         tr_notice("tr_trps_process_mq: incoming connection from %s established.", tmp);
        }
        free(tmp);
      }
        } else {
          trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
          tr_trps_cleanup_conn(trps, conn);
-         tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp);
+         tr_notice("tr_trps_process_mq: incoming connection from %s lost.", tmp);
        }
        free(tmp);
      }
          tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp);
        else {
          trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
-         tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp);
+         tr_notice("tr_trps_process_mq: outgoing connection to %s established.", tmp);
        }
        free(tmp);
      }
      else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
-       /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol.
-        * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */
        TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
        TR_NAME *svcname=trpc_get_gssname(trpc);
        peer=trps_get_peer_by_servicename(trps, svcname);
          tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp);
        else {
          trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
-         tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
+         tr_notice("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
          tr_trps_cleanup_trpc(trps, trpc);
        }
        free(tmp);
@@@ -572,24 -559,6 +559,6 @@@ cleanup
    return retval;
  }
  
- struct trpc_notify_cb_data {
-   int msg_ready;
-   pthread_cond_t cond;
-   pthread_mutex_t mutex;
- };
- static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
- {
-   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
-   pthread_mutex_lock(&(cb_data->mutex));
-   if (!cb_data->msg_ready) {
-     cb_data->msg_ready=1;
-     pthread_cond_signal(&(cb_data->cond));
-   }
-   pthread_mutex_unlock(&(cb_data->mutex));
- }
  /* data passed to thread */
  struct trpc_thread_data {
    TRPC_INSTANCE *trpc;
  };
  
  /**
-  * Returns when a message is ready
-  */
- static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb)
- {
-   cb->msg_ready = 0;
-   do
-     pthread_cond_wait(&(cb->cond), &(cb->mutex));
-   while (!cb->msg_ready);
- }
- /**
   * Thread for handling TRPC (outgoing) connections
   *
-  * Locking protocol:
-  *
-  * This thread uses a mutex and condition variable to wait for incoming messages
-  * on its queue. The main thread interacts with these through the tr_trpc_mq_cb()
-  * callback. This callback is called by the TR_MQ whenever the queue goes from empty
-  * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag
-  * signals the condition variable, then unlocks the mutex.
-  *
-  * This thread waits for the condition variable to be signaled, then checks that the
-  * msg_ready flag has actually been set (pthreads does not guarantee against false
-  * wakes). If so, it holds the mutex lock while reading any messages in its queue and
-  * sending responses over the GSS connection. If it receives an ABORT message from
-  * the main thread (not currently used), or if sending the GSS message fails (which
-  * usually indicates that the connection has been lost), the thread begins the shutdown
-  * process. It still holds the mutex lock at the start of this process.
-  *
-  * This begins by setting trpc->shutting_down = 1, then sending a message to the
-  * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex
-  * and waits for more messages, again using the mutex and the condition variable. It
-  * ignores any except for an EXIT_OK message. When that is received, it exits, terminating
-  * the thread.
+  * Opens a connection to a peer. If successful, notifies the trps thread by
+  * posting a TR_MQMSG_TRPC_CONNECTED message to the trps message queue.
+  * It then waits for messages on trpc->mq. Normally these will be TR_MQMSG_TRPC_SEND
+  * messages, which this thread forwards to the peer. If its connection is lost or
+  * a TR_MQMSG_ABORT message is received on trpc->mq, the thread sends a
+  * TR_MQMSG_TRPC_DISCONNECTED message to the trps thread, then cleans up and
+  * terminates.
   *
-  * When the main thread receives the DISCONNECTED message, it must stop sending messages
-  * to this thread's message queue. When it is certain that it will not queue any more messages,
-  * it sends the EXIT_OK message as its last interaction with this thread. Once that message
-  * has been queued, queueing additional messages may result in deadlock or segfaults.
-  * (It is ok if there are other messages in the queue, but it must not queue more.)
+  * The trps may continue queueing messages for this client even when the
+  * connection is down. To prevent the queue from growing endlessly, this thread
+  * should clear its queue after failed connection attempts.
   */
  static void *tr_trpc_thread(void *arg)
  {
    const char *msg_type=NULL;
    char *encoded_msg=NULL;
    TR_NAME *peer_gssname=NULL;
-   int n_sent = 0;
-   int n_popped = 0;
+   struct timespec wait_until = {0};
    int exit_loop=0;
  
-   struct trpc_notify_cb_data cb_data={0,
-                                       PTHREAD_COND_INITIALIZER,
-                                       PTHREAD_MUTEX_INITIALIZER};
    tr_debug("tr_trpc_thread: started");
  
-   /* set up the mq for receiving */
-   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
-   tr_mq_lock(trpc->mq);
-   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
-   tr_mq_unlock(trpc->mq);
+   /* Try to make the outgoing connection */
    rc=trpc_connect(trpc);
    if (rc!=TRP_SUCCESS) {
      tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
                trpc_get_server(trpc),
                trpc_get_port(trpc));
+     trpc_mq_clear(trpc); /* clear the queue even though we did not connect */
    } else {
+     /* Retrieve the GSS name used by the peer for authentication */
      peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc));
      if (peer_gssname==NULL) {
        tr_err("tr_trpc_thread: could not duplicate peer_gssname.");
        talloc_free(tmp_ctx);
        return NULL;
      }
-     tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf);
+     tr_debug("tr_trpc_thread: connected to peer %.*s",
+              peer_gssname->len, peer_gssname->buf);
  
 -    msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH);
 +    msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED);
      tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper);
      if (msg==NULL) {
        tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
      trps_mq_add(trps, msg); /* steals msg context */
      msg=NULL;
  
+     /* Loop until we get an abort message or until the connection is lost. */
      while(!exit_loop) {
-       trpc_thread_wait_for_message(&cb_data); /* handles locking */
-       n_popped = 0; /* have not popped any messages from the queue */
-       n_sent = 0; /* have not sent any messages yet */
-       for (msg = trpc_mq_pop(trpc);
-            msg != NULL;
-            msg = trpc_mq_pop(trpc)) {
-         n_popped++;
+       /* Wait up to 10 minutes for a message to be queued to send to the peer.
+        * Log a warning if we go longer than that, but don't give up. */
+       if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) {
+         tr_err("tr_trpc_thread: unable to set abort timeout");
+         break; /* immediately exit the loop, don't go through cleanup */
+       }
+       /* Pop a message from the queue. */
+       msg = trpc_mq_pop(trpc, &wait_until);
+       if (msg) {
          msg_type = tr_mq_msg_get_message(msg);
          if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+           tr_debug("tr_trpc_thread: received abort message from main thread.");
            exit_loop = 1;
-           break;
          } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
            encoded_msg = tr_mq_msg_get_payload(msg);
            if (encoded_msg == NULL)
            else {
              rc = trpc_send_msg(trpc, encoded_msg);
              if (rc == TRP_SUCCESS) {
-               n_sent++;
+               tr_debug("tr_trpc_thread: sent message.");
              } else {
                tr_notice("tr_trpc_thread: trpc_send_msg failed.");
                /* Assume this means we lost the connection. */
                exit_loop = 1;
-               break;
              }
            }
          } else
            tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
  
          tr_mq_msg_free(msg);
+       } else {
+         tr_warning("tr_trpc_thread: no outgoing messages to %.*s for 10 minutes",
+                    peer_gssname->len, peer_gssname->buf);
        }
-       /* if n_popped == 0, then n_sent must be zero (it's only set after at
-        * least one msg is popped) */
-       if (n_popped==0)
-         tr_err("tr_trpc_thread: notified of message, but queue empty");
-       else
-         tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
      }
    }
  
-   tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread.");
-   trpc->shutting_down = 1;
    /* Send a DISCONNECTED message to the main thread */
 -  tr_debug("tr_trpc_thread: notifying main thread of disconnection.");
 -  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL);
 +  msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED);
    tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
    if (msg==NULL) {
-     /* can't notify main thread of exit - just do it and hope for the best */
+     /* can't notify main thread */
      tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
    } else {
      trps_mq_add(trps, msg);
-     /* DISCONNECTED sent, now wait for an acknowledgement before exiting */
-     exit_loop = 0;
-     while (!exit_loop) {
-       trpc_thread_wait_for_message(&cb_data);
-       while (NULL != (msg = trpc_mq_pop(trpc))) {
-         msg_type = tr_mq_msg_get_message(msg);
-         /* ignore anything except an EXIT_OK */
-         if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) {
-           exit_loop = 1;
-           break; /* skip any further messages */
-         }
-       }
-     }
    }
  
-   tr_debug("tr_trpc_thread: thread terminating.");
    talloc_free(tmp_ctx);
+   tr_debug("tr_trpc_thread: thread terminating.");
    return NULL;
  }
  
@@@ -972,7 -886,11 +885,11 @@@ void tr_config_changed(TR_CFG *new_cfg
      tr_debug("tr_config_changed: freeing tr->mons->authorized_gss_names");
      tr_gss_names_free(tr->mons->authorized_gss_names);
    }
-   tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials);
+   if (new_cfg->internal->monitoring_credentials != NULL) {
+     tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials);
+   } else {
+     tr->mons->authorized_gss_names = tr_gss_names_new(tr->mons);
+   }
    if (tr->mons->authorized_gss_names == NULL) {
      tr_err("tr_config_changed: Error configuring monitoring credentials");
    }