return 0;
}
-TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx)
+TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message)
{
TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG);
if (msg!=NULL) {
msg->next=NULL;
+ msg->message=talloc_strdup(msg, message);
+ if (msg->message==NULL) {
+ talloc_free(msg);
+ return NULL;
+ }
msg->p=NULL;
talloc_set_destructor((void *)msg, tr_mq_msg_destructor);
}
talloc_free(msg);
}
+const char *tr_mq_msg_get_message(TR_MQ_MSG *msg)
+{
+ return msg->message;
+}
+
void *tr_mq_msg_get_payload(TR_MQ_MSG *msg)
{
return msg->p;
#include <tr_msg.h>
#include <trust_router/tr_name.h>
-#include <tid_internal.h>
#include <trp_internal.h>
#include <trust_router/tr_constraint.h>
#include <tr_debug.h>
free (msg);
}
}
-
-
typedef struct tr_mq_msg TR_MQ_MSG;
struct tr_mq_msg {
TR_MQ_MSG *next;
+ char *message;
void *p; /* payload */
void (*p_free)(void *); /* function to free payload */
};
void *notify_cb_arg;
};
-TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx);
+TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *msg);
void tr_mq_msg_free(TR_MQ_MSG *msg);
+const char *tr_mq_msg_get_message(TR_MQ_MSG *msg);
void *tr_mq_msg_get_payload(TR_MQ_MSG *msg);
void tr_mq_msg_set_payload(TR_MQ_MSG *msg, void *p, void (*p_free)(void *));
#include <gsscon.h>
#include <trust_router/tr_dh.h>
#include <tr_mq.h>
+#include <tr_msg.h>
#include <trust_router/trp.h>
/* info records */
typedef struct trps_instance TRPS_INSTANCE;
-typedef int (*TRP_REQ_FUNC)();
-typedef void (*TRP_RESP_FUNC)();
-/*typedef int (*TRP_AUTH_FUNC)(gss_name_t client_name, TR_NAME *display_name, void *cookie);*/
-typedef client_cb_fn TRP_AUTH_FUNC;
typedef enum trp_connection_status {
TRP_CONNECTION_DOWN=0,
pthread_mutex_t status_mutex;
};
+typedef TRP_RC (*TRPS_MSG_FUNC)(TRPS_INSTANCE *, TRP_CONNECTION *, TR_MSG *);
+typedef void (*TRP_RESP_FUNC)();
+/*typedef int (*TRP_AUTH_FUNC)(gss_name_t client_name, TR_NAME *display_name, void *cookie);*/
+typedef client_cb_fn TRP_AUTH_FUNC;
+
/* TRP Client Instance Data */
typedef struct trpc_instance {
TRP_CONNECTION *conn;
char *hostname;
unsigned int port;
TRP_AUTH_FUNC auth_handler;
- TRP_REQ_FUNC req_handler;
+ TRPS_MSG_FUNC msg_handler;
void *cookie;
TRP_CONNECTION *conn; /* connections to peers */
TR_MQ *mq;
TRP_CONNECTION *trp_connection_new(TALLOC_CTX *mem_ctx);
void trp_connection_free(TRP_CONNECTION *conn);
+void trp_connection_close(TRP_CONNECTION *conn);
int trp_connection_get_fd(TRP_CONNECTION *conn);
void trp_connection_set_fd(TRP_CONNECTION *conn, int fd);
TR_NAME *trp_connection_get_gssname(TRP_CONNECTION *conn);
gss_ctx_id_t *trp_connection_get_gssctx(TRP_CONNECTION *conn);
void trp_connection_set_gssctx(TRP_CONNECTION *conn, gss_ctx_id_t *gssctx);
TRP_CONNECTION_STATUS trp_connection_get_status(TRP_CONNECTION *conn);
-void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status);
pthread_t *trp_connection_get_thread(TRP_CONNECTION *conn);
void trp_connection_set_thread(TRP_CONNECTION *conn, pthread_t *thread);
TRP_CONNECTION *trp_connection_get_next(TRP_CONNECTION *conn);
+TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remove);
void trp_connection_append(TRP_CONNECTION *conn, TRP_CONNECTION *new);
int trp_connection_auth(TRP_CONNECTION *conn, TRP_AUTH_FUNC auth_callback, void *callback_data);
-TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname,
- TRP_AUTH_FUNC auth_callback, TRP_REQ_FUNC req_handler,
- void *callback_data);
+TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname);
TRPC_INSTANCE *trpc_new (TALLOC_CTX *mem_ctx);
void trpc_free (TRPC_INSTANCE *trpc);
int trps_send_msg (TRPS_INSTANCE *trps, int conn, gss_ctx_id_t gssctx, const char *msg_content);
int trps_accept(TRPS_INSTANCE *trps, int listen);
void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new);
+void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove);
int trps_get_listener(TRPS_INSTANCE *trps,
- TRP_REQ_FUNC req_handler,
+ TRPS_MSG_FUNC msg_handler,
TRP_AUTH_FUNC auth_handler,
const char *hostname,
unsigned int port,
int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data);
TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps);
void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg);
+void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn);
#endif /* TRP_INTERNAL_H */
#include <talloc.h>
#include <errno.h>
#include <unistd.h>
+#include <string.h>
#include <gsscon.h>
#include <tr_rp.h>
#include <trp_internal.h>
#include <tr_config.h>
#include <tr_event.h>
-#include <tr_debug.h>
+#include <tr_msg.h>
#include <tr_trp.h>
+#include <tr_debug.h>
/* hold a trps instance and a config manager */
struct tr_trps_event_cookie {
event_active(mq_ev, 0, 0);
}
-static int tr_trps_req_handler (TRPS_INSTANCE *trps,
- TRP_REQ *orig_req,
- void *tr_in)
+static void msg_free_helper(void *p)
{
- if (orig_req != NULL)
- free(orig_req);
- return -1; /* not handling anything right now */
+ tr_msg_free_decoded((TR_MSG *)p);
+}
+/* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
+static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
+ TRP_CONNECTION *conn,
+ TR_MSG *tr_msg)
+{
+ TALLOC_CTX *tmp_ctx=talloc_new(NULL);
+ TR_MQ_MSG *mq_msg=NULL;
+
+ /* n.b., conn is available here, but do not hold onto the reference
+ * because it may be cleaned up if the originating connection goes
+ * down before the message is processed */
+ mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
+ if (mq_msg==NULL) {
+ return TRP_NOMEM;
+ }
+ tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
+ trps_mq_append(trps, mq_msg);
+ talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
+ return TRP_SUCCESS;
}
TR_MQ_MSG *msg=NULL;
tr_debug("tr_trps_conn_thread: started");
- /* try to establish a GSS context */
- if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
- tr_notice("tr_trps_conn_thread: failed to authorize connection");
- pthread_exit(NULL);
- }
- tr_notice("tr_trps_conn_thread: authorized connection");
-
- msg=tr_mq_msg_new(tmp_ctx);
+ trps_handle_connection(trps, conn);
+
+ msg=tr_mq_msg_new(tmp_ctx, "thread_exit");
+ tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
if (msg==NULL)
tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
else
asprintf(&name, "trustrouter@%s", trps->hostname);
gssname=tr_new_name(name);
free(name); name=NULL;
- conn=trp_connection_accept(tmp_ctx, listener, gssname, trps_auth_cb, NULL, trps);
+ conn=trp_connection_accept(tmp_ctx, listener, gssname);
if (conn!=NULL) {
/* need to monitor this fd and trigger events when read becomes possible */
thread_data=talloc(conn, struct thread_data);
talloc_free(tmp_ctx);
}
+static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
+{
+ /* everything belonging to the thread is in the TRP_CONNECTION
+ * associated with it */
+ trps_remove_connection(trps, conn);
+ tr_debug("Deleted connection");
+}
+
static void tr_trps_process_mq(int socket, short event, void *arg)
{
TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
TR_MQ_MSG *msg=NULL;
+ const char *s=NULL;
+ char *tmp=NULL;
tr_debug("tr_trps_process_mw: starting");
msg=trps_mq_pop(trps);
while (msg!=NULL) {
- tr_debug("tr_trps_process_mq: received message");
+ s=tr_mq_msg_get_message(msg);
+ if (0==strcmp(s, "thread_exit")) {
+ tr_trps_cleanup_thread(trps,
+ talloc_get_type_abort(tr_mq_msg_get_payload(msg),
+ TRP_CONNECTION));
+ }
+ else if (0==strcmp(s, "tr_msg")) {
+ tmp=tr_msg_encode(tr_mq_msg_get_payload(msg));
+ tr_debug("tr_msg: %s", tmp);
+ tr_msg_free_encoded(tmp);
+ }
+ else
+ tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
+
tr_mq_msg_free(msg);
msg=trps_mq_pop(trps);
}
tr_debug("tr_trps_process_mw: ending");
}
+static int tr_trps_events_destructor(void *obj)
+{
+ TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
+ if (ev->mq_ev!=NULL)
+ event_free(ev->mq_ev);
+ return 0;
+}
TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
{
TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
talloc_free(ev);
ev=NULL;
}
+ talloc_set_destructor((void *)ev, tr_trps_events_destructor);
}
return ev;
}
/* get a trps listener */
listen_ev->sock_fd=trps_get_listener(trps,
- tr_trps_req_handler,
- tr_trps_gss_handler,
- cfg_mgr->active->internal->hostname,
- cfg_mgr->active->internal->trps_port,
- (void *)cookie);
+ tr_trps_msg_handler,
+ tr_trps_gss_handler,
+ cfg_mgr->active->internal->hostname,
+ cfg_mgr->active->internal->trps_port,
+ (void *)cookie);
if (listen_ev->sock_fd < 0) {
tr_crit("Error opening TRP server socket.");
retval=1;
return status;
}
-void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status)
+static void trp_connection_set_status(TRP_CONNECTION *conn, TRP_CONNECTION_STATUS status)
{
pthread_mutex_lock(&(conn->status_mutex));
conn->status=status;
conn->next=next;
}
+/* Ok to call more than once; guarantees connection no longer in the list.
+ * Returns handle to new list, you must replace your old handle on the list with this. */
+TRP_CONNECTION *trp_connection_remove(TRP_CONNECTION *conn, TRP_CONNECTION *remove)
+{
+ TRP_CONNECTION *cur=conn;
+ TRP_CONNECTION *last=NULL;
+
+ if (cur==NULL)
+ return NULL;
+
+ /* first element is a special case */
+ if (cur==remove) {
+ conn=trp_connection_get_next(cur); /* advance list head */
+ trp_connection_free(cur);
+ } else {
+ /* it was not the first element */
+ last=cur;
+ cur=trp_connection_get_next(cur);
+ while (cur!=NULL) {
+ if (cur==remove) {
+ trp_connection_set_next(last, trp_connection_get_next(cur));
+ trp_connection_free(cur);
+ break;
+ }
+ last=cur;
+ cur=trp_connection_get_next(cur);
+ }
+ }
+ return conn;
+}
+
static TRP_CONNECTION *trp_connection_get_tail(TRP_CONNECTION *conn)
{
while((conn!=NULL)&&(trp_connection_get_next(conn)!=NULL))
trp_connection_mutex_init(new_conn);
trp_connection_set_status(new_conn, TRP_CONNECTION_DOWN);
thread=talloc(new_conn, pthread_t);
- if (thread==NULL) {
- talloc_free(new_conn);
- return NULL;
- }
- trp_connection_set_thread(new_conn, thread);
gssctx=talloc(new_conn, gss_ctx_id_t);
if (gssctx==NULL) {
talloc_free(new_conn);
return NULL;
}
trp_connection_set_gssctx(new_conn, gssctx);
+ if (thread==NULL) {
+ talloc_free(new_conn);
+ return NULL;
+ }
+ trp_connection_set_thread(new_conn, thread);
talloc_set_destructor((void *)new_conn, trp_connection_destructor);
}
return new_conn;
void trp_connection_free(TRP_CONNECTION *conn)
{
- /* TODO: shut down connection if it is still open */
talloc_free(conn);
}
+void trp_connection_close(TRP_CONNECTION *conn)
+{
+ close(trp_connection_get_fd(conn));
+ trp_connection_set_fd(conn, -1);
+ trp_connection_set_status(conn, TRP_CONNECTION_DOWN);
+}
/* returns 0 on authorization success, 1 on failure, or -1 in case of error */
int trp_connection_auth(TRP_CONNECTION *conn, TRP_AUTH_FUNC auth_callback, void *callback_data)
int rc = 0;
int auth, autherr = 0;
gss_buffer_desc nameBuffer = {0, NULL};
- gss_ctx_id_t gssctx;
+ gss_ctx_id_t *gssctx=trp_connection_get_gssctx(conn);
/* TODO: shouldn't really peek into TR_NAME... */
nameBuffer.length = trp_connection_get_gssname(conn)->len;
nameBuffer.value = trp_connection_get_gssname(conn)->buf;
tr_debug("trp_connection_auth: beginning passive authentication");
- if (rc = gsscon_passive_authenticate(trp_connection_get_fd(conn), nameBuffer, &gssctx, auth_callback, callback_data)) {
+ rc = gsscon_passive_authenticate(trp_connection_get_fd(conn), nameBuffer, gssctx, auth_callback, callback_data);
+ gss_release_buffer(NULL, &nameBuffer);
+ if (rc!=0) {
tr_debug("trp_connection_auth: Error from gsscon_passive_authenticate(), rc = 0x%08X.", rc);
return -1;
}
tr_debug("trp_connection_auth: beginning second stage authentication");
- if (rc = gsscon_authorize(gssctx, &auth, &autherr)) {
+ if (rc = gsscon_authorize(*gssctx, &auth, &autherr)) {
tr_debug("trp_connection_auth: Error from gsscon_authorize, rc = %d, autherr = %d.",
rc, autherr);
return -1;
}
/* Accept connection */
-TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname, TRP_AUTH_FUNC auth_handler, TRP_REQ_FUNC req_handler,
- void *cookie)
+TRP_CONNECTION *trp_connection_accept(TALLOC_CTX *mem_ctx, int listen, TR_NAME *gssname)
{
int conn_fd=-1;
TRP_CONNECTION *conn=NULL;
conn=trp_connection_new(mem_ctx);
trp_connection_set_fd(conn, conn_fd);
trp_connection_set_gssname(conn, gssname);
+ trp_connection_set_status(conn, TRP_CONNECTION_UP);
return conn;
}
goto error;
}
- /* Read the response from the connection */
- if (err = gsscon_read_encrypted_token(conn, gssctx, &resp_buf, &resp_buflen)) {
- if (resp_buf)
- free(resp_buf);
- goto error;
- }
-
- tr_debug( "trpc_send_msg: Response Received (%u bytes).\n", (unsigned) resp_buflen);
- tr_debug( "%s\n", resp_buf);
-
- if (resp_handler)
- /* Call the caller's response function */
- (*resp_handler)(trpc, resp_buf, cookie);
goto cleanup;
error:
talloc_steal(trps, new);
}
+/* ok to call more than once; guarantees connection no longer in the list */
+void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove)
+{
+ trps->conn=trp_connection_remove(trps->conn, remove);
+}
+
int trps_send_msg (TRPS_INSTANCE *trps,
int conn,
gss_ctx_id_t gssctx,
return result;
}
-#if 0
-static int trps_read_message (TRPS_INSTANCE *trps, int conn, gss_ctx_id_t *gssctx, char **msg)
+static TRP_RC trps_read_message(TRPS_INSTANCE *trps, TRP_CONNECTION *conn, TR_MSG **msg)
{
int err;
char *buf;
size_t buflen = 0;
- if (err = gsscon_read_encrypted_token(conn, *gssctx, &buf, &buflen)) {
+ tr_debug("trps_read_message: started");
+ if (err = gsscon_read_encrypted_token(trp_connection_get_fd(conn),
+ *(trp_connection_get_gssctx(conn)),
+ &buf,
+ &buflen)) {
+ tr_debug("trps_read_message: error");
if (buf)
free(buf);
- return -1;
+ return TRP_ERROR;
}
- tr_debug("trps_read_request(): Request Received, %u bytes.", (unsigned) buflen);
- tr_debug("trps_read_request(): %.*s", buflen, buf);
+ tr_debug("trps_read_message(): Request Received, %u bytes.", (unsigned) buflen);
+ tr_debug("trps_read_message(): %.*s", buflen, buf);
- *msg=talloc_strndup(NULL, buf, buflen); /* no context owns this! */
+ *msg=tr_msg_decode(buf, buflen);
free(buf);
- return buflen;
+ return TRP_SUCCESS;
}
-#endif
int trps_get_listener(TRPS_INSTANCE *trps,
- TRP_REQ_FUNC req_handler,
+ TRPS_MSG_FUNC msg_handler,
TRP_AUTH_FUNC auth_handler,
const char *hostname,
unsigned int port,
if (listen > 0) {
/* store the caller's request handler & cookie */
- trps->req_handler = req_handler;
+ trps->msg_handler = msg_handler;
trps->auth_handler = auth_handler;
trps->hostname = talloc_strdup(trps, hostname);
trps->port = port;
return listen;
}
-/* old cruft */
-#if 0
-static gss_ctx_id_t trps_establish_gss_context (TRPS_INSTANCE *trps, int conn)
+void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
{
TALLOC_CTX *tmp_ctx=talloc_new(NULL);
- gss_ctx_id_t gssctx = GSS_C_NO_CONTEXT;
- char *msg_rec=NULL;
- int msg_len = 0;
- int rc=0;
+ TR_MSG *msg=NULL;
+ TRP_RC rc=TRP_ERROR;
- if (trps_auth_connection(trps, conn, &gssctx))
- tr_notice("trps_establish_gss_context: Error authorizing TID Server connection.");
- else:
- tr_notice("trps_establish_gss_context: Connection authorized!");
- return gssctx;
-
- msg_len = trps_read_message(trps, conn, &gssctx, &msg_rec);
- talloc_steal(tmp_ctx, msg_rec); /* get this in our context */
- if (0 > msg_len) {
- tr_debug("trps_handle_connection: Error from trps_read_message()");
- goto cleanup;
+ /* try to establish a GSS context */
+ if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
+ tr_notice("tr_trps_conn_thread: failed to authorize connection");
+ pthread_exit(NULL);
}
+ tr_notice("trps_handle_connection: authorized connection");
- tr_debug("trps_handle_connection: msg_len=%d", msg_len);
- reply=talloc_asprintf(tmp_ctx, "TRPS heard: %.*s", msg_len, msg_rec);
- if (0 > (rc = trps_send_msg(trps, conn, gssctx, reply))) {
- tr_debug("trps_handle_connection: Error from trps_send_message(), rc = %d.", rc);
+ /* loop as long as the connection exists */
+ while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
+ rc=trps_read_message(trps, conn, &msg);
+ switch(rc) {
+ case TRP_SUCCESS:
+ trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */
+ break;
+
+ case TRP_ERROR:
+ trp_connection_close(conn);
+ break;
+
+ default:
+ tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc);
+ }
}
-cleanup:
+ tr_debug("trps_handle_connection: connection closed.");
talloc_free(tmp_ctx);
- return conn;
}
-#endif