return 0;
}
-TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message)
+TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message, TR_MQ_PRIORITY prio)
{
TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG);
if (msg!=NULL) {
msg->next=NULL;
+ msg->prio=prio;
msg->message=talloc_strdup(msg, message);
if (msg->message==NULL) {
talloc_free(msg);
talloc_free(msg);
}
+TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg)
+{
+ return msg->prio;
+}
+
const char *tr_mq_msg_get_message(TR_MQ_MSG *msg)
{
return msg->message;
pthread_mutex_init(&(mq->mutex), 0);
mq->head=NULL;
mq->tail=NULL;
+ mq->last_hi_prio=NULL;
}
return mq;
}
mq->notify_cb_arg=arg;
}
-
-/* puts msg in mq's talloc context */
-void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
+void tr_mq_clear(TR_MQ *mq)
{
- int was_empty=0;
- TR_MQ_NOTIFY_FN notify_cb=NULL;
- void *notify_cb_arg=NULL;
+ TR_MQ_MSG *m=NULL;
+ TR_MQ_MSG *n=NULL;
tr_mq_lock(mq);
+ m=tr_mq_get_head(mq);
+ while (m!=NULL) {
+ n=tr_mq_msg_get_next(m);
+ tr_mq_msg_free(m);
+ m=n;
+ }
+ tr_mq_set_head(mq, NULL);
+ tr_mq_set_tail(mq, NULL);
+ tr_mq_unlock(mq);
+}
+
+static int tr_mq_empty(TR_MQ *mq)
+{
+ return tr_mq_get_head(mq)==NULL;
+}
+
+/* puts msg in mq's talloc context */
+static void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
+{
if (tr_mq_get_head(mq)==NULL) {
- was_empty=1;
tr_mq_set_head(mq, msg);
tr_mq_set_tail(mq, msg);
} else {
tr_mq_set_tail(mq, msg); /* update tail of list */
}
talloc_steal(mq, msg);
+}
+
+static void tr_mq_append_high_prio(TR_MQ *mq, TR_MQ_MSG *new)
+{
+ if (tr_mq_get_head(mq)==NULL) {
+ tr_mq_set_head(mq, new);
+ tr_mq_set_tail(mq, new);
+ } else if (mq->last_hi_prio==NULL) {
+ tr_mq_msg_set_next(new, tr_mq_get_head(mq)); /* add to front of list */
+ tr_mq_set_head(mq, new); /* update head of list */
+ } else {
+ tr_mq_msg_set_next(new, tr_mq_msg_get_next(mq->last_hi_prio));
+ tr_mq_msg_set_next(mq->last_hi_prio, new); /* add to end of hi prio msgs */
+ }
+ mq->last_hi_prio=new; /* in any case, this is now the last high priority msg */
+ talloc_steal(mq,new);
+}
+
+#define DEBUG_TR_MQ 0
+#if DEBUG_TR_MQ
+static void tr_mq_print(TR_MQ *mq)
+{
+ TR_MQ_MSG *m=mq->head;
+ int ii=0;
+
+ tr_debug("tr_mq_print: mq contents:");
+ while(m!=NULL) {
+ ii++;
+ tr_debug("tr_mq_print: Entry %02d: %-15s (prio %d)",
+ ii, tr_mq_msg_get_message(m), tr_mq_msg_get_prio(m));
+ m=tr_mq_msg_get_next(m);
+ }
+}
+#endif
+void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
+{
+ int was_empty=0;
+ TR_MQ_NOTIFY_FN notify_cb=NULL;
+ void *notify_cb_arg=NULL;
+
+ tr_mq_lock(mq);
+
+ was_empty=tr_mq_empty(mq);
+ switch (tr_mq_msg_get_prio(msg)) {
+ case TR_MQ_PRIO_HIGH:
+ tr_mq_append_high_prio(mq, msg);
+ break;
+ default:
+ tr_mq_append(mq, msg);
+ break;
+ }
/* before releasing the mutex, get notify_cb data out of mq */
notify_cb=mq->notify_cb;
notify_cb_arg=mq->notify_cb_arg;
+
+#if DEBUG_TR_MQ
+ tr_mq_print(mq);
+#endif
+
tr_mq_unlock(mq);
/* see if we need to tell someone we became non-empty */
if (tr_mq_get_head(mq)!=NULL) {
popped=tr_mq_get_head(mq);
tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
+
+ if (popped==mq->last_hi_prio)
+ mq->last_hi_prio=NULL;
+
if (tr_mq_get_head(mq)==NULL)
tr_mq_set_tail(mq, NULL); /* just popped the last element */
}
tr_mq_msg_set_next(popped, NULL); /* disconnect from list */
return popped;
}
+
#include <talloc.h>
#include <pthread.h>
+/* Note on mq priorities: High priority messages are guaranteed to be
+ * processed before any normal priority messages. Otherwise, messages
+ * will be processed in the order they are added to the queue. */
+
+typedef enum tr_mq_priority {
+ TR_MQ_PRIO_NORMAL=0,
+ TR_MQ_PRIO_HIGH
+} TR_MQ_PRIORITY;
+
/* msg for inter-thread messaging */
typedef struct tr_mq_msg TR_MQ_MSG;
struct tr_mq_msg {
TR_MQ_MSG *next;
+ TR_MQ_PRIORITY prio;
char *message;
void *p; /* payload */
void (*p_free)(void *); /* function to free payload */
pthread_mutex_t mutex;
TR_MQ_MSG *head;
TR_MQ_MSG *tail;
+ TR_MQ_MSG *last_hi_prio;
TR_MQ_NOTIFY_FN notify_cb; /* callback when queue becomes non-empty */
void *notify_cb_arg;
};
/* message string for sending trpc messages */
#define TR_MQMSG_TRPC_SEND "trpc send msg"
-TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg);
+TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg, TR_MQ_PRIORITY prio);
void tr_mq_msg_free(TR_MQ_MSG *msg);
+TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg);
const char *tr_mq_msg_get_message(TR_MQ_MSG *msg);
void *tr_mq_msg_get_payload(TR_MQ_MSG *msg);
void tr_mq_msg_set_payload(TR_MQ_MSG *msg, void *p, void (*p_free)(void *));
int tr_mq_lock(TR_MQ *mq);
int tr_mq_unlock(TR_MQ *mq);
void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg);
-void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg);
+void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg);
TR_MQ_MSG *tr_mq_pop(TR_MQ *mq);
+void tr_mq_clear(TR_MQ *mq);
#endif /*_TR_MQ_H_ */
TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev);
void tr_config_changed(TR_CFG *new_cfg, void *cookie);
TRP_RC tr_connect_to_peers(TRPS_INSTANCE *trps, struct event *ev);
+void tr_peer_status_change(TRP_PEER *peer, void *cookie);
#endif /* TR_TRP_H */
TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc);
TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc);
void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq);
-void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc);
-void trpc_master_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+void trpc_mq_clear(TRPC_INSTANCE *trpc);
+void trpc_master_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc);
TRP_RC trpc_connect(TRPC_INSTANCE *trpc);
TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content);
unsigned int port,
void *cookie);
TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps);
-void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
+void trps_mq_add(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
+TRP_RC trps_authorize_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn);
void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn);
TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps);
TRP_RC trps_handle_tr_msg(TRPS_INSTANCE *trps, TR_MSG *tr_msg);
#include <trust_router/tr_name.h>
#include <trp_internal.h>
+typedef enum trp_peer_conn_status {
+ PEER_DISCONNECTED=0,
+ PEER_CONNECTED
+} TRP_PEER_CONN_STATUS;
+
typedef struct trp_peer TRP_PEER;
struct trp_peer {
TRP_PEER *next; /* for making a linked list */
unsigned int port;
unsigned int linkcost;
struct timespec last_conn_attempt;
+ TRP_PEER_CONN_STATUS outgoing_status;
+ TRP_PEER_CONN_STATUS incoming_status;
+ void (*conn_status_cb)(TRP_PEER *, void *); /* callback for connected status change */
+ void *conn_status_cookie;
};
typedef struct trp_ptable {
unsigned int trp_peer_get_linkcost(TRP_PEER *peer);
struct timespec *trp_peer_get_last_conn_attempt(TRP_PEER *peer);
void trp_peer_set_last_conn_attempt(TRP_PEER *peer, struct timespec *time);
+TRP_PEER_CONN_STATUS trp_peer_get_outgoing_status(TRP_PEER *peer);
+void trp_peer_set_outgoing_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status);
+TRP_PEER_CONN_STATUS trp_peer_get_incoming_status(TRP_PEER *peer);
+void trp_peer_set_incoming_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status);
+int trp_peer_is_connected(TRP_PEER *peer);
void trp_peer_set_linkcost(TRP_PEER *peer, unsigned int linkcost);
+void trp_peer_set_conn_status_cb(TRP_PEER *peer, void (*cb)(TRP_PEER *, void *), void *cookie);
char *trp_peer_to_str(TALLOC_CTX *memctx, TRP_PEER *peer, const char *sep);
#endif /* _TRP_PTABLE_H_ */
}
trp_peer_set_server(hc_peer, "epsilon.vmnet");
trp_peer_set_gssname(hc_peer, tr_new_name("tr-epsilon-vmnet@apc.painless-security.com"));
+ trp_peer_set_conn_status_cb(hc_peer, tr_peer_status_change, (void *)(tr->trps));
switch (tr->trps->port) {
case 10000:
trp_peer_set_port(hc_peer, 10001);
#include <gsscon.h>
#include <tr.h>
+#include <tr_mq.h>
#include <tr_rp.h>
#include <trp_internal.h>
#include <trp_ptable.h>
{
tr_msg_free_decoded((TR_MSG *)p);
}
+
+static void tr_free_name_helper(void *arg)
+{
+ tr_free_name((TR_NAME *)arg);
+}
+
/* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
TRP_CONNECTION *conn,
/* n.b., conn is available here, but do not hold onto the reference
* because it may be cleaned up if the originating connection goes
* down before the message is processed */
- mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED);
+ mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED, TR_MQ_PRIO_NORMAL);
if (mq_msg==NULL) {
return TRP_NOMEM;
}
tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
- trps_mq_append(trps, mq_msg);
+ trps_mq_add(trps, mq_msg);
talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
return TRP_SUCCESS;
}
TR_MQ_MSG *msg=NULL;
tr_debug("tr_trps_thread: started");
+ if (trps_authorize_connection(trps, conn)!=TRP_SUCCESS)
+ goto cleanup;
+
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED, TR_MQ_PRIO_HIGH);
+ tr_mq_msg_set_payload(msg, (void *)tr_dup_name(trp_connection_get_peer(conn)), tr_free_name_helper);
+ if (msg==NULL) {
+ tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
+ goto cleanup;
+ }
+ trps_mq_add(trps, msg); /* steals msg context */
+ msg=NULL;
+
trps_handle_connection(trps, conn);
- msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED);
+cleanup:
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED, TR_MQ_PRIO_HIGH);
tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
if (msg==NULL)
tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
else
- trps_mq_append(trps, msg);
-
+ trps_mq_add(trps, msg);
tr_debug("tr_trps_thread: exit");
talloc_free(tmp_ctx);
return NULL;
msg=trps_mq_pop(trps);
while (msg!=NULL) {
s=tr_mq_msg_get_message(msg);
- if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
- tr_trps_cleanup_conn(trps,
- talloc_get_type_abort(tr_mq_msg_get_payload(msg),
- TRP_CONNECTION));
+ if (0==strcmp(s, TR_MQMSG_TRPS_CONNECTED)) {
+ TR_NAME *gssname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+ TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+ if (peer==NULL)
+ tr_err("tr_trps_process_mq: incoming connection to unknown peer (%s) reported.", gssname->buf);
+ else {
+ trp_peer_set_incoming_status(peer, PEER_CONNECTED);
+ tr_err("tr_trps_process_mq: incoming connection to %s established.", gssname->buf);
+ }
+ }
+ else if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) {
+ TRP_CONNECTION *conn=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION);
+ TR_NAME *gssname=trp_connection_get_gssname(conn);
+ TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+ if (peer==NULL) {
+ tr_err("tr_trps_process_mq: disconnection of unknown peer (%s) reported.",
+ trp_connection_get_gssname(conn)->buf);
+ } else {
+ trp_peer_set_incoming_status(peer, PEER_DISCONNECTED);
+ tr_trps_cleanup_conn(trps, conn);
+ tr_err("tr_trps_process_mq: incoming connection to %s lost.", gssname->buf);
+ }
+ }
+ else if (0==strcmp(s, TR_MQMSG_TRPC_CONNECTED)) {
+ TR_NAME *svcname=(TR_NAME *)tr_mq_msg_get_payload(msg);
+ TRP_PEER *peer=trps_get_peer_by_servicename(trps, svcname);
+ if (peer==NULL)
+ tr_err("tr_trps_process_mq: connection to unknown peer (%s) reported.", svcname->buf);
+ else {
+ trp_peer_set_outgoing_status(peer, PEER_CONNECTED);
+ tr_err("tr_trps_process_mq: outgoing connection to %s established.", svcname->buf);
+ }
}
else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) {
/* trpc connection died */
- tr_trps_cleanup_trpc(trps,
- talloc_get_type_abort(tr_mq_msg_get_payload(msg),
- TRPC_INSTANCE));
+ TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE);
+ TR_NAME *gssname=trpc_get_gssname(trpc);
+ TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname);
+ if (peer==NULL)
+ tr_err("tr_trps_process_mq: disconnection of unknown peer (%s) reported.", gssname->buf);
+ else {
+ trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED);
+ tr_err("tr_trps_process_mq: outgoing connection to %s lost.", gssname->buf);
+ tr_trps_cleanup_trpc(trps, trpc);
+ }
}
else if (0==strcmp(s, TR_MQMSG_MSG_RECEIVED)) {
trpc_get_server(trpc),
trpc_get_port(trpc));
} else {
- peer_gssname=tr_dup_name(trp_connection_get_peer(trpc_get_conn(trpc)));
+ peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc));
if (peer_gssname==NULL) {
tr_err("tr_trpc_thread: could not duplicate peer_gssname.");
talloc_free(tmp_ctx);
}
tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf);
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH);
+ tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper);
+ if (msg==NULL) {
+ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+ talloc_free(tmp_ctx);
+ return NULL;
+ }
+ tr_debug("queuing********************************************************************************");
+ trps_mq_add(trps, msg); /* steals msg context */
+ tr_debug("queued********************************************************************************");
+ msg=NULL;
+
while(1) {
cb_data.msg_ready=0;
pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
}
tr_debug("tr_trpc_thread: exiting.");
- msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED);
+ msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH);
tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
if (msg==NULL)
tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
else
- trps_mq_append(trps, msg);
+ trps_mq_add(trps, msg);
+
+ trpc_mq_clear(trpc); /* clear any queued messages */
talloc_free(tmp_ctx);
return NULL;
return entries;
}
-struct tr_wildcard_cookie {
- TRPS_INSTANCE *trps;
- TR_NAME *peer_servicename;
-};
-
-/* can only be called once, talloc_frees its argument cookie on termination */
-static void tr_send_wildcard(int listener, short event, void *arg)
+void tr_peer_status_change(TRP_PEER *peer, void *cookie)
{
- struct tr_wildcard_cookie *cook=talloc_get_type_abort(arg, struct tr_wildcard_cookie);
+ TRPS_INSTANCE *trps=talloc_get_type_abort(cookie, TRPS_INSTANCE);
- /* queue a route request for all route to every peer */
- if (TRP_SUCCESS!=trps_wildcard_route_req(cook->trps, cook->peer_servicename))
+ if (TRP_SUCCESS!=trps_wildcard_route_req(trps, trp_peer_get_servicename(peer)))
tr_err("tr_send_wildcard: error sending wildcard route request.");
-
- tr_free_name(cook->peer_servicename);
- talloc_free(cook);
-}
-
-struct tr_trpc_status_change_cookie {
- struct event_base *evbase;
- TRPS_INSTANCE *trps;
- TRPC_INSTANCE *trpc;
- TRP_PEER *peer;
-};
-static void tr_trpc_status_change(TRP_CONNECTION *conn, void *cookie)
-{
- struct tr_trpc_status_change_cookie *cook=talloc_get_type_abort(cookie, struct tr_trpc_status_change_cookie);
- struct event_base *evbase=cook->evbase;
- TRPS_INSTANCE *trps=cook->trps;
- TRP_PEER *peer=cook->peer;
- TR_NAME *gssname=trp_peer_get_gssname(peer);
- struct timeval zero_time={0,0};
- struct tr_wildcard_cookie *wc_cookie=NULL;
-
- if (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
- tr_debug("tr_trpc_status_change: connection to %.*s now up.", gssname->len, gssname->buf);
- /* add a one-off event to send a wildcard request from the main thread */
- wc_cookie=talloc(trps, struct tr_wildcard_cookie);
- if (wc_cookie==NULL) {
- tr_err("tr_trpc_status_change: error allocating wildcard cookie.");
- return;
- }
- wc_cookie->trps=trps;
- wc_cookie->peer_servicename=trp_peer_dup_servicename(peer);
- if (0!=event_base_once(evbase, -1, EV_TIMEOUT, tr_send_wildcard, wc_cookie, &zero_time)) {
- tr_err("tr_trpc_status_change: error queueing wildcard route request event.");
- }
- } else
- tr_debug("tr_trpc_status_change: connection to %.*s now down.", gssname->len, gssname->buf);
}
/* starts a trpc thread to connect to server:port */
TRPC_INSTANCE *trpc=NULL;
TRP_CONNECTION *conn=NULL;
struct trpc_thread_data *thread_data=NULL;
- struct tr_trpc_status_change_cookie *status_change_cookie=NULL;
TRP_RC rc=TRP_ERROR;
tr_debug("tr_trpc_initiate entered");
goto cleanup;
}
- status_change_cookie=talloc(conn, struct tr_trpc_status_change_cookie);
- if (status_change_cookie==NULL) {
- tr_crit("tr_trpc_initiate: could not allocate connection status cookie.");
- rc=TRP_NOMEM;
- goto cleanup;
- }
- status_change_cookie->evbase=event_get_base(ev);
- status_change_cookie->trps=trps;
- status_change_cookie->trpc=trpc;
- status_change_cookie->peer=peer;
- conn->status_change_cookie=status_change_cookie;
- status_change_cookie=NULL;
- conn->status_change_cb=tr_trpc_status_change;
-
trpc_set_conn(trpc, conn);
trpc_set_server(trpc, talloc_strdup(trpc, trp_peer_get_server(peer)));
trpc_set_port(trpc, trp_peer_get_port(peer));
peer->port=0;
peer->linkcost=TRP_LINKCOST_DEFAULT;
peer->last_conn_attempt=(struct timespec){0,0};
+ peer->outgoing_status=PEER_DISCONNECTED;
+ peer->incoming_status=PEER_DISCONNECTED;
+ peer->conn_status_cb=NULL;
+ peer->conn_status_cookie=NULL;
talloc_set_destructor((void *)peer, trp_peer_destructor);
}
return peer;
peer->linkcost=linkcost;
}
+void trp_peer_set_conn_status_cb(TRP_PEER *peer, void (*cb)(TRP_PEER *, void *), void *cookie)
+{
+ peer->conn_status_cb=cb;
+ peer->conn_status_cookie=cookie;
+}
+
struct timespec *trp_peer_get_last_conn_attempt(TRP_PEER *peer)
{
return &(peer->last_conn_attempt);
return ptbl;
}
+void trp_peer_set_outgoing_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status)
+{
+ int was_connected=trp_peer_is_connected(peer);
+ peer->outgoing_status=status;
+ tr_debug("trp_peer_set_outgoing_status: %s: status=%d peer connected was %d now %d.",
+ trp_peer_get_gssname(peer)->buf, status, was_connected, trp_peer_is_connected(peer));
+ if ((trp_peer_is_connected(peer) != was_connected) && (peer->conn_status_cb!=NULL))
+ peer->conn_status_cb(peer, peer->conn_status_cookie);
+}
+
+TRP_PEER_CONN_STATUS trp_peer_get_outgoing_status(TRP_PEER *peer)
+{
+ return peer->outgoing_status;
+}
+
+void trp_peer_set_incoming_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status)
+{
+ int was_connected=trp_peer_is_connected(peer);
+ peer->incoming_status=status;
+ tr_debug("trp_peer_set_incoming_status: %s: status=%d peer connected was %d now %d.",
+ trp_peer_get_gssname(peer)->buf, status, was_connected, trp_peer_is_connected(peer));
+ if ((trp_peer_is_connected(peer) != was_connected) && (peer->conn_status_cb!=NULL))
+ peer->conn_status_cb(peer, peer->conn_status_cookie);
+}
+
+TRP_PEER_CONN_STATUS trp_peer_get_incoming_status(TRP_PEER *peer)
+{
+ return peer->incoming_status;
+}
+
+int trp_peer_is_connected(TRP_PEER *peer)
+{
+ return (peer->outgoing_status==PEER_CONNECTED) && (peer->incoming_status==PEER_CONNECTED);
+}
+
void trp_ptable_free(TRP_PTABLE *ptbl)
{
talloc_free(ptbl);
}
/* submit msg to trpc for transmission */
-void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
+void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
{
- tr_mq_append(trpc->mq, msg);
+ tr_mq_add(trpc->mq, msg);
}
TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc)
return tr_mq_pop(trpc->mq);
}
+void trpc_mq_clear(TRPC_INSTANCE *trpc)
+{
+ tr_mq_clear(trpc->mq);
+}
+
TRP_RC trpc_connect(TRPC_INSTANCE *trpc)
{
return trp_connection_initiate(trpc_get_conn(trpc), trpc_get_server(trpc), trpc_get_port(trpc));
return tr_mq_pop(trps->mq);
}
-void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg)
+void trps_mq_add(TRPS_INSTANCE *trps, TR_MQ_MSG *msg)
{
- tr_mq_append(trps->mq, msg);
+ tr_mq_add(trps->mq, msg);
}
unsigned int trps_get_connect_interval(TRPS_INSTANCE *trps)
/* get the connection for this peer */
trpc=trps_find_trpc(trps, peer);
- if ((trpc==NULL) || (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP)) {
- /* We could just let these sit on the queue in the hopes that a connection
- * is eventually established. However, we'd then have to ensure the queue
- * didn't keep growing, etc. */
- tr_warning("trps_send_msg: skipping message queued while TRPC connection not up.");
+ /* instead, let's let that happen and then clear the queue when an attempt to
+ * connect fails */
+ if (trpc==NULL) {
+ tr_warning("trps_send_msg: skipping message queued for missing TRP client entry.");
} else {
- mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND);
+ mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND, TR_MQ_PRIO_NORMAL);
msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */
- trpc_mq_append(trpc, mq_msg);
+ trpc_mq_add(trpc, mq_msg);
rc=TRP_SUCCESS;
}
talloc_free(tmp_ctx);
return conn;
}
-#if 0 /* remove this if I forget to do so */
-/* returns EACCES if authorization is denied */
-int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data)
-{
- TRPS_INSTANCE *trps = (TRPS_INSTANCE *)data;
- int result=0;
-
- if (0!=trps->auth_handler(clientName, displayName, trps->cookie)) {
- tr_debug("trps_auth_cb: client '%.*s' denied authorization.", displayName->length, displayName->value);
- result=EACCES; /* denied */
- }
-
- return result;
-}
-#endif
-
/* get the currently selected route if available */
TRP_ROUTE *trps_get_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer)
{
return listen;
}
+TRP_RC trps_authorize_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
+{
+ /* try to establish a GSS context */
+ if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
+ tr_notice("trps_authorize_connection: failed to authorize connection");
+ trp_connection_close(conn);
+ return TRP_ERROR;
+ }
+ tr_notice("trps_authorize_connection: authorized connection");
+ return TRP_SUCCESS;
+}
+
void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
{
TR_MSG *msg=NULL;
TRP_RC rc=TRP_ERROR;
- /* try to establish a GSS context */
- if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
- tr_notice("tr_trps_conn_thread: failed to authorize connection");
- trp_connection_close(conn);
- } else {
- tr_notice("trps_handle_connection: authorized connection");
-
- /* loop as long as the connection exists */
- while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
- rc=trps_read_message(trps, conn, &msg);
- switch(rc) {
- case TRP_SUCCESS:
- trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */
- break;
-
- case TRP_ERROR:
- trp_connection_close(conn);
- break;
-
- default:
- tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc);
- }
- }
+ /* loop as long as the connection exists */
+ while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
+ rc=trps_read_message(trps, conn, &msg);
+ switch(rc) {
+ case TRP_SUCCESS:
+ trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */
+ break;
+
+ case TRP_ERROR:
+ trp_connection_close(conn);
+ break;
- tr_debug("trps_handle_connection: connection closed.");
+ default:
+ tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc);
+ }
}
+
+ tr_debug("trps_handle_connection: connection closed.");
}
static TRP_RC trps_validate_update(TRPS_INSTANCE *trps, TRP_UPD *upd)
tr_debug("trps_wildcard_route_req: request queued successfully.");
rc=TRP_SUCCESS;
}
- tr_crit("got here");
+
cleanup:
if (encoded!=NULL)
tr_msg_free_encoded(encoded);