diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index a8fbff4597..32dfd5a41c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -61,12 +61,14 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* freeNext; - SVBufPool* recycleNext; - SVBufPool* recyclePrev; + SVBufPool* freeNext; + SVBufPool* recycleNext; + SVBufPool* recyclePrev; + SVnode* pVnode; - TdThreadSpinlock* lock; + int32_t id; volatile int32_t nRef; + TdThreadSpinlock* lock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6fb6802966..3b751d506f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -344,6 +344,7 @@ struct SVnode { SVBufPool* onCommit; SVBufPool* recycleHead; SVBufPool* recycleTail; + SVBufPool* onRecycle; SMeta* pMeta; SSma* pSma; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 3ae5755b49..3a02a4a0bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -16,7 +16,7 @@ #include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ -static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { +static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); @@ -24,6 +24,14 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + memset(pPool, 0, sizeof(SVBufPool)); + pPool->pVnode = pVnode; + pPool->id = id; + pPool->ptr = pPool->node.data; + pPool->pTail = &pPool->node; + pPool->node.prev = NULL; + pPool->node.pnext = &pPool->pTail; + pPool->node.size = size; if (VND_IS_RSMA(pVnode)) { pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); @@ -42,16 +50,6 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) pPool->lock = NULL; } - pPool->freeNext = NULL; - pPool->pVnode = pVnode; - pPool->nRef = 0; - pPool->size = 0; - pPool->ptr = pPool->node.data; - pPool->pTail = &pPool->node; - pPool->node.prev = NULL; - pPool->node.pnext = &pPool->pTail; - pPool->node.size = size; - *ppPool = pPool; return 0; } @@ -71,7 +69,7 @@ int vnodeOpenBufPool(SVnode *pVnode) { for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool - if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) { + if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); return -1; @@ -206,34 +204,54 @@ void vnodeBufPoolRef(SVBufPool *pPool) { } void vnodeBufPoolUnRef(SVBufPool *pPool) { - if (pPool == NULL) { - return; - } - int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); - if (nRef == 0) { - SVnode *pVnode = pPool->pVnode; + if (pPool == NULL) return; - vnodeBufPoolReset(pPool); + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) return; - taosThreadMutexLock(&pVnode->mutex); + SVnode *pVnode = pPool->pVnode; - int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - if (pPool->node.size != size) { - SVBufPool *pPoolT = NULL; - if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) { - vWarn("vgId:%d, try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode), - pPool->node.size, size, tstrerror(errno)); - } else { - vnodeBufPoolDestroy(pPool); - pPool = pPoolT; - vDebug("vgId:%d, change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size); - } + taosThreadMutexLock(&pVnode->mutex); + + // remove from recycle list or on-recycle position + if (pVnode->onRecycle == pPool) { + pVnode->onRecycle = NULL; + } else { + if (pPool->recyclePrev) { + pPool->recyclePrev->recycleNext = pPool->recycleNext; + } else { + pVnode->recycleHead = pPool->recycleNext; } - pPool->freeNext = pVnode->freeList; - pVnode->freeList = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); - - taosThreadMutexUnlock(&pVnode->mutex); + if (pPool->recycleNext) { + pPool->recycleNext->recyclePrev = pPool->recyclePrev; + } else { + pVnode->recycleTail = pPool->recyclePrev; + } + pPool->recyclePrev = pPool->recycleNext = NULL; } + + // change the pool size if need + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; + if (pPool->node.size != size) { + SVBufPool *pNewPool = NULL; + if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { + vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", + TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); + } else { + vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, + pPool->node.size, size); + + vnodeBufPoolDestroy(pPool); + pPool = pNewPool; + pVnode->aBufPool[pPool->id] = pPool; + } + } + + // add to free list + vnodeBufPoolReset(pPool); + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); + + taosThreadMutexUnlock(&pVnode->mutex); } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e22add192d..f16f9ae1ef 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -25,7 +25,34 @@ static int vnodeCommitImpl(SCommitInfo *pInfo); static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { int32_t code = 0; - ASSERT(0); // TODO + + if (pVnode->onRecycle) { + vDebug("vgId:%d buffer pool %p of id %d is on recycling", TD_VID(pVnode), pVnode->onRecycle, pVnode->onRecycle->id); + goto _exit; + } + + if (pVnode->recycleHead) { + vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + pVnode->recycleHead->id); + + // pop the header buffer pool for recycling + pVnode->onRecycle = pVnode->recycleHead; + if (pVnode->recycleHead == pVnode->recycleTail) { + pVnode->recycleHead = pVnode->recycleTail = NULL; + } else { + pVnode->recycleHead = pVnode->recycleHead->recycleNext; + pVnode->recycleHead->recyclePrev = NULL; + } + pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; + + { + // TODO: do recycle the buffer pool + ASSERT(0); + } + } else { + vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + } + _exit: return code; } @@ -36,7 +63,8 @@ int vnodeBegin(SVnode *pVnode) { int32_t nTry = 0; while (++nTry) { if (pVnode->freeList) { - vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p", TD_VID(pVnode), nTry, pVnode->freeList); + vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, + pVnode->freeList->id); pVnode->inUse = pVnode->freeList; pVnode->inUse->nRef = 1; @@ -259,8 +287,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { code = smaPrepareAsyncCommit(pVnode->pSma); if (code) goto _exit; - vnodeBufPoolUnRef(pVnode->inUse); + taosThreadMutexLock(&pVnode->mutex); + ASSERT(pVnode->onCommit == NULL); + pVnode->onCommit = pVnode->inUse; pVnode->inUse = NULL; + taosThreadMutexUnlock(&pVnode->mutex); _exit: if (code) { @@ -272,19 +303,51 @@ _exit: return code; } - static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; SCommitInfo *pInfo = (SCommitInfo *)arg; + SVnode *pVnode = pInfo->pVnode; // commit code = vnodeCommitImpl(pInfo); if (code) goto _exit; + // recycle buffer pool + SVBufPool *pPool = pVnode->onCommit; + + taosThreadMutexLock(&pVnode->mutex); + + pVnode->onCommit = NULL; + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + if (nRef == 0) { + // add to free list + vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); + + vnodeBufPoolReset(pPool); + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); + } else { + // add to recycle list + vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); + + if (pVnode->recycleTail == NULL) { + pPool->recyclePrev = pPool->recycleNext = NULL; + pVnode->recycleHead = pVnode->recycleTail = pPool; + } else { + pPool->recyclePrev = pVnode->recycleTail; + pPool->recycleNext = NULL; + pVnode->recycleTail->recycleNext = pPool; + pVnode->recycleTail = pPool; + } + } + + taosThreadMutexUnlock(&pVnode->mutex); + _exit: // end commit - tsem_post(&pInfo->pVnode->canCommit); + tsem_post(&pVnode->canCommit); taosMemoryFree(pInfo); return code; }