Fix race condition on exit of trpc threads
[trust_router.git] / tr / tr_trp.c
index 5ad734d..ad1084a 100644 (file)
@@ -47,7 +47,9 @@
 #include <tr.h>
 #include <tr_mq.h>
 #include <tr_rp.h>
+#include <trp_route.h>
 #include <trp_internal.h>
+#include <trp_peer.h>
 #include <trp_ptable.h>
 #include <trp_rtable.h>
 #include <tr_config.h>
@@ -198,7 +200,7 @@ static void tr_trps_event_cb(int listener, short event, void *arg)
       }
       thread_data->conn=conn;
       thread_data->trps=trps;
-      trps_add_connection(trps, conn); /* remember the connection */
+      trps_add_connection(trps, conn); /* remember the connection - this puts conn and the thread data in trps's talloc context */
       pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
     }
   }
@@ -220,21 +222,47 @@ static void tr_trps_cleanup_conn(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
 
 static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
 {
+  TR_MQ_MSG *msg;
+
+  /* tell the trpc thread to exit */
+  msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK, TR_MQ_PRIO_NORMAL);
+  if (msg) {
+    tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit");
+    trpc_mq_add(trpc, msg);
+  } else {
+    tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate");
+  }
+
   pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL);
   trps_remove_trpc(trps, trpc);
   trpc_free(trpc);
   tr_debug("tr_trps_cleanup_trpc: deleted connection");
 }
 
-static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f)
+/**
+ * Get a dynamically allocated string with a description of the route table.
+ * Caller must free the string using talloc_free().
+ *
+ * @param memctx talloc context for the string
+ * @param trps trps instance containing the route table
+ * @return pointer to the output, or NULL on error
+ */
+static char *tr_trps_route_table_to_str(TALLOC_CTX *memctx, TRPS_INSTANCE *trps)
 {
-  char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL);
-  if (table==NULL)
-    fprintf(f, "Unable to print route table.\n");
-  else {
-    fprintf(f, "%s\n", table);
-    talloc_free(table);
-  }
+  return trp_rtable_to_str(memctx, trps->rtable, " | ", NULL);
+}
+
+/**
+ * Get a dynamically allocated string with a description of the community table.
+ * Caller must free the string using talloc_free().
+ *
+ * @param memctx talloc context for the string
+ * @param trps trps instance containing the community table
+ * @return pointer to the output, or NULL on error
+ */
+static char *tr_trps_comm_table_to_str(TALLOC_CTX *memctx, TRPS_INSTANCE *trps)
+{
+  return tr_comm_table_to_str(memctx, trps->ctable);
 }
 
 /**
@@ -251,63 +279,69 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
   TR_MQ_MSG *msg=NULL;
   const char *s=NULL;
+  TRP_PEER *peer = NULL;
+  char *tmp = NULL;
 
   msg=trps_mq_pop(trps);
   while (msg!=NULL) {
     s=tr_mq_msg_get_message(msg);
     if (0==strcmp(s, TR_MQMSG_TRPS_CONNECTED)) {
-      TR_NAME *gssname=(TR_NAME *)tr_mq_msg_get_payload(msg);
-      TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+      TR_NAME *peer_gssname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+      peer=trps_get_peer_by_gssname(trps, peer_gssname); /* get the peer record */
+      tmp = tr_name_strdup(peer_gssname); /* get the name as a null-terminated string */
       if (peer==NULL)
-        tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", gssname->buf);
+        tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp);
       else {
         trp_peer_set_incoming_status(peer, PEER_CONNECTED);
-        tr_err("tr_trps_process_mq: incoming connection from %s established.", gssname->buf);
+        tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp);
       }
+      free(tmp);
     }
     else if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
       TRP_CONNECTION *conn=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION);
-      TR_NAME *gssname=trp_connection_get_gssname(conn);
-      TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+      TR_NAME *peer_gssname=trp_connection_get_peer(conn);
+      peer=trps_get_peer_by_gssname(trps, peer_gssname); /* get the peer record */
+      tmp = tr_name_strdup(peer_gssname); /* get the name as a null-terminated string */
       if (peer==NULL) {
-        tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) lost.",
-               trp_connection_get_gssname(conn)->buf);
+        tr_err("tr_trps_process_mq: incoming connection from unknown peer (%.*s) lost.", tmp);
       } else {
         trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
         tr_trps_cleanup_conn(trps, conn);
-        tr_err("tr_trps_process_mq: incoming connection from %s lost.", gssname->buf);
+        tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp);
       }
+      free(tmp);
     }
     else if (0==strcmp(s, TR_MQMSG_TRPC_CONNECTED)) {
       TR_NAME *svcname=(TR_NAME *)tr_mq_msg_get_payload(msg);
-      TRP_PEER *peer=trps_get_peer_by_servicename(trps, svcname);
+      peer=trps_get_peer_by_servicename(trps, svcname);
+      tmp = tr_name_strdup(svcname);
       if (peer==NULL)
-        tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", svcname->buf);
+        tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp);
       else {
         trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
-        tr_err("tr_trps_process_mq: outgoing connection to %s established.", svcname->buf);
+        tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp);
       }
+      free(tmp);
     }
     else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
       /* trpc connection died */
       TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
-      TR_NAME *gssname=trpc_get_gssname(trpc);
-      TRP_PEER *peer=trps_get_peer_by_servicename(trps, gssname);
+      TR_NAME *svcname=trpc_get_gssname(trpc);
+      peer=trps_get_peer_by_servicename(trps, svcname);
+      tmp = tr_name_strdup(svcname);
       if (peer==NULL)
-        tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", gssname->buf);
+        tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp);
       else {
         trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
-        tr_err("tr_trps_process_mq: outgoing connection to %s lost.", gssname->buf);
+        tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
         tr_trps_cleanup_trpc(trps, trpc);
       }
+      free(tmp);
     }
 
     else if (0==strcmp(s, TR_MQMSG_MSG_RECEIVED)) {
       if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS)
         tr_notice("tr_trps_process_mq: error handling message.");
-      else {
-        tr_trps_print_route_table(trps, stderr);
-      }
     }
     else
       tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
@@ -334,12 +368,23 @@ static void tr_trps_sweep(int listener, short event, void *arg)
   struct tr_trps_event_cookie *cookie=talloc_get_type_abort(arg, struct tr_trps_event_cookie);
   TRPS_INSTANCE *trps=cookie->trps;
   struct event *ev=cookie->ev;
+  char *table_str=NULL;
 
   tr_debug("tr_trps_sweep: sweeping routes.");
   trps_sweep_routes(trps);
   tr_debug("tr_trps_sweep: sweeping communities.");
   trps_sweep_ctable(trps);
-  tr_trps_print_route_table(trps, stderr);
+  table_str=tr_trps_route_table_to_str(NULL, trps);
+  if (table_str!=NULL) {
+    tr_debug(table_str);
+    talloc_free(table_str);
+  }
+
+  table_str=tr_trps_comm_table_to_str(NULL, trps);
+  if (table_str!=NULL) {
+    tr_debug(table_str);
+    talloc_free(table_str);
+  }
   /* schedule the event to run again */
   event_add(ev, &(trps->sweep_interval));
 }
@@ -559,7 +604,8 @@ static void *tr_trpc_thread(void *arg)
   const char *msg_type=NULL;
   char *encoded_msg=NULL;
   TR_NAME *peer_gssname=NULL;
-  int n_sent=0;
+  int n_sent = 0;
+  int n_popped = 0;
   int exit_loop=0;
 
   struct trpc_notify_cb_data cb_data={0,
@@ -602,50 +648,77 @@ static void *tr_trpc_thread(void *arg)
     while(!exit_loop) {
       cb_data.msg_ready=0;
       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
-      /* verify the condition */
+      /* verify the condition - remember, we have the mutex! */
       if (cb_data.msg_ready) {
-        for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) {
-          msg_type=tr_mq_msg_get_message(msg);
-
-          if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
-            exit_loop=1;
+        n_popped = 0; /* have not popped any messages from the queue */
+        n_sent = 0; /* have not sent any messages yet */
+        for (msg = trpc_mq_pop(trpc);
+             msg != NULL;
+             msg = trpc_mq_pop(trpc)) {
+          n_popped++;
+          msg_type = tr_mq_msg_get_message(msg);
+          if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+            exit_loop = 1;
             break;
-          }
-          else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
-            encoded_msg=tr_mq_msg_get_payload(msg);
-            if (encoded_msg==NULL)
+          } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+            encoded_msg = tr_mq_msg_get_payload(msg);
+            if (encoded_msg == NULL)
               tr_notice("tr_trpc_thread: null outgoing TRP message.");
             else {
               rc = trpc_send_msg(trpc, encoded_msg);
-              if (rc!=TRP_SUCCESS) {
+              if (rc == TRP_SUCCESS) {
+                n_sent++;
+              } else {
                 tr_notice("tr_trpc_thread: trpc_send_msg failed.");
-                exit_loop=1;
+                /* Assume this means we lost the connection. */
+                exit_loop = 1;
                 break;
               }
             }
-          }
-          else
+          } else
             tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
 
           tr_mq_msg_free(msg);
         }
-        if (n_sent==0)
-          tr_err("tr_trpc_thread: notified of msg, but queue empty");
+
+        /* if n_popped == 0, then n_sent must be zero (it's only set after at
+         * least one msg is popped) */
+        if (n_popped==0)
+          tr_err("tr_trpc_thread: notified of message, but queue empty");
         else 
           tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
       }
     }
   }
 
-  tr_debug("tr_trpc_thread: exiting.");
+  tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread.");
+  trpc->shutting_down = 1;
+
+  // trpc_mq_clear(trpc); /* clear any queued messages */
+
   msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH);
   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
-  if (msg==NULL)
+  if (msg==NULL) {
+    /* can't notify main thread of exit - just do it and hope for the best */
     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
-  else
+  } else {
     trps_mq_add(trps, msg);
-
-  trpc_mq_clear(trpc); /* clear any queued messages */
+    /* now wait for an acknowledgement */
+    exit_loop = 0;
+    while (!exit_loop) {
+      cb_data.msg_ready = 0;
+      pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
+      /* verify the condition - remember, we have the mutex! */
+      if (cb_data.msg_ready) {
+        while (NULL != (msg = trpc_mq_pop(trpc))) {
+          msg_type = tr_mq_msg_get_message(msg);
+          /* ignore anything except an exit ack */
+          if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK))
+            exit_loop = 1;
+        }
+      }
+    }
+  }
 
   talloc_free(tmp_ctx);
   return NULL;
@@ -839,6 +912,7 @@ void tr_config_changed(TR_CFG *new_cfg, void *cookie)
 {
   TR_INSTANCE *tr=talloc_get_type_abort(cookie, TR_INSTANCE);
   TRPS_INSTANCE *trps=tr->trps;
+  char *table_str=NULL;
 
   tr->cfgwatch->poll_interval.tv_sec=new_cfg->internal->cfg_poll_interval;
   tr->cfgwatch->poll_interval.tv_usec=0;
@@ -846,6 +920,20 @@ void tr_config_changed(TR_CFG *new_cfg, void *cookie)
   tr->cfgwatch->settling_time.tv_sec=new_cfg->internal->cfg_settling_time;
   tr->cfgwatch->settling_time.tv_usec=0;
 
+  /* These need to be updated */
+  tr->tids->hostname = new_cfg->internal->hostname;
+  tr->mons->hostname = new_cfg->internal->hostname;
+
+  /* Update the authorized monitoring gss names */
+  if (tr->mons->authorized_gss_names) {
+    tr_debug("tr_config_changed: freeing tr->mons->authorized_gss_names");
+    tr_gss_names_free(tr->mons->authorized_gss_names);
+  }
+  tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials);
+  if (tr->mons->authorized_gss_names == NULL) {
+    tr_err("tr_config_changed: Error configuring monitoring credentials");
+  }
+
   trps_set_connect_interval(trps, new_cfg->internal->trp_connect_interval);
   trps_set_update_interval(trps, new_cfg->internal->trp_update_interval);
   trps_set_sweep_interval(trps, new_cfg->internal->trp_sweep_interval);
@@ -857,6 +945,15 @@ void tr_config_changed(TR_CFG *new_cfg, void *cookie)
   trps_update_active_routes(trps); /* find new routes */
   trps_update(trps, TRP_UPDATE_TRIGGERED); /* send any triggered routes */
   tr_print_config(new_cfg);
-  tr_trps_print_route_table(trps, stderr);
+  table_str=tr_trps_route_table_to_str(NULL, trps);
+  if (table_str!=NULL) {
+    tr_info(table_str);
+    talloc_free(table_str);
+  }
+  table_str=tr_trps_comm_table_to_str(NULL, trps);
+  if (table_str!=NULL) {
+    tr_info(table_str);
+    talloc_free(table_str);
+  }
 }