2 * Copyright (c) 2016, JANET(UK)
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the name of JANET(UK) nor the names of its contributors
17 * may be used to endorse or promote products derived from this software
18 * without specific prior written permission.
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
25 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
29 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
31 * OF THE POSSIBILITY OF SUCH DAMAGE.
/* Build a queue message whose payload is a freshly formatted status string.
 *
 * The message is created with tr_mq_msg_new() using the message type string
 * "Message" and priority TR_MQ_PRIO_NORMAL; its payload pointer (msg->p) is
 * then filled by asprintf() with "<label>: <n> messages to go...".
 *
 * NOTE(review): no parameter types are visible on the header line in this
 * view; the format string treats label as a C string (%s) and n as an int
 * (%d) - confirm the declarations match.
 * NOTE(review): neither the result of tr_mq_msg_new() nor the return value
 * of asprintf() is checked in the lines visible here. */
52 TR_MQ_MSG *make_msg(label, n)
55 msg=tr_mq_msg_new(NULL, "Message", TR_MQ_PRIO_NORMAL);
/* asprintf allocates the string itself; the cast adapts msg->p to char **. */
56 asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n);
/* Thread entry point (it is the start routine handed to pthread_create()).
 *
 * arg points to a struct thread_data, from which the target queue, the
 * message count, the per-message delay and a text label are read.
 * Messages built by make_msg() are added to the queue with tr_mq_add().
 *
 * NOTE(review): the statements that loop, delay and decrement the count are
 * not visible in this view, so how many messages are queued and how msg_dly
 * is applied cannot be confirmed from here. */
61 void *thread_start(void *arg)
/* Unpack the per-thread parameters from the opaque argument. */
63 TR_MQ *mq=((struct thread_data *)arg)->mq;
64 int n_msgs=((struct thread_data *)arg)->n_msgs;
65 useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
66 const char *label=((struct thread_data *)arg)->label;
/* Queue a message reporting the current value of n_msgs. */
70 tr_mq_add(mq, make_msg(label, n_msgs));
/* A message carrying -9999 is queued as well - presumably an end-of-run
 * marker; nothing visible here interprets that value (TODO confirm). */
73 tr_mq_add(mq, make_msg(label, -9999));
/* Queue notification callback (installed as mq->notify_cb elsewhere in this
 * file).
 *
 * arg is the callback argument and is treated as a struct message_data *.
 * The function takes status->lock, signals status->cond to wake a waiter,
 * and releases the lock. The mq parameter is not used in the visible lines.
 *
 * NOTE(review): one statement between the lock and the signal is not
 * visible in this view; presumably it sets the flag the waiter tests
 * (status->ready) - confirm. */
83 void handle_messages(TR_MQ *mq, void *arg)
85 struct message_data *status=(struct message_data *)arg;
86 pthread_mutex_lock(&(status->lock));
/* Signal while still holding the lock that protects the shared state. */
88 pthread_cond_signal(&(status->cond));
89 pthread_mutex_unlock(&(status->lock));
/* Drain the queue and print every message payload to stdout.
 *
 * Prints a banner, then repeatedly calls tr_mq_pop() until it returns NULL,
 * printing each message's payload (msg->p) as a C string, and finally prints
 * a completion line.
 *
 * NOTE(review): the banner names handle_messages although it is printed
 * from output_messages.
 * NOTE(review): a statement inside the loop is not visible in this view, so
 * whether popped messages are released cannot be confirmed from here. */
92 void output_messages(TR_MQ *mq)
96 printf("\n* handle_messages notified of new messages in queue.\n");
/* Pop until the queue reports no more messages (NULL). */
97 for (msg=tr_mq_pop(mq); msg!=NULL; msg=tr_mq_pop(mq)) {
98 printf(" > %s\n", (char *)msg->p);
101 printf("* all messages handled\n");
/* Body of the test driver (the enclosing function header is not visible in
 * this view - presumably main; TODO confirm).
 *
 * Visible flow: register handle_messages as the queue's notify callback,
 * start N_THREADS producer threads running thread_start, wait on the
 * condition variable with a timed wait, and after the timeout message join
 * all threads. */
110 pthread_t thread[N_THREADS];
111 struct thread_data thread_data[N_THREADS];
/* Per-thread message delays; useconds_t suggests microseconds (TODO confirm
 * how thread_start applies them). Only two initialisers are given. */
112 useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
114 struct message_data status;
115 struct timespec timeout={0,0};
/* Have the queue call handle_messages with &status as its argument. */
119 mq->notify_cb=handle_messages;
120 mq->notify_cb_arg=(void *)&status;
/* A null attribute pointer requests the default attributes. */
122 pthread_cond_init(&(status.cond), 0);
123 pthread_mutex_init(&(status.lock), 0);
126 printf("Starting threads\n");
/* Fill in each thread's parameters, then launch it.
 * NOTE(review): the return values of asprintf() and pthread_create() are
 * not checked here. */
127 for (ii=0; ii<N_THREADS; ii++) {
128 thread_data[ii].mq=mq;
129 thread_data[ii].msg_dly=dly[ii];
130 thread_data[ii].n_msgs=10;
131 asprintf(&(thread_data[ii].label), "thread %d", ii+1);
132 pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
133 printf("%s started.\n", thread_data[ii].label);
/* Wait, holding status.lock, until the callback reports readiness or the
 * timed wait expires.
 * NOTE(review): the initialisation of status.ready and wait_result is not
 * visible in this view. */
137 pthread_mutex_lock(&(status.lock));
138 while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
/* pthread_cond_timedwait() takes an absolute deadline, so start from the
 * current CLOCK_REALTIME time. NOTE(review): the statement that advances
 * timeout past "now" is not visible in this view. */
139 clock_gettime(CLOCK_REALTIME, &timeout);
141 wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
144 if (wait_result==ETIMEDOUT)
149 pthread_mutex_unlock(&(status.lock));
/* Shutdown path: report the timeout and wait for every producer thread. */
152 printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
153 for (ii=0; ii<N_THREADS; ii++)
154 pthread_join(thread[ii], NULL);