Test multithreaded functionality of tm_mq.
authorJennifer Richards <jennifer@painless-security.com>
Wed, 22 Jun 2016 20:59:02 +0000 (16:59 -0400)
committerJennifer Richards <jennifer@painless-security.com>
Wed, 22 Jun 2016 20:59:02 +0000 (16:59 -0400)
Makefile.am
common/mq_test/thread_test.c [new file with mode: 0644]

index 4313667..0581b99 100644 (file)
@@ -1,6 +1,6 @@
 DISTCHECK_CONFIGURE_FLAGS = \
        --with-systemdsystemunitdir=$$dc_install_base/$(systemdsystemunitdir)
-bin_PROGRAMS= tr/trust_router tr/trpc tid/example/tidc tid/example/tids common/dh_test/tr_dh_test common/mq_test/mq_test trp/msgtst
+bin_PROGRAMS= tr/trust_router tr/trpc tid/example/tidc tid/example/tids common/dh_test/tr_dh_test common/mq_test/mq_test common/mq_test/thread_test trp/msgtst
 AM_CPPFLAGS=-I$(srcdir)/include $(GLIB_CFLAGS)
 AM_CFLAGS = -Wall -Werror=missing-prototypes -Werror -Wno-parentheses $(GLIB_CFLAGS)
 SUBDIRS = gsscon 
@@ -72,8 +72,16 @@ common/dh_test/dh_test.c
 common_mq_test_mq_test_SOURCES = common/tr_mq.c \
 common/mq_test/mq_test.c
 
+common_mq_test_mq_test_CFLAGS = -pthread
 common_mq_test_mq_test_LDFLAGS = -pthread -ltalloc
 
+common_mq_test_thread_test_SOURCES = common/tr_mq.c \
+common/tr_debug.c \
+common/mq_test/thread_test.c
+
+common_mq_test_thread_test_CFLAGS = -pthread
+common_mq_test_thread_test_LDFLAGS = -pthread -ltalloc
+
 libtr_tid_la_SOURCES = tid/tids.c tid/tidc.c tid/tid_req.c tid/tid_resp.c \
 $(common_srcs)
 
diff --git a/common/mq_test/thread_test.c b/common/mq_test/thread_test.c
new file mode 100644 (file)
index 0000000..7cb52e2
--- /dev/null
@@ -0,0 +1,123 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <talloc.h>
+#include <time.h>
+#include <errno.h>
+
+#include <tr_mq.h>
+
+struct thread_data {
+  TR_MQ *mq;
+  useconds_t msg_dly;
+  int n_msgs;
+  char *label;
+};
+
+TR_MQ_MSG *make_msg(label, n)
+{
+  TR_MQ_MSG *msg=NULL;
+  msg=tr_mq_msg_new(NULL);
+  asprintf((char **)&(msg->p), "%s: %d messages to go...", label, n);
+  msg->p_free=free;
+  return msg;
+}
+
+void *thread_start(void *arg)
+{
+  TR_MQ *mq=((struct thread_data *)arg)->mq;
+  int n_msgs=((struct thread_data *)arg)->n_msgs;
+  useconds_t msg_dly=((struct thread_data *)arg)->msg_dly;
+  const char *label=((struct thread_data *)arg)->label;
+  
+  while (n_msgs>=0) {
+    usleep(msg_dly);
+    tr_mq_append(mq, make_msg(label, n_msgs));
+    n_msgs--;
+  }
+  tr_mq_append(mq, make_msg(label, -9999));
+  return NULL;
+}
+
+struct message_data {
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
+  int ready;
+};
+
+void handle_messages(TR_MQ *mq, void *arg)
+{
+  struct message_data *status=(struct message_data *)arg;
+  pthread_mutex_lock(&(status->lock));
+  status->ready=1;
+  pthread_cond_signal(&(status->cond));
+  pthread_mutex_unlock(&(status->lock));
+}
+
+void output_messages(TR_MQ *mq)
+{
+  TR_MQ_MSG *msg=NULL;
+
+  printf("\n* handle_messages notified of new messages in queue.\n");
+  for (msg=tr_mq_pop(mq); msg!=NULL; msg=tr_mq_pop(mq)) {
+    printf("  > %s\n", (char *)msg->p);
+    tr_mq_msg_free(msg);
+  }
+  printf("* all messages handled\n");
+  fflush(stdout);
+}
+
+#define N_THREADS 2
+
+int main(void)
+{
+  TR_MQ *mq=NULL;
+  pthread_t thread[N_THREADS];
+  struct thread_data thread_data[N_THREADS];
+  useconds_t dly[N_THREADS]={100000, 1000000}; /* must be N_THREADS long */
+  int ii=0;
+  struct message_data status;
+  struct timespec timeout={0,0};
+  int wait_result=0;
+
+  mq=tr_mq_new(NULL);
+  mq->notify_cb=handle_messages;
+  mq->notify_cb_arg=(void *)&status;
+
+  pthread_cond_init(&(status.cond), 0);
+  pthread_mutex_init(&(status.lock), 0);
+  status.ready=0;
+
+  printf("Starting threads\n");
+  for (ii=0; ii<N_THREADS; ii++) {
+    thread_data[ii].mq=mq;
+    thread_data[ii].msg_dly=dly[ii];
+    thread_data[ii].n_msgs=10;
+    asprintf(&(thread_data[ii].label), "thread %d", ii+1);
+    pthread_create(&(thread[ii]), NULL, thread_start, &thread_data[ii]);
+    printf("%s started.\n", thread_data[ii].label);
+  }
+
+  while (1) {
+    pthread_mutex_lock(&(status.lock));
+    while ((!status.ready) && (wait_result!=ETIMEDOUT)) {
+      clock_gettime(CLOCK_REALTIME, &timeout);
+      timeout.tv_sec+=1;
+      wait_result=pthread_cond_timedwait(&(status.cond), &(status.lock), &timeout);
+    }
+
+    if (wait_result==ETIMEDOUT)
+      break;
+
+    output_messages(mq);
+    status.ready=0;
+    pthread_mutex_unlock(&(status.lock));
+    usleep(2000000);
+  }
+  printf("\n*** Timeout expired with no new messages. Joining threads and terminating.\n");
+  for (ii=0; ii<N_THREADS; ii++)
+    pthread_join(thread[ii], NULL);
+
+  return 0;
+}