Send all msgs in trpc outgoing queue, not just the first.
authorJennifer Richards <jennifer@painless-security.com>
Thu, 29 Sep 2016 15:31:25 +0000 (11:31 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Thu, 29 Sep 2016 15:31:25 +0000 (11:31 -0400)
common/tr_msg.c
tr/tr_trp.c
trp/trp_upd.c
trp/trps.c

index 879ac55..9ae5909 100644 (file)
@@ -133,6 +133,7 @@ TRP_UPD *tr_msg_get_trp_upd(TR_MSG *msg)
 void tr_msg_set_trp_upd(TR_MSG *msg, TRP_UPD *update)
 {
   msg->msg_rep=update;
+  talloc_steal(NULL, update); /* should attach to msg, but TR_MSG not usually talloc'ed */
   msg->msg_type=TRP_UPDATE;
 }
 
@@ -701,7 +702,7 @@ static json_t *tr_msg_encode_trp_upd(TRP_UPD *update)
     json_decref(jupdate);
     return NULL;
   }
-  json_object_set_new(jrec, "realm", jstr);
+  json_object_set_new(jupdate, "realm", jstr);
 
   jrecords=json_array();
   if (jrecords==NULL) {
@@ -726,7 +727,7 @@ static json_t *tr_msg_encode_trp_upd(TRP_UPD *update)
   return jupdate;
 }
 
-/*Creates a linked list of records in the msg->body talloc context.
+/* Creates a linked list of records in the msg->body talloc context.
  * An error will be returned if any unparseable records are encountered. 
  */
 static TRP_UPD *tr_msg_decode_trp_upd(TALLOC_CTX *mem_ctx, json_t *jupdate)
@@ -748,20 +749,32 @@ static TRP_UPD *tr_msg_decode_trp_upd(TALLOC_CTX *mem_ctx, json_t *jupdate)
   }
 
   rc=tr_msg_get_json_string(jupdate, "community", &s, tmp_ctx);
-  if (rc != TRP_SUCCESS)
+  if (rc != TRP_SUCCESS) {
+    tr_debug("tr_msg_decode_trp_upd: no community in TRP update message.");
+    rc=TRP_NOPARSE;
     goto cleanup;
+  }
   name=tr_new_name(s);
-  if (name==NULL)
+  if (name==NULL) {
+    tr_debug("tr_msg_decode_trp_upd: could not allocate community name.");
+    rc=TRP_NOMEM;
     goto cleanup;
+  }
   talloc_free(s); s=NULL;
   trp_upd_set_comm(update, name);
 
   rc=tr_msg_get_json_string(jupdate, "realm", &s, tmp_ctx);
-  if (rc != TRP_SUCCESS)
+  if (rc != TRP_SUCCESS) {
+    tr_debug("tr_msg_decode_trp_upd: no realm in TRP update message.");
+    rc=TRP_NOPARSE;
     goto cleanup;
+  }
   name=tr_new_name(s);
-  if (name==NULL)
+  if (name==NULL) {
+    tr_debug("tr_msg_decode_trp_upd: could not allocate realm name.");
+    rc=TRP_NOMEM;
     goto cleanup;
+  }
   talloc_free(s); s=NULL;
   trp_upd_set_realm(update, name);
 
index ef44e89..05f8d47 100644 (file)
@@ -545,6 +545,8 @@ static void *tr_trpc_thread(void *arg)
   const char *msg_type=NULL;
   char *encoded_msg=NULL;
   TR_NAME *peer_gssname=NULL;
+  int n_sent=0;
+  int exit_loop=0;
 
   struct trpc_notify_cb_data cb_data={0,
                                       PTHREAD_COND_INITIALIZER,
@@ -583,41 +585,40 @@ static void *tr_trpc_thread(void *arg)
     trps_mq_add(trps, msg); /* steals msg context */
     msg=NULL;
 
-    while(1) {
+    while(!exit_loop) {
       cb_data.msg_ready=0;
       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
       /* verify the condition */
       if (cb_data.msg_ready) {
-        msg=trpc_mq_pop(trpc);
-        if (msg==NULL) {
-          /* no message in the queue */
-          tr_err("tr_trpc_thread: notified of msg, but queue empty");
-          break;
-        }
-
-        msg_type=tr_mq_msg_get_message(msg);
+        for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) {
+          msg_type=tr_mq_msg_get_message(msg);
 
-        if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
-          tr_mq_msg_free(msg);
-          break; /* exit loop */
-        }
-        else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
-          encoded_msg=tr_mq_msg_get_payload(msg);
-          if (encoded_msg==NULL)
-            tr_notice("tr_trpc_thread: null outgoing TRP message.");
-          else {
-            rc = trpc_send_msg(trpc, encoded_msg);
-            if (rc!=TRP_SUCCESS) {
-              tr_notice("tr_trpc_thread: trpc_send_msg failed.");
-              tr_mq_msg_free(msg);
-              break;
+          if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
+            exit_loop=1;
+            break;
+          }
+          else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+            encoded_msg=tr_mq_msg_get_payload(msg);
+            if (encoded_msg==NULL)
+              tr_notice("tr_trpc_thread: null outgoing TRP message.");
+            else {
+              rc = trpc_send_msg(trpc, encoded_msg);
+              if (rc!=TRP_SUCCESS) {
+                tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+                exit_loop=1;
+                break;
+              }
             }
           }
-        }
-        else
-          tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+          else
+            tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
 
-        tr_mq_msg_free(msg);
+          tr_mq_msg_free(msg);
+        }
+        if (n_sent==0)
+          tr_err("tr_trpc_thread: notified of msg, but queue empty");
+        else 
+          tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
       }
     }
   }
index cbffd8c..c8bd784 100644 (file)
@@ -324,14 +324,6 @@ TRP_RC trp_inforec_set_interval(TRP_INFOREC *rec, unsigned int interval)
   return TRP_ERROR;
 }
 
-static void trp_inforec_set_data(TRP_INFOREC *rec, TRP_INFOREC_DATA *data)
-{
-  if (rec->data!=NULL)
-    talloc_free(rec->data);
-  rec->data=data;
-  talloc_steal(rec, data); /* make sure it's in our context */
-}
-
 /* generic record type */
 TRP_INFOREC *trp_inforec_new(TALLOC_CTX *mem_ctx, TRP_INFOREC_TYPE type)
 {
@@ -345,7 +337,7 @@ TRP_INFOREC *trp_inforec_new(TALLOC_CTX *mem_ctx, TRP_INFOREC_TYPE type)
     if (dtype->allocate!=NULL) {
       data=dtype->allocate(new_rec);
       if (data!=NULL)
-        trp_inforec_set_data(new_rec, data);
+        new_rec->data=data;
       else {
         talloc_free(new_rec);
         return NULL;
@@ -364,6 +356,10 @@ void trp_inforec_free(TRP_INFOREC *rec)
 static int trp_upd_destructor(void *object)
 {
   TRP_UPD *upd=talloc_get_type_abort(object, TRP_UPD);
+  if (upd->realm!=NULL)
+    tr_free_name(upd->realm);
+  if (upd->comm!=NULL)
+    tr_free_name(upd->comm);
   if (upd->peer!=NULL)
     tr_free_name(upd->peer);
   return 0;
@@ -374,6 +370,8 @@ TRP_UPD *trp_upd_new(TALLOC_CTX *mem_ctx)
   TRP_UPD *new_body=talloc(mem_ctx, TRP_UPD);
 
   if (new_body!=NULL) {
+    new_body->realm=NULL;
+    new_body->comm=NULL;
     new_body->records=NULL;
     new_body->peer=NULL;
     talloc_set_destructor((void *)new_body, trp_upd_destructor);
index 54a43a7..e01b5bd 100644 (file)
@@ -1024,46 +1024,62 @@ static TRP_RC trps_update_one_peer(TRPS_INSTANCE *trps,
     }
     n_updates=1;
   } else {
-    tr_err("trps_update_one_peer: error: only comm or realm was specified.");
+    tr_err("trps_update_one_peer: error: only comm or realm was specified. Need both or neither.");
     rc=TRP_ERROR;
     goto cleanup;
   }
   if ((n_updates>0) && (update_list!=NULL)) {
     tr_debug("trps_update_one_peer: sending %u update records.", (unsigned int)n_updates);
-    upd=trp_upd_new(tmp_ctx);
-
-    /* TODO: set realm/comm in update; used to be in the inforec */
     for (ii=0; ii<n_updates; ii++) {
+      upd=trp_upd_new(tmp_ctx);
+      if (upd==NULL) {
+        tr_err("trps_update_one_peer: could not create update message.");
+        rc=TRP_NOMEM;
+        goto cleanup;
+      }
+      trp_upd_set_realm(upd, trp_route_dup_realm(update_list[ii]));
+      if (trp_upd_get_realm(upd)==NULL) {
+        tr_err("trps_update_one_peer: could not copy realm.");
+        rc=TRP_NOMEM;
+        goto cleanup;
+      }
+      trp_upd_set_comm(upd, trp_route_dup_comm(update_list[ii]));
+      if (trp_upd_get_comm(upd)==NULL) {
+        tr_err("trps_update_one_peer: could not copy comm.");
+        rc=TRP_NOMEM;
+        goto cleanup;
+      }
       rec=trps_route_to_inforec(tmp_ctx, trps, update_list[ii]);
       if (rec==NULL) {
         tr_err("trps_update_one_peer: could not create all update records.");
-        rc=TRP_ERROR;
+        rc=TRP_NOMEM;
         goto cleanup;
       }
       trp_upd_add_inforec(upd, rec);
+
+      /* now encode the update message */
+      tr_msg_set_trp_upd(&msg, upd);
+      encoded=tr_msg_encode(&msg);
+      if (encoded==NULL) {
+        tr_err("trps_update_one_peer: error encoding update.");
+        rc=TRP_ERROR;
+        goto cleanup;
+      }
+
+      tr_debug("trps_update_one_peer: adding message to queue.");
+      if (trps_send_msg(trps, peer, encoded) != TRP_SUCCESS)
+        tr_err("trps_update_one_peer: error queueing update.");
+      else
+        tr_debug("trps_update_one_peer: update queued successfully.");
+
+      tr_msg_free_encoded(encoded);
+      encoded=NULL;
+      trp_upd_free(upd);
+      upd=NULL;
     }
     talloc_free(update_list);
     update_list=NULL;
 
-    /* now encode the update message */
-    tr_msg_set_trp_upd(&msg, upd);
-    encoded=tr_msg_encode(&msg);
-    if (encoded==NULL) {
-      tr_err("trps_update_one_peer: error encoding update.");
-      rc=TRP_ERROR;
-      goto cleanup;
-    }
-
-    tr_debug("trps_update_one_peer: adding message to queue.");
-    if (trps_send_msg(trps, peer, encoded) != TRP_SUCCESS)
-      tr_err("trps_update_one_peer: error queueing update.");
-    else
-      tr_debug("trps_update_one_peer: update queued successfully.");
-
-    tr_msg_free_encoded(encoded);
-    encoded=NULL;
-    trp_upd_free(upd);
-    upd=NULL;
   } else if (n_updates==0)
     tr_debug("trps_update_one_peer: no updates for %.*s", peer_label->len, peer_label->buf);