From b9165a95039bf3ab8fbb490d5c61ba23fd112024 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 13:56:54 +0800 Subject: [PATCH] fix: reset queueSize after the queueItem is consumed and executed by the worker --- include/util/tqueue.h | 1 + source/libs/sync/src/syncIO.c | 2 ++ source/libs/transport/test/pushServer.c | 5 ++++- source/util/src/tqueue.c | 17 +++++++++++++---- source/util/src/tworker.c | 5 ++++- 5 files changed, 24 insertions(+), 6 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 2886190997..0f4f1db9ee 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -67,6 +67,7 @@ void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); +void taosUpdateItemSize(STaosQueue *queue, int32_t items); int32_t taosQueueItemSize(STaosQueue *queue); int64_t taosQueueMemorySize(STaosQueue *queue); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 72d74d7ae5..d9f11ba80f 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -369,6 +369,8 @@ static void *syncIOConsumerFunc(void *param) { taosFreeQitem(pRpcMsg); } + + taosUpdateItemSize(qinfo.queue, numOfMsgs); } taosFreeQall(qall); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index 6a4ff213d0..25972c9ec1 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -32,11 +32,12 @@ void processShellMsg() { SRpcMsg * pRpcMsg, rpcMsg; int type; void * pvnode; + SQueueInfo qinfo = {0}; qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL); + int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, &qinfo); tDebug("%d shell msgs are received", numOfMsgs); if (numOfMsgs <= 0) break; @@ -86,6 +87,8 @@ void processShellMsg() { rpcSendResponse(&nRpcMsg); } } + + taosUpdateItemSize(qinfo.queue, numOfMsgs); } taosFreeQall(qall); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 1895472472..50beba8a9b 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -123,6 +123,14 @@ bool taosQueueEmpty(STaosQueue *queue) { return empty; } +void taosUpdateItemSize(STaosQueue *queue, int32_t items) { + if (queue == NULL) return; + + taosThreadMutexLock(&queue->mutex); + queue->numOfItems -= items; + taosThreadMutexUnlock(&queue->mutex); +} + int32_t taosQueueItemSize(STaosQueue *queue) { if (queue == NULL) return 0; @@ -257,6 +265,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { queue->tail = NULL; queue->numOfItems = 0; queue->memOfItems = 0; + uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } @@ -424,11 +433,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; - queue->numOfItems--; + // queue->numOfItems--; queue->memOfItems -= pNode->size; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, + uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1, queue->memOfItems); } @@ -468,9 +477,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * queue->head = NULL; queue->tail = NULL; - queue->numOfItems = 0; + // queue->numOfItems = 0; queue->memOfItems = 0; - uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); + uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); for (int32_t j = 1; j < qall->numOfItems; ++j) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 5e3a0dc109..1f0731812c 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -79,7 +79,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } @@ -89,6 +89,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) { qinfo.threadNum = pool->num; (*((FItem)qinfo.fp))(&qinfo, msg); } + + taosUpdateItemSize(qinfo.queue, 1); } return NULL; @@ -214,6 +216,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { qinfo.threadNum = pool->num; (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs); } + taosUpdateItemSize(qinfo.queue, numOfMsgs); } return NULL;