Successful messages via mq to main thread.
authorJennifer Richards <jennifer@painless-security.com>
Fri, 24 Jun 2016 17:16:13 +0000 (13:16 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Fri, 24 Jun 2016 17:16:13 +0000 (13:16 -0400)
common/tr_mq.c
include/tr_trp.h
include/trp_internal.h
tr/tr_event.c
tr/tr_main.c
tr/tr_trp.c
trp/trps.c

index 4e43091..02b42ee 100644 (file)
@@ -2,6 +2,7 @@
 #include <pthread.h>
 
 #include <tr_mq.h>
+#include <tr_debug.h>
 
 /* Messages */
 static int tr_mq_msg_destructor(void *object)
@@ -79,7 +80,7 @@ int tr_mq_lock(TR_MQ *mq)
 
 int tr_mq_unlock(TR_MQ *mq)
 {
-  return pthread_mutex_lock(&(mq->mutex));
+  return pthread_mutex_unlock(&(mq->mutex));
 }
 
 static TR_MQ_MSG *tr_mq_get_head(TR_MQ *mq)
@@ -133,7 +134,7 @@ void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
 
   /* see if we need to tell someone we became non-empty */
   if (was_empty && (notify_cb!=NULL))
-    mq->notify_cb(mq, notify_cb_arg);
+    notify_cb(mq, notify_cb_arg);
 }
 
 /* caller must free msg via tr_mq_msg_free */
@@ -144,7 +145,7 @@ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
   tr_mq_lock(mq);
   if (tr_mq_get_head(mq)!=NULL) {
     popped=tr_mq_get_head(mq);
-    tr_mq_msg_set_next(popped, tr_mq_msg_get_next(popped)); /* popped is the old head */
+    tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
     if (tr_mq_get_head(mq)==NULL)
       tr_mq_set_tail(mq, NULL); /* just popped the last element */
   }
index 48bc331..32c1f4c 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef TR_TRP_H
 #define TR_TRP_H
 
+#include <event2/event.h>
 #include <talloc.h>
 #include <pthread.h>
 
@@ -8,8 +9,14 @@
 #include <tr_config.h>
 #include <tr_event.h>
 
+typedef struct tr_trps_events {
+  struct tr_socket_event *listen_ev;
+  struct event *mq_ev;
+} TR_TRPS_EVENTS;
+
 /* prototypes */
+TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx);
 int tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr,
-                       struct tr_socket_event *trps_ev);
+                       TR_TRPS_EVENTS *trps_ev);
 
 #endif /* TR_TRP_H */
index ca60364..d8ea986 100644 (file)
@@ -120,5 +120,6 @@ int trps_get_listener(TRPS_INSTANCE *trps,
                       unsigned int port,
                       void *cookie);
 int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data);
-
+TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps);
+void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
 #endif /* TRP_INTERNAL_H */
index c637d94..452dea9 100644 (file)
@@ -1,4 +1,5 @@
 #include <event2/event.h>
+#include <event2/thread.h>
 
 #include <tr.h>
 #include <tid_internal.h>
@@ -7,11 +8,12 @@
 
 /* Allocate and set up the event base, return a pointer
  * to the new event_base or NULL on failure.
- * Does not currently enable thread-safe mode. */
+ * Enables thread-safe mode. */
 struct event_base *tr_event_loop_init(void)
 {
   struct event_base *base=NULL;
 
+  evthread_use_pthreads(); /* enable pthreads support */
   base=event_base_new();
   return base;
 }
index 65a9521..7a31d5b 100644 (file)
@@ -37,7 +37,6 @@
 #include <jansson.h>
 #include <argp.h>
 #include <event2/event.h>
-#include <event2/thread.h>
 #include <talloc.h>
 #include <sys/time.h>
 
@@ -131,7 +130,8 @@ int main(int argc, char *argv[])
   TR_INSTANCE *tr = NULL;
   struct cmdline_args opts;
   struct event_base *ev_base;
-  struct tr_socket_event tids_ev, trps_ev;
+  struct tr_socket_event tids_ev;
+  TR_TRPS_EVENTS *trps_ev;
   struct event *cfgwatch_ev;
 
   /* we're going to be multithreaded, so disable null context tracking */
@@ -187,7 +187,6 @@ int main(int argc, char *argv[])
   }
 
   /***** Set up the event loop *****/
-  evthread_use_pthreads(); /* enable pthreads support */
   ev_base=tr_event_loop_init(); /* Set up the event loop */
   if (ev_base==NULL) {
     tr_crit("Error initializing event loop.");
@@ -217,10 +216,11 @@ int main(int argc, char *argv[])
   }
 
   /* install TRP handler events */
+  trps_ev=tr_trps_events_new(main_ctx);
   if (0 != tr_trps_event_init(ev_base,
                               tr->trps,
                               tr->cfg_mgr,
-                             &trps_ev)) {
+                              trps_ev)) {
     tr_crit("Error initializing Trust Path Query Server instance.");
     return 1;
   }
index 9a416e8..e109301 100644 (file)
@@ -20,6 +20,13 @@ struct tr_trps_event_cookie {
 };
 
 
+/* callback to schedule event to process messages */
+static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
+{
+  struct event *mq_ev=(struct event *)arg;
+  event_active(mq_ev, 0, 0);
+}
+
 static int tr_trps_req_handler (TRPS_INSTANCE *trps,
                                 TRP_REQ *orig_req, 
                                 void *tr_in)
@@ -58,7 +65,6 @@ static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
   return 0;
 }
 
-
 /* data passed to thread */
 struct thread_data {
   TRP_CONNECTION *conn;
@@ -67,9 +73,11 @@ struct thread_data {
 /* thread to handle GSS connections to peers */
 static void *tr_trps_conn_thread(void *arg)
 {
+  TALLOC_CTX *tmp_ctx=talloc_new(NULL);
   struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data);
   TRP_CONNECTION *conn=thread_data->conn;
   TRPS_INSTANCE *trps=thread_data->trps;
+  TR_MQ_MSG *msg=NULL;
 
   tr_debug("tr_trps_conn_thread: started");
   /* try to establish a GSS context */
@@ -78,6 +86,15 @@ static void *tr_trps_conn_thread(void *arg)
     pthread_exit(NULL);
   }
   tr_notice("tr_trps_conn_thread: authorized connection");
+  
+  msg=tr_mq_msg_new(tmp_ctx);
+  if (msg==NULL)
+    tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
+  else
+    trps_mq_append(trps, msg);
+
+  tr_debug("tr_trps_conn_thread: exit");
+  talloc_free(tmp_ctx);
   return NULL;
 }
 
@@ -110,22 +127,52 @@ static void tr_trps_event_cb(int listener, short event, void *arg)
       thread_data->conn=conn;
       thread_data->trps=trps;
       pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data);
+      pthread_detach(*(conn->thread)); /* we will not rejoin the thread */
       trps_add_connection(trps, conn); /* remember the connection */
     }
   }
   talloc_free(tmp_ctx);
 }
 
+static void tr_trps_process_mq(int socket, short event, void *arg)
+{
+  TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
+  TR_MQ_MSG *msg=NULL;
+
+  tr_debug("tr_trps_process_mw: starting");
+  msg=trps_mq_pop(trps);
+  while (msg!=NULL) {
+    tr_debug("tr_trps_process_mq: received message");
+    tr_mq_msg_free(msg);
+    msg=trps_mq_pop(trps);
+  }
+  tr_debug("tr_trps_process_mw: ending");
+}
+
+TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
+{
+  TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
+  if (ev!=NULL) {
+    ev->listen_ev=talloc(ev, struct tr_socket_event);
+    ev->mq_ev=NULL;
+    if (ev->listen_ev==NULL) {
+      talloc_free(ev);
+      ev=NULL;
+    }
+  }
+  return ev;
+}
 
 /* Configure the trps instance and set up its event handler.
- * Returns 0 on success, nonzero on failure. Fills in
- * *trps_event (which should be allocated by caller). */
+ * Returns 0 on success, nonzero on failure. Results in 
+ * trps_ev, which should be allocated by caller. */
 int tr_trps_event_init(struct event_base *base,
                        TRPS_INSTANCE *trps,
                        TR_CFG_MGR *cfg_mgr,
-                       struct tr_socket_event *trps_ev)
+                       TR_TRPS_EVENTS *trps_ev)
 {
   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+  struct tr_socket_event *listen_ev=NULL;
   struct tr_trps_event_cookie *cookie;
   int retval=0;
 
@@ -135,6 +182,9 @@ int tr_trps_event_init(struct event_base *base,
     goto cleanup;
   }
 
+  /* get convenient handles */
+  listen_ev=trps_ev->listen_ev;
+
   /* Create the cookie for callbacks. It is part of the trps context, so it will
    * be cleaned up when trps is freed by talloc_free. */
   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
@@ -148,25 +198,34 @@ int tr_trps_event_init(struct event_base *base,
   talloc_steal(trps, cookie);
 
   /* get a trps listener */
-  trps_ev->sock_fd=trps_get_listener(trps,
+  listen_ev->sock_fd=trps_get_listener(trps,
                                      tr_trps_req_handler,
                                      tr_trps_gss_handler,
                                      cfg_mgr->active->internal->hostname,
                                      cfg_mgr->active->internal->trps_port,
                                      (void *)cookie);
-  if (trps_ev->sock_fd < 0) {
+  if (listen_ev->sock_fd < 0) {
     tr_crit("Error opening TRP server socket.");
     retval=1;
     goto cleanup;
   }
-
+  
   /* and its event */
-  trps_ev->ev=event_new(base,
-                        trps_ev->sock_fd,
-                        EV_READ|EV_PERSIST,
-                        tr_trps_event_cb,
-                        (void *)trps);
-  event_add(trps_ev->ev, NULL);
+  listen_ev->ev=event_new(base,
+                          listen_ev->sock_fd,
+                          EV_READ|EV_PERSIST,
+                          tr_trps_event_cb,
+                          (void *)trps);
+  event_add(listen_ev->ev, NULL);
+  
+  /* now set up message queue processing event, only triggered by
+   * tr_trps_mq_cb() */
+  trps_ev->mq_ev=event_new(base,
+                           0,
+                           EV_PERSIST,
+                           tr_trps_process_mq,
+                           (void *)trps);
+  tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
 
 cleanup:
   talloc_free(tmp_ctx);
index d0e6a31..34d6533 100644 (file)
@@ -35,6 +35,16 @@ void trps_free (TRPS_INSTANCE *trps)
     talloc_free(trps);
 }
 
+TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps)
+{
+  return tr_mq_pop(trps->mq);
+}
+
+void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg)
+{
+  tr_mq_append(trps->mq, msg);
+}
+
 /* stand-in for a function that finds the connection for a particular peer */
 #if 0
 static TRP_CONNECTION *trps_find_connection(TRPS_INSTANCE *trps)