void tr_msg_set_trp_upd(TR_MSG *msg, TRP_UPD *update)
{
msg->msg_rep=update;
+ talloc_steal(NULL, update); /* should attach to msg, but TR_MSG not usually talloc'ed */
msg->msg_type=TRP_UPDATE;
}
json_decref(jupdate);
return NULL;
}
- json_object_set_new(jrec, "realm", jstr);
+ json_object_set_new(jupdate, "realm", jstr);
jrecords=json_array();
if (jrecords==NULL) {
return jupdate;
}
-/*Creates a linked list of records in the msg->body talloc context.
+/* Creates a linked list of records in the msg->body talloc context.
* An error will be returned if any unparseable records are encountered.
*/
static TRP_UPD *tr_msg_decode_trp_upd(TALLOC_CTX *mem_ctx, json_t *jupdate)
}
rc=tr_msg_get_json_string(jupdate, "community", &s, tmp_ctx);
- if (rc != TRP_SUCCESS)
+ if (rc != TRP_SUCCESS) {
+ tr_debug("tr_msg_decode_trp_upd: no community in TRP update message.");
+ rc=TRP_NOPARSE;
goto cleanup;
+ }
name=tr_new_name(s);
- if (name==NULL)
+ if (name==NULL) {
+ tr_debug("tr_msg_decode_trp_upd: could not allocate community name.");
+ rc=TRP_NOMEM;
goto cleanup;
+ }
talloc_free(s); s=NULL;
trp_upd_set_comm(update, name);
rc=tr_msg_get_json_string(jupdate, "realm", &s, tmp_ctx);
- if (rc != TRP_SUCCESS)
+ if (rc != TRP_SUCCESS) {
+ tr_debug("tr_msg_decode_trp_upd: no realm in TRP update message.");
+ rc=TRP_NOPARSE;
goto cleanup;
+ }
name=tr_new_name(s);
- if (name==NULL)
+ if (name==NULL) {
+ tr_debug("tr_msg_decode_trp_upd: could not allocate realm name.");
+ rc=TRP_NOMEM;
goto cleanup;
+ }
talloc_free(s); s=NULL;
trp_upd_set_realm(update, name);
const char *msg_type=NULL;
char *encoded_msg=NULL;
TR_NAME *peer_gssname=NULL;
+ int n_sent=0;
+ int exit_loop=0;
struct trpc_notify_cb_data cb_data={0,
PTHREAD_COND_INITIALIZER,
trps_mq_add(trps, msg); /* steals msg context */
msg=NULL;
- while(1) {
+ while(!exit_loop) {
cb_data.msg_ready=0;
pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
/* verify the condition */
if (cb_data.msg_ready) {
- msg=trpc_mq_pop(trpc);
- if (msg==NULL) {
- /* no message in the queue */
- tr_err("tr_trpc_thread: notified of msg, but queue empty");
- break;
- }
-
- msg_type=tr_mq_msg_get_message(msg);
+ for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) {
+ msg_type=tr_mq_msg_get_message(msg);
- if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
- tr_mq_msg_free(msg);
- break; /* exit loop */
- }
- else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
- encoded_msg=tr_mq_msg_get_payload(msg);
- if (encoded_msg==NULL)
- tr_notice("tr_trpc_thread: null outgoing TRP message.");
- else {
- rc = trpc_send_msg(trpc, encoded_msg);
- if (rc!=TRP_SUCCESS) {
- tr_notice("tr_trpc_thread: trpc_send_msg failed.");
- tr_mq_msg_free(msg);
- break;
+ if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
+ exit_loop=1;
+ break;
+ }
+ else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+ encoded_msg=tr_mq_msg_get_payload(msg);
+ if (encoded_msg==NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ exit_loop=1;
+ break;
+ }
}
}
- }
- else
- tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+ else
+ tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
- tr_mq_msg_free(msg);
+ tr_mq_msg_free(msg);
+ }
+ if (n_sent==0)
+ tr_err("tr_trpc_thread: notified of msg, but queue empty");
+ else
+ tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
}
}
}
return TRP_ERROR;
}
-static void trp_inforec_set_data(TRP_INFOREC *rec, TRP_INFOREC_DATA *data)
-{
- if (rec->data!=NULL)
- talloc_free(rec->data);
- rec->data=data;
- talloc_steal(rec, data); /* make sure it's in our context */
-}
-
/* generic record type */
TRP_INFOREC *trp_inforec_new(TALLOC_CTX *mem_ctx, TRP_INFOREC_TYPE type)
{
if (dtype->allocate!=NULL) {
data=dtype->allocate(new_rec);
if (data!=NULL)
- trp_inforec_set_data(new_rec, data);
+ new_rec->data=data;
else {
talloc_free(new_rec);
return NULL;
static int trp_upd_destructor(void *object)
{
TRP_UPD *upd=talloc_get_type_abort(object, TRP_UPD);
+ if (upd->realm!=NULL)
+ tr_free_name(upd->realm);
+ if (upd->comm!=NULL)
+ tr_free_name(upd->comm);
if (upd->peer!=NULL)
tr_free_name(upd->peer);
return 0;
TRP_UPD *new_body=talloc(mem_ctx, TRP_UPD);
if (new_body!=NULL) {
+ new_body->realm=NULL;
+ new_body->comm=NULL;
new_body->records=NULL;
new_body->peer=NULL;
talloc_set_destructor((void *)new_body, trp_upd_destructor);
}
n_updates=1;
} else {
- tr_err("trps_update_one_peer: error: only comm or realm was specified.");
+ tr_err("trps_update_one_peer: error: only comm or realm was specified. Need both or neither.");
rc=TRP_ERROR;
goto cleanup;
}
if ((n_updates>0) && (update_list!=NULL)) {
tr_debug("trps_update_one_peer: sending %u update records.", (unsigned int)n_updates);
- upd=trp_upd_new(tmp_ctx);
-
- /* TODO: set realm/comm in update; used to be in the inforec */
for (ii=0; ii<n_updates; ii++) {
+ upd=trp_upd_new(tmp_ctx);
+ if (upd==NULL) {
+ tr_err("trps_update_one_peer: could not create update message.");
+ rc=TRP_NOMEM;
+ goto cleanup;
+ }
+ trp_upd_set_realm(upd, trp_route_dup_realm(update_list[ii]));
+ if (trp_upd_get_realm(upd)==NULL) {
+ tr_err("trps_update_one_peer: could not copy realm.");
+ rc=TRP_NOMEM;
+ goto cleanup;
+ }
+ trp_upd_set_comm(upd, trp_route_dup_comm(update_list[ii]));
+ if (trp_upd_get_comm(upd)==NULL) {
+ tr_err("trps_update_one_peer: could not copy comm.");
+ rc=TRP_NOMEM;
+ goto cleanup;
+ }
rec=trps_route_to_inforec(tmp_ctx, trps, update_list[ii]);
if (rec==NULL) {
tr_err("trps_update_one_peer: could not create all update records.");
- rc=TRP_ERROR;
+ rc=TRP_NOMEM;
goto cleanup;
}
trp_upd_add_inforec(upd, rec);
+
+ /* now encode the update message */
+ tr_msg_set_trp_upd(&msg, upd);
+ encoded=tr_msg_encode(&msg);
+ if (encoded==NULL) {
+ tr_err("trps_update_one_peer: error encoding update.");
+ rc=TRP_ERROR;
+ goto cleanup;
+ }
+
+ tr_debug("trps_update_one_peer: adding message to queue.");
+ if (trps_send_msg(trps, peer, encoded) != TRP_SUCCESS)
+ tr_err("trps_update_one_peer: error queueing update.");
+ else
+ tr_debug("trps_update_one_peer: update queued successfully.");
+
+ tr_msg_free_encoded(encoded);
+ encoded=NULL;
+ trp_upd_free(upd);
+ upd=NULL;
}
talloc_free(update_list);
update_list=NULL;
- /* now encode the update message */
- tr_msg_set_trp_upd(&msg, upd);
- encoded=tr_msg_encode(&msg);
- if (encoded==NULL) {
- tr_err("trps_update_one_peer: error encoding update.");
- rc=TRP_ERROR;
- goto cleanup;
- }
-
- tr_debug("trps_update_one_peer: adding message to queue.");
- if (trps_send_msg(trps, peer, encoded) != TRP_SUCCESS)
- tr_err("trps_update_one_peer: error queueing update.");
- else
- tr_debug("trps_update_one_peer: update queued successfully.");
-
- tr_msg_free_encoded(encoded);
- encoded=NULL;
- trp_upd_free(upd);
- upd=NULL;
} else if (n_updates==0)
tr_debug("trps_update_one_peer: no updates for %.*s", peer_label->len, peer_label->buf);