diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 90542ecebd..51011ef791 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -130,6 +130,7 @@ _exit: * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) * * @param pSma + * @param isCommit * @return int32_t */ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { @@ -155,23 +156,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { nLoops = 0; } } - } - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < 0", - pRSmaStat->commitAppliedVer)) { - code = TSDB_CODE_APP_ERROR; - TSDB_CHECK_CODE(code, lino, _exit); + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1", + pRSmaStat->commitAppliedVer)) { + code = TSDB_CODE_APP_ERROR; + TSDB_CHECK_CODE(code, lino, _exit); + } } - // step 2: wait for all triggered fetch tasks to finish nLoops = 0; while (1) { if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) { - smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit); break; } else { - smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit); } ++nLoops; if (nLoops > 1000) { @@ -185,7 +185,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { * 1) This is high cost task and should not put in asyncPreCommit originally. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. */ - smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), + smaInfo("vgId:%d, rsma commit:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit, (void *)taosGetSelfPthreadId()); nLoops = 0; while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { @@ -201,7 +201,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); TSDB_CHECK_CODE(code, lino, _exit); - + smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone @@ -237,7 +237,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { _exit: if (code) { - smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + smaError("vgId:%d, %s failed at line %d since %s(%d)", SMA_VID(pSma), __func__, lino, tstrerror(code), isCommit); } return code; } @@ -257,7 +257,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { if (!pSmaEnv) { goto _exit; } - + code = tdRSmaFSCommit(pSma); TSDB_CHECK_CODE(code, lino, _exit);