+/**
+ * Event handler to process TRP messages from connection threads. These
+ * are added to the message queue (mq) in tr_trps_msg_handler(), which
+ * runs in the other threads.
+ *
+ * @param socket Ignored
+ * @param event Ignored
+ * @param arg Pointer to the TRPS_INSTANCE
+ */
+static void tr_trps_process_mq(int socket, short event, void *arg)
+{
+ TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
+ TR_MQ_MSG *msg=NULL;
+ const char *s=NULL;
+ TRP_PEER *peer = NULL;
+ char *tmp = NULL;
+
+ msg=trps_mq_pop(trps);
+ while (msg!=NULL) {
+ s=tr_mq_msg_get_message(msg);
+ if (0==strcmp(s, TR_MQMSG_TRPS_CONNECTED)) {
+ TR_NAME *peer_gssname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+ peer=trps_get_peer_by_gssname(trps, peer_gssname); /* get the peer record */
+ tmp = tr_name_strdup(peer_gssname); /* get the name as a null-terminated string */
+ if (peer==NULL)
+ tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp);
+ else {
+ trp_peer_set_incoming_status(peer, PEER_CONNECTED);
+ tr_notice("tr_trps_process_mq: incoming connection from %s established.", tmp);
+ }
+ free(tmp);
+ }
+ else if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
+ TRP_CONNECTION *conn=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION);
+ TR_NAME *peer_gssname=trp_connection_get_peer(conn);
+ peer=trps_get_peer_by_gssname(trps, peer_gssname); /* get the peer record */
+ tmp = tr_name_strdup(peer_gssname); /* get the name as a null-terminated string */
+ if (peer==NULL) {
+ tr_err("tr_trps_process_mq: incoming connection from unknown peer (%.*s) lost.", tmp);
+ } else {
+ trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
+ tr_trps_cleanup_conn(trps, conn);
+ tr_notice("tr_trps_process_mq: incoming connection from %s lost.", tmp);
+ }
+ free(tmp);
+ }
+ else if (0==strcmp(s, TR_MQMSG_TRPC_CONNECTED)) {
+ TR_NAME *svcname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+ peer=trps_get_peer_by_servicename(trps, svcname);
+ tmp = tr_name_strdup(svcname);
+ if (peer==NULL)
+ tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp);
+ else {
+ trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
+ tr_notice("tr_trps_process_mq: outgoing connection to %s established.", tmp);
+ }
+ free(tmp);
+ }
+ else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
+ TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
+ TR_NAME *svcname=trpc_get_gssname(trpc);
+ peer=trps_get_peer_by_servicename(trps, svcname);
+ tmp = tr_name_strdup(svcname);
+ if (peer==NULL)
+ tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp);
+ else {
+ trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
+ tr_notice("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
+ tr_trps_cleanup_trpc(trps, trpc);
+ }
+ free(tmp);
+ }