From: Jennifer Richards Date: Mon, 7 May 2018 16:05:23 +0000 (-0400) Subject: Merge branch 'milestone/monitoring' into jennifer/no_mq_priorities X-Git-Tag: 3.4.0~1^2~32^2 X-Git-Url: http://www.project-moonshot.org/gitweb/?p=trust_router.git;a=commitdiff_plain;h=c3962902ebd3d3f69725c9976f81b0b25c18f45c;hp=-c Merge branch 'milestone/monitoring' into jennifer/no_mq_priorities --- c3962902ebd3d3f69725c9976f81b0b25c18f45c diff --combined common/tr_mq.c index 1ff2cbf,6cf309f..6676880 --- a/common/tr_mq.c +++ b/common/tr_mq.c @@@ -49,11 -49,12 +49,11 @@@ static int tr_mq_msg_destructor(void *o return 0; } -TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message, TR_MQ_PRIORITY prio) +TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message) { TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG); if (msg!=NULL) { msg->next=NULL; - msg->prio=prio; msg->message=talloc_strdup(msg, message); if (msg->message==NULL) { talloc_free(msg); @@@ -71,6 -72,11 +71,6 @@@ void tr_mq_msg_free(TR_MQ_MSG *msg talloc_free(msg); } -TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg) -{ - return msg->prio; -} - const char *tr_mq_msg_get_message(TR_MQ_MSG *msg) { return msg->message; @@@ -115,6 -121,7 +115,6 @@@ TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx mq->head=NULL; mq->tail=NULL; - mq->last_hi_prio=NULL; mq->notify_cb=NULL; mq->notify_cb_arg=NULL; @@@ -201,6 -208,22 +201,6 @@@ static void tr_mq_append(TR_MQ *mq, TR_ talloc_steal(mq, msg); } -static void tr_mq_append_high_prio(TR_MQ *mq, TR_MQ_MSG *new) -{ - if (tr_mq_get_head(mq)==NULL) { - tr_mq_set_head(mq, new); - tr_mq_set_tail(mq, new); - } else if (mq->last_hi_prio==NULL) { - tr_mq_msg_set_next(new, tr_mq_get_head(mq)); /* add to front of list */ - tr_mq_set_head(mq, new); /* update head of list */ - } else { - tr_mq_msg_set_next(new, tr_mq_msg_get_next(mq->last_hi_prio)); - tr_mq_msg_set_next(mq->last_hi_prio, new); /* add to end of hi prio msgs */ - } - mq->last_hi_prio=new; /* in any case, this is now the last high priority msg */ - talloc_steal(mq,new); -} - #define DEBUG_TR_MQ 0 #if DEBUG_TR_MQ static void tr_mq_print(TR_MQ *mq) @@@ -211,8 -234,8 +211,8 @@@ tr_debug("tr_mq_print: mq contents:"); while(m!=NULL) { ii++; - tr_debug("tr_mq_print: Entry %02d: %-15s (prio %d)", - ii, tr_mq_msg_get_message(m), tr_mq_msg_get_prio(m)); + tr_debug("tr_mq_print: Entry %02d: %-15s", + ii, tr_mq_msg_get_message(m)); m=tr_mq_msg_get_next(m); } } @@@ -226,8 -249,14 +226,8 @@@ void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *ms tr_mq_lock(mq); was_empty=tr_mq_empty(mq); - switch (tr_mq_msg_get_prio(msg)) { - case TR_MQ_PRIO_HIGH: - tr_mq_append_high_prio(mq, msg); - break; - default: - tr_mq_append(mq, msg); - break; - } + tr_mq_append(mq, msg); + /* before releasing the mutex, get notify_cb data out of mq */ notify_cb=mq->notify_cb; notify_cb_arg=mq->notify_cb_arg; @@@ -260,13 -289,17 +260,17 @@@ int tr_mq_pop_timeout(time_t seconds, s return 0; } - /* Caller must free msg via tr_mq_msg_free, waiting until absolute + /* Retrieves a message from the queue, waiting until absolute * time ts_abort before giving up (using CLOCK_MONOTONIC). If ts_abort * has passed, returns an existing message but will not wait if one is * not already available. If ts_abort is null, no blocking. Not * guaranteed to wait if an error occurs - immediately returns without * a message. Use tr_mq_pop_timeout() to get an absolute time that - * is guaranteed compatible with this function. */ + * is guaranteed compatible with this function. + * + * Caller should free msg via tr_mq_msg_free when done with it. It stays + * in the TR_MQ's context, though, so use talloc_steal() if you want to do + * something clever with it. */ TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, struct timespec *ts_abort) { TR_MQ_MSG *popped=NULL; @@@ -291,6 -324,9 +295,6 @@@ popped=tr_mq_get_head(mq); tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */ - if (popped==mq->last_hi_prio) - mq->last_hi_prio=NULL; - if (tr_mq_get_head(mq)==NULL) tr_mq_set_tail(mq, NULL); /* just popped the last element */ } diff --combined tr/tr_tid.c index fc55bc3,62722e8..0e324eb --- a/tr/tr_tid.c +++ b/tr/tr_tid.c @@@ -196,9 -196,9 +196,9 @@@ cleanup /* mq is still valid, so we can queue our response */ tr_debug("tr_tids_req_fwd_thread: thread %d using valid msg queue.", cookie->thread_id); if (success) - msg=tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_SUCCESS, TR_MQ_PRIO_NORMAL); + msg= tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_SUCCESS); else - msg=tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_FAILURE, TR_MQ_PRIO_NORMAL); + msg= tr_mq_msg_new(tmp_ctx, TR_TID_MQMSG_FAILURE); if (msg==NULL) tr_notice("tr_tids_req_fwd_thread: thread %d unable to allocate response msg.", cookie->thread_id); @@@ -346,6 -346,12 +346,12 @@@ static int tr_tids_req_handler(TIDS_INS tr_debug("tr_tids_req_handler: Request received (conn = %d)! Realm = %s, Comm = %s", orig_req->conn, orig_req->realm->buf, orig_req->comm->buf); + if (orig_req->request_id) + tr_debug("tr_tids_req_handler: TID request ID: %.*s", orig_req->request_id->len, orig_req->request_id->buf); + else + tr_debug("tr_tids_req_handler: TID request ID: none"); + + tids->req_count++; /* Duplicate the request, so we can modify and forward it */ if (NULL == (fwd_req=tid_dup_req(orig_req))) { diff --combined tr/tr_trp.c index 44f001a,6cf2e46..126cf66 --- a/tr/tr_trp.c +++ b/tr/tr_trp.c @@@ -93,7 -93,7 +93,7 @@@ static TRP_RC tr_trps_msg_handler(TRPS_ /* n.b., conn is available here, but do not hold onto the reference * because it may be cleaned up if the originating connection goes * down before the message is processed */ - mq_msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED, TR_MQ_PRIO_NORMAL); + mq_msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_MSG_RECEIVED); if (mq_msg==NULL) { return TRP_NOMEM; } @@@ -114,7 -114,7 +114,7 @@@ static int tr_trps_gss_handler(gss_name tr_debug("tr_trps_gss_handler()"); - if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) { + if ((!client_name) || (!trps) || (!cfg_mgr)) { tr_debug("tr_trps_gss_handler: Bad parameters."); return -1; } @@@ -147,7 -147,7 +147,7 @@@ static void *tr_trps_thread(void *arg if (trps_authorize_connection(trps, conn)!=TRP_SUCCESS) goto cleanup; - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED, TR_MQ_PRIO_HIGH); + msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_CONNECTED); tr_mq_msg_set_payload(msg, (void *)tr_dup_name(trp_connection_get_peer(conn)), tr_free_name_helper); if (msg==NULL) { tr_err("tr_trps_thread: error allocating TR_MQ_MSG"); @@@ -159,7 -159,7 +159,7 @@@ trps_handle_connection(trps, conn); cleanup: - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED, TR_MQ_PRIO_HIGH); + msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPS_DISCONNECTED); tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */ if (msg==NULL) tr_err("tr_trps_thread: error allocating TR_MQ_MSG"); @@@ -223,17 -223,6 +223,6 @@@ static void tr_trps_cleanup_conn(TRPS_I static void tr_trps_cleanup_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc) { - TR_MQ_MSG *msg; - - /* tell the trpc thread to exit */ - msg = tr_mq_msg_new(NULL, TR_MQMSG_TRPC_EXIT_OK); - if (msg) { - tr_debug("tr_trps_cleanup_trpc: Notifying thread that it may now exit"); - trpc_mq_add(trpc, msg); - } else { - tr_crit("tr_trps_cleanup_trpc: Unable to acknowledge disconnection, thread will probably never terminate"); - } - pthread_join(*trp_connection_get_thread(trpc_get_conn(trpc)), NULL); trps_remove_trpc(trps, trpc); trpc_free(trpc); @@@ -294,7 -283,7 +283,7 @@@ static void tr_trps_process_mq(int sock tr_err("tr_trps_process_mq: incoming connection from unknown peer (%s) reported.", tmp); else { trp_peer_set_incoming_status(peer, PEER_CONNECTED); - tr_err("tr_trps_process_mq: incoming connection from %s established.", tmp); + tr_notice("tr_trps_process_mq: incoming connection from %s established.", tmp); } free(tmp); } @@@ -308,7 -297,7 +297,7 @@@ } else { trp_peer_set_incoming_status(peer, PEER_DISCONNECTED); tr_trps_cleanup_conn(trps, conn); - tr_err("tr_trps_process_mq: incoming connection from %s lost.", tmp); + tr_notice("tr_trps_process_mq: incoming connection from %s lost.", tmp); } free(tmp); } @@@ -320,13 -309,11 +309,11 @@@ tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) reported.", tmp); else { trp_peer_set_outgoing_status(peer, PEER_CONNECTED); - tr_err("tr_trps_process_mq: outgoing connection to %s established.", tmp); + tr_notice("tr_trps_process_mq: outgoing connection to %s established.", tmp); } free(tmp); } else if (0==strcmp(s, TR_MQMSG_TRPC_DISCONNECTED)) { - /* The trpc connection died - see note above tr_trpc_thread() regarding the shutdown protocol. - * The EXIT_OK message is sent in tr_trps_cleanup_trpc(). */ TRPC_INSTANCE *trpc=talloc_get_type_abort(tr_mq_msg_get_payload(msg), TRPC_INSTANCE); TR_NAME *svcname=trpc_get_gssname(trpc); peer=trps_get_peer_by_servicename(trps, svcname); @@@ -335,7 -322,7 +322,7 @@@ tr_err("tr_trps_process_mq: outgoing connection to unknown peer (%s) lost.", tmp); else { trp_peer_set_outgoing_status(peer, PEER_DISCONNECTED); - tr_err("tr_trps_process_mq: outgoing connection to %s lost.", tmp); + tr_notice("tr_trps_process_mq: outgoing connection to %s lost.", tmp); tr_trps_cleanup_trpc(trps, trpc); } free(tmp); @@@ -572,24 -559,6 +559,6 @@@ cleanup return retval; } - - struct trpc_notify_cb_data { - int msg_ready; - pthread_cond_t cond; - pthread_mutex_t mutex; - }; - - static void tr_trpc_mq_cb(TR_MQ *mq, void *arg) - { - struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg; - pthread_mutex_lock(&(cb_data->mutex)); - if (!cb_data->msg_ready) { - cb_data->msg_ready=1; - pthread_cond_signal(&(cb_data->cond)); - } - pthread_mutex_unlock(&(cb_data->mutex)); - } - /* data passed to thread */ struct trpc_thread_data { TRPC_INSTANCE *trpc; @@@ -597,46 -566,19 +566,19 @@@ }; /** - * Returns when a message is ready - */ - static void trpc_thread_wait_for_message(struct trpc_notify_cb_data *cb) - { - cb->msg_ready = 0; - do - pthread_cond_wait(&(cb->cond), &(cb->mutex)); - while (!cb->msg_ready); - } - - /** * Thread for handling TRPC (outgoing) connections * - * Locking protocol: - * - * This thread uses a mutex and condition variable to wait for incoming messages - * on its queue. The main thread interacts with these through the tr_trpc_mq_cb() - * callback. This callback is called by the TR_MQ whenever the queue goes from empty - * to non-empty. The callback locks the thread's mutex, then sets a msg_ready flag - * signals the condition variable, then unlocks the mutex. - * - * This thread waits for the condition variable to be signaled, then checks that the - * msg_ready flag has actually been set (pthreads does not guarantee against false - * wakes). If so, it holds the mutex lock while reading any messages in its queue and - * sending responses over the GSS connection. If it receives an ABORT message from - * the main thread (not currently used), or if sending the GSS message fails (which - * usually indicates that the connection has been lost), the thread begins the shutdown - * process. It still holds the mutex lock at the start of this process. - * - * This begins by setting trpc->shutting_down = 1, then sending a message to the - * TRPS (main) thread indicating that it has DISCONNECTED. It then releases the mutex - * and waits for more messages, again using the mutex and the condition variable. It - * ignores any except for an EXIT_OK message. When that is received, it exits, terminating - * the thread. + * Opens a connection to a peer. If successful, notifies the trps thread by + * posting a TR_MQMSG_TRPC_CONNECTED message to the trps message queue. + * It then waits for messages on trpc->mq. Normally these will be TR_MQMSG_TRPC_SEND + * messages, which this thread forwards to the peer. If its connection is lost or + * a TR_MQMSG_ABORT message is received on trpc->mq, the thread sends a + * TR_MQMSG_TRPC_DISCONNECTED message to the trps thread, then cleans up and + * terminates. * - * When the main thread receives the DISCONNECTED message, it must stop sending messages - * to this thread's message queue. When it is certain that it will not queue any more messages, - * it sends the EXIT_OK message as its last interaction with this thread. Once that message - * has been queued, queueing additional messages may result in deadlock or segfaults. - * (It is ok if there are other messages in the queue, but it must not queue more.) + * The trps may continue queueing messages for this client even when the + * connection is down. To prevent the queue from growing endlessly, this thread + * should clear its queue after failed connection attempts. */ static void *tr_trpc_thread(void *arg) { @@@ -649,38 -591,30 +591,30 @@@ const char *msg_type=NULL; char *encoded_msg=NULL; TR_NAME *peer_gssname=NULL; - int n_sent = 0; - int n_popped = 0; + struct timespec wait_until = {0}; int exit_loop=0; - struct trpc_notify_cb_data cb_data={0, - PTHREAD_COND_INITIALIZER, - PTHREAD_MUTEX_INITIALIZER}; - tr_debug("tr_trpc_thread: started"); - /* set up the mq for receiving */ - pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */ - - tr_mq_lock(trpc->mq); - tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data); - tr_mq_unlock(trpc->mq); - + /* Try to make the outgoing connection */ rc=trpc_connect(trpc); if (rc!=TRP_SUCCESS) { tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.", trpc_get_server(trpc), trpc_get_port(trpc)); + trpc_mq_clear(trpc); /* clear the queue even though we did not connect */ } else { + /* Retrieve the GSS name used by the peer for authentication */ peer_gssname=trp_connection_get_peer(trpc_get_conn(trpc)); if (peer_gssname==NULL) { tr_err("tr_trpc_thread: could not duplicate peer_gssname."); talloc_free(tmp_ctx); return NULL; } - tr_debug("tr_trpc_thread: connected to peer %s", peer_gssname->buf); + tr_debug("tr_trpc_thread: connected to peer %.*s", + peer_gssname->len, peer_gssname->buf); - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED, TR_MQ_PRIO_HIGH); + msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_CONNECTED); tr_mq_msg_set_payload(msg, (void *)tr_dup_name(peer_gssname), tr_free_name_helper); if (msg==NULL) { tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); @@@ -690,19 -624,22 +624,22 @@@ trps_mq_add(trps, msg); /* steals msg context */ msg=NULL; + /* Loop until we get an abort message or until the connection is lost. */ while(!exit_loop) { - trpc_thread_wait_for_message(&cb_data); /* handles locking */ - - n_popped = 0; /* have not popped any messages from the queue */ - n_sent = 0; /* have not sent any messages yet */ - for (msg = trpc_mq_pop(trpc); - msg != NULL; - msg = trpc_mq_pop(trpc)) { - n_popped++; + /* Wait up to 10 minutes for a message to be queued to send to the peer. + * Log a warning if we go longer than that, but don't give up. */ + if (tr_mq_pop_timeout(10 * 60, &wait_until) != 0) { + tr_err("tr_trpc_thread: unable to set abort timeout"); + break; /* immediately exit the loop, don't go through cleanup */ + } + + /* Pop a message from the queue. */ + msg = trpc_mq_pop(trpc, &wait_until); + if (msg) { msg_type = tr_mq_msg_get_message(msg); if (0 == strcmp(msg_type, TR_MQMSG_ABORT)) { + tr_debug("tr_trpc_thread: received abort message from main thread."); exit_loop = 1; - break; } else if (0 == strcmp(msg_type, TR_MQMSG_TRPC_SEND)) { encoded_msg = tr_mq_msg_get_payload(msg); if (encoded_msg == NULL) @@@ -710,60 -647,37 +647,36 @@@ else { rc = trpc_send_msg(trpc, encoded_msg); if (rc == TRP_SUCCESS) { - n_sent++; + tr_debug("tr_trpc_thread: sent message."); } else { tr_notice("tr_trpc_thread: trpc_send_msg failed."); /* Assume this means we lost the connection. */ exit_loop = 1; - break; } } } else tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type); tr_mq_msg_free(msg); + } else { + tr_warning("tr_trpc_thread: no outgoing messages to %.*s for 10 minutes", + peer_gssname->len, peer_gssname->buf); } - - /* if n_popped == 0, then n_sent must be zero (it's only set after at - * least one msg is popped) */ - if (n_popped==0) - tr_err("tr_trpc_thread: notified of message, but queue empty"); - else - tr_debug("tr_trpc_thread: sent %d messages.", n_sent); } } - tr_debug("tr_trpc_thread: Disconnected. Waiting to terminate thread."); - trpc->shutting_down = 1; - /* Send a DISCONNECTED message to the main thread */ - tr_debug("tr_trpc_thread: notifying main thread of disconnection."); - msg=tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED, TR_MQ_PRIO_NORMAL); + msg= tr_mq_msg_new(tmp_ctx, TR_MQMSG_TRPC_DISCONNECTED); tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */ if (msg==NULL) { - /* can't notify main thread of exit - just do it and hope for the best */ + /* can't notify main thread */ tr_err("tr_trpc_thread: error allocating TR_MQ_MSG"); } else { trps_mq_add(trps, msg); - - /* DISCONNECTED sent, now wait for an acknowledgement before exiting */ - exit_loop = 0; - while (!exit_loop) { - trpc_thread_wait_for_message(&cb_data); - - while (NULL != (msg = trpc_mq_pop(trpc))) { - msg_type = tr_mq_msg_get_message(msg); - - /* ignore anything except an EXIT_OK */ - if (0 == strcmp(msg_type, TR_MQMSG_TRPC_EXIT_OK)) { - exit_loop = 1; - break; /* skip any further messages */ - } - } - } } - tr_debug("tr_trpc_thread: thread terminating."); talloc_free(tmp_ctx); + tr_debug("tr_trpc_thread: thread terminating."); return NULL; } @@@ -972,7 -886,11 +885,11 @@@ void tr_config_changed(TR_CFG *new_cfg tr_debug("tr_config_changed: freeing tr->mons->authorized_gss_names"); tr_gss_names_free(tr->mons->authorized_gss_names); } - tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials); + if (new_cfg->internal->monitoring_credentials != NULL) { + tr->mons->authorized_gss_names = tr_gss_names_dup(tr->mons, new_cfg->internal->monitoring_credentials); + } else { + tr->mons->authorized_gss_names = tr_gss_names_new(tr->mons); + } if (tr->mons->authorized_gss_names == NULL) { tr_err("tr_config_changed: Error configuring monitoring credentials"); }