18 TR_MQ_MSG *make_msg(label, n)
21 msg=tr_mq_msg_new(NULL, "Message", TR_MQ_PRIO_NORMAL);
22 asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n);
27 void *thread_start(void *arg)
29 TR_MQ *mq=((struct thread_data *)arg)->mq;
30 int n_msgs=((struct thread_data *)arg)->n_msgs;
31 useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
32 const char *label=((struct thread_data *)arg)->label;
36 tr_mq_add(mq, make_msg(label, n_msgs));
39 tr_mq_add(mq, make_msg(label, -9999));
49 void handle_messages(TR_MQ *mq, void *arg)
51 struct message_data *status=(struct message_data *)arg;
52 pthread_mutex_lock(&(status->lock));
54 pthread_cond_signal(&(status->cond));
55 pthread_mutex_unlock(&(status->lock));
58 void output_messages(TR_MQ *mq)
62 printf("\n* handle_messages notified of new messages in queue.\n");
63 for (msg=tr_mq_pop(mq); msg!=NULL; msg=tr_mq_pop(mq)) {
64 printf(" > %s\n", (char *)msg->p);
67 printf("* all messages handled\n");
76 pthread_t thread[N_THREADS];
77 struct thread_data thread_data[N_THREADS];
78 useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
80 struct message_data status;
81 struct timespec timeout={0,0};
85 mq->notify_cb=handle_messages;
86 mq->notify_cb_arg=(void *)&status;
88 pthread_cond_init(&(status.cond), 0);
89 pthread_mutex_init(&(status.lock), 0);
92 printf("Starting threads\n");
93 for (ii=0; ii<N_THREADS; ii++) {
94 thread_data[ii].mq=mq;
95 thread_data[ii].msg_dly=dly[ii];
96 thread_data[ii].n_msgs=10;
97 asprintf(&(thread_data[ii].label), "thread %d", ii+1);
98 pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
99 printf("%s started.\n", thread_data[ii].label);
103 pthread_mutex_lock(&(status.lock));
104 while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
105 clock_gettime(CLOCK_REALTIME, &timeout);
107 wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
110 if (wait_result==ETIMEDOUT)
115 pthread_mutex_unlock(&(status.lock));
118 printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
119 for (ii=0; ii<N_THREADS; ii++)
120 pthread_join(thread[ii], NULL);