From 9ae0e15d6b38dafe064b3ce4dec39ddd63a43df6 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 1 Sep 2022 16:51:23 +0800 Subject: [PATCH] refactor: rsma code refactor and use ref for qtaskf --- include/common/taosdef.h | 7 - include/util/taoserror.h | 1 + source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/inc/sma.h | 114 +++---------- source/dnode/vnode/src/sma/smaCommit.c | 129 ++++---------- source/dnode/vnode/src/sma/smaEnv.c | 16 +- source/dnode/vnode/src/sma/smaFS.c | 198 ++++++++++++++++++++++ source/dnode/vnode/src/sma/smaOpen.c | 6 +- source/dnode/vnode/src/sma/smaRollup.c | 113 +++++++++++- source/dnode/vnode/src/sma/smaSnapshot.c | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 10 +- source/dnode/vnode/src/sma/smaUtil.c | 89 ---------- source/util/src/terror.c | 1 + 13 files changed, 387 insertions(+), 300 deletions(-) create mode 100644 source/dnode/vnode/src/sma/smaFS.c diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 9bfee56e29..bf4de9d4de 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -65,13 +65,6 @@ typedef enum { TSDB_STATIS_NONE = 1, // statis part not exist } ETsdbStatisStatus; -typedef enum { - TSDB_SMA_STAT_UNKNOWN = -1, // unknown - TSDB_SMA_STAT_OK = 0, // ready to provide service - TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired - TSDB_SMA_STAT_DROPPED = 2, // sma dropped -} ETsdbSmaStat; // bit operation - typedef enum { TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e39172d74e..d16a599811 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -616,6 +616,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) +#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index a3e17f5377..6107f97eec 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -29,6 +29,7 @@ target_sources( # sma "src/sma/smaEnv.c" "src/sma/smaUtil.c" + "src/sma/smaFS.c" "src/sma/smaOpen.c" "src/sma/smaCommit.c" "src/sma/smaRollup.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c29c4cb6c4..85720ccef6 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -41,6 +41,7 @@ typedef struct SRSmaStat SRSmaStat; typedef struct SSmaKey SSmaKey; typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfoItem SRSmaInfoItem; +typedef struct SRSmaFS SRSmaFS; typedef struct SQTaskFile SQTaskFile; typedef struct SQTaskFReader SQTaskFReader; typedef struct SQTaskFWriter SQTaskFWriter; @@ -73,7 +74,8 @@ struct STSmaStat { struct SQTaskFile { volatile int32_t nRef; - int64_t commitID; + int32_t padding; + int64_t version; int64_t size; }; @@ -89,6 +91,10 @@ struct SQTaskFWriter { char *fname; }; +struct SRSmaFS { + SArray *aQTaskInf; // array of SQTaskFile +}; + struct SRSmaStat { SSma *pSma; int64_t commitAppliedVer; // vnode applied version for async commit @@ -98,7 +104,7 @@ struct SRSmaStat { volatile int32_t nFetchAll; // active number of fetch all int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing - SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) + SRSmaFS fs; // for recovery/snapshot r/w SHashObj *infoHash; // key: suid, value: SRSmaInfo tsem_t notEmpty; // has items in queue buffer }; @@ -163,14 +169,6 @@ enum { TASK_TRIGGER_STAT_DROPPED = 5, }; -enum { - RSMA_ROLE_CREATE = 0, - RSMA_ROLE_DROP = 1, - RSMA_ROLE_SUBMIT = 2, - RSMA_ROLE_FETCH = 3, - RSMA_ROLE_ITERATE = 4, -}; - enum { RSMA_RESTORE_REBOOT = 1, RSMA_RESTORE_SYNC = 2, @@ -182,88 +180,32 @@ typedef enum { RSMA_EXEC_COMMIT = 3, // triggered by commit } ERsmaExecType; -void tdDestroySmaEnv(SSmaEnv *pSmaEnv); -void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); - -int32_t tdDropTSma(SSma *pSma, char *pMsg); -int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid); -int32_t tdInsertRSmaData(SSma *pSma, char *msg); - +// sma +int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); +void tdDestroySmaEnv(SSmaEnv *pSmaEnv); +void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); -int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); -int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); - +int32_t tdLockSma(SSma *pSma); +int32_t tdUnLockSma(SSma *pSma); void *tdAcquireSmaRef(int32_t rsetId, int64_t refId); int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId); -int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); +// rsma +int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); +int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); +void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); +void tdRSmaFSClose(SRSmaFS *fs); +int32_t tdRSmaFSUpsertQFile(SRSmaFS *fs, SQTaskFile *pTaskF); +int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer); +int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); +int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); +int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); -int32_t tdLockSma(SSma *pSma); -int32_t tdUnLockSma(SSma *pSma); - -static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) { - if (pTStat) { - return atomic_load_8(&pTStat->state); - } - return TSDB_SMA_STAT_UNKNOWN; -} - -static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) { - if (!pTStat) { - return false; - } - - if (state) { - *state = atomic_load_8(&pTStat->state); - return *state == TSDB_SMA_STAT_OK; - } - return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK; -} - -static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) { - return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true; -} - -static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) { - return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true; -} - -static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) { - if (pTStat) { - atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK); - } -} - -static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) { - if (pTStat) { - atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED); - } -} - -static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { - if (pTStat) { - atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED); - } -} - -void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); -void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); -static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); -void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); -void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); -int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); - -int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); -int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); -int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer); - -int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); -int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); -int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); +void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); // smaFileUtil ================ diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 3cf50a035a..77b5399139 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -21,7 +21,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); -static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); +static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); /** * @brief Only applicable to Rollup SMA @@ -166,114 +166,45 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } -static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { - SVnode *pVnode = pSma->pVnode; - int64_t committed = pRSmaStat->commitAppliedVer; - TdDirPtr pDir = NULL; - TdDirEntryPtr pDirEntry = NULL; - char dir[TSDB_FILENAME_LEN]; - const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$"; - regex_t regex; - int code = 0; - - tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); - - // Resource allocation and init - if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) { - char errbuf[128]; - regerror(code, ®ex, errbuf, sizeof(errbuf)); - smaWarn("vgId:%d, rsma post commit, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf); - return TSDB_CODE_FAILED; - } - - if ((pDir = taosOpenDir(dir)) == NULL) { - regfree(®ex); - terrno = TAOS_SYSTEM_ERROR(errno); - smaDebug("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); - return TSDB_CODE_FAILED; - } - - int32_t dirLen = strlen(dir); - char *dirEnd = POINTER_SHIFT(dir, dirLen); - regmatch_t regMatch[2]; - while ((pDirEntry = taosReadDir(pDir)) != NULL) { - char *entryName = taosGetDirEntryName(pDirEntry); - if (!entryName) { - continue; - } - - code = regexec(®ex, entryName, 2, regMatch, 0); - - if (code == 0) { - // match - int64_t version = -1; - sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &version); - if ((version < committed) && (version > -1)) { - strncpy(dirEnd, entryName, TSDB_FILENAME_LEN - dirLen); - if (taosRemoveFile(dir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed, - dir, terrstr()); - } else { - smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, dir); - } - } - } else if (code == REG_NOMATCH) { - // not match - smaTrace("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), entryName); - continue; - } else { - // has other error - char errbuf[128]; - regerror(code, ®ex, errbuf, sizeof(errbuf)); - smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), errbuf); - - taosCloseDir(&pDir); - regfree(®ex); - return TSDB_CODE_FAILED; - } - } - - taosCloseDir(&pDir); - regfree(®ex); - - return TSDB_CODE_SUCCESS; -} - // SQTaskFile ====================================================== -// int32_t tCmprQTaskFile(void const *lhs, void const *rhs) { -// int64_t *lCommitted = *(int64_t *)lhs; -// SQTaskFile *rQTaskF = (SQTaskFile *)rhs; -// if (lCommitted < rQTaskF->commitID) { -// return -1; -// } else if (lCommitted > rQTaskF->commitID) { -// return 1; -// } - -// return 0; -// } - -#if 0 /** * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be * multiple qtaskinfo files supporting snapshot replication. * * @param pSma - * @param pRSmaStat + * @param pStat * @return int32_t */ -static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { - SVnode *pVnode = pSma->pVnode; - int64_t committed = pRSmaStat->commitAppliedVer; - SArray *aTaskFile = pRSmaStat->aTaskFile; +static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { + SVnode *pVnode = pSma->pVnode; + SRSmaFS *pFS = &pStat->fs; + int64_t committed = pStat->commitAppliedVer; + char qTaskInfoFullName[TSDB_FILENAME_LEN]; - void *qTaskFile = taosArraySearch(aTaskFile, committed, tCmprQTaskFile, TD_LE); - + for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) { + SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i); + if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); + if (taosRemoveFile(qTaskInfoFullName) < 0) { + smaWarn("vgId:%d, cleanup qinf, failed to remove %s since %s", TD_VID(pVnode), qTaskInfoFullName, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, cleanup qinf, success to remove %s", TD_VID(pVnode), qTaskInfoFullName); + } + taosArrayRemove(pFS->aQTaskInf, i); + continue; + } + ++i; + } + + SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; + if (tdRSmaFSUpsertQFile(pFS, &qFile) < 0) { + return TSDB_CODE_FAILED; + } return TSDB_CODE_SUCCESS; } -#endif /** * @brief post-commit for rollup sma @@ -290,8 +221,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - // cleanup outdated qtaskinfo files - tdCleanupQTaskInfoFiles(pSma, pRSmaStat); + tdUpdateQTaskInfoFiles(pSma, pRSmaStat); return TSDB_CODE_SUCCESS; } @@ -488,8 +418,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // unlock // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - // step 2: cleanup outdated qtaskinfo files - tdCleanupQTaskInfoFiles(pSma, pRSmaStat); + tdUpdateQTaskInfoFiles(pSma, pRSmaStat); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 32a419022a..21d64238a1 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -28,6 +28,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv); static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); static int32_t tdRsmaStartExecutor(const SSma *pSma); static int32_t tdRsmaStopExecutor(const SSma *pSma); +static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); +static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); static void *tdFreeTSmaStat(STSmaStat *pStat); static void tdDestroyRSmaStat(void *pRSmaStat); @@ -244,6 +246,11 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (tdRsmaStartExecutor(pSma) < 0) { return TSDB_CODE_FAILED; } + + if (!(pRSmaStat->fs.aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { @@ -307,12 +314,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { // step 4: tdRsmaStopExecutor(pSma); - // step 5: free pStat + // step 5: + tdRSmaFSClose(&pStat->fs); + + // step 6: free pStat taosMemoryFreeClear(pStat); } } -void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { +static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { tdDestroySmaState(pSmaStat, smaType); if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { taosMemoryFreeClear(pSmaStat); @@ -329,7 +339,7 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { * @return int32_t */ -int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { +static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat)); diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c new file mode 100644 index 0000000000..2ab189bc8c --- /dev/null +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sma.h" + +// ================================================================================================= + +static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); + +/** + * @brief Open RSma FS from qTaskInfo files + * + * @param pSma + * @param version + * @return int32_t + */ +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) { + SVnode *pVnode = pSma->pVnode; + int64_t commitID = pVnode->state.commitID; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = NULL; + SArray *output = NULL; + SRSmaFS *fs = NULL; + + if (!pEnv) { + return TSDB_CODE_SUCCESS; + } + + if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) { + goto _end; + } + + pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + fs = &pStat->fs; + + for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { + int32_t vid = 0; + int64_t version = -1; + sscanf((const char *)taosArrayGetP(output, i), "v%dqinfo.v%" PRIi64, &vid, &version); + SQTaskFile qTaskFile = {.version = version, .nRef = 1}; + if ((terrno = tdRSmaFSUpsertQFile(fs, &qTaskFile)) < 0) { + goto _end; + } + smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%" PRIi64, TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef); + } + +_end: + for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { + void *ptr = taosArrayGetP(output, i); + taosMemoryFreeClear(ptr); + } + taosArrayDestroy(output); + + if (terrno != 0) { + smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr()); + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); } + +/** + * @brief Fetch qtaskfiles no more than version + * + * @param pSma + * @param version + * @param output + * @return int32_t + */ +static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) { + SVnode *pVnode = pSma->pVnode; + TdDirPtr pDir = NULL; + TdDirEntryPtr pDirEntry = NULL; + char dir[TSDB_FILENAME_LEN]; + const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$"; + regex_t regex; + int code = 0; + + tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); + + // Resource allocation and init + if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) { + terrno = TSDB_CODE_RSMA_REGEX_MATCH; + char errbuf[128]; + regerror(code, ®ex, errbuf, sizeof(errbuf)); + smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf); + return TSDB_CODE_FAILED; + } + + if ((pDir = taosOpenDir(dir)) == NULL) { + regfree(®ex); + terrno = TAOS_SYSTEM_ERROR(errno); + smaDebug("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); + return TSDB_CODE_FAILED; + } + + int32_t dirLen = strlen(dir); + char *dirEnd = POINTER_SHIFT(dir, dirLen); + regmatch_t regMatch[2]; + while ((pDirEntry = taosReadDir(pDir)) != NULL) { + char *entryName = taosGetDirEntryName(pDirEntry); + if (!entryName) { + continue; + } + + code = regexec(®ex, entryName, 2, regMatch, 0); + + if (code == 0) { + // match + smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName); + + int64_t ver = -1; + sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver); + if ((ver <= version) && (ver > -1)) { + if (!(*output)) { + if (!(*output = taosArrayInit(1, POINTER_BYTES))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + } + char *entryDup = strdup(entryName); + if (!entryDup) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + if (!taosArrayPush(*output, &entryDup)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + } + } else if (code == REG_NOMATCH) { + // not match + smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName); + continue; + } else { + // has other error + char errbuf[128]; + regerror(code, ®ex, errbuf, sizeof(errbuf)); + smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf); + terrno = TSDB_CODE_RSMA_REGEX_MATCH; + goto _end; + } + } + +_end: + taosCloseDir(&pDir); + regfree(®ex); + return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; +} + +static int32_t tdQTaskFileCmprFn(const void *p1, const void *p2) { + if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) { + return -1; + } else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) { + return 1; + } + + return 0; +} + +int32_t tdRSmaFSUpsertQFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) { + int32_t code = 0; + int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn, TD_GE); + + if (idx < 0) { + idx = taosArrayGetSize(pFS->aQTaskInf); + } else { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); + int32_t c = tdQTaskFileCmprFn(pTaskF, qTaskFile); + if (c == 0) { + pTaskF->nRef = qTaskFile->nRef; + pTaskF->version = qTaskFile->version; + pTaskF->size = qTaskFile->size; + goto _exit; + } + } + + if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + return code; +} diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 3c3097bb2f..d9ffda279f 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -150,7 +150,7 @@ int32_t smaOpen(SVnode *pVnode) { } // restore the rsma - if (tdRsmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) { + if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) { goto _err; } } @@ -181,8 +181,8 @@ int32_t smaClose(SSma *pSma) { * @param committedVer * @return int32_t */ -int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { +int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { ASSERT(VND_IS_RSMA(pSma->pVnode)); - return tdProcessRSmaRestoreImpl(pSma, type, committedVer); + return tdRSmaProcessRestoreImpl(pSma, type, committedVer); } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f2063e3067..977f7dda07 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -46,6 +46,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSu static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); +static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo); +static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); @@ -96,7 +98,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } -void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { +static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC if (!taskHandle || !(*taskHandle)) return; qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); @@ -129,14 +131,14 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { } if (isDeepFree && pInfo->taskInfo[i]) { - tdFreeQTaskInfo(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); + tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } else { smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), pInfo->suid, i + 1); } if (pInfo->iTaskInfo[i]) { - tdFreeQTaskInfo(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); + tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); } else { smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", SMA_VID(pSma), pInfo->suid, i + 1); @@ -330,7 +332,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat * @param tbName * @return int32_t */ -int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { +int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; @@ -427,7 +429,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) { return TSDB_CODE_SUCCESS; } - return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); + return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); } /** @@ -817,6 +819,95 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_SUCCESS; } +static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param, + tb_uid_t suid, int8_t idx) { + SVnode *pVnode = pSma->pVnode; + char *pOutput = NULL; + int32_t len = 0; + + if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) { + smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + + SReadHandle handle = { + .meta = pVnode->pMeta, + .vnode = pVnode, + .initTqReader = 1, + }; + ASSERT(!dstTaskInfo); + dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); + if (!dstTaskInfo) { + terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; + goto _err; + } + + if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) { + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + + smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); + + taosMemoryFreeClear(pOutput); + return TSDB_CODE_SUCCESS; +_err: + taosMemoryFreeClear(pOutput); + tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1); + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, + terrstr()); + return TSDB_CODE_FAILED; +} + +/** + * @brief Clone qTaskInfo of SRSmaInfo + * + * @param pSma + * @param pInfo + * @return int32_t + */ +static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { + SRSmaParam *param = NULL; + if (!pInfo) { + return TSDB_CODE_SUCCESS; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, SMA_META(pSma), 0); + smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); + if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) { + smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, + terrstr()); + goto _err; + } + ASSERT(mr.me.type == TSDB_SUPER_TABLE); + ASSERT(mr.me.uid == pInfo->suid); + if (TABLE_IS_ROLLUP(mr.me.flags)) { + param = &mr.me.stbEntry.rsmaParam; + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (!pInfo->iTaskInfo[i]) { + continue; + } + if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { + goto _err; + } + } + smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid); + } else { + terrno = TSDB_CODE_RSMA_INVALID_SCHEMA; + goto _err; + } + + metaReaderClear(&mr); + return TSDB_CODE_SUCCESS; +_err: + metaReaderClear(&mr); + smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); + return TSDB_CODE_FAILED; +} + /** * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. * @@ -848,7 +939,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } if (!pRSmaInfo->taskInfo[0]) { - if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { + if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) { // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } @@ -1006,7 +1097,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { " qmsgLen:%" PRIi32, TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); } - if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { + if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); goto _err; } @@ -1118,7 +1209,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { return TSDB_CODE_SUCCESS; } -int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) { +int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) { // step 1: iterate all stables to restore the rsma env int64_t nTables = 0; if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { @@ -1139,6 +1230,12 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) if (tdRSmaRestoreTSDataReload(pSma) < 0) { goto _err; } + + // step 4: open SRSmaFS for qTaskFiles + if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) { + goto _err; + } + smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer); return TSDB_CODE_SUCCESS; _err: diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 335c15a539..75ec26bef2 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -332,7 +332,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { pWriter->pQTaskFWriter->fname, qTaskInfoFullName); // rsma restore - if ((code = tdRsmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) { + if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) { goto _err; } smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 1687cd46a0..173391b769 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -20,6 +20,10 @@ #define SMA_STORAGE_MINUTES_DAY 1440 #define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file +static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); +static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); +static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); + // TODO: Who is responsible for resource allocate and release? int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { int32_t code = TSDB_CODE_SUCCESS; @@ -59,7 +63,7 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t * * @param days unit is minute * @return int32_t */ -int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { +static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { SDecoder coder = {0}; tDecoderInit(&coder, pCont, contLen); @@ -106,7 +110,7 @@ _err: * @param pMsg * @return int32_t */ -int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { +static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { SSmaCfg *pCfg = (SSmaCfg *)pMsg; if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { @@ -145,7 +149,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { * @param msg * @return int32_t */ -int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { +static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { const SArray *pDataBlocks = (const SArray *)msg; // TODO: destroy SSDataBlocks(msg) if (!pDataBlocks) { diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index d771797963..a4ba0a61a5 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -305,93 +305,4 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) { smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId); return TSDB_CODE_SUCCESS; -} - -static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param, - tb_uid_t suid, int8_t idx) { - SVnode *pVnode = pSma->pVnode; - char *pOutput = NULL; - int32_t len = 0; - - if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) { - smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; - } - - SReadHandle handle = { - .meta = pVnode->pMeta, - .vnode = pVnode, - .initTqReader = 1, - }; - ASSERT(!dstTaskInfo); - dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); - if (!dstTaskInfo) { - terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; - goto _err; - } - - if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) { - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; - } - - smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); - - taosMemoryFreeClear(pOutput); - return TSDB_CODE_SUCCESS; -_err: - taosMemoryFreeClear(pOutput); - tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1); - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, - terrstr()); - return TSDB_CODE_FAILED; -} - -/** - * @brief Clone qTaskInfo of SRSmaInfo - * - * @param pSma - * @param pInfo - * @return int32_t - */ -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { - SRSmaParam *param = NULL; - if (!pInfo) { - return TSDB_CODE_SUCCESS; - } - - SMetaReader mr = {0}; - metaReaderInit(&mr, SMA_META(pSma), 0); - smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); - if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) { - smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, - terrstr()); - goto _err; - } - ASSERT(mr.me.type == TSDB_SUPER_TABLE); - ASSERT(mr.me.uid == pInfo->suid); - if (TABLE_IS_ROLLUP(mr.me.flags)) { - param = &mr.me.stbEntry.rsmaParam; - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (!pInfo->iTaskInfo[i]) { - continue; - } - if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { - goto _err; - } - } - smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid); - } else { - terrno = TSDB_CODE_RSMA_INVALID_SCHEMA; - goto _err; - } - - metaReaderClear(&mr); - return TSDB_CODE_SUCCESS; -_err: - metaReaderClear(&mr); - smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); - return TSDB_CODE_FAILED; } \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f99d126892..b382257972 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists" TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")