+
/* State shared between a trpc client thread and the message-queue
 * notification callback (tr_trpc_mq_cb): a condition variable / mutex
 * pair plus a flag that guards against spurious wakeups. The instance
 * lives on the trpc thread's stack. */
struct trpc_notify_cb_data {
  int msg_ready;          /* nonzero when the queue has a message waiting; protected by mutex */
  pthread_cond_t cond;    /* signaled by tr_trpc_mq_cb when msg_ready transitions to 1 */
  pthread_mutex_t mutex;  /* protects msg_ready and pairs with cond */
};
+
+static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
+{
+ struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
+ pthread_mutex_lock(&(cb_data->mutex));
+ if (!cb_data->msg_ready) {
+ cb_data->msg_ready=1;
+ pthread_cond_signal(&(cb_data->cond));
+ }
+ pthread_mutex_unlock(&(cb_data->mutex));
+}
+
/* Data passed to the trpc client thread (tr_trpc_thread).
 * Allocated by tr_trpc_initiate as a talloc child of trpc. */
struct trpc_thread_data {
  TRPC_INSTANCE *trpc;  /* outbound connection instance this thread services */
  TRPS_INSTANCE *trps;  /* server instance whose queue receives connect/disconnect events */
};
+static void *tr_trpc_thread(void *arg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
+ TRPC_INSTANCE *trpc=thread_data->trpc;
+ TRPS_INSTANCE *trps=thread_data->trps;
+ TRP_RC rc=TRP_ERROR;
+ TR_MQ_MSG *msg=NULL;
+ const char *msg_type=NULL;
+ char *encoded_msg=NULL;
+ TR_NAME *peer_gssname=NULL;
+ int n_sent=0;
+ int exit_loop=0;
+
+ struct trpc_notify_cb_data cb_data={0,
+ PTHREAD_COND_INITIALIZER,
+ PTHREAD_MUTEX_INITIALIZER};
+
+ tr_debug("tr_trpc_thread: started");
+
+ /* set up the mq for receiving */
+ pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
+
+ tr_mq_lock(trpc->mq);
+ tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
+ tr_mq_unlock(trpc->mq);
+
+ rc=trpc_connect(trpc);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
+ trpc_get_server(trpc),
+ trpc_get_port(trpc));
+ } else {
+ peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc));
+ if (peer_gssname==NULL) {
+ tr_err("tr_trpc_thread: could not duplicate peer_gssname.");
+ talloc_free(tmp_ctx);
+ return NULL;
+ }
+ tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf);
+
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH);
+ tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper);
+ if (msg==NULL) {
+ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+ talloc_free(tmp_ctx);
+ return NULL;
+ }
+ trps_mq_add(trps, msg); /* steals msg context */
+ msg=NULL;
+
+ while(!exit_loop) {
+ cb_data.msg_ready=0;
+ pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
+ /* verify the condition */
+ if (cb_data.msg_ready) {
+ for (msg=trpc_mq_pop(trpc),n_sent=0; msg!=NULL; msg=trpc_mq_pop(trpc),n_sent++) {
+ msg_type=tr_mq_msg_get_message(msg);
+
+ if (0==strcmp(msg_type, TR_MQMSG_ABORT)) {
+ exit_loop=1;
+ break;
+ }
+ else if (0==strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
+ encoded_msg=tr_mq_msg_get_payload(msg);
+ if (encoded_msg==NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ exit_loop=1;
+ break;
+ }
+ }
+ }
+ else
+ tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+
+ tr_mq_msg_free(msg);
+ }
+ if (n_sent==0)
+ tr_err("tr_trpc_thread: notified of msg, but queue empty");
+ else
+ tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
+ }
+ }
+ }
+
+ tr_debug("tr_trpc_thread: exiting.");
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH);
+ tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
+ if (msg==NULL)
+ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+ else
+ trps_mq_add(trps, msg);
+
+ trpc_mq_clear(trpc); /* clear any queued messages */
+
+ talloc_free(tmp_ctx);
+ return NULL;
+}
+
+/* convert an IDP realm into routing table entries. Outputs number in *n_routes */
+static TRP_ROUTE **tr_make_local_routes(TALLOC_CTX *mem_ctx,
+ TR_IDP_REALM *realm,
+ char *trust_router,
+ size_t *n_routes)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TR_APC *comm=NULL;
+ TRP_ROUTE *new_entry=NULL;
+ TRP_ROUTE **entries=NULL;
+ size_t n_comms=0, ii=0;
+
+ *n_routes=0;
+
+ if ((realm==NULL) || (realm->origin!=TR_REALM_LOCAL))
+ goto cleanup;
+
+ /* count comms */
+ for (comm=realm->apcs, n_comms=0; comm!=NULL; comm=comm->next,n_comms++) {}
+
+ entries=talloc_array(tmp_ctx, TRP_ROUTE *, n_comms);
+ for (comm=realm->apcs,ii=0; comm!=NULL; comm=comm->next, ii++) {
+ new_entry=trp_route_new(entries);
+ if (new_entry==NULL) {
+ tr_crit("tr_make_local_routes: unable to allocate entry.");
+ talloc_free(entries);
+ goto cleanup;
+ }
+ trp_route_set_comm(new_entry, tr_dup_name(comm->id));
+ trp_route_set_realm(new_entry, tr_dup_name(realm->realm_id));
+ trp_route_set_peer(new_entry, tr_new_name("")); /* no peer, it's us */
+ trp_route_set_metric(new_entry, 0);
+ trp_route_set_trust_router(new_entry, tr_new_name(trust_router));
+ trp_route_set_next_hop(new_entry, tr_new_name(""));
+ trp_route_set_local(new_entry, 1);
+ entries[ii]=new_entry;
+ }
+
+ talloc_steal(mem_ctx, entries);
+ *n_routes=n_comms;
+ cleanup:
+ talloc_free(tmp_ctx);
+ return entries;
+}
+
+void tr_peer_status_change(TRP_PEER *peer, void *cookie)
+{
+ TRPS_INSTANCE *trps=talloc_get_type_abort(cookie, TRPS_INSTANCE);
+
+ if (TRP_SUCCESS!=trps_wildcard_route_req(trps, trp_peer_get_servicename(peer)))
+ tr_err("tr_send_wildcard: error sending wildcard route request.");
+}
+
+/* starts a trpc thread to connect to server:port */
+TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TRPC_INSTANCE *trpc=NULL;
+ TRP_CONNECTION *conn=NULL;
+ struct trpc_thread_data *thread_data=NULL;
+ TRP_RC rc=TRP_ERROR;
+
+ tr_debug("tr_trpc_initiate entered");
+ trpc=trpc_new(tmp_ctx);
+ if (trpc==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
+ rc=TRP_NOMEM;
+ goto cleanup;
+ }
+ tr_debug("tr_trpc_initiate: allocated trpc");
+
+ conn=trp_connection_new(trpc);
+ if (conn==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
+ rc=TRP_NOMEM;
+ goto cleanup;
+ }
+
+ trpc_set_conn(trpc, conn);
+ trpc_set_server(trpc, talloc_strdup(trpc, trp_peer_get_server(peer)));
+ trpc_set_port(trpc, trp_peer_get_port(peer));
+ trpc_set_gssname(trpc, trp_peer_dup_servicename(peer));
+ tr_debug("tr_trpc_initiate: allocated connection");
+
+ /* start thread */
+ thread_data=talloc(trpc, struct trpc_thread_data);
+ if (thread_data==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
+ rc=TRP_NOMEM;
+ goto cleanup;
+ }
+ thread_data->trpc=trpc;
+ thread_data->trps=trps;
+
+ trps_add_trpc(trps, trpc); /* must add before starting thread */
+ pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
+
+ tr_debug("tr_trpc_initiate: started trpc thread");
+ rc=TRP_SUCCESS;
+
+ cleanup:
+ talloc_free(tmp_ctx);
+ return rc;
+}
+
+/* Add local routes to the route table. */
+TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TR_IDP_REALM *cur=NULL;
+ TRP_ROUTE **local_routes=NULL;
+ size_t n_routes=0;
+ size_t ii=0;
+ char *trust_router_name=talloc_asprintf(tmp_ctx, "%s:%d", cfg->internal->hostname, cfg->internal->trps_port);
+
+ /* determine our trust router name */
+ if (trust_router_name==NULL)
+ return TRP_NOMEM;
+
+ for (cur=cfg->ctable->idp_realms; cur!=NULL; cur=cur->next) {
+ local_routes=tr_make_local_routes(tmp_ctx, cur, trust_router_name, &n_routes);
+ for (ii=0; ii<n_routes; ii++)
+ trps_add_route(trps, local_routes[ii]);
+
+ talloc_free(local_routes);
+ local_routes=NULL;
+ n_routes=0;
+ }
+
+ talloc_free(tmp_ctx);
+ return TRP_SUCCESS;
+}
+
/* Decide whether it is time to attempt a connection to a peer.
 * Currently a stub: always returns 1 (attempt every cycle), so the
 * trps/peer/when parameters are unused. Retained so callers such as
 * tr_connect_to_peers already pass the data a real rate-limiting
 * policy would need. */
static int tr_conn_attempt_due(TRPS_INSTANCE *trps, TRP_PEER *peer, struct timespec *when)
{
  return 1; /* currently make an attempt every cycle */
}
+
+/* open missing connections to peers */
+TRP_RC tr_connect_to_peers(TRPS_INSTANCE *trps, struct event *ev)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TRP_PTABLE_ITER *iter=trp_ptable_iter_new(tmp_ctx);
+ TRP_PEER *peer=NULL;
+ struct timespec curtime={0,0};
+ TRP_RC rc=TRP_ERROR;
+
+ if (clock_gettime(CLOCK_REALTIME, &curtime)) {
+ tr_err("tr_connect_to_peers: failed to read time.");
+ rc=TRP_CLOCKERR;
+ goto cleanup;
+ }
+
+ for (peer=trp_ptable_iter_first(iter, trps->ptable);
+ peer!=NULL;
+ peer=trp_ptable_iter_next(iter))
+ {
+ if (trps_find_trpc(trps, peer)==NULL) {
+ TR_NAME *label=trp_peer_get_label(peer);
+ tr_debug("tr_connect_to_peers: %.*s missing connection.",
+ label->len, label->buf);
+ /* has it been long enough since we last tried? */
+ if (tr_conn_attempt_due(trps, peer, &curtime)) {
+ trp_peer_set_last_conn_attempt(peer, &curtime); /* we are trying again now */
+ if (tr_trpc_initiate(trps, peer, ev)!=TRP_SUCCESS) {
+ tr_err("tr_connect_to_peers: unable to initiate TRP connection to %s:%u.",
+ trp_peer_get_server(peer),
+ trp_peer_get_port(peer));
+ }
+ }
+ }
+ }
+ rc=TRP_SUCCESS;
+
+cleanup:
+ trp_ptable_iter_free(iter);
+ talloc_free(tmp_ctx);
+ return rc;
+}
+
+
+/* Called by the config manager after a change to the active configuration.
+ * Updates configuration of objects that do not know about the config manager. */
+void tr_config_changed(TR_CFG *new_cfg, void *cookie)
+{
+ TR_INSTANCE *tr=talloc_get_type_abort(cookie, TR_INSTANCE);
+ TRPS_INSTANCE *trps=tr->trps;
+ char *table_str=NULL;
+
+ tr->cfgwatch->poll_interval.tv_sec=new_cfg->internal->cfg_poll_interval;
+ tr->cfgwatch->poll_interval.tv_usec=0;
+
+ tr->cfgwatch->settling_time.tv_sec=new_cfg->internal->cfg_settling_time;
+ tr->cfgwatch->settling_time.tv_usec=0;
+
+ /* These need to be updated */
+ tr->tids->hostname = new_cfg->internal->hostname;
+ tr->mons->hostname = new_cfg->internal->hostname;
+
+ /* Update the authorized monitoring gss names */
+ if (tr->mons->authorized_gss_names) {
+ tr_debug("tr_config_changed: freeing tr->mons->authorized_gss_names");
+ tr_gss_names_free(tr->mons->authorized_gss_names);
+ }
+ if (new_cfg->internal->monitoring_credentials != NULL) {
+ tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials);
+ } else {
+ tr->mons->authorized_gss_names = tr_gss_names_new(tr->mons);
+ }
+ if (tr->mons->authorized_gss_names == NULL) {
+ tr_err("tr_config_changed: Error configuring monitoring credentials");
+ }
+
+ trps_set_connect_interval(trps, new_cfg->internal->trp_connect_interval);
+ trps_set_update_interval(trps, new_cfg->internal->trp_update_interval);
+ trps_set_sweep_interval(trps, new_cfg->internal->trp_sweep_interval);
+ trps_set_ctable(trps, new_cfg->ctable);
+ trps_set_ptable(trps, new_cfg->peers);
+ trps_set_peer_status_callback(trps, tr_peer_status_change, (void *)trps);
+ trps_clear_rtable(trps); /* should we do this every time??? */
+ tr_add_local_routes(trps, new_cfg); /* should we do this every time??? */
+ trps_update_active_routes(trps); /* find new routes */
+ trps_update(trps, TRP_UPDATE_TRIGGERED); /* send any triggered routes */
+ tr_print_config(new_cfg);
+ table_str=tr_trps_route_table_to_str(NULL, trps);
+ if (table_str!=NULL) {
+ tr_info(table_str);
+ talloc_free(table_str);
+ }
+ table_str=tr_trps_comm_table_to_str(NULL, trps);
+ if (table_str!=NULL) {
+ tr_info(table_str);
+ talloc_free(table_str);
+ }
+}
+