diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 0072075623..c3e8d7ef1d 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -209,6 +209,8 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); } +int32_t smaPreClose(SSma *pSma); + // rsma void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 38f04bb8e8..90542ecebd 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -17,18 +17,26 @@ extern SSmaMgmt smaMgmt; -static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); +/** + * @brief only applicable to Rollup SMA + * + * @param pSma + * @return int32_t + */ +int32_t smaPreClose(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma, false); } + /** * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma, true); } /** * @brief async commit, only applicable to Rollup SMA @@ -124,7 +132,7 @@ _exit: * @param pSma * @return int32_t */ -static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { +static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { int32_t code = 0; int32_t lino = 0; @@ -139,15 +147,18 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { - ++nLoops; - if (nLoops > 1000) { - sched_yield(); - nLoops = 0; + if (isCommit) { + while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } } } + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - if (ASSERTS(pRSmaStat->commitAppliedVer >= 0, "commit applied version %" PRIi64 " < 0", + if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < 0", pRSmaStat->commitAppliedVer)) { code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); @@ -156,7 +167,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 2: wait for all triggered fetch tasks to finish nLoops = 0; while (1) { - if (T_REF_VAL_GET(pStat) == 0) { + if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) { smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); break; } else { @@ -184,6 +195,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { nLoops = 0; } } + + if (!isCommit) goto _exit; + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 21c2839943..00000cb129 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -167,6 +167,7 @@ _exit: int32_t smaClose(SSma *pSma) { if (pSma) { + smaPreClose(pSma); taosThreadMutexDestroy(&pSma->mutex); SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index aeb90b7080..241470ac8c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -296,12 +296,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); - taosThreadMutexLock(&pVnode->mutex); - ASSERT(pVnode->onCommit == NULL); - pVnode->onCommit = pVnode->inUse; - pVnode->inUse = NULL; - taosThreadMutexUnlock(&pVnode->mutex); - pVnode->state.commitTerm = pVnode->state.applyTerm; pInfo->info.config = pVnode->config; @@ -331,6 +325,12 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { code = smaPrepareAsyncCommit(pVnode->pSma); if (code) goto _exit; + taosThreadMutexLock(&pVnode->mutex); + ASSERT(pVnode->onCommit == NULL); + pVnode->onCommit = pVnode->inUse; + pVnode->inUse = NULL; + taosThreadMutexUnlock(&pVnode->mutex); + _exit: if (code) { vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,