Merge branch '3.0' into refact/submit_req

This commit is contained in:
Haojun Liao 2022-12-26 10:26:17 +08:00 committed by GitHub
commit fbc9e77004
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1314 additions and 1101 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 261fcca
GIT_TAG 11b60a4
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -84,20 +84,6 @@ func main() {
if err != nil {
panic(err)
}
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
if result.Type != common.TMQ_RES_TABLE_META {
panic("want message type 2 got " + strconv.Itoa(int(result.Type)))
}
data, _ := json.Marshal(result.Meta)
fmt.Println(string(data))
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
break
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)

View File

@ -701,7 +701,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
// #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_FS_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
@ -709,6 +709,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160)
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162)
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)

View File

@ -44,7 +44,6 @@ typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef struct SRSmaFS SRSmaFS;
typedef struct SQTaskFile SQTaskFile;
typedef struct SQTaskFReader SQTaskFReader;
typedef struct SQTaskFWriter SQTaskFWriter;
struct SSmaEnv {
SRWLatch lock;
@ -85,22 +84,20 @@ struct STSmaStat {
struct SQTaskFile {
volatile int32_t nRef;
int32_t padding;
int8_t level;
int64_t suid;
int64_t version;
int64_t size;
int64_t mtime;
};
struct SQTaskFReader {
SSma *pSma;
int8_t level;
int64_t suid;
int64_t version;
TdFilePtr pReadH;
};
struct SQTaskFWriter {
SSma *pSma;
int64_t version;
TdFilePtr pWriteH;
char *fname;
};
struct SRSmaFS {
SArray *aQTaskInf; // array of SQTaskFile
@ -214,85 +211,40 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
// rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat);
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma);
int32_t tdRSmaFSFinishCommit(SSma *pSma);
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo);
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
smaTrace("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_DEC(pRSmaInfo);
smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
// smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
typedef struct STFInfo STFInfo;
typedef struct STFile STFile;
struct STFInfo {
// common fields
uint32_t magic;
uint32_t ftype;
uint32_t fver;
int64_t fsize;
};
enum {
TD_FTYPE_RSMA_QTASKINFO = 0,
};
#if 0
struct STFile {
uint8_t state;
STFInfo info;
char *fname;
TdFilePtr pFile;
};
#define TD_TFILE_PFILE(tf) ((tf)->pFile)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_FULL_NAME(tf) ((tf)->fname)
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
int32_t tdOpenTFile(STFile *pTFile, int flags);
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size);
int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile);
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile);
#endif
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
char *outputName);
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName);
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus
}

View File

@ -210,9 +210,6 @@ void smaCleanUp();
int32_t smaOpen(SVnode* pVnode, int8_t rollback);
int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma);
@ -227,7 +224,6 @@ int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg,
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
void tdUidStoreDestory(STbUidStore* pStore);
void* tdUidStoreFree(STbUidStore* pStore);
// SMetaSnapReader ========================================
@ -274,6 +270,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
typedef struct {
@ -417,6 +414,7 @@ enum {
struct SSnapDataHdr {
int8_t type;
int8_t flag;
int64_t index;
int64_t size;
uint8_t data[];

View File

@ -17,42 +17,11 @@
extern SSmaMgmt smaMgmt;
#if 0
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
#endif
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
#if 0
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
#endif
/**
* @brief async commit, only applicable to Rollup SMA
*
@ -128,85 +97,25 @@ _exit:
int32_t smaFinishCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
code = tdRSmaFSFinishCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
terrno = code;
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
#if 0
/**
* @brief pre-commit for rollup sma(sync commit).
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) wait for all triggered fetch tasks to finish
* 3) perform persist task for qTaskInfo
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
// step 3: perform persist task for qTaskInfo
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
/**
* @brief commit for rollup sma
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
#if 0
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
#endif
return TSDB_CODE_SUCCESS;
}
#endif
// SQTaskFile ======================================================
/**
@ -218,6 +127,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
* @return int32_t
*/
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
#if 0
SVnode *pVnode = pSma->pVnode;
SRSmaFS *pFS = RSMA_FS(pStat);
int64_t committed = pStat->commitAppliedVer;
@ -264,30 +174,9 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
return TSDB_CODE_SUCCESS;
}
#if 0
/**
* @brief post-commit for rollup sma
* 1) clean up the outdated qtaskinfo files
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
if (!VND_IS_RSMA(pVnode)) {
return TSDB_CODE_SUCCESS;
}
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS;
}
#endif
return TSDB_CODE_SUCCESS;
}
/**
* @brief Rsma async commit implementation(only do some necessary light weighted task)
@ -298,9 +187,11 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
int32_t code = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
return TSDB_CODE_SUCCESS;
return code;
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
@ -317,7 +208,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
ASSERT(pRSmaStat->commitAppliedVer > 0);
// ASSERT(pRSmaStat->commitAppliedVer > 0);
// step 2: wait for all triggered fetch tasks to finish
nLoops = 0;
@ -351,8 +242,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) {
return code;
}
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
@ -383,7 +274,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
return TSDB_CODE_SUCCESS;
return code;
}
/**
@ -394,26 +285,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
*/
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
#endif
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA2(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbCommit(VND_RSMA2(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
_exit:
terrno = code;
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -243,10 +243,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
return TSDB_CODE_FAILED;
}
if (!(RSMA_FS(pRSmaStat)->aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO
} else {

View File

@ -17,250 +17,713 @@
// =================================================================================================
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
/**
* @brief Open RSma FS from qTaskInfo files
*
* @param pSma
* @param version
* @return int32_t
*/
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SArray *output = NULL;
terrno = TSDB_CODE_SUCCESS;
static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
n += tPutI8(p ? p + n : p, pFile->level);
n += tPutI64v(p ? p + n : p, pFile->size);
n += tPutI64v(p ? p + n : p, pFile->suid);
n += tPutI64v(p ? p + n : p, pFile->version);
n += tPutI64v(p ? p + n : p, pFile->mtime);
if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) {
goto _end;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
int32_t vid = 0;
int64_t version = -1;
sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version);
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
goto _end;
}
smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%d", TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef);
}
_end:
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
void *ptr = taosArrayGetP(output, i);
taosMemoryFreeClear(ptr);
}
taosArrayDestroy(output);
if (terrno != TSDB_CODE_SUCCESS) {
smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
return n;
}
void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); }
static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) {
int32_t n = 0;
uint32_t size = taosArrayGetSize(pFS->aQTaskInf);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) {
return -1;
} else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) {
return 1;
// version
n += tPutI8(p ? p + n : p, 0);
// SArray<SQTaskFile>
n += tPutU32v(p ? p + n : p, size);
for (uint32_t i = 0; i < size; ++i) {
n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i));
}
return 0;
return n;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) {
int32_t n = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if ((pTaskF = taosArraySearch(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
n += tGetI8(p + n, &pFile->level);
n += tGetI64v(p + n, &pFile->size);
n += tGetI64v(p + n, &pFile->suid);
n += tGetI64v(p + n, &pFile->version);
n += tGetI64v(p + n, &pFile->mtime);
return n;
}
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
int64_t version = -1;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (taosArrayGetSize(aQTaskInf) > 0) {
version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version;
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return version;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if ((idx = taosArraySearchIdx(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, idx);
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
/**
* @brief Fetch qtaskfiles LE than version
*
* @param pSma
* @param version
* @param output
* @return int32_t
*/
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) {
SVnode *pVnode = pSma->pVnode;
TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN];
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
regex_t regex;
int code = 0;
terrno = TSDB_CODE_SUCCESS;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
if (!taosCheckExistFile(dir)) {
smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir);
return TSDB_CODE_SUCCESS;
}
// Resource allocation and init
if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if (!(pDir = taosOpenDir(dir))) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED;
}
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir))) {
char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) {
continue;
}
code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) {
// match
smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName);
int64_t ver = -1;
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver);
if ((ver <= version) && (ver > -1)) {
if (!(*output)) {
if (!(*output = taosArrayInit(1, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
char *entryDup = strdup(entryName);
if (!entryDup) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!taosArrayPush(*output, &entryDup)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
} else {
}
} else if (code == REG_NOMATCH) {
// not match
smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName);
continue;
} else {
// has other error
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf);
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
goto _end;
}
}
_end:
taosCloseDir(&pDir);
regfree(&regex);
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) {
if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) {
return -1;
} else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) {
return 1;
}
return 0;
}
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn2, TD_GE);
int32_t n = 0;
int8_t version = 0;
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile);
if (c == 0) {
pTaskF->nRef = qTaskFile->nRef;
pTaskF->version = qTaskFile->version;
pTaskF->size = qTaskFile->size;
// version
n += tGetI8(pData + n, &version);
// SArray<SQTaskFile>
taosArrayClear(pFS->aQTaskInf);
uint32_t size = 0;
n += tGetU32v(pData + n, &size);
for (uint32_t i = 0; i < size; ++i) {
SQTaskFile qTaskF = {0};
int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF);
if (nt < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
n += nt;
if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) {
ASSERT(n + sizeof(TSCKSUM) == nData);
_exit:
return code;
}
static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
// encode to binary
int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
uint8_t *pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tdRSmaFSToBinary(pData, pFS);
taosCalcChecksumAppend(0, pData, size);
// save to file
TdFilePtr pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) {
int32_t code = 0;
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
}
static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) {
SVnode *pVnode = pSma->pVnode;
if (pVnode->pTfs) {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs),
TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP);
}
} else {
#if 0
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP);
}
#endif
}
}
static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
uint8_t *pData = NULL;
// load binary
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
// decode binary
code = tsdbBinaryToFS(pData, size, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
const SQTaskFile *q1 = (const SQTaskFile *)p1;
const SQTaskFile *q2 = (const SQTaskFile *)p2;
if (q1->suid < q2->suid) {
return -1;
} else if (q1->suid > q2->suid) {
return 1;
}
if (q1->level < q2->level) {
return -1;
} else if (q1->level > q2->level) {
return 1;
}
if (q1->version < q2->version) {
return -2;
} else if (q1->version > q2->version) {
return 1;
}
return 0;
}
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFSOld = RSMA_FS(pStat);
int64_t version = pStat->commitAppliedVer;
char fname[TSDB_FILENAME_LEN] = {0};
// SQTaskFile
int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf);
int32_t iNew = 0;
while (iNew < nNew) {
SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++);
int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFSOld->aQTaskInf);
pQTaskFNew->nRef = 1;
} else {
SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c1 == 0) {
// utilize the item in pFSOld->qQTaskInf, instead of pFSNew
continue;
} else if (c1 < 0) {
// NOTHING TODO
} else {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// remove previous version
while (--idx >= 0) {
SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew);
if (c2 == 0) {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c2 != -2) {
break;
}
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
(void)taosRemoveFile(fname);
taosArrayRemove(pFSOld->aQTaskInf, idx);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
int32_t code = 0;
#if 0
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *pFS = RSMA_FS(pStat);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
// SArray<SQTaskFile>
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
// main.tdb =========
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
if (taosCheckExistFile(fnameVer)) {
if (taosRenameFile(fnameVer, fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname);
} else if (taosCheckExistFile(fname)) {
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname);
}
}
{
// remove those invalid files (todo)
// main.tdb-journal.5 // TDB should handle its clear for kill -9
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
#endif
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
// open handle
code = tdRSmaFSCreate(RSMA_FS(pStat), 0);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (taosCheckExistFile(current)) {
code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
code = tdRSmaFSRollback(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
// 1st time open with empty current/qTaskInfoFile
code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current);
TSDB_CHECK_CODE(code, lino, _exit);
}
// scan and try fix(remove main.db/main.db.xxx and use the one with version)
code = tdRSmaFSScanAndTryFix(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
void tdRSmaFSClose(SRSmaFS *pFS) { pFS->aQTaskInf = taosArrayDestroy(pFS->aQTaskInf); }
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
tdRSmaGetCurrentFName(pSma, NULL, tfname);
// generate PRESENT.t
code = tdRSmaSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SRSmaFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, current, current_t);
if (!taosCheckExistFile(current_t)) {
goto _exit;
}
// rename the file
if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
// load the new FS
code = tdRSmaFSCreate(&fs, 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaLoadFSFromFile(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
code = tdRSmaFSApplyChange(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
tdRSmaFSClose(&fs);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSFinishCommit(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma));
}
return code;
}
int32_t tdRSmaFSRollback(SSma *pSma) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tdRSmaGetCurrentFName(pSma, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno));
}
return code;
}
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) {
int32_t code = 0;
for (int32_t i = 0; i < nSize; ++i) {
SQTaskFile *qTaskF = qTaskFile + i;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
if (c == 0) {
if (pTaskF->size != qTaskF->size) {
code = TSDB_CODE_RSMA_FS_UPDATE;
smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64
", size:%" PRIi64 " != %" PRIi64,
SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version,
pTaskF->size, qTaskF->size);
goto _exit;
}
continue;
}
}
if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
#if 0
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
ASSERT(version > 0);
if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
} else {
// ref all
int32_t size = taosArrayGetSize(aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
ASSERT(version > 0);
if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, idx);
}
}
} else {
for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
int32_t nRef = INT32_MAX;
if (pTaskF->version == version) {
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
} else if (pTaskF->version < version) {
nRef = atomic_load_32(&pTaskF->nRef);
}
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, i);
continue;
}
++i;
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
#endif
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
if (pFS->aQTaskInf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i);
nRef = atomic_fetch_add_32(&qTaskF->nRef, 1);
if (nRef <= 0) {
code = TSDB_CODE_RSMA_FS_REF;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
taosArraySetSize(pFS->aQTaskInf, size);
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code),
nRef);
}
return code;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN];
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
if (nRef == 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
if (taosRemoveFile(fname) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname);
}
} else if (nRef < 0) {
smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF));
}
}
taosArrayDestroy(pFS->aQTaskInf);
}
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaFS *qFS = RSMA_FS(pStat);
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
code = tdRSmaFSCreate(pFS, size);
TSDB_CHECK_CODE(code, lino, _exit);
taosArraySetSize(pFS->aQTaskInf, size);
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -121,8 +121,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
ASSERT(!pVnode->pSma);
SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
if (!pSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -137,7 +135,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
if (VND_IS_RSMA(pVnode)) {
STsdbKeepCfg keepCfg = {0};
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
if (i == TSDB_RETENTION_L0) {
SMA_OPEN_RSMA_IMPL(pVnode, 0);
} else if (i == TSDB_RETENTION_L1) {
@ -145,12 +143,14 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
} else if (i == TSDB_RETENTION_L2) {
SMA_OPEN_RSMA_IMPL(pVnode, 2);
} else {
ASSERT(0);
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), terrstr(), i);
goto _err;
}
}
// restore the rsma
if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) {
goto _err;
}
}
@ -181,8 +181,11 @@ int32_t smaClose(SSma *pSma) {
* @param committedVer
* @return int32_t
*/
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode));
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) {
if (!VND_IS_RSMA(pSma->pVnode)) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
return TSDB_CODE_FAILED;
}
return tdRSmaProcessRestoreImpl(pSma, type, committedVer);
return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback);
}

View File

@ -21,17 +21,17 @@
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
SSmaMgmt smaMgmt = {
.inited = 0,
.rsetId = -1,
};
#define TD_QTASKINFO_FNAME_PREFIX "qinf.v"
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx);
@ -57,38 +57,6 @@ struct SRSmaQTaskInfoItem {
void *qTaskInfo;
};
struct SRSmaQTaskInfoIter {
STFile *pTFile;
int64_t offset;
int64_t fsize;
int32_t nBytes;
int32_t nAlloc;
char *pBuf;
// ------------
char *qBuf; // for iterator
int32_t nBufPos;
};
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
}
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC
if (!taskHandle || !(*taskHandle)) return;
@ -363,10 +331,12 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS;
}
#if 0
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
#endif
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
@ -374,13 +344,8 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
// TODO: free original pRSmaInfo if exists abnormally
tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true);
if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) {
terrno = TSDB_CODE_RSMA_REMOVE_EXISTS;
goto _err;
}
smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS;
}
// from write queue: single thead
@ -449,8 +414,8 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
}
if (!VND_IS_RSMA(pVnode)) {
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
pReq->suid);
smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
pReq->suid);
return TSDB_CODE_SUCCESS;
}
@ -494,9 +459,8 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
// save to file
// TODO
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
// no need to save to file as triggered by dropping stable
smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
return TSDB_CODE_SUCCESS;
}
@ -561,7 +525,7 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid)
return TSDB_CODE_SUCCESS;
}
void tdUidStoreDestory(STbUidStore *pStore) {
static void tdUidStoreDestory(STbUidStore *pStore) {
if (pStore) {
if (pStore->uidHash) {
if (pStore->tbUids) {
@ -839,12 +803,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_SUCCESS;
}
if (!pInfo->pTSchema) {
terrno = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid);
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level,
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize);
#if 0
for (int32_t i = 0; i < msgSize; ++i) {
@ -853,7 +818,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
tdRsmaPrintSubmitReq(pSma, pReq);
}
#endif
if (qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
@ -870,6 +835,12 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
char *pOutput = NULL;
int32_t len = 0;
if (!srcTaskInfo) {
terrno = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d, rsma clone, table %" PRIi64 ", no need since srcTaskInfo is NULL", TD_VID(pVnode), suid);
return TSDB_CODE_FAILED;
}
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
@ -1052,12 +1023,8 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
}
STbUidStore uidStore = {0};
SRetention *pRetention = SMA_RETENTION(pSma);
if (!RETENTION_VALID(pRetention + 1)) {
// return directly if retention level 1 is invalid
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
@ -1189,18 +1156,21 @@ _err:
}
/**
* @brief reload ts data from checkpoint
*
* @param pSma
* @return int32_t
* N.B. the data would be restored from the unified WAL replay procedure
*/
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
// NOTHING TODO: the data would be restored from the unified WAL replay procedure
return TSDB_CODE_SUCCESS;
}
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
// step 1: init env
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
// step 1: iterate all stables to restore the rsma env
// step 2: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) {
goto _err;
}
// step 3: iterate all stables to restore the rsma env
int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err;
@ -1210,16 +1180,6 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
return TSDB_CODE_SUCCESS;
}
// step 2: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err;
}
// step 3: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
goto _err;
}
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS;
_err:
@ -1229,19 +1189,26 @@ _err:
}
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
int32_t vid = SMA_VID(pSma);
int32_t code = 0;
int32_t lino = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SArray *qTaskFArray = NULL;
int64_t version = pRSmaStat->commitAppliedVer;
TdFilePtr pOutFD = NULL;
TdFilePtr pInFD = NULL;
char fname[TSDB_FILENAME_LEN];
char fnameVer[TSDB_FILENAME_LEN];
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS;
qTaskFArray = taosArrayInit(taosHashGetSize(pInfoHash) << 1, sizeof(SQTaskFile));
if (!qTaskFArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
void *infoHash = NULL;
@ -1256,19 +1223,80 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
goto _err;
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid,
i + 1);
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1);
// qTaskInfo file
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs),
fnameVer);
if (taosCheckExistFile(fnameVer)) {
smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer);
}
pOutFD = taosCreateFile(fnameVer, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pInFD = taosOpenFile(fname, TD_FILE_READ);
if (pInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size = 0;
uint32_t mtime = 0;
if (taosFStatFile(pInFD, &size, &mtime) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
ASSERT(size > 0);
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), fname,
fnameVer, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pOutFD);
taosCloseFile(&pInFD);
SQTaskFile qTaskF = {
.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size, .mtime = mtime};
taosArrayPush(qTaskFArray, &qTaskF);
}
}
}
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
return TSDB_CODE_FAILED;
// prepare
code = tdRSmaFSCopy(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSUpsertQTaskFile(pSma, &fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray));
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaFSPrepareCommit(pSma, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosArrayDestroy(fs.aQTaskInf);
taosArrayDestroy(qTaskFArray);
if (code) {
if (pOutFD) taosCloseFile(&pOutFD);
if (pInFD) taosCloseFile(&pInFD);
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
terrno = code;
return code;
}
/**
@ -1355,12 +1383,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tsem_post(&(pStat->notEmpty));
}
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive",
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INIT: {
@ -1368,8 +1392,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
default: {
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8
" is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat);
} break;
}
@ -1468,7 +1493,6 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
if (size > 0) {
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
}
@ -1476,6 +1500,9 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
}
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
tdFreeRSmaSubmitItems(pSubmitArr);
while (1) {
void *msg = NULL;
taosGetQitem(qall, (void **)&msg);
@ -1522,8 +1549,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
while ((pIter = taosHashIterate(infoHash, pIter))) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) {
int32_t batchCnt = -1;
int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
bool occupied = (batchMax <= 1);
@ -1539,13 +1565,20 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
}
if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
if (RSMA_NEED_FETCH(pInfo)) {
int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
ASSERT(oldVal >= 0);
tdRSmaFetchAllResult(pSma, pInfo);
int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat));
if (curStat == 1) {
smaDebug("vgId:%d, fetch all not exec as commit stat is %" PRIi8, SMA_VID(pSma), curStat);
} else {
tdRSmaFetchAllResult(pSma, pInfo);
}
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
}
@ -1555,17 +1588,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (qallItemSize > 0) {
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
continue;
} else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) {
continue;
}
for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j);
if (pItem->fetchLevel) {
pItem->fetchLevel = 0;
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
}
}
}
if (RSMA_NEED_FETCH(pInfo)) {
continue;
}
break;

View File

@ -17,14 +17,13 @@
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version);
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader);
// SRSmaSnapReader ========================================
struct SRSmaSnapReader {
SSma* pSma;
int64_t sver;
int64_t ever;
SRSmaFS fs;
// for data file
int8_t rsmaDataDone[TSDB_RETENTION_L2];
@ -32,19 +31,23 @@ struct SRSmaSnapReader {
// for qtaskinfo file
int8_t qTaskDone;
int32_t fsIter;
SQTaskFReader* pQTaskFReader;
};
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
int32_t code = 0;
int32_t lino = 0;
SVnode* pVnode = pSma->pVnode;
SRSmaSnapReader* pReader = NULL;
SSmaEnv* pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat* pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
// alloc
pReader = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->pSma = pSma;
pReader->sver = sver;
@ -55,171 +58,147 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
&pReader->pDataReader[i]);
if (code < 0) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// open qtaskinfo
if ((code = rsmaQTaskInfSnapReaderOpen(pReader, ever)) < 0) {
goto _err;
taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, &pReader->fs);
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) {
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->pQTaskFReader->pSma = pSma;
pReader->pQTaskFReader->version = pReader->ever;
}
*ppReader = pReader;
return TSDB_CODE_SUCCESS;
_err:
if (pReader) rsmaSnapReaderClose(&pReader);
*ppReader = NULL;
smaError("vgId:%d, vnode snapshot rsma reader open failed since %s", TD_VID(pVnode), tstrerror(code));
return TSDB_CODE_FAILED;
}
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
SVnode* pVnode = pSma->pVnode;
SSmaEnv* pEnv = NULL;
SRSmaStat* pStat = NULL;
if (!(pEnv = SMA_RSMA_ENV(pSma))) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as env is NULL",
TD_VID(pVnode), version);
return TSDB_CODE_SUCCESS;
_exit:
if (code) {
if (pReader) rsmaSnapReaderClose(&pReader);
*ppReader = NULL;
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
int32_t ref = tdRSmaFSRef(pReader->pSma, pStat, version);
if (ref < 1) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as ref is %d",
TD_VID(pVnode), version, ref);
return TSDB_CODE_SUCCESS;
}
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (!taosCheckExistFile(qTaskInfoFullName)) {
tdRSmaFSUnRef(pSma, pStat, version);
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as %s not exist",
TD_VID(pVnode), version, qTaskInfoFullName);
return TSDB_CODE_SUCCESS;
}
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
TdFilePtr fp = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFreeClear(pReader->pQTaskFReader);
goto _end;
}
pReader->pQTaskFReader->pReadH = fp;
pReader->pQTaskFReader->pSma = pSma;
pReader->pQTaskFReader->version = pReader->ever;
_end:
if (code < 0) {
tdRSmaFSUnRef(pSma, pStat, version);
smaError("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_SUCCESS;
}
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) {
if (!(*ppReader)) {
return TSDB_CODE_SUCCESS;
}
SSma* pSma = (*ppReader)->pSma;
SRSmaStat* pStat = SMA_RSMA_STAT(pSma);
int64_t version = (*ppReader)->version;
taosCloseFile(&(*ppReader)->pReadH);
tdRSmaFSUnRef(pSma, pStat, version);
taosMemoryFreeClear(*ppReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version);
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
int32_t lino = 0;
SVnode* pVnode = pReader->pSma->pVnode;
SQTaskFReader* qReader = pReader->pQTaskFReader;
SRSmaFS* pFS = &pReader->fs;
int64_t n = 0;
uint8_t* pBuf = NULL;
SQTaskFReader* qReader = pReader->pQTaskFReader;
int64_t version = pReader->ever;
char fname[TSDB_FILENAME_LEN];
if (!qReader) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", SMA_VID(pSma));
return 0;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode));
goto _exit;
}
if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode));
goto _exit;
}
while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) {
SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++);
if (qTaskF->version != version) {
continue;
}
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
fname);
if (!taosCheckExistFile(fname)) {
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
", version %" PRIi64 " failed since %s not exist",
TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
code = TSDB_CODE_RSMA_FS_SYNC;
TSDB_CHECK_CODE(code, lino, _exit);
}
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
qReader->pReadH = fp;
qReader->level = qTaskF->level;
qReader->suid = qTaskF->suid;
}
if (!qReader->pReadH) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is NULL", SMA_VID(pSma));
return 0;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode));
goto _exit;
}
int64_t size = 0;
if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
// seek
if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
ASSERT(!(*ppBuf));
// alloc
*ppBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + size);
if (*ppBuf) {
*ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size);
} else {
*ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
}
if (!(*ppBuf)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
// read
n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
} else if (n != size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size);
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version,
size);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
pHdr->type = SNAP_DATA_QTASK;
pHdr->flag = qReader->level;
pHdr->index = qReader->suid;
pHdr->size = size;
_exit:
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
return code;
if (qReader) taosCloseFile(&qReader->pReadH);
_err:
*ppBuf = NULL;
smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
if (code) {
*ppBuf = NULL;
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode));
}
return code;
}
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
*ppData = NULL;
@ -233,14 +212,11 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
if (!pReader->rsmaDataDone[i]) {
smaInfo("vgId:%d, vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i);
code = tsdbSnapRead(pTsdbSnapReader, ppData);
if (code) {
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->rsmaDataDone[i] = 1;
}
pReader->rsmaDataDone[i] = 1;
}
} else {
smaInfo("vgId:%d, vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i);
@ -251,22 +227,21 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
if (!pReader->qTaskDone) {
smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma));
code = rsmaSnapReadQTaskInfo(pReader, ppData);
if (code) {
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
} else {
pReader->qTaskDone = 1;
if (*ppData) {
goto _exit;
}
}
}
_exit:
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
return code;
_err:
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
if (code) {
rsmaSnapReaderClose(&pReader);
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
} else {
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
}
return code;
}
@ -274,14 +249,15 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
int32_t code = 0;
SRSmaSnapReader* pReader = *ppReader;
tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
taosMemoryFreeClear(pReader->pQTaskFReader);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) {
tsdbSnapReaderClose(&pReader->pDataReader[i]);
}
}
rsmaQTaskInfSnapReaderClose(&pReader->pQTaskFReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
taosMemoryFreeClear(*ppReader);
@ -293,28 +269,23 @@ struct SRSmaSnapWriter {
SSma* pSma;
int64_t sver;
int64_t ever;
// config
int64_t commitID;
SRSmaFS fs;
// for data file
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
// for qtaskinfo file
SQTaskFReader* pQTaskFReader;
SQTaskFWriter* pQTaskFWriter;
};
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
int32_t code = 0;
SRSmaSnapWriter* pWriter = NULL;
int32_t lino = 0;
SVnode* pVnode = pSma->pVnode;
SRSmaSnapWriter* pWriter = NULL;
// alloc
pWriter = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
if (!pWriter) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
pWriter->pSma = pSma;
pWriter->sver = sver;
@ -324,100 +295,152 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
if (code < 0) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// qtaskinfo
SQTaskFWriter* qWriter = (SQTaskFWriter*)taosMemoryCalloc(1, sizeof(SQTaskFWriter));
qWriter->pSma = pSma;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!qTaskF) {
taosMemoryFree(qWriter);
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName,
tstrerror(code));
goto _err;
}
qWriter->pWriteH = qTaskF;
int32_t fnameLen = strlen(qTaskInfoFullName) + 1;
qWriter->fname = taosMemoryCalloc(1, fnameLen);
strncpy(qWriter->fname, qTaskInfoFullName, fnameLen);
pWriter->pQTaskFWriter = qWriter;
smaDebug("vgId:%d, rsma snapshot writer open succeed for %s", TD_VID(pSma->pVnode), qTaskInfoFullName);
code = tdRSmaFSCopy(pSma, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// snapWriter
*ppWriter = pWriter;
smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
_exit:
if (code) {
if (pWriter) rsmaSnapWriterClose(&pWriter, 0);
*ppWriter = NULL;
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
}
return code;
}
_err:
smaError("vgId:%d, rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code));
if (pWriter) rsmaSnapWriterClose(&pWriter, 0);
*ppWriter = NULL;
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
int32_t code = 0;
int32_t lino = 0;
if (pWriter) {
code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code));
}
return code;
}
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
SSma* pSma = NULL;
SVnode* pVnode = NULL;
SSmaEnv* pEnv = NULL;
SRSmaStat* pStat = NULL;
SRSmaSnapWriter* pWriter = *ppWriter;
SVnode* pVnode = pWriter->pSma->pVnode;
const char* primaryPath = NULL;
char fname[TSDB_FILENAME_LEN] = {0};
char fnameVer[TSDB_FILENAME_LEN] = {0};
TdFilePtr pOutFD = NULL;
TdFilePtr pInFD = NULL;
if (rollback) {
// TODO: rsma1/rsma2
// qtaskinfo
if (pWriter->pQTaskFWriter) {
if (taosRemoveFile(pWriter->pQTaskFWriter->fname) != 0) {
smaWarn("vgId:%d, vnode snapshot rsma writer failed to remove %s since %s", SMA_VID(pWriter->pSma),
pWriter->pQTaskFWriter->fname ? pWriter->pQTaskFWriter->fname : "NULL",
tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
}
} else {
// rsma1/rsma2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pWriter->pDataWriter[i]) {
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
if (code) goto _err;
}
}
// qtaskinfo
if (pWriter->pQTaskFWriter) {
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma),
pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
if (!pWriter) {
goto _exit;
}
// rsma restore
if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
pSma = pWriter->pSma;
pVnode = pSma->pVnode;
pEnv = SMA_RSMA_ENV(pSma);
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
primaryPath = tfsGetPrimaryPath(pVnode->pTfs);
// rsma1/rsma2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pWriter->pDataWriter[i]) {
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;
// qtaskinfo
if (rollback) {
tdRSmaFSRollback(pSma);
// remove qTaskFiles
} else {
// sendFile from fname.Ver to fname
SRSmaFS* pFS = &pWriter->fs;
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
if (pTaskF->version == pWriter->ever) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname);
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
if (pInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size = 0;
if (taosFStatFile(pInFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode),
fnameVer, fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pOutFD);
taosCloseFile(&pInFD);
}
}
// lock
taosWLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
if (code) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
goto _exit;
}
// unlock
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
// rsma restore
code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback);
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, vnode snapshot rsma writer restore from sync succeed", SMA_VID(pSma));
_exit:
if (pWriter) taosMemoryFree(pWriter);
*ppWriter = NULL;
if (code) {
if (pOutFD) taosCloseFile(&pOutFD);
if (pInFD) taosCloseFile(&pInFD);
smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code));
} else {
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma));
}
_err:
smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int32_t lino = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// rsma1/rsma2
@ -430,42 +453,81 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
} else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else {
ASSERT(0);
code = TSDB_CODE_RSMA_FS_SYNC;
}
if (code < 0) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
return code;
_err:
smaError("vgId:%d, rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
tstrerror(code));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, data type %" PRIi8, SMA_VID(pWriter->pSma), __func__, lino,
tstrerror(code), pHdr->type);
} else {
smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
}
return code;
}
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SQTaskFWriter* qWriter = pWriter->pQTaskFWriter;
int32_t code = 0;
int32_t lino = 0;
SSma* pSma = pWriter->pSma;
SVnode* pVnode = pSma->pVnode;
char fname[TSDB_FILENAME_LEN];
TdFilePtr fp = NULL;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
if (qWriter && qWriter->pWriteH) {
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
int64_t size = pHdr->size;
ASSERT(size == (nData - sizeof(SSnapDataHdr)));
int64_t contLen = taosWriteFile(qWriter->pWriteH, pHdr->data, size);
if (contLen != size) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", SMA_VID(pWriter->pSma), qWriter->fname);
} else {
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo is not needed", SMA_VID(pWriter->pSma));
fname[0] = '\0';
if (pHdr->size != (nData - sizeof(SSnapDataHdr))) {
code = TSDB_CODE_RSMA_FS_SYNC;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
return code;
SQTaskFile qTaskFile = {
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size);
if (contLen != pHdr->size) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
uint32_t mtime = 0;
if (taosFStatFile(fp, NULL, &mtime) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
qTaskFile.mtime = mtime;
}
if (taosFsyncFile(fp) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&fp);
code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (fp) {
(void)taosRemoveFile(fname);
}
smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname);
} else {
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname);
}
_err:
smaError("vgId:%d, vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
return code;
}

View File

@ -111,35 +111,48 @@ _err:
* @return int32_t
*/
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
int32_t code = 0;
int32_t lino = 0;
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
SName stbFullName = {0};
SVCreateStbReq pReq = {0};
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
// create tsma meta in dstVgId
if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
return -1;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// create stable to save tsma result in dstVgId
SName stbFullName = {0};
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateStbReq pReq = {0};
pReq.name = (char *)tNameGetTableName(&stbFullName);
pReq.suid = pCfg->dstTbUid;
pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag;
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
return -1;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
code = terrno = TSDB_CODE_TSMA_INVALID_STAT;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
" dstTb:%s dstVg:%d",
SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
pCfg->dstVgId);
} else {
smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
" dstTb:%s dstVg:%d",
SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
} else {
ASSERT(0);
}
return 0;
return code;
}
/**
@ -174,7 +187,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
STSmaStat *pTsmaStat = NULL;
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
terrno = TSDB_CODE_TSMA_INVALID_STAT;
terrno = TSDB_CODE_TSMA_INVALID_ENV;
return TSDB_CODE_FAILED;
}
@ -213,12 +226,16 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
indexUid, tstrerror(terrno));
goto _err;
}
// TODO deleteReq
taosArrayDestroy(deleteReq.deleteReqs);
#if 0
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName);
goto _err;
}
#endif
SRpcMsg submitReqMsg = {

View File

@ -15,197 +15,72 @@
#include "sma.h"
// smaFileUtil ================
#if 0
#define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_BAD 1
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
return tlen;
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) {
tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
return buf;
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
char *outputName) {
tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName);
}
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid);
}
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) {
ASSERT(TD_TFILE_OPENED(pTFile));
return taosFStatFile(pTFile->pFile, size, NULL);
}
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
int32_t tdUpdateTFileHeader(STFile *pTFile) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
void *ptr = buf;
tdEncodeTFInfo(&ptr, &(pTFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE);
if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
return 0;
}
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char buf[TD_FILE_HEAD_SIZE] = "\0";
uint32_t _version;
ASSERT(TD_TFILE_OPENED(pTFile));
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1;
}
if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
void *pBuf = buf;
pBuf = tdDecodeTFInfo(pBuf, pInfo);
return 0;
}
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TD_TFILE_OPENED(pTFile));
int64_t toffset;
if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) {
return -1;
}
#if 0
smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile),
toffset, nbyte, toffset + nbyte);
#endif
ASSERT(pTFile->info.fsize == toffset);
if (offset) {
*offset = toffset;
}
if (tdWriteTFile(pTFile, buf, nbyte) < 0) {
return -1;
}
pTFile->info.fsize += nbyte;
return nbyte;
}
int32_t tdOpenTFile(STFile *pTFile, int flags) {
ASSERT(!TD_TFILE_OPENED(pTFile));
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
void tdCloseTFile(STFile *pTFile) {
if (TD_TFILE_OPENED(pTFile)) {
taosCloseFile(&pTFile->pFile);
TD_TFILE_SET_CLOSED(pTFile);
}
}
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
#endif
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
char *outputName) {
if (version < 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
int8_t level, int64_t version, char *outputName) {
if (level >= 0 && suid > 0) {
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname,
version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, version);
}
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP,
vgId, fname);
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", pdname,
TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname);
}
}
} else {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
if (version >= 0) {
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname, version);
}
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname, version);
if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname,
TD_DIRSEP, vgId, fname);
}
}
}
}
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
if (pdname) {
if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
@ -223,81 +98,13 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e
}
}
#if 0
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
TD_TFILE_SET_CLOSED(pTFile);
memset(&(pTFile->info), 0, sizeof(pTFile->info));
pTFile->info.magic = TD_FILE_INIT_MAGIC;
char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0};
snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname);
int32_t tmpNameLen = strlen(tmpName) + 1;
pTFile->fname = taosMemoryMalloc(tmpNameLen);
if (!pTFile->fname) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tstrncpy(pTFile->fname, tmpName, tmpNameLen);
return 0;
}
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
if (errno == ENOENT) {
// Try to create directory recursively
char *s = strdup(TD_TFILE_FULL_NAME(pTFile));
if (taosMulMkDir(taosDirName(s)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return -1;
}
taosMemoryFree(s);
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
}
if (!updateHeader) {
return 0;
}
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
pTFile->info.fver = 0;
if (tdUpdateTFileHeader(pTFile) < 0) {
tdCloseTFile(pTFile);
tdRemoveTFile(pTFile);
return -1;
}
return 0;
}
int32_t tdRemoveTFile(STFile *pTFile) {
if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
};
return 0;
}
#endif
// smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) {
smaWarn("rsma acquire ref for rsetId:%d refId:%" PRIi64 " failed since %s", rsetId, refId, terrstr());
} else {
smaDebug("rsma acquire ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
smaTrace("rsma acquire ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
}
return pResult;
}
@ -307,7 +114,7 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
smaWarn("rsma release ref for rsetId:%d refId:%" PRIi64 " failed since %s", rsetId, refId, terrstr());
return TSDB_CODE_FAILED;
}
smaDebug("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
smaTrace("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId);
return TSDB_CODE_SUCCESS;
}

View File

@ -85,7 +85,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
uint64_t ts = 0;
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr());
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
return -1;
}
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
@ -144,7 +144,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
uint64_t ts = 0;
tqDebug("tmqsnap task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr());
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
return -1;
}
tqDebug("tmqsnap task execute end, get %p", pDataBlock);

View File

@ -1370,7 +1370,7 @@ _exit:
taosMemoryFree(pWriter);
}
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__);
*ppWriter = pWriter;
}
return code;
@ -1391,7 +1391,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
}
return code;
}
@ -1442,7 +1442,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]);
}
tsdbInfo("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
tsdbInfo("vgId:%d, %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;

View File

@ -184,16 +184,21 @@ _err:
return -1;
}
static void vnodePrepareCommit(SVnode *pVnode) {
static int32_t vnodePrepareCommit(SVnode *pVnode) {
int32_t code = 0;
tsem_wait(&pVnode->canCommit);
tsdbPrepareCommit(pVnode->pTsdb);
metaPrepareAsyncCommit(pVnode->pMeta);
smaPrepareAsyncCommit(pVnode->pSma);
code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit;
_exit:
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
return code;
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
@ -203,10 +208,9 @@ static int32_t vnodeCommitTask(void *arg) {
code = vnodeCommitImpl(pInfo);
if (code) goto _exit;
_exit:
// end commit
tsem_post(&pInfo->pVnode->canCommit);
_exit:
taosMemoryFree(pInfo);
return code;
}
@ -214,7 +218,8 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int32_t code = 0;
// prepare to commit
vnodePrepareCommit(pVnode);
code = vnodePrepareCommit(pVnode);
if (code) goto _exit;
// schedule the task
pVnode->state.commitTerm = pVnode->state.applyTerm;
@ -230,14 +235,15 @@ int vnodeAsyncCommit(SVnode *pVnode) {
pInfo->info.state.commitID = pVnode->state.commitID;
pInfo->pVnode = pVnode;
pInfo->txn = metaGetTxn(pVnode->pMeta);
vnodeScheduleTask(vnodeCommitTask, pInfo);
code = vnodeScheduleTask(vnodeCommitTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
tsem_post(&pVnode->canCommit);
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
pVnode->state.commitID);
} else {
vDebug("vgId:%d %s done", TD_VID(pVnode), __func__);
vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
}
return code;
}

View File

@ -15,13 +15,13 @@
#include "vnd.h"
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
do { \
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \
ASSERT(newVal >= 0); \
if (newVal < 0) { \
vWarn("vgId:%d %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
} \
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
do { \
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \
ASSERT(newVal >= 0); \
if (newVal < 0) { \
vWarn("vgId:%d, %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
} \
} while (0)
int vnodeQueryOpen(SVnode *pVnode) {

View File

@ -337,7 +337,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// commit if need
if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
vnodeAsyncCommit(pVnode);
if (vnodeAsyncCommit(pVnode) < 0) {
vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// start a new one
if (vnodeBegin(pVnode) < 0) {

View File

@ -64,7 +64,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
sTrace("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
if (pMsg->success) {

View File

@ -340,7 +340,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer);
@ -394,7 +394,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
@ -415,7 +415,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer);

View File

@ -586,6 +586,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Rsma fs commit error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
@ -593,6 +594,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")