TR_MQ *mq=talloc(mem_ctx, TR_MQ);
if (mq!=NULL) {
pthread_mutex_init(&(mq->mutex), 0);
+ pthread_cond_init(&(mq->have_msg_cond), 0);
mq->head=NULL;
mq->tail=NULL;
mq->last_hi_prio=NULL;
tr_mq_print(mq);
#endif
+ /* Before releasing the lock, signal any waiting threads that there's now
+ * something in the queue. Used for blocking tr_mq_pop() call. */
+
+ if (was_empty)
+ pthread_cond_broadcast(&(mq->have_msg_cond));
+
tr_mq_unlock(mq);
/* see if we need to tell someone we became non-empty */
notify_cb(mq, notify_cb_arg);
}
-/* caller must free msg via tr_mq_msg_free */
-TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
+/* Caller must free msg via tr_mq_msg_free, waiting up to maxwait seconds (0 for non-blocking).
+ * Not guaranteed to wait if an error occurs - immediately returns without a message. */
+TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, time_t maxwait)
{
TR_MQ_MSG *popped=NULL;
+ int wait_err=0;
+ struct timespec expiration={0};
tr_mq_lock(mq);
+ if ((tr_mq_get_head(mq)==NULL) && (maxwait>0)) {
+ /* No msgs yet, and blocking was requested */
+ if (0!=clock_gettime(CLOCK_REALTIME, &expiration)) {
+ tr_notice("tr_mq_pop: error reading realtime clock.");
+ return NULL;
+ }
+ expiration.tv_sec+=maxwait;
+
+ while ((wait_err==0) && (NULL==tr_mq_get_head(mq)))
+ wait_err=pthread_cond_timedwait(&(mq->have_msg_cond),
+ &(mq->mutex),
+ &expiration);
+
+ if ((wait_err!=0) && (wait_err!=ETIMEDOUT)) {
+ tr_notice("tr_mq_pop: error waiting for message.");
+ return NULL;
+ }
+ /* if it timed out, ok to go ahead and check once more for a message, so no special exit */
+ }
+
if (tr_mq_get_head(mq)!=NULL) {
popped=tr_mq_get_head(mq);
tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
typedef void (*TR_MQ_NOTIFY_FN)(TR_MQ *, void *);
struct tr_mq {
pthread_mutex_t mutex;
+ pthread_cond_t have_msg_cond;
TR_MQ_MSG *head;
TR_MQ_MSG *tail;
TR_MQ_MSG *last_hi_prio;
int tr_mq_unlock(TR_MQ *mq);
void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg);
void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg);
-TR_MQ_MSG *tr_mq_pop(TR_MQ *mq);
+TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, time_t maxwait);
void tr_mq_clear(TR_MQ *mq);
#endif /*_TR_MQ_H_ */