From 2bfbc6d0df8bcdc7b15a03f1bac56c96767f4550 Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Mon, 27 Jun 2016 16:55:32 -0400 Subject: [PATCH] Make outgoing connections. Connect to self as a test. --- common/tr_msg.c | 15 +++- include/tr_trp.h | 4 +- include/trp_internal.h | 49 +++++++++--- include/trust_router/trp.h | 1 + tr/tr_main.c | 32 ++++++++ tr/tr_trp.c | 193 ++++++++++++++++++++++++++++++++++++++++----- trp/trp_conn.c | 66 +++++++++++++--- trp/trpc.c | 180 ++++++++++++++++++++++++++++++++---------- trp/trps.c | 58 +++++++++----- 9 files changed, 495 insertions(+), 103 deletions(-) diff --git a/common/tr_msg.c b/common/tr_msg.c index 4faff68..f983015 100644 --- a/common/tr_msg.c +++ b/common/tr_msg.c @@ -543,6 +543,12 @@ static TRP_RC tr_msg_encode_inforec_route(json_t *jrec, TRP_INFOREC *rec ) if (rec==NULL) return TRP_BADTYPE; + if ((trp_inforec_get_comm(rec)==NULL) + || (trp_inforec_get_realm(rec)==NULL) + || (trp_inforec_get_trust_router(rec)==NULL)) { + return TRP_ERROR; + } + s=tr_name_strdup(trp_inforec_get_comm(rec)); if (s==NULL) return TRP_NOMEM; @@ -782,6 +788,7 @@ static json_t *tr_msg_encode_trp_req(TRP_REQ *req) json_t *jbody=NULL; json_t *jstr=NULL; char *s=NULL; + TR_NAME *n=NULL; if (req==NULL) return NULL; @@ -790,7 +797,13 @@ static json_t *tr_msg_encode_trp_req(TRP_REQ *req) if (jbody==NULL) return NULL; - s=tr_name_strdup(trp_req_get_comm(req)); /* ensures null termination */ + if ((NULL==trp_req_get_comm(req)) + || (NULL==trp_req_get_realm(req))) { + json_decref(jbody); + return NULL; + } + + s=tr_name_strdup(n); /* ensures null termination */ if (s==NULL) { json_decref(jbody); return NULL; diff --git a/include/tr_trp.h b/include/tr_trp.h index 32c1f4c..66222d3 100644 --- a/include/tr_trp.h +++ b/include/tr_trp.h @@ -16,7 +16,7 @@ typedef struct tr_trps_events { /* prototypes */ TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx); -int tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr, +TRP_RC tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr, TR_TRPS_EVENTS *trps_ev); - +TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port); #endif /* TR_TRP_H */ diff --git a/include/trp_internal.h b/include/trp_internal.h index 13e2d65..5e93dcb 100644 --- a/include/trp_internal.h +++ b/include/trp_internal.h @@ -45,7 +45,6 @@ struct trp_req { typedef struct trps_instance TRPS_INSTANCE; - typedef enum trp_connection_status { TRP_CONNECTION_DOWN=0, TRP_CONNECTION_UP, @@ -53,13 +52,13 @@ typedef enum trp_connection_status { typedef struct trp_connection TRP_CONNECTION; struct trp_connection { + pthread_mutex_t mutex; /* protects status attribute */ TRP_CONNECTION *next; pthread_t *thread; /* thread servicing this connection */ int fd; TR_NAME *gssname; gss_ctx_id_t *gssctx; TRP_CONNECTION_STATUS status; - pthread_mutex_t status_mutex; }; typedef TRP_RC (*TRPS_MSG_FUNC)(TRPS_INSTANCE *, TRP_CONNECTION *, TR_MSG *); @@ -68,10 +67,15 @@ typedef void (*TRP_RESP_FUNC)(); typedef client_cb_fn TRP_AUTH_FUNC; /* TRP Client Instance Data */ -typedef struct trpc_instance { +typedef struct trpc_instance TRPC_INSTANCE; +struct trpc_instance { + TRPC_INSTANCE *next; + const char *server; + unsigned int port; TRP_CONNECTION *conn; - DH *client_dh; /* Client's DH struct with priv and pub keys */ -} TRPC_INSTANCE; + TR_MQ *mq; /* msgs from master to trpc */ + DH *dh; /* Client's DH struct with priv and pub keys */ +}; /* TRP Server Instance Data */ struct trps_instance { @@ -80,7 +84,8 @@ struct trps_instance { TRP_AUTH_FUNC auth_handler; TRPS_MSG_FUNC msg_handler; void *cookie; - TRP_CONNECTION *conn; /* connections to peers */ + TRP_CONNECTION *conn; /* connections from peers */ + TRPC_INSTANCE *trpc; /* connections to peers */ TR_MQ *mq; }; @@ -88,6 +93,8 @@ struct trps_instance { TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx); void trp_connection_free(TRP_CONNECTION *conn); void trp_connection_close(TRP_CONNECTION *conn); +int trp_connection_lock(TRP_CONNECTION *conn); +int trp_connection_unlock(TRP_CONNECTION *conn); int trp_connection_get_fd(TRP_CONNECTION *conn); void trp_connection_set_fd(TRP_CONNECTION *conn, int fd); TR_NAME *trp_connection_get_gssname(TRP_CONNECTION *conn); @@ -102,19 +109,39 @@ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remo void trp_connection_append(TRP_CONNECTION *conn, TRP_CONNECTION *new); int trp_connection_auth(TRP_CONNECTION *conn, TRP_AUTH_FUNC auth_callback, void *callback_data); TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname); +TRP_RC trp_connection_initiate(TRP_CONNECTION *conn, const char *server, unsigned int port); TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx); void trpc_free (TRPC_INSTANCE *trpc); -int trpc_open_connection (TRPC_INSTANCE *trpc, char *server, unsigned int port, gss_ctx_id_t *gssctx); -int trpc_send_msg (TRPC_INSTANCE *trpc, int conn, gss_ctx_id_t gssctx, const char *msg_content, - int *resp_handler(), void *cookie); +TRP_CONNECTION *trpc_get_conn(TRPC_INSTANCE *trpc); +void trpc_set_conn(TRPC_INSTANCE *trpc, TRP_CONNECTION *conn); +TRPC_INSTANCE *trpc_get_next(TRPC_INSTANCE *trpc); +void trpc_set_next(TRPC_INSTANCE *trpc, TRPC_INSTANCE *next); +TRPC_INSTANCE *trpc_remove(TRPC_INSTANCE *trpc, TRPC_INSTANCE *remove); +void trpc_append(TRPC_INSTANCE *trpc, TRPC_INSTANCE *new); +const char *trpc_get_server(TRPC_INSTANCE *trpc); +void trpc_set_server(TRPC_INSTANCE *trpc, const char *server); +unsigned int trpc_get_port(TRPC_INSTANCE *trpc); +void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port); +DH *trpc_get_dh(TRPC_INSTANCE *trpc); +void trpc_set_dh(TRPC_INSTANCE *trpc, DH *dh); +TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc); +TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc); +void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq); +void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); +TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc); +void trpc_master_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg); +TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc); +TRP_RC trpc_connect(TRPC_INSTANCE *trpc); +TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content); TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx); void trps_free (TRPS_INSTANCE *trps); -int trps_send_msg (TRPS_INSTANCE *trps, int conn, gss_ctx_id_t gssctx, const char *msg_content); -int trps_accept(TRPS_INSTANCE *trps, int listen); +TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg); void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new); void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove); +void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc); +void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove); int trps_get_listener(TRPS_INSTANCE *trps, TRPS_MSG_FUNC msg_handler, TRP_AUTH_FUNC auth_handler, diff --git a/include/trust_router/trp.h b/include/trust_router/trp.h index fd4d5cd..5f54d2f 100644 --- a/include/trust_router/trp.h +++ b/include/trust_router/trp.h @@ -15,6 +15,7 @@ typedef enum trp_rc { TRP_NOMEM, /* allocation error */ TRP_BADTYPE, /* typing error */ TRP_UNSUPPORTED, /* unsupported feature */ + TRP_BADARG, /* bad argument */ } TRP_RC; typedef enum trp_inforec_type { diff --git a/tr/tr_main.c b/tr/tr_main.c index 7a31d5b..f218139 100644 --- a/tr/tr_main.c +++ b/tr/tr_main.c @@ -123,6 +123,32 @@ static void tr_talloc_log(const char *msg) } #endif /* TALLOC_DEBUG_ENABLE */ + +static void debug_ping(evutil_socket_t fd, short what, void *arg) +{ + TALLOC_CTX *tmp_ctx=talloc_new(NULL); + TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE); + TRP_REQ *req=NULL; + TR_MSG msg; + char *encoded=NULL; + + tr_debug("debug_ping entered, trps=%p, trps->trpc=%p", trps, trps->trpc); + if (trps->trpc==NULL) + tr_trpc_initiate(trps, trps->hostname, trps->port); + + /* create a TRP route request msg */ + req=trp_req_new(tmp_ctx); + tr_msg_set_trp_req(&msg, req); + encoded=tr_msg_encode(&msg); + if (encoded==NULL) + tr_err("debug_ping: error encoding TRP message."); + else { + tr_debug("debug_ping: sending message"); + trps_send_msg(trps, NULL, encoded); + tr_msg_free_encoded(encoded); + } +} + int main(int argc, char *argv[]) { TALLOC_CTX *main_ctx=NULL; @@ -133,6 +159,8 @@ int main(int argc, char *argv[]) struct tr_socket_event tids_ev; TR_TRPS_EVENTS *trps_ev; struct event *cfgwatch_ev; + struct event *debug_ping_ev; + struct timeval debug_ping_interval={1, 0}; /* we're going to be multithreaded, so disable null context tracking */ talloc_set_abort_fn(tr_abort); @@ -225,6 +253,10 @@ int main(int argc, char *argv[]) return 1; } + /* for debugging, send a message to peers on a timer */ + debug_ping_ev=evtimer_new(ev_base, debug_ping, (void *)(tr->trps)); + evtimer_add(debug_ping_ev, &debug_ping_interval); + tr_event_loop_run(ev_base); /* does not return until we are done */ /* TODO: ensure talloc is properly used so this actually works */ diff --git a/tr/tr_trp.c b/tr/tr_trp.c index 6875e4c..754063f 100644 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@ -21,7 +21,6 @@ struct tr_trps_event_cookie { TR_CFG_MGR *cfg_mgr; }; - /* callback to schedule event to process messages */ static void tr_trps_mq_cb(TR_MQ *mq, void *arg) { @@ -84,30 +83,30 @@ static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name, } /* data passed to thread */ -struct thread_data { +struct trps_thread_data { TRP_CONNECTION *conn; TRPS_INSTANCE *trps; }; /* thread to handle GSS connections to peers */ -static void *tr_trps_conn_thread(void *arg) +static void *tr_trps_thread(void *arg) { TALLOC_CTX *tmp_ctx=talloc_new(NULL); - struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data); + struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data); TRP_CONNECTION *conn=thread_data->conn; TRPS_INSTANCE *trps=thread_data->trps; TR_MQ_MSG *msg=NULL; - tr_debug("tr_trps_conn_thread: started"); + tr_debug("tr_trps_thread: started"); trps_handle_connection(trps, conn); - msg=tr_mq_msg_new(tmp_ctx, "thread_exit"); + msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit"); tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */ if (msg==NULL) - tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG"); + tr_err("tr_trps_thread: error allocating TR_MQ_MSG"); else trps_mq_append(trps, msg); - tr_debug("tr_trps_conn_thread: exit"); + tr_debug("tr_trps_thread: exit"); talloc_free(tmp_ctx); return NULL; } @@ -120,7 +119,7 @@ static void tr_trps_event_cb(int listener, short event, void *arg) TRP_CONNECTION *conn=NULL; TR_NAME *gssname=NULL; char *name=NULL; - struct thread_data *thread_data; + struct trps_thread_data *thread_data=NULL; if (0==(event & EV_READ)) { tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event); @@ -132,16 +131,16 @@ static void tr_trps_event_cb(int listener, short event, void *arg) conn=trp_connection_accept(tmp_ctx, listener, gssname); if (conn!=NULL) { /* need to monitor this fd and trigger events when read becomes possible */ - thread_data=talloc(conn, struct thread_data); + thread_data=talloc(conn, struct trps_thread_data); if (thread_data==NULL) { - tr_err("tr_trps_event_cb: unable to allocate thread_data"); + tr_err("tr_trps_event_cb: unable to allocate trps_thread_data"); talloc_free(tmp_ctx); return; } thread_data->conn=conn; thread_data->trps=trps; - pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data); - pthread_detach(*(conn->thread)); /* we will not rejoin the thread */ + pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data); + pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */ trps_add_connection(trps, conn); /* remember the connection */ } } @@ -167,7 +166,7 @@ static void tr_trps_process_mq(int socket, short event, void *arg) msg=trps_mq_pop(trps); while (msg!=NULL) { s=tr_mq_msg_get_message(msg); - if (0==strcmp(s, "thread_exit")) { + if (0==strcmp(s, "trps_thread_exit")) { tr_trps_cleanup_thread(trps, talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRP_CONNECTION)); @@ -209,9 +208,8 @@ TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx) } /* Configure the trps instance and set up its event handler. - * Returns 0 on success, nonzero on failure. Results in - * trps_ev, which should be allocated by caller. */ -int tr_trps_event_init(struct event_base *base, + * Fills in trps_ev, which should be allocated by caller. */ +TRP_RC tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr, TR_TRPS_EVENTS *trps_ev) @@ -219,11 +217,11 @@ int tr_trps_event_init(struct event_base *base, TALLOC_CTX *tmp_ctx=talloc_new(NULL); struct tr_socket_event *listen_ev=NULL; struct tr_trps_event_cookie *cookie; - int retval=0; + TRP_RC retval=TRP_ERROR; if (trps_ev == NULL) { tr_debug("tr_trps_event_init: Null trps_ev."); - retval=1; + retval=TRP_BADARG; goto cleanup; } @@ -235,7 +233,7 @@ int tr_trps_event_init(struct event_base *base, cookie=talloc(tmp_ctx, struct tr_trps_event_cookie); if (cookie == NULL) { tr_debug("tr_trps_event_init: Unable to allocate cookie."); - retval=1; + retval=TRP_NOMEM; goto cleanup; } cookie->trps=trps; @@ -251,7 +249,7 @@ int tr_trps_event_init(struct event_base *base, (void *)cookie); if (listen_ev->sock_fd < 0) { tr_crit("Error opening TRP server socket."); - retval=1; + retval=TRP_ERROR; goto cleanup; } @@ -272,8 +270,161 @@ int tr_trps_event_init(struct event_base *base, (void *)trps); tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev); + retval=TRP_SUCCESS; + cleanup: talloc_free(tmp_ctx); return retval; } + +struct trpc_notify_cb_data { + int msg_ready; + pthread_cond_t cond; + pthread_mutex_t mutex; +}; + +static void tr_trpc_mq_cb(TR_MQ *mq, void *arg) +{ + struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg; + pthread_mutex_lock(&(cb_data->mutex)); + if (!cb_data->msg_ready) { + cb_data->msg_ready=1; + pthread_cond_signal(&(cb_data->cond)); + } + pthread_mutex_unlock(&(cb_data->mutex)); +} + +/* data passed to thread */ +struct trpc_thread_data { + TRPC_INSTANCE *trpc; + TRPS_INSTANCE *trps; +}; +static void *tr_trpc_thread(void *arg) +{ + TALLOC_CTX *tmp_ctx=talloc_new(NULL); + struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data); + TRPC_INSTANCE *trpc=thread_data->trpc; + TRPS_INSTANCE *trps=thread_data->trps; + TRP_RC rc=TRP_ERROR; + TR_MQ_MSG *msg=NULL; + const char *msg_type=NULL; + char *encoded_msg=NULL; + + struct trpc_notify_cb_data cb_data={0, + PTHREAD_COND_INITIALIZER, + PTHREAD_MUTEX_INITIALIZER}; + + tr_debug("tr_trpc_thread: started"); + + /* set up the mq for receiving */ + pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */ + + tr_mq_lock(trpc->mq); + tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data); + tr_mq_unlock(trpc->mq); + + rc=trpc_connect(trpc); + if (rc!=TRP_SUCCESS) { + tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.", + trpc_get_server(trpc), + trpc_get_port(trpc)); + } else { + while (1) { + cb_data.msg_ready=0; + pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex)); + /* verify the condition */ + if (cb_data.msg_ready) { + msg=trpc_mq_pop(trpc); + if (msg==NULL) { + /* no message in the queue */ + tr_err("tr_trpc_thread: notified of msg, but queue empty"); + break; + } + + msg_type=tr_mq_msg_get_message(msg); + + if (0==strcmp(msg_type, "trpc_abort")) { + tr_mq_msg_free(msg); + break; /* exit loop */ + } + else if (0==strcmp(msg_type, "trpc_send")) { + encoded_msg=tr_mq_msg_get_payload(msg); + if (encoded_msg==NULL) + tr_notice("tr_trpc_thread: null outgoing TRP message."); + else { + rc = trpc_send_msg(trpc, encoded_msg); + if (rc!=TRP_SUCCESS) { + tr_notice("tr_trpc_thread: trpc_send_msg failed."); + tr_mq_msg_free(msg); + break; + } + } + } + else + tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); + + tr_mq_msg_free(msg); + } + } + } + + msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit"); + tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ + if (msg==NULL) + tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); + else + trps_mq_append(trps, msg); + + talloc_free(tmp_ctx); + return NULL; +} + +/* starts a trpc thread to connect to server:port */ +TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port) +{ + TALLOC_CTX *tmp_ctx=talloc_new(NULL); + TRPC_INSTANCE *trpc=NULL; + TRP_CONNECTION *conn=NULL; + struct trpc_thread_data *thread_data=NULL; + + tr_debug("tr_trpc_initiate entered"); + trpc=trpc_new(tmp_ctx); + if (trpc==NULL) { + tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE."); + goto cleanup; + } + tr_debug("tr_trpc_initiate: allocated trpc"); + + conn=trp_connection_new(trpc); + if (conn==NULL) { + tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION."); + goto cleanup; + } + trpc_set_conn(trpc, conn); + trpc_set_server(trpc, talloc_strdup(trpc, server)); + trpc_set_port(trpc, port); + tr_debug("tr_trpc_initiate: allocated connection"); + + /* start thread */ + thread_data=talloc(trpc, struct trpc_thread_data); + if (thread_data==NULL) { + tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data."); + goto cleanup; + } + thread_data->trpc=trpc; + thread_data->trps=trps; + + pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data); + pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */ + + tr_debug("tr_trpc_initiate: started trpc thread"); + trps_add_trpc(trps, trpc); + + talloc_report_full(trps, stderr); + talloc_report_full(tmp_ctx, stderr); + + cleanup: + talloc_free(tmp_ctx); + return trpc; +} diff --git a/trp/trp_conn.c b/trp/trp_conn.c index 1065116..1f99496 100644 --- a/trp/trp_conn.c +++ b/trp/trp_conn.c @@ -6,6 +6,20 @@ #include #include +/* Threading note: mutex lock is only used for protecting get_status() and set_status(). + * If needed, locking for other operations (notably adding/removing connections) must be managed + * by whomever is holding on to the connection list. */ + +int trp_connection_lock(TRP_CONNECTION *conn) +{ + return pthread_mutex_lock(&(conn->mutex)); +} + +int trp_connection_unlock(TRP_CONNECTION *conn) +{ + return pthread_mutex_unlock(&(conn->mutex)); +} + int trp_connection_get_fd(TRP_CONNECTION *conn) { return conn->fd; @@ -39,17 +53,17 @@ void trp_connection_set_gssctx(TRP_CONNECTION *conn, gss_ctx_id_t *gssctx) TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn) { TRP_CONNECTION_STATUS status; - pthread_mutex_lock(&(conn->status_mutex)); + trp_connection_lock(conn); status=conn->status; - pthread_mutex_unlock(&(conn->status_mutex)); + trp_connection_unlock(conn); return status; } static void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status) { - pthread_mutex_lock(&(conn->status_mutex)); + trp_connection_lock(conn); conn->status=status; - pthread_mutex_unlock(&(conn->status_mutex)); + trp_connection_unlock(conn); } pthread_t *trp_connection_get_thread(TRP_CONNECTION *conn) @@ -72,7 +86,7 @@ static void trp_connection_set_next(TRP_CONNECTION *conn, TRP_CONNECTION *next) conn->next=next; } -/* Ok to call more than once; guarantees connection no longer in the list. +/* Ok to call more than once; guarantees connection no longer in the list. Does not free removed element. * Returns handle to new list, you must replace your old handle on the list with this. */ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remove) { @@ -85,7 +99,6 @@ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remo /* first element is a special case */ if (cur==remove) { conn=trp_connection_get_next(cur); /* advance list head */ - trp_connection_free(cur); } else { /* it was not the first element */ last=cur; @@ -93,7 +106,6 @@ TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remo while (cur!=NULL) { if (cur==remove) { trp_connection_set_next(last, trp_connection_get_next(cur)); - trp_connection_free(cur); break; } last=cur; @@ -117,7 +129,7 @@ void trp_connection_append(TRP_CONNECTION *conn, TRP_CONNECTION *new) static void trp_connection_mutex_init(TRP_CONNECTION *conn) { - pthread_mutex_init(&(conn->status_mutex), NULL); + pthread_mutex_init(&(conn->mutex), NULL); } /* talloc destructor for a connection: ensures connection is closed, memory freed */ @@ -142,7 +154,7 @@ TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx) if (new_conn != NULL) { trp_connection_set_next(new_conn, NULL); trp_connection_set_fd(new_conn, -1); - trp_connection_set_gssctx(new_conn, NULL); + trp_connection_set_gssname(new_conn, NULL); trp_connection_mutex_init(new_conn); trp_connection_set_status(new_conn, TRP_CONNECTION_DOWN); thread=talloc(new_conn, pthread_t); @@ -228,3 +240,39 @@ TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME * return conn; } +/* Initiate connection */ +TRP_RC trp_connection_initiate(TRP_CONNECTION *conn, const char *server, unsigned int port) +{ + int err = 0; + int fd=-1; + unsigned int use_port=0; + + if (0 == port) + use_port = TRP_PORT; + else + use_port = port; + + if (conn==NULL) { + tr_err("trp_connection_initiate: null TRP_CONNECTION"); + return TRP_BADARG; + } + + tr_debug("trp_connection_initiate: opening GSS connection to %s:%d", + server, + use_port); + err = gsscon_connect(server, + use_port, + "trustrouter", + &fd, + trp_connection_get_gssctx(conn)); + tr_debug("trp_connection_initiate: connected"); + + if (err) { + talloc_free(conn); + return TRP_ERROR; + } else { + trp_connection_set_fd(conn, fd); + trp_connection_set_status(conn, TRP_CONNECTION_UP); + return TRP_SUCCESS; + } +} diff --git a/trp/trpc.c b/trp/trpc.c index ef5af85..57b7378 100644 --- a/trp/trpc.c +++ b/trp/trpc.c @@ -1,20 +1,31 @@ #include -#include #include #include #include #include #include -#include -#include #include -#include +#include +/* also allocates the incoming mq */ TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx) { - return talloc_zero(mem_ctx, TRPC_INSTANCE); + TRPC_INSTANCE *trpc=talloc(mem_ctx, TRPC_INSTANCE); + if (trpc!=NULL) { + trpc->next=NULL; + trpc->server=NULL; + trpc->port=0; + trpc->conn=NULL; + trpc->dh=NULL; + trpc->mq=tr_mq_new(trpc); + if (trpc->mq==NULL) { + talloc_free(trpc); + trpc=NULL; + } + } + return trpc; } void trpc_free (TRPC_INSTANCE *trpc) @@ -23,55 +34,142 @@ void trpc_free (TRPC_INSTANCE *trpc) talloc_free(trpc); } -/* Connect to a TRP server */ -int trpc_open_connection (TRPC_INSTANCE *trpc, - char *server, - unsigned int port, - gss_ctx_id_t *gssctx) +TRPC_INSTANCE *trpc_get_next(TRPC_INSTANCE *trpc) +{ + return trpc->next; +} + +void trpc_set_next(TRPC_INSTANCE *trpc, TRPC_INSTANCE *next) +{ + trpc->next=next; +} + +/* Ok to call more than once; guarantees trpc no longer in the list. Does not free removed element. + * Returns handle to new list, you must replace your old handle on the list with this. */ +TRPC_INSTANCE *trpc_remove(TRPC_INSTANCE *trpc, TRPC_INSTANCE *remove) +{ + TRPC_INSTANCE *cur=trpc; + TRPC_INSTANCE *last=NULL; + + if (cur==NULL) + return NULL; + + /* first element is a special case */ + if (cur==remove) { + trpc=trpc_get_next(cur); /* advance list head */ + } else { + /* it was not the first element */ + last=cur; + cur=trpc_get_next(cur); + while (cur!=NULL) { + if (cur==remove) { + trpc_set_next(last, trpc_get_next(cur)); + break; + } + last=cur; + cur=trpc_get_next(cur); + } + } + return trpc; +} + +static TRPC_INSTANCE *trpc_get_tail(TRPC_INSTANCE *trpc) +{ + while((trpc!=NULL)&&(trpc_get_next(trpc)!=NULL)) + trpc=trpc_get_next(trpc); + return trpc; +} + +void trpc_append(TRPC_INSTANCE *trpc, TRPC_INSTANCE *new) +{ + trpc_set_next(trpc_get_tail(trpc), new); +} + +const char *trpc_get_server(TRPC_INSTANCE *trpc) +{ + return trpc->server; +} + +void trpc_set_server(TRPC_INSTANCE *trpc, const char *server) +{ + trpc->server=server; +} + +unsigned int trpc_get_port(TRPC_INSTANCE *trpc) +{ + return trpc->port; +} + +void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port) { - int err = 0; - int conn = -1; - unsigned int use_port = 0; + trpc->port=port; +} + +TRP_CONNECTION *trpc_get_conn(TRPC_INSTANCE *trpc) +{ + return trpc->conn; +} + +void trpc_set_conn(TRPC_INSTANCE *trpc, TRP_CONNECTION *conn) +{ + trpc->conn=conn; +} + +DH *trpc_get_dh(TRPC_INSTANCE *trpc) +{ + return trpc->dh; +} + +void trpc_set_dh(TRPC_INSTANCE *trpc, DH *dh) +{ + trpc->dh=dh; +} + +TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc) +{ + return trp_connection_get_status(trpc_get_conn(trpc)); +} - if (0 == port) - use_port = TRP_PORT; - else - use_port = port; +TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc) +{ + return trpc->mq; +} + +void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq) +{ + trpc->mq=mq; +} - tr_debug("trpc_open_connection: opening GSS connection to %s:%d", server, use_port); - err = gsscon_connect(server, use_port, "trustrouter", &conn, gssctx); +/* submit msg to trpc for transmission */ +void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg) +{ + tr_mq_append(trpc->mq, msg); +} - if (!err) - return conn; - else - return -1; +TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc) +{ + return tr_mq_pop(trpc->mq); } +TRP_RC trpc_connect(TRPC_INSTANCE *trpc) +{ + return trp_connection_initiate(trpc_get_conn(trpc), trpc_get_server(trpc), trpc_get_port(trpc)); +} /* simple function, based on tidc_send_req */ -int trpc_send_msg (TRPC_INSTANCE *trpc, - int conn, - gss_ctx_id_t gssctx, - const char *msg_content, - int *resp_handler(), - void *cookie) +TRP_RC trpc_send_msg (TRPC_INSTANCE *trpc, + const char *msg_content) { int err=0; - int rc=0; + TRP_RC rc=TRP_SUCCESS; /* Send the request over the connection */ - if (err = gsscon_write_encrypted_token (conn, - gssctx, - msg_content, - strlen(msg_content))) { + if (err = gsscon_write_encrypted_token(trp_connection_get_fd(trpc_get_conn(trpc)), + *trp_connection_get_gssctx(trpc_get_conn(trpc)), + msg_content, + strlen(msg_content))) { tr_err( "trpc_send_msg: Error sending message over connection.\n"); - goto error; + rc=TRP_ERROR; } - - goto cleanup; - - error: - rc = -1; - cleanup: return rc; } diff --git a/trp/trps.c b/trp/trps.c index ea31f86..b4b7b4f 100644 --- a/trp/trps.c +++ b/trp/trps.c @@ -1,12 +1,10 @@ #include -#include #include #include #include #include #include -#include #include #include @@ -19,6 +17,7 @@ TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx) trps->port=0; trps->cookie=NULL; trps->conn=NULL; + trps->trpc=NULL; trps->mq=tr_mq_new(trps); if (trps->mq==NULL) { /* failed to allocate mq */ @@ -63,29 +62,52 @@ void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new) talloc_steal(trps, new); } -/* ok to call more than once; guarantees connection no longer in the list */ +/* ok to call more than once; guarantees connection no longer in the list. + * Caller is responsible for freeing the removed element afterwards. */ void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove) { trps->conn=trp_connection_remove(trps->conn, remove); } -int trps_send_msg (TRPS_INSTANCE *trps, - int conn, - gss_ctx_id_t gssctx, - const char *msg_content) +void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc) { - int err=0; - int rc=0; - - /* Send the request over the connection */ - if (err = gsscon_write_encrypted_token (conn, - gssctx, - msg_content, - strlen(msg_content))) { - tr_err( "trps_send_msg: Error sending message over connection.\n"); - rc = -1; - } + if (trps->trpc==NULL) + trps->trpc=trpc; + else + trpc_append(trps->trpc, trpc); + + talloc_steal(trps, trpc); +} + +/* ok to call more than once; guarantees trpc no longer in the list. + * Caller is responsible for freeing the removed element afterwards. */ +void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove) +{ + trps->trpc=trpc_remove(trps->trpc, remove); +} +TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg) +{ + TALLOC_CTX *tmp_ctx=talloc_new(NULL); + TR_MQ_MSG *mq_msg=NULL; + char *msg_dup=NULL; + TRP_RC rc=TRP_ERROR; + + /* Currently ignore peer and just send to an open connection. + * In reality, need to identify the correct peer and send via that + * one. */ + if (trps->trpc != NULL) { + if (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP) + tr_debug("trps_send_msg: skipping message sent while TRPC connection not up."); + else { + mq_msg=tr_mq_msg_new(tmp_ctx, "trpc_send"); + msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */ + tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */ + trpc_mq_append(trps->trpc, mq_msg); + rc=TRP_SUCCESS; + } + } + talloc_free(tmp_ctx); return rc; } -- 2.1.4