From c41364d5cdaf7b3275f6d89a0d503603fdda93b0 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 3 Jul 2022 20:38:54 +0800 Subject: [PATCH] refactor: rsma commit and recovery --- source/dnode/vnode/src/inc/sma.h | 6 ---- source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/sma/smaCommit.c | 3 +- source/dnode/vnode/src/sma/smaOpen.c | 12 ++------ source/dnode/vnode/src/sma/smaRollup.c | 37 ++++++++++++++++-------- source/dnode/vnode/src/sma/smaUtil.c | 39 +++++++++++++------------- source/dnode/vnode/src/vnd/vnodeOpen.c | 6 ++-- 7 files changed, 54 insertions(+), 52 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 7eb682e0a4..7f7b3fa885 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -22,19 +22,13 @@ extern "C" { #endif -#undef SMA_DEBUG_MODE // TODO: remove when release - // smaDebug ================ // clang-format off #define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#ifdef SMA_DEBUG_MODE -#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#else #define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#endif #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6ea46b8b58..4f81e9d62a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -163,8 +163,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool // sma int32_t smaOpen(SVnode* pVnode); -int32_t smaCloseEnv(SSma* pSma); -int32_t smaCloseEx(SSma* pSma); +int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t smaPreCommit(SSma* pSma); int32_t smaCommit(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index d9d65d7228..30299e8792 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -98,7 +98,6 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - smaDebug("vgId:%d, rsma pre commit", SMA_VID(pSma)); // step 1: set persistence task paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); @@ -122,6 +121,8 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { } } + smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma)); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 641b8c7934..d73b03f4a2 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -135,17 +135,11 @@ _err: return -1; } -int32_t smaCloseEnv(SSma *pSma) { - if (pSma) { - SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); - SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); - } - return 0; -} - -int32_t smaCloseEx(SSma *pSma) { +int32_t smaClose(SSma *pSma) { if (pSma) { taosThreadMutexDestroy(&pSma->mutex); + SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); + SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma)); if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma)); if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index da405700cd..45ebde48a5 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -43,8 +43,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma); -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed); +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; @@ -803,7 +803,7 @@ _err: return TSDB_CODE_FAILED; } -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { SVnode *pVnode = pSma->pVnode; STFile tFile = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN] = {0}; @@ -814,13 +814,14 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { } if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { - if (pVnode->state.committed) { - goto _err; + if (pVnode->state.committed > 0) { + smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode), + pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); } else { smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode), pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); - return TSDB_CODE_SUCCESS; } + return TSDB_CODE_SUCCESS; } if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { @@ -845,6 +846,10 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { tdRSmaQTaskInfoIterDestroy(&fIter); tdCloseTFile(&tFile); tdDestroyTFile(&tFile); + + // restored successfully from committed + *committed = pVnode->state.committed; + return TSDB_CODE_SUCCESS; _err: smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode), @@ -856,34 +861,39 @@ _err: * @brief reload ts data from checkpoint * * @param pSma + * @param committed restore from committed version * @return int32_t */ -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) { // TODO + smaDebug("vgId:%d, rsma restore from %" PRIi64 ", ts data reload success", SMA_VID(pSma), committed); return TSDB_CODE_SUCCESS; _err: - smaError("rsma restore, ts data reload failed since %s", terrstr()); + smaError("vgId:%d, rsma restore from %" PRIi64 ", ts data reload failed since %s", SMA_VID(pSma), committed, + terrstr()); return TSDB_CODE_FAILED; } int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { - int64_t nTables = 0; // step 1: iterate all stables to restore the rsma env + int64_t nTables = 0; if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { goto _err; } + if (nTables <= 0) { smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma)); return TSDB_CODE_SUCCESS; } // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore - if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) { + int64_t committed = -1; + if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) { goto _err; } // step 3: reload ts data from checkpoint - if (tdRSmaRestoreTSDataReload(pSma) < 0) { + if ((committed > 0) && (tdRSmaRestoreTSDataReload(pSma, committed)) < 0) { goto _err; } @@ -1112,11 +1122,15 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { + smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } if (tdCreateTFile(&tFile, true, -1) < 0) { + smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, + i + 1, TD_TFILE_FULL_NAME(&tFile)); isFileCreated = true; } @@ -1156,6 +1170,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { } return TSDB_CODE_SUCCESS; _err: + smaError("vgId:%d, rsma persit failed since %s", vid, terrstr()); if (isFileCreated) { tdRemoveTFile(&tFile); tdDestroyTFile(&tFile); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 2bba313a6a..14caf4144e 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -140,7 +140,7 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) return -1; } -#if 1 +#if 0 smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset, nbyte, toffset + nbyte); #endif @@ -242,35 +242,36 @@ int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) { ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pTFile->pFile == NULL) { if (errno == ENOENT) { // Try to create directory recursively - if (taosMulMkDir(taosDirName(TD_TFILE_FULL_NAME(pTFile))) != 0) { + char *s = strdup(TD_TFILE_FULL_NAME(pTFile)); + if (taosMulMkDir(taosDirName(s)) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(s); + return -1; + } + taosMemoryFree(s); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pTFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; - } else { - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pTFile->pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } } } + } - if (!updateHeader) { - return 0; - } + if (!updateHeader) { + return 0; + } - pTFile->info.fsize += TD_FILE_HEAD_SIZE; - pTFile->info.fver = 0; + pTFile->info.fsize += TD_FILE_HEAD_SIZE; + pTFile->info.fver = 0; - if (tdUpdateTFileHeader(pTFile) < 0) { - tdCloseTFile(pTFile); - tdRemoveTFile(pTFile); - return -1; - } + if (tdUpdateTFileHeader(pTFile) < 0) { + tdCloseTFile(pTFile); + tdRemoveTFile(pTFile); + return -1; } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 57d7386667..0c654bee1f 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -152,12 +152,11 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return pVnode; _err: - if (pVnode->pSma) smaCloseEnv(pVnode->pSma); if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); - if (pVnode->pSma) smaCloseEx(pVnode->pSma); + if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); tsem_destroy(&(pVnode->canCommit)); @@ -167,14 +166,13 @@ _err: void vnodeClose(SVnode *pVnode) { if (pVnode) { - smaCloseEnv(pVnode->pSma); vnodeCommit(pVnode); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); walClose(pVnode->pWal); tqClose(pVnode->pTq); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); - smaCloseEx(pVnode->pSma); + smaClose(pVnode->pSma); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); // destroy handle