From: Jennifer Richards Date: Thu, 29 Sep 2016 15:31:25 +0000 (-0400) Subject: Send all msgs in trpc outgoing queue, not just the first. X-Git-Tag: v2.1.1~38 X-Git-Url: http://www.project-moonshot.org/gitweb/?a=commitdiff_plain;h=c1e7f501a675cbdf1b42c7de0ba39a9478aae516;hp=5d88f5956e15c32cb0fb6c7bfdf8dec4ce34ee41;p=trust_router.git Send all msgs in trpc outgoing queue, not just the first. --- diff --git a/common/tr_msg.c b/common/tr_msg.c index 879ac55..9ae5909 100644 --- a/common/tr_msg.c +++ b/common/tr_msg.c @@ -133,6 +133,7 @@ TRP_UPD *tr_msg_get_trp_upd(TR_MSG *msg) void tr_msg_set_trp_upd(TR_MSG *msg, TRP_UPD *update) { msg->msg_rep=update; + talloc_steal(NULL, update); /* should attach to msg, but TR_MSG not usually talloc'ed */ msg->msg_type=TRP_UPDATE; } @@ -701,7 +702,7 @@ static json_t *tr_msg_encode_trp_upd(TRP_UPD *update) json_decref(jupdate); return NULL; } - json_object_set_new(jrec, "realm", jstr); + json_object_set_new(jupdate, "realm", jstr); jrecords=json_array(); if (jrecords==NULL) { @@ -726,7 +727,7 @@ static json_t *tr_msg_encode_trp_upd(TRP_UPD *update) return jupdate; } -/*Creates a linked list of records in the msg->body talloc context. +/* Creates a linked list of records in the msg->body talloc context. * An error will be returned if any unparseable records are encountered. */ static TRP_UPD *tr_msg_decode_trp_upd(TALLOC_CTX *mem_ctx, json_t *jupdate) @@ -748,20 +749,32 @@ static TRP_UPD *tr_msg_decode_trp_upd(TALLOC_CTX *mem_ctx, json_t *jupdate) } rc=tr_msg_get_json_string(jupdate, "community", &s, tmp_ctx); - if (rc != TRP_SUCCESS) + if (rc != TRP_SUCCESS) { + tr_debug("tr_msg_decode_trp_upd: no community in TRP update message."); + rc=TRP_NOPARSE; goto cleanup; + } name=tr_new_name(s); - if (name==NULL) + if (name==NULL) { + tr_debug("tr_msg_decode_trp_upd: could not allocate community name."); + rc=TRP_NOMEM; goto cleanup; + } talloc_free(s); s=NULL; trp_upd_set_comm(update, name); rc=tr_msg_get_json_string(jupdate, "realm", &s, tmp_ctx); - if (rc != TRP_SUCCESS) + if (rc != TRP_SUCCESS) { + tr_debug("tr_msg_decode_trp_upd: no realm in TRP update message."); + rc=TRP_NOPARSE; goto cleanup; + } name=tr_new_name(s); - if (name==NULL) + if (name==NULL) { + tr_debug("tr_msg_decode_trp_upd: could not allocate realm name."); + rc=TRP_NOMEM; goto cleanup; + } talloc_free(s); s=NULL; trp_upd_set_realm(update, name); diff --git a/tr/tr_trp.c b/tr/tr_trp.c index ef44e89..05f8d47 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -545,6 +545,8 @@ static void *tr_trpc_thread(void *arg) const char *msg_type=NULL; char *encoded_msg=NULL; TR_NAME *peer_gssname=NULL; + int n_sent=0; + int exit_loop=0; struct trpc_notify_cb_data cb_data={0, PTHREAD_COND_INITIALIZER, @@ -583,41 +585,40 @@ static void *tr_trpc_thread(void *arg) trps_mq_add(trps, msg); /* steals msg context */ msg=NULL; - while(1) { + while(!exit_loop) { cb_data.msg_ready=0; pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex)); /* verify the condition */ if (cb_data.msg_ready) { - msg=trpc_mq_pop(trpc); - if (msg==NULL) { - /* no message in the queue */ - tr_err("tr_trpc_thread: notified of msg, but queue empty"); - break; - } - - msg_type=tr_mq_msg_get_message(msg); + for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) { + msg_type=tr_mq_msg_get_message(msg); - if (0==strcmp(msg_type, TR_MQMSG_ABORT)) { - tr_mq_msg_free(msg); - break; /* exit loop */ - } - else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { - encoded_msg=tr_mq_msg_get_payload(msg); - if (encoded_msg==NULL) - tr_notice("tr_trpc_thread: null outgoing TRP message."); - else { - rc = trpc_send_msg(trpc, encoded_msg); - if (rc!=TRP_SUCCESS) { - tr_notice("tr_trpc_thread: trpc_send_msg failed."); - tr_mq_msg_free(msg); - break; + if (0==strcmp(msg_type, TR_MQMSG_ABORT)) { + exit_loop=1; + break; + } + else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { + encoded_msg=tr_mq_msg_get_payload(msg); + if (encoded_msg==NULL) + tr_notice("tr_trpc_thread: null outgoing TRP message."); + else { + rc = trpc_send_msg(trpc, encoded_msg); + if (rc!=TRP_SUCCESS) { + tr_notice("tr_trpc_thread: trpc_send_msg failed."); + exit_loop=1; + break; + } } } - } - else - tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); + else + tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); - tr_mq_msg_free(msg); + tr_mq_msg_free(msg); + } + if (n_sent==0) + tr_err("tr_trpc_thread: notified of msg, but queue empty"); + else + tr_debug("tr_trpc_thread: sent %d messages.", n_sent); } } } diff --git a/trp/trp_upd.c b/trp/trp_upd.c index cbffd8c..c8bd784 100644 --- a/trp/trp_upd.c +++ b/trp/trp_upd.c @@ -324,14 +324,6 @@ TRP_RC trp_inforec_set_interval(TRP_INFOREC *rec, unsigned int interval) return TRP_ERROR; } -static void trp_inforec_set_data(TRP_INFOREC *rec, TRP_INFOREC_DATA *data) -{ - if (rec->data!=NULL) - talloc_free(rec->data); - rec->data=data; - talloc_steal(rec, data); /* make sure it's in our context */ -} - /* generic record type */ TRP_INFOREC *trp_inforec_new(TALLOC_CTX *mem_ctx, TRP_INFOREC_TYPE type) { @@ -345,7 +337,7 @@ TRP_INFOREC *trp_inforec_new(TALLOC_CTX *mem_ctx, TRP_INFOREC_TYPE type) if (dtype->allocate!=NULL) { data=dtype->allocate(new_rec); if (data!=NULL) - trp_inforec_set_data(new_rec, data); + new_rec->data=data; else { talloc_free(new_rec); return NULL; @@ -364,6 +356,10 @@ void trp_inforec_free(TRP_INFOREC *rec) static int trp_upd_destructor(void *object) { TRP_UPD *upd=talloc_get_type_abort(object, TRP_UPD); + if (upd->realm!=NULL) + tr_free_name(upd->realm); + if (upd->comm!=NULL) + tr_free_name(upd->comm); if (upd->peer!=NULL) tr_free_name(upd->peer); return 0; @@ -374,6 +370,8 @@ TRP_UPD *trp_upd_new(TALLOC_CTX *mem_ctx) TRP_UPD *new_body=talloc(mem_ctx, TRP_UPD); if (new_body!=NULL) { + new_body->realm=NULL; + new_body->comm=NULL; new_body->records=NULL; new_body->peer=NULL; talloc_set_destructor((void *)new_body, trp_upd_destructor); diff --git a/trp/trps.c b/trp/trps.c index 54a43a7..e01b5bd 100644 --- a/trp/trps.c +++ b/trp/trps.c @@ -1024,46 +1024,62 @@ static TRP_RC trps_update_one_peer(TRPS_INSTANCE *trps, } n_updates=1; } else { - tr_err("trps_update_one_peer: error: only comm or realm was specified."); + tr_err("trps_update_one_peer: error: only comm or realm was specified. Need both or neither."); rc=TRP_ERROR; goto cleanup; } if ((n_updates>0) && (update_list!=NULL)) { tr_debug("trps_update_one_peer: sending %u update records.", (unsigned int)n_updates); - upd=trp_upd_new(tmp_ctx); - - /* TODO: set realm/comm in update; used to be in the inforec */ for (ii=0; iilen, peer_label->buf);