void *notify_cb_arg;
};
+/* message string for sending trpc messages */
+#define TR_MQMSG_TRPC_SEND "trpc send msg"
+
TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg);
void tr_mq_msg_free(TR_MQ_MSG *msg);
const char *tr_mq_msg_get_message(TR_MQ_MSG *msg);
struct event *sweep_ev;
} TR_TRPS_EVENTS;
+/* messages between threads */
+#define TR_MQMSG_TRPC_DISCONNECTED "trpc disconnected"
+#define TR_MQMSG_TRPC_CONNECTED "trpc connected"
+#define TR_MQMSG_TRPS_DISCONNECTED "trps disconnected"
+#define TR_MQMSG_TRPS_CONNECTED "trps connected"
+#define TR_MQMSG_ABORT "abort"
+
/* prototypes */
TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx);
TRP_RC tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr,
TR_TRPS_EVENTS *trps_ev);
+TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg);
TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer);
void tr_config_changed(TR_CFG *new_cfg, void *cookie);
TRP_RC tr_connect_to_peers(TRPS_INSTANCE *trps);
typedef enum trp_connection_status {
TRP_CONNECTION_DOWN=0,
TRP_CONNECTION_UP,
+ TRP_CONNECTION_UNKNOWN,
} TRP_CONNECTION_STATUS;
typedef struct trp_connection TRP_CONNECTION;
TR_NAME *peer;
gss_ctx_id_t *gssctx;
TRP_CONNECTION_STATUS status;
+ void (*status_change_cb)(TRP_CONNECTION *conn, void *cookie);
+ void *status_change_cookie;
};
typedef TRP_RC (*TRPS_MSG_FUNC)(TRPS_INSTANCE *, TRP_CONNECTION *, TR_MSG *);
TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx);
void trps_free (TRPS_INSTANCE *trps);
+TRP_RC trps_init_rtable(TRPS_INSTANCE *trps);
+void trps_clear_rtable(TRPS_INSTANCE *trps);
void trps_set_connect_interval(TRPS_INSTANCE *trps, unsigned int interval);
unsigned int trps_get_connect_interval(TRPS_INSTANCE *trps);
void trps_set_update_interval(TRPS_INSTANCE *trps, unsigned int interval);
TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps);
void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn);
+TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps);
TRP_RC trps_handle_tr_msg(TRPS_INSTANCE *trps, TR_MSG *tr_msg);
TRP_RENTRY *trps_get_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer);
TRP_RENTRY *trps_get_selected_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm);
TR_NAME *trps_get_next_hop(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm);
TRP_RC trps_sweep_routes(TRPS_INSTANCE *trps);
+TRP_RC trps_add_route(TRPS_INSTANCE *trps, TRP_RENTRY *route);
TRP_RC trps_add_peer(TRPS_INSTANCE *trps, TRP_PEER *peer);
TRP_PEER *trps_get_peer(TRPS_INSTANCE *trps, TR_NAME *gssname);
TRP_RC trps_scheduled_update(TRPS_INSTANCE *trps);
void trp_rtable_free(TRP_RTABLE *rtbl);
void trp_rtable_add(TRP_RTABLE *rtbl, TRP_RENTRY *entry); /* adds or updates */
void trp_rtable_remove(TRP_RTABLE *rtbl, TRP_RENTRY *entry);
+void trp_rtable_clear(TRP_RTABLE *rtbl);
size_t trp_rtable_size(TRP_RTABLE *rtbl);
size_t trp_rtable_apc_size(TRP_RTABLE *rtbl, TR_NAME *apc);
size_t trp_rtable_realm_size(TRP_RTABLE *rtbl, TR_NAME *apc, TR_NAME *realm);
tr_debug("tr_trps_thread: started");
trps_handle_connection(trps, conn);
- msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED);
tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
if (msg==NULL)
tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
static void tr_trpc_abort(TRPC_INSTANCE *trpc)
{
TALLOC_CTX *tmp_ctx=talloc_new(NULL);
- TR_MQ_MSG *msg=tr_mq_msg_new(tmp_ctx, "trpc_abort");
+ TR_MQ_MSG *msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_ABORT);
tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
trpc_mq_append(msg); /* gives msg over to the queue to manage */
msg=trps_mq_pop(trps);
while (msg!=NULL) {
s=tr_mq_msg_get_message(msg);
- if (0==strcmp(s, "trps_thread_exit")) {
+ if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
tr_trps_cleanup_conn(trps,
talloc_get_type_abort(tr_mq_msg_get_payload(msg),
TRP_CONNECTION));
}
- else if (0==strcmp(s, "trpc_thread_exit")) {
+ else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
/* trpc connection died */
tr_trps_cleanup_trpc(trps,
talloc_get_type_abort(tr_mq_msg_get_payload(msg),
msg_type=tr_mq_msg_get_message(msg);
- if (0==strcmp(msg_type, "trpc_abort")) {
+ if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
tr_mq_msg_free(msg);
break; /* exit loop */
}
- else if (0==strcmp(msg_type, "trpc_send")) {
+ else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
encoded_msg=tr_mq_msg_get_payload(msg);
if (encoded_msg==NULL)
tr_notice("tr_trpc_thread: null outgoing TRP message.");
}
}
- msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED);
tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
if (msg==NULL)
tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
return NULL;
}
+/* convert an IDP realm into routing table entries. Outputs number in *n_routes */
+static TRP_RENTRY **tr_make_local_routes(TALLOC_CTX *mem_ctx,
+ TR_IDP_REALM *realm,
+ char *trust_router,
+ size_t *n_routes)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TR_APC *apc=NULL;
+ TRP_RENTRY *new_entry=NULL;
+ TRP_RENTRY **entries=NULL;
+ size_t n_apcs=0, ii=0;
+
+ *n_routes=0;
+
+ if (realm==NULL)
+ goto cleanup;
+
+ /* count apcs */
+ for (apc=realm->apcs, n_apcs=0; apc!=NULL; apc=apc->next,n_apcs++) {}
+
+ entries=talloc_array(tmp_ctx, TRP_RENTRY *, n_apcs);
+ for (apc=realm->apcs,ii=0; apc!=NULL; apc=apc->next, ii++) {
+ new_entry=trp_rentry_new(entries);
+ if (new_entry==NULL) {
+ tr_crit("tr_make_local_routes: unable to allocate entry.");
+ talloc_free(entries);
+ goto cleanup;
+ }
+ trp_rentry_set_apc(new_entry, tr_dup_name(apc->id));
+ trp_rentry_set_realm(new_entry, tr_dup_name(realm->realm_id));
+ trp_rentry_set_peer(new_entry, tr_new_name("")); /* no peer, it's us */
+ trp_rentry_set_metric(new_entry, 0);
+ trp_rentry_set_trust_router(new_entry, tr_new_name(trust_router));
+ trp_rentry_set_next_hop(new_entry, tr_new_name(""));
+ /* we do not set selected (tbd later) or expiry/interval (not needed for
+ * local routes) */
+ entries[ii]=new_entry;
+ }
+
+ talloc_steal(mem_ctx, entries);
+ *n_routes=n_apcs;
+ cleanup:
+ talloc_free(tmp_ctx);
+ return entries;
+}
+
+struct tr_trpc_status_change_cookie {
+ TRPS_INSTANCE *trps;
+ TRPC_INSTANCE *trpc;
+ TRP_PEER *peer;
+};
+static void tr_trpc_status_change(TRP_CONNECTION *conn, void *cookie)
+{
+ struct tr_trpc_status_change_cookie *cook=talloc_get_type_abort(cookie, struct tr_trpc_status_change_cookie);
+ /*TRPS_INSTANCE *trps=cook->trps;*/
+ /* TRPC_INSTANCE *trpc=cook->trpc;*/
+ TRP_PEER *peer=cook->peer;
+ TR_NAME *gssname=trp_peer_get_gssname(peer);
+
+ if (trp_connection_get_status(conn)==TRP_CONNECTION_UP)
+ tr_debug("tr_trpc_status_change: connection now up.");
+ else
+ tr_debug("tr_trpc_status_change: connection now down.");
+ tr_free_name(gssname);
+}
+
/* starts a trpc thread to connect to server:port */
TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer)
{
TRPC_INSTANCE *trpc=NULL;
TRP_CONNECTION *conn=NULL;
struct trpc_thread_data *thread_data=NULL;
+ struct tr_trpc_status_change_cookie *status_change_cookie=NULL;
tr_debug("tr_trpc_initiate entered");
trpc=trpc_new(tmp_ctx);
tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
goto cleanup;
}
+
+ status_change_cookie=talloc(conn, struct tr_trpc_status_change_cookie);
+ if (status_change_cookie==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate connection status cookie.");
+ goto cleanup;
+ }
+ status_change_cookie->trps=trps;
+ status_change_cookie->trpc=trpc;
+ status_change_cookie->peer=peer;
+ conn->status_change_cookie=status_change_cookie;
+ status_change_cookie=NULL;
+ conn->status_change_cb=tr_trpc_status_change;
+
trpc_set_conn(trpc, conn);
trpc_set_server(trpc, talloc_strdup(trpc, trp_peer_get_server(peer)));
trpc_set_port(trpc, trp_peer_get_port(peer));
return trpc;
}
+/* Add local routes to the route table. */
+TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TR_IDP_REALM *cur=NULL;
+ TRP_RENTRY **local_routes=NULL;
+ size_t n_routes=0;
+ size_t ii=0;
+ char *trust_router_name=talloc_asprintf(tmp_ctx, "%s:%d", cfg->internal->hostname, cfg->internal->trps_port);
+
+ /* determine our trust router name */
+ if (trust_router_name==NULL)
+ return TRP_NOMEM;
+
+ for (cur=cfg->idp_realms; cur!=NULL; cur=cur->next) {
+ local_routes=tr_make_local_routes(tmp_ctx, cur, trust_router_name, &n_routes);
+ for (ii=0; ii<n_routes; ii++)
+ trps_add_route(trps, local_routes[ii]);
+
+ talloc_free(local_routes);
+ local_routes=NULL;
+ n_routes=0;
+ }
+
+ talloc_free(tmp_ctx);
+ return TRP_SUCCESS;
+}
+
/* decide how often to attempt to connect to a peer */
static int tr_conn_attempt_due(TRPS_INSTANCE *trps, TRP_PEER *peer, struct timespec *when)
{
trps_set_connect_interval(trps, new_cfg->internal->trp_connect_interval);
trps_set_update_interval(trps, new_cfg->internal->trp_update_interval);
trps_set_sweep_interval(trps, new_cfg->internal->trp_sweep_interval);
+ trps_clear_rtable(trps); /* should we do this every time??? */
+ tr_add_local_routes(trps, new_cfg); /* should we do this every time??? */
+ trps_update_active_routes(trps);
+ tr_trps_print_route_table(trps, stderr);
}
TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn)
{
- TRP_CONNECTION_STATUS status;
+ TRP_CONNECTION_STATUS status=TRP_CONNECTION_UNKNOWN;
trp_connection_lock(conn);
status=conn->status;
trp_connection_unlock(conn);
static void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status)
{
+ TRP_CONNECTION_STATUS old_status=TRP_CONNECTION_UNKNOWN;
trp_connection_lock(conn);
+ old_status=conn->status;
conn->status=status;
trp_connection_unlock(conn);
+ if ((status!=old_status) && (conn->status_change_cb!=NULL))
+ conn->status_change_cb(conn, conn->status_change_cookie);
}
pthread_t *trp_connection_get_thread(TRP_CONNECTION *conn)
trp_connection_set_fd(new_conn, -1);
trp_connection_set_gssname(new_conn, NULL);
trp_connection_mutex_init(new_conn);
- trp_connection_set_status(new_conn, TRP_CONNECTION_DOWN);
new_conn->peer=NULL; /* no true set function for this */
+ new_conn->status_change_cb=NULL;
+ new_conn->status_change_cookie=NULL;
+ new_conn->status=TRP_CONNECTION_DOWN; /* set directly in the constructor */
+
thread=talloc(new_conn, pthread_t);
- gssctx=talloc(new_conn, gss_ctx_id_t);
- if (gssctx==NULL) {
+ if (thread==NULL) {
talloc_free(new_conn);
return NULL;
}
- trp_connection_set_gssctx(new_conn, gssctx);
- if (thread==NULL) {
+ trp_connection_set_thread(new_conn, thread);
+
+ gssctx=talloc(new_conn, gss_ctx_id_t);
+ if (gssctx==NULL) {
talloc_free(new_conn);
return NULL;
}
- trp_connection_set_thread(new_conn, thread);
+ trp_connection_set_gssctx(new_conn, gssctx);
talloc_set_destructor((void *)new_conn, trp_connection_destructor);
}
return new_conn;
apc_tbl=trp_rtbl_get_or_add_table(rtbl, entry->apc, trp_rtable_destroy_table);
realm_tbl=trp_rtbl_get_or_add_table(apc_tbl, entry->realm, trp_rtable_destroy_rentry);
g_hash_table_insert(realm_tbl, tr_dup_name(entry->peer), entry); /* destroys and replaces a duplicate */
+ /* the route entry should not belong to any context, we will manage it ourselves */
+ talloc_steal(NULL, entry);
}
/* note: the entry pointer passed in is invalid after calling this because the entry is freed */
g_hash_table_remove(rtbl, entry->apc);
}
+void trp_rtable_clear(TRP_RTABLE *rtbl)
+{
+ g_hash_table_remove_all(rtbl); /* destructors should do all the cleanup */
+}
+
/* gets the actual hash table, for internal use only */
static GHashTable *trp_rtable_get_apc_table(TRP_RTABLE *rtbl, TR_NAME *apc)
{
TRP_RENTRY *trp_rtable_get_entry(TRP_RTABLE *rtbl, TR_NAME *apc, TR_NAME *realm, TR_NAME *peer)
{
GHashTable *realm_tbl=NULL;
-
+
realm_tbl=trp_rtable_get_realm_table(rtbl, apc, realm);
if (realm_tbl==NULL)
return NULL;
+
return g_hash_table_lookup(realm_tbl, peer); /* does not copy or increment ref count */
}
return NULL;
}
- trps->rtable=trp_rtable_new();
- if (trps->rtable==NULL) {
+ trps->rtable=NULL;
+ if (trps_init_rtable(trps) != TRP_SUCCESS) {
/* failed to allocate rtable */
talloc_free(trps);
return NULL;
return trps;
}
+/* create a new route table, first discarding an old one if necessary */
+TRP_RC trps_init_rtable(TRPS_INSTANCE *trps)
+{
+ if (trps->rtable != NULL) {
+ trp_rtable_free(trps->rtable);
+ trps->rtable=NULL;
+ }
+
+ trps->rtable=trp_rtable_new();
+ if (trps->rtable==NULL) {
+ return TRP_NOMEM;
+ }
+ return TRP_SUCCESS;
+}
+
+void trps_clear_rtable(TRPS_INSTANCE *trps)
+{
+ trp_rtable_clear(trps->rtable);
+}
+
void trps_free (TRPS_INSTANCE *trps)
{
if (trps!=NULL)
* didn't keep growing, etc. */
tr_warning("trps_send_msg: skipping message queued while TRPC connection not up.");
} else {
- mq_msg=tr_mq_msg_new(tmp_ctx, "trpc_send");
+ mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND);
msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */
trpc_mq_append(trpc, mq_msg);
/* TODO: think this through more carefully. At least ought to add hysteresis
* to avoid flapping between routers or routes. */
-static TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps)
+TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps)
{
size_t n_apc=0, ii=0;
TR_NAME **apc=trp_rtable_get_apcs(trps->rtable, &n_apc);
/* Take the currently selected route unless it is through the peer we're sending the update to.
* I.e., enforce the split horizon rule. */
route=trp_rtable_get_selected_entry(trps->rtable, comm, realm);
+ if (route==NULL) {
+ /* No selected route, this should only happen if the only route has been retracted,
+ * in which case we do not want to advertise it. */
+ return NULL;
+ }
if (0==tr_name_cmp(peer_gssname, trp_rentry_get_peer(route))) {
/* the selected entry goes through the peer we're reporting to, choose an alternate */
route=trps_find_best_route(trps, comm, realm, peer_gssname);
- if (!trp_metric_is_finite(trp_rentry_get_metric(route)))
- route=NULL; /* don't advertise a retracted route */
+ if ((route==NULL) || (!trp_metric_is_finite(trp_rentry_get_metric(route))))
+ route=NULL; /* don't advertise a nonexistent or retracted route */
}
return route;
}
return rc;
}
+TRP_RC trps_add_route(TRPS_INSTANCE *trps, TRP_RENTRY *route)
+{
+ trp_rtable_add(trps->rtable, route); /* should return status */
+ return TRP_SUCCESS;
+}
+
TRP_RC trps_add_peer(TRPS_INSTANCE *trps, TRP_PEER *peer)
{
return trp_ptable_add(trps->ptable, peer);