fix: rsma commit without table

This commit is contained in:
kailixu 2023-01-06 10:57:04 +08:00
parent f3831b54db
commit 9c33f014b1
1 changed files with 11 additions and 14 deletions

View File

@ -298,13 +298,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = NULL;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) { if (!pEnv) {
goto _exit; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
@ -355,8 +351,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
} }
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
TSDB_CHECK_CODE(code, lino, _exit); return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone #if 0 // consuming task of qTaskInfo clone
@ -381,16 +378,12 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif #endif
// all rsma results are written completely, start to tsdbPrepareCommit // all rsma results are written completely
STsdb *pTsdb = NULL;
_exit:
if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb); if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb); if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if (code) { return TSDB_CODE_SUCCESS;
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
} }
/** /**
@ -402,6 +395,10 @@ _exit:
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
goto _exit;
}
#if 0 #if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);