From e90c9a23023275d59fe6f545a46a8ec61ab65608 Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Tue, 26 Jul 2016 14:26:49 -0400 Subject: [PATCH] Initialize route table with local routes. Fix bugs. --- include/tr_mq.h | 3 ++ include/tr_trp.h | 8 ++++ include/trp_internal.h | 7 +++ include/trp_rtable.h | 1 + tr/tr_trp.c | 126 ++++++++++++++++++++++++++++++++++++++++++++++--- trp/trp_conn.c | 22 ++++++--- trp/trp_rtable.c | 10 +++- trp/trps.c | 43 ++++++++++++++--- 8 files changed, 199 insertions(+), 21 deletions(-) diff --git a/include/tr_mq.h b/include/tr_mq.h index a6f472c..29c8d5f 100644 --- a/include/tr_mq.h +++ b/include/tr_mq.h @@ -25,6 +25,9 @@ struct tr_mq { void *notify_cb_arg; }; +/* message string for sending trpc messages */ +#define TR_MQMSG_TRPC_SEND "trpc send msg" + TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg); void tr_mq_msg_free(TR_MQ_MSG *msg); const char *tr_mq_msg_get_message(TR_MQ_MSG *msg); diff --git a/include/tr_trp.h b/include/tr_trp.h index 87f4ce0..2e5e00d 100644 --- a/include/tr_trp.h +++ b/include/tr_trp.h @@ -17,10 +17,18 @@ typedef struct tr_trps_events { struct event *sweep_ev; } TR_TRPS_EVENTS; +/* messages between threads */ +#define TR_MQMSG_TRPC_DISCONNECTED "trpc disconnected" +#define TR_MQMSG_TRPC_CONNECTED "trpc connected" +#define TR_MQMSG_TRPS_DISCONNECTED "trps disconnected" +#define TR_MQMSG_TRPS_CONNECTED "trps connected" +#define TR_MQMSG_ABORT "abort" + /* prototypes */ TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx); TRP_RC tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr, TR_TRPS_EVENTS *trps_ev); +TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg); TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer); void tr_config_changed(TR_CFG *new_cfg, void *cookie); TRP_RC tr_connect_to_peers(TRPS_INSTANCE *trps); diff --git a/include/trp_internal.h b/include/trp_internal.h index 20e54b8..96a0010 100644 --- a/include/trp_internal.h +++ b/include/trp_internal.h @@ -53,6 +53,7 @@ typedef struct trps_instance TRPS_INSTANCE; typedef enum trp_connection_status { TRP_CONNECTION_DOWN=0, TRP_CONNECTION_UP, + TRP_CONNECTION_UNKNOWN, } TRP_CONNECTION_STATUS; typedef struct trp_connection TRP_CONNECTION; @@ -65,6 +66,8 @@ struct trp_connection { TR_NAME *peer; gss_ctx_id_t *gssctx; TRP_CONNECTION_STATUS status; + void (*status_change_cb)(TRP_CONNECTION *conn, void *cookie); + void *status_change_cookie; }; typedef TRP_RC (*TRPS_MSG_FUNC)(TRPS_INSTANCE *, TRP_CONNECTION *, TR_MSG *); @@ -152,6 +155,8 @@ TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content); TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx); void trps_free (TRPS_INSTANCE *trps); +TRP_RC trps_init_rtable(TRPS_INSTANCE *trps); +void trps_clear_rtable(TRPS_INSTANCE *trps); void trps_set_connect_interval(TRPS_INSTANCE *trps, unsigned int interval); unsigned int trps_get_connect_interval(TRPS_INSTANCE *trps); void trps_set_update_interval(TRPS_INSTANCE *trps, unsigned int interval); @@ -173,11 +178,13 @@ int trps_get_listener(TRPS_INSTANCE *trps, TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps); void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg); void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn); +TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps); TRP_RC trps_handle_tr_msg(TRPS_INSTANCE *trps, TR_MSG *tr_msg); TRP_RENTRY *trps_get_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer); TRP_RENTRY *trps_get_selected_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm); TR_NAME *trps_get_next_hop(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm); TRP_RC trps_sweep_routes(TRPS_INSTANCE *trps); +TRP_RC trps_add_route(TRPS_INSTANCE *trps, TRP_RENTRY *route); TRP_RC trps_add_peer(TRPS_INSTANCE *trps, TRP_PEER *peer); TRP_PEER *trps_get_peer(TRPS_INSTANCE *trps, TR_NAME *gssname); TRP_RC trps_scheduled_update(TRPS_INSTANCE *trps); diff --git a/include/trp_rtable.h b/include/trp_rtable.h index e9ebd6f..36d6eb9 100644 --- a/include/trp_rtable.h +++ b/include/trp_rtable.h @@ -25,6 +25,7 @@ TRP_RTABLE *trp_rtable_new(void); void trp_rtable_free(TRP_RTABLE *rtbl); void trp_rtable_add(TRP_RTABLE *rtbl, TRP_RENTRY *entry); /* adds or updates */ void trp_rtable_remove(TRP_RTABLE *rtbl, TRP_RENTRY *entry); +void trp_rtable_clear(TRP_RTABLE *rtbl); size_t trp_rtable_size(TRP_RTABLE *rtbl); size_t trp_rtable_apc_size(TRP_RTABLE *rtbl, TR_NAME *apc); size_t trp_rtable_realm_size(TRP_RTABLE *rtbl, TR_NAME *apc, TR_NAME *realm); diff --git a/tr/tr_trp.c b/tr/tr_trp.c index 67a5c88..bf06d1c 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -104,7 +104,7 @@ static void *tr_trps_thread(void *arg) tr_debug("tr_trps_thread: started"); trps_handle_connection(trps, conn); - msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit"); + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED); tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */ if (msg==NULL) tr_err("tr_trps_thread: error allocating TR_MQ_MSG"); @@ -165,7 +165,7 @@ static void tr_trps_cleanup_conn(TRPS_INSTANCE *trps, TRP_CONNECTION *conn) static void tr_trpc_abort(TRPC_INSTANCE *trpc) { TALLOC_CTX *tmp_ctx=talloc_new(NULL); - TR_MQ_MSG *msg=tr_mq_msg_new(tmp_ctx, "trpc_abort"); + TR_MQ_MSG *msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_ABORT); tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */ trpc_mq_append(msg); /* gives msg over to the queue to manage */ @@ -202,12 +202,12 @@ static void tr_trps_process_mq(int socket, short event, void *arg) msg=trps_mq_pop(trps); while (msg!=NULL) { s=tr_mq_msg_get_message(msg); - if (0==strcmp(s, "trps_thread_exit")) { + if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) { tr_trps_cleanup_conn(trps, talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION)); } - else if (0==strcmp(s, "trpc_thread_exit")) { + else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) { /* trpc connection died */ tr_trps_cleanup_trpc(trps, talloc_get_type_abort(tr_mq_msg_get_payload(msg), @@ -478,11 +478,11 @@ static void *tr_trpc_thread(void *arg) msg_type=tr_mq_msg_get_message(msg); - if (0==strcmp(msg_type, "trpc_abort")) { + if (0==strcmp(msg_type, TR_MQMSG_ABORT)) { tr_mq_msg_free(msg); break; /* exit loop */ } - else if (0==strcmp(msg_type, "trpc_send")) { + else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { encoded_msg=tr_mq_msg_get_payload(msg); if (encoded_msg==NULL) tr_notice("tr_trpc_thread: null outgoing TRP message."); @@ -503,7 +503,7 @@ static void *tr_trpc_thread(void *arg) } } - msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit"); + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED); tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ if (msg==NULL) tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); @@ -514,6 +514,72 @@ static void *tr_trpc_thread(void *arg) return NULL; } +/* convert an IDP realm into routing table entries. Outputs number in *n_routes */ +static TRP_RENTRY **tr_make_local_routes(TALLOC_CTX *mem_ctx, + TR_IDP_REALM *realm, + char *trust_router, + size_t *n_routes) +{ + TALLOC_CTX *tmp_ctx=talloc_new(NULL); + TR_APC *apc=NULL; + TRP_RENTRY *new_entry=NULL; + TRP_RENTRY **entries=NULL; + size_t n_apcs=0, ii=0; + + *n_routes=0; + + if (realm==NULL) + goto cleanup; + + /* count apcs */ + for (apc=realm->apcs, n_apcs=0; apc!=NULL; apc=apc->next,n_apcs++) {} + + entries=talloc_array(tmp_ctx, TRP_RENTRY *, n_apcs); + for (apc=realm->apcs,ii=0; apc!=NULL; apc=apc->next, ii++) { + new_entry=trp_rentry_new(entries); + if (new_entry==NULL) { + tr_crit("tr_make_local_routes: unable to allocate entry."); + talloc_free(entries); + goto cleanup; + } + trp_rentry_set_apc(new_entry, tr_dup_name(apc->id)); + trp_rentry_set_realm(new_entry, tr_dup_name(realm->realm_id)); + trp_rentry_set_peer(new_entry, tr_new_name("")); /* no peer, it's us */ + trp_rentry_set_metric(new_entry, 0); + trp_rentry_set_trust_router(new_entry, tr_new_name(trust_router)); + trp_rentry_set_next_hop(new_entry, tr_new_name("")); + /* we do not set selected (tbd later) or expiry/interval (not needed for + * local routes) */ + entries[ii]=new_entry; + } + + talloc_steal(mem_ctx, entries); + *n_routes=n_apcs; + cleanup: + talloc_free(tmp_ctx); + return entries; +} + +struct tr_trpc_status_change_cookie { + TRPS_INSTANCE *trps; + TRPC_INSTANCE *trpc; + TRP_PEER *peer; +}; +static void tr_trpc_status_change(TRP_CONNECTION *conn, void *cookie) +{ + struct tr_trpc_status_change_cookie *cook=talloc_get_type_abort(cookie, struct tr_trpc_status_change_cookie); + /*TRPS_INSTANCE *trps=cook->trps;*/ + /* TRPC_INSTANCE *trpc=cook->trpc;*/ + TRP_PEER *peer=cook->peer; + TR_NAME *gssname=trp_peer_get_gssname(peer); + + if (trp_connection_get_status(conn)==TRP_CONNECTION_UP) + tr_debug("tr_trpc_status_change: connection now up."); + else + tr_debug("tr_trpc_status_change: connection now down."); + tr_free_name(gssname); +} + /* starts a trpc thread to connect to server:port */ TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer) { @@ -521,6 +587,7 @@ TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer) TRPC_INSTANCE *trpc=NULL; TRP_CONNECTION *conn=NULL; struct trpc_thread_data *thread_data=NULL; + struct tr_trpc_status_change_cookie *status_change_cookie=NULL; tr_debug("tr_trpc_initiate entered"); trpc=trpc_new(tmp_ctx); @@ -535,6 +602,19 @@ TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer) tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION."); goto cleanup; } + + status_change_cookie=talloc(conn, struct tr_trpc_status_change_cookie); + if (status_change_cookie==NULL) { + tr_crit("tr_trpc_initiate: could not allocate connection status cookie."); + goto cleanup; + } + status_change_cookie->trps=trps; + status_change_cookie->trpc=trpc; + status_change_cookie->peer=peer; + conn->status_change_cookie=status_change_cookie; + status_change_cookie=NULL; + conn->status_change_cb=tr_trpc_status_change; + trpc_set_conn(trpc, conn); trpc_set_server(trpc, talloc_strdup(trpc, trp_peer_get_server(peer))); trpc_set_port(trpc, trp_peer_get_port(peer)); @@ -561,6 +641,34 @@ TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer) return trpc; } +/* Add local routes to the route table. */ +TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg) +{ + TALLOC_CTX *tmp_ctx=talloc_new(NULL); + TR_IDP_REALM *cur=NULL; + TRP_RENTRY **local_routes=NULL; + size_t n_routes=0; + size_t ii=0; + char *trust_router_name=talloc_asprintf(tmp_ctx, "%s:%d", cfg->internal->hostname, cfg->internal->trps_port); + + /* determine our trust router name */ + if (trust_router_name==NULL) + return TRP_NOMEM; + + for (cur=cfg->idp_realms; cur!=NULL; cur=cur->next) { + local_routes=tr_make_local_routes(tmp_ctx, cur, trust_router_name, &n_routes); + for (ii=0; iiinternal->trp_connect_interval); trps_set_update_interval(trps, new_cfg->internal->trp_update_interval); trps_set_sweep_interval(trps, new_cfg->internal->trp_sweep_interval); + trps_clear_rtable(trps); /* should we do this every time??? */ + tr_add_local_routes(trps, new_cfg); /* should we do this every time??? */ + trps_update_active_routes(trps); + tr_trps_print_route_table(trps, stderr); } diff --git a/trp/trp_conn.c b/trp/trp_conn.c index dc3c961..c002cca 100644 --- a/trp/trp_conn.c +++ b/trp/trp_conn.c @@ -117,7 +117,7 @@ void trp_connection_set_gssctx(TRP_CONNECTION *conn, gss_ctx_id_t *gssctx) TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn) { - TRP_CONNECTION_STATUS status; + TRP_CONNECTION_STATUS status=TRP_CONNECTION_UNKNOWN; trp_connection_lock(conn); status=conn->status; trp_connection_unlock(conn); @@ -126,9 +126,13 @@ TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn) static void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status) { + TRP_CONNECTION_STATUS old_status=TRP_CONNECTION_UNKNOWN; trp_connection_lock(conn); + old_status=conn->status; conn->status=status; trp_connection_unlock(conn); + if ((status!=old_status) && (conn->status_change_cb!=NULL)) + conn->status_change_cb(conn, conn->status_change_cookie); } pthread_t *trp_connection_get_thread(TRP_CONNECTION *conn) @@ -223,20 +227,24 @@ TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx) trp_connection_set_fd(new_conn, -1); trp_connection_set_gssname(new_conn, NULL); trp_connection_mutex_init(new_conn); - trp_connection_set_status(new_conn, TRP_CONNECTION_DOWN); new_conn->peer=NULL; /* no true set function for this */ + new_conn->status_change_cb=NULL; + new_conn->status_change_cookie=NULL; + new_conn->status=TRP_CONNECTION_DOWN; /* set directly in the constructor */ + thread=talloc(new_conn, pthread_t); - gssctx=talloc(new_conn, gss_ctx_id_t); - if (gssctx==NULL) { + if (thread==NULL) { talloc_free(new_conn); return NULL; } - trp_connection_set_gssctx(new_conn, gssctx); - if (thread==NULL) { + trp_connection_set_thread(new_conn, thread); + + gssctx=talloc(new_conn, gss_ctx_id_t); + if (gssctx==NULL) { talloc_free(new_conn); return NULL; } - trp_connection_set_thread(new_conn, thread); + trp_connection_set_gssctx(new_conn, gssctx); talloc_set_destructor((void *)new_conn, trp_connection_destructor); } return new_conn; diff --git a/trp/trp_rtable.c b/trp/trp_rtable.c index 6fbbd6f..e69f2a2 100644 --- a/trp/trp_rtable.c +++ b/trp/trp_rtable.c @@ -255,6 +255,8 @@ void trp_rtable_add(TRP_RTABLE *rtbl, TRP_RENTRY *entry) apc_tbl=trp_rtbl_get_or_add_table(rtbl, entry->apc, trp_rtable_destroy_table); realm_tbl=trp_rtbl_get_or_add_table(apc_tbl, entry->realm, trp_rtable_destroy_rentry); g_hash_table_insert(realm_tbl, tr_dup_name(entry->peer), entry); /* destroys and replaces a duplicate */ + /* the route entry should not belong to any context, we will manage it ourselves */ + talloc_steal(NULL, entry); } /* note: the entry pointer passed in is invalid after calling this because the entry is freed */ @@ -281,6 +283,11 @@ void trp_rtable_remove(TRP_RTABLE *rtbl, TRP_RENTRY *entry) g_hash_table_remove(rtbl, entry->apc); } +void trp_rtable_clear(TRP_RTABLE *rtbl) +{ + g_hash_table_remove_all(rtbl); /* destructors should do all the cleanup */ +} + /* gets the actual hash table, for internal use only */ static GHashTable *trp_rtable_get_apc_table(TRP_RTABLE *rtbl, TR_NAME *apc) { @@ -537,10 +544,11 @@ TR_NAME **trp_rtable_get_apc_realm_peers(TRP_RTABLE *rtbl, TR_NAME *apc, TR_NAME TRP_RENTRY *trp_rtable_get_entry(TRP_RTABLE *rtbl, TR_NAME *apc, TR_NAME *realm, TR_NAME *peer) { GHashTable *realm_tbl=NULL; - + realm_tbl=trp_rtable_get_realm_table(rtbl, apc, realm); if (realm_tbl==NULL) return NULL; + return g_hash_table_lookup(realm_tbl, peer); /* does not copy or increment ref count */ } diff --git a/trp/trps.c b/trp/trps.c index ae177eb..02bb371 100644 --- a/trp/trps.c +++ b/trp/trps.c @@ -47,8 +47,8 @@ TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx) return NULL; } - trps->rtable=trp_rtable_new(); - if (trps->rtable==NULL) { + trps->rtable=NULL; + if (trps_init_rtable(trps) != TRP_SUCCESS) { /* failed to allocate rtable */ talloc_free(trps); return NULL; @@ -59,6 +59,26 @@ TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx) return trps; } +/* create a new route table, first discarding an old one if necessary */ +TRP_RC trps_init_rtable(TRPS_INSTANCE *trps) +{ + if (trps->rtable != NULL) { + trp_rtable_free(trps->rtable); + trps->rtable=NULL; + } + + trps->rtable=trp_rtable_new(); + if (trps->rtable==NULL) { + return TRP_NOMEM; + } + return TRP_SUCCESS; +} + +void trps_clear_rtable(TRPS_INSTANCE *trps) +{ + trp_rtable_clear(trps->rtable); +} + void trps_free (TRPS_INSTANCE *trps) { if (trps!=NULL) @@ -174,7 +194,7 @@ TRP_RC trps_send_msg(TRPS_INSTANCE *trps, TRP_PEER *peer, const char *msg) * didn't keep growing, etc. */ tr_warning("trps_send_msg: skipping message queued while TRPC connection not up."); } else { - mq_msg=tr_mq_msg_new(tmp_ctx, "trpc_send"); + mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND); msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */ tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */ trpc_mq_append(trpc, mq_msg); @@ -631,7 +651,7 @@ static TRP_RENTRY *trps_find_best_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_N /* TODO: think this through more carefully. At least ought to add hysteresis * to avoid flapping between routers or routes. */ -static TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps) +TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps) { size_t n_apc=0, ii=0; TR_NAME **apc=trp_rtable_get_apcs(trps->rtable, &n_apc); @@ -751,11 +771,16 @@ static TRP_RENTRY *trps_select_realm_update(TRPS_INSTANCE *trps, TR_NAME *comm, /* Take the currently selected route unless it is through the peer we're sending the update to. * I.e., enforce the split horizon rule. */ route=trp_rtable_get_selected_entry(trps->rtable, comm, realm); + if (route==NULL) { + /* No selected route, this should only happen if the only route has been retracted, + * in which case we do not want to advertise it. */ + return NULL; + } if (0==tr_name_cmp(peer_gssname, trp_rentry_get_peer(route))) { /* the selected entry goes through the peer we're reporting to, choose an alternate */ route=trps_find_best_route(trps, comm, realm, peer_gssname); - if (!trp_metric_is_finite(trp_rentry_get_metric(route))) - route=NULL; /* don't advertise a retracted route */ + if ((route==NULL) || (!trp_metric_is_finite(trp_rentry_get_metric(route)))) + route=NULL; /* don't advertise a nonexistent or retracted route */ } return route; } @@ -903,6 +928,12 @@ cleanup: return rc; } +TRP_RC trps_add_route(TRPS_INSTANCE *trps, TRP_RENTRY *route) +{ + trp_rtable_add(trps->rtable, route); /* should return status */ + return TRP_SUCCESS; +} + TRP_RC trps_add_peer(TRPS_INSTANCE *trps, TRP_PEER *peer) { return trp_ptable_add(trps->ptable, peer); -- 2.1.4