From 6d01e18f3135b794c08597d90099b7d68f28f665 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Dec 2022 23:37:58 +0800 Subject: [PATCH] chore: rsma fs and vnode commit optimization --- include/util/taoserror.h | 2 +- source/dnode/vnode/src/inc/sma.h | 52 +----------------------- source/dnode/vnode/src/sma/smaCommit.c | 10 +++-- source/dnode/vnode/src/sma/smaEnv.c | 2 + source/dnode/vnode/src/sma/smaFS.c | 39 +++++++++--------- source/dnode/vnode/src/sma/smaRollup.c | 28 +++++++------ source/dnode/vnode/src/sma/smaUtil.c | 2 +- source/dnode/vnode/src/vnd/vnodeCommit.c | 22 ++++++---- source/dnode/vnode/src/vnd/vnodeQuery.c | 14 +++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 ++- source/util/src/terror.c | 1 + 11 files changed, 73 insertions(+), 104 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 52221bdd44..2490074464 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -700,7 +700,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) -// #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) +#define TSDB_CODE_RSMA_FS_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3153) #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index f2673ca433..f2466e1843 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -89,6 +89,7 @@ struct SQTaskFile { int64_t suid; int64_t version; int64_t size; + int64_t mtime; }; struct SQTaskFReader { @@ -246,57 +247,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); } -// smaFileUtil ================ - -#define TD_FILE_HEAD_SIZE 512 - -typedef struct STFInfo STFInfo; -typedef struct STFile STFile; - -struct STFInfo { - // common fields - uint32_t magic; - uint32_t ftype; - uint32_t fver; - int64_t fsize; -}; - -enum { - TD_FTYPE_RSMA_QTASKINFO = 0, -}; - -#if 0 -struct STFile { - uint8_t state; - STFInfo info; - char *fname; - TdFilePtr pFile; -}; - -#define TD_TFILE_PFILE(tf) ((tf)->pFile) -#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) -#define TD_TFILE_FULL_NAME(tf) ((tf)->fname) -#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) -#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf)) -#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL) -#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s)) - -int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname); -int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType); -int32_t tdOpenTFile(STFile *pTFile, int flags); -int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte); -int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence); -int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte); -int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset); -int64_t tdGetTFileSize(STFile *pTFile, int64_t *size); -int32_t tdRemoveTFile(STFile *pTFile); -int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo); -int32_t tdUpdateTFileHeader(STFile *pTFile); -void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); -void tdCloseTFile(STFile *pTFile); -void tdDestroyTFile(STFile *pTFile); -#endif - void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index e7e1406238..d8e57263c1 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -187,9 +187,11 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { + int32_t code = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { - return TSDB_CODE_SUCCESS; + return code; } SSmaStat *pStat = SMA_ENV_STAT(pEnv); @@ -240,8 +242,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { - return TSDB_CODE_FAILED; + if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) { + return code; } smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); @@ -272,7 +274,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb); if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb); - return TSDB_CODE_SUCCESS; + return code; } /** diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index a272f5fc97..005e2d0397 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -247,6 +247,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } + + taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat)); } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 90d253b96d..525ca5b528 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -27,6 +27,7 @@ static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { n += tPutI64v(p ? p + n : p, pFile->size); n += tPutI64v(p ? p + n : p, pFile->suid); n += tPutI64v(p ? p + n : p, pFile->version); + n += tPutI64v(p ? p + n : p, pFile->mtime); return n; } @@ -54,6 +55,7 @@ int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) { n += tGetI64v(p + n, &pFile->size); n += tGetI64v(p + n, &pFile->suid); n += tGetI64v(p + n, &pFile->version); + n += tGetI64v(p + n, &pFile->mtime); return n; } @@ -157,7 +159,7 @@ static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) { TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); } if (current_t) { - snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs), + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); } } else { @@ -249,7 +251,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { return 0; } -static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { +static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) { int32_t code = 0; int32_t lino = 0; SVnode *pVnode = pSma->pVnode; @@ -260,10 +262,10 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { char fname[TSDB_FILENAME_LEN] = {0}; // SQTaskFile - int32_t nNew = taosArrayGetSize(pFS->aQTaskInf); + int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf); int32_t iNew = 0; while (iNew < nNew) { - SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFS->aQTaskInf, iNew); + SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++); int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE); @@ -272,22 +274,22 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { pQTaskFNew->nRef = 1; } else { SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx); - int32_t c = tdQTaskInfCmprFn1(pTaskF, pQTaskFNew); - if (c < 0) { - l - } else if(c == 0) { - ++iNew; + int32_t c = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF); + if (c == 0) { + // utilize the item in pFSOld->qQTaskInf, instead of pFSNew continue; + } else if (c < 0) { + // NOTHING TODO } else { ASSERT(0); + continue; } } - if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) { + if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - } _exit: @@ -500,7 +502,7 @@ int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) { tdRSmaGetCurrentFName(pSma, NULL, tfname); - // gnrt PRESENT.t + // generate PRESENT.t code = tdRSmaSaveFSToFile(pFSNew, tfname); TSDB_CHECK_CODE(code, lino, _exit); @@ -600,13 +602,12 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); if (c == 0) { - ASSERT(pTaskF->size == qTaskF->size); ASSERT(0); - goto _exit; + continue; } } - if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) { + if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -705,10 +706,10 @@ int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) { taosRLockLatch(RSMA_FS_LOCK(pStat)); code = tdRSmaFSCopy(pSma, pFSOut); TSDB_CHECK_CODE(code, lino, _exit); - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); _exit: if (code) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -720,13 +721,13 @@ int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaFS *pFS = RSMA_FS(pStat); - int32_t size = 0; + int32_t size = taosArrayGetSize(pFS->aQTaskInf); code = tdRSmaFSCreate(pFSOut, size); TSDB_CHECK_CODE(code, lino, _exit); taosArraySetSize(pFSOut->aQTaskInf, size); - memcpy(TARRAY_GET_ELEM(pFSOut->aQTaskInf, 0), TARRAY_GET_ELEM(pFS->aQTaskInf, 0), size * sizeof(SQTaskFile)); + memcpy(pFSOut->aQTaskInf->pData, pFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); _exit: if (code) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 7261241e9f..3410dcfd5c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -344,13 +344,8 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo) { - // TODO: free original pRSmaInfo if exists abnormally - tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true); - if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) { - terrno = TSDB_CODE_RSMA_REMOVE_EXISTS; - goto _err; - } - smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); + smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); + return TSDB_CODE_SUCCESS; } // from write queue: single thead @@ -1228,7 +1223,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; TSDB_CHECK_CODE(code, lino, _exit); } - smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", TD_VID(pVnode), + smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), pRSmaInfo->suid, i + 1); // qTaskInfo file @@ -1250,22 +1245,27 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { TSDB_CHECK_CODE(code, lino, _exit); } - int64_t size = 0; - if (taosFStatFile(pInFD, &size, NULL) < 0) { + int64_t size = 0; + uint32_t mtime = 0; + if (taosFStatFile(pInFD, &size, &mtime) < 0) { code = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code, lino, _exit); } + ASSERT(size > 0); int64_t offset = 0; if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { code = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, rsma persist, send qtaskinfo file %s failed since %s", TD_VID(pVnode), fname, tstrerror(code)); + smaError("vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), fname, + fnameVer, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit); } taosCloseFile(&pOutFD); taosCloseFile(&pInFD); - SQTaskFile qTaskF = {.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size}; + SQTaskFile qTaskF = { + .nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size, .mtime = mtime}; + taosArrayPush(qTaskFArray, &qTaskF); } } @@ -1274,8 +1274,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { // prepare code = tdRSmaFSTakeSnapshot(pSma, &fs); TSDB_CHECK_CODE(code, lino, _exit); + code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); TSDB_CHECK_CODE(code, lino, _exit); + code = tdRSmaFSPrepareCommit(pSma, &fs); TSDB_CHECK_CODE(code, lino, _exit); @@ -1283,11 +1285,13 @@ _exit: taosArrayDestroy(fs.aQTaskInf); taosArrayDestroy(qTaskFArray); + if (code) { if (pOutFD) taosCloseFile(&pOutFD); if (pInFD) taosCloseFile(&pInFD); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } + terrno = code; return code; } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 2b2c5cb1a5..56ba2b8e98 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -15,7 +15,7 @@ #include "sma.h" -#define TD_QTASKINFO_FNAME_PREFIX "main.db" +#define TD_QTASKINFO_FNAME_PREFIX "main.tdb" void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4daab074b5..2e0370a81c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -184,16 +184,21 @@ _err: return -1; } -static void vnodePrepareCommit(SVnode *pVnode) { +static int32_t vnodePrepareCommit(SVnode *pVnode) { + int32_t code = 0; tsem_wait(&pVnode->canCommit); tsdbPrepareCommit(pVnode->pTsdb); metaPrepareAsyncCommit(pVnode->pMeta); - smaPrepareAsyncCommit(pVnode->pSma); + code = smaPrepareAsyncCommit(pVnode->pSma); + if (code) goto _exit; +_exit: vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; + return code; } + static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; @@ -203,10 +208,9 @@ static int32_t vnodeCommitTask(void *arg) { code = vnodeCommitImpl(pInfo); if (code) goto _exit; +_exit: // end commit tsem_post(&pInfo->pVnode->canCommit); - -_exit: taosMemoryFree(pInfo); return code; } @@ -214,7 +218,8 @@ int vnodeAsyncCommit(SVnode *pVnode) { int32_t code = 0; // prepare to commit - vnodePrepareCommit(pVnode); + code = vnodePrepareCommit(pVnode); + if (code) goto _exit; // schedule the task pVnode->state.commitTerm = pVnode->state.applyTerm; @@ -230,14 +235,15 @@ int vnodeAsyncCommit(SVnode *pVnode) { pInfo->info.state.commitID = pVnode->state.commitID; pInfo->pVnode = pVnode; pInfo->txn = metaGetTxn(pVnode->pMeta); - vnodeScheduleTask(vnodeCommitTask, pInfo); + code = vnodeScheduleTask(vnodeCommitTask, pInfo); _exit: if (code) { - vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), + tsem_post(&pVnode->canCommit); + vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), pVnode->state.commitID); } else { - vDebug("vgId:%d %s done", TD_VID(pVnode), __func__); + vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 1199127f6d..8d4e70cff9 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -15,13 +15,13 @@ #include "vnd.h" -#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ - do { \ - int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ - ASSERT(newVal >= 0); \ - if (newVal < 0) { \ - vWarn("vgId:%d %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \ - } \ +#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ + do { \ + int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ + ASSERT(newVal >= 0); \ + if (newVal < 0) { \ + vWarn("vgId:%d, %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \ + } \ } while (0) int vnodeQueryOpen(SVnode *pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6092888136..6c56bd5f30 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -317,7 +317,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // commit if need if (vnodeShouldCommit(pVnode)) { vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); - vnodeAsyncCommit(pVnode); + if (vnodeAsyncCommit(pVnode) < 0) { + vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } // start a new one if (vnodeBegin(pVnode) < 0) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4b9dde5059..792fd0008a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -585,6 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Invalid rsma fs commit") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")