Add timed blocking to tr_mq_pop().
[trust_router.git] / common / tr_mq.c
index 60313c6..6606be3 100644 (file)
@@ -109,6 +109,7 @@ TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx)
   TR_MQ *mq=talloc(mem_ctx, TR_MQ);
   if (mq!=NULL) {
     pthread_mutex_init(&(mq->mutex), 0);
+    pthread_cond_init(&(mq->have_msg_cond), 0);
     mq->head=NULL;
     mq->tail=NULL;
     mq->last_hi_prio=NULL;
@@ -252,6 +253,12 @@ void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
   tr_mq_print(mq);
 #endif 
 
+  /* Before releasing the lock, signal any waiting threads that there's now
+   * something in the queue. Used for blocking tr_mq_pop() call. */
+
+  if (was_empty)
+    pthread_cond_broadcast(&(mq->have_msg_cond));
+
   tr_mq_unlock(mq);
 
   /* see if we need to tell someone we became non-empty */
@@ -259,12 +266,35 @@ void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
     notify_cb(mq, notify_cb_arg);
 }
 
-/* caller must free msg via tr_mq_msg_free */
-TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
+/* Caller must free msg via tr_mq_msg_free, waiting up to maxwait seconds (0 for non-blocking).
+ * Not guaranteed to wait if an error occurs - immediately returns without a message. */
+TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, time_t maxwait)
 {
   TR_MQ_MSG *popped=NULL;
+  int wait_err=0;
+  struct timespec expiration={0};
 
   tr_mq_lock(mq);
+  if ((tr_mq_get_head(mq)==NULL) && (maxwait>0)) {
+    /* No msgs yet, and blocking was requested */
+    if (0!=clock_gettime(CLOCK_REALTIME, &expiration)) {
+      tr_notice("tr_mq_pop: error reading realtime clock.");
+      return NULL;
+    }
+    expiration.tv_sec+=maxwait;
+
+    while ((wait_err==0) && (NULL==tr_mq_get_head(mq)))
+      wait_err=pthread_cond_timedwait(&(mq->have_msg_cond),
+                                     &(mq->mutex),
+                                     &expiration);
+    
+    if ((wait_err!=0) && (wait_err!=ETIMEDOUT)) {
+      tr_notice("tr_mq_pop: error waiting for message.");
+      return NULL;
+    }
+    /* if it timed out, ok to go ahead and check once more for a message, so no special exit */
+  }
+
   if (tr_mq_get_head(mq)!=NULL) {
     popped=tr_mq_get_head(mq);
     tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */