Decode JSON TRP messages, then send to main thread.
[trust_router.git] / common / tr_mq.c
1 #include <talloc.h>
2 #include <pthread.h>
3
4 #include <tr_mq.h>
5 #include <tr_debug.h>
6
7 /* Messages */
8 static int tr_mq_msg_destructor(void *object)
9 {
10   TR_MQ_MSG *msg=talloc_get_type_abort(object, TR_MQ_MSG);
11   if ( (msg->p!=NULL) && (msg->p_free!=NULL))
12     msg->p_free(msg->p);
13   return 0;
14 }
15
16 TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx, const char *message)
17 {
18   TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG);
19   if (msg!=NULL) {
20     msg->next=NULL;
21     msg->message=talloc_strdup(msg, message);
22     if (msg->message==NULL) {
23       talloc_free(msg);
24       return NULL;
25     }
26     msg->p=NULL;
27     talloc_set_destructor((void *)msg, tr_mq_msg_destructor);
28   }
29   return msg;
30 }
31
32 void tr_mq_msg_free(TR_MQ_MSG *msg)
33 {
34   if (msg!=NULL)
35     talloc_free(msg);
36 }
37
38 const char *tr_mq_msg_get_message(TR_MQ_MSG *msg)
39 {
40   return msg->message;
41 }
42
43 void *tr_mq_msg_get_payload(TR_MQ_MSG *msg)
44 {
45   return msg->p;
46 }
47
48 /* call with a pointer to the payload and a function to free it later */
49 void tr_mq_msg_set_payload(TR_MQ_MSG *msg, void *p, void (*p_free)(void *))
50 {
51   msg->p=p;
52   msg->p_free=p_free;
53 }
54
55
56 static TR_MQ_MSG *tr_mq_msg_get_next(TR_MQ_MSG *msg)
57 {
58   return msg->next;
59 }
60
61 static void tr_mq_msg_set_next(TR_MQ_MSG *msg, TR_MQ_MSG *next)
62 {
63   msg->next=next;
64 }
65
66 /* Message Queues */
67 TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx)
68 {
69   TR_MQ *mq=talloc(mem_ctx, TR_MQ);
70   if (mq!=NULL) {
71     pthread_mutex_init(&(mq->mutex), 0);
72     mq->head=NULL;
73     mq->tail=NULL;
74   }
75   return mq;
76 }
77
78 void tr_mq_free(TR_MQ *mq)
79 {
80   if (mq!=NULL) {
81     tr_mq_lock(mq); /* don't pull the rug out from under someone */
82     talloc_free(mq);
83   }
84 }
85
86 int tr_mq_lock(TR_MQ *mq)
87 {
88   return pthread_mutex_lock(&(mq->mutex));
89 }
90
91 int tr_mq_unlock(TR_MQ *mq)
92 {
93   return pthread_mutex_unlock(&(mq->mutex));
94 }
95
96 static TR_MQ_MSG *tr_mq_get_head(TR_MQ *mq)
97 {
98   return mq->head;
99 }
100
101 static void tr_mq_set_head(TR_MQ *mq, TR_MQ_MSG *msg)
102 {
103   mq->head=msg;
104 }
105
106 static TR_MQ_MSG *tr_mq_get_tail(TR_MQ *mq)
107 {
108   return mq->tail;
109 }
110
111 static void tr_mq_set_tail(TR_MQ *mq, TR_MQ_MSG *msg)
112 {
113   mq->tail=msg;
114 }
115
116 void tr_mq_set_notify_cb(TR_MQ *mq, TR_MQ_NOTIFY_FN cb, void *arg)
117 {
118   mq->notify_cb=cb;
119   mq->notify_cb_arg=arg;
120 }
121
122
123 /* puts msg in mq's talloc context */
124 void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg)
125 {
126   int was_empty=0;
127   TR_MQ_NOTIFY_FN notify_cb=NULL;
128   void *notify_cb_arg=NULL;
129
130   tr_mq_lock(mq);
131   if (tr_mq_get_head(mq)==NULL) {
132     was_empty=1;
133     tr_mq_set_head(mq, msg);
134     tr_mq_set_tail(mq, msg);
135   } else {
136     tr_mq_msg_set_next(tr_mq_get_tail(mq), msg); /* add to list */
137     tr_mq_set_tail(mq, msg); /* update tail of list */
138   }
139   talloc_steal(mq, msg);
140   /* before releasing the mutex, get notify_cb data out of mq */
141   notify_cb=mq->notify_cb;
142   notify_cb_arg=mq->notify_cb_arg;
143   tr_mq_unlock(mq);
144
145   /* see if we need to tell someone we became non-empty */
146   if (was_empty && (notify_cb!=NULL))
147     notify_cb(mq, notify_cb_arg);
148 }
149
150 /* caller must free msg via tr_mq_msg_free */
151 TR_MQ_MSG *tr_mq_pop(TR_MQ *mq)
152 {
153   TR_MQ_MSG *popped=NULL;
154
155   tr_mq_lock(mq);
156   if (tr_mq_get_head(mq)!=NULL) {
157     popped=tr_mq_get_head(mq);
158     tr_mq_set_head(mq, tr_mq_msg_get_next(popped)); /* popped is the old head */
159     if (tr_mq_get_head(mq)==NULL)
160       tr_mq_set_tail(mq, NULL); /* just popped the last element */
161   }
162   tr_mq_unlock(mq);
163   if (popped!=NULL)
164     tr_mq_msg_set_next(popped, NULL); /* disconnect from list */
165   return popped;
166 }