Make outgoing connections. Connect to self as a test.
authorJennifer Richards <jennifer@painless-security.com>
Mon, 27 Jun 2016 20:55:32 +0000 (16:55 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Mon, 27 Jun 2016 20:55:32 +0000 (16:55 -0400)
common/tr_msg.c
include/tr_trp.h
include/trp_internal.h
include/trust_router/trp.h
tr/tr_main.c
tr/tr_trp.c
trp/trp_conn.c
trp/trpc.c
trp/trps.c

index 4faff68..f983015 100644 (file)
@@ -543,6 +543,12 @@ static TRP_RC tr_msg_encode_inforec_route(json_t *jrec, TRP_INFOREC *rec )
   if (rec==NULL)
     return TRP_BADTYPE;
 
+  if ((trp_inforec_get_comm(rec)==NULL)
+     || (trp_inforec_get_realm(rec)==NULL)
+     || (trp_inforec_get_trust_router(rec)==NULL)) {
+    return TRP_ERROR;
+  }
+
   s=tr_name_strdup(trp_inforec_get_comm(rec));
   if (s==NULL)
     return TRP_NOMEM;
@@ -782,6 +788,7 @@ static json_t *tr_msg_encode_trp_req(TRP_REQ *req)
   json_t *jbody=NULL;
   json_t *jstr=NULL;
   char *s=NULL;
+  TR_NAME *n=NULL;
 
   if (req==NULL)
     return NULL;
@@ -790,7 +797,13 @@ static json_t *tr_msg_encode_trp_req(TRP_REQ *req)
   if (jbody==NULL)
     return NULL;
 
-  s=tr_name_strdup(trp_req_get_comm(req)); /* ensures null termination */
+  if ((NULL==trp_req_get_comm(req))
+     || (NULL==trp_req_get_realm(req))) {
+    json_decref(jbody);
+    return NULL;
+  }
+
+  s=tr_name_strdup(n); /* ensures null termination */
   if (s==NULL) {
     json_decref(jbody);
     return NULL;
index 32c1f4c..66222d3 100644 (file)
@@ -16,7 +16,7 @@ typedef struct tr_trps_events {
 
 /* prototypes */
 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx);
-int tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr,
+TRP_RC tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr,
                        TR_TRPS_EVENTS *trps_ev);
-
+TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port);
 #endif /* TR_TRP_H */
index 13e2d65..5e93dcb 100644 (file)
@@ -45,7 +45,6 @@ struct trp_req {
 
 typedef struct trps_instance TRPS_INSTANCE;
 
-
 typedef enum trp_connection_status {
   TRP_CONNECTION_DOWN=0,
   TRP_CONNECTION_UP,
@@ -53,13 +52,13 @@ typedef enum trp_connection_status {
 
 typedef struct trp_connection TRP_CONNECTION;
 struct trp_connection {
+  pthread_mutex_t mutex; /* protects status attribute */
   TRP_CONNECTION *next;
   pthread_t *thread; /* thread servicing this connection */
   int fd;
   TR_NAME *gssname;
   gss_ctx_id_t *gssctx;
   TRP_CONNECTION_STATUS status;
-  pthread_mutex_t status_mutex;
 };
 
 typedef TRP_RC (*TRPS_MSG_FUNC)(TRPS_INSTANCE *, TRP_CONNECTION *, TR_MSG *);
@@ -68,10 +67,15 @@ typedef void (*TRP_RESP_FUNC)();
 typedef client_cb_fn TRP_AUTH_FUNC;
 
 /* TRP Client Instance Data */
-typedef struct trpc_instance {
+typedef struct trpc_instance TRPC_INSTANCE;
+struct trpc_instance {
+  TRPC_INSTANCE *next;
+  const char *server;
+  unsigned int port;
   TRP_CONNECTION *conn;
-  DH *client_dh;                       /* Client's DH struct with priv and pub keys */
-} TRPC_INSTANCE;
+  TR_MQ *mq; /* msgs from master to trpc */
+  DH *dh;                      /* Client's DH struct with priv and pub keys */
+};
 
 /* TRP Server Instance Data */
 struct trps_instance {
@@ -80,7 +84,8 @@ struct trps_instance {
   TRP_AUTH_FUNC auth_handler;
   TRPS_MSG_FUNC msg_handler;
   void *cookie;
-  TRP_CONNECTION *conn; /* connections to peers */
+  TRP_CONNECTION *conn; /* connections from peers */
+  TRPC_INSTANCE *trpc; /* connections to peers */
   TR_MQ *mq;
 };
 
@@ -88,6 +93,8 @@ struct trps_instance {
 TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx);
 void trp_connection_free(TRP_CONNECTION *conn);
 void trp_connection_close(TRP_CONNECTION *conn);
+int trp_connection_lock(TRP_CONNECTION *conn);
+int trp_connection_unlock(TRP_CONNECTION *conn);
 int trp_connection_get_fd(TRP_CONNECTION *conn);
 void trp_connection_set_fd(TRP_CONNECTION *conn, int fd);
 TR_NAME *trp_connection_get_gssname(TRP_CONNECTION *conn);
@@ -102,19 +109,39 @@ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remo
 void trp_connection_append(TRP_CONNECTION *conn, TRP_CONNECTION *new);
 int trp_connection_auth(TRP_CONNECTION *conn, TRP_AUTH_FUNC auth_callback, void *callback_data);
 TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname);
+TRP_RC trp_connection_initiate(TRP_CONNECTION *conn, const char *server, unsigned int port);
 
 TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx);
 void trpc_free (TRPC_INSTANCE *trpc);
-int trpc_open_connection (TRPC_INSTANCE *trpc, char *server, unsigned int port, gss_ctx_id_t *gssctx);
-int trpc_send_msg (TRPC_INSTANCE *trpc, int conn, gss_ctx_id_t gssctx, const char *msg_content,
-                   int *resp_handler(), void *cookie);
+TRP_CONNECTION *trpc_get_conn(TRPC_INSTANCE *trpc);
+void trpc_set_conn(TRPC_INSTANCE *trpc, TRP_CONNECTION *conn);
+TRPC_INSTANCE *trpc_get_next(TRPC_INSTANCE *trpc);
+void trpc_set_next(TRPC_INSTANCE *trpc, TRPC_INSTANCE *next);
+TRPC_INSTANCE *trpc_remove(TRPC_INSTANCE *trpc, TRPC_INSTANCE *remove);
+void trpc_append(TRPC_INSTANCE *trpc, TRPC_INSTANCE *new);
+const char *trpc_get_server(TRPC_INSTANCE *trpc);
+void trpc_set_server(TRPC_INSTANCE *trpc, const char *server);
+unsigned int trpc_get_port(TRPC_INSTANCE *trpc);
+void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port);
+DH *trpc_get_dh(TRPC_INSTANCE *trpc);
+void trpc_set_dh(TRPC_INSTANCE *trpc, DH *dh);
+TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc);
+TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc);
+void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq);
+void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc);
+void trpc_master_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc);
+TRP_RC trpc_connect(TRPC_INSTANCE *trpc);
+TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content);
 
 TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx);
 void trps_free (TRPS_INSTANCE *trps);
-int trps_send_msg (TRPS_INSTANCE *trps, int conn, gss_ctx_id_t gssctx, const char *msg_content);
-int trps_accept(TRPS_INSTANCE *trps, int listen);
+TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg);
 void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new);
 void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove);
+void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc);
+void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove);
 int trps_get_listener(TRPS_INSTANCE *trps,
                       TRPS_MSG_FUNC msg_handler,
                       TRP_AUTH_FUNC auth_handler,
index fd4d5cd..5f54d2f 100644 (file)
@@ -15,6 +15,7 @@ typedef enum trp_rc {
   TRP_NOMEM, /* allocation error */
   TRP_BADTYPE, /* typing error */
   TRP_UNSUPPORTED, /* unsupported feature */
+  TRP_BADARG, /* bad argument */
 } TRP_RC;
 
 typedef enum trp_inforec_type {
index 7a31d5b..f218139 100644 (file)
@@ -123,6 +123,32 @@ static void tr_talloc_log(const char *msg)
 }
 #endif /* TALLOC_DEBUG_ENABLE */
 
+
+static void debug_ping(evutil_socket_t fd, short what, void *arg)
+{
+  TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+  TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
+  TRP_REQ *req=NULL;
+  TR_MSG msg;
+  char *encoded=NULL;
+
+  tr_debug("debug_ping entered, trps=%p, trps->trpc=%p", trps, trps->trpc);
+  if (trps->trpc==NULL)
+    tr_trpc_initiate(trps, trps->hostname, trps->port);
+
+  /* create a TRP route request msg */
+  req=trp_req_new(tmp_ctx);
+  tr_msg_set_trp_req(&msg, req);
+  encoded=tr_msg_encode(&msg);
+  if (encoded==NULL)
+    tr_err("debug_ping: error encoding TRP message.");
+  else {
+    tr_debug("debug_ping: sending message");
+    trps_send_msg(trps, NULL, encoded);
+    tr_msg_free_encoded(encoded);
+  }
+}
+
 int main(int argc, char *argv[])
 {
   TALLOC_CTX *main_ctx=NULL;
@@ -133,6 +159,8 @@ int main(int argc, char *argv[])
   struct tr_socket_event tids_ev;
   TR_TRPS_EVENTS *trps_ev;
   struct event *cfgwatch_ev;
+  struct event *debug_ping_ev;
+  struct timeval debug_ping_interval={1, 0};
 
   /* we're going to be multithreaded, so disable null context tracking */
   talloc_set_abort_fn(tr_abort);
@@ -225,6 +253,10 @@ int main(int argc, char *argv[])
     return 1;
   }
 
+  /* for debugging, send a message to peers on a timer */
+  debug_ping_ev=evtimer_new(ev_base, debug_ping, (void *)(tr->trps));
+  evtimer_add(debug_ping_ev, &debug_ping_interval);
+
   tr_event_loop_run(ev_base); /* does not return until we are done */
 
   /* TODO: ensure talloc is properly used so this actually works */
index 6875e4c..754063f 100644 (file)
@@ -21,7 +21,6 @@ struct tr_trps_event_cookie {
   TR_CFG_MGR *cfg_mgr;
 };
 
-
 /* callback to schedule event to process messages */
 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
 {
@@ -84,30 +83,30 @@ static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
 }
 
 /* data passed to thread */
-struct thread_data {
+struct trps_thread_data {
   TRP_CONNECTION *conn;
   TRPS_INSTANCE *trps;
 };
 /* thread to handle GSS connections to peers */
-static void *tr_trps_conn_thread(void *arg)
+static void *tr_trps_thread(void *arg)
 {
   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
-  struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data);
+  struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
   TRP_CONNECTION *conn=thread_data->conn;
   TRPS_INSTANCE *trps=thread_data->trps;
   TR_MQ_MSG *msg=NULL;
 
-  tr_debug("tr_trps_conn_thread: started");
+  tr_debug("tr_trps_thread: started");
   trps_handle_connection(trps, conn);
 
-  msg=tr_mq_msg_new(tmp_ctx, "thread_exit");
+  msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
   if (msg==NULL)
-    tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
+    tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
   else
     trps_mq_append(trps, msg);
 
-  tr_debug("tr_trps_conn_thread: exit");
+  tr_debug("tr_trps_thread: exit");
   talloc_free(tmp_ctx);
   return NULL;
 }
@@ -120,7 +119,7 @@ static void tr_trps_event_cb(int listener, short event, void *arg)
   TRP_CONNECTION *conn=NULL;
   TR_NAME *gssname=NULL;
   char *name=NULL;
-  struct thread_data *thread_data;
+  struct trps_thread_data *thread_data=NULL;
 
   if (0==(event & EV_READ)) {
     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
@@ -132,16 +131,16 @@ static void tr_trps_event_cb(int listener, short event, void *arg)
     conn=trp_connection_accept(tmp_ctx, listener, gssname);
     if (conn!=NULL) {
       /* need to monitor this fd and trigger events when read becomes possible */
-      thread_data=talloc(conn, struct thread_data);
+      thread_data=talloc(conn, struct trps_thread_data);
       if (thread_data==NULL) {
-        tr_err("tr_trps_event_cb: unable to allocate thread_data");
+        tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
         talloc_free(tmp_ctx);
         return;
       }
       thread_data->conn=conn;
       thread_data->trps=trps;
-      pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data);
-      pthread_detach(*(conn->thread)); /* we will not rejoin the thread */
+      pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
+      pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
       trps_add_connection(trps, conn); /* remember the connection */
     }
   }
@@ -167,7 +166,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg)
   msg=trps_mq_pop(trps);
   while (msg!=NULL) {
     s=tr_mq_msg_get_message(msg);
-    if (0==strcmp(s, "thread_exit")) {
+    if (0==strcmp(s, "trps_thread_exit")) {
       tr_trps_cleanup_thread(trps,
                              talloc_get_type_abort(tr_mq_msg_get_payload(msg),
                                                    TRP_CONNECTION));
@@ -209,9 +208,8 @@ TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
 }
 
 /* Configure the trps instance and set up its event handler.
- * Returns 0 on success, nonzero on failure. Results in 
- * trps_ev, which should be allocated by caller. */
-int tr_trps_event_init(struct event_base *base,
+ * Fills in trps_ev, which should be allocated by caller. */
+TRP_RC tr_trps_event_init(struct event_base *base,
                        TRPS_INSTANCE *trps,
                        TR_CFG_MGR *cfg_mgr,
                        TR_TRPS_EVENTS *trps_ev)
@@ -219,11 +217,11 @@ int tr_trps_event_init(struct event_base *base,
   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
   struct tr_socket_event *listen_ev=NULL;
   struct tr_trps_event_cookie *cookie;
-  int retval=0;
+  TRP_RC retval=TRP_ERROR;
 
   if (trps_ev == NULL) {
     tr_debug("tr_trps_event_init: Null trps_ev.");
-    retval=1;
+    retval=TRP_BADARG;
     goto cleanup;
   }
 
@@ -235,7 +233,7 @@ int tr_trps_event_init(struct event_base *base,
   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
   if (cookie == NULL) {
     tr_debug("tr_trps_event_init: Unable to allocate cookie.");
-    retval=1;
+    retval=TRP_NOMEM;
     goto cleanup;
   }
   cookie->trps=trps;
@@ -251,7 +249,7 @@ int tr_trps_event_init(struct event_base *base,
                                        (void *)cookie);
   if (listen_ev->sock_fd < 0) {
     tr_crit("Error opening TRP server socket.");
-    retval=1;
+    retval=TRP_ERROR;
     goto cleanup;
   }
   
@@ -272,8 +270,161 @@ int tr_trps_event_init(struct event_base *base,
                            (void *)trps);
   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
 
+  retval=TRP_SUCCESS;
+
 cleanup:
   talloc_free(tmp_ctx);
   return retval;
 }
 
+
+struct trpc_notify_cb_data {
+  int msg_ready;
+  pthread_cond_t cond;
+  pthread_mutex_t mutex;
+};
+
+static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
+{
+  struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
+  pthread_mutex_lock(&(cb_data->mutex));
+  if (!cb_data->msg_ready) {
+    cb_data->msg_ready=1;
+    pthread_cond_signal(&(cb_data->cond));
+  }
+  pthread_mutex_unlock(&(cb_data->mutex));
+}
+
+/* data passed to thread */
+struct trpc_thread_data {
+  TRPC_INSTANCE *trpc;
+  TRPS_INSTANCE *trps;
+};
+static void *tr_trpc_thread(void *arg)
+{
+  TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+  struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
+  TRPC_INSTANCE *trpc=thread_data->trpc;
+  TRPS_INSTANCE *trps=thread_data->trps;
+  TRP_RC rc=TRP_ERROR;
+  TR_MQ_MSG *msg=NULL;
+  const char *msg_type=NULL;
+  char *encoded_msg=NULL;
+
+  struct trpc_notify_cb_data cb_data={0,
+                                      PTHREAD_COND_INITIALIZER,
+                                      PTHREAD_MUTEX_INITIALIZER};
+
+  tr_debug("tr_trpc_thread: started");
+
+  /* set up the mq for receiving */
+  pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
+
+  tr_mq_lock(trpc->mq);
+  tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
+  tr_mq_unlock(trpc->mq);
+
+  rc=trpc_connect(trpc);
+  if (rc!=TRP_SUCCESS) {
+    tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
+              trpc_get_server(trpc),
+              trpc_get_port(trpc));
+  } else {
+    while (1) {
+      cb_data.msg_ready=0;
+      pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
+      /* verify the condition */
+      if (cb_data.msg_ready) {
+        msg=trpc_mq_pop(trpc);
+        if (msg==NULL) {
+          /* no message in the queue */
+          tr_err("tr_trpc_thread: notified of msg, but queue empty");
+          break;
+        }
+
+        msg_type=tr_mq_msg_get_message(msg);
+
+        if (0==strcmp(msg_type, "trpc_abort")) {
+          tr_mq_msg_free(msg);
+          break; /* exit loop */
+        }
+        else if (0==strcmp(msg_type, "trpc_send")) {
+          encoded_msg=tr_mq_msg_get_payload(msg);
+          if (encoded_msg==NULL)
+            tr_notice("tr_trpc_thread: null outgoing TRP message.");
+          else {
+            rc = trpc_send_msg(trpc, encoded_msg);
+            if (rc!=TRP_SUCCESS) {
+              tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+              tr_mq_msg_free(msg);
+              break;
+            }
+          }
+        }
+        else
+          tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+
+        tr_mq_msg_free(msg);
+      }
+    }
+  }
+
+  msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
+  tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
+  if (msg==NULL)
+    tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+  else
+    trps_mq_append(trps, msg);
+
+  talloc_free(tmp_ctx);
+  return NULL;
+}
+
+/* starts a trpc thread to connect to server:port */
+TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
+{
+  TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+  TRPC_INSTANCE *trpc=NULL;
+  TRP_CONNECTION *conn=NULL;
+  struct trpc_thread_data *thread_data=NULL;
+
+  tr_debug("tr_trpc_initiate entered");
+  trpc=trpc_new(tmp_ctx);
+  if (trpc==NULL) {
+    tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
+    goto cleanup;
+  }
+  tr_debug("tr_trpc_initiate: allocated trpc");
+
+  conn=trp_connection_new(trpc);
+  if (conn==NULL) {
+    tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
+    goto cleanup;
+  }
+  trpc_set_conn(trpc, conn);
+  trpc_set_server(trpc, talloc_strdup(trpc, server));
+  trpc_set_port(trpc, port);
+  tr_debug("tr_trpc_initiate: allocated connection");
+  
+  /* start thread */
+  thread_data=talloc(trpc, struct trpc_thread_data);
+  if (thread_data==NULL) {
+    tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
+    goto cleanup;
+  }
+  thread_data->trpc=trpc;
+  thread_data->trps=trps;
+
+  pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
+  pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
+
+  tr_debug("tr_trpc_initiate: started trpc thread");
+  trps_add_trpc(trps, trpc);
+
+  talloc_report_full(trps, stderr);
+  talloc_report_full(tmp_ctx, stderr);
+
+ cleanup:
+  talloc_free(tmp_ctx);
+  return trpc;
+}
index 1065116..1f99496 100644 (file)
@@ -6,6 +6,20 @@
 #include <tr_debug.h>
 #include <trp_internal.h>
 
+/* Threading note: mutex lock is only used for protecting get_status() and set_status().
+ * If needed, locking for other operations (notably adding/removing connections) must be managed
+ * by whomever is holding on to the connection list. */
+
+int trp_connection_lock(TRP_CONNECTION *conn)
+{
+  return pthread_mutex_lock(&(conn->mutex));
+}
+
+int trp_connection_unlock(TRP_CONNECTION *conn)
+{
+  return pthread_mutex_unlock(&(conn->mutex));
+}
+
 int trp_connection_get_fd(TRP_CONNECTION *conn)
 {
   return conn->fd;
@@ -39,17 +53,17 @@ void trp_connection_set_gssctx(TRP_CONNECTION *conn, gss_ctx_id_t *gssctx)
 TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn)
 {
   TRP_CONNECTION_STATUS status;
-  pthread_mutex_lock(&(conn->status_mutex));
+  trp_connection_lock(conn);
   status=conn->status;
-  pthread_mutex_unlock(&(conn->status_mutex));
+  trp_connection_unlock(conn);
   return status;
 }
 
 static void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status)
 {
-  pthread_mutex_lock(&(conn->status_mutex));
+  trp_connection_lock(conn);
   conn->status=status;
-  pthread_mutex_unlock(&(conn->status_mutex));
+  trp_connection_unlock(conn);
 }
 
 pthread_t *trp_connection_get_thread(TRP_CONNECTION *conn)
@@ -72,7 +86,7 @@ static void trp_connection_set_next(TRP_CONNECTION *conn, TRP_CONNECTION *next)
   conn->next=next;
 }
 
-/* Ok to call more than once; guarantees connection no longer in the list.
+/* Ok to call more than once; guarantees connection no longer in the list. Does not free removed element.
  * Returns handle to new list, you must replace your old handle on the list with this.  */
 TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remove)
 {
@@ -85,7 +99,6 @@ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remo
   /* first element is a special case */
   if (cur==remove) {
     conn=trp_connection_get_next(cur); /* advance list head */
-    trp_connection_free(cur);
   } else {
     /* it was not the first element */
     last=cur;
@@ -93,7 +106,6 @@ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remo
     while (cur!=NULL) {
       if (cur==remove) {
         trp_connection_set_next(last, trp_connection_get_next(cur));
-        trp_connection_free(cur);
         break;
       }
       last=cur;
@@ -117,7 +129,7 @@ void trp_connection_append(TRP_CONNECTION *conn, TRP_CONNECTION *new)
 
 static void trp_connection_mutex_init(TRP_CONNECTION *conn)
 {
-  pthread_mutex_init(&(conn->status_mutex), NULL);
+  pthread_mutex_init(&(conn->mutex), NULL);
 }
 
 /* talloc destructor for a connection: ensures connection is closed, memory freed */
@@ -142,7 +154,7 @@ TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx)
   if (new_conn != NULL) {
     trp_connection_set_next(new_conn, NULL);
     trp_connection_set_fd(new_conn, -1);
-    trp_connection_set_gssctx(new_conn, NULL);
+    trp_connection_set_gssname(new_conn, NULL);
     trp_connection_mutex_init(new_conn);
     trp_connection_set_status(new_conn, TRP_CONNECTION_DOWN);
     thread=talloc(new_conn, pthread_t);
@@ -228,3 +240,39 @@ TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *
   return conn;
 }
 
+/* Initiate connection */
+TRP_RC trp_connection_initiate(TRP_CONNECTION *conn, const char *server, unsigned int port)
+{
+  int err = 0;
+  int fd=-1;
+  unsigned int use_port=0;
+
+  if (0 == port)
+    use_port = TRP_PORT;
+  else 
+    use_port = port;
+
+  if (conn==NULL) {
+    tr_err("trp_connection_initiate: null TRP_CONNECTION");
+    return TRP_BADARG;
+  }
+
+  tr_debug("trp_connection_initiate: opening GSS connection to %s:%d",
+           server,
+           use_port);
+  err = gsscon_connect(server,
+                       use_port,
+                       "trustrouter",
+                      &fd,
+                       trp_connection_get_gssctx(conn));
+  tr_debug("trp_connection_initiate: connected");
+
+  if (err) {
+    talloc_free(conn);
+    return TRP_ERROR;
+  } else {
+    trp_connection_set_fd(conn, fd);
+    trp_connection_set_status(conn, TRP_CONNECTION_UP);
+    return TRP_SUCCESS;
+  }
+}
index ef5af85..57b7378 100644 (file)
@@ -1,20 +1,31 @@
 #include <fcntl.h>
-#include <event2/event.h>
 #include <talloc.h>
 #include <errno.h>
 #include <unistd.h>
 
 #include <gsscon.h>
 #include <tr_rp.h>
-#include <tr_config.h>
-#include <tr_event.h>
 #include <tr_debug.h>
-#include <tr_trp.h>
+#include <trp_internal.h>
 
 
+/* also allocates the incoming mq */
 TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx)
 {
-  return talloc_zero(mem_ctx, TRPC_INSTANCE);
+  TRPC_INSTANCE *trpc=talloc(mem_ctx, TRPC_INSTANCE);
+  if (trpc!=NULL) {
+    trpc->next=NULL;
+    trpc->server=NULL;
+    trpc->port=0;
+    trpc->conn=NULL;
+    trpc->dh=NULL;
+    trpc->mq=tr_mq_new(trpc);
+    if (trpc->mq==NULL) {
+      talloc_free(trpc);
+      trpc=NULL;
+    }
+  }
+  return trpc;
 }
 
 void trpc_free (TRPC_INSTANCE *trpc)
@@ -23,55 +34,142 @@ void trpc_free (TRPC_INSTANCE *trpc)
     talloc_free(trpc);
 }
 
-/* Connect to a TRP server */
-int trpc_open_connection (TRPC_INSTANCE *trpc, 
-                          char *server,
-                          unsigned int port,
-                          gss_ctx_id_t *gssctx)
+TRPC_INSTANCE *trpc_get_next(TRPC_INSTANCE *trpc)
+{
+  return trpc->next;
+}
+
+void trpc_set_next(TRPC_INSTANCE *trpc, TRPC_INSTANCE *next)
+{
+  trpc->next=next;
+}
+
+/* Ok to call more than once; guarantees trpc no longer in the list. Does not free removed element.
+ * Returns handle to new list, you must replace your old handle on the list with this.  */
+TRPC_INSTANCE *trpc_remove(TRPC_INSTANCE *trpc, TRPC_INSTANCE *remove)
+{
+  TRPC_INSTANCE *cur=trpc;
+  TRPC_INSTANCE *last=NULL;
+
+  if (cur==NULL)
+    return NULL;
+
+  /* first element is a special case */
+  if (cur==remove) {
+    trpc=trpc_get_next(cur); /* advance list head */
+  } else {
+    /* it was not the first element */
+    last=cur;
+    cur=trpc_get_next(cur);
+    while (cur!=NULL) {
+      if (cur==remove) {
+        trpc_set_next(last, trpc_get_next(cur));
+        break;
+      }
+      last=cur;
+      cur=trpc_get_next(cur);
+    }
+  }
+  return trpc;
+}
+
+static TRPC_INSTANCE *trpc_get_tail(TRPC_INSTANCE *trpc)
+{
+  while((trpc!=NULL)&&(trpc_get_next(trpc)!=NULL))
+    trpc=trpc_get_next(trpc);
+  return trpc;
+}
+
+void trpc_append(TRPC_INSTANCE *trpc, TRPC_INSTANCE *new)
+{
+  trpc_set_next(trpc_get_tail(trpc), new);
+}
+
+const char *trpc_get_server(TRPC_INSTANCE *trpc)
+{
+  return trpc->server;
+}
+
+void trpc_set_server(TRPC_INSTANCE *trpc, const char *server)
+{
+  trpc->server=server;
+}
+
+unsigned int trpc_get_port(TRPC_INSTANCE *trpc)
+{
+  return trpc->port;
+}
+
+void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port)
 {
-  int err = 0;
-  int conn = -1;
-  unsigned int use_port = 0;
+  trpc->port=port;
+}
+
+TRP_CONNECTION *trpc_get_conn(TRPC_INSTANCE *trpc)
+{
+  return trpc->conn;
+}
+
+void trpc_set_conn(TRPC_INSTANCE *trpc, TRP_CONNECTION *conn)
+{
+  trpc->conn=conn;
+}
+
+DH *trpc_get_dh(TRPC_INSTANCE *trpc)
+{
+  return trpc->dh;
+}
+
+void trpc_set_dh(TRPC_INSTANCE *trpc, DH *dh)
+{
+  trpc->dh=dh;
+}
+
+TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc)
+{
+  return trp_connection_get_status(trpc_get_conn(trpc));
+}
 
-  if (0 == port)
-    use_port = TRP_PORT;
-  else 
-    use_port = port;
+TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc)
+{
+  return trpc->mq;
+}
+
+void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq)
+{
+  trpc->mq=mq;
+}
 
-  tr_debug("trpc_open_connection: opening GSS connection to %s:%d", server, use_port);
-  err = gsscon_connect(server, use_port, "trustrouter", &conn, gssctx);
+/* submit msg to trpc for transmission */
+void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
+{
+  tr_mq_append(trpc->mq, msg);
+}
 
-  if (!err)
-    return conn;
-  else
-    return -1;
+TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc)
+{
+  return tr_mq_pop(trpc->mq);
 }
 
+TRP_RC trpc_connect(TRPC_INSTANCE *trpc)
+{
+  return trp_connection_initiate(trpc_get_conn(trpc), trpc_get_server(trpc), trpc_get_port(trpc));
+}
 
 /* simple function, based on tidc_send_req */
-int trpc_send_msg (TRPC_INSTANCE *trpc, 
-                   int conn, 
-                   gss_ctx_id_t gssctx,
-                   const char *msg_content,
-                   int *resp_handler(),
-                   void *cookie)
+TRP_RC trpc_send_msg (TRPC_INSTANCE *trpc, 
+                      const char *msg_content)
 {
   int err=0;
-  int rc=0;
+  TRP_RC rc=TRP_SUCCESS;
 
   /* Send the request over the connection */
-  if (err = gsscon_write_encrypted_token (conn,
-                                          gssctx,
-                                          msg_content, 
-                                          strlen(msg_content))) {
+  if (err = gsscon_write_encrypted_token(trp_connection_get_fd(trpc_get_conn(trpc)),
+                                         *trp_connection_get_gssctx(trpc_get_conn(trpc)),
+                                         msg_content, 
+                                         strlen(msg_content))) {
     tr_err( "trpc_send_msg: Error sending message over connection.\n");
-    goto error;
+    rc=TRP_ERROR;
   }
-
-  goto cleanup;
-
- error:
-  rc = -1;
- cleanup:
   return rc;
 }
index ea31f86..b4b7b4f 100644 (file)
@@ -1,12 +1,10 @@
 #include <fcntl.h>
-#include <event2/event.h>
 #include <talloc.h>
 #include <errno.h>
 #include <unistd.h>
 
 #include <gsscon.h>
 #include <tr_rp.h>
-#include <tr_event.h>
 #include <tr_debug.h>
 #include <trp_internal.h>
 
@@ -19,6 +17,7 @@ TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx)
     trps->port=0;
     trps->cookie=NULL;
     trps->conn=NULL;
+    trps->trpc=NULL;
     trps->mq=tr_mq_new(trps);
     if (trps->mq==NULL) {
       /* failed to allocate mq */
@@ -63,29 +62,52 @@ void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new)
   talloc_steal(trps, new);
 }
 
-/* ok to call more than once; guarantees connection no longer in the list */
+/* ok to call more than once; guarantees connection no longer in the list.
+ * Caller is responsible for freeing the removed element afterwards.  */
 void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove)
 {
   trps->conn=trp_connection_remove(trps->conn, remove);
 }
 
-int trps_send_msg (TRPS_INSTANCE *trps,
-                   int conn,
-                   gss_ctx_id_t gssctx,
-                   const char *msg_content)
+void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
 {
-  int err=0;
-  int rc=0;
-
-  /* Send the request over the connection */
-  if (err = gsscon_write_encrypted_token (conn,
-                                          gssctx,
-                                          msg_content, 
-                                          strlen(msg_content))) {
-    tr_err( "trps_send_msg: Error sending message over connection.\n");
-    rc = -1;
-  }
+  if (trps->trpc==NULL)
+    trps->trpc=trpc;
+  else
+    trpc_append(trps->trpc, trpc);
+
+  talloc_steal(trps, trpc);
+}
+
+/* ok to call more than once; guarantees trpc no longer in the list.
+ * Caller is responsible for freeing the removed element afterwards.  */
+void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove)
+{
+  trps->trpc=trpc_remove(trps->trpc, remove);
+}
 
+TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg)
+{
+  TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+  TR_MQ_MSG *mq_msg=NULL;
+  char *msg_dup=NULL;
+  TRP_RC rc=TRP_ERROR;
+
+  /* Currently ignore peer and just send to an open connection.
+   * In reality, need to identify the correct peer and send via that
+   * one.  */
+  if (trps->trpc != NULL) {
+    if (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP)
+      tr_debug("trps_send_msg: skipping message sent while TRPC connection not up.");
+    else {
+      mq_msg=tr_mq_msg_new(tmp_ctx, "trpc_send");
+      msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
+      tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */
+      trpc_mq_append(trps->trpc, mq_msg);
+      rc=TRP_SUCCESS;
+    }
+  }
+  talloc_free(tmp_ctx);
   return rc;
 }