From 8617af2a5580d9a67b1a3e96a66c84be43ee867d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 8 May 2023 07:43:44 +0000 Subject: [PATCH] fix queue failure --- source/util/src/tqueue.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 767b0d7a24..81350dddd2 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -78,7 +78,7 @@ bool taosQueueEmpty(STaosQueue *queue) { bool empty = false; taosThreadMutexLock(&queue->mutex); - if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { + if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) { empty = true; } taosThreadMutexUnlock(&queue->mutex); @@ -162,7 +162,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { pNode->next = NULL; taosThreadMutexLock(&queue->mutex); - if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) { + if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) { code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, queue->memLimit, tstrerror(code)); @@ -208,7 +208,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; - queue->memOfItems -= pNode->size; + queue->memOfItems -= (pNode->size + pNode->dataSize); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, @@ -413,7 +413,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; // queue->numOfItems--; - queue->memOfItems -= pNode->size; + queue->memOfItems -= (pNode->size + pNode->dataSize); atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,