if (rec==NULL)
return TRP_BADTYPE;
+ if ((trp_inforec_get_comm(rec)==NULL)
+ || (trp_inforec_get_realm(rec)==NULL)
+ || (trp_inforec_get_trust_router(rec)==NULL)) {
+ return TRP_ERROR;
+ }
+
s=tr_name_strdup(trp_inforec_get_comm(rec));
if (s==NULL)
return TRP_NOMEM;
json_t *jbody=NULL;
json_t *jstr=NULL;
char *s=NULL;
+ TR_NAME *n=NULL;
if (req==NULL)
return NULL;
if (jbody==NULL)
return NULL;
- s=tr_name_strdup(trp_req_get_comm(req)); /* ensures null termination */
+ if ((NULL==trp_req_get_comm(req))
+ || (NULL==trp_req_get_realm(req))) {
+ json_decref(jbody);
+ return NULL;
+ }
+
+ s=tr_name_strdup(n); /* ensures null termination */
if (s==NULL) {
json_decref(jbody);
return NULL;
/* prototypes */
TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx);
-int tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr,
+TRP_RC tr_trps_event_init(struct event_base *base, TRPS_INSTANCE *trps, TR_CFG_MGR *cfg_mgr,
TR_TRPS_EVENTS *trps_ev);
-
+TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port);
#endif /* TR_TRP_H */
typedef struct trps_instance TRPS_INSTANCE;
-
typedef enum trp_connection_status {
TRP_CONNECTION_DOWN=0,
TRP_CONNECTION_UP,
typedef struct trp_connection TRP_CONNECTION;
struct trp_connection {
+ pthread_mutex_t mutex; /* protects status attribute */
TRP_CONNECTION *next;
pthread_t *thread; /* thread servicing this connection */
int fd;
TR_NAME *gssname;
gss_ctx_id_t *gssctx;
TRP_CONNECTION_STATUS status;
- pthread_mutex_t status_mutex;
};
typedef TRP_RC (*TRPS_MSG_FUNC)(TRPS_INSTANCE *, TRP_CONNECTION *, TR_MSG *);
typedef client_cb_fn TRP_AUTH_FUNC;
/* TRP Client Instance Data */
-typedef struct trpc_instance {
+typedef struct trpc_instance TRPC_INSTANCE;
+struct trpc_instance {
+ TRPC_INSTANCE *next;
+ const char *server;
+ unsigned int port;
TRP_CONNECTION *conn;
- DH *client_dh; /* Client's DH struct with priv and pub keys */
-} TRPC_INSTANCE;
+ TR_MQ *mq; /* msgs from master to trpc */
+ DH *dh; /* Client's DH struct with priv and pub keys */
+};
/* TRP Server Instance Data */
struct trps_instance {
TRP_AUTH_FUNC auth_handler;
TRPS_MSG_FUNC msg_handler;
void *cookie;
- TRP_CONNECTION *conn; /* connections to peers */
+ TRP_CONNECTION *conn; /* connections from peers */
+ TRPC_INSTANCE *trpc; /* connections to peers */
TR_MQ *mq;
};
TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx);
void trp_connection_free(TRP_CONNECTION *conn);
void trp_connection_close(TRP_CONNECTION *conn);
+int trp_connection_lock(TRP_CONNECTION *conn);
+int trp_connection_unlock(TRP_CONNECTION *conn);
int trp_connection_get_fd(TRP_CONNECTION *conn);
void trp_connection_set_fd(TRP_CONNECTION *conn, int fd);
TR_NAME *trp_connection_get_gssname(TRP_CONNECTION *conn);
void trp_connection_append(TRP_CONNECTION *conn, TRP_CONNECTION *new);
int trp_connection_auth(TRP_CONNECTION *conn, TRP_AUTH_FUNC auth_callback, void *callback_data);
TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname);
+TRP_RC trp_connection_initiate(TRP_CONNECTION *conn, const char *server, unsigned int port);
TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx);
void trpc_free (TRPC_INSTANCE *trpc);
-int trpc_open_connection (TRPC_INSTANCE *trpc, char *server, unsigned int port, gss_ctx_id_t *gssctx);
-int trpc_send_msg (TRPC_INSTANCE *trpc, int conn, gss_ctx_id_t gssctx, const char *msg_content,
- int *resp_handler(), void *cookie);
+TRP_CONNECTION *trpc_get_conn(TRPC_INSTANCE *trpc);
+void trpc_set_conn(TRPC_INSTANCE *trpc, TRP_CONNECTION *conn);
+TRPC_INSTANCE *trpc_get_next(TRPC_INSTANCE *trpc);
+void trpc_set_next(TRPC_INSTANCE *trpc, TRPC_INSTANCE *next);
+TRPC_INSTANCE *trpc_remove(TRPC_INSTANCE *trpc, TRPC_INSTANCE *remove);
+void trpc_append(TRPC_INSTANCE *trpc, TRPC_INSTANCE *new);
+const char *trpc_get_server(TRPC_INSTANCE *trpc);
+void trpc_set_server(TRPC_INSTANCE *trpc, const char *server);
+unsigned int trpc_get_port(TRPC_INSTANCE *trpc);
+void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port);
+DH *trpc_get_dh(TRPC_INSTANCE *trpc);
+void trpc_set_dh(TRPC_INSTANCE *trpc, DH *dh);
+TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc);
+TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc);
+void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq);
+void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc);
+void trpc_master_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg);
+TR_MQ_MSG *trpc_master_mq_pop(TRPC_INSTANCE *trpc);
+TRP_RC trpc_connect(TRPC_INSTANCE *trpc);
+TRP_RC trpc_send_msg(TRPC_INSTANCE *trpc, const char *msg_content);
TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx);
void trps_free (TRPS_INSTANCE *trps);
-int trps_send_msg (TRPS_INSTANCE *trps, int conn, gss_ctx_id_t gssctx, const char *msg_content);
-int trps_accept(TRPS_INSTANCE *trps, int listen);
+TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg);
void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new);
void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove);
+void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc);
+void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove);
int trps_get_listener(TRPS_INSTANCE *trps,
TRPS_MSG_FUNC msg_handler,
TRP_AUTH_FUNC auth_handler,
TRP_NOMEM, /* allocation error */
TRP_BADTYPE, /* typing error */
TRP_UNSUPPORTED, /* unsupported feature */
+ TRP_BADARG, /* bad argument */
} TRP_RC;
typedef enum trp_inforec_type {
}
#endif /* TALLOC_DEBUG_ENABLE */
+
+static void debug_ping(evutil_socket_t fd, short what, void *arg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
+ TRP_REQ *req=NULL;
+ TR_MSG msg;
+ char *encoded=NULL;
+
+ tr_debug("debug_ping entered, trps=%p, trps->trpc=%p", trps, trps->trpc);
+ if (trps->trpc==NULL)
+ tr_trpc_initiate(trps, trps->hostname, trps->port);
+
+ /* create a TRP route request msg */
+ req=trp_req_new(tmp_ctx);
+ tr_msg_set_trp_req(&msg, req);
+ encoded=tr_msg_encode(&msg);
+ if (encoded==NULL)
+ tr_err("debug_ping: error encoding TRP message.");
+ else {
+ tr_debug("debug_ping: sending message");
+ trps_send_msg(trps, NULL, encoded);
+ tr_msg_free_encoded(encoded);
+ }
+}
+
int main(int argc, char *argv[])
{
TALLOC_CTX *main_ctx=NULL;
struct tr_socket_event tids_ev;
TR_TRPS_EVENTS *trps_ev;
struct event *cfgwatch_ev;
+ struct event *debug_ping_ev;
+ struct timeval debug_ping_interval={1, 0};
/* we're going to be multithreaded, so disable null context tracking */
talloc_set_abort_fn(tr_abort);
return 1;
}
+ /* for debugging, send a message to peers on a timer */
+ debug_ping_ev=evtimer_new(ev_base, debug_ping, (void *)(tr->trps));
+ evtimer_add(debug_ping_ev, &debug_ping_interval);
+
tr_event_loop_run(ev_base); /* does not return until we are done */
/* TODO: ensure talloc is properly used so this actually works */
TR_CFG_MGR *cfg_mgr;
};
-
/* callback to schedule event to process messages */
static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
{
}
/* data passed to thread */
-struct thread_data {
+struct trps_thread_data {
TRP_CONNECTION *conn;
TRPS_INSTANCE *trps;
};
/* thread to handle GSS connections to peers */
-static void *tr_trps_conn_thread(void *arg)
+static void *tr_trps_thread(void *arg)
{
TALLOC_CTX *tmp_ctx=talloc_new(NULL);
- struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data);
+ struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
TRP_CONNECTION *conn=thread_data->conn;
TRPS_INSTANCE *trps=thread_data->trps;
TR_MQ_MSG *msg=NULL;
- tr_debug("tr_trps_conn_thread: started");
+ tr_debug("tr_trps_thread: started");
trps_handle_connection(trps, conn);
- msg=tr_mq_msg_new(tmp_ctx, "thread_exit");
+ msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
if (msg==NULL)
- tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
+ tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
else
trps_mq_append(trps, msg);
- tr_debug("tr_trps_conn_thread: exit");
+ tr_debug("tr_trps_thread: exit");
talloc_free(tmp_ctx);
return NULL;
}
TRP_CONNECTION *conn=NULL;
TR_NAME *gssname=NULL;
char *name=NULL;
- struct thread_data *thread_data;
+ struct trps_thread_data *thread_data=NULL;
if (0==(event & EV_READ)) {
tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
conn=trp_connection_accept(tmp_ctx, listener, gssname);
if (conn!=NULL) {
/* need to monitor this fd and trigger events when read becomes possible */
- thread_data=talloc(conn, struct thread_data);
+ thread_data=talloc(conn, struct trps_thread_data);
if (thread_data==NULL) {
- tr_err("tr_trps_event_cb: unable to allocate thread_data");
+ tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
talloc_free(tmp_ctx);
return;
}
thread_data->conn=conn;
thread_data->trps=trps;
- pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data);
- pthread_detach(*(conn->thread)); /* we will not rejoin the thread */
+ pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
+ pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
trps_add_connection(trps, conn); /* remember the connection */
}
}
msg=trps_mq_pop(trps);
while (msg!=NULL) {
s=tr_mq_msg_get_message(msg);
- if (0==strcmp(s, "thread_exit")) {
+ if (0==strcmp(s, "trps_thread_exit")) {
tr_trps_cleanup_thread(trps,
talloc_get_type_abort(tr_mq_msg_get_payload(msg),
TRP_CONNECTION));
}
/* Configure the trps instance and set up its event handler.
- * Returns 0 on success, nonzero on failure. Results in
- * trps_ev, which should be allocated by caller. */
-int tr_trps_event_init(struct event_base *base,
+ * Fills in trps_ev, which should be allocated by caller. */
+TRP_RC tr_trps_event_init(struct event_base *base,
TRPS_INSTANCE *trps,
TR_CFG_MGR *cfg_mgr,
TR_TRPS_EVENTS *trps_ev)
TALLOC_CTX *tmp_ctx=talloc_new(NULL);
struct tr_socket_event *listen_ev=NULL;
struct tr_trps_event_cookie *cookie;
- int retval=0;
+ TRP_RC retval=TRP_ERROR;
if (trps_ev == NULL) {
tr_debug("tr_trps_event_init: Null trps_ev.");
- retval=1;
+ retval=TRP_BADARG;
goto cleanup;
}
cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
if (cookie == NULL) {
tr_debug("tr_trps_event_init: Unable to allocate cookie.");
- retval=1;
+ retval=TRP_NOMEM;
goto cleanup;
}
cookie->trps=trps;
(void *)cookie);
if (listen_ev->sock_fd < 0) {
tr_crit("Error opening TRP server socket.");
- retval=1;
+ retval=TRP_ERROR;
goto cleanup;
}
(void *)trps);
tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
+ retval=TRP_SUCCESS;
+
cleanup:
talloc_free(tmp_ctx);
return retval;
}
+
+struct trpc_notify_cb_data {
+ int msg_ready;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+};
+
+static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
+{
+ struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
+ pthread_mutex_lock(&(cb_data->mutex));
+ if (!cb_data->msg_ready) {
+ cb_data->msg_ready=1;
+ pthread_cond_signal(&(cb_data->cond));
+ }
+ pthread_mutex_unlock(&(cb_data->mutex));
+}
+
+/* data passed to thread */
+struct trpc_thread_data {
+ TRPC_INSTANCE *trpc;
+ TRPS_INSTANCE *trps;
+};
+static void *tr_trpc_thread(void *arg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
+ TRPC_INSTANCE *trpc=thread_data->trpc;
+ TRPS_INSTANCE *trps=thread_data->trps;
+ TRP_RC rc=TRP_ERROR;
+ TR_MQ_MSG *msg=NULL;
+ const char *msg_type=NULL;
+ char *encoded_msg=NULL;
+
+ struct trpc_notify_cb_data cb_data={0,
+ PTHREAD_COND_INITIALIZER,
+ PTHREAD_MUTEX_INITIALIZER};
+
+ tr_debug("tr_trpc_thread: started");
+
+ /* set up the mq for receiving */
+ pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
+
+ tr_mq_lock(trpc->mq);
+ tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
+ tr_mq_unlock(trpc->mq);
+
+ rc=trpc_connect(trpc);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
+ trpc_get_server(trpc),
+ trpc_get_port(trpc));
+ } else {
+ while (1) {
+ cb_data.msg_ready=0;
+ pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
+ /* verify the condition */
+ if (cb_data.msg_ready) {
+ msg=trpc_mq_pop(trpc);
+ if (msg==NULL) {
+ /* no message in the queue */
+ tr_err("tr_trpc_thread: notified of msg, but queue empty");
+ break;
+ }
+
+ msg_type=tr_mq_msg_get_message(msg);
+
+ if (0==strcmp(msg_type, "trpc_abort")) {
+ tr_mq_msg_free(msg);
+ break; /* exit loop */
+ }
+ else if (0==strcmp(msg_type, "trpc_send")) {
+ encoded_msg=tr_mq_msg_get_payload(msg);
+ if (encoded_msg==NULL)
+ tr_notice("tr_trpc_thread: null outgoing TRP message.");
+ else {
+ rc = trpc_send_msg(trpc, encoded_msg);
+ if (rc!=TRP_SUCCESS) {
+ tr_notice("tr_trpc_thread: trpc_send_msg failed.");
+ tr_mq_msg_free(msg);
+ break;
+ }
+ }
+ }
+ else
+ tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
+
+ tr_mq_msg_free(msg);
+ }
+ }
+ }
+
+ msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
+ tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
+ if (msg==NULL)
+ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
+ else
+ trps_mq_append(trps, msg);
+
+ talloc_free(tmp_ctx);
+ return NULL;
+}
+
+/* starts a trpc thread to connect to server:port */
+TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TRPC_INSTANCE *trpc=NULL;
+ TRP_CONNECTION *conn=NULL;
+ struct trpc_thread_data *thread_data=NULL;
+
+ tr_debug("tr_trpc_initiate entered");
+ trpc=trpc_new(tmp_ctx);
+ if (trpc==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
+ goto cleanup;
+ }
+ tr_debug("tr_trpc_initiate: allocated trpc");
+
+ conn=trp_connection_new(trpc);
+ if (conn==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
+ goto cleanup;
+ }
+ trpc_set_conn(trpc, conn);
+ trpc_set_server(trpc, talloc_strdup(trpc, server));
+ trpc_set_port(trpc, port);
+ tr_debug("tr_trpc_initiate: allocated connection");
+
+ /* start thread */
+ thread_data=talloc(trpc, struct trpc_thread_data);
+ if (thread_data==NULL) {
+ tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
+ goto cleanup;
+ }
+ thread_data->trpc=trpc;
+ thread_data->trps=trps;
+
+ pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
+ pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
+
+ tr_debug("tr_trpc_initiate: started trpc thread");
+ trps_add_trpc(trps, trpc);
+
+ talloc_report_full(trps, stderr);
+ talloc_report_full(tmp_ctx, stderr);
+
+ cleanup:
+ talloc_free(tmp_ctx);
+ return trpc;
+}
#include <tr_debug.h>
#include <trp_internal.h>
+/* Threading note: mutex lock is only used for protecting get_status() and set_status().
+ * If needed, locking for other operations (notably adding/removing connections) must be managed
+ * by whomever is holding on to the connection list. */
+
+int trp_connection_lock(TRP_CONNECTION *conn)
+{
+ return pthread_mutex_lock(&(conn->mutex));
+}
+
+int trp_connection_unlock(TRP_CONNECTION *conn)
+{
+ return pthread_mutex_unlock(&(conn->mutex));
+}
+
int trp_connection_get_fd(TRP_CONNECTION *conn)
{
return conn->fd;
TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn)
{
TRP_CONNECTION_STATUS status;
- pthread_mutex_lock(&(conn->status_mutex));
+ trp_connection_lock(conn);
status=conn->status;
- pthread_mutex_unlock(&(conn->status_mutex));
+ trp_connection_unlock(conn);
return status;
}
static void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status)
{
- pthread_mutex_lock(&(conn->status_mutex));
+ trp_connection_lock(conn);
conn->status=status;
- pthread_mutex_unlock(&(conn->status_mutex));
+ trp_connection_unlock(conn);
}
pthread_t *trp_connection_get_thread(TRP_CONNECTION *conn)
conn->next=next;
}
-/* Ok to call more than once; guarantees connection no longer in the list.
+/* Ok to call more than once; guarantees connection no longer in the list. Does not free removed element.
* Returns handle to new list, you must replace your old handle on the list with this. */
TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remove)
{
/* first element is a special case */
if (cur==remove) {
conn=trp_connection_get_next(cur); /* advance list head */
- trp_connection_free(cur);
} else {
/* it was not the first element */
last=cur;
while (cur!=NULL) {
if (cur==remove) {
trp_connection_set_next(last, trp_connection_get_next(cur));
- trp_connection_free(cur);
break;
}
last=cur;
static void trp_connection_mutex_init(TRP_CONNECTION *conn)
{
- pthread_mutex_init(&(conn->status_mutex), NULL);
+ pthread_mutex_init(&(conn->mutex), NULL);
}
/* talloc destructor for a connection: ensures connection is closed, memory freed */
if (new_conn != NULL) {
trp_connection_set_next(new_conn, NULL);
trp_connection_set_fd(new_conn, -1);
- trp_connection_set_gssctx(new_conn, NULL);
+ trp_connection_set_gssname(new_conn, NULL);
trp_connection_mutex_init(new_conn);
trp_connection_set_status(new_conn, TRP_CONNECTION_DOWN);
thread=talloc(new_conn, pthread_t);
return conn;
}
+/* Initiate connection */
+TRP_RC trp_connection_initiate(TRP_CONNECTION *conn, const char *server, unsigned int port)
+{
+ int err = 0;
+ int fd=-1;
+ unsigned int use_port=0;
+
+ if (0 == port)
+ use_port = TRP_PORT;
+ else
+ use_port = port;
+
+ if (conn==NULL) {
+ tr_err("trp_connection_initiate: null TRP_CONNECTION");
+ return TRP_BADARG;
+ }
+
+ tr_debug("trp_connection_initiate: opening GSS connection to %s:%d",
+ server,
+ use_port);
+ err = gsscon_connect(server,
+ use_port,
+ "trustrouter",
+ &fd,
+ trp_connection_get_gssctx(conn));
+ tr_debug("trp_connection_initiate: connected");
+
+ if (err) {
+ talloc_free(conn);
+ return TRP_ERROR;
+ } else {
+ trp_connection_set_fd(conn, fd);
+ trp_connection_set_status(conn, TRP_CONNECTION_UP);
+ return TRP_SUCCESS;
+ }
+}
#include <fcntl.h>
-#include <event2/event.h>
#include <talloc.h>
#include <errno.h>
#include <unistd.h>
#include <gsscon.h>
#include <tr_rp.h>
-#include <tr_config.h>
-#include <tr_event.h>
#include <tr_debug.h>
-#include <tr_trp.h>
+#include <trp_internal.h>
+/* also allocates the incoming mq */
TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx)
{
- return talloc_zero(mem_ctx, TRPC_INSTANCE);
+ TRPC_INSTANCE *trpc=talloc(mem_ctx, TRPC_INSTANCE);
+ if (trpc!=NULL) {
+ trpc->next=NULL;
+ trpc->server=NULL;
+ trpc->port=0;
+ trpc->conn=NULL;
+ trpc->dh=NULL;
+ trpc->mq=tr_mq_new(trpc);
+ if (trpc->mq==NULL) {
+ talloc_free(trpc);
+ trpc=NULL;
+ }
+ }
+ return trpc;
}
void trpc_free (TRPC_INSTANCE *trpc)
talloc_free(trpc);
}
-/* Connect to a TRP server */
-int trpc_open_connection (TRPC_INSTANCE *trpc,
- char *server,
- unsigned int port,
- gss_ctx_id_t *gssctx)
+TRPC_INSTANCE *trpc_get_next(TRPC_INSTANCE *trpc)
+{
+ return trpc->next;
+}
+
+void trpc_set_next(TRPC_INSTANCE *trpc, TRPC_INSTANCE *next)
+{
+ trpc->next=next;
+}
+
+/* Ok to call more than once; guarantees trpc no longer in the list. Does not free removed element.
+ * Returns handle to new list, you must replace your old handle on the list with this. */
+TRPC_INSTANCE *trpc_remove(TRPC_INSTANCE *trpc, TRPC_INSTANCE *remove)
+{
+ TRPC_INSTANCE *cur=trpc;
+ TRPC_INSTANCE *last=NULL;
+
+ if (cur==NULL)
+ return NULL;
+
+ /* first element is a special case */
+ if (cur==remove) {
+ trpc=trpc_get_next(cur); /* advance list head */
+ } else {
+ /* it was not the first element */
+ last=cur;
+ cur=trpc_get_next(cur);
+ while (cur!=NULL) {
+ if (cur==remove) {
+ trpc_set_next(last, trpc_get_next(cur));
+ break;
+ }
+ last=cur;
+ cur=trpc_get_next(cur);
+ }
+ }
+ return trpc;
+}
+
+static TRPC_INSTANCE *trpc_get_tail(TRPC_INSTANCE *trpc)
+{
+ while((trpc!=NULL)&&(trpc_get_next(trpc)!=NULL))
+ trpc=trpc_get_next(trpc);
+ return trpc;
+}
+
+void trpc_append(TRPC_INSTANCE *trpc, TRPC_INSTANCE *new)
+{
+ trpc_set_next(trpc_get_tail(trpc), new);
+}
+
+const char *trpc_get_server(TRPC_INSTANCE *trpc)
+{
+ return trpc->server;
+}
+
+void trpc_set_server(TRPC_INSTANCE *trpc, const char *server)
+{
+ trpc->server=server;
+}
+
+unsigned int trpc_get_port(TRPC_INSTANCE *trpc)
+{
+ return trpc->port;
+}
+
+void trpc_set_port(TRPC_INSTANCE *trpc, unsigned int port)
{
- int err = 0;
- int conn = -1;
- unsigned int use_port = 0;
+ trpc->port=port;
+}
+
+TRP_CONNECTION *trpc_get_conn(TRPC_INSTANCE *trpc)
+{
+ return trpc->conn;
+}
+
+void trpc_set_conn(TRPC_INSTANCE *trpc, TRP_CONNECTION *conn)
+{
+ trpc->conn=conn;
+}
+
+DH *trpc_get_dh(TRPC_INSTANCE *trpc)
+{
+ return trpc->dh;
+}
+
+void trpc_set_dh(TRPC_INSTANCE *trpc, DH *dh)
+{
+ trpc->dh=dh;
+}
+
+TRP_CONNECTION_STATUS trpc_get_status(TRPC_INSTANCE *trpc)
+{
+ return trp_connection_get_status(trpc_get_conn(trpc));
+}
- if (0 == port)
- use_port = TRP_PORT;
- else
- use_port = port;
+TR_MQ *trpc_get_mq(TRPC_INSTANCE *trpc)
+{
+ return trpc->mq;
+}
+
+void trpc_set_mq(TRPC_INSTANCE *trpc, TR_MQ *mq)
+{
+ trpc->mq=mq;
+}
- tr_debug("trpc_open_connection: opening GSS connection to %s:%d", server, use_port);
- err = gsscon_connect(server, use_port, "trustrouter", &conn, gssctx);
+/* submit msg to trpc for transmission */
+void trpc_mq_append(TRPC_INSTANCE *trpc, TR_MQ_MSG *msg)
+{
+ tr_mq_append(trpc->mq, msg);
+}
- if (!err)
- return conn;
- else
- return -1;
+TR_MQ_MSG *trpc_mq_pop(TRPC_INSTANCE *trpc)
+{
+ return tr_mq_pop(trpc->mq);
}
+TRP_RC trpc_connect(TRPC_INSTANCE *trpc)
+{
+ return trp_connection_initiate(trpc_get_conn(trpc), trpc_get_server(trpc), trpc_get_port(trpc));
+}
/* simple function, based on tidc_send_req */
-int trpc_send_msg (TRPC_INSTANCE *trpc,
- int conn,
- gss_ctx_id_t gssctx,
- const char *msg_content,
- int *resp_handler(),
- void *cookie)
+TRP_RC trpc_send_msg (TRPC_INSTANCE *trpc,
+ const char *msg_content)
{
int err=0;
- int rc=0;
+ TRP_RC rc=TRP_SUCCESS;
/* Send the request over the connection */
- if (err = gsscon_write_encrypted_token (conn,
- gssctx,
- msg_content,
- strlen(msg_content))) {
+ if (err = gsscon_write_encrypted_token(trp_connection_get_fd(trpc_get_conn(trpc)),
+ *trp_connection_get_gssctx(trpc_get_conn(trpc)),
+ msg_content,
+ strlen(msg_content))) {
tr_err( "trpc_send_msg: Error sending message over connection.\n");
- goto error;
+ rc=TRP_ERROR;
}
-
- goto cleanup;
-
- error:
- rc = -1;
- cleanup:
return rc;
}
#include <fcntl.h>
-#include <event2/event.h>
#include <talloc.h>
#include <errno.h>
#include <unistd.h>
#include <gsscon.h>
#include <tr_rp.h>
-#include <tr_event.h>
#include <tr_debug.h>
#include <trp_internal.h>
trps->port=0;
trps->cookie=NULL;
trps->conn=NULL;
+ trps->trpc=NULL;
trps->mq=tr_mq_new(trps);
if (trps->mq==NULL) {
/* failed to allocate mq */
talloc_steal(trps, new);
}
-/* ok to call more than once; guarantees connection no longer in the list */
+/* ok to call more than once; guarantees connection no longer in the list.
+ * Caller is responsible for freeing the removed element afterwards. */
void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove)
{
trps->conn=trp_connection_remove(trps->conn, remove);
}
-int trps_send_msg (TRPS_INSTANCE *trps,
- int conn,
- gss_ctx_id_t gssctx,
- const char *msg_content)
+void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
{
- int err=0;
- int rc=0;
-
- /* Send the request over the connection */
- if (err = gsscon_write_encrypted_token (conn,
- gssctx,
- msg_content,
- strlen(msg_content))) {
- tr_err( "trps_send_msg: Error sending message over connection.\n");
- rc = -1;
- }
+ if (trps->trpc==NULL)
+ trps->trpc=trpc;
+ else
+ trpc_append(trps->trpc, trpc);
+
+ talloc_steal(trps, trpc);
+}
+
+/* ok to call more than once; guarantees trpc no longer in the list.
+ * Caller is responsible for freeing the removed element afterwards. */
+void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove)
+{
+ trps->trpc=trpc_remove(trps->trpc, remove);
+}
+TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TR_MQ_MSG *mq_msg=NULL;
+ char *msg_dup=NULL;
+ TRP_RC rc=TRP_ERROR;
+
+ /* Currently ignore peer and just send to an open connection.
+ * In reality, need to identify the correct peer and send via that
+ * one. */
+ if (trps->trpc != NULL) {
+ if (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP)
+ tr_debug("trps_send_msg: skipping message sent while TRPC connection not up.");
+ else {
+ mq_msg=tr_mq_msg_new(tmp_ctx, "trpc_send");
+ msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
+ tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */
+ trpc_mq_append(trps->trpc, mq_msg);
+ rc=TRP_SUCCESS;
+ }
+ }
+ talloc_free(tmp_ctx);
return rc;
}