From: Jennifer Richards Date: Fri, 24 Jun 2016 17:16:13 +0000 (-0400) Subject: Successful messages via mq to main thread. X-Git-Tag: v2.0~57 X-Git-Url: http://www.project-moonshot.org/gitweb/?p=trust_router.git;a=commitdiff_plain;h=5903dd2a15a4a3cffe1b2eeff53e7be9d30f4d3b Successful messages via mq to main thread. --- diff --git a/common/tr_mq.c b/common/tr_mq.c index 4e43091..02b42ee 100644 --- a/common/tr_mq.c +++ b/common/tr_mq.c @@ -2,6 +2,7 @@ #include #include +#include /* Messages */ static int tr_mq_msg_destructor(void *object) @@ -79,7 +80,7 @@ int tr_mq_lock(TR_MQ *mq) int tr_mq_unlock(TR_MQ *mq) { - return pthread_mutex_lock(&(mq->mutex)); + return pthread_mutex_unlock(&(mq->mutex)); } static TR_MQ_MSG *tr_mq_get_head(TR_MQ *mq) @@ -133,7 +134,7 @@ void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg) /* see if we need to tell someone we became non-empty */ if (was_empty && (notify_cb!=NULL)) - mq->notify_cb(mq, notify_cb_arg); + notify_cb(mq, notify_cb_arg); } /* caller must free msg via tr_mq_msg_free */ @@ -144,7 +145,7 @@ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq) tr_mq_lock(mq); if (tr_mq_get_head(mq)!=NULL) { popped=tr_mq_get_head(mq); - tr_mq_msg_set_next(popped, tr_mq_msg_get_next(popped)); /* popped is the old head */ + tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */ if (tr_mq_get_head(mq)==NULL) tr_mq_set_tail(mq, NULL); /* just popped the last element */ } diff --git a/include/tr_trp.h b/include/tr_trp.h index 48bc331..32c1f4c 100644 --- a/include/tr_trp.h +++ b/include/tr_trp.h @@ -1,6 +1,7 @@ #ifndef TR_TRP_H #define TR_TRP_H +#include #include #include @@ -8,8 +9,14 @@ #include #include +typedef struct tr_trps_events { + struct tr_socket_event *listen_ev; + struct event *mq_ev; +} TR_TRPS_EVENTS; + /* prototypes */ +TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx); int tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr, - struct tr_socket_event *trps_ev); + TR_TRPS_EVENTS *trps_ev); #endif /* TR_TRP_H */ diff --git a/include/trp_internal.h b/include/trp_internal.h index ca60364..d8ea986 100644 --- a/include/trp_internal.h +++ b/include/trp_internal.h @@ -120,5 +120,6 @@ int trps_get_listener(TRPS_INSTANCE *trps, unsigned int port, void *cookie); int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data); - +TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps); +void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg); #endif /* TRP_INTERNAL_H */ diff --git a/tr/tr_event.c b/tr/tr_event.c index c637d94..452dea9 100644 --- a/tr/tr_event.c +++ b/tr/tr_event.c @@ -1,4 +1,5 @@ #include +#include #include #include @@ -7,11 +8,12 @@ /* Allocate and set up the event base, return a pointer * to the new event_base or NULL on failure. - * Does not currently enable thread-safe mode. */ + * Enables thread-safe mode. */ struct event_base *tr_event_loop_init(void) { struct event_base *base=NULL; + evthread_use_pthreads(); /* enable pthreads support */ base=event_base_new(); return base; } diff --git a/tr/tr_main.c b/tr/tr_main.c index 65a9521..7a31d5b 100644 --- a/tr/tr_main.c +++ b/tr/tr_main.c @@ -37,7 +37,6 @@ #include #include #include -#include #include #include @@ -131,7 +130,8 @@ int main(int argc, char *argv[]) TR_INSTANCE *tr = NULL; struct cmdline_args opts; struct event_base *ev_base; - struct tr_socket_event tids_ev, trps_ev; + struct tr_socket_event tids_ev; + TR_TRPS_EVENTS *trps_ev; struct event *cfgwatch_ev; /* we're going to be multithreaded, so disable null context tracking */ @@ -187,7 +187,6 @@ int main(int argc, char *argv[]) } /***** Set up the event loop *****/ - evthread_use_pthreads(); /* enable pthreads support */ ev_base=tr_event_loop_init(); /* Set up the event loop */ if (ev_base==NULL) { tr_crit("Error initializing event loop."); @@ -217,10 +216,11 @@ int main(int argc, char *argv[]) } /* install TRP handler events */ + trps_ev=tr_trps_events_new(main_ctx); if (0 != tr_trps_event_init(ev_base, tr->trps, tr->cfg_mgr, - &trps_ev)) { + trps_ev)) { tr_crit("Error initializing Trust Path Query Server instance."); return 1; } diff --git a/tr/tr_trp.c b/tr/tr_trp.c index 9a416e8..e109301 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -20,6 +20,13 @@ struct tr_trps_event_cookie { }; +/* callback to schedule event to process messages */ +static void tr_trps_mq_cb(TR_MQ *mq, void *arg) +{ + struct event *mq_ev=(struct event *)arg; + event_active(mq_ev, 0, 0); +} + static int tr_trps_req_handler (TRPS_INSTANCE *trps, TRP_REQ *orig_req, void *tr_in) @@ -58,7 +65,6 @@ static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name, return 0; } - /* data passed to thread */ struct thread_data { TRP_CONNECTION *conn; @@ -67,9 +73,11 @@ struct thread_data { /* thread to handle GSS connections to peers */ static void *tr_trps_conn_thread(void *arg) { + TALLOC_CTX *tmp_ctx=talloc_new(NULL); struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data); TRP_CONNECTION *conn=thread_data->conn; TRPS_INSTANCE *trps=thread_data->trps; + TR_MQ_MSG *msg=NULL; tr_debug("tr_trps_conn_thread: started"); /* try to establish a GSS context */ @@ -78,6 +86,15 @@ static void *tr_trps_conn_thread(void *arg) pthread_exit(NULL); } tr_notice("tr_trps_conn_thread: authorized connection"); + + msg=tr_mq_msg_new(tmp_ctx); + if (msg==NULL) + tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG"); + else + trps_mq_append(trps, msg); + + tr_debug("tr_trps_conn_thread: exit"); + talloc_free(tmp_ctx); return NULL; } @@ -110,22 +127,52 @@ static void tr_trps_event_cb(int listener, short event, void *arg) thread_data->conn=conn; thread_data->trps=trps; pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data); + pthread_detach(*(conn->thread)); /* we will not rejoin the thread */ trps_add_connection(trps, conn); /* remember the connection */ } } talloc_free(tmp_ctx); } +static void tr_trps_process_mq(int socket, short event, void *arg) +{ + TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE); + TR_MQ_MSG *msg=NULL; + + tr_debug("tr_trps_process_mw: starting"); + msg=trps_mq_pop(trps); + while (msg!=NULL) { + tr_debug("tr_trps_process_mq: received message"); + tr_mq_msg_free(msg); + msg=trps_mq_pop(trps); + } + tr_debug("tr_trps_process_mw: ending"); +} + +TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx) +{ + TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS); + if (ev!=NULL) { + ev->listen_ev=talloc(ev, struct tr_socket_event); + ev->mq_ev=NULL; + if (ev->listen_ev==NULL) { + talloc_free(ev); + ev=NULL; + } + } + return ev; +} /* Configure the trps instance and set up its event handler. - * Returns 0 on success, nonzero on failure. Fills in - * *trps_event (which should be allocated by caller). */ + * Returns 0 on success, nonzero on failure. Results in + * trps_ev, which should be allocated by caller. */ int tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr, - struct tr_socket_event *trps_ev) + TR_TRPS_EVENTS *trps_ev) { TALLOC_CTX *tmp_ctx=talloc_new(NULL); + struct tr_socket_event *listen_ev=NULL; struct tr_trps_event_cookie *cookie; int retval=0; @@ -135,6 +182,9 @@ int tr_trps_event_init(struct event_base *base, goto cleanup; } + /* get convenient handles */ + listen_ev=trps_ev->listen_ev; + /* Create the cookie for callbacks. It is part of the trps context, so it will * be cleaned up when trps is freed by talloc_free. */ cookie=talloc(tmp_ctx, struct tr_trps_event_cookie); @@ -148,25 +198,34 @@ int tr_trps_event_init(struct event_base *base, talloc_steal(trps, cookie); /* get a trps listener */ - trps_ev->sock_fd=trps_get_listener(trps, + listen_ev->sock_fd=trps_get_listener(trps, tr_trps_req_handler, tr_trps_gss_handler, cfg_mgr->active->internal->hostname, cfg_mgr->active->internal->trps_port, (void *)cookie); - if (trps_ev->sock_fd < 0) { + if (listen_ev->sock_fd < 0) { tr_crit("Error opening TRP server socket."); retval=1; goto cleanup; } - + /* and its event */ - trps_ev->ev=event_new(base, - trps_ev->sock_fd, - EV_READ|EV_PERSIST, - tr_trps_event_cb, - (void *)trps); - event_add(trps_ev->ev, NULL); + listen_ev->ev=event_new(base, + listen_ev->sock_fd, + EV_READ|EV_PERSIST, + tr_trps_event_cb, + (void *)trps); + event_add(listen_ev->ev, NULL); + + /* now set up message queue processing event, only triggered by + * tr_trps_mq_cb() */ + trps_ev->mq_ev=event_new(base, + 0, + EV_PERSIST, + tr_trps_process_mq, + (void *)trps); + tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev); cleanup: talloc_free(tmp_ctx); diff --git a/trp/trps.c b/trp/trps.c index d0e6a31..34d6533 100644 --- a/trp/trps.c +++ b/trp/trps.c @@ -35,6 +35,16 @@ void trps_free (TRPS_INSTANCE *trps) talloc_free(trps); } +TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps) +{ + return tr_mq_pop(trps->mq); +} + +void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg) +{ + tr_mq_append(trps->mq, msg); +} + /* stand-in for a function that finds the connection for a particular peer */ #if 0 static TRP_CONNECTION *trps_find_connection(TRPS_INSTANCE *trps)