From 356e386a5b3e1ae290f931001810846e93026b95 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 29 May 2020 12:58:29 +0000 Subject: [PATCH 1/2] set pNode->next to null, since item may be write into queue multiple times --- src/util/src/tqueue.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 475941dbdb..88830fa213 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -95,6 +95,7 @@ void *taosAllocateQitem(int size) { void taosFreeQitem(void *param) { if (param == NULL) return; + uTrace("item:%p is freed", param); char *temp = (char *)param; temp -= sizeof(STaosQnode); free(temp); @@ -104,6 +105,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); pNode->type = type; + pNode->next = NULL; pthread_mutex_lock(&queue->mutex); @@ -143,7 +145,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, *type, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -337,6 +339,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; + uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); From aa0ac1348ea688ca513d54f3459e1eef5e7adf95 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 29 May 2020 13:09:01 +0000 Subject: [PATCH 2/2] structure style --- src/util/src/tqueue.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 88830fa213..428c40ef7a 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -18,24 +18,24 @@ #include "taoserror.h" #include "tqueue.h" -typedef struct _taos_qnode { +typedef struct STaosQnode { int type; - struct _taos_qnode *next; + struct STaosQnode *next; char item[]; } STaosQnode; -typedef struct _taos_q { +typedef struct STaosQueue { int32_t itemSize; int32_t numOfItems; - struct _taos_qnode *head; - struct _taos_qnode *tail; - struct _taos_q *next; // for queue set - struct _taos_qset *qset; // for queue set + struct STaosQnode *head; + struct STaosQnode *tail; + struct STaosQueue *next; // for queue set + struct STaosQset *qset; // for queue set void *ahandle; // for queue set pthread_mutex_t mutex; } STaosQueue; -typedef struct _taos_qset { +typedef struct STaosQset { STaosQueue *head; STaosQueue *current; pthread_mutex_t mutex; @@ -44,7 +44,7 @@ typedef struct _taos_qset { tsem_t sem; } STaosQset; -typedef struct _taos_qall { +typedef struct STaosQall { STaosQnode *current; STaosQnode *start; int32_t itemSize;