diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index e98e7736a9..a8fbff4597 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -62,6 +62,8 @@ struct SVBufPoolNode { struct SVBufPool { SVBufPool* freeNext; + SVBufPool* recycleNext; + SVBufPool* recyclePrev; SVnode* pVnode; TdThreadSpinlock* lock; volatile int32_t nRef; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index b9b273468c..8b6247163a 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -69,8 +69,6 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { int vnodeOpenBufPool(SVnode *pVnode) { int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - ASSERT(pVnode->freeList == NULL); - for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 428af47f0b..4114651904 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -21,18 +21,48 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeCommitImpl(SCommitInfo *pInfo); +#define WAIT_TIME_MILI_SEC 10 // miliseconds + int vnodeBegin(SVnode *pVnode) { // alloc buffer pool taosThreadMutexLock(&pVnode->mutex); - while (pVnode->freeList == NULL) { - taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); - } + int32_t nTry = 0; + for (;;) { + while (pVnode->freeList == NULL) { + vDebug("vgId:%d no free buffer pool, try to wait %d...", TD_VID(pVnode), ++nTry); - pVnode->inUse = pVnode->freeList; - pVnode->inUse->nRef = 1; - pVnode->freeList = pVnode->inUse->freeNext; - pVnode->inUse->freeNext = NULL; + struct timeval tv; + struct timespec ts; + + taosGetTimeOfDay(&tv); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; + + int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); + if (rc == ETIMEDOUT) { // time out, break + break; + } else if (rc != 0) { // error occurred + terrno = TAOS_SYSTEM_ERROR(rc); + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); + taosThreadMutexUnlock(&pVnode->mutex); + return -1; + } + } + + if (pVnode->freeList) { + // allocate from free list + pVnode->inUse = pVnode->freeList; + pVnode->inUse->nRef = 1; + pVnode->freeList = pVnode->inUse->freeNext; + pVnode->inUse->freeNext = NULL; + break; + } else if (nTry == 1) { + // try to recycle a buffer pool + vInfo("vgId:%d no free buffer pool, try to recycle...", TD_VID(pVnode)); + ASSERT(false); // TODO: condition of nTry == 1 may not be reasonable + } + } taosThreadMutexUnlock(&pVnode->mutex);