Add timed blocking to tr_mq_pop().
[trust_router.git] / common / tr_mq.c
1 /*
2  * Copyright (c) 2016, JANET(UK)
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * 3. Neither the name of JANET(UK) nor the names of its contributors
17  *    may be used to endorse or promote products derived from this software
18  *    without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
25  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
29  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
31  * OF THE POSSIBILITY OF SUCH DAMAGE.
32  *
33  */
34
35 #include <talloc.h>
36 #include <pthread.h>
37
38 #include <tr_mq.h>
39 #include <tr_debug.h>
40
41 /* Messages */
42 static int tr_mq_msg_destructor(void *object)
43 {
44   TR_MQ_MSG *msg=talloc_get_type_abort(object, TR_MQ_MSG);
45   if ( (msg->p!=NULL) && (msg->p_free!=NULL))
46     msg->p_free(msg->p);
47   return 0;
48 }
49
50 TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message, TR_MQ_PRIORITY prio)
51 {
52   TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG);
53   if (msg!=NULL) {
54     msg->next=NULL;
55     msg->prio=prio;
56     msg->message=talloc_strdup(msg, message);
57     if (msg->message==NULL) {
58       talloc_free(msg);
59       return NULL;
60     }
61     msg->p=NULL;
62     talloc_set_destructor((void *)msg, tr_mq_msg_destructor);
63   }
64   return msg;
65 }
66
67 void tr_mq_msg_free(TR_MQ_MSG *msg)
68 {
69   if (msg!=NULL)
70     talloc_free(msg);
71 }
72
73 TR_MQ_PRIORITY tr_mq_msg_get_prio(TR_MQ_MSG *msg)
74 {
75   return msg->prio;
76 }
77
78 const char *tr_mq_msg_get_message(TR_MQ_MSG *msg)
79 {
80   return msg->message;
81 }
82
83 void *tr_mq_msg_get_payload(TR_MQ_MSG *msg)
84 {
85   return msg->p;
86 }
87
88 /* call with a pointer to the payload and a function to free it later */
89 void tr_mq_msg_set_payload(TR_MQ_MSG *msg, void *p, void (*p_free)(void *))
90 {
91   msg->p=p;
92   msg->p_free=p_free;
93 }
94
95
96 static TR_MQ_MSG *tr_mq_msg_get_next(TR_MQ_MSG *msg)
97 {
98   return msg->next;
99 }
100
101 static void tr_mq_msg_set_next(TR_MQ_MSG *msg, TR_MQ_MSG *next)
102 {
103   msg->next=next;
104 }
105
106 /* Message Queues */
107 TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx)
108 {
109   TR_MQ *mq=talloc(mem_ctx, TR_MQ);
110   if (mq!=NULL) {
111     pthread_mutex_init(&(mq->mutex), 0);
112     pthread_cond_init(&(mq->have_msg_cond), 0);
113     mq->head=NULL;
114     mq->tail=NULL;
115     mq->last_hi_prio=NULL;
116   }
117   return mq;
118 }
119
120 void tr_mq_free(TR_MQ *mq)
121 {
122   if (mq!=NULL) {
123     tr_mq_lock(mq); /* don't pull the rug out from under someone */
124     talloc_free(mq);
125   }
126 }
127
128 int tr_mq_lock(TR_MQ *mq)
129 {
130   return pthread_mutex_lock(&(mq->mutex));
131 }
132
133 int tr_mq_unlock(TR_MQ *mq)
134 {
135   return pthread_mutex_unlock(&(mq->mutex));
136 }
137
138 static TR_MQ_MSG *tr_mq_get_head(TR_MQ *mq)
139 {
140   return mq->head;
141 }
142
143 static void tr_mq_set_head(TR_MQ *mq, TR_MQ_MSG *msg)
144 {
145   mq->head=msg;
146 }
147
148 static TR_MQ_MSG *tr_mq_get_tail(TR_MQ *mq)
149 {
150   return mq->tail;
151 }
152
153 static void tr_mq_set_tail(TR_MQ *mq, TR_MQ_MSG *msg)
154 {
155   mq->tail=msg;
156 }
157
158 void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg)
159 {
160   mq->notify_cb=cb;
161   mq->notify_cb_arg=arg;
162 }
163
164 void tr_mq_clear(TR_MQ *mq)
165 {
166   TR_MQ_MSG *m=NULL;
167   TR_MQ_MSG *n=NULL;
168
169   tr_mq_lock(mq);
170   m=tr_mq_get_head(mq);
171   while (m!=NULL) {
172     n=tr_mq_msg_get_next(m);
173     tr_mq_msg_free(m);
174     m=n;
175   }
176   tr_mq_set_head(mq, NULL);
177   tr_mq_set_tail(mq, NULL);
178   tr_mq_unlock(mq);
179 }
180
181 static int tr_mq_empty(TR_MQ *mq)
182 {
183   return tr_mq_get_head(mq)==NULL;
184 }
185
186 /* puts msg in mq's talloc context */
187 static void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
188 {
189   if (tr_mq_get_head(mq)==NULL) {
190     tr_mq_set_head(mq, msg);
191     tr_mq_set_tail(mq, msg);
192   } else {
193     tr_mq_msg_set_next(tr_mq_get_tail(mq), msg); /* add to list */
194     tr_mq_set_tail(mq, msg); /* update tail of list */
195   }
196   talloc_steal(mq, msg);
197 }
198
199 static void tr_mq_append_high_prio(TR_MQ *mq, TR_MQ_MSG *new)
200 {
201   if (tr_mq_get_head(mq)==NULL) {
202     tr_mq_set_head(mq, new);
203     tr_mq_set_tail(mq, new);
204   } else if (mq->last_hi_prio==NULL) {
205     tr_mq_msg_set_next(new, tr_mq_get_head(mq)); /* add to front of list */
206     tr_mq_set_head(mq, new); /* update head of list */
207   } else {
208     tr_mq_msg_set_next(new, tr_mq_msg_get_next(mq->last_hi_prio));
209     tr_mq_msg_set_next(mq->last_hi_prio, new); /* add to end of hi prio msgs */
210   }
211   mq->last_hi_prio=new; /* in any case, this is now the last high priority msg */
212   talloc_steal(mq,new);
213 }
214
215 #define DEBUG_TR_MQ 0
216 #if DEBUG_TR_MQ
217 static void tr_mq_print(TR_MQ *mq)
218 {
219   TR_MQ_MSG *m=mq->head;
220   int ii=0;
221
222   tr_debug("tr_mq_print: mq contents:");
223   while(m!=NULL) {
224     ii++;
225     tr_debug("tr_mq_print: Entry %02d: %-15s (prio %d)",
226              ii, tr_mq_msg_get_message(m), tr_mq_msg_get_prio(m));
227     m=tr_mq_msg_get_next(m);
228   }
229 }
230 #endif
231 void tr_mq_add(TR_MQ *mq, TR_MQ_MSG *msg)
232 {
233   int was_empty=0;
234   TR_MQ_NOTIFY_FN notify_cb=NULL;
235   void *notify_cb_arg=NULL;
236
237   tr_mq_lock(mq);
238   
239   was_empty=tr_mq_empty(mq);
240   switch (tr_mq_msg_get_prio(msg)) {
241   case TR_MQ_PRIO_HIGH:
242     tr_mq_append_high_prio(mq, msg);
243     break;
244   default:
245     tr_mq_append(mq, msg);
246     break;
247   }
248   /* before releasing the mutex, get notify_cb data out of mq */
249   notify_cb=mq->notify_cb;
250   notify_cb_arg=mq->notify_cb_arg;
251
252 #if DEBUG_TR_MQ
253   tr_mq_print(mq);
254 #endif 
255
256   /* Before releasing the lock, signal any waiting threads that there's now
257    * something in the queue. Used for blocking tr_mq_pop() call. */
258
259   if (was_empty)
260     pthread_cond_broadcast(&(mq->have_msg_cond));
261
262   tr_mq_unlock(mq);
263
264   /* see if we need to tell someone we became non-empty */
265   if (was_empty && (notify_cb!=NULL))
266     notify_cb(mq, notify_cb_arg);
267 }
268
269 /* Caller must free msg via tr_mq_msg_free, waiting up to maxwait seconds (0 for non-blocking).
270  * Not guaranteed to wait if an error occurs - immediately returns without a message. */
271 TR_MQ_MSG *tr_mq_pop(TR_MQ *mq, time_t maxwait)
272 {
273   TR_MQ_MSG *popped=NULL;
274   int wait_err=0;
275   struct timespec expiration={0};
276
277   tr_mq_lock(mq);
278   if ((tr_mq_get_head(mq)==NULL) && (maxwait>0)) {
279     /* No msgs yet, and blocking was requested */
280     if (0!=clock_gettime(CLOCK_REALTIME, &expiration)) {
281       tr_notice("tr_mq_pop: error reading realtime clock.");
282       return NULL;
283     }
284     expiration.tv_sec+=maxwait;
285
286     while ((wait_err==0) && (NULL==tr_mq_get_head(mq)))
287       wait_err=pthread_cond_timedwait(&(mq->have_msg_cond),
288                                      &(mq->mutex),
289                                      &expiration);
290     
291     if ((wait_err!=0) && (wait_err!=ETIMEDOUT)) {
292       tr_notice("tr_mq_pop: error waiting for message.");
293       return NULL;
294     }
295     /* if it timed out, ok to go ahead and check once more for a message, so no special exit */
296   }
297
298   if (tr_mq_get_head(mq)!=NULL) {
299     popped=tr_mq_get_head(mq);
300     tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
301
302     if (popped==mq->last_hi_prio)
303       mq->last_hi_prio=NULL;
304
305     if (tr_mq_get_head(mq)==NULL)
306       tr_mq_set_tail(mq, NULL); /* just popped the last element */
307   }
308   tr_mq_unlock(mq);
309   if (popped!=NULL)
310     tr_mq_msg_set_next(popped, NULL); /* disconnect from list */
311   return popped;
312 }
313