Wildcard route requests now working.
authorJennifer Richards <jennifer@painless-security.com>
Thu, 4 Aug 2016 02:17:15 +0000 (22:17 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Thu, 4 Aug 2016 02:17:15 +0000 (22:17 -0400)
common/tr_mq.c
include/tr_mq.h
include/tr_trp.h
include/trp_internal.h
include/trp_ptable.h
tr/tr_main.c
tr/tr_trp.c
trp/trp_ptable.c
trp/trpc.c
trp/trps.c

index 5240c1a..592e51c 100644 (file)
@@ -13,11 +13,12 @@ static int tr_mq_msg_destructor(void *object)
   return 0;
 }
 
-TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message)
+TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message, TR_MQ_PRIORITY prio)
 {
   TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG);
   if (msg!=NULL) {
     msg->next=NULL;
+    msg->prio=prio;
     msg->message=talloc_strdup(msg, message);
     if (msg->message==NULL) {
       talloc_free(msg);
@@ -35,6 +36,11 @@ void tr_mq_msg_free(TR_MQ_MSG *msg)
     talloc_free(msg);
 }
 
+TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg)
+{
+  return msg->prio;
+}
+
 const char *tr_mq_msg_get_message(TR_MQ_MSG *msg)
 {
   return msg->message;
@@ -71,6 +77,7 @@ TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx)
     pthread_mutex_init(&(mq->mutex), 0);
     mq->head=NULL;
     mq->tail=NULL;
+    mq->last_hi_prio=NULL;
   }
   return mq;
 }
@@ -119,17 +126,32 @@ void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg)
   mq->notify_cb_arg=arg;
 }
 
-
-/* puts msg in mq's talloc context */
-void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
+void tr_mq_clear(TR_MQ *mq)
 {
-  int was_empty=0;
-  TR_MQ_NOTIFY_FN notify_cb=NULL;
-  void *notify_cb_arg=NULL;
+  TR_MQ_MSG *m=NULL;
+  TR_MQ_MSG *n=NULL;
 
   tr_mq_lock(mq);
+  m=tr_mq_get_head(mq);
+  while (m!=NULL) {
+    n=tr_mq_msg_get_next(m);
+    tr_mq_msg_free(m);
+    m=n;
+  }
+  tr_mq_set_head(mq, NULL);
+  tr_mq_set_tail(mq, NULL);
+  tr_mq_unlock(mq);
+}
+
+static int tr_mq_empty(TR_MQ *mq)
+{
+  return tr_mq_get_head(mq)==NULL;
+}
+
+/* puts msg in mq's talloc context */
+static void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
+{
   if (tr_mq_get_head(mq)==NULL) {
-    was_empty=1;
     tr_mq_set_head(mq, msg);
     tr_mq_set_tail(mq, msg);
   } else {
@@ -137,9 +159,65 @@ void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
     tr_mq_set_tail(mq, msg); /* update tail of list */
   }
   talloc_steal(mq, msg);
+}
+
+static void tr_mq_append_high_prio(TR_MQ *mq, TR_MQ_MSG *new)
+{
+  if (tr_mq_get_head(mq)==NULL) {
+    tr_mq_set_head(mq, new);
+    tr_mq_set_tail(mq, new);
+  } else if (mq->last_hi_prio==NULL) {
+    tr_mq_msg_set_next(new, tr_mq_get_head(mq)); /* add to front of list */
+    tr_mq_set_head(mq, new); /* update head of list */
+  } else {
+    tr_mq_msg_set_next(new, tr_mq_msg_get_next(mq->last_hi_prio));
+    tr_mq_msg_set_next(mq->last_hi_prio, new); /* add to end of hi prio msgs */
+  }
+  mq->last_hi_prio=new; /* in any case, this is now the last high priority msg */
+  talloc_steal(mq,new);
+}
+
+#define DEBUG_TR_MQ 0
+#if DEBUG_TR_MQ
+static void tr_mq_print(TR_MQ *mq)
+{
+  TR_MQ_MSG *m=mq->head;
+  int ii=0;
+
+  tr_debug("tr_mq_print: mq contents:");
+  while(m!=NULL) {
+    ii++;
+    tr_debug("tr_mq_print: Entry %02d: %-15s (prio %d)",
+             ii, tr_mq_msg_get_message(m), tr_mq_msg_get_prio(m));
+    m=tr_mq_msg_get_next(m);
+  }
+}
+#endif
+void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
+{
+  int was_empty=0;
+  TR_MQ_NOTIFY_FN notify_cb=NULL;
+  void *notify_cb_arg=NULL;
+
+  tr_mq_lock(mq);
+  
+  was_empty=tr_mq_empty(mq);
+  switch (tr_mq_msg_get_prio(msg)) {
+  case TR_MQ_PRIO_HIGH:
+    tr_mq_append_high_prio(mq, msg);
+    break;
+  default:
+    tr_mq_append(mq, msg);
+    break;
+  }
   /* before releasing the mutex, get notify_cb data out of mq */
   notify_cb=mq->notify_cb;
   notify_cb_arg=mq->notify_cb_arg;
+
+#if DEBUG_TR_MQ
+  tr_mq_print(mq);
+#endif 
+
   tr_mq_unlock(mq);
 
   /* see if we need to tell someone we became non-empty */
@@ -156,6 +234,10 @@ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
   if (tr_mq_get_head(mq)!=NULL) {
     popped=tr_mq_get_head(mq);
     tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
+
+    if (popped==mq->last_hi_prio)
+      mq->last_hi_prio=NULL;
+
     if (tr_mq_get_head(mq)==NULL)
       tr_mq_set_tail(mq, NULL); /* just popped the last element */
   }
@@ -164,3 +246,4 @@ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
     tr_mq_msg_set_next(popped, NULL); /* disconnect from list */
   return popped;
 }
+
index 29c8d5f..0dd7f38 100644 (file)
@@ -4,10 +4,20 @@
 #include <talloc.h>
 #include <pthread.h>
 
+/* Note on mq priorities: High priority messages are guaranteed to be
+ * processed before any normal priority messages. Otherwise, messages
+ * will be processed in the order they are added to the queue. */
+
+typedef enum tr_mq_priority {
+  TR_MQ_PRIO_NORMAL=0,
+  TR_MQ_PRIO_HIGH
+} TR_MQ_PRIORITY;
+
 /* msg for inter-thread messaging */
 typedef struct tr_mq_msg TR_MQ_MSG;
 struct tr_mq_msg {
   TR_MQ_MSG *next;
+  TR_MQ_PRIORITY prio;
   char *message;
   void *p; /* payload */
   void (*p_free)(void *); /* function to free payload */
@@ -21,6 +31,7 @@ struct tr_mq {
   pthread_mutex_t mutex;
   TR_MQ_MSG *head;
   TR_MQ_MSG *tail;
+  TR_MQ_MSG *last_hi_prio;
   TR_MQ_NOTIFY_FN notify_cb; /* callback when queue becomes non-empty */
   void *notify_cb_arg;
 };
@@ -28,8 +39,9 @@ struct tr_mq {
 /* message string for sending trpc messages */
 #define TR_MQMSG_TRPC_SEND "trpc send msg"
 
-TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg);
+TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg, TR_MQ_PRIORITY prio);
 void tr_mq_msg_free(TR_MQ_MSG *msg);
+TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg);
 const char *tr_mq_msg_get_message(TR_MQ_MSG *msg);
 void *tr_mq_msg_get_payload(TR_MQ_MSG *msg);
 void tr_mq_msg_set_payload(TR_MQ_MSG *msg, void *p, void (*p_free)(void *));
@@ -40,7 +52,8 @@ void tr_mq_free(TR_MQ *mq);
 int tr_mq_lock(TR_MQ *mq);
 int tr_mq_unlock(TR_MQ *mq);
 void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg);
-void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg);
+void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg);
 TR_MQ_MSG *tr_mq_pop(TR_MQ *mq);
+void tr_mq_clear(TR_MQ *mq);
  
 #endif /*_TR_MQ_H_ */
index 542e590..d597cb0 100644 (file)
@@ -43,4 +43,5 @@ TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg);
 TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev);
 void tr_config_changed(TR_CFG *new_cfg, void *cookie);
 TRP_RC tr_connect_to_peers(TRPS_INSTANCE *trps, struct event *ev);
+void tr_peer_status_change(TRP_PEER *peer, void *cookie);
 #endif /* TR_TRP_H */
index 051c00a..f9978cc 100644 (file)
@@ -149,9 +149,10 @@ void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port);
 TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc);
 TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc);
 void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq);
-void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
 TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc);
-void trpc_master_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+void trpc_mq_clear(TRPC_INSTANCE *trpc);
+void trpc_master_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
 TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc);
 TRP_RC trpc_connect(TRPC_INSTANCE *trpc);
 TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content);
@@ -179,7 +180,8 @@ int trps_get_listener(TRPS_INSTANCE *trps,
                       unsigned int port,
                       void *cookie);
 TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps);
-void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
+void trps_mq_add(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
+TRP_RC trps_authorize_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn);
 void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn);
 TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps);
 TRP_RC trps_handle_tr_msg(TRPS_INSTANCE *trps, TR_MSG *tr_msg);
index 0b6a9d8..6881ee8 100644 (file)
@@ -7,6 +7,11 @@
 #include <trust_router/tr_name.h>
 #include <trp_internal.h>
 
+typedef enum trp_peer_conn_status {
+  PEER_DISCONNECTED=0,
+  PEER_CONNECTED
+} TRP_PEER_CONN_STATUS;
+
 typedef struct trp_peer TRP_PEER;
 struct trp_peer {
   TRP_PEER *next; /* for making a linked list */
@@ -16,6 +21,10 @@ struct trp_peer {
   unsigned int port;
   unsigned int linkcost;
   struct timespec last_conn_attempt;
+  TRP_PEER_CONN_STATUS outgoing_status;
+  TRP_PEER_CONN_STATUS incoming_status;
+  void (*conn_status_cb)(TRP_PEER *, void *); /* callback for connected status change */
+  void *conn_status_cookie;
 };
 
 typedef struct trp_ptable {
@@ -52,7 +61,13 @@ void trp_peer_set_port(TRP_PEER *peer, unsigned int port);
 unsigned int trp_peer_get_linkcost(TRP_PEER *peer);
 struct timespec *trp_peer_get_last_conn_attempt(TRP_PEER *peer);
 void trp_peer_set_last_conn_attempt(TRP_PEER *peer, struct timespec *time);
+TRP_PEER_CONN_STATUS trp_peer_get_outgoing_status(TRP_PEER *peer);
+void trp_peer_set_outgoing_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status);
+TRP_PEER_CONN_STATUS trp_peer_get_incoming_status(TRP_PEER *peer);
+void trp_peer_set_incoming_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status);
+int trp_peer_is_connected(TRP_PEER *peer);
 void trp_peer_set_linkcost(TRP_PEER *peer, unsigned int linkcost);
+void trp_peer_set_conn_status_cb(TRP_PEER *peer, void (*cb)(TRP_PEER *, void *), void *cookie);
 char *trp_peer_to_str(TALLOC_CTX *memctx, TRP_PEER *peer, const char *sep);
 
 #endif /* _TRP_PTABLE_H_ */
index 5989f0b..9a84a5c 100644 (file)
@@ -293,6 +293,7 @@ int main(int argc, char *argv[])
     }
     trp_peer_set_server(hc_peer, "epsilon.vmnet");
     trp_peer_set_gssname(hc_peer, tr_new_name("tr-epsilon-vmnet@apc.painless-security.com"));
+    trp_peer_set_conn_status_cb(hc_peer, tr_peer_status_change, (void *)(tr->trps));
     switch (tr->trps->port) {
     case 10000:
       trp_peer_set_port(hc_peer, 10001);
index 6a09d63..46e4a6e 100644 (file)
@@ -11,6 +11,7 @@
 
 #include <gsscon.h>
 #include <tr.h>
+#include <tr_mq.h>
 #include <tr_rp.h>
 #include <trp_internal.h>
 #include <trp_ptable.h>
@@ -39,6 +40,12 @@ static void msg_free_helper(void *p)
 {
   tr_msg_free_decoded((TR_MSG *)p);
 }
+
+static void tr_free_name_helper(void *arg)
+{
+  tr_free_name((TR_NAME *)arg);
+}
+
 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
                                   TRP_CONNECTION *conn,
@@ -50,12 +57,12 @@ static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
   /* n.b., conn is available here, but do not hold onto the reference
    * because it may be cleaned up if the originating connection goes
    * down before the message is processed */
-  mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED);
+  mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED, TR_MQ_PRIO_NORMAL);
   if (mq_msg==NULL) {
     return TRP_NOMEM;
   }
   tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
-  trps_mq_append(trps, mq_msg);
+  trps_mq_add(trps, mq_msg);
   talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
   return TRP_SUCCESS;
 }
@@ -101,15 +108,27 @@ static void *tr_trps_thread(void *arg)
   TR_MQ_MSG *msg=NULL;
 
   tr_debug("tr_trps_thread: started");
+  if (trps_authorize_connection(trps, conn)!=TRP_SUCCESS)
+    goto cleanup;
+
+  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED, TR_MQ_PRIO_HIGH);
+  tr_mq_msg_set_payload(msg, (void *)tr_dup_name(trp_connection_get_peer(conn)), tr_free_name_helper);
+  if (msg==NULL) {
+    tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
+    goto cleanup;
+  } 
+  trps_mq_add(trps, msg); /* steals msg context */
+  msg=NULL;
+
   trps_handle_connection(trps, conn);
 
-  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED);
+cleanup:
+  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED, TR_MQ_PRIO_HIGH);
   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
   if (msg==NULL)
     tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
   else
-    trps_mq_append(trps, msg);
-
+    trps_mq_add(trps, msg);
   tr_debug("tr_trps_thread: exit");
   talloc_free(tmp_ctx);
   return NULL;
@@ -191,16 +210,51 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
   msg=trps_mq_pop(trps);
   while (msg!=NULL) {
     s=tr_mq_msg_get_message(msg);
-    if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
-      tr_trps_cleanup_conn(trps,
-                           talloc_get_type_abort(tr_mq_msg_get_payload(msg),
-                                                 TRP_CONNECTION));
+    if (0==strcmp(s, TR_MQMSG_TRPS_CONNECTED)) {
+      TR_NAME *gssname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+      TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+      if (peer==NULL)
+        tr_err("tr_trps_process_mq: incoming connection to unknown peer (%s) reported.", gssname->buf);
+      else {
+        trp_peer_set_incoming_status(peer, PEER_CONNECTED);
+        tr_err("tr_trps_process_mq: incoming connection to %s established.", gssname->buf);
+      }
+    }
+    else if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
+      TRP_CONNECTION *conn=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION);
+      TR_NAME *gssname=trp_connection_get_gssname(conn);
+      TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+      if (peer==NULL) {
+        tr_err("tr_trps_process_mq: disconnection of unknown peer (%s) reported.",
+               trp_connection_get_gssname(conn)->buf);
+      } else {
+        trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
+        tr_trps_cleanup_conn(trps, conn);
+        tr_err("tr_trps_process_mq: incoming connection to %s lost.", gssname->buf);
+      }
+    }
+    else if (0==strcmp(s, TR_MQMSG_TRPC_CONNECTED)) {
+      TR_NAME *svcname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+      TRP_PEER *peer=trps_get_peer_by_servicename(trps, svcname);
+      if (peer==NULL)
+        tr_err("tr_trps_process_mq: connection to unknown peer (%s) reported.", svcname->buf);
+      else {
+        trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
+        tr_err("tr_trps_process_mq: outgoing connection to %s established.", svcname->buf);
+      }
     }
     else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
       /* trpc connection died */
-      tr_trps_cleanup_trpc(trps,
-                           talloc_get_type_abort(tr_mq_msg_get_payload(msg),
-                                                 TRPC_INSTANCE));
+      TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
+      TR_NAME *gssname=trpc_get_gssname(trpc);
+      TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+      if (peer==NULL)
+        tr_err("tr_trps_process_mq: disconnection of unknown peer (%s) reported.", gssname->buf);
+      else {
+        trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
+        tr_err("tr_trps_process_mq: outgoing connection to %s lost.", gssname->buf);
+        tr_trps_cleanup_trpc(trps, trpc);
+      }
     }
 
     else if (0==strcmp(s, TR_MQMSG_MSG_RECEIVED)) {
@@ -473,7 +527,7 @@ static void *tr_trpc_thread(void *arg)
               trpc_get_server(trpc),
               trpc_get_port(trpc));
   } else {
-    peer_gssname=tr_dup_name(trp_connection_get_peer(trpc_get_conn(trpc)));
+    peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc));
     if (peer_gssname==NULL) {
       tr_err("tr_trpc_thread: could not duplicate peer_gssname.");
       talloc_free(tmp_ctx);
@@ -481,6 +535,18 @@ static void *tr_trpc_thread(void *arg)
     }
     tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf);
 
+    msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH);
+    tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper);
+    if (msg==NULL) {
+      tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+      talloc_free(tmp_ctx);
+      return NULL;
+    }
+    tr_debug("queuing********************************************************************************");
+    trps_mq_add(trps, msg); /* steals msg context */
+    tr_debug("queued********************************************************************************");
+    msg=NULL;
+
     while(1) {
       cb_data.msg_ready=0;
       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
@@ -521,12 +587,14 @@ static void *tr_trpc_thread(void *arg)
   }
 
   tr_debug("tr_trpc_thread: exiting.");
-  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED);
+  msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH);
   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
   if (msg==NULL)
     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
   else
-    trps_mq_append(trps, msg);
+    trps_mq_add(trps, msg);
+
+  trpc_mq_clear(trpc); /* clear any queued messages */
 
   talloc_free(tmp_ctx);
   return NULL;
@@ -577,55 +645,12 @@ static TRP_ROUTE **tr_make_local_routes(TALLOC_CTX *mem_ctx,
   return entries;
 }
 
-struct tr_wildcard_cookie {
-  TRPS_INSTANCE *trps;
-  TR_NAME *peer_servicename;
-};
-
-/* can only be called once, talloc_frees its argument cookie on termination */
-static void tr_send_wildcard(int listener, short event, void *arg)
+void tr_peer_status_change(TRP_PEER *peer, void *cookie)
 {
-  struct tr_wildcard_cookie *cook=talloc_get_type_abort(arg, struct tr_wildcard_cookie);
+  TRPS_INSTANCE *trps=talloc_get_type_abort(cookie, TRPS_INSTANCE);
 
-  /* queue a route request for all route to every peer */
-  if (TRP_SUCCESS!=trps_wildcard_route_req(cook->trps, cook->peer_servicename))
+  if (TRP_SUCCESS!=trps_wildcard_route_req(trps, trp_peer_get_servicename(peer)))
     tr_err("tr_send_wildcard: error sending wildcard route request.");
-
-  tr_free_name(cook->peer_servicename);
-  talloc_free(cook);
-}
-
-struct tr_trpc_status_change_cookie {
-  struct event_base *evbase;
-  TRPS_INSTANCE *trps;
-  TRPC_INSTANCE *trpc;
-  TRP_PEER *peer;
-};
-static void tr_trpc_status_change(TRP_CONNECTION *conn, void *cookie)
-{
-  struct tr_trpc_status_change_cookie *cook=talloc_get_type_abort(cookie, struct tr_trpc_status_change_cookie);
-  struct event_base *evbase=cook->evbase;
-  TRPS_INSTANCE *trps=cook->trps;
-  TRP_PEER *peer=cook->peer;
-  TR_NAME *gssname=trp_peer_get_gssname(peer);
-  struct timeval zero_time={0,0};
-  struct tr_wildcard_cookie *wc_cookie=NULL;
-  
-  if (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
-    tr_debug("tr_trpc_status_change: connection to %.*s now up.", gssname->len, gssname->buf);
-    /* add a one-off event to send a wildcard request from the main thread */
-    wc_cookie=talloc(trps, struct tr_wildcard_cookie);
-    if (wc_cookie==NULL) {
-      tr_err("tr_trpc_status_change: error allocating wildcard cookie.");
-      return;
-    }
-    wc_cookie->trps=trps;
-    wc_cookie->peer_servicename=trp_peer_dup_servicename(peer);
-    if (0!=event_base_once(evbase, -1, EV_TIMEOUT, tr_send_wildcard, wc_cookie, &zero_time)) {
-      tr_err("tr_trpc_status_change: error queueing wildcard route request event.");
-    }
-  } else
-    tr_debug("tr_trpc_status_change: connection to %.*s now down.", gssname->len, gssname->buf);
 }
 
 /* starts a trpc thread to connect to server:port */
@@ -635,7 +660,6 @@ TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev)
   TRPC_INSTANCE *trpc=NULL;
   TRP_CONNECTION *conn=NULL;
   struct trpc_thread_data *thread_data=NULL;
-  struct tr_trpc_status_change_cookie *status_change_cookie=NULL;
   TRP_RC rc=TRP_ERROR;
 
   tr_debug("tr_trpc_initiate entered");
@@ -654,20 +678,6 @@ TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev)
     goto cleanup;
   }
 
-  status_change_cookie=talloc(conn, struct tr_trpc_status_change_cookie);
-  if (status_change_cookie==NULL) {
-    tr_crit("tr_trpc_initiate: could not allocate connection status cookie.");
-    rc=TRP_NOMEM;
-    goto cleanup;
-  }
-  status_change_cookie->evbase=event_get_base(ev);
-  status_change_cookie->trps=trps;
-  status_change_cookie->trpc=trpc;
-  status_change_cookie->peer=peer;
-  conn->status_change_cookie=status_change_cookie;
-  status_change_cookie=NULL;
-  conn->status_change_cb=tr_trpc_status_change;
-
   trpc_set_conn(trpc, conn);
   trpc_set_server(trpc, talloc_strdup(trpc, trp_peer_get_server(peer)));
   trpc_set_port(trpc, trp_peer_get_port(peer));
index 9814f66..14a2509 100644 (file)
@@ -26,6 +26,10 @@ TRP_PEER *trp_peer_new(TALLOC_CTX *memctx)
     peer->port=0;
     peer->linkcost=TRP_LINKCOST_DEFAULT;
     peer->last_conn_attempt=(struct timespec){0,0};
+    peer->outgoing_status=PEER_DISCONNECTED;
+    peer->incoming_status=PEER_DISCONNECTED;
+    peer->conn_status_cb=NULL;
+    peer->conn_status_cookie=NULL;
     talloc_set_destructor((void *)peer, trp_peer_destructor);
   }
   return peer;
@@ -131,6 +135,12 @@ void trp_peer_set_linkcost(TRP_PEER *peer, unsigned int linkcost)
   peer->linkcost=linkcost;
 }
 
+void trp_peer_set_conn_status_cb(TRP_PEER *peer, void (*cb)(TRP_PEER *, void *), void *cookie)
+{
+  peer->conn_status_cb=cb;
+  peer->conn_status_cookie=cookie;
+}
+
 struct timespec *trp_peer_get_last_conn_attempt(TRP_PEER *peer)
 {
   return &(peer->last_conn_attempt);
@@ -150,6 +160,41 @@ TRP_PTABLE *trp_ptable_new(TALLOC_CTX *memctx)
   return ptbl;
 }
 
+void trp_peer_set_outgoing_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status)
+{
+  int was_connected=trp_peer_is_connected(peer);
+  peer->outgoing_status=status;
+  tr_debug("trp_peer_set_outgoing_status: %s: status=%d peer connected was %d now %d.",
+           trp_peer_get_gssname(peer)->buf, status, was_connected, trp_peer_is_connected(peer));
+  if ((trp_peer_is_connected(peer) != was_connected) && (peer->conn_status_cb!=NULL))
+    peer->conn_status_cb(peer, peer->conn_status_cookie);
+}
+
+TRP_PEER_CONN_STATUS trp_peer_get_outgoing_status(TRP_PEER *peer)
+{
+  return peer->outgoing_status;
+}
+
+void trp_peer_set_incoming_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status)
+{
+  int was_connected=trp_peer_is_connected(peer);
+  peer->incoming_status=status;
+  tr_debug("trp_peer_set_incoming_status: %s: status=%d peer connected was %d now %d.",
+           trp_peer_get_gssname(peer)->buf, status, was_connected, trp_peer_is_connected(peer));
+  if ((trp_peer_is_connected(peer) != was_connected) && (peer->conn_status_cb!=NULL))
+    peer->conn_status_cb(peer, peer->conn_status_cookie);
+}
+
+TRP_PEER_CONN_STATUS trp_peer_get_incoming_status(TRP_PEER *peer)
+{
+  return peer->incoming_status;
+}
+
+int trp_peer_is_connected(TRP_PEER *peer)
+{
+  return (peer->outgoing_status==PEER_CONNECTED) && (peer->incoming_status==PEER_CONNECTED);
+}
+
 void trp_ptable_free(TRP_PTABLE *ptbl)
 {
   talloc_free(ptbl);
index 58e8061..4e02fea 100644 (file)
@@ -150,9 +150,9 @@ void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq)
 }
 
 /* submit msg to trpc for transmission */
-void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
+void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
 {
-  tr_mq_append(trpc->mq, msg);
+  tr_mq_add(trpc->mq, msg);
 }
 
 TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc)
@@ -160,6 +160,11 @@ TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc)
   return tr_mq_pop(trpc->mq);
 }
 
+void trpc_mq_clear(TRPC_INSTANCE *trpc)
+{
+  tr_mq_clear(trpc->mq);
+}
+
 TRP_RC trpc_connect(TRPC_INSTANCE *trpc)
 {
   return trp_connection_initiate(trpc_get_conn(trpc), trpc_get_server(trpc), trpc_get_port(trpc));
index cb0d1ed..0b1f25d 100644 (file)
@@ -90,9 +90,9 @@ TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps)
   return tr_mq_pop(trps->mq);
 }
 
-void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg)
+void trps_mq_add(TRPS_INSTANCE *trps, TR_MQ_MSG *msg)
 {
-  tr_mq_append(trps->mq, msg);
+  tr_mq_add(trps->mq, msg);
 }
 
 unsigned int trps_get_connect_interval(TRPS_INSTANCE *trps)
@@ -187,16 +187,15 @@ TRP_RC trps_send_msg(TRPS_INSTANCE *trps, TRP_PEER *peer, const char *msg)
 
   /* get the connection for this peer */
   trpc=trps_find_trpc(trps, peer);
-  if ((trpc==NULL) || (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP)) {
-    /* We could just let these sit on the queue in the hopes that a connection
-     * is eventually established. However, we'd then have to ensure the queue
-     * didn't keep growing, etc. */
-    tr_warning("trps_send_msg: skipping message queued while TRPC connection not up.");
+  /* instead, let's let that happen and then clear the queue when an attempt to
+   * connect fails */
+  if (trpc==NULL) {
+    tr_warning("trps_send_msg: skipping message queued for missing TRP client entry.");
   } else {
-    mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND);
+    mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND, TR_MQ_PRIO_NORMAL);
     msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
     tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */
-    trpc_mq_append(trpc, mq_msg);
+    trpc_mq_add(trpc, mq_msg);
     rc=TRP_SUCCESS;
   }
   talloc_free(tmp_ctx);
@@ -235,22 +234,6 @@ static int trps_listen (TRPS_INSTANCE *trps, int port)
   return conn;
 }
 
-#if 0 /* remove this if I forget to do so */
-/* returns EACCES if authorization is denied */
-int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data)
-{
-  TRPS_INSTANCE *trps = (TRPS_INSTANCE *)data;
-  int result=0;
-
-  if (0!=trps->auth_handler(clientName, displayName, trps->cookie)) {
-    tr_debug("trps_auth_cb: client '%.*s' denied authorization.", displayName->length, displayName->value);
-    result=EACCES; /* denied */
-  }
-
-  return result;
-}
-#endif 
-
 /* get the currently selected route if available */
 TRP_ROUTE *trps_get_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer)
 {
@@ -375,37 +358,41 @@ int trps_get_listener(TRPS_INSTANCE *trps,
   return listen;
 }
 
+TRP_RC trps_authorize_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
+{
+  /* try to establish a GSS context */
+  if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
+    tr_notice("trps_authorize_connection: failed to authorize connection");
+    trp_connection_close(conn);
+    return TRP_ERROR;
+  }
+  tr_notice("trps_authorize_connection: authorized connection");
+  return TRP_SUCCESS;
+}
+
 void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
 {
   TR_MSG *msg=NULL;
   TRP_RC rc=TRP_ERROR;
 
-  /* try to establish a GSS context */
-  if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
-    tr_notice("tr_trps_conn_thread: failed to authorize connection");
-    trp_connection_close(conn);
-  } else {
-    tr_notice("trps_handle_connection: authorized connection");
-
-    /* loop as long as the connection exists */
-    while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
-      rc=trps_read_message(trps, conn, &msg);
-      switch(rc) {
-      case TRP_SUCCESS:
-        trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */
-        break;
-
-      case TRP_ERROR:
-        trp_connection_close(conn);
-        break;
-
-      default:
-        tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc);
-      }
-    }
+  /* loop as long as the connection exists */
+  while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
+    rc=trps_read_message(trps, conn, &msg);
+    switch(rc) {
+    case TRP_SUCCESS:
+      trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */
+      break;
+
+    case TRP_ERROR:
+      trp_connection_close(conn);
+      break;
 
-    tr_debug("trps_handle_connection: connection closed.");
+    default:
+      tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc);
+    }
   }
+
+  tr_debug("trps_handle_connection: connection closed.");
 }
 
 static TRP_RC trps_validate_update(TRPS_INSTANCE *trps, TRP_UPD *upd)
@@ -1167,7 +1154,7 @@ TRP_RC trps_wildcard_route_req(TRPS_INSTANCE *trps, TR_NAME *peer_servicename)
     tr_debug("trps_wildcard_route_req: request queued successfully.");
     rc=TRP_SUCCESS;
   }
-  tr_crit("got here");
+
 cleanup:
   if (encoded!=NULL)
     tr_msg_free_encoded(encoded);