fix: rsma close and commit

This commit is contained in:
kailixu 2023-02-07 15:10:11 +08:00
parent 9598fe684e
commit 8df67a9a3c
1 changed files with 13 additions and 13 deletions

View File

@ -130,6 +130,7 @@ _exit:
* 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
* *
* @param pSma * @param pSma
* @param isCommit
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
@ -155,23 +156,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
nLoops = 0; nLoops = 0;
} }
} }
}
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < 0", if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1",
pRSmaStat->commitAppliedVer)) { pRSmaStat->commitAppliedVer)) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
} }
// step 2: wait for all triggered fetch tasks to finish // step 2: wait for all triggered fetch tasks to finish
nLoops = 0; nLoops = 0;
while (1) { while (1) {
if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) { if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) {
smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); smaDebug("vgId:%d, rsma commit:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit);
break; break;
} else { } else {
smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma)); smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
} }
++nLoops; ++nLoops;
if (nLoops > 1000) { if (nLoops > 1000) {
@ -185,7 +185,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
* 1) This is high cost task and should not put in asyncPreCommit originally. * 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/ */
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), smaInfo("vgId:%d, rsma commit:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit,
(void *)taosGetSelfPthreadId()); (void *)taosGetSelfPthreadId());
nLoops = 0; nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
@ -237,7 +237,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
_exit: _exit:
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s(%d)", SMA_VID(pSma), __func__, lino, tstrerror(code), isCommit);
} }
return code; return code;
} }