Merge pull request #76 from painless-security/jennifer/trpc_deadlock
authormrw42 <margaret@painless-security.com>
Fri, 4 May 2018 20:58:06 +0000 (16:58 -0400)
committerGitHub <noreply@github.com>
Fri, 4 May 2018 20:58:06 +0000 (16:58 -0400)
Eliminate deadlock in TRPC messaging queueing

1  2 
tr/tr_trp.c
trp/trps.c

diff --combined tr/tr_trp.c
@@@ -114,7 -114,7 +114,7 @@@ static int tr_trps_gss_handler(gss_name
  
    tr_debug("tr_trps_gss_handler()");
  
 -  if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
 +  if ((!client_name) || (!trps) || (!cfg_mgr)) {
      tr_debug("tr_trps_gss_handler: Bad parameters.");
      return -1;
    }
@@@ -221,17 -221,6 +221,6 @@@ static void tr_trps_cleanup_conn(TRPS_I
  
  static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
  {
-   TR_MQ_MSG *msg;
-   /* tell the trpc thread to exit */
-   msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK, TR_MQ_PRIO_NORMAL);
-   if (msg) {
-     tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit");
-     trpc_mq_add(trpc, msg);
-   } else {
-     tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate");
-   }
    pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL);
    trps_remove_trpc(trps, trpc);
    trpc_free(trpc);
@@@ -292,7 -281,7 +281,7 @@@ static void tr_trps_process_mq(int sock
          tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp);
        else {
          trp_peer_set_incoming_status(peer, PEER_CONNECTED);
-         tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp);
+         tr_notice("tr_trps_process_mq: incoming connection from %s established.", tmp);
        }
        free(tmp);
      }
        } else {
          trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
          tr_trps_cleanup_conn(trps, conn);
-         tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp);
+         tr_notice("tr_trps_process_mq: incoming connection from %s lost.", tmp);
        }
        free(tmp);
      }
          tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp);
        else {
          trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
-         tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp);
+         tr_notice("tr_trps_process_mq: outgoing connection to %s established.", tmp);
        }
        free(tmp);
      }
      else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
-       /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol.
-        * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */
        TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
        TR_NAME *svcname=trpc_get_gssname(trpc);
        peer=trps_get_peer_by_servicename(trps, svcname);
          tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp);
        else {
          trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
-         tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
+         tr_notice("tr_trps_process_mq: outgoing connection to %s lost.", tmp);
          tr_trps_cleanup_trpc(trps, trpc);
        }
        free(tmp);
@@@ -570,24 -557,6 +557,6 @@@ cleanup
    return retval;
  }
  
- struct trpc_notify_cb_data {
-   int msg_ready;
-   pthread_cond_t cond;
-   pthread_mutex_t mutex;
- };
- static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
- {
-   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
-   pthread_mutex_lock(&(cb_data->mutex));
-   if (!cb_data->msg_ready) {
-     cb_data->msg_ready=1;
-     pthread_cond_signal(&(cb_data->cond));
-   }
-   pthread_mutex_unlock(&(cb_data->mutex));
- }
  /* data passed to thread */
  struct trpc_thread_data {
    TRPC_INSTANCE *trpc;
  };
  
  /**
-  * Returns when a message is ready
-  */
- static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb)
- {
-   cb->msg_ready = 0;
-   do
-     pthread_cond_wait(&(cb->cond), &(cb->mutex));
-   while (!cb->msg_ready);
- }
- /**
   * Thread for handling TRPC (outgoing) connections
   *
-  * Locking protocol:
-  *
-  * This thread uses a mutex and condition variable to wait for incoming messages
-  * on its queue. The main thread interacts with these through the tr_trpc_mq_cb()
-  * callback. This callback is called by the TR_MQ whenever the queue goes from empty
-  * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag
-  * signals the condition variable, then unlocks the mutex.
-  *
-  * This thread waits for the condition variable to be signaled, then checks that the
-  * msg_ready flag has actually been set (pthreads does not guarantee against false
-  * wakes). If so, it holds the mutex lock while reading any messages in its queue and
-  * sending responses over the GSS connection. If it receives an ABORT message from
-  * the main thread (not currently used), or if sending the GSS message fails (which
-  * usually indicates that the connection has been lost), the thread begins the shutdown
-  * process. It still holds the mutex lock at the start of this process.
+  * Opens a connection to a peer. If successful, notifies the trps thread by
+  * posting a TR_MQMSG_TRPC_CONNECTED message to the trps message queue.
+  * It then waits for messages on trpc->mq. Normally these will be TR_MQMSG_TRPC_SEND
+  * messages, which this thread forwards to the peer. If its connection is lost or
+  * a TR_MQMSG_ABORT message is received on trpc->mq, the thread sends a
+  * TR_MQMSG_TRPC_DISCONNECTED message to the trps thread, then cleans up and
+  * terminates.
   *
-  * This begins by setting trpc->shutting_down = 1, then sending a message to the
-  * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex
-  * and waits for more messages, again using the mutex and the condition variable. It
-  * ignores any except for an EXIT_OK message. When that is received, it exits, terminating
-  * the thread.
-  *
-  * When the main thread receives the DISCONNECTED message, it must stop sending messages
-  * to this thread's message queue. When it is certain that it will not queue any more messages,
-  * it sends the EXIT_OK message as its last interaction with this thread. Once that message
-  * has been queued, queueing additional messages may result in deadlock or segfaults.
-  * (It is ok if there are other messages in the queue, but it must not queue more.)
+  * The trps may continue queueing messages for this client even when the
+  * connection is down. To prevent the queue from growing endlessly, this thread
+  * should clear its queue after failed connection attempts.
   */
  static void *tr_trpc_thread(void *arg)
  {
    const char *msg_type=NULL;
    char *encoded_msg=NULL;
    TR_NAME *peer_gssname=NULL;
-   int n_sent = 0;
-   int n_popped = 0;
+   struct timespec wait_until = {0};
    int exit_loop=0;
  
-   struct trpc_notify_cb_data cb_data={0,
-                                       PTHREAD_COND_INITIALIZER,
-                                       PTHREAD_MUTEX_INITIALIZER};
    tr_debug("tr_trpc_thread: started");
  
-   /* set up the mq for receiving */
-   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
-   tr_mq_lock(trpc->mq);
-   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
-   tr_mq_unlock(trpc->mq);
+   /* Try to make the outgoing connection */
    rc=trpc_connect(trpc);
    if (rc!=TRP_SUCCESS) {
      tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
                trpc_get_server(trpc),
                trpc_get_port(trpc));
+     trpc_mq_clear(trpc); /* clear the queue even though we did not connect */
    } else {
+     /* Retrieve the GSS name used by the peer for authentication */
      peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc));
      if (peer_gssname==NULL) {
        tr_err("tr_trpc_thread: could not duplicate peer_gssname.");
        talloc_free(tmp_ctx);
        return NULL;
      }
-     tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf);
+     tr_debug("tr_trpc_thread: connected to peer %.*s",
+              peer_gssname->len, peer_gssname->buf);
  
      msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH);
      tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper);
      trps_mq_add(trps, msg); /* steals msg context */
      msg=NULL;
  
+     /* Loop until we get an abort message or until the connection is lost. */
      while(!exit_loop) {
-       trpc_thread_wait_for_message(&cb_data); /* handles locking */
-       n_popped = 0; /* have not popped any messages from the queue */
-       n_sent = 0; /* have not sent any messages yet */
-       for (msg = trpc_mq_pop(trpc);
-            msg != NULL;
-            msg = trpc_mq_pop(trpc)) {
-         n_popped++;
+       /* Wait up to 10 minutes for a message to be queued to send to the peer.
+        * Log a warning if we go longer than that, but don't give up. */
+       if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) {
+         tr_err("tr_trpc_thread: unable to set abort timeout");
+         break; /* immediately exit the loop, don't go through cleanup */
+       }
+       /* Pop a message from the queue. */
+       msg = trpc_mq_pop(trpc, &wait_until);
+       if (msg) {
          msg_type = tr_mq_msg_get_message(msg);
          if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) {
+           tr_debug("tr_trpc_thread: received abort message from main thread.");
            exit_loop = 1;
-           break;
          } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) {
            encoded_msg = tr_mq_msg_get_payload(msg);
            if (encoded_msg == NULL)
            else {
              rc = trpc_send_msg(trpc, encoded_msg);
              if (rc == TRP_SUCCESS) {
-               n_sent++;
+               tr_debug("tr_trpc_thread: sent message.");
              } else {
                tr_notice("tr_trpc_thread: trpc_send_msg failed.");
                /* Assume this means we lost the connection. */
                exit_loop = 1;
-               break;
              }
            }
          } else
            tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
  
          tr_mq_msg_free(msg);
+       } else {
+         tr_warning("tr_trpc_thread: no outgoing messages to %.*s for 10 minutes",
+                    peer_gssname->len, peer_gssname->buf);
        }
-       /* if n_popped == 0, then n_sent must be zero (it's only set after at
-        * least one msg is popped) */
-       if (n_popped==0)
-         tr_err("tr_trpc_thread: notified of message, but queue empty");
-       else
-         tr_debug("tr_trpc_thread: sent %d messages.", n_sent);
      }
    }
  
-   tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread.");
-   trpc->shutting_down = 1;
    /* Send a DISCONNECTED message to the main thread */
+   tr_debug("tr_trpc_thread: notifying main thread of disconnection.");
    msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL);
    tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
    if (msg==NULL) {
-     /* can't notify main thread of exit - just do it and hope for the best */
+     /* can't notify main thread */
      tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
    } else {
      trps_mq_add(trps, msg);
-     /* DISCONNECTED sent, now wait for an acknowledgement before exiting */
-     exit_loop = 0;
-     while (!exit_loop) {
-       trpc_thread_wait_for_message(&cb_data);
-       while (NULL != (msg = trpc_mq_pop(trpc))) {
-         msg_type = tr_mq_msg_get_message(msg);
-         /* ignore anything except an EXIT_OK */
-         if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) {
-           exit_loop = 1;
-           break; /* skip any further messages */
-         }
-       }
-     }
    }
  
-   tr_debug("tr_trpc_thread: thread terminating.");
    talloc_free(tmp_ctx);
+   tr_debug("tr_trpc_thread: thread terminating.");
    return NULL;
  }
  
@@@ -970,11 -884,7 +884,11 @@@ void tr_config_changed(TR_CFG *new_cfg
      tr_debug("tr_config_changed: freeing tr->mons->authorized_gss_names");
      tr_gss_names_free(tr->mons->authorized_gss_names);
    }
 -  tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials);
 +  if (new_cfg->internal->monitoring_credentials != NULL) {
 +    tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials);
 +  } else {
 +    tr->mons->authorized_gss_names = tr_gss_names_new(tr->mons);
 +  }
    if (tr->mons->authorized_gss_names == NULL) {
      tr_err("tr_config_changed: Error configuring monitoring credentials");
    }
diff --combined trp/trps.c
@@@ -265,13 -265,13 +265,13 @@@ TRP_RC trps_send_msg(TRPS_INSTANCE *trp
  
    /* get the connection for this peer */
    trpc=trps_find_trpc(trps, peer);
-   /* instead, let's let that happen and then clear the queue when an attempt to
-    * connect fails */
+   /* The peer connection (trpc) usually exists even if the connection is down.
+    * We will queue messages even if the connection is down. To prevent this from
+    * endlessly increasing the size of the queue, the trpc handler needs to clear
+    * its queue periodically, even if it is unable to send the messages
+    */
    if (trpc==NULL) {
      tr_warning("trps_send_msg: skipping message queued for missing TRP client entry.");
-   } else if (trpc->shutting_down) {
-     tr_debug("trps_send_msg: skipping message because TRP client is shutting down.");
-     rc = TRP_SUCCESS; /* it's ok that this didn't get sent, the connection will be gone in a moment */
    } else {
      mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND, TR_MQ_PRIO_NORMAL);
      msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
@@@ -656,51 -656,36 +656,51 @@@ static TRP_RC trps_accept_update(TRPS_I
  static TRP_RC trps_handle_inforec_route(TRPS_INSTANCE *trps, TRP_UPD *upd, TRP_INFOREC *rec)
  {
    TRP_ROUTE *route=NULL;
 +  TR_COMM *comm = NULL;
    unsigned int feas=0;
  
    /* determine feasibility */
    feas=trps_check_feasibility(trps, trp_upd_get_realm(upd), trp_upd_get_comm(upd), rec);
    tr_debug("trps_handle_update: record feasibility=%d", feas);
  
 -  /* do we have an existing route? */
 -  route=trps_get_route(trps,
 -                       trp_upd_get_comm(upd),
 -                       trp_upd_get_realm(upd),
 -                       trp_upd_get_peer(upd));
 -  if (route!=NULL) {
 -    /* there was a route table entry already */
 -    tr_debug("trps_handle_updates: route entry already exists.");
 -    if (feas) {
 -      /* Update is feasible. Accept it. */
 -      trps_accept_update(trps, upd, rec);
 -    } else {
 -      /* Update is infeasible. Ignore it unless the trust router has changed. */
 -      if (0!=tr_name_cmp(trp_route_get_trust_router(route),
 -                         trp_inforec_get_trust_router(rec))) {
 -        /* the trust router associated with the route has changed, treat update as a retraction */
 -        trps_retract_route(trps, route);
 +  /* verify that the community is an APC */
 +  comm = tr_comm_table_find_comm(trps->ctable, trp_upd_get_comm(upd));
 +  if (comm == NULL) {
 +    /* We don't know this community. Reject the route. */
 +    tr_debug("trps_handle_updates: community %.*s unknown, ignoring route for %.*s",
 +             trp_upd_get_comm(upd)->len, trp_upd_get_comm(upd)->buf,
 +             trp_upd_get_realm(upd)->len, trp_upd_get_realm(upd)->buf);
 +  } else if (tr_comm_get_type(comm) != TR_COMM_APC) {
 +    /* The community in a route request *must* be an APC. This was not - ignore it. */
 +    tr_debug("trps_handle_updates: community %.*s is not an APC, ignoring route for %.*s",
 +             trp_upd_get_comm(upd)->len, trp_upd_get_comm(upd)->buf,
 +             trp_upd_get_realm(upd)->len, trp_upd_get_realm(upd)->buf);
 +  } else {
 +    /* do we have an existing route? */
 +    route=trps_get_route(trps,
 +                         trp_upd_get_comm(upd),
 +                         trp_upd_get_realm(upd),
 +                         trp_upd_get_peer(upd));
 +    if (route!=NULL) {
 +      /* there was a route table entry already */
 +      tr_debug("trps_handle_updates: route entry already exists.");
 +      if (feas) {
 +        /* Update is feasible. Accept it. */
 +        trps_accept_update(trps, upd, rec);
 +      } else {
 +        /* Update is infeasible. Ignore it unless the trust router has changed. */
 +        if (0!=tr_name_cmp(trp_route_get_trust_router(route),
 +                           trp_inforec_get_trust_router(rec))) {
 +          /* the trust router associated with the route has changed, treat update as a retraction */
 +          trps_retract_route(trps, route);
 +        }
        }
 +    } else {
 +      /* No existing route table entry. Ignore it unless it is feasible and not a retraction. */
 +      tr_debug("trps_handle_update: no route entry exists yet.");
 +      if (feas && trp_metric_is_finite(trp_inforec_get_metric(rec)))
 +        trps_accept_update(trps, upd, rec);
      }
 -  } else {
 -    /* No existing route table entry. Ignore it unless it is feasible and not a retraction. */
 -    tr_debug("trps_handle_update: no route entry exists yet.");
 -    if (feas && trp_metric_is_finite(trp_inforec_get_metric(rec)))
 -      trps_accept_update(trps, upd, rec);
    }
  
    return TRP_SUCCESS;
@@@ -786,7 -771,7 +786,7 @@@ cleanup
    return comm;
  }
  
 -static TR_RP_REALM *trps_create_new_rp_realm(TALLOC_CTX *mem_ctx, TR_NAME *realm_id, TRP_INFOREC *rec)
 +static TR_RP_REALM *trps_create_new_rp_realm(TALLOC_CTX *mem_ctx, TR_NAME *comm, TR_NAME *realm_id, TRP_INFOREC *rec)
  {
    TALLOC_CTX *tmp_ctx=talloc_new(NULL);
    TR_RP_REALM *rp=tr_rp_realm_new(tmp_ctx);
@@@ -809,15 -794,11 +809,15 @@@ cleanup
    return rp;
  }
  
 -static TR_IDP_REALM *trps_create_new_idp_realm(TALLOC_CTX *mem_ctx, TR_NAME *realm_id, TRP_INFOREC *rec)
 +static TR_IDP_REALM *trps_create_new_idp_realm(TALLOC_CTX *mem_ctx,
 +                                               TR_NAME *comm_id,
 +                                               TR_NAME *realm_id,
 +                                               TRP_INFOREC *rec)
  {
    TALLOC_CTX *tmp_ctx=talloc_new(NULL);
    TR_IDP_REALM *idp=tr_idp_realm_new(tmp_ctx);
 -  
 +  TR_APC *realm_apcs = NULL;
 +
    if (idp==NULL) {
      tr_debug("trps_create_new_idp_realm: unable to allocate new realm.");
      goto cleanup;
      idp=NULL;
      goto cleanup;
    }
 -  if (trp_inforec_get_apcs(rec)!=NULL) {
 -    tr_idp_realm_set_apcs(idp, tr_apc_dup(tmp_ctx, trp_inforec_get_apcs(rec)));
 -    if (tr_idp_realm_get_apcs(idp)==NULL) {
 -      tr_debug("trps_create_new_idp_realm: unable to allocate APC list.");
 -      idp=NULL;
 +
 +  /* Set the APCs. If the community is a CoI, copy its APCs. If it is an APC, then
 +   * that community itself is the APC for the realm. */
 +  if (trp_inforec_get_comm_type(rec) == TR_COMM_APC) {
 +    /* the community is an APC for this realm */
 +    realm_apcs = tr_apc_new(tmp_ctx);
 +    if (realm_apcs == NULL) {
 +      tr_debug("trps_create_new_idp_realm: unable to allocate new APC list.");
 +      idp = NULL;
 +      goto cleanup;
 +    }
 +
 +    tr_apc_set_id(realm_apcs, tr_dup_name(comm_id));
 +    if (tr_apc_get_id(realm_apcs) == NULL) {
 +      tr_debug("trps_create_new_idp_realm: unable to allocate new APC name.");
 +      idp = NULL;
 +      goto cleanup;
 +    }
 +  } else {
 +    /* the community is not an APC for this realm */
 +    realm_apcs = trp_inforec_get_apcs(rec);
 +    if (realm_apcs == NULL) {
 +      tr_debug("trps_create_new_idp_realm: no APCs for realm %.*s/%.*s, cannot add.",
 +               realm_id->len, realm_id->buf,
 +               comm_id->len, comm_id->buf);
 +      idp = NULL;
 +      goto cleanup;
 +    }
 +
 +    /* we have APCs, make our own copy */
 +    realm_apcs = tr_apc_dup(tmp_ctx, realm_apcs);
 +    if (realm_apcs == NULL) {
 +      tr_debug("trps_create_new_idp_realm: unable to duplicate APC list.");
 +      idp = NULL;
        goto cleanup;
      }
    }
 +
 +  /* Whether the community is an APC or CoI, the APCs for the realm are in realm_apcs */
 +  tr_idp_realm_set_apcs(idp, realm_apcs); /* takes realm_apcs out of tmp_ctx on success */
 +  if (tr_idp_realm_get_apcs(idp) == NULL) {
 +    tr_debug("trps_create_new_idp_realm: unable to set APC list for new realm.");
 +    idp=NULL;
 +    goto cleanup;
 +  }
 +
    idp->origin=TR_REALM_DISCOVERED;
    
    talloc_steal(mem_ctx, idp);
@@@ -924,11 -867,7 +924,11 @@@ static TRP_RC trps_handle_inforec_comm(
          tr_debug("trps_handle_inforec_comm: unable to create new community.");
          goto cleanup;
        }
 -      tr_comm_table_add_comm(trps->ctable, comm);
 +      if (tr_comm_table_add_comm(trps->ctable, comm) != 0)
 +      {
 +        tr_debug("trps_handle_inforec_comm: unable to add community to community table.");
 +        goto cleanup;
 +      }
      }
      /* TODO: see if other comm data match the new inforec and update or complain */
  
        if (rp_realm==NULL) {
          tr_debug("trps_handle_inforec_comm: unknown RP realm %.*s in inforec, creating it.",
                   realm_id->len, realm_id->buf);
 -        rp_realm=trps_create_new_rp_realm(tmp_ctx, realm_id, rec);
 +        rp_realm= trps_create_new_rp_realm(tmp_ctx, tr_comm_get_id(comm), realm_id, rec);
          if (rp_realm==NULL) {
            tr_debug("trps_handle_inforec_comm: unable to create new RP realm.");
            /* we may leave an unused community in the table, but it will only last until
        if (idp_realm==NULL) {
          tr_debug("trps_handle_inforec_comm: unknown IDP realm %.*s in inforec, creating it.",
                   realm_id->len, realm_id->buf);
 -        idp_realm=trps_create_new_idp_realm(tmp_ctx, realm_id, rec);
 +        idp_realm= trps_create_new_idp_realm(tmp_ctx, tr_comm_get_id(comm), realm_id, rec);
          if (idp_realm==NULL) {
            tr_debug("trps_handle_inforec_comm: unable to create new IDP realm.");
            /* we may leave an unused community in the table, but it will only last until
@@@ -1116,13 -1055,12 +1116,13 @@@ static TRP_RC trps_validate_request(TRP
  
  /* choose the best route to comm/realm, optionally excluding routes to a particular peer */
  static TRP_ROUTE *trps_find_best_route(TRPS_INSTANCE *trps,
 -                                        TR_NAME *comm,
 -                                        TR_NAME *realm,
 -                                        TR_NAME *exclude_peer)
 +                                       TR_NAME *comm,
 +                                       TR_NAME *realm,
 +                                       TR_NAME *exclude_peer_label)
  {
    TRP_ROUTE **entry=NULL;
    TRP_ROUTE *best=NULL;
 +  TRP_PEER *route_peer = NULL;
    size_t n_entry=0;
    unsigned int kk=0;
    unsigned int kk_min=0;
    entry=trp_rtable_get_realm_entries(trps->rtable, comm, realm, &n_entry);
    for (kk=0; kk<n_entry; kk++) {
      if (trp_route_get_metric(entry[kk]) < min_metric) {
 -      if ((exclude_peer==NULL) || (0!=tr_name_cmp(trp_route_get_peer(entry[kk]),
 -                                                  exclude_peer))) {
 -        kk_min=kk;
 -        min_metric=trp_route_get_metric(entry[kk]);
 -      } 
 +      if (exclude_peer_label != NULL) {
 +        if (!trp_route_is_local(entry[kk])) {
 +          /* route is not local, check the peer label */
 +          route_peer = trp_ptable_find_gss_name(trps->ptable,
 +                                                trp_route_get_peer(entry[kk]));
 +          if (route_peer == NULL) {
 +            tr_err("trps_find_best_route: unknown peer GSS name (%.*s) for route %d to %.*s/%.*s",
 +                   trp_route_get_peer(entry[kk])->len, trp_route_get_peer(entry[kk])->buf,
 +                   kk,
 +                   realm->len, realm->buf,
 +                   comm->len, comm->buf);
 +            continue; /* unknown peer, skip the route */
 +          }
 +          if (0 == tr_name_cmp(exclude_peer_label, trp_peer_get_label(route_peer))) {
 +            /* we're excluding this peer - skip the route */
 +            continue;
 +          }
 +        }
 +      }
 +      /* if we get here, we're not excluding the route */
 +      kk_min = kk;
 +      min_metric = trp_route_get_metric(entry[kk]);
      }
    }
 +
    if (trp_metric_is_finite(min_metric))
      best=entry[kk_min];
    
@@@ -1262,7 -1182,6 +1262,7 @@@ TRP_RC trps_sweep_ctable(TRPS_INSTANCE 
  {
    TALLOC_CTX *tmp_ctx=talloc_new(NULL);
    struct timespec sweep_time={0,0};
 +  struct timespec tmp = {0};
    TR_COMM_MEMB *memb=NULL;
    TR_COMM_ITER *iter=NULL;
    TRP_RC rc=TRP_ERROR;
                   tr_comm_memb_get_realm_id(memb)->len, tr_comm_memb_get_realm_id(memb)->buf,
                   tr_comm_get_id(tr_comm_memb_get_comm(memb))->len, tr_comm_get_id(tr_comm_memb_get_comm(memb))->buf,
                   tr_comm_memb_get_origin(memb)->len, tr_comm_memb_get_origin(memb)->buf,
 -                 timespec_to_str(tr_comm_memb_get_expiry(memb)));
 +                 timespec_to_str(tr_comm_memb_get_expiry_realtime(memb, &tmp)));
          tr_comm_table_remove_memb(trps->ctable, memb);
          tr_comm_memb_free(memb);
        } else {
          tr_comm_memb_expire(memb);
          trps_compute_expiry(trps, tr_comm_memb_get_interval(memb), tr_comm_memb_get_expiry(memb));
          tr_debug("trps_sweep_ctable: community membership expired at %s, resetting expiry to %s (%.*s in %.*s, origin %.*s).",
 -                 timespec_to_str(&sweep_time),
 -                 timespec_to_str(tr_comm_memb_get_expiry(memb)),
 +                 timespec_to_str(tr_clock_convert(TRP_CLOCK, &sweep_time, CLOCK_REALTIME, &tmp)),
 +                 timespec_to_str(tr_comm_memb_get_expiry_realtime(memb, &tmp)),
                   tr_comm_memb_get_realm_id(memb)->len, tr_comm_memb_get_realm_id(memb)->buf,
                   tr_comm_get_id(tr_comm_memb_get_comm(memb))->len, tr_comm_get_id(tr_comm_memb_get_comm(memb))->buf,
                   tr_comm_memb_get_origin(memb)->len, tr_comm_memb_get_origin(memb)->buf);
@@@ -1405,67 -1324,26 +1405,67 @@@ cleanup
  }
  
  /* select the correct route to comm/realm to be announced to peer */
 -static TRP_ROUTE *trps_select_realm_update(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer_gssname)
 +static TRP_ROUTE *trps_select_realm_update(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer_label)
  {
 -  TRP_ROUTE *route;
 +  TRP_ROUTE *route = NULL;
 +  TRP_PEER *route_peer = NULL;
 +  TR_NAME *route_peer_label = NULL;
  
    /* Take the currently selected route unless it is through the peer we're sending the update to.
 -   * I.e., enforce the split horizon rule. */
 +   * I.e., enforce the split horizon rule. Start by looking up the currently selected route. */
    route=trp_rtable_get_selected_entry(trps->rtable, comm, realm);
    if (route==NULL) {
      /* No selected route, this should only happen if the only route has been retracted,
       * in which case we do not want to advertise it. */
      return NULL;
    }
 -  tr_debug("trps_select_realm_update: %s vs %s", peer_gssname->buf,
 -           trp_route_get_peer(route)->buf);
 -  if (0==tr_name_cmp(peer_gssname, trp_route_get_peer(route))) {
 -    tr_debug("trps_select_realm_update: matched, finding alternate route");
 -    /* the selected entry goes through the peer we're reporting to, choose an alternate */
 -    route=trps_find_best_route(trps, comm, realm, peer_gssname);
 -    if ((route==NULL) || (!trp_metric_is_finite(trp_route_get_metric(route))))
 -      return NULL; /* don't advertise a nonexistent or retracted route */
 +
 +  /* Check whether it's local. */
 +  if (trp_route_is_local(route)) {
 +    /* It is always ok to announce a local route */
 +    tr_debug("trps_select_realm_update: selected route for %.*s/%.*s is local",
 +             realm->len, realm->buf,
 +             comm->len, comm->buf);
 +  } else {
 +    /* It's not local. Get the route's peer and check whether it's the same place we
 +     * got the selected route from. Peer should always correspond to an entry in our
 +     * peer table. */
 +    tr_debug("trps_select_realm_update: selected route for %.*s/%.*s is not local",
 +             realm->len, realm->buf,
 +             comm->len, comm->buf);
 +    route_peer = trp_ptable_find_gss_name(trps->ptable, trp_route_get_peer(route));
 +    if (route_peer == NULL) {
 +      tr_err("trps_select_realm_update: unknown peer GSS name (%.*s) for selected route for %.*s/%.*s",
 +             trp_route_get_peer(route)->len, trp_route_get_peer(route)->buf,
 +             realm->len, realm->buf,
 +             comm->len, comm->buf);
 +      return NULL;
 +    }
 +    route_peer_label = trp_peer_get_label(route_peer);
 +    if (route_peer_label == NULL) {
 +      tr_err("trps_select_realm_update: error retrieving peer label for selected route for %.*s/%.*s",
 +             realm->len, realm->buf,
 +             comm->len, comm->buf);
 +      return NULL;
 +    }
 +
 +    /* see if these match */
 +    tr_debug("trps_select_realm_update: %.*s vs %.*s",
 +             peer_label->len, peer_label->buf,
 +             route_peer_label->len, route_peer_label->buf);
 +
 +    if (0==tr_name_cmp(peer_label, route_peer_label)) {
 +      /* the selected entry goes through the peer we're reporting to, choose an alternate */
 +      tr_debug("trps_select_realm_update: matched, finding alternate route");
 +      route=trps_find_best_route(trps, comm, realm, peer_label);
 +      if ((route==NULL) || (!trp_metric_is_finite(trp_route_get_metric(route)))) {
 +        tr_debug("trps_select_realm_update: no route to %.*s/%.*s suitable to advertise to %.*s",
 +                 realm->len, realm->buf,
 +                 comm->len, comm->buf,
 +                 peer_label->len, peer_label->buf);
 +        return NULL; /* don't advertise a nonexistent or retracted route */
 +      }
 +    }
    }
    return route;
  }
  static TRP_RC trps_select_route_updates_for_peer(TALLOC_CTX *mem_ctx,
                                                   GPtrArray *updates,
                                                   TRPS_INSTANCE *trps,
 -                                                 TR_NAME *peer_gssname,
 +                                                 TR_NAME *peer_label,
                                                   int triggered)
  {
    size_t n_comm=0;
    for (ii=0; ii<n_comm; ii++) {
      realm=trp_rtable_get_comm_realms(trps->rtable, comm[ii], &n_realm);
      for (jj=0; jj<n_realm; jj++) {
 -      best=trps_select_realm_update(trps, comm[ii], realm[jj], peer_gssname);
 +      best=trps_select_realm_update(trps, comm[ii], realm[jj], peer_label);
        /* If we found a route, add it to the list. If triggered!=0, then only
         * add triggered routes. */
        if ((best!=NULL) && ((!triggered) || trp_route_is_triggered(best))) {
@@@ -1582,11 -1460,7 +1582,11 @@@ cleanup
  }
  
  /* construct an update with all the inforecs for comm/realm/role to be sent to peer */
 -static TRP_UPD *trps_comm_update(TALLOC_CTX *mem_ctx, TRPS_INSTANCE *trps, TR_NAME *peer_gssname, TR_COMM *comm, TR_REALM *realm)
 +static TRP_UPD *trps_comm_update(TALLOC_CTX *mem_ctx,
 +                                 TRPS_INSTANCE *trps,
 +                                 TR_NAME *peer_label,
 +                                 TR_COMM *comm,
 +                                 TR_REALM *realm)
  {
    TALLOC_CTX *tmp_ctx=talloc_new(NULL);
    TRP_UPD *upd=trp_upd_new(tmp_ctx);
@@@ -1652,7 -1526,7 +1652,7 @@@ cleanup
  static TRP_RC trps_select_comm_updates_for_peer(TALLOC_CTX *mem_ctx,
                                                  GPtrArray *updates,
                                                  TRPS_INSTANCE *trps,
 -                                                TR_NAME *peer_gssname,
 +                                                TR_NAME *peer_label,
                                                  int triggered)
  {
    TALLOC_CTX *tmp_ctx=talloc_new(NULL);
        tr_debug("trps_select_comm_updates_for_peer: adding realm %.*s",
                 tr_realm_get_id(realm)->len,
                 tr_realm_get_id(realm)->buf);
 -      upd=trps_comm_update(mem_ctx, trps, peer_gssname, comm, realm);
 +      upd=trps_comm_update(mem_ctx, trps, peer_label, comm, realm);
        if (upd!=NULL)
          g_ptr_array_add(updates, upd);
      }