From efede8669f435eae99dda7727fa4eef890d4aca5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 21 Oct 2023 16:20:03 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/util/tqueue.h | 8 ++++++-- source/libs/stream/src/streamQueue.c | 3 +-- source/util/src/tqueue.c | 16 ++++++++++++++++ 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 503d15e793..9f09bd2930 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -101,6 +101,9 @@ struct STaosQall { STaosQnode *current; STaosQnode *start; int32_t numOfItems; + int64_t memOfItems; + int32_t unAccessedNumOfItems; + int64_t unAccessMemOfItems; }; STaosQueue *taosOpenQueue(); @@ -123,6 +126,9 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); int32_t taosGetQitem(STaosQall *qall, void **ppItem); void taosResetQitems(STaosQall *qall); int32_t taosQallItemSize(STaosQall *qall); +int64_t taosQallMemSize(STaosQall *qll); +int64_t taosQallUnAccessedItemSize(STaosQall *qall); +int64_t taosQallUnAccessedMemSize(STaosQall *qall); STaosQset *taosOpenQset(); void taosCloseQset(STaosQset *qset); @@ -135,8 +141,6 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo); void taosResetQsetThread(STaosQset *qset, void *pItem); -extern int64_t tsRpcQueueMemoryAllowed; - #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 70a065c22e..7305e9db83 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -119,9 +119,8 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } -// todo: fix it: data in Qall is not included here int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { - return taosQueueMemorySize(pQueue->pQueue); + return taosQueueMemorySize(pQueue->pQueue) + taosQallMemSize(pQueue->qall); } int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 81350dddd2..1dfdd637b6 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -242,6 +242,11 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { qall->current = queue->head; qall->start = queue->head; qall->numOfItems = queue->numOfItems; + qall->memOfItems = queue->memOfItems; + + qall->unAccessedNumOfItems = queue->numOfItems; + qall->unAccessMemOfItems = queue->memOfItems; + numOfItems = qall->numOfItems; queue->head = NULL; @@ -274,6 +279,10 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { if (pNode) { *ppItem = pNode->item; num = 1; + + qall->unAccessedNumOfItems -= 1; + qall->unAccessMemOfItems -= pNode->dataSize; + uTrace("item:%p is fetched", *ppItem); } else { *ppItem = NULL; @@ -449,6 +458,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * qall->current = queue->head; qall->start = queue->head; qall->numOfItems = queue->numOfItems; + qall->memOfItems = queue->memOfItems; + code = qall->numOfItems; qinfo->ahandle = queue->ahandle; qinfo->fp = queue->itemsFp; @@ -476,6 +487,11 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * } int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; } +int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; } + +int64_t taosQallUnAccessedItemSize(STaosQall *qall) {return qall->unAccessedNumOfItems;} +int64_t taosQallUnAccessedMemSize(STaosQall *qall) {return qall->unAccessMemOfItems;} + void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }