From a44784795fb45763dbb276878499afe28dc321eb Mon Sep 17 00:00:00 2001 From: Jennifer Richards Date: Wed, 22 Jun 2016 13:08:32 -0400 Subject: [PATCH] Implement message queue and test program. --- Makefile.am | 7 ++- common/mq_test/mq_test.c | 161 +++++++++++++++++++++++++++++++++++++++++++++++ common/tr_mq.c | 94 +++++++++++++++++++++++++++ 3 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 common/mq_test/mq_test.c create mode 100644 common/tr_mq.c diff --git a/Makefile.am b/Makefile.am index 8b06456..4313667 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,6 +1,6 @@ DISTCHECK_CONFIGURE_FLAGS = \ --with-systemdsystemunitdir=$$dc_install_base/$(systemdsystemunitdir) -bin_PROGRAMS= tr/trust_router tr/trpc tid/example/tidc tid/example/tids common/dh_test/tr_dh_test trp/msgtst +bin_PROGRAMS= tr/trust_router tr/trpc tid/example/tidc tid/example/tids common/dh_test/tr_dh_test common/mq_test/mq_test trp/msgtst AM_CPPFLAGS=-I$(srcdir)/include $(GLIB_CFLAGS) AM_CFLAGS = -Wall -Werror=missing-prototypes -Werror -Wno-parentheses $(GLIB_CFLAGS) SUBDIRS = gsscon @@ -69,6 +69,11 @@ common_dh_test_tr_dh_test_SOURCES = common/tr_dh.c \ common/tr_debug.c \ common/dh_test/dh_test.c +common_mq_test_mq_test_SOURCES = common/tr_mq.c \ +common/mq_test/mq_test.c + +common_mq_test_mq_test_LDFLAGS = -pthread -ltalloc + libtr_tid_la_SOURCES = tid/tids.c tid/tidc.c tid/tid_req.c tid/tid_resp.c \ $(common_srcs) diff --git a/common/mq_test/mq_test.c b/common/mq_test/mq_test.c new file mode 100644 index 0000000..a833510 --- /dev/null +++ b/common/mq_test/mq_test.c @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2016, JANET(UK) + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of JANET(UK) nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include +#include + +#include + +static void notify_cb(TR_MQ *mq, void *arg) +{ + char *s=(char *)arg; + + printf("MQ %s no longer empty.\n", s); +} + +int main(void) +{ + TR_MQ *mq=NULL; + TR_MQ_MSG *msg=NULL; + TR_MQ_MSG *msg1=NULL; + TR_MQ_MSG *msg2=NULL; + TR_MQ_MSG *msg3=NULL; + TR_MQ_MSG *msg4=NULL; + char *mq_name="1"; + + mq=tr_mq_new(NULL); + mq->notify_cb=notify_cb; + mq->notify_cb_arg=mq_name; + + msg1=tr_mq_msg_new(NULL); + asprintf((char **)&(msg1->p), "First message.\n"); + msg1->p_free=free; + tr_mq_append(mq, msg1); + assert(mq->head==msg1); + assert(mq->tail==msg1); + assert(msg1->next==NULL); + + msg2=tr_mq_msg_new(NULL); + asprintf((char **)&(msg2->p), "Second message.\n"); + msg2->p_free=free; + tr_mq_append(mq, msg2); + assert(mq->head==msg1); + assert(msg1->next==msg2); + assert(mq->tail==msg2); + assert(msg2->next==NULL); + + msg=tr_mq_pop(mq); + assert(msg==msg1); + assert(mq->head==msg2); + assert(mq->tail==msg2); + assert(msg2->next==NULL); + if ((msg!=NULL) && (msg->p!=NULL)) { + printf((char *)msg->p); + tr_mq_msg_free(msg); + } else + printf("no message to pop\n"); + + msg3=tr_mq_msg_new(NULL); + asprintf((char **)&(msg3->p), "Third message.\n"); + msg3->p_free=free; + tr_mq_append(mq, msg3); + assert(mq->head==msg2); + assert(mq->tail==msg3); + assert(msg2->next==msg3); + assert(msg3->next==NULL); + + msg=tr_mq_pop(mq); + assert(msg==msg2); + assert(mq->head==msg3); + assert(mq->tail==msg3); + assert(msg3->next==NULL); + if ((msg!=NULL) && (msg->p!=NULL)) { + printf((char *)msg->p); + tr_mq_msg_free(msg); + } else + printf("no message to pop\n"); + + msg=tr_mq_pop(mq); + assert(msg==msg3); + assert(mq->head==NULL); + assert(mq->tail==NULL); + if ((msg!=NULL) && (msg->p!=NULL)) { + printf((char *)msg->p); + tr_mq_msg_free(msg); + } else + printf("no message to pop\n"); + + msg=tr_mq_pop(mq); + assert(msg==NULL); + assert(mq->head==NULL); + assert(mq->tail==NULL); + if ((msg!=NULL) && (msg->p!=NULL)) { + printf((char *)msg->p); + tr_mq_msg_free(msg); + } else + printf("no message to pop\n"); + + msg4=tr_mq_msg_new(NULL); + asprintf((char **)&(msg4->p), "Fourth message.\n"); + msg4->p_free=free; + tr_mq_append(mq, msg4); + assert(mq->head==msg4); + assert(mq->tail==msg4); + assert(msg4->next==NULL); + + msg=tr_mq_pop(mq); + assert(msg==msg4); + assert(mq->head==NULL); + assert(mq->tail==NULL); + if ((msg!=NULL) && (msg->p!=NULL)) { + printf((char *)msg->p); + tr_mq_msg_free(msg); + } else + printf("no message to pop\n"); + + msg=tr_mq_pop(mq); + assert(msg==NULL); + assert(mq->head==NULL); + assert(mq->tail==NULL); + if ((msg!=NULL) && (msg->p!=NULL)) { + printf((char *)msg->p); + tr_mq_msg_free(msg); + } else + printf("no message to pop\n"); + + tr_mq_free(mq); + + return 0; +} diff --git a/common/tr_mq.c b/common/tr_mq.c new file mode 100644 index 0000000..c41a48c --- /dev/null +++ b/common/tr_mq.c @@ -0,0 +1,94 @@ +#include +#include + +#include + +/* Messages */ +static int tr_mq_msg_destructor(void *object) +{ + TR_MQ_MSG *msg=talloc_get_type_abort(object, TR_MQ_MSG); + if ( (msg->p!=NULL) && (msg->p_free!=NULL)) + msg->p_free(msg->p); + return 0; +} + +TR_MQ_MSG *tr_mq_msg_new(TALLOC_CTX *mem_ctx) +{ + TR_MQ_MSG *msg=talloc(mem_ctx, TR_MQ_MSG); + if (msg!=NULL) { + msg->next=NULL; + msg->p=NULL; + talloc_set_destructor((void *)msg, tr_mq_msg_destructor); + } + return msg; +} + +void tr_mq_msg_free(TR_MQ_MSG *msg) +{ + if (msg!=NULL) + talloc_free(msg); +} + +/* Message Queues */ +TR_MQ *tr_mq_new(TALLOC_CTX *mem_ctx) +{ + TR_MQ *mq=talloc(mem_ctx, TR_MQ); + if (mq!=NULL) { + pthread_mutex_init(&(mq->lock), 0); + mq->head=NULL; + mq->tail=NULL; + } + return mq; +} + +void tr_mq_free(TR_MQ *mq) +{ + if (mq!=NULL) { + pthread_mutex_lock(&(mq->lock)); /* don't pull this out from under someone */ + talloc_free(mq); + } +} + +/* puts msg in mq's talloc context */ +void tr_mq_append(TR_MQ *mq, TR_MQ_MSG *msg) +{ + int was_empty=FALSE; + TR_MQ_NOTIFY_FN notify_cb=NULL; + void *notify_cb_arg=NULL; + + pthread_mutex_lock(&(mq->lock)); + if (mq->head==NULL) { + was_empty=TRUE; + mq->head=mq->tail=msg; + } else { + mq->tail->next=msg; /* add to list */ + mq->tail=msg; /* update tail of list */ + } + talloc_steal(mq, msg); + /* before releasing the lock, get notify_cb data out of mq */ + notify_cb=mq->notify_cb; + notify_cb_arg=mq->notify_cb_arg; + pthread_mutex_unlock(&(mq->lock)); + + /* see if we need to tell someone we became non-empty */ + if (was_empty && (notify_cb!=NULL)) + mq->notify_cb(mq, notify_cb_arg); +} + +/* caller must free msg via tr_mq_msg_free */ +TR_MQ_MSG *tr_mq_pop(TR_MQ *mq) +{ + TR_MQ_MSG *popped=NULL; + + pthread_mutex_lock(&(mq->lock)); + if (mq->head!=NULL) { + popped=mq->head; + mq->head=mq->head->next; + if (mq->head==NULL) + mq->tail=NULL; /* just popped the last element */ + } + pthread_mutex_unlock(&(mq->lock)); + if (popped!=NULL) + popped->next=NULL; /* disconnect */ + return popped; +} -- 2.1.4