#include <pthread.h>
#include <tr_mq.h>
+#include <tr_debug.h>
/* Messages */
static int tr_mq_msg_destructor(void *object)
int tr_mq_unlock(TR_MQ *mq)
{
- return pthread_mutex_lock(&(mq->mutex));
+ return pthread_mutex_unlock(&(mq->mutex));
}
static TR_MQ_MSG *tr_mq_get_head(TR_MQ *mq)
/* see if we need to tell someone we became non-empty */
if (was_empty && (notify_cb!=NULL))
- mq->notify_cb(mq, notify_cb_arg);
+ notify_cb(mq, notify_cb_arg);
}
/* caller must free msg via tr_mq_msg_free */
tr_mq_lock(mq);
if (tr_mq_get_head(mq)!=NULL) {
popped=tr_mq_get_head(mq);
- tr_mq_msg_set_next(popped, tr_mq_msg_get_next(popped)); /* popped is the old head */
+ tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
if (tr_mq_get_head(mq)==NULL)
tr_mq_set_tail(mq, NULL); /* just popped the last element */
}
#include <jansson.h>
#include <argp.h>
#include <event2/event.h>
-#include <event2/thread.h>
#include <talloc.h>
#include <sys/time.h>
TR_INSTANCE *tr = NULL;
struct cmdline_args opts;
struct event_base *ev_base;
- struct tr_socket_event tids_ev, trps_ev;
+ struct tr_socket_event tids_ev;
+ TR_TRPS_EVENTS *trps_ev;
struct event *cfgwatch_ev;
/* we're going to be multithreaded, so disable null context tracking */
}
/***** Set up the event loop *****/
- evthread_use_pthreads(); /* enable pthreads support */
ev_base=tr_event_loop_init(); /* Set up the event loop */
if (ev_base==NULL) {
tr_crit("Error initializing event loop.");
}
/* install TRP handler events */
+ trps_ev=tr_trps_events_new(main_ctx);
if (0 != tr_trps_event_init(ev_base,
tr->trps,
tr->cfg_mgr,
- &trps_ev)) {
+ trps_ev)) {
tr_crit("Error initializing Trust Path Query Server instance.");
return 1;
}
};
+/* callback to schedule event to process messages */
+static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
+{
+ struct event *mq_ev=(struct event *)arg;
+ event_active(mq_ev, 0, 0);
+}
+
static int tr_trps_req_handler (TRPS_INSTANCE *trps,
TRP_REQ *orig_req,
void *tr_in)
return 0;
}
-
/* data passed to thread */
struct thread_data {
TRP_CONNECTION *conn;
/* thread to handle GSS connections to peers */
static void *tr_trps_conn_thread(void *arg)
{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data);
TRP_CONNECTION *conn=thread_data->conn;
TRPS_INSTANCE *trps=thread_data->trps;
+ TR_MQ_MSG *msg=NULL;
tr_debug("tr_trps_conn_thread: started");
/* try to establish a GSS context */
pthread_exit(NULL);
}
tr_notice("tr_trps_conn_thread: authorized connection");
+
+ msg=tr_mq_msg_new(tmp_ctx);
+ if (msg==NULL)
+ tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
+ else
+ trps_mq_append(trps, msg);
+
+ tr_debug("tr_trps_conn_thread: exit");
+ talloc_free(tmp_ctx);
return NULL;
}
thread_data->conn=conn;
thread_data->trps=trps;
pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data);
+ pthread_detach(*(conn->thread)); /* we will not rejoin the thread */
trps_add_connection(trps, conn); /* remember the connection */
}
}
talloc_free(tmp_ctx);
}
+static void tr_trps_process_mq(int socket, short event, void *arg)
+{
+ TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
+ TR_MQ_MSG *msg=NULL;
+
+ tr_debug("tr_trps_process_mw: starting");
+ msg=trps_mq_pop(trps);
+ while (msg!=NULL) {
+ tr_debug("tr_trps_process_mq: received message");
+ tr_mq_msg_free(msg);
+ msg=trps_mq_pop(trps);
+ }
+ tr_debug("tr_trps_process_mw: ending");
+}
+
+TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
+{
+ TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
+ if (ev!=NULL) {
+ ev->listen_ev=talloc(ev, struct tr_socket_event);
+ ev->mq_ev=NULL;
+ if (ev->listen_ev==NULL) {
+ talloc_free(ev);
+ ev=NULL;
+ }
+ }
+ return ev;
+}
/* Configure the trps instance and set up its event handler.
- * Returns 0 on success, nonzero on failure. Fills in
- * *trps_event (which should be allocated by caller). */
+ * Returns 0 on success, nonzero on failure. Results in
+ * trps_ev, which should be allocated by caller. */
int tr_trps_event_init(struct event_base *base,
TRPS_INSTANCE *trps,
TR_CFG_MGR *cfg_mgr,
- struct tr_socket_event *trps_ev)
+ TR_TRPS_EVENTS *trps_ev)
{
TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ struct tr_socket_event *listen_ev=NULL;
struct tr_trps_event_cookie *cookie;
int retval=0;
goto cleanup;
}
+ /* get convenient handles */
+ listen_ev=trps_ev->listen_ev;
+
/* Create the cookie for callbacks. It is part of the trps context, so it will
* be cleaned up when trps is freed by talloc_free. */
cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
talloc_steal(trps, cookie);
/* get a trps listener */
- trps_ev->sock_fd=trps_get_listener(trps,
+ listen_ev->sock_fd=trps_get_listener(trps,
tr_trps_req_handler,
tr_trps_gss_handler,
cfg_mgr->active->internal->hostname,
cfg_mgr->active->internal->trps_port,
(void *)cookie);
- if (trps_ev->sock_fd < 0) {
+ if (listen_ev->sock_fd < 0) {
tr_crit("Error opening TRP server socket.");
retval=1;
goto cleanup;
}
-
+
/* and its event */
- trps_ev->ev=event_new(base,
- trps_ev->sock_fd,
- EV_READ|EV_PERSIST,
- tr_trps_event_cb,
- (void *)trps);
- event_add(trps_ev->ev, NULL);
+ listen_ev->ev=event_new(base,
+ listen_ev->sock_fd,
+ EV_READ|EV_PERSIST,
+ tr_trps_event_cb,
+ (void *)trps);
+ event_add(listen_ev->ev, NULL);
+
+ /* now set up message queue processing event, only triggered by
+ * tr_trps_mq_cb() */
+ trps_ev->mq_ev=event_new(base,
+ 0,
+ EV_PERSIST,
+ tr_trps_process_mq,
+ (void *)trps);
+ tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
cleanup:
talloc_free(tmp_ctx);