35d7882941515c41f44705abc1e83bd9ae2514e1
[trust_router.git] / common / tests / thread_test.c
1 /*
2  * Copyright (c) 2016, JANET(UK)
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * 3. Neither the name of JANET(UK) nor the names of its contributors
17  *    may be used to endorse or promote products derived from this software
18  *    without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
25  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
29  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
31  * OF THE POSSIBILITY OF SUCH DAMAGE.
32  *
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <pthread.h>
39 #include <talloc.h>
40 #include <time.h>
41 #include <errno.h>
42 #include <assert.h>
43
44 #include <tr_mq.h>
45
46 struct thread_data {
47   TR_MQ *mq;
48   useconds_t msg_dly;
49   int n_msgs;
50   char *label;
51 };
52
53 static TR_MQ_MSG *make_msg(char *label, int n)
54 {
55   TR_MQ_MSG *msg=NULL;
56   msg= tr_mq_msg_new(NULL, "Message");
57   assert(-1!=asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n));
58   msg->p_free=free;
59   return msg;
60 }
61
62 static void *thread_start(void *arg)
63 {
64   TR_MQ *mq=((struct thread_data *)arg)->mq;
65   int n_msgs=((struct thread_data *)arg)->n_msgs;
66   useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
67   char *label=((struct thread_data *)arg)->label;
68   
69   while (n_msgs>=0) {
70     usleep(msg_dly);
71     tr_mq_add(mq, make_msg(label, n_msgs));
72     n_msgs--;
73   }
74   tr_mq_add(mq, make_msg(label, -9999));
75   return NULL;
76 }
77
78 struct message_data {
79   pthread_mutex_t lock;
80   pthread_cond_t cond;
81   int ready;
82 };
83
84 static void handle_messages(TR_MQ *mq, void *arg)
85 {
86   struct message_data *status=(struct message_data *)arg;
87   pthread_mutex_lock(&(status->lock));
88   status->ready=1;
89   pthread_cond_signal(&(status->cond));
90   pthread_mutex_unlock(&(status->lock));
91 }
92
93 static void output_messages(TR_MQ *mq)
94 {
95   TR_MQ_MSG *msg=NULL;
96
97   printf("\n* handle_messages notified of new messages in queue.\n");
98   for (msg=tr_mq_pop(mq, NULL); msg!=NULL; msg=tr_mq_pop(mq, NULL)) {
99     printf("  > %s\n", (char *)msg->p);
100     tr_mq_msg_free(msg);
101   }
102   printf("* all messages handled\n");
103   fflush(stdout);
104 }
105
106 #define N_THREADS 2
107
108 int main(void)
109 {
110   TR_MQ *mq=NULL;
111   pthread_t thread[N_THREADS];
112   struct thread_data thread_data[N_THREADS];
113   useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
114   int ii=0;
115   struct message_data status;
116   struct timespec timeout={0,0};
117   int wait_result=0;
118
119   mq=tr_mq_new(NULL);
120   mq->notify_cb=handle_messages;
121   mq->notify_cb_arg=(void *)&status;
122
123   pthread_cond_init(&(status.cond), 0);
124   pthread_mutex_init(&(status.lock), 0);
125   status.ready=0;
126
127   printf("Starting threads\n");
128   for (ii=0; ii<N_THREADS; ii++) {
129     thread_data[ii].mq=mq;
130     thread_data[ii].msg_dly=dly[ii];
131     thread_data[ii].n_msgs=10;
132     assert(-1!=asprintf(&(thread_data[ii].label), "thread %d", ii+1));
133     pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
134     printf("%s started.\n", thread_data[ii].label);
135   }
136
137   while (1) {
138     pthread_mutex_lock(&(status.lock));
139     while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
140       clock_gettime(CLOCK_REALTIME, &timeout);
141       timeout.tv_sec+=1;
142       wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
143     }
144
145     if (wait_result==ETIMEDOUT)
146       break;
147
148     output_messages(mq);
149     status.ready=0;
150     pthread_mutex_unlock(&(status.lock));
151     usleep(2000000);
152   }
153   printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
154   for (ii=0; ii<N_THREADS; ii++)
155     pthread_join(thread[ii], NULL);
156
157   printf("success\n");
158   return 0;
159 }