X-Git-Url: http://www.project-moonshot.org/gitweb/?a=blobdiff_plain;f=tr%2Ftr_trp.c;h=da92be4312db1bd4bb149dd0b680f549c40f9d67;hb=e3e0b9487831575fe3646811ceccc9a3a5b12ab9;hp=5ad734d18733f02ba5ed52269dd029a25c74da2c;hpb=cda9bb642a88246313649e481c761602f874667d;p=trust_router.git diff --git a/tr/tr_trp.c b/tr/tr_trp.c index 5ad734d..da92be4 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -47,7 +47,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -182,23 +184,24 @@ static void tr_trps_event_cb(int listener, short event, void *arg) tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event); } else { /* create a thread to handle this connection */ - if (asprintf(&name, "trustrouter@%s", trps->hostname)==-1) { + name = talloc_asprintf(tmp_ctx, "trustrouter@%s", trps->hostname); + if (name == NULL) goto cleanup; - } - gssname=tr_new_name(name); - free(name); name=NULL; - conn=trp_connection_accept(tmp_ctx, listener, gssname); - if (conn!=NULL) { + gssname=tr_new_name(name); /* name cleaned up with tmp_ctx but need to handl gssname ourselves */ + + conn=trp_connection_accept(tmp_ctx, listener, gssname); /* steals gssname unless it fails */ + if (conn == NULL) { + tr_free_name(gssname); + } else { /* need to monitor this fd and trigger events when read becomes possible */ thread_data=talloc(conn, struct trps_thread_data); if (thread_data==NULL) { tr_err("tr_trps_event_cb: unable to allocate trps_thread_data"); - talloc_free(tmp_ctx); - return; + goto cleanup; } thread_data->conn=conn; thread_data->trps=trps; - trps_add_connection(trps, conn); /* remember the connection */ + trps_add_connection(trps, conn); /* remember the connection - this puts conn and the thread data in trps's talloc context */ pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data); } } @@ -220,21 +223,47 @@ static void tr_trps_cleanup_conn(TRPS_INSTANCE *trps, TRP_CONNECTION *conn) static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc) { + TR_MQ_MSG *msg; + + /* tell the trpc thread to exit */ + msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK, TR_MQ_PRIO_NORMAL); + if (msg) { + tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit"); + trpc_mq_add(trpc, msg); + } else { + tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate"); + } + pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL); trps_remove_trpc(trps, trpc); trpc_free(trpc); tr_debug("tr_trps_cleanup_trpc: deleted connection"); } -static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f) +/** + * Get a dynamically allocated string with a description of the route table. + * Caller must free the string using talloc_free(). + * + * @param memctx talloc context for the string + * @param trps trps instance containing the route table + * @return pointer to the output, or NULL on error + */ +static char *tr_trps_route_table_to_str(TALLOC_CTX *memctx, TRPS_INSTANCE *trps) { - char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL); - if (table==NULL) - fprintf(f, "Unable to print route table.\n"); - else { - fprintf(f, "%s\n", table); - talloc_free(table); - } + return trp_rtable_to_str(memctx, trps->rtable, " | ", NULL); +} + +/** + * Get a dynamically allocated string with a description of the community table. + * Caller must free the string using talloc_free(). + * + * @param memctx talloc context for the string + * @param trps trps instance containing the community table + * @return pointer to the output, or NULL on error + */ +static char *tr_trps_comm_table_to_str(TALLOC_CTX *memctx, TRPS_INSTANCE *trps) +{ + return tr_comm_table_to_str(memctx, trps->ctable); } /** @@ -251,63 +280,70 @@ static void tr_trps_process_mq(int socket, short event, void *arg) TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE); TR_MQ_MSG *msg=NULL; const char *s=NULL; + TRP_PEER *peer = NULL; + char *tmp = NULL; msg=trps_mq_pop(trps); while (msg!=NULL) { s=tr_mq_msg_get_message(msg); if (0==strcmp(s, TR_MQMSG_TRPS_CONNECTED)) { - TR_NAME *gssname=(TR_NAME *)tr_mq_msg_get_payload(msg); - TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname); + TR_NAME *peer_gssname=(TR_NAME *)tr_mq_msg_get_payload(msg); + peer=trps_get_peer_by_gssname(trps, peer_gssname); /* get the peer record */ + tmp = tr_name_strdup(peer_gssname); /* get the name as a null-terminated string */ if (peer==NULL) - tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", gssname->buf); + tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp); else { trp_peer_set_incoming_status(peer, PEER_CONNECTED); - tr_err("tr_trps_process_mq: incoming connection from %s established.", gssname->buf); + tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp); } + free(tmp); } else if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) { TRP_CONNECTION *conn=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION); - TR_NAME *gssname=trp_connection_get_gssname(conn); - TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname); + TR_NAME *peer_gssname=trp_connection_get_peer(conn); + peer=trps_get_peer_by_gssname(trps, peer_gssname); /* get the peer record */ + tmp = tr_name_strdup(peer_gssname); /* get the name as a null-terminated string */ if (peer==NULL) { - tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) lost.", - trp_connection_get_gssname(conn)->buf); + tr_err("tr_trps_process_mq: incoming connection from unknown peer (%.*s) lost.", tmp); } else { trp_peer_set_incoming_status(peer, PEER_DISCONNECTED); tr_trps_cleanup_conn(trps, conn); - tr_err("tr_trps_process_mq: incoming connection from %s lost.", gssname->buf); + tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp); } + free(tmp); } else if (0==strcmp(s, TR_MQMSG_TRPC_CONNECTED)) { TR_NAME *svcname=(TR_NAME *)tr_mq_msg_get_payload(msg); - TRP_PEER *peer=trps_get_peer_by_servicename(trps, svcname); + peer=trps_get_peer_by_servicename(trps, svcname); + tmp = tr_name_strdup(svcname); if (peer==NULL) - tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", svcname->buf); + tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp); else { trp_peer_set_outgoing_status(peer, PEER_CONNECTED); - tr_err("tr_trps_process_mq: outgoing connection to %s established.", svcname->buf); + tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp); } + free(tmp); } else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) { - /* trpc connection died */ + /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol. + * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */ TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE); - TR_NAME *gssname=trpc_get_gssname(trpc); - TRP_PEER *peer=trps_get_peer_by_servicename(trps, gssname); + TR_NAME *svcname=trpc_get_gssname(trpc); + peer=trps_get_peer_by_servicename(trps, svcname); + tmp = tr_name_strdup(svcname); if (peer==NULL) - tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", gssname->buf); + tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp); else { trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED); - tr_err("tr_trps_process_mq: outgoing connection to %s lost.", gssname->buf); + tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp); tr_trps_cleanup_trpc(trps, trpc); } + free(tmp); } else if (0==strcmp(s, TR_MQMSG_MSG_RECEIVED)) { if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS) tr_notice("tr_trps_process_mq: error handling message."); - else { - tr_trps_print_route_table(trps, stderr); - } } else tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg)); @@ -334,12 +370,23 @@ static void tr_trps_sweep(int listener, short event, void *arg) struct tr_trps_event_cookie *cookie=talloc_get_type_abort(arg, struct tr_trps_event_cookie); TRPS_INSTANCE *trps=cookie->trps; struct event *ev=cookie->ev; + char *table_str=NULL; tr_debug("tr_trps_sweep: sweeping routes."); trps_sweep_routes(trps); tr_debug("tr_trps_sweep: sweeping communities."); trps_sweep_ctable(trps); - tr_trps_print_route_table(trps, stderr); + table_str=tr_trps_route_table_to_str(NULL, trps); + if (table_str!=NULL) { + tr_debug(table_str); + talloc_free(table_str); + } + + table_str=tr_trps_comm_table_to_str(NULL, trps); + if (table_str!=NULL) { + tr_debug(table_str); + talloc_free(table_str); + } /* schedule the event to run again */ event_add(ev, &(trps->sweep_interval)); } @@ -548,6 +595,49 @@ struct trpc_thread_data { TRPC_INSTANCE *trpc; TRPS_INSTANCE *trps; }; + +/** + * Returns when a message is ready + */ +static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb) +{ + cb->msg_ready = 0; + do + pthread_cond_wait(&(cb->cond), &(cb->mutex)); + while (!cb->msg_ready); +} + +/** + * Thread for handling TRPC (outgoing) connections + * + * Locking protocol: + * + * This thread uses a mutex and condition variable to wait for incoming messages + * on its queue. The main thread interacts with these through the tr_trpc_mq_cb() + * callback. This callback is called by the TR_MQ whenever the queue goes from empty + * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag + * signals the condition variable, then unlocks the mutex. + * + * This thread waits for the condition variable to be signaled, then checks that the + * msg_ready flag has actually been set (pthreads does not guarantee against false + * wakes). If so, it holds the mutex lock while reading any messages in its queue and + * sending responses over the GSS connection. If it receives an ABORT message from + * the main thread (not currently used), or if sending the GSS message fails (which + * usually indicates that the connection has been lost), the thread begins the shutdown + * process. It still holds the mutex lock at the start of this process. + * + * This begins by setting trpc->shutting_down = 1, then sending a message to the + * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex + * and waits for more messages, again using the mutex and the condition variable. It + * ignores any except for an EXIT_OK message. When that is received, it exits, terminating + * the thread. + * + * When the main thread receives the DISCONNECTED message, it must stop sending messages + * to this thread's message queue. When it is certain that it will not queue any more messages, + * it sends the EXIT_OK message as its last interaction with this thread. Once that message + * has been queued, queueing additional messages may result in deadlock or segfaults. + * (It is ok if there are other messages in the queue, but it must not queue more.) + */ static void *tr_trpc_thread(void *arg) { TALLOC_CTX *tmp_ctx=talloc_new(NULL); @@ -559,7 +649,8 @@ static void *tr_trpc_thread(void *arg) const char *msg_type=NULL; char *encoded_msg=NULL; TR_NAME *peer_gssname=NULL; - int n_sent=0; + int n_sent = 0; + int n_popped = 0; int exit_loop=0; struct trpc_notify_cb_data cb_data={0, @@ -600,53 +691,78 @@ static void *tr_trpc_thread(void *arg) msg=NULL; while(!exit_loop) { - cb_data.msg_ready=0; - pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex)); - /* verify the condition */ - if (cb_data.msg_ready) { - for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) { - msg_type=tr_mq_msg_get_message(msg); - - if (0==strcmp(msg_type, TR_MQMSG_ABORT)) { - exit_loop=1; - break; - } - else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { - encoded_msg=tr_mq_msg_get_payload(msg); - if (encoded_msg==NULL) - tr_notice("tr_trpc_thread: null outgoing TRP message."); - else { - rc = trpc_send_msg(trpc, encoded_msg); - if (rc!=TRP_SUCCESS) { - tr_notice("tr_trpc_thread: trpc_send_msg failed."); - exit_loop=1; - break; - } + trpc_thread_wait_for_message(&cb_data); /* handles locking */ + + n_popped = 0; /* have not popped any messages from the queue */ + n_sent = 0; /* have not sent any messages yet */ + for (msg = trpc_mq_pop(trpc); + msg != NULL; + msg = trpc_mq_pop(trpc)) { + n_popped++; + msg_type = tr_mq_msg_get_message(msg); + if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) { + exit_loop = 1; + break; + } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { + encoded_msg = tr_mq_msg_get_payload(msg); + if (encoded_msg == NULL) + tr_notice("tr_trpc_thread: null outgoing TRP message."); + else { + rc = trpc_send_msg(trpc, encoded_msg); + if (rc == TRP_SUCCESS) { + n_sent++; + } else { + tr_notice("tr_trpc_thread: trpc_send_msg failed."); + /* Assume this means we lost the connection. */ + exit_loop = 1; + break; } } - else - tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); + } else + tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); - tr_mq_msg_free(msg); - } - if (n_sent==0) - tr_err("tr_trpc_thread: notified of msg, but queue empty"); - else - tr_debug("tr_trpc_thread: sent %d messages.", n_sent); + tr_mq_msg_free(msg); } + + /* if n_popped == 0, then n_sent must be zero (it's only set after at + * least one msg is popped) */ + if (n_popped==0) + tr_err("tr_trpc_thread: notified of message, but queue empty"); + else + tr_debug("tr_trpc_thread: sent %d messages.", n_sent); } } - tr_debug("tr_trpc_thread: exiting."); - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH); + tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread."); + trpc->shutting_down = 1; + + /* Send a DISCONNECTED message to the main thread */ + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL); tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ - if (msg==NULL) + if (msg==NULL) { + /* can't notify main thread of exit - just do it and hope for the best */ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); - else + } else { trps_mq_add(trps, msg); - trpc_mq_clear(trpc); /* clear any queued messages */ + /* DISCONNECTED sent, now wait for an acknowledgement before exiting */ + exit_loop = 0; + while (!exit_loop) { + trpc_thread_wait_for_message(&cb_data); + + while (NULL != (msg = trpc_mq_pop(trpc))) { + msg_type = tr_mq_msg_get_message(msg); + + /* ignore anything except an EXIT_OK */ + if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) { + exit_loop = 1; + break; /* skip any further messages */ + } + } + } + } + tr_debug("tr_trpc_thread: thread terminating."); talloc_free(tmp_ctx); return NULL; } @@ -839,6 +955,7 @@ void tr_config_changed(TR_CFG *new_cfg, void *cookie) { TR_INSTANCE *tr=talloc_get_type_abort(cookie, TR_INSTANCE); TRPS_INSTANCE *trps=tr->trps; + char *table_str=NULL; tr->cfgwatch->poll_interval.tv_sec=new_cfg->internal->cfg_poll_interval; tr->cfgwatch->poll_interval.tv_usec=0; @@ -846,6 +963,20 @@ void tr_config_changed(TR_CFG *new_cfg, void *cookie) tr->cfgwatch->settling_time.tv_sec=new_cfg->internal->cfg_settling_time; tr->cfgwatch->settling_time.tv_usec=0; + /* These need to be updated */ + tr->tids->hostname = new_cfg->internal->hostname; + tr->mons->hostname = new_cfg->internal->hostname; + + /* Update the authorized monitoring gss names */ + if (tr->mons->authorized_gss_names) { + tr_debug("tr_config_changed: freeing tr->mons->authorized_gss_names"); + tr_gss_names_free(tr->mons->authorized_gss_names); + } + tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials); + if (tr->mons->authorized_gss_names == NULL) { + tr_err("tr_config_changed: Error configuring monitoring credentials"); + } + trps_set_connect_interval(trps, new_cfg->internal->trp_connect_interval); trps_set_update_interval(trps, new_cfg->internal->trp_update_interval); trps_set_sweep_interval(trps, new_cfg->internal->trp_sweep_interval); @@ -857,6 +988,15 @@ void tr_config_changed(TR_CFG *new_cfg, void *cookie) trps_update_active_routes(trps); /* find new routes */ trps_update(trps, TRP_UPDATE_TRIGGERED); /* send any triggered routes */ tr_print_config(new_cfg); - tr_trps_print_route_table(trps, stderr); + table_str=tr_trps_route_table_to_str(NULL, trps); + if (table_str!=NULL) { + tr_info(table_str); + talloc_free(table_str); + } + table_str=tr_trps_comm_table_to_str(NULL, trps); + if (table_str!=NULL) { + tr_info(table_str); + talloc_free(table_str); + } }