3 #include <event2/event.h>
11 #include <trp_internal.h>
12 #include <tr_config.h>
18 /* hold a trps instance and a config manager */
19 struct tr_trps_event_cookie {
24 /* callback to schedule event to process messages */
25 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
27 struct event *mq_ev=(struct event *)arg;
28 event_active(mq_ev, 0, 0);
31 static void msg_free_helper(void *p)
33 tr_msg_free_decoded((TR_MSG *)p);
35 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
36 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
40 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
41 TR_MQ_MSG *mq_msg=NULL;
43 /* n.b., conn is available here, but do not hold onto the reference
44 * because it may be cleaned up if the originating connection goes
45 * down before the message is processed */
46 mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
50 tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
51 trps_mq_append(trps, mq_msg);
52 talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
57 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
61 struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
62 TRPS_INSTANCE *trps = cookie->trps;
63 TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
64 TR_NAME name={gss_name->value, gss_name->length};
66 tr_debug("tr_trps_gss_handler()");
68 if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
69 tr_debug("tr_trps_gss_handler: Bad parameters.");
73 /* look up the RP client matching the GSS name */
74 if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
75 tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
79 /*trps->rp_gss = rp;*/
80 tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
85 /* data passed to thread */
86 struct trps_thread_data {
90 /* thread to handle GSS connections to peers */
91 static void *tr_trps_thread(void *arg)
93 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
94 struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
95 TRP_CONNECTION *conn=thread_data->conn;
96 TRPS_INSTANCE *trps=thread_data->trps;
99 tr_debug("tr_trps_thread: started");
100 trps_handle_connection(trps, conn);
102 msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
103 tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
105 tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
107 trps_mq_append(trps, msg);
109 tr_debug("tr_trps_thread: exit");
110 talloc_free(tmp_ctx);
114 /* called when a connection to the TRPS port is received */
115 static void tr_trps_event_cb(int listener, short event, void *arg)
117 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
118 TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
119 TRP_CONNECTION *conn=NULL;
120 TR_NAME *gssname=NULL;
122 struct trps_thread_data *thread_data=NULL;
124 if (0==(event & EV_READ)) {
125 tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
127 /* create a thread to handle this connection */
128 asprintf(&name, "trustrouter@%s", trps->hostname);
129 gssname=tr_new_name(name);
130 free(name); name=NULL;
131 conn=trp_connection_accept(tmp_ctx, listener, gssname);
133 /* need to monitor this fd and trigger events when read becomes possible */
134 thread_data=talloc(conn, struct trps_thread_data);
135 if (thread_data==NULL) {
136 tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
137 talloc_free(tmp_ctx);
140 thread_data->conn=conn;
141 thread_data->trps=trps;
142 pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
143 pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
144 trps_add_connection(trps, conn); /* remember the connection */
147 talloc_free(tmp_ctx);
150 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
152 /* everything belonging to the thread is in the TRP_CONNECTION
153 * associated with it */
154 trps_remove_connection(trps, conn);
155 tr_debug("Deleted connection");
158 static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f)
160 char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL);
162 fprintf(f, "Unable to print route table.\n");
164 fprintf(f, "%s\n", table);
169 static void tr_trps_process_mq(int socket, short event, void *arg)
171 TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
175 msg=trps_mq_pop(trps);
177 s=tr_mq_msg_get_message(msg);
178 if (0==strcmp(s, "trps_thread_exit")) {
179 tr_trps_cleanup_thread(trps,
180 talloc_get_type_abort(tr_mq_msg_get_payload(msg),
183 else if (0==strcmp(s, "tr_msg")) {
184 if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS)
185 tr_notice("tr_trps_process_mq: error handling message.");
187 tr_trps_print_route_table(trps, stderr);
191 tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
194 msg=trps_mq_pop(trps);
198 static int tr_trps_events_destructor(void *obj)
200 TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
202 event_free(ev->mq_ev);
205 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
207 TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
209 ev->listen_ev=talloc(ev, struct tr_socket_event);
211 if (ev->listen_ev==NULL) {
215 talloc_set_destructor((void *)ev, tr_trps_events_destructor);
220 /* Configure the trps instance and set up its event handler.
221 * Fills in trps_ev, which should be allocated by caller. */
222 TRP_RC tr_trps_event_init(struct event_base *base,
225 TR_TRPS_EVENTS *trps_ev)
227 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
228 struct tr_socket_event *listen_ev=NULL;
229 struct tr_trps_event_cookie *cookie;
230 TRP_RC retval=TRP_ERROR;
232 if (trps_ev == NULL) {
233 tr_debug("tr_trps_event_init: Null trps_ev.");
238 /* get convenient handles */
239 listen_ev=trps_ev->listen_ev;
241 /* Create the cookie for callbacks. It is part of the trps context, so it will
242 * be cleaned up when trps is freed by talloc_free. */
243 cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
244 if (cookie == NULL) {
245 tr_debug("tr_trps_event_init: Unable to allocate cookie.");
250 cookie->cfg_mgr=cfg_mgr;
251 talloc_steal(trps, cookie);
253 /* get a trps listener */
254 listen_ev->sock_fd=trps_get_listener(trps,
257 cfg_mgr->active->internal->hostname,
258 cfg_mgr->active->internal->trps_port,
260 if (listen_ev->sock_fd < 0) {
261 tr_crit("Error opening TRP server socket.");
267 listen_ev->ev=event_new(base,
272 event_add(listen_ev->ev, NULL);
274 /* now set up message queue processing event, only triggered by
276 trps_ev->mq_ev=event_new(base,
281 tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
286 talloc_free(tmp_ctx);
291 struct trpc_notify_cb_data {
294 pthread_mutex_t mutex;
297 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
299 struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
300 pthread_mutex_lock(&(cb_data->mutex));
301 if (!cb_data->msg_ready) {
302 cb_data->msg_ready=1;
303 pthread_cond_signal(&(cb_data->cond));
305 pthread_mutex_unlock(&(cb_data->mutex));
308 /* data passed to thread */
309 struct trpc_thread_data {
313 static void *tr_trpc_thread(void *arg)
315 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
316 struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
317 TRPC_INSTANCE *trpc=thread_data->trpc;
318 TRPS_INSTANCE *trps=thread_data->trps;
321 const char *msg_type=NULL;
322 char *encoded_msg=NULL;
324 struct trpc_notify_cb_data cb_data={0,
325 PTHREAD_COND_INITIALIZER,
326 PTHREAD_MUTEX_INITIALIZER};
328 tr_debug("tr_trpc_thread: started");
330 /* set up the mq for receiving */
331 pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
333 tr_mq_lock(trpc->mq);
334 tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
335 tr_mq_unlock(trpc->mq);
337 rc=trpc_connect(trpc);
338 if (rc!=TRP_SUCCESS) {
339 tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
340 trpc_get_server(trpc),
341 trpc_get_port(trpc));
345 pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
346 /* verify the condition */
347 if (cb_data.msg_ready) {
348 msg=trpc_mq_pop(trpc);
350 /* no message in the queue */
351 tr_err("tr_trpc_thread: notified of msg, but queue empty");
355 msg_type=tr_mq_msg_get_message(msg);
357 if (0==strcmp(msg_type, "trpc_abort")) {
359 break; /* exit loop */
361 else if (0==strcmp(msg_type, "trpc_send")) {
362 encoded_msg=tr_mq_msg_get_payload(msg);
363 if (encoded_msg==NULL)
364 tr_notice("tr_trpc_thread: null outgoing TRP message.");
366 rc = trpc_send_msg(trpc, encoded_msg);
367 if (rc!=TRP_SUCCESS) {
368 tr_notice("tr_trpc_thread: trpc_send_msg failed.");
375 tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
382 msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
383 tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
385 tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
387 trps_mq_append(trps, msg);
389 talloc_free(tmp_ctx);
393 /* starts a trpc thread to connect to server:port */
394 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
396 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
397 TRPC_INSTANCE *trpc=NULL;
398 TRP_CONNECTION *conn=NULL;
399 struct trpc_thread_data *thread_data=NULL;
401 tr_debug("tr_trpc_initiate entered");
402 trpc=trpc_new(tmp_ctx);
404 tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
407 tr_debug("tr_trpc_initiate: allocated trpc");
409 conn=trp_connection_new(trpc);
411 tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
414 trpc_set_conn(trpc, conn);
415 trpc_set_server(trpc, talloc_strdup(trpc, server));
416 trpc_set_port(trpc, port);
417 tr_debug("tr_trpc_initiate: allocated connection");
420 thread_data=talloc(trpc, struct trpc_thread_data);
421 if (thread_data==NULL) {
422 tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
425 thread_data->trpc=trpc;
426 thread_data->trps=trps;
428 pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
429 pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
431 tr_debug("tr_trpc_initiate: started trpc thread");
432 trps_add_trpc(trps, trpc);
434 talloc_report_full(trps, stderr);
435 talloc_report_full(tmp_ctx, stderr);
438 talloc_free(tmp_ctx);