- cb_data.msg_ready=0;
- pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
- /* verify the condition */
- if (cb_data.msg_ready) {
- for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) {
- msg_type=tr_mq_msg_get_message(msg);
-
- if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
- exit_loop=1;
- break;
- }
- else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
- encoded_msg=tr_mq_msg_get_payload(msg);
- if (encoded_msg==NULL)
- tr_notice("tr_trpc_thread: null outgoing TRP message.");
- else {
- rc = trpc_send_msg(trpc, encoded_msg);
- if (rc!=TRP_SUCCESS) {
- tr_notice("tr_trpc_thread: trpc_send_msg failed.");
- exit_loop=1;
- break;
- }
+ /* Wait up to 10 minutes for a message to be queued to send to the peer.
+ * Log a warning if we go longer than that, but don't give up. */
+ if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) {
+ tr_err("tr_trpc_thread: unable to set abort timeout");
+ break; /* immediately exit the loop, don't go through cleanup */
+ }
+
+ /* Pop a message from the queue. */
+ msg = trpc_mq_pop(trpc, &wait_until);
+ if (msg) {
+ msg_type = tr_mq_msg_get_message(msg);
+ if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+ tr_debug("tr_trpc_thread: received abort message from main thread.");
+ exit_loop = 1;
+ } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+ encoded_msg = tr_mq_msg_get_payload(msg);
+ if (encoded_msg == NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc == TRP_SUCCESS) {
+ tr_debug("tr_trpc_thread: sent message.");
+ } else {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ /* Assume this means we lost the connection. */
+ exit_loop = 1;