diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 7d874f1d36..8657b80bde 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -79,12 +79,16 @@ _exit: } return code; } -int vnodeBegin(SVnode *pVnode) { - // alloc buffer pool +static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; + taosThreadMutexLock(&pVnode->mutex); int32_t nTry = 0; - while (++nTry) { + for (;;) { + ++nTry; + if (pVnode->freeList) { vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, pVnode->freeList->id); @@ -97,12 +101,8 @@ int vnodeBegin(SVnode *pVnode) { } else { vInfo("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); - terrno = vnodeTryRecycleBufPool(pVnode); - if (terrno != TSDB_CODE_SUCCESS) { - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); - taosThreadMutexUnlock(&pVnode->mutex); - return -1; - } + code = vnodeTryRecycleBufPool(pVnode); + TSDB_CHECK_CODE(code, lino, _exit); if (pVnode->freeList == NULL) { vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); @@ -115,37 +115,53 @@ int vnodeBegin(SVnode *pVnode) { int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); if (rc && rc != ETIMEDOUT) { - terrno = TAOS_SYSTEM_ERROR(rc); - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); - taosThreadMutexUnlock(&pVnode->mutex); - return -1; + code = TAOS_SYSTEM_ERROR(rc); + TSDB_CHECK_CODE(code, lino, _exit); } } } } +_exit: taosThreadMutexUnlock(&pVnode->mutex); + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; +} +int vnodeBegin(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; pVnode->state.commitID++; + + // alloc buffer pool + code = vnodeGetBufPoolToUse(pVnode); + TSDB_CHECK_CODE(code, lino, _exit); + // begin meta if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) { - vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // begin tsdb if (tsdbBegin(pVnode->pTsdb) < 0) { - vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // begin sma if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) { - vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } - return 0; +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; } void vnodeUpdCommitSched(SVnode *pVnode) { @@ -160,7 +176,7 @@ int vnodeShouldCommit(SVnode *pVnode) { } SVCommitSched *pSched = &pVnode->commitSched; - int64_t nowMs = taosGetMonoTimestampMs(); + int64_t nowMs = taosGetMonoTimestampMs(); return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));