From f26f661f47920f050a7d3d8a6af954afc38439b4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 11:01:41 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/vnd.h | 2 + source/dnode/vnode/src/vnd/vnodeBufPool.c | 100 ++++++++++++++++------ source/dnode/vnode/src/vnd/vnodeCommit.c | 66 ++++---------- 3 files changed, 94 insertions(+), 74 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index fafb146ad8..e0d0e9408b 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -83,6 +83,8 @@ struct SVBufPool { int32_t vnodeOpenBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode); void vnodeBufPoolReset(SVBufPool* pPool); +void vnodeBufPoolAddToFreeList(SVBufPool* pPool); +int32_t vnodeBufPoolRecycle(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index d05f126413..2aff4a1927 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -212,34 +212,9 @@ void vnodeBufPoolRef(SVBufPool *pPool) { ASSERT(nRef > 0); } -void vnodeBufPoolUnRef(SVBufPool *pPool) { - if (pPool == NULL) return; - +void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { SVnode *pVnode = pPool->pVnode; - taosThreadMutexLock(&pVnode->mutex); - - if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; - - // remove from recycle list or on-recycle position - if (pVnode->onRecycle == pPool) { - pVnode->onRecycle = NULL; - } else { - if (pPool->recyclePrev) { - pPool->recyclePrev->recycleNext = pPool->recycleNext; - } else { - pVnode->recycleHead = pPool->recycleNext; - } - - if (pPool->recycleNext) { - pPool->recycleNext->recyclePrev = pPool->recyclePrev; - } else { - pVnode->recycleTail = pPool->recyclePrev; - } - pPool->recyclePrev = pPool->recycleNext = NULL; - } - - // change the pool size if need int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; if (pPool->node.size != size) { SVBufPool *pNewPool = NULL; @@ -257,10 +232,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { } // add to free list + vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); vnodeBufPoolReset(pPool); pPool->freeNext = pVnode->freeList; pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); +} + +void vnodeBufPoolUnRef(SVBufPool *pPool) { + if (pPool == NULL) return; + + SVnode *pVnode = pPool->pVnode; + + taosThreadMutexLock(&pVnode->mutex); + + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; + + // remove from recycle queue or on-recycle position + if (pVnode->onRecycle == pPool) { + pVnode->onRecycle = NULL; + } else { + if (pPool->recyclePrev) { + pPool->recyclePrev->recycleNext = pPool->recycleNext; + } else { + pVnode->recycleHead = pPool->recycleNext; + } + + if (pPool->recycleNext) { + pPool->recycleNext->recyclePrev = pPool->recyclePrev; + } else { + pVnode->recycleTail = pPool->recyclePrev; + } + pPool->recyclePrev = pPool->recycleNext = NULL; + } + + vnodeBufPoolAddToFreeList(pPool); _exit: taosThreadMutexUnlock(&pVnode->mutex); @@ -295,6 +301,48 @@ int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { taosThreadMutexUnlock(&pPool->mutex); +_exit: + return code; +} + +int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { + int32_t code = 0; + + bool canRecycle; + SVnode *pVnode = pPool->pVnode; + + vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); + + taosThreadMutexLock(&pPool->mutex); + + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + int32_t rc = pNode->reseek(pNode->pQHandle); + if (rc == 0) { + SQueryNode *pTNode = pNode->pNext; + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + pPool->nQuery--; + pNode = pTNode; + } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { + pNode = pNode->pNext; + } else { + taosThreadMutexUnlock(&pPool->mutex); + code = rc; + goto _exit; + } + } + + canRecycle = (pPool->nQuery == 0); + + taosThreadMutexUnlock(&pPool->mutex); + + if (canRecycle) { + ASSERT(atomic_load_32(&pPool->nRef) == 0); + pVnode->onRecycle = NULL; + vnodeBufPoolAddToFreeList(pPool); + } + _exit: return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8657b80bde..d3fe8e6cc5 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -31,7 +31,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); goto _exit; } else { - vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, pVnode->recycleHead->id); pVnode->onRecycle = pVnode->recycleHead; @@ -45,33 +45,8 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { } } - // do recycle the buffer pool - SVBufPool *pPool = pVnode->onRecycle; - vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id); - - taosThreadMutexLock(&pPool->mutex); - - SQueryNode *pNode = pPool->qList.pNext; - while (pNode != &pPool->qList) { - int32_t rc = pNode->reseek(pNode->pQHandle); - if (rc == 0) { - SQueryNode *pTNode = pNode->pNext; - pNode->pNext->ppNext = pNode->ppNext; - *pNode->ppNext = pNode->pNext; - pPool->nQuery--; - pNode = pTNode; - } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { - pNode = pNode->pNext; - } else { - taosThreadMutexUnlock(&pPool->mutex); - code = rc; - goto _exit; - } - } - - taosThreadMutexUnlock(&pPool->mutex); - - // TODO: if (pPool->nQuery == 0) add to free list + code = vnodeBufPoolRecycle(pVnode->onRecycle); + if (code) goto _exit; _exit: if (code) { @@ -360,17 +335,7 @@ _exit: return code; } -static int32_t vnodeCommitTask(void *arg) { - int32_t code = 0; - - SCommitInfo *pInfo = (SCommitInfo *)arg; - SVnode *pVnode = pInfo->pVnode; - - // commit - code = vnodeCommitImpl(pInfo); - if (code) goto _exit; - - // recycle buffer pool +static void vnodeReturnBufPool(SVnode *pVnode) { taosThreadMutexLock(&pVnode->mutex); SVBufPool *pPool = pVnode->onCommit; @@ -378,16 +343,9 @@ static int32_t vnodeCommitTask(void *arg) { pVnode->onCommit = NULL; if (nRef == 0) { - // add to free list - vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); - - vnodeBufPoolReset(pPool); - pPool->freeNext = pVnode->freeList; - pVnode->freeList = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); + vnodeBufPoolAddToFreeList(pPool); } else if (nRef > 0) { - // add to recycle list - vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); if (pVnode->recycleTail == NULL) { pPool->recyclePrev = pPool->recycleNext = NULL; @@ -403,6 +361,18 @@ static int32_t vnodeCommitTask(void *arg) { } taosThreadMutexUnlock(&pVnode->mutex); +} +static int32_t vnodeCommitTask(void *arg) { + int32_t code = 0; + + SCommitInfo *pInfo = (SCommitInfo *)arg; + SVnode *pVnode = pInfo->pVnode; + + // commit + code = vnodeCommitImpl(pInfo); + if (code) goto _exit; + + vnodeReturnBufPool(pVnode); _exit: // end commit