+/*
+ * Copyright (c) 2016, JANET(UK)
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * 3. Neither the name of JANET(UK) nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
#include <stdio.h>
#include <pthread.h>
#include <fcntl.h>
tr_debug("tr_trps_cleanup_conn: freeing %p", conn);
pthread_join(*trp_connection_get_thread(conn), NULL);
trps_remove_connection(trps, conn);
- talloc_report_full(conn, stderr);
trp_connection_free(conn);
tr_debug("tr_trps_cleanup_conn: deleted connection");
}
TR_MQ_MSG *msg=NULL;
const char *s=NULL;
- talloc_report_full(trps->mq, stderr);
msg=trps_mq_pop(trps);
while (msg!=NULL) {
s=tr_mq_msg_get_message(msg);
const char *msg_type=NULL;
char *encoded_msg=NULL;
TR_NAME *peer_gssname=NULL;
+ int n_sent=0;
+ int exit_loop=0;
struct trpc_notify_cb_data cb_data={0,
PTHREAD_COND_INITIALIZER,
trps_mq_add(trps, msg); /* steals msg context */
msg=NULL;
- while(1) {
+ while(!exit_loop) {
cb_data.msg_ready=0;
pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
/* verify the condition */
if (cb_data.msg_ready) {
- msg=trpc_mq_pop(trpc);
- if (msg==NULL) {
- /* no message in the queue */
- tr_err("tr_trpc_thread: notified of msg, but queue empty");
- break;
- }
+ for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) {
+ msg_type=tr_mq_msg_get_message(msg);
- msg_type=tr_mq_msg_get_message(msg);
-
- if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
- tr_mq_msg_free(msg);
- break; /* exit loop */
- }
- else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
- encoded_msg=tr_mq_msg_get_payload(msg);
- if (encoded_msg==NULL)
- tr_notice("tr_trpc_thread: null outgoing TRP message.");
- else {
- rc = trpc_send_msg(trpc, encoded_msg);
- if (rc!=TRP_SUCCESS) {
- tr_notice("tr_trpc_thread: trpc_send_msg failed.");
- tr_mq_msg_free(msg);
- break;
+ if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
+ exit_loop=1;
+ break;
+ }
+ else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+ encoded_msg=tr_mq_msg_get_payload(msg);
+ if (encoded_msg==NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ exit_loop=1;
+ break;
+ }
}
}
- }
- else
- tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+ else
+ tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
- tr_mq_msg_free(msg);
+ tr_mq_msg_free(msg);
+ }
+ if (n_sent==0)
+ tr_err("tr_trpc_thread: notified of msg, but queue empty");
+ else
+ tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
}
}
}