refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-21 16:20:03 +08:00
parent 5313543a9e
commit efede8669f
3 changed files with 23 additions and 4 deletions

View File

@ -101,6 +101,9 @@ struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
int64_t memOfItems;
int32_t unAccessedNumOfItems;
int64_t unAccessMemOfItems;
};
STaosQueue *taosOpenQueue();
@ -123,6 +126,9 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall);
int32_t taosGetQitem(STaosQall *qall, void **ppItem);
void taosResetQitems(STaosQall *qall);
int32_t taosQallItemSize(STaosQall *qall);
int64_t taosQallMemSize(STaosQall *qll);
int64_t taosQallUnAccessedItemSize(STaosQall *qall);
int64_t taosQallUnAccessedMemSize(STaosQall *qall);
STaosQset *taosOpenQset();
void taosCloseQset(STaosQset *qset);
@ -135,8 +141,6 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
void taosResetQsetThread(STaosQset *qset, void *pItem);
extern int64_t tsRpcQueueMemoryAllowed;
#ifdef __cplusplus
}
#endif

View File

@ -119,9 +119,8 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2;
}
// todo: fix it: data in Qall is not included here
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
return taosQueueMemorySize(pQueue->pQueue);
return taosQueueMemorySize(pQueue->pQueue) + taosQallMemSize(pQueue->qall);
}
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {

View File

@ -242,6 +242,11 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->memOfItems = queue->memOfItems;
qall->unAccessedNumOfItems = queue->numOfItems;
qall->unAccessMemOfItems = queue->memOfItems;
numOfItems = qall->numOfItems;
queue->head = NULL;
@ -274,6 +279,10 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
if (pNode) {
*ppItem = pNode->item;
num = 1;
qall->unAccessedNumOfItems -= 1;
qall->unAccessMemOfItems -= pNode->dataSize;
uTrace("item:%p is fetched", *ppItem);
} else {
*ppItem = NULL;
@ -449,6 +458,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->memOfItems = queue->memOfItems;
code = qall->numOfItems;
qinfo->ahandle = queue->ahandle;
qinfo->fp = queue->itemsFp;
@ -476,6 +487,11 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
}
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
int64_t taosQallUnAccessedItemSize(STaosQall *qall) {return qall->unAccessedNumOfItems;}
int64_t taosQallUnAccessedMemSize(STaosQall *qall) {return qall->unAccessMemOfItems;}
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }