3 #include <event2/event.h>
12 #include <trp_internal.h>
13 #include <tr_config.h>
19 /* data for event callbacks */
20 struct tr_trps_event_cookie {
26 /* callback to schedule event to process messages */
27 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
29 struct event *mq_ev=(struct event *)arg;
30 event_active(mq_ev, 0, 0);
33 static void msg_free_helper(void *p)
35 tr_msg_free_decoded((TR_MSG *)p);
37 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
38 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
42 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
43 TR_MQ_MSG *mq_msg=NULL;
45 /* n.b., conn is available here, but do not hold onto the reference
46 * because it may be cleaned up if the originating connection goes
47 * down before the message is processed */
48 mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
52 tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
53 trps_mq_append(trps, mq_msg);
54 talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
59 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
63 struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
64 TRPS_INSTANCE *trps = cookie->trps;
65 TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
66 TR_NAME name={gss_name->value, gss_name->length};
68 tr_debug("tr_trps_gss_handler()");
70 if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
71 tr_debug("tr_trps_gss_handler: Bad parameters.");
75 /* look up the RP client matching the GSS name */
76 if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
77 tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
81 /*trps->rp_gss = rp;*/
82 tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
87 /* data passed to thread */
88 struct trps_thread_data {
92 /* thread to handle GSS connections to peers */
93 static void *tr_trps_thread(void *arg)
95 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
96 struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
97 TRP_CONNECTION *conn=thread_data->conn;
98 TRPS_INSTANCE *trps=thread_data->trps;
101 tr_debug("tr_trps_thread: started");
102 trps_handle_connection(trps, conn);
104 msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
105 tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
107 tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
109 trps_mq_append(trps, msg);
111 tr_debug("tr_trps_thread: exit");
112 talloc_free(tmp_ctx);
116 /* called when a connection to the TRPS port is received */
117 static void tr_trps_event_cb(int listener, short event, void *arg)
119 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
120 TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
121 TRP_CONNECTION *conn=NULL;
122 TR_NAME *gssname=NULL;
124 struct trps_thread_data *thread_data=NULL;
126 if (0==(event & EV_READ)) {
127 tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
129 /* create a thread to handle this connection */
130 asprintf(&name, "trustrouter@%s", trps->hostname);
131 gssname=tr_new_name(name);
132 free(name); name=NULL;
133 conn=trp_connection_accept(tmp_ctx, listener, gssname);
135 /* need to monitor this fd and trigger events when read becomes possible */
136 thread_data=talloc(conn, struct trps_thread_data);
137 if (thread_data==NULL) {
138 tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
139 talloc_free(tmp_ctx);
142 thread_data->conn=conn;
143 thread_data->trps=trps;
144 pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
145 pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
146 trps_add_connection(trps, conn); /* remember the connection */
149 talloc_free(tmp_ctx);
152 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
154 /* everything belonging to the thread is in the TRP_CONNECTION
155 * associated with it */
156 trps_remove_connection(trps, conn);
157 tr_debug("Deleted connection");
160 static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f)
162 char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL);
164 fprintf(f, "Unable to print route table.\n");
166 fprintf(f, "%s\n", table);
171 static void tr_trps_process_mq(int socket, short event, void *arg)
173 TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
177 msg=trps_mq_pop(trps);
179 s=tr_mq_msg_get_message(msg);
180 if (0==strcmp(s, "trps_thread_exit")) {
181 tr_trps_cleanup_thread(trps,
182 talloc_get_type_abort(tr_mq_msg_get_payload(msg),
185 else if (0==strcmp(s, "tr_msg")) {
186 if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS)
187 tr_notice("tr_trps_process_mq: error handling message.");
189 tr_trps_print_route_table(trps, stderr);
193 tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
196 msg=trps_mq_pop(trps);
200 static void tr_trps_update(int listener, short event, void *arg)
202 struct tr_trps_event_cookie *cookie=talloc_get_type_abort(arg, struct tr_trps_event_cookie);
203 TRPS_INSTANCE *trps=cookie->trps;
204 struct event *ev=cookie->ev;
206 tr_debug("tr_trps_update: sending scheduled route updates.");
207 trps_scheduled_update(trps);
208 event_add(ev, &(trps->update_interval));
211 static void tr_trps_sweep(int listener, short event, void *arg)
213 struct tr_trps_event_cookie *cookie=talloc_get_type_abort(arg, struct tr_trps_event_cookie);
214 TRPS_INSTANCE *trps=cookie->trps;
215 struct event *ev=cookie->ev;
217 tr_debug("tr_trps_sweep: sweeping routes");
218 trps_sweep_routes(trps);
219 /* schedule the event to run again */
220 event_add(ev, &(trps->sweep_interval));
223 static int tr_trps_events_destructor(void *obj)
225 TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
227 event_free(ev->mq_ev);
228 if (ev->sweep_ev!=NULL)
229 event_free(ev->sweep_ev);
232 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
234 TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
236 ev->listen_ev=talloc(ev, struct tr_socket_event);
240 if (ev->listen_ev==NULL) {
244 talloc_set_destructor((void *)ev, tr_trps_events_destructor);
249 /* Configure the trps instance and set up its event handler.
250 * Fills in trps_ev, which should be allocated by caller. */
251 TRP_RC tr_trps_event_init(struct event_base *base,
254 TR_TRPS_EVENTS *trps_ev)
256 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
257 struct tr_socket_event *listen_ev=NULL;
258 struct tr_trps_event_cookie *trps_cookie=NULL;
259 struct tr_trps_event_cookie *update_cookie=NULL;
260 struct tr_trps_event_cookie *sweep_cookie=NULL;
261 TRP_RC retval=TRP_ERROR;
263 if (trps_ev == NULL) {
264 tr_debug("tr_trps_event_init: Null trps_ev.");
269 /* get convenient handles */
270 listen_ev=trps_ev->listen_ev;
272 /* Create the cookie for callbacks. It is part of the trps context, so it will
273 * be cleaned up when trps is freed by talloc_free. */
274 trps_cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
275 if (trps_cookie == NULL) {
276 tr_debug("tr_trps_event_init: Unable to allocate trps_cookie.");
280 trps_cookie->trps=trps;
281 trps_cookie->cfg_mgr=cfg_mgr;
282 talloc_steal(trps, trps_cookie);
284 /* get a trps listener */
285 listen_ev->sock_fd=trps_get_listener(trps,
288 cfg_mgr->active->internal->hostname,
289 cfg_mgr->active->internal->trps_port,
290 (void *)trps_cookie);
291 if (listen_ev->sock_fd < 0) {
292 tr_crit("Error opening TRP server socket.");
296 trps_cookie->ev=listen_ev->ev; /* in case it needs to frob the event */
299 listen_ev->ev=event_new(base,
304 event_add(listen_ev->ev, NULL);
306 /* now set up message queue processing event, only triggered by
308 trps_ev->mq_ev=event_new(base,
313 tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
315 /* now set up the route update timer event */
316 update_cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
317 if (update_cookie == NULL) {
318 tr_debug("tr_trps_event_init: Unable to allocate update_cookie.");
322 update_cookie->trps=trps;
323 update_cookie->cfg_mgr=cfg_mgr;
324 talloc_steal(trps, update_cookie);
325 trps_ev->update_ev=event_new(base, -1, EV_TIMEOUT, tr_trps_update, (void *)update_cookie);
326 update_cookie->ev=trps_ev->update_ev; /* in case it needs to frob the event */
327 event_add(trps_ev->update_ev, &(trps->update_interval));
329 /* now set up the route table sweep timer event */
330 sweep_cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
331 if (sweep_cookie == NULL) {
332 tr_debug("tr_trps_event_init: Unable to allocate sweep_cookie.");
336 sweep_cookie->trps=trps;
337 sweep_cookie->cfg_mgr=cfg_mgr;
338 talloc_steal(trps, sweep_cookie);
339 trps_ev->sweep_ev=event_new(base, -1, EV_TIMEOUT, tr_trps_sweep, (void *)sweep_cookie);
340 sweep_cookie->ev=trps_ev->sweep_ev; /* in case it needs to frob the event */
341 event_add(trps_ev->sweep_ev, &(trps->sweep_interval));
346 talloc_free(tmp_ctx);
351 struct trpc_notify_cb_data {
354 pthread_mutex_t mutex;
357 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
359 struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
360 pthread_mutex_lock(&(cb_data->mutex));
361 if (!cb_data->msg_ready) {
362 cb_data->msg_ready=1;
363 pthread_cond_signal(&(cb_data->cond));
365 pthread_mutex_unlock(&(cb_data->mutex));
368 /* data passed to thread */
369 struct trpc_thread_data {
373 static void *tr_trpc_thread(void *arg)
375 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
376 struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
377 TRPC_INSTANCE *trpc=thread_data->trpc;
378 TRPS_INSTANCE *trps=thread_data->trps;
381 const char *msg_type=NULL;
382 char *encoded_msg=NULL;
384 struct trpc_notify_cb_data cb_data={0,
385 PTHREAD_COND_INITIALIZER,
386 PTHREAD_MUTEX_INITIALIZER};
388 tr_debug("tr_trpc_thread: started");
390 /* set up the mq for receiving */
391 pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
393 tr_mq_lock(trpc->mq);
394 tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
395 tr_mq_unlock(trpc->mq);
397 rc=trpc_connect(trpc);
398 if (rc!=TRP_SUCCESS) {
399 tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
400 trpc_get_server(trpc),
401 trpc_get_port(trpc));
405 pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
406 /* verify the condition */
407 if (cb_data.msg_ready) {
408 msg=trpc_mq_pop(trpc);
410 /* no message in the queue */
411 tr_err("tr_trpc_thread: notified of msg, but queue empty");
415 msg_type=tr_mq_msg_get_message(msg);
417 if (0==strcmp(msg_type, "trpc_abort")) {
419 break; /* exit loop */
421 else if (0==strcmp(msg_type, "trpc_send")) {
422 encoded_msg=tr_mq_msg_get_payload(msg);
423 if (encoded_msg==NULL)
424 tr_notice("tr_trpc_thread: null outgoing TRP message.");
426 rc = trpc_send_msg(trpc, encoded_msg);
427 if (rc!=TRP_SUCCESS) {
428 tr_notice("tr_trpc_thread: trpc_send_msg failed.");
435 tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
442 msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
443 tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
445 tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
447 trps_mq_append(trps, msg);
449 talloc_free(tmp_ctx);
453 /* starts a trpc thread to connect to server:port */
454 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
456 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
457 TRPC_INSTANCE *trpc=NULL;
458 TRP_CONNECTION *conn=NULL;
459 struct trpc_thread_data *thread_data=NULL;
461 tr_debug("tr_trpc_initiate entered");
462 trpc=trpc_new(tmp_ctx);
464 tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
467 tr_debug("tr_trpc_initiate: allocated trpc");
469 conn=trp_connection_new(trpc);
471 tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
474 trpc_set_conn(trpc, conn);
475 trpc_set_server(trpc, talloc_strdup(trpc, server));
476 trpc_set_port(trpc, port);
477 tr_debug("tr_trpc_initiate: allocated connection");
480 thread_data=talloc(trpc, struct trpc_thread_data);
481 if (thread_data==NULL) {
482 tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
485 thread_data->trpc=trpc;
486 thread_data->trps=trps;
488 pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
489 pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
491 tr_debug("tr_trpc_initiate: started trpc thread");
492 trps_add_trpc(trps, trpc);
494 talloc_report_full(trps, stderr);
495 talloc_report_full(tmp_ctx, stderr);
498 talloc_free(tmp_ctx);
502 /* Called by the config manager after a change to the active configuration.
503 * Updates configuration of objects that do not know about the config manager. */
504 void tr_config_changed(TR_CFG *new_cfg, void *cookie)
506 TRPS_INSTANCE *trps=talloc_get_type_abort(cookie, TRPS_INSTANCE);
507 trps_set_update_interval(trps, new_cfg->internal->route_update_interval);
508 trps_set_sweep_interval(trps, new_cfg->internal->route_sweep_interval);