more code

This commit is contained in:
Hongze Cheng 2023-01-10 11:01:41 +08:00
parent 851dba5166
commit f26f661f47
3 changed files with 94 additions and 74 deletions

View File

@ -83,6 +83,8 @@ struct SVBufPool {
int32_t vnodeOpenBufPool(SVnode* pVnode);
int32_t vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool);
void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);

View File

@ -212,34 +212,9 @@ void vnodeBufPoolRef(SVBufPool *pPool) {
ASSERT(nRef > 0);
}
void vnodeBufPoolUnRef(SVBufPool *pPool) {
if (pPool == NULL) return;
void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
SVnode *pVnode = pPool->pVnode;
taosThreadMutexLock(&pVnode->mutex);
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
// remove from recycle list or on-recycle position
if (pVnode->onRecycle == pPool) {
pVnode->onRecycle = NULL;
} else {
if (pPool->recyclePrev) {
pPool->recyclePrev->recycleNext = pPool->recycleNext;
} else {
pVnode->recycleHead = pPool->recycleNext;
}
if (pPool->recycleNext) {
pPool->recycleNext->recyclePrev = pPool->recyclePrev;
} else {
pVnode->recycleTail = pPool->recyclePrev;
}
pPool->recyclePrev = pPool->recycleNext = NULL;
}
// change the pool size if need
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
if (pPool->node.size != size) {
SVBufPool *pNewPool = NULL;
@ -257,10 +232,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
}
// add to free list
vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
vnodeBufPoolReset(pPool);
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
}
void vnodeBufPoolUnRef(SVBufPool *pPool) {
if (pPool == NULL) return;
SVnode *pVnode = pPool->pVnode;
taosThreadMutexLock(&pVnode->mutex);
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
// remove from recycle queue or on-recycle position
if (pVnode->onRecycle == pPool) {
pVnode->onRecycle = NULL;
} else {
if (pPool->recyclePrev) {
pPool->recyclePrev->recycleNext = pPool->recycleNext;
} else {
pVnode->recycleHead = pPool->recycleNext;
}
if (pPool->recycleNext) {
pPool->recycleNext->recyclePrev = pPool->recyclePrev;
} else {
pVnode->recycleTail = pPool->recyclePrev;
}
pPool->recyclePrev = pPool->recycleNext = NULL;
}
vnodeBufPoolAddToFreeList(pPool);
_exit:
taosThreadMutexUnlock(&pVnode->mutex);
@ -295,6 +301,48 @@ int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
taosThreadMutexUnlock(&pPool->mutex);
_exit:
return code;
}
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
int32_t code = 0;
bool canRecycle;
SVnode *pVnode = pPool->pVnode;
vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
taosThreadMutexLock(&pPool->mutex);
SQueryNode *pNode = pPool->qList.pNext;
while (pNode != &pPool->qList) {
int32_t rc = pNode->reseek(pNode->pQHandle);
if (rc == 0) {
SQueryNode *pTNode = pNode->pNext;
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
pPool->nQuery--;
pNode = pTNode;
} else if (rc == TSDB_CODE_VND_QUERY_BUSY) {
pNode = pNode->pNext;
} else {
taosThreadMutexUnlock(&pPool->mutex);
code = rc;
goto _exit;
}
}
canRecycle = (pPool->nQuery == 0);
taosThreadMutexUnlock(&pPool->mutex);
if (canRecycle) {
ASSERT(atomic_load_32(&pPool->nRef) == 0);
pVnode->onRecycle = NULL;
vnodeBufPoolAddToFreeList(pPool);
}
_exit:
return code;
}

View File

@ -31,7 +31,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode));
goto _exit;
} else {
vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
pVnode->recycleHead->id);
pVnode->onRecycle = pVnode->recycleHead;
@ -45,33 +45,8 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
}
}
// do recycle the buffer pool
SVBufPool *pPool = pVnode->onRecycle;
vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id);
taosThreadMutexLock(&pPool->mutex);
SQueryNode *pNode = pPool->qList.pNext;
while (pNode != &pPool->qList) {
int32_t rc = pNode->reseek(pNode->pQHandle);
if (rc == 0) {
SQueryNode *pTNode = pNode->pNext;
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
pPool->nQuery--;
pNode = pTNode;
} else if (rc == TSDB_CODE_VND_QUERY_BUSY) {
pNode = pNode->pNext;
} else {
taosThreadMutexUnlock(&pPool->mutex);
code = rc;
goto _exit;
}
}
taosThreadMutexUnlock(&pPool->mutex);
// TODO: if (pPool->nQuery == 0) add to free list
code = vnodeBufPoolRecycle(pVnode->onRecycle);
if (code) goto _exit;
_exit:
if (code) {
@ -360,17 +335,7 @@ _exit:
return code;
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
SCommitInfo *pInfo = (SCommitInfo *)arg;
SVnode *pVnode = pInfo->pVnode;
// commit
code = vnodeCommitImpl(pInfo);
if (code) goto _exit;
// recycle buffer pool
static void vnodeReturnBufPool(SVnode *pVnode) {
taosThreadMutexLock(&pVnode->mutex);
SVBufPool *pPool = pVnode->onCommit;
@ -378,16 +343,9 @@ static int32_t vnodeCommitTask(void *arg) {
pVnode->onCommit = NULL;
if (nRef == 0) {
// add to free list
vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
vnodeBufPoolReset(pPool);
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
vnodeBufPoolAddToFreeList(pPool);
} else if (nRef > 0) {
// add to recycle list
vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id);
vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);
if (pVnode->recycleTail == NULL) {
pPool->recyclePrev = pPool->recycleNext = NULL;
@ -403,6 +361,18 @@ static int32_t vnodeCommitTask(void *arg) {
}
taosThreadMutexUnlock(&pVnode->mutex);
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
SCommitInfo *pInfo = (SCommitInfo *)arg;
SVnode *pVnode = pInfo->pVnode;
// commit
code = vnodeCommitImpl(pInfo);
if (code) goto _exit;
vnodeReturnBufPool(pVnode);
_exit:
// end commit