Fixes so that all binaries build cleanly.
[trust_router.git] / common / mq_test / thread_test.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <pthread.h>
5 #include <talloc.h>
6 #include <time.h>
7 #include <errno.h>
8
9 #include <tr_mq.h>
10
11 struct thread_data {
12   TR_MQ *mq;
13   useconds_t msg_dly;
14   int n_msgs;
15   char *label;
16 };
17
18 TR_MQ_MSG *make_msg(label, n)
19 {
20   TR_MQ_MSG *msg=NULL;
21   msg=tr_mq_msg_new(NULL, "Message", TR_MQ_PRIO_NORMAL);
22   asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n);
23   msg->p_free=free;
24   return msg;
25 }
26
27 void *thread_start(void *arg)
28 {
29   TR_MQ *mq=((struct thread_data *)arg)->mq;
30   int n_msgs=((struct thread_data *)arg)->n_msgs;
31   useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
32   const char *label=((struct thread_data *)arg)->label;
33   
34   while (n_msgs>=0) {
35     usleep(msg_dly);
36     tr_mq_add(mq, make_msg(label, n_msgs));
37     n_msgs--;
38   }
39   tr_mq_add(mq, make_msg(label, -9999));
40   return NULL;
41 }
42
43 struct message_data {
44   pthread_mutex_t lock;
45   pthread_cond_t cond;
46   int ready;
47 };
48
49 void handle_messages(TR_MQ *mq, void *arg)
50 {
51   struct message_data *status=(struct message_data *)arg;
52   pthread_mutex_lock(&(status->lock));
53   status->ready=1;
54   pthread_cond_signal(&(status->cond));
55   pthread_mutex_unlock(&(status->lock));
56 }
57
58 void output_messages(TR_MQ *mq)
59 {
60   TR_MQ_MSG *msg=NULL;
61
62   printf("\n* handle_messages notified of new messages in queue.\n");
63   for (msg=tr_mq_pop(mq); msg!=NULL; msg=tr_mq_pop(mq)) {
64     printf("  > %s\n", (char *)msg->p);
65     tr_mq_msg_free(msg);
66   }
67   printf("* all messages handled\n");
68   fflush(stdout);
69 }
70
71 #define N_THREADS 2
72
73 int main(void)
74 {
75   TR_MQ *mq=NULL;
76   pthread_t thread[N_THREADS];
77   struct thread_data thread_data[N_THREADS];
78   useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
79   int ii=0;
80   struct message_data status;
81   struct timespec timeout={0,0};
82   int wait_result=0;
83
84   mq=tr_mq_new(NULL);
85   mq->notify_cb=handle_messages;
86   mq->notify_cb_arg=(void *)&status;
87
88   pthread_cond_init(&(status.cond), 0);
89   pthread_mutex_init(&(status.lock), 0);
90   status.ready=0;
91
92   printf("Starting threads\n");
93   for (ii=0; ii<N_THREADS; ii++) {
94     thread_data[ii].mq=mq;
95     thread_data[ii].msg_dly=dly[ii];
96     thread_data[ii].n_msgs=10;
97     asprintf(&(thread_data[ii].label), "thread %d", ii+1);
98     pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
99     printf("%s started.\n", thread_data[ii].label);
100   }
101
102   while (1) {
103     pthread_mutex_lock(&(status.lock));
104     while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
105       clock_gettime(CLOCK_REALTIME, &timeout);
106       timeout.tv_sec+=1;
107       wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
108     }
109
110     if (wait_result==ETIMEDOUT)
111       break;
112
113     output_messages(mq);
114     status.ready=0;
115     pthread_mutex_unlock(&(status.lock));
116     usleep(2000000);
117   }
118   printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
119   for (ii=0; ii<N_THREADS; ii++)
120     pthread_join(thread[ii], NULL);
121
122   return 0;
123 }