From 1468160dce8601a39cd8545f188db9da408e5909 Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Wed, 3 Aug 2016 22:17:15 -0400 Subject: [PATCH] Wildcard route requests now working. --- common/tr_mq.c | 99 +++++++++++++++++++++++++++--- include/tr_mq.h | 17 +++++- include/tr_trp.h | 1 + include/trp_internal.h | 8 ++- include/trp_ptable.h | 15 +++++ tr/tr_main.c | 1 + tr/tr_trp.c | 162 ++++++++++++++++++++++++++----------------------- trp/trp_ptable.c | 45 ++++++++++++++ trp/trpc.c | 9 ++- trp/trps.c | 87 +++++++++++--------------- 10 files changed, 303 insertions(+), 141 deletions(-) diff --git a/common/tr_mq.c b/common/tr_mq.c index 5240c1a..592e51c 100644 --- a/common/tr_mq.c +++ b/common/tr_mq.c @@ -13,11 +13,12 @@ static int tr_mq_msg_destructor(void *object) return 0; } -TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message) +TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message, TR_MQ_PRIORITY prio) { TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG); if (msg!=NULL) { msg->next=NULL; + msg->prio=prio; msg->message=talloc_strdup(msg, message); if (msg->message==NULL) { talloc_free(msg); @@ -35,6 +36,11 @@ void tr_mq_msg_free(TR_MQ_MSG *msg) talloc_free(msg); } +TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg) +{ + return msg->prio; +} + const char *tr_mq_msg_get_message(TR_MQ_MSG *msg) { return msg->message; @@ -71,6 +77,7 @@ TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx) pthread_mutex_init(&(mq->mutex), 0); mq->head=NULL; mq->tail=NULL; + mq->last_hi_prio=NULL; } return mq; } @@ -119,17 +126,32 @@ void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg) mq->notify_cb_arg=arg; } - -/* puts msg in mq's talloc context */ -void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg) +void tr_mq_clear(TR_MQ *mq) { - int was_empty=0; - TR_MQ_NOTIFY_FN notify_cb=NULL; - void *notify_cb_arg=NULL; + TR_MQ_MSG *m=NULL; + TR_MQ_MSG *n=NULL; tr_mq_lock(mq); + m=tr_mq_get_head(mq); + while (m!=NULL) { + n=tr_mq_msg_get_next(m); + tr_mq_msg_free(m); + m=n; + } + tr_mq_set_head(mq, NULL); + tr_mq_set_tail(mq, NULL); + tr_mq_unlock(mq); +} + +static int tr_mq_empty(TR_MQ *mq) +{ + return tr_mq_get_head(mq)==NULL; +} + +/* puts msg in mq's talloc context */ +static void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg) +{ if (tr_mq_get_head(mq)==NULL) { - was_empty=1; tr_mq_set_head(mq, msg); tr_mq_set_tail(mq, msg); } else { @@ -137,9 +159,65 @@ void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg) tr_mq_set_tail(mq, msg); /* update tail of list */ } talloc_steal(mq, msg); +} + +static void tr_mq_append_high_prio(TR_MQ *mq, TR_MQ_MSG *new) +{ + if (tr_mq_get_head(mq)==NULL) { + tr_mq_set_head(mq, new); + tr_mq_set_tail(mq, new); + } else if (mq->last_hi_prio==NULL) { + tr_mq_msg_set_next(new, tr_mq_get_head(mq)); /* add to front of list */ + tr_mq_set_head(mq, new); /* update head of list */ + } else { + tr_mq_msg_set_next(new, tr_mq_msg_get_next(mq->last_hi_prio)); + tr_mq_msg_set_next(mq->last_hi_prio, new); /* add to end of hi prio msgs */ + } + mq->last_hi_prio=new; /* in any case, this is now the last high priority msg */ + talloc_steal(mq,new); +} + +#define DEBUG_TR_MQ 0 +#if DEBUG_TR_MQ +static void tr_mq_print(TR_MQ *mq) +{ + TR_MQ_MSG *m=mq->head; + int ii=0; + + tr_debug("tr_mq_print: mq contents:"); + while(m!=NULL) { + ii++; + tr_debug("tr_mq_print: Entry %02d: %-15s (prio %d)", + ii, tr_mq_msg_get_message(m), tr_mq_msg_get_prio(m)); + m=tr_mq_msg_get_next(m); + } +} +#endif +void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg) +{ + int was_empty=0; + TR_MQ_NOTIFY_FN notify_cb=NULL; + void *notify_cb_arg=NULL; + + tr_mq_lock(mq); + + was_empty=tr_mq_empty(mq); + switch (tr_mq_msg_get_prio(msg)) { + case TR_MQ_PRIO_HIGH: + tr_mq_append_high_prio(mq, msg); + break; + default: + tr_mq_append(mq, msg); + break; + } /* before releasing the mutex, get notify_cb data out of mq */ notify_cb=mq->notify_cb; notify_cb_arg=mq->notify_cb_arg; + +#if DEBUG_TR_MQ + tr_mq_print(mq); +#endif + tr_mq_unlock(mq); /* see if we need to tell someone we became non-empty */ @@ -156,6 +234,10 @@ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq) if (tr_mq_get_head(mq)!=NULL) { popped=tr_mq_get_head(mq); tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */ + + if (popped==mq->last_hi_prio) + mq->last_hi_prio=NULL; + if (tr_mq_get_head(mq)==NULL) tr_mq_set_tail(mq, NULL); /* just popped the last element */ } @@ -164,3 +246,4 @@ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq) tr_mq_msg_set_next(popped, NULL); /* disconnect from list */ return popped; } + diff --git a/include/tr_mq.h b/include/tr_mq.h index 29c8d5f..0dd7f38 100644 --- a/include/tr_mq.h +++ b/include/tr_mq.h @@ -4,10 +4,20 @@ #include #include +/* Note on mq priorities: High priority messages are guaranteed to be + * processed before any normal priority messages. Otherwise, messages + * will be processed in the order they are added to the queue. */ + +typedef enum tr_mq_priority { + TR_MQ_PRIO_NORMAL=0, + TR_MQ_PRIO_HIGH +} TR_MQ_PRIORITY; + /* msg for inter-thread messaging */ typedef struct tr_mq_msg TR_MQ_MSG; struct tr_mq_msg { TR_MQ_MSG *next; + TR_MQ_PRIORITY prio; char *message; void *p; /* payload */ void (*p_free)(void *); /* function to free payload */ @@ -21,6 +31,7 @@ struct tr_mq { pthread_mutex_t mutex; TR_MQ_MSG *head; TR_MQ_MSG *tail; + TR_MQ_MSG *last_hi_prio; TR_MQ_NOTIFY_FN notify_cb; /* callback when queue becomes non-empty */ void *notify_cb_arg; }; @@ -28,8 +39,9 @@ struct tr_mq { /* message string for sending trpc messages */ #define TR_MQMSG_TRPC_SEND "trpc send msg" -TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg); +TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg, TR_MQ_PRIORITY prio); void tr_mq_msg_free(TR_MQ_MSG *msg); +TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg); const char *tr_mq_msg_get_message(TR_MQ_MSG *msg); void *tr_mq_msg_get_payload(TR_MQ_MSG *msg); void tr_mq_msg_set_payload(TR_MQ_MSG *msg, void *p, void (*p_free)(void *)); @@ -40,7 +52,8 @@ void tr_mq_free(TR_MQ *mq); int tr_mq_lock(TR_MQ *mq); int tr_mq_unlock(TR_MQ *mq); void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg); -void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg); +void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg); TR_MQ_MSG *tr_mq_pop(TR_MQ *mq); +void tr_mq_clear(TR_MQ *mq); #endif /*_TR_MQ_H_ */ diff --git a/include/tr_trp.h b/include/tr_trp.h index 542e590..d597cb0 100644 --- a/include/tr_trp.h +++ b/include/tr_trp.h @@ -43,4 +43,5 @@ TRP_RC tr_add_local_routes(TRPS_INSTANCE *trps, TR_CFG *cfg); TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev); void tr_config_changed(TR_CFG *new_cfg, void *cookie); TRP_RC tr_connect_to_peers(TRPS_INSTANCE *trps, struct event *ev); +void tr_peer_status_change(TRP_PEER *peer, void *cookie); #endif /* TR_TRP_H */ diff --git a/include/trp_internal.h b/include/trp_internal.h index 051c00a..f9978cc 100644 --- a/include/trp_internal.h +++ b/include/trp_internal.h @@ -149,9 +149,10 @@ void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port); TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc); TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc); void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq); -void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); +void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc); -void trpc_master_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); +void trpc_mq_clear(TRPC_INSTANCE *trpc); +void trpc_master_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc); TRP_RC trpc_connect(TRPC_INSTANCE *trpc); TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content); @@ -179,7 +180,8 @@ int trps_get_listener(TRPS_INSTANCE *trps, unsigned int port, void *cookie); TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps); -void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg); +void trps_mq_add(TRPS_INSTANCE *trps, TR_MQ_MSG *msg); +TRP_RC trps_authorize_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn); void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn); TRP_RC trps_update_active_routes(TRPS_INSTANCE *trps); TRP_RC trps_handle_tr_msg(TRPS_INSTANCE *trps, TR_MSG *tr_msg); diff --git a/include/trp_ptable.h b/include/trp_ptable.h index 0b6a9d8..6881ee8 100644 --- a/include/trp_ptable.h +++ b/include/trp_ptable.h @@ -7,6 +7,11 @@ #include #include +typedef enum trp_peer_conn_status { + PEER_DISCONNECTED=0, + PEER_CONNECTED +} TRP_PEER_CONN_STATUS; + typedef struct trp_peer TRP_PEER; struct trp_peer { TRP_PEER *next; /* for making a linked list */ @@ -16,6 +21,10 @@ struct trp_peer { unsigned int port; unsigned int linkcost; struct timespec last_conn_attempt; + TRP_PEER_CONN_STATUS outgoing_status; + TRP_PEER_CONN_STATUS incoming_status; + void (*conn_status_cb)(TRP_PEER *, void *); /* callback for connected status change */ + void *conn_status_cookie; }; typedef struct trp_ptable { @@ -52,7 +61,13 @@ void trp_peer_set_port(TRP_PEER *peer, unsigned int port); unsigned int trp_peer_get_linkcost(TRP_PEER *peer); struct timespec *trp_peer_get_last_conn_attempt(TRP_PEER *peer); void trp_peer_set_last_conn_attempt(TRP_PEER *peer, struct timespec *time); +TRP_PEER_CONN_STATUS trp_peer_get_outgoing_status(TRP_PEER *peer); +void trp_peer_set_outgoing_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status); +TRP_PEER_CONN_STATUS trp_peer_get_incoming_status(TRP_PEER *peer); +void trp_peer_set_incoming_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status); +int trp_peer_is_connected(TRP_PEER *peer); void trp_peer_set_linkcost(TRP_PEER *peer, unsigned int linkcost); +void trp_peer_set_conn_status_cb(TRP_PEER *peer, void (*cb)(TRP_PEER *, void *), void *cookie); char *trp_peer_to_str(TALLOC_CTX *memctx, TRP_PEER *peer, const char *sep); #endif /* _TRP_PTABLE_H_ */ diff --git a/tr/tr_main.c b/tr/tr_main.c index 5989f0b..9a84a5c 100644 --- a/tr/tr_main.c +++ b/tr/tr_main.c @@ -293,6 +293,7 @@ int main(int argc, char *argv[]) } trp_peer_set_server(hc_peer, "epsilon.vmnet"); trp_peer_set_gssname(hc_peer, tr_new_name("tr-epsilon-vmnet@apc.painless-security.com")); + trp_peer_set_conn_status_cb(hc_peer, tr_peer_status_change, (void *)(tr->trps)); switch (tr->trps->port) { case 10000: trp_peer_set_port(hc_peer, 10001); diff --git a/tr/tr_trp.c b/tr/tr_trp.c index 6a09d63..46e4a6e 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,12 @@ static void msg_free_helper(void *p) { tr_msg_free_decoded((TR_MSG *)p); } + +static void tr_free_name_helper(void *arg) +{ + tr_free_name((TR_NAME *)arg); +} + /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */ static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps, TRP_CONNECTION *conn, @@ -50,12 +57,12 @@ static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps, /* n.b., conn is available here, but do not hold onto the reference * because it may be cleaned up if the originating connection goes * down before the message is processed */ - mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED); + mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED, TR_MQ_PRIO_NORMAL); if (mq_msg==NULL) { return TRP_NOMEM; } tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper); - trps_mq_append(trps, mq_msg); + trps_mq_add(trps, mq_msg); talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */ return TRP_SUCCESS; } @@ -101,15 +108,27 @@ static void *tr_trps_thread(void *arg) TR_MQ_MSG *msg=NULL; tr_debug("tr_trps_thread: started"); + if (trps_authorize_connection(trps, conn)!=TRP_SUCCESS) + goto cleanup; + + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED, TR_MQ_PRIO_HIGH); + tr_mq_msg_set_payload(msg, (void *)tr_dup_name(trp_connection_get_peer(conn)), tr_free_name_helper); + if (msg==NULL) { + tr_err("tr_trps_thread: error allocating TR_MQ_MSG"); + goto cleanup; + } + trps_mq_add(trps, msg); /* steals msg context */ + msg=NULL; + trps_handle_connection(trps, conn); - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED); +cleanup: + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED, TR_MQ_PRIO_HIGH); tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */ if (msg==NULL) tr_err("tr_trps_thread: error allocating TR_MQ_MSG"); else - trps_mq_append(trps, msg); - + trps_mq_add(trps, msg); tr_debug("tr_trps_thread: exit"); talloc_free(tmp_ctx); return NULL; @@ -191,16 +210,51 @@ static void tr_trps_process_mq(int socket, short event, void *arg) msg=trps_mq_pop(trps); while (msg!=NULL) { s=tr_mq_msg_get_message(msg); - if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) { - tr_trps_cleanup_conn(trps, - talloc_get_type_abort(tr_mq_msg_get_payload(msg), - TRP_CONNECTION)); + if (0==strcmp(s, TR_MQMSG_TRPS_CONNECTED)) { + TR_NAME *gssname=(TR_NAME *)tr_mq_msg_get_payload(msg); + TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname); + if (peer==NULL) + tr_err("tr_trps_process_mq: incoming connection to unknown peer (%s) reported.", gssname->buf); + else { + trp_peer_set_incoming_status(peer, PEER_CONNECTED); + tr_err("tr_trps_process_mq: incoming connection to %s established.", gssname->buf); + } + } + else if (0==strcmp(s, TR_MQMSG_TRPS_DISCONNECTED)) { + TRP_CONNECTION *conn=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION); + TR_NAME *gssname=trp_connection_get_gssname(conn); + TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname); + if (peer==NULL) { + tr_err("tr_trps_process_mq: disconnection of unknown peer (%s) reported.", + trp_connection_get_gssname(conn)->buf); + } else { + trp_peer_set_incoming_status(peer, PEER_DISCONNECTED); + tr_trps_cleanup_conn(trps, conn); + tr_err("tr_trps_process_mq: incoming connection to %s lost.", gssname->buf); + } + } + else if (0==strcmp(s, TR_MQMSG_TRPC_CONNECTED)) { + TR_NAME *svcname=(TR_NAME *)tr_mq_msg_get_payload(msg); + TRP_PEER *peer=trps_get_peer_by_servicename(trps, svcname); + if (peer==NULL) + tr_err("tr_trps_process_mq: connection to unknown peer (%s) reported.", svcname->buf); + else { + trp_peer_set_outgoing_status(peer, PEER_CONNECTED); + tr_err("tr_trps_process_mq: outgoing connection to %s established.", svcname->buf); + } } else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) { /* trpc connection died */ - tr_trps_cleanup_trpc(trps, - talloc_get_type_abort(tr_mq_msg_get_payload(msg), - TRPC_INSTANCE)); + TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE); + TR_NAME *gssname=trpc_get_gssname(trpc); + TRP_PEER *peer=trps_get_peer_by_gssname(trps, gssname); + if (peer==NULL) + tr_err("tr_trps_process_mq: disconnection of unknown peer (%s) reported.", gssname->buf); + else { + trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED); + tr_err("tr_trps_process_mq: outgoing connection to %s lost.", gssname->buf); + tr_trps_cleanup_trpc(trps, trpc); + } } else if (0==strcmp(s, TR_MQMSG_MSG_RECEIVED)) { @@ -473,7 +527,7 @@ static void *tr_trpc_thread(void *arg) trpc_get_server(trpc), trpc_get_port(trpc)); } else { - peer_gssname=tr_dup_name(trp_connection_get_peer(trpc_get_conn(trpc))); + peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc)); if (peer_gssname==NULL) { tr_err("tr_trpc_thread: could not duplicate peer_gssname."); talloc_free(tmp_ctx); @@ -481,6 +535,18 @@ static void *tr_trpc_thread(void *arg) } tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf); + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH); + tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper); + if (msg==NULL) { + tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); + talloc_free(tmp_ctx); + return NULL; + } + tr_debug("queuing********************************************************************************"); + trps_mq_add(trps, msg); /* steals msg context */ + tr_debug("queued********************************************************************************"); + msg=NULL; + while(1) { cb_data.msg_ready=0; pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex)); @@ -521,12 +587,14 @@ static void *tr_trpc_thread(void *arg) } tr_debug("tr_trpc_thread: exiting."); - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED); + msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_HIGH); tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ if (msg==NULL) tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); else - trps_mq_append(trps, msg); + trps_mq_add(trps, msg); + + trpc_mq_clear(trpc); /* clear any queued messages */ talloc_free(tmp_ctx); return NULL; @@ -577,55 +645,12 @@ static TRP_ROUTE **tr_make_local_routes(TALLOC_CTX *mem_ctx, return entries; } -struct tr_wildcard_cookie { - TRPS_INSTANCE *trps; - TR_NAME *peer_servicename; -}; - -/* can only be called once, talloc_frees its argument cookie on termination */ -static void tr_send_wildcard(int listener, short event, void *arg) +void tr_peer_status_change(TRP_PEER *peer, void *cookie) { - struct tr_wildcard_cookie *cook=talloc_get_type_abort(arg, struct tr_wildcard_cookie); + TRPS_INSTANCE *trps=talloc_get_type_abort(cookie, TRPS_INSTANCE); - /* queue a route request for all route to every peer */ - if (TRP_SUCCESS!=trps_wildcard_route_req(cook->trps, cook->peer_servicename)) + if (TRP_SUCCESS!=trps_wildcard_route_req(trps, trp_peer_get_servicename(peer))) tr_err("tr_send_wildcard: error sending wildcard route request."); - - tr_free_name(cook->peer_servicename); - talloc_free(cook); -} - -struct tr_trpc_status_change_cookie { - struct event_base *evbase; - TRPS_INSTANCE *trps; - TRPC_INSTANCE *trpc; - TRP_PEER *peer; -}; -static void tr_trpc_status_change(TRP_CONNECTION *conn, void *cookie) -{ - struct tr_trpc_status_change_cookie *cook=talloc_get_type_abort(cookie, struct tr_trpc_status_change_cookie); - struct event_base *evbase=cook->evbase; - TRPS_INSTANCE *trps=cook->trps; - TRP_PEER *peer=cook->peer; - TR_NAME *gssname=trp_peer_get_gssname(peer); - struct timeval zero_time={0,0}; - struct tr_wildcard_cookie *wc_cookie=NULL; - - if (trp_connection_get_status(conn)==TRP_CONNECTION_UP) { - tr_debug("tr_trpc_status_change: connection to %.*s now up.", gssname->len, gssname->buf); - /* add a one-off event to send a wildcard request from the main thread */ - wc_cookie=talloc(trps, struct tr_wildcard_cookie); - if (wc_cookie==NULL) { - tr_err("tr_trpc_status_change: error allocating wildcard cookie."); - return; - } - wc_cookie->trps=trps; - wc_cookie->peer_servicename=trp_peer_dup_servicename(peer); - if (0!=event_base_once(evbase, -1, EV_TIMEOUT, tr_send_wildcard, wc_cookie, &zero_time)) { - tr_err("tr_trpc_status_change: error queueing wildcard route request event."); - } - } else - tr_debug("tr_trpc_status_change: connection to %.*s now down.", gssname->len, gssname->buf); } /* starts a trpc thread to connect to server:port */ @@ -635,7 +660,6 @@ TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev) TRPC_INSTANCE *trpc=NULL; TRP_CONNECTION *conn=NULL; struct trpc_thread_data *thread_data=NULL; - struct tr_trpc_status_change_cookie *status_change_cookie=NULL; TRP_RC rc=TRP_ERROR; tr_debug("tr_trpc_initiate entered"); @@ -654,20 +678,6 @@ TRP_RC tr_trpc_initiate(TRPS_INSTANCE *trps, TRP_PEER *peer, struct event *ev) goto cleanup; } - status_change_cookie=talloc(conn, struct tr_trpc_status_change_cookie); - if (status_change_cookie==NULL) { - tr_crit("tr_trpc_initiate: could not allocate connection status cookie."); - rc=TRP_NOMEM; - goto cleanup; - } - status_change_cookie->evbase=event_get_base(ev); - status_change_cookie->trps=trps; - status_change_cookie->trpc=trpc; - status_change_cookie->peer=peer; - conn->status_change_cookie=status_change_cookie; - status_change_cookie=NULL; - conn->status_change_cb=tr_trpc_status_change; - trpc_set_conn(trpc, conn); trpc_set_server(trpc, talloc_strdup(trpc, trp_peer_get_server(peer))); trpc_set_port(trpc, trp_peer_get_port(peer)); diff --git a/trp/trp_ptable.c b/trp/trp_ptable.c index 9814f66..14a2509 100644 --- a/trp/trp_ptable.c +++ b/trp/trp_ptable.c @@ -26,6 +26,10 @@ TRP_PEER *trp_peer_new(TALLOC_CTX *memctx) peer->port=0; peer->linkcost=TRP_LINKCOST_DEFAULT; peer->last_conn_attempt=(struct timespec){0,0}; + peer->outgoing_status=PEER_DISCONNECTED; + peer->incoming_status=PEER_DISCONNECTED; + peer->conn_status_cb=NULL; + peer->conn_status_cookie=NULL; talloc_set_destructor((void *)peer, trp_peer_destructor); } return peer; @@ -131,6 +135,12 @@ void trp_peer_set_linkcost(TRP_PEER *peer, unsigned int linkcost) peer->linkcost=linkcost; } +void trp_peer_set_conn_status_cb(TRP_PEER *peer, void (*cb)(TRP_PEER *, void *), void *cookie) +{ + peer->conn_status_cb=cb; + peer->conn_status_cookie=cookie; +} + struct timespec *trp_peer_get_last_conn_attempt(TRP_PEER *peer) { return &(peer->last_conn_attempt); @@ -150,6 +160,41 @@ TRP_PTABLE *trp_ptable_new(TALLOC_CTX *memctx) return ptbl; } +void trp_peer_set_outgoing_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status) +{ + int was_connected=trp_peer_is_connected(peer); + peer->outgoing_status=status; + tr_debug("trp_peer_set_outgoing_status: %s: status=%d peer connected was %d now %d.", + trp_peer_get_gssname(peer)->buf, status, was_connected, trp_peer_is_connected(peer)); + if ((trp_peer_is_connected(peer) != was_connected) && (peer->conn_status_cb!=NULL)) + peer->conn_status_cb(peer, peer->conn_status_cookie); +} + +TRP_PEER_CONN_STATUS trp_peer_get_outgoing_status(TRP_PEER *peer) +{ + return peer->outgoing_status; +} + +void trp_peer_set_incoming_status(TRP_PEER *peer, TRP_PEER_CONN_STATUS status) +{ + int was_connected=trp_peer_is_connected(peer); + peer->incoming_status=status; + tr_debug("trp_peer_set_incoming_status: %s: status=%d peer connected was %d now %d.", + trp_peer_get_gssname(peer)->buf, status, was_connected, trp_peer_is_connected(peer)); + if ((trp_peer_is_connected(peer) != was_connected) && (peer->conn_status_cb!=NULL)) + peer->conn_status_cb(peer, peer->conn_status_cookie); +} + +TRP_PEER_CONN_STATUS trp_peer_get_incoming_status(TRP_PEER *peer) +{ + return peer->incoming_status; +} + +int trp_peer_is_connected(TRP_PEER *peer) +{ + return (peer->outgoing_status==PEER_CONNECTED) && (peer->incoming_status==PEER_CONNECTED); +} + void trp_ptable_free(TRP_PTABLE *ptbl) { talloc_free(ptbl); diff --git a/trp/trpc.c b/trp/trpc.c index 58e8061..4e02fea 100644 --- a/trp/trpc.c +++ b/trp/trpc.c @@ -150,9 +150,9 @@ void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq) } /* submit msg to trpc for transmission */ -void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg) +void trpc_mq_add(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg) { - tr_mq_append(trpc->mq, msg); + tr_mq_add(trpc->mq, msg); } TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc) @@ -160,6 +160,11 @@ TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc) return tr_mq_pop(trpc->mq); } +void trpc_mq_clear(TRPC_INSTANCE *trpc) +{ + tr_mq_clear(trpc->mq); +} + TRP_RC trpc_connect(TRPC_INSTANCE *trpc) { return trp_connection_initiate(trpc_get_conn(trpc), trpc_get_server(trpc), trpc_get_port(trpc)); diff --git a/trp/trps.c b/trp/trps.c index cb0d1ed..0b1f25d 100644 --- a/trp/trps.c +++ b/trp/trps.c @@ -90,9 +90,9 @@ TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps) return tr_mq_pop(trps->mq); } -void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg) +void trps_mq_add(TRPS_INSTANCE *trps, TR_MQ_MSG *msg) { - tr_mq_append(trps->mq, msg); + tr_mq_add(trps->mq, msg); } unsigned int trps_get_connect_interval(TRPS_INSTANCE *trps) @@ -187,16 +187,15 @@ TRP_RC trps_send_msg(TRPS_INSTANCE *trps, TRP_PEER *peer, const char *msg) /* get the connection for this peer */ trpc=trps_find_trpc(trps, peer); - if ((trpc==NULL) || (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP)) { - /* We could just let these sit on the queue in the hopes that a connection - * is eventually established. However, we'd then have to ensure the queue - * didn't keep growing, etc. */ - tr_warning("trps_send_msg: skipping message queued while TRPC connection not up."); + /* instead, let's let that happen and then clear the queue when an attempt to + * connect fails */ + if (trpc==NULL) { + tr_warning("trps_send_msg: skipping message queued for missing TRP client entry."); } else { - mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND); + mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_SEND, TR_MQ_PRIO_NORMAL); msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */ tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */ - trpc_mq_append(trpc, mq_msg); + trpc_mq_add(trpc, mq_msg); rc=TRP_SUCCESS; } talloc_free(tmp_ctx); @@ -235,22 +234,6 @@ static int trps_listen (TRPS_INSTANCE *trps, int port) return conn; } -#if 0 /* remove this if I forget to do so */ -/* returns EACCES if authorization is denied */ -int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data) -{ - TRPS_INSTANCE *trps = (TRPS_INSTANCE *)data; - int result=0; - - if (0!=trps->auth_handler(clientName, displayName, trps->cookie)) { - tr_debug("trps_auth_cb: client '%.*s' denied authorization.", displayName->length, displayName->value); - result=EACCES; /* denied */ - } - - return result; -} -#endif - /* get the currently selected route if available */ TRP_ROUTE *trps_get_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer) { @@ -375,37 +358,41 @@ int trps_get_listener(TRPS_INSTANCE *trps, return listen; } +TRP_RC trps_authorize_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn) +{ + /* try to establish a GSS context */ + if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) { + tr_notice("trps_authorize_connection: failed to authorize connection"); + trp_connection_close(conn); + return TRP_ERROR; + } + tr_notice("trps_authorize_connection: authorized connection"); + return TRP_SUCCESS; +} + void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn) { TR_MSG *msg=NULL; TRP_RC rc=TRP_ERROR; - /* try to establish a GSS context */ - if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) { - tr_notice("tr_trps_conn_thread: failed to authorize connection"); - trp_connection_close(conn); - } else { - tr_notice("trps_handle_connection: authorized connection"); - - /* loop as long as the connection exists */ - while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) { - rc=trps_read_message(trps, conn, &msg); - switch(rc) { - case TRP_SUCCESS: - trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */ - break; - - case TRP_ERROR: - trp_connection_close(conn); - break; - - default: - tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc); - } - } + /* loop as long as the connection exists */ + while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) { + rc=trps_read_message(trps, conn, &msg); + switch(rc) { + case TRP_SUCCESS: + trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */ + break; + + case TRP_ERROR: + trp_connection_close(conn); + break; - tr_debug("trps_handle_connection: connection closed."); + default: + tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc); + } } + + tr_debug("trps_handle_connection: connection closed."); } static TRP_RC trps_validate_update(TRPS_INSTANCE *trps, TRP_UPD *upd) @@ -1167,7 +1154,7 @@ TRP_RC trps_wildcard_route_req(TRPS_INSTANCE *trps, TR_NAME *peer_servicename) tr_debug("trps_wildcard_route_req: request queued successfully."); rc=TRP_SUCCESS; } - tr_crit("got here"); + cleanup: if (encoded!=NULL) tr_msg_free_encoded(encoded); -- 2.1.4