+
+struct trpc_notify_cb_data {
+ int msg_ready;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+};
+
+static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
+{
+ struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
+ pthread_mutex_lock(&(cb_data->mutex));
+ if (!cb_data->msg_ready) {
+ cb_data->msg_ready=1;
+ pthread_cond_signal(&(cb_data->cond));
+ }
+ pthread_mutex_unlock(&(cb_data->mutex));
+}
+
+/* data passed to thread */
+struct trpc_thread_data {
+ TRPC_INSTANCE *trpc;
+ TRPS_INSTANCE *trps;
+};
+static void *tr_trpc_thread(void *arg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
+ TRPC_INSTANCE *trpc=thread_data->trpc;
+ TRPS_INSTANCE *trps=thread_data->trps;
+ TRP_RC rc=TRP_ERROR;
+ TR_MQ_MSG *msg=NULL;
+ const char *msg_type=NULL;
+ char *encoded_msg=NULL;
+
+ struct trpc_notify_cb_data cb_data={0,
+ PTHREAD_COND_INITIALIZER,
+ PTHREAD_MUTEX_INITIALIZER};
+
+ tr_debug("tr_trpc_thread: started");
+
+ /* set up the mq for receiving */
+ pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
+
+ tr_mq_lock(trpc->mq);
+ tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
+ tr_mq_unlock(trpc->mq);
+
+ rc=trpc_connect(trpc);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
+ trpc_get_server(trpc),
+ trpc_get_port(trpc));
+ } else {
+ while (1) {
+ cb_data.msg_ready=0;
+ pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
+ /* verify the condition */
+ if (cb_data.msg_ready) {
+ msg=trpc_mq_pop(trpc);
+ if (msg==NULL) {
+ /* no message in the queue */
+ tr_err("tr_trpc_thread: notified of msg, but queue empty");
+ break;
+ }
+
+ msg_type=tr_mq_msg_get_message(msg);
+
+ if (0==strcmp(msg_type, "trpc_abort")) {
+ tr_mq_msg_free(msg);
+ break; /* exit loop */
+ }
+ else if (0==strcmp(msg_type, "trpc_send")) {
+ encoded_msg=tr_mq_msg_get_payload(msg);
+ if (encoded_msg==NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ tr_mq_msg_free(msg);
+ break;
+ }
+ }
+ }
+ else
+ tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+
+ tr_mq_msg_free(msg);
+ }
+ }
+ }
+
+ msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
+ tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
+ if (msg==NULL)
+ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+ else
+ trps_mq_append(trps, msg);
+
+ talloc_free(tmp_ctx);
+ return NULL;
+}
+
+/* starts a trpc thread to connect to server:port */
+TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TRPC_INSTANCE *trpc=NULL;
+ TRP_CONNECTION *conn=NULL;
+ struct trpc_thread_data *thread_data=NULL;
+
+ tr_debug("tr_trpc_initiate entered");
+ trpc=trpc_new(tmp_ctx);
+ if (trpc==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
+ goto cleanup;
+ }
+ tr_debug("tr_trpc_initiate: allocated trpc");
+
+ conn=trp_connection_new(trpc);
+ if (conn==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
+ goto cleanup;
+ }
+ trpc_set_conn(trpc, conn);
+ trpc_set_server(trpc, talloc_strdup(trpc, server));
+ trpc_set_port(trpc, port);
+ tr_debug("tr_trpc_initiate: allocated connection");
+
+ /* start thread */
+ thread_data=talloc(trpc, struct trpc_thread_data);
+ if (thread_data==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
+ goto cleanup;
+ }
+ thread_data->trpc=trpc;
+ thread_data->trps=trps;
+
+ pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
+ pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
+
+ tr_debug("tr_trpc_initiate: started trpc thread");
+ trps_add_trpc(trps, trpc);
+
+ talloc_report_full(trps, stderr);
+ talloc_report_full(tmp_ctx, stderr);
+
+ cleanup:
+ talloc_free(tmp_ctx);
+ return trpc;
+}