- while(1) {
- cb_data.msg_ready=0;
- pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
- /* verify the condition */
- if (cb_data.msg_ready) {
- msg=trpc_mq_pop(trpc);
- if (msg==NULL) {
- /* no message in the queue */
- tr_err("tr_trpc_thread: notified of msg, but queue empty");
- break;
- }
-
- msg_type=tr_mq_msg_get_message(msg);
-
- if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
- tr_mq_msg_free(msg);
- break; /* exit loop */
- }
- else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
- encoded_msg=tr_mq_msg_get_payload(msg);
- if (encoded_msg==NULL)
+ /* Loop until we get an abort message or until the connection is lost. */
+ while(!exit_loop) {
+ /* Wait up to 10 minutes for a message to be queued to send to the peer.
+ * Log a warning if we go longer than that, but don't give up. */
+ if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) {
+ tr_err("tr_trpc_thread: unable to set abort timeout");
+ break; /* immediately exit the loop, don't go through cleanup */
+ }
+
+ /* Pop a message from the queue. */
+ msg = trpc_mq_pop(trpc, &wait_until);
+ if (msg) {
+ msg_type = tr_mq_msg_get_message(msg);
+ if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+ tr_debug("tr_trpc_thread: received abort message from main thread.");
+ exit_loop = 1;
+ } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+ encoded_msg = tr_mq_msg_get_payload(msg);
+ if (encoded_msg == NULL)