Add timed blocking to tr_mq_pop().
authorJennifer Richards <jennifer@painless-security.com>
Wed, 23 Nov 2016 17:52:58 +0000 (12:52 -0500)
committerJennifer Richards <jennifer@painless-security.com>
Wed, 23 Nov 2016 19:13:36 +0000 (14:13 -0500)
common/tr_mq.c
include/tr_mq.h

index 60313c6..6606be3 100644 (file)
@@ -109,6 +109,7 @@ TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx)
   TR_MQ *mq=talloc(mem_ctx, TR_MQ);
   if (mq!=NULL) {
     pthread_mutex_init(&(mq->mutex), 0);
+    pthread_cond_init(&(mq->have_msg_cond), 0);
     mq->head=NULL;
     mq->tail=NULL;
     mq->last_hi_prio=NULL;
@@ -252,6 +253,12 @@ void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
   tr_mq_print(mq);
 #endif 
 
+  /* Before releasing the lock, signal any waiting threads that there's now
+   * something in the queue. Used for blocking tr_mq_pop() call. */
+
+  if (was_empty)
+    pthread_cond_broadcast(&(mq->have_msg_cond));
+
   tr_mq_unlock(mq);
 
   /* see if we need to tell someone we became non-empty */
@@ -259,12 +266,35 @@ void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
     notify_cb(mq, notify_cb_arg);
 }
 
-/* caller must free msg via tr_mq_msg_free */
-TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
+/* Caller must free msg via tr_mq_msg_free, waiting up to maxwait seconds (0 for non-blocking).
+ * Not guaranteed to wait if an error occurs - immediately returns without a message. */
+TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, time_t maxwait)
 {
   TR_MQ_MSG *popped=NULL;
+  int wait_err=0;
+  struct timespec expiration={0};
 
   tr_mq_lock(mq);
+  if ((tr_mq_get_head(mq)==NULL) && (maxwait>0)) {
+    /* No msgs yet, and blocking was requested */
+    if (0!=clock_gettime(CLOCK_REALTIME, &expiration)) {
+      tr_notice("tr_mq_pop: error reading realtime clock.");
+      return NULL;
+    }
+    expiration.tv_sec+=maxwait;
+
+    while ((wait_err==0) && (NULL==tr_mq_get_head(mq)))
+      wait_err=pthread_cond_timedwait(&(mq->have_msg_cond),
+                                     &(mq->mutex),
+                                     &expiration);
+    
+    if ((wait_err!=0) && (wait_err!=ETIMEDOUT)) {
+      tr_notice("tr_mq_pop: error waiting for message.");
+      return NULL;
+    }
+    /* if it timed out, ok to go ahead and check once more for a message, so no special exit */
+  }
+
   if (tr_mq_get_head(mq)!=NULL) {
     popped=tr_mq_get_head(mq);
     tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
index 81cb325..9111f0f 100644 (file)
@@ -63,6 +63,7 @@ typedef struct tr_mq TR_MQ;
 typedef void (*TR_MQ_NOTIFY_FN)(TR_MQ *, void *);
 struct tr_mq {
   pthread_mutex_t mutex;
+  pthread_cond_t have_msg_cond;
   TR_MQ_MSG *head;
   TR_MQ_MSG *tail;
   TR_MQ_MSG *last_hi_prio;
@@ -87,7 +88,7 @@ int tr_mq_lock(TR_MQ *mq);
 int tr_mq_unlock(TR_MQ *mq);
 void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg);
 void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg);
-TR_MQ_MSG *tr_mq_pop(TR_MQ *mq);
+TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, time_t maxwait);
 void tr_mq_clear(TR_MQ *mq);
  
 #endif /*_TR_MQ_H_ */