2 * Copyright (c) 2016, JANET(UK)
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the name of JANET(UK) nor the names of its contributors
17 * may be used to endorse or promote products derived from this software
18 * without specific prior written permission.
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
25 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
29 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
31 * OF THE POSSIBILITY OF SUCH DAMAGE.
/*
 * make_msg: build a queue message whose payload is a formatted status string.
 *
 * label - text placed at the front of the payload
 * n     - number formatted into the payload after the label
 *
 * The message comes from tr_mq_msg_new(NULL, "Message"); the payload string
 * "<label>: <n> messages to go..." is allocated by asprintf() and stored
 * through msg->p.
 *
 * NOTE(review): the asprintf() call is the argument of assert(). When the
 * file is compiled with NDEBUG, assert() expands to nothing, so the call
 * would not run and msg->p would not be filled in here.
 * NOTE(review): the result of tr_mq_msg_new() is dereferenced without a
 * NULL check in the lines shown.
 */
53 static TR_MQ_MSG *make_msg(char *label, int n)
56 msg= tr_mq_msg_new(NULL, "Message");
/* asprintf() returns -1 on failure; the assert aborts the test in that case */
57 assert(-1!=asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n));
/*
 * thread_start: thread entry point handed to pthread_create().
 *
 * arg - pointer to a struct thread_data; it is cast back from void * and
 *       supplies the queue, the message count, the delay and the label.
 *
 * Messages built by make_msg() are appended to the queue with tr_mq_add().
 * The last message added carries the number -9999 (presumably an
 * end-of-stream marker for the reader -- confirm).
 */
62 static void *thread_start(void *arg)
/* unpack the per-thread parameters from the opaque argument */
64 TR_MQ *mq=((struct thread_data *)arg)->mq;
65 int n_msgs=((struct thread_data *)arg)->n_msgs;
/* presumably the pause between messages, in microseconds -- confirm */
66 useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
67 char *label=((struct thread_data *)arg)->label;
/* queue a message that reports the current value of n_msgs */
71 tr_mq_add(mq, make_msg(label, n_msgs));
/* final message for this thread, tagged with -9999 */
74 tr_mq_add(mq, make_msg(label, -9999));
/*
 * handle_messages: callback installed as the queue's notify_cb.
 *
 * mq  - the queue; not used in the statements shown
 * arg - pointer to the struct message_data shared with the waiting thread
 *
 * Takes the shared lock, signals the condition variable and releases the
 * lock, waking a thread blocked on status->cond.
 */
84 static void handle_messages(TR_MQ *mq, void *arg)
86 struct message_data *status=(struct message_data *)arg;
/* the signal is sent while the mutex is held */
87 pthread_mutex_lock(&(status->lock));
89 pthread_cond_signal(&(status->cond));
90 pthread_mutex_unlock(&(status->lock));
/*
 * output_messages: drain the queue and print every message payload.
 *
 * mq - queue to read from
 *
 * Prints a banner, pops messages with tr_mq_pop(mq, NULL) until it returns
 * NULL, prints each payload as a C string, then prints a closing line.
 */
93 static void output_messages(TR_MQ *mq)
97 printf("\n* handle_messages notified of new messages in queue.\n");
/* tr_mq_pop() returning NULL ends the loop */
98 for (msg=tr_mq_pop(mq, NULL); msg!=NULL; msg=tr_mq_pop(mq, NULL)) {
/* msg->p is treated as a NUL-terminated string (make_msg fills it via asprintf) */
99 printf(" > %s\n", (char *)msg->p);
102 printf("* all messages handled\n");
/*
 * Test driver: starts N_THREADS producer threads that feed one queue, waits
 * on a condition variable for notifications, and joins the threads once a
 * wait times out.
 */
111 pthread_t thread[N_THREADS];
112 struct thread_data thread_data[N_THREADS];
113 useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
/* lock/condition pair (plus a ready flag) shared with handle_messages() */
115 struct message_data status;
116 struct timespec timeout={0,0};
/* register handle_messages() as the queue's notification callback, with */
/* &status as its argument */
120 mq->notify_cb=handle_messages;
121 mq->notify_cb_arg=(void *)&status;
/* default attributes for both the condition variable and the mutex */
123 pthread_cond_init(&(status.cond), 0);
124 pthread_mutex_init(&(status.lock), 0);
127 printf("Starting threads\n");
/* fill in one thread_data record per thread and launch it */
128 for (ii=0; ii<N_THREADS; ii++) {
129 thread_data[ii].mq=mq;
130 thread_data[ii].msg_dly=dly[ii];
131 thread_data[ii].n_msgs=10;
/* NOTE(review): asprintf() inside assert() is not executed when NDEBUG is defined */
132 assert(-1!=asprintf(&(thread_data[ii].label), "thread %d", ii+1));
/* NOTE(review): the return value of pthread_create() is not checked */
133 pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
134 printf("%s started.\n", thread_data[ii].label);
/* the mutex must be held when calling pthread_cond_timedwait() */
138 pthread_mutex_lock(&(status.lock));
/* keep waiting until status.ready is set or a wait has timed out */
139 while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
/* pthread_cond_timedwait() takes an absolute deadline, so start from the current time */
140 clock_gettime(CLOCK_REALTIME, &timeout);
142 wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
145 if (wait_result==ETIMEDOUT)
150 pthread_mutex_unlock(&(status.lock));
153 printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
/* wait for every producer thread to finish; their return values are discarded */
154 for (ii=0; ii<N_THREADS; ii++)
155 pthread_join(thread[ii], NULL);