diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 42ab0c2a37..a5dc4431ab 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -211,6 +211,7 @@ int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma); int32_t smaPreCommit(SSma* pSma); int32_t smaCommit(SSma* pSma); +int32_t smaFinishCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 3dce724de7..a79ae35d79 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -78,38 +78,69 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } /** - * @brief set rsma trigger stat active + * @brief prepare rsma1/2, and set rsma trigger stat active * * @param pSma * @return int32_t */ int32_t smaBegin(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; + int32_t code = 0; + SVnode *pVnode = pSma->pVnode; + + if ((code = tsdbBegin(VND_RSMA1(pVnode))) < 0) { + smaError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; } - SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); + if ((code = tsdbBegin(VND_RSMA2(pVnode))) < 0) { + smaError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } - int8_t rsmaTriggerStat = + // set trigger stat + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + goto _exit; + } + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); + int8_t rsmaTriggerStat = atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma trigger stat from paused to active", TD_VID(pVnode)); break; } case TASK_TRIGGER_STAT_INIT: { atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); - smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma trigger stat from init to active", TD_VID(pVnode)); break; } default: { atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); - smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat); + smaWarn("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", TD_VID(pVnode), rsmaTriggerStat); break; } } - return TSDB_CODE_SUCCESS; +_exit: + terrno = code; + return code; +} + +int32_t smaFinishCommit(SSma *pSma) { + int32_t code = 0; + SVnode *pVnode = pSma->pVnode; + + if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { + smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) { + smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } +_exit: + terrno = code; + return code; } #if 0 @@ -309,15 +340,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { * 1) This is high cost task and should not put in asyncPreCommit originally. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. */ - nLoops = 0; - while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { - ++nLoops; - if (nLoops > 1000) { - sched_yield(); - nLoops = 0; - } - } - smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); nLoops = 0; @@ -368,10 +390,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { int32_t code = 0; SVnode *pVnode = pSma->pVnode; - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - goto _exit; - } #if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); @@ -380,10 +398,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { return TSDB_CODE_FAILED; } #endif - if ((code = tsdbCommit(VND_RSMA0(pVnode))) < 0) { - smaError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; - } + if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) { smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); goto _exit; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 6f4364c0c7..7cbd27e762 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -51,20 +51,11 @@ int vnodeBegin(SVnode *pVnode) { return -1; } - if (pVnode->pSma) { - if (VND_RSMA1(pVnode) && tsdbBegin(VND_RSMA1(pVnode)) < 0) { - vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - - if (VND_RSMA2(pVnode) && tsdbBegin(VND_RSMA2(pVnode)) < 0) { - vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - } - // begin sma - smaBegin(pVnode->pSma); // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged + if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) { + vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } return 0; } @@ -239,10 +230,8 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); - if (smaPreCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } + code = smaPreCommit(pVnode->pSma); + TSDB_CHECK_CODE(code, lino, _exit); vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; @@ -253,13 +242,11 @@ int vnodeCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); } + code = tsdbCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_IS_RSMA(pVnode)) { - if (smaCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - } else { - code = tsdbCommit(pVnode->pTsdb); + code = smaCommit(pVnode->pSma); TSDB_CHECK_CODE(code, lino, _exit); } @@ -274,7 +261,13 @@ int vnodeCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); } - tsdbFinishCommit(pVnode->pTsdb); + code = tsdbFinishCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + if (VND_IS_RSMA(pVnode)) { + code = smaFinishCommit(pVnode->pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } if (metaFinishCommit(pVnode->pMeta) < 0) { code = terrno;