Update test programs. All now succeed.
[trust_router.git] / common / tests / thread_test.c
1 /*
2  * Copyright (c) 2016, JANET(UK)
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * 3. Neither the name of JANET(UK) nor the names of its contributors
17  *    may be used to endorse or promote products derived from this software
18  *    without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
25  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
29  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
31  * OF THE POSSIBILITY OF SUCH DAMAGE.
32  *
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <pthread.h>
39 #include <talloc.h>
40 #include <time.h>
41 #include <errno.h>
42
43 #include <tr_mq.h>
44
45 struct thread_data {
46   TR_MQ *mq;
47   useconds_t msg_dly;
48   int n_msgs;
49   char *label;
50 };
51
52 static TR_MQ_MSG *make_msg(char *label, int n)
53 {
54   TR_MQ_MSG *msg=NULL;
55   msg=tr_mq_msg_new(NULL, "Message", TR_MQ_PRIO_NORMAL);
56   asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n);
57   msg->p_free=free;
58   return msg;
59 }
60
61 static void *thread_start(void *arg)
62 {
63   TR_MQ *mq=((struct thread_data *)arg)->mq;
64   int n_msgs=((struct thread_data *)arg)->n_msgs;
65   useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
66   char *label=((struct thread_data *)arg)->label;
67   
68   while (n_msgs>=0) {
69     usleep(msg_dly);
70     tr_mq_add(mq, make_msg(label, n_msgs));
71     n_msgs--;
72   }
73   tr_mq_add(mq, make_msg(label, -9999));
74   return NULL;
75 }
76
77 struct message_data {
78   pthread_mutex_t lock;
79   pthread_cond_t cond;
80   int ready;
81 };
82
83 static void handle_messages(TR_MQ *mq, void *arg)
84 {
85   struct message_data *status=(struct message_data *)arg;
86   pthread_mutex_lock(&(status->lock));
87   status->ready=1;
88   pthread_cond_signal(&(status->cond));
89   pthread_mutex_unlock(&(status->lock));
90 }
91
92 static void output_messages(TR_MQ *mq)
93 {
94   TR_MQ_MSG *msg=NULL;
95
96   printf("\n* handle_messages notified of new messages in queue.\n");
97   for (msg=tr_mq_pop(mq, NULL); msg!=NULL; msg=tr_mq_pop(mq, NULL)) {
98     printf("  > %s\n", (char *)msg->p);
99     tr_mq_msg_free(msg);
100   }
101   printf("* all messages handled\n");
102   fflush(stdout);
103 }
104
105 #define N_THREADS 2
106
107 int main(void)
108 {
109   TR_MQ *mq=NULL;
110   pthread_t thread[N_THREADS];
111   struct thread_data thread_data[N_THREADS];
112   useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
113   int ii=0;
114   struct message_data status;
115   struct timespec timeout={0,0};
116   int wait_result=0;
117
118   mq=tr_mq_new(NULL);
119   mq->notify_cb=handle_messages;
120   mq->notify_cb_arg=(void *)&status;
121
122   pthread_cond_init(&(status.cond), 0);
123   pthread_mutex_init(&(status.lock), 0);
124   status.ready=0;
125
126   printf("Starting threads\n");
127   for (ii=0; ii<N_THREADS; ii++) {
128     thread_data[ii].mq=mq;
129     thread_data[ii].msg_dly=dly[ii];
130     thread_data[ii].n_msgs=10;
131     asprintf(&(thread_data[ii].label), "thread %d", ii+1);
132     pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
133     printf("%s started.\n", thread_data[ii].label);
134   }
135
136   while (1) {
137     pthread_mutex_lock(&(status.lock));
138     while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
139       clock_gettime(CLOCK_REALTIME, &timeout);
140       timeout.tv_sec+=1;
141       wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
142     }
143
144     if (wait_result==ETIMEDOUT)
145       break;
146
147     output_messages(mq);
148     status.ready=0;
149     pthread_mutex_unlock(&(status.lock));
150     usleep(2000000);
151   }
152   printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
153   for (ii=0; ii<N_THREADS; ii++)
154     pthread_join(thread[ii], NULL);
155
156   printf("success\n");
157   return 0;
158 }