Merge pull request #16649 from taosdata/feature/3.0_interval_hash_optimize

refactor: rsma fetch trigger refactor and use ref to manage qtaskf
This commit is contained in:
Shengliang Guan 2022-09-04 18:16:58 +08:00 committed by GitHub
commit 25df542a01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 692 additions and 446 deletions

View File

@ -65,13 +65,6 @@ typedef enum {
TSDB_STATIS_NONE = 1, // statis part not exist TSDB_STATIS_NONE = 1, // statis part not exist
} ETsdbStatisStatus; } ETsdbStatisStatus;
typedef enum {
TSDB_SMA_STAT_UNKNOWN = -1, // unknown
TSDB_SMA_STAT_OK = 0, // ready to provide service
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
TSDB_SMA_STAT_DROPPED = 2, // sma dropped
} ETsdbSmaStat; // bit operation
typedef enum { typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA

View File

@ -616,6 +616,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) #define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)

View File

@ -538,12 +538,12 @@ bool tdSTSRowIterGetTpVal(STSRowIter *pIter, col_type_t colType, int32_t offset,
} else { } else {
pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset); pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
} }
return TSDB_CODE_SUCCESS; return true;
} }
if (tdGetBitmapValType(pIter->pBitmap, pIter->colIdx - 1, &pVal->valType, 0) != TSDB_CODE_SUCCESS) { if (tdGetBitmapValType(pIter->pBitmap, pIter->colIdx - 1, &pVal->valType, 0) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE; pVal->valType = TD_VTYPE_NONE;
return terrno; return true;
} }
if (pVal->valType == TD_VTYPE_NORM) { if (pVal->valType == TD_VTYPE_NORM) {

View File

@ -29,6 +29,7 @@ target_sources(
# sma # sma
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
"src/sma/smaUtil.c" "src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
"src/sma/smaCommit.c" "src/sma/smaCommit.c"
"src/sma/smaRollup.c" "src/sma/smaRollup.c"

View File

@ -38,9 +38,10 @@ typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct STSmaStat STSmaStat; typedef struct STSmaStat STSmaStat;
typedef struct SRSmaStat SRSmaStat; typedef struct SRSmaStat SRSmaStat;
typedef struct SSmaKey SSmaKey; typedef struct SRSmaRef SRSmaRef;
typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfo SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem; typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef struct SRSmaFS SRSmaFS;
typedef struct SQTaskFile SQTaskFile; typedef struct SQTaskFile SQTaskFile;
typedef struct SQTaskFReader SQTaskFReader; typedef struct SQTaskFReader SQTaskFReader;
typedef struct SQTaskFWriter SQTaskFWriter; typedef struct SQTaskFWriter SQTaskFWriter;
@ -54,10 +55,21 @@ struct SSmaEnv {
#define SMA_ENV_FLG_CLOSE ((int8_t)0x1) #define SMA_ENV_FLG_CLOSE ((int8_t)0x1)
struct SRSmaRef {
int64_t refId; // for SRSmaStat
int64_t suid;
};
typedef struct { typedef struct {
int8_t inited; int8_t inited;
int32_t rsetId; int32_t rsetId;
void *tmrHandle; // shared by all fetch tasks void *tmrHandle; // shared by all fetch tasks
/**
* @brief key: void* of SRSmaInfoItem, value: SRSmaRef
* N.B. Although there is a very small possibility that "void*" point to different objects while with the same
* address after release/renew, the functionality is not affected as it just used to fetch the rsma results.
*/
SHashObj *refHash; // shared by all vgroups
} SSmaMgmt; } SSmaMgmt;
#define SMA_ENV_LOCK(env) (&(env)->lock) #define SMA_ENV_LOCK(env) (&(env)->lock)
@ -73,20 +85,25 @@ struct STSmaStat {
struct SQTaskFile { struct SQTaskFile {
volatile int32_t nRef; volatile int32_t nRef;
int64_t commitID; int32_t padding;
int64_t version;
int64_t size; int64_t size;
}; };
struct SQTaskFReader { struct SQTaskFReader {
SSma *pSma; SSma *pSma;
SQTaskFile fTask; int64_t version;
TdFilePtr pReadH; TdFilePtr pReadH;
}; };
struct SQTaskFWriter { struct SQTaskFWriter {
SSma *pSma; SSma *pSma;
SQTaskFile fTask; int64_t version;
TdFilePtr pWriteH; TdFilePtr pWriteH;
char *fname; char *fname;
};
struct SRSmaFS {
SArray *aQTaskInf; // array of SQTaskFile
}; };
struct SRSmaStat { struct SRSmaStat {
@ -98,7 +115,7 @@ struct SRSmaStat {
volatile int32_t nFetchAll; // active number of fetch all volatile int32_t nFetchAll; // active number of fetch all
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing int8_t commitStat; // 0 not in committing, 1 in committing
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer tsem_t notEmpty; // has items in queue buffer
}; };
@ -118,6 +135,7 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId) #define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS(r) (&(r)->fs)
#define RSMA_FS_LOCK(r) (&(r)->lock) #define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem { struct SRSmaInfoItem {
@ -132,7 +150,6 @@ struct SRSmaInfoItem {
struct SRSmaInfo { struct SRSmaInfo {
STSchema *pTSchema; STSchema *pTSchema;
int64_t suid; int64_t suid;
int64_t refId; // refId of SRSmaStat
int64_t lastRecv; // ms int64_t lastRecv; // ms
int8_t assigned; // 0 idle, 1 assgined for exec int8_t assigned; // 0 idle, 1 assgined for exec
int8_t delFlag; int8_t delFlag;
@ -163,14 +180,6 @@ enum {
TASK_TRIGGER_STAT_DROPPED = 5, TASK_TRIGGER_STAT_DROPPED = 5,
}; };
enum {
RSMA_ROLE_CREATE = 0,
RSMA_ROLE_DROP = 1,
RSMA_ROLE_SUBMIT = 2,
RSMA_ROLE_FETCH = 3,
RSMA_ROLE_ITERATE = 4,
};
enum { enum {
RSMA_RESTORE_REBOOT = 1, RSMA_RESTORE_REBOOT = 1,
RSMA_RESTORE_SYNC = 2, RSMA_RESTORE_SYNC = 2,
@ -182,88 +191,34 @@ typedef enum {
RSMA_EXEC_COMMIT = 3, // triggered by commit RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType; } ERsmaExecType;
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); // sma
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
int32_t tdDropTSma(SSma *pSma, char *pMsg); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); int32_t tdLockSma(SSma *pSma);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); int32_t tdUnLockSma(SSma *pSma);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId); void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId); int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); // rsma
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdLockSma(SSma *pSma); void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
int32_t tdUnLockSma(SSma *pSma); void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) {
if (pTStat) {
return atomic_load_8(&pTStat->state);
}
return TSDB_SMA_STAT_UNKNOWN;
}
static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) {
if (!pTStat) {
return false;
}
if (state) {
*state = atomic_load_8(&pTStat->state);
return *state == TSDB_SMA_STAT_OK;
}
return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK;
}
static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) {
return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true;
}
static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) {
return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true;
}
static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) {
if (pTStat) {
atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK);
}
}
static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) {
if (pTStat) {
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED);
}
}
static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
if (pTStat) {
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
}
}
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// smaFileUtil ================ // smaFileUtil ================

View File

@ -71,8 +71,8 @@ typedef struct SStreamTaskReader SStreamTaskReader;
typedef struct SStreamTaskWriter SStreamTaskWriter; typedef struct SStreamTaskWriter SStreamTaskWriter;
typedef struct SStreamStateReader SStreamStateReader; typedef struct SStreamStateReader SStreamStateReader;
typedef struct SStreamStateWriter SStreamStateWriter; typedef struct SStreamStateWriter SStreamStateWriter;
typedef struct SRsmaSnapReader SRsmaSnapReader; typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRsmaSnapWriter SRsmaSnapWriter; typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
@ -248,14 +248,14 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
// SStreamTaskReader ====================================== // SStreamTaskReader ======================================
// SStreamStateWriter ===================================== // SStreamStateWriter =====================================
// SStreamStateReader ===================================== // SStreamStateReader =====================================
// SRsmaSnapReader ======================================== // SRSmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader); int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader); int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader);
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData); int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRsmaSnapWriter ======================================== // SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter); int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other

View File

@ -15,13 +15,15 @@
#include "sma.h" #include "sma.h"
extern SSmaMgmt smaMgmt;
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
/** /**
* @brief Only applicable to Rollup SMA * @brief Only applicable to Rollup SMA
@ -166,114 +168,49 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
SVnode *pVnode = pSma->pVnode;
int64_t committed = pRSmaStat->commitAppliedVer;
TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN];
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
regex_t regex;
int code = 0;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
// Resource allocation and init
if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if ((pDir = taosOpenDir(dir)) == NULL) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno);
smaDebug("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED;
}
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) {
continue;
}
code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) {
// match
int64_t version = -1;
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &version);
if ((version < committed) && (version > -1)) {
strncpy(dirEnd, entryName, TSDB_FILENAME_LEN - dirLen);
if (taosRemoveFile(dir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
dir, terrstr());
} else {
smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, dir);
}
}
} else if (code == REG_NOMATCH) {
// not match
smaTrace("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), entryName);
continue;
} else {
// has other error
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), errbuf);
taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_FAILED;
}
}
taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_SUCCESS;
}
// SQTaskFile ====================================================== // SQTaskFile ======================================================
// int32_t tCmprQTaskFile(void const *lhs, void const *rhs) {
// int64_t *lCommitted = *(int64_t *)lhs;
// SQTaskFile *rQTaskF = (SQTaskFile *)rhs;
// if (lCommitted < rQTaskF->commitID) {
// return -1;
// } else if (lCommitted > rQTaskF->commitID) {
// return 1;
// }
// return 0;
// }
#if 0
/** /**
* @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
* multiple qtaskinfo files supporting snapshot replication. * multiple qtaskinfo files supporting snapshot replication.
* *
* @param pSma * @param pSma
* @param pRSmaStat * @param pStat
* @return int32_t * @return int32_t
*/ */
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
int64_t committed = pRSmaStat->commitAppliedVer; SRSmaFS *pFS = RSMA_FS(pStat);
SArray *aTaskFile = pRSmaStat->aTaskFile; int64_t committed = pStat->commitAppliedVer;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
void *qTaskFile = taosArraySearch(aTaskFile, committed, tCmprQTaskFile, TD_LE); taosWLockLatch(RSMA_FS_LOCK(pStat));
for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosRemoveFile(qTaskInfoFullName) < 0) {
smaWarn("vgId:%d, cleanup qinf, failed to remove %s since %s", TD_VID(pVnode), qTaskInfoFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, cleanup qinf, success to remove %s", TD_VID(pVnode), qTaskInfoFullName);
}
taosArrayRemove(pFS->aQTaskInf, i);
continue;
}
++i;
}
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
if (tdRSmaFSUpsertQTaskFile(pFS, &qFile) < 0) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
return TSDB_CODE_FAILED;
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
/** /**
* @brief post-commit for rollup sma * @brief post-commit for rollup sma
@ -290,8 +227,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
// cleanup outdated qtaskinfo files tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -427,8 +363,8 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SArray *rsmaDeleted = NULL; SRSmaInfoItem *pItem = NULL;
// step 1: merge qTaskInfo and iQTaskInfo // step 1: merge qTaskInfo and iQTaskInfo
// lock // lock
@ -441,11 +377,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo); int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) { if (refVal == 0) {
if (!rsmaDeleted) { taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) {
taosArrayPush(rsmaDeleted, pSuid);
}
}
} else { } else {
smaDebug( smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
@ -471,25 +403,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
#endif #endif
} }
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
}
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
}
taosArrayDestroy(rsmaDeleted);
// unlock // unlock
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

View File

@ -28,6 +28,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
static int32_t tdRsmaStartExecutor(const SSma *pSma); static int32_t tdRsmaStartExecutor(const SSma *pSma);
static int32_t tdRsmaStopExecutor(const SSma *pSma); static int32_t tdRsmaStopExecutor(const SSma *pSma);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeTSmaStat(STSmaStat *pStat); static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat); static void tdDestroyRSmaStat(void *pRSmaStat);
@ -59,12 +61,23 @@ int32_t smaInit() {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
smaMgmt.refHash = taosHashInit(1, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
if (!smaMgmt.refHash) {
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr());
return TSDB_CODE_FAILED;
}
// init fetch timer handle // init fetch timer handle
smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!smaMgmt.tmrHandle) { if (!smaMgmt.tmrHandle) {
taosCloseRef(smaMgmt.rsetId); taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
atomic_store_8(&smaMgmt.inited, 0); atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr()); smaError("failed to init sma tmr handle since %s", terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -93,6 +106,7 @@ void smaCleanUp() {
if (old == 1) { if (old == 1) {
taosCloseRef(smaMgmt.rsetId); taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
taosTmrCleanUp(smaMgmt.tmrHandle); taosTmrCleanUp(smaMgmt.tmrHandle);
smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle); smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
atomic_store_8(&smaMgmt.inited, 0); atomic_store_8(&smaMgmt.inited, 0);
@ -195,6 +209,21 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
return 0; return 0;
} }
static void tRSmaInfoHashFreeNode(void *data) {
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfoItem *pItem = NULL;
if ((pRSmaInfo = *(SRSmaInfo **)data)) {
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 0)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
}
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
}
tdFreeRSmaInfo(NULL, pRSmaInfo, true);
}
}
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) { static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
ASSERT(pSmaStat != NULL); ASSERT(pSmaStat != NULL);
@ -240,10 +269,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (!RSMA_INFO_HASH(pRSmaStat)) { if (!RSMA_INFO_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode);
if (tdRsmaStartExecutor(pSma) < 0) { if (tdRsmaStartExecutor(pSma) < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (!(RSMA_FS(pRSmaStat)->aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO // TODO
} else { } else {
@ -278,14 +313,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
tsem_destroy(&(pStat->notEmpty)); tsem_destroy(&(pStat->notEmpty));
// step 2: destroy the rsma info and associated fetch tasks // step 2: destroy the rsma info and associated fetch tasks
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) {
SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pSma, pSmaInfo, true);
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
}
taosHashCleanup(RSMA_INFO_HASH(pStat)); taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: wait for all triggered fetch tasks to finish // step 3: wait for all triggered fetch tasks to finish
@ -307,12 +334,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 4: // step 4:
tdRsmaStopExecutor(pSma); tdRsmaStopExecutor(pSma);
// step 5: free pStat // step 5:
tdRSmaFSClose(RSMA_FS(pStat));
// step 6: free pStat
taosMemoryFreeClear(pStat); taosMemoryFreeClear(pStat);
} }
} }
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
tdDestroySmaState(pSmaStat, smaType); tdDestroySmaState(pSmaStat, smaType);
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
taosMemoryFreeClear(pSmaStat); taosMemoryFreeClear(pSmaStat);
@ -329,7 +359,7 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
* @return int32_t * @return int32_t
*/ */
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) { if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat)); tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
@ -337,7 +367,7 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat; SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
int32_t vid = SMA_VID(pRSmaStat->pSma); int32_t vid = SMA_VID(pRSmaStat->pSma);
int64_t refId = RSMA_REF_ID(pRSmaStat); int64_t refId = RSMA_REF_ID(pRSmaStat);
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { if (taosRemoveRef(smaMgmt.rsetId, refId) < 0) {
smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId, smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
smaMgmt.rsetId, terrstr()); smaMgmt.rsetId, terrstr());
} else { } else {

View File

@ -0,0 +1,252 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
// =================================================================================================
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
/**
* @brief Open RSma FS from qTaskInfo files
*
* @param pSma
* @param version
* @return int32_t
*/
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
SVnode *pVnode = pSma->pVnode;
int64_t commitID = pVnode->state.commitID;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SArray *output = NULL;
terrno = TSDB_CODE_SUCCESS;
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) {
goto _end;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
int32_t vid = 0;
int64_t version = -1;
sscanf((const char *)taosArrayGetP(output, i), "v%dqinfo.v%" PRIi64, &vid, &version);
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
goto _end;
}
smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%" PRIi64, TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef);
}
_end:
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
void *ptr = taosArrayGetP(output, i);
taosMemoryFreeClear(ptr);
}
taosArrayDestroy(output);
if (terrno != TSDB_CODE_SUCCESS) {
smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); }
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) {
return -1;
} else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) {
return 1;
}
return 0;
}
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if ((pTaskF = taosArraySearch(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if ((idx = taosArraySearchIdx(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, idx);
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
/**
* @brief Fetch qtaskfiles LE than version
*
* @param pSma
* @param version
* @param output
* @return int32_t
*/
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) {
SVnode *pVnode = pSma->pVnode;
TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN];
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
regex_t regex;
int code = 0;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
if (!taosCheckExistFile(dir)) {
smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir);
return TSDB_CODE_SUCCESS;
}
// Resource allocation and init
if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if (!(pDir = taosOpenDir(dir))) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED;
}
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir))) {
char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) {
continue;
}
code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) {
// match
smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName);
int64_t ver = -1;
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver);
if ((ver <= version) && (ver > -1)) {
if (!(*output)) {
if (!(*output = taosArrayInit(1, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
char *entryDup = strdup(entryName);
if (!entryDup) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!taosArrayPush(*output, &entryDup)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
} else {
}
} else if (code == REG_NOMATCH) {
// not match
smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName);
continue;
} else {
// has other error
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf);
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
goto _end;
}
}
_end:
taosCloseDir(&pDir);
regfree(&regex);
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) {
if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) {
return -1;
} else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) {
return 1;
}
return 0;
}
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn2, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aQTaskInf);
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile);
if (c == 0) {
pTaskF->nRef = qTaskFile->nRef;
pTaskF->version = qTaskFile->version;
pTaskF->size = qTaskFile->size;
goto _exit;
}
}
if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}

View File

@ -150,7 +150,7 @@ int32_t smaOpen(SVnode *pVnode) {
} }
// restore the rsma // restore the rsma
if (tdRsmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) { if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
goto _err; goto _err;
} }
} }
@ -181,8 +181,8 @@ int32_t smaClose(SSma *pSma) {
* @param committedVer * @param committedVer
* @return int32_t * @return int32_t
*/ */
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode)); ASSERT(VND_IS_RSMA(pSma->pVnode));
return tdProcessRSmaRestoreImpl(pSma, type, committedVer); return tdRSmaProcessRestoreImpl(pSma, type, committedVer);
} }

View File

@ -20,7 +20,7 @@
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (900000) // ms #define RSMA_FETCH_DELAY_MAX (900000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
@ -42,10 +42,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems); static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid); int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
@ -96,7 +98,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC // Note: free/kill may in RC
if (!taskHandle || !(*taskHandle)) return; if (!taskHandle || !(*taskHandle)) return;
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
@ -119,27 +121,27 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
*/ */
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) { if (pInfo) {
int32_t vid = pSma ? SMA_VID(pSma) : -1;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree && pItem->tmrId) { if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid, smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", vid, pItem->tmrId, pInfo->suid, i + 1);
pItem->tmrId, i + 1);
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
} }
if (isDeepFree && pInfo->taskInfo[i]) { if (isDeepFree && pInfo->taskInfo[i]) {
tdFreeQTaskInfo(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], vid, i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", vid,
pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
if (pInfo->iTaskInfo[i]) { if (pInfo->iTaskInfo[i]) {
tdFreeQTaskInfo(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], vid, i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", vid,
SMA_VID(pSma), pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
} }
if (isDeepFree) { if (isDeepFree) {
@ -313,10 +315,17 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
ASSERT(pItem->level > 0);
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32, ", finally maxdelay:%" PRIi32,
TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); TD_VID(pVnode), pItem, pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx],
pItem->maxDelay);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -330,7 +339,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
* @param tbName * @param tbName
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -383,7 +392,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err; goto _err;
} }
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
pRSmaInfo->refId = RSMA_REF_ID(pStat);
T_REF_INIT_VAL(pRSmaInfo, 1); T_REF_INIT_VAL(pRSmaInfo, 1);
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
@ -427,7 +435,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
} }
/** /**
@ -654,15 +662,15 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
if (taosArrayGetSize(pResList) == 0) { if (taosArrayGetSize(pResList) == 0) {
if (terrno == 0) { if (terrno == 0) {
// smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); // smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else { } else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err; goto _err;
} }
break; break;
} else { } else {
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); smaDebug("vgId:%d, rsma level %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
} }
#if 0 #if 0
char flag[10] = {0}; char flag[10] = {0};
@ -676,21 +684,22 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", smaError("vgId:%d, build submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
SMA_VID(pSma), suid, pItem->level, terrstr()); suid, pItem->level, terrstr());
goto _err; goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr()); SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err; goto _err;
} }
taosMemoryFreeClear(pReq);
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
SMA_VID(pSma), suid, pItem->level, output->info.version); SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen));
taosMemoryFreeClear(pReq);
} }
} }
@ -817,6 +826,95 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
ASSERT(!dstTaskInfo);
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!dstTaskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err;
}
if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
}
/**
* @brief Clone qTaskInfo of SRSmaInfo
*
* @param pSma
* @param pInfo
* @return int32_t
*/
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
SRSmaParam *param = NULL;
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pInfo->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED;
}
/** /**
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
* *
@ -848,7 +946,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL; return NULL;
} }
if (!pRSmaInfo->taskInfo[0]) { if (!pRSmaInfo->taskInfo[0]) {
if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) {
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); // taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
@ -1006,7 +1104,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
" qmsgLen:%" PRIi32, " qmsgLen:%" PRIi32,
TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
} }
if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err; goto _err;
} }
@ -1118,7 +1216,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) { int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
// step 1: iterate all stables to restore the rsma env // step 1: iterate all stables to restore the rsma env
int64_t nTables = 0; int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
@ -1139,6 +1237,12 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
if (tdRSmaRestoreTSDataReload(pSma) < 0) { if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err; goto _err;
} }
// step 4: open SRSmaFS for qTaskFiles
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
goto _err;
}
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer); smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
@ -1468,26 +1572,46 @@ _err:
* @param tmrId * @param tmrId
*/ */
static void tdRSmaFetchTrigger(void *param, void *tmrId) { static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param; SRSmaRef *pRSmaRef = NULL;
SSma *pSma = NULL; SSma *pSma = NULL;
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfoItem *pItem = NULL;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%" PRIi64, param,
pRSmaInfo->refId); *(int64_t *)&param, smaMgmt.refHash, smaMgmt.rsetId);
return; return;
} }
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
if (!pStat) {
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
smaMgmt.rsetId, pRSmaInfo->refId); smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return; return;
} }
pSma = pStat->pSma; pSma = pStat->pSma;
if (!(pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid))) {
smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return;
}
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return;
}
pItem = *(SRSmaInfoItem **)&param;
// if rsma trigger stat in paused, cancelled or finished, not start fetch task // if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
switch (rsmaTriggerStat) { switch (rsmaTriggerStat) {
@ -1495,11 +1619,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_CANCELLED: { case TASK_TRIGGER_STAT_CANCELLED: {
smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
", rsetId rsetId:%" PRIi64 " refId:%d", ", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} }
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
return; return;
} }
default: default:
@ -1511,7 +1636,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
switch (fetchTriggerStat) { switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
pItem->fetchLevel = pItem->level; pItem->fetchLevel = pItem->level;
#if 0 #if 0
@ -1544,11 +1669,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
_end: _end:
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
} }
static void tdFreeRSmaSubmitItems(SArray *pItems) { static void tdFreeRSmaSubmitItems(SArray *pItems) {
ASSERT(taosArrayGetSize(pItems) > 0);
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
taosFreeQitem(*(void **)taosArrayGet(pItems, i)); taosFreeQitem(*(void **)taosArrayGet(pItems, i));
} }
@ -1560,10 +1685,9 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
* *
* @param pSma * @param pSma
* @param pInfo * @param pInfo
* @param pSubmitArr
* @return int32_t * @return int32_t
*/ */
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
@ -1609,10 +1733,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmi
} }
_end: _end:
tdReleaseRSmaInfo(pSma, pInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
tdReleaseRSmaInfo(pSma, pInfo);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -1709,7 +1831,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (oldStat == 0 || if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); tdRSmaFetchAllResult(pSma, pInfo);
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
} }

View File

@ -15,11 +15,13 @@
#include "sma.h" #include "sma.h"
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData); static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version);
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader);
// SRsmaSnapReader ======================================== // SRSmaSnapReader ========================================
struct SRsmaSnapReader { struct SRSmaSnapReader {
SSma* pSma; SSma* pSma;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
@ -33,13 +35,13 @@ struct SRsmaSnapReader {
SQTaskFReader* pQTaskFReader; SQTaskFReader* pQTaskFReader;
}; };
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) { int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
SVnode* pVnode = pSma->pVnode; SVnode* pVnode = pSma->pVnode;
SRsmaSnapReader* pReader = NULL; SRSmaSnapReader* pReader = NULL;
// alloc // alloc
pReader = (SRsmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); pReader = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) { if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
@ -48,7 +50,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
// rsma1/rsma2 // open rsma1/rsma2
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) { if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2, code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
@ -59,51 +61,112 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
} }
} }
// qtaskinfo // open qtaskinfo
// 1. add ref to qtaskinfo.v${ever} if exists and then start to replicate if ((code = rsmaQTaskInfSnapReaderOpen(pReader, ever)) < 0) {
char qTaskInfoFullName[TSDB_FILENAME_LEN]; goto _err;
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (!taosCheckExistFile(qTaskInfoFullName)) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo not need as %s not exists", TD_VID(pVnode),
qTaskInfoFullName);
} else {
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
TdFilePtr qTaskF = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
if (!qTaskF) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pReader->pQTaskFReader->pReadH = qTaskF;
#if 0
SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask;
pQTaskF->nRef = 1;
#endif
} }
*ppReader = pReader; *ppReader = pReader;
smaInfo("vgId:%d, vnode snapshot rsma reader opened %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("vgId:%d, vnode snapshot rsma reader opened failed since %s", TD_VID(pVnode), tstrerror(code)); smaError("vgId:%d, vnode snapshot rsma reader open failed since %s", TD_VID(pVnode), tstrerror(code));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf) { static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
SVnode* pVnode = pSma->pVnode;
SSmaEnv* pEnv = NULL;
SRSmaStat* pStat = NULL;
if (!(pEnv = SMA_RSMA_ENV(pSma))) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as env is NULL",
TD_VID(pVnode), version);
return TSDB_CODE_SUCCESS;
}
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
int32_t ref = tdRSmaFSRef(pReader->pSma, pStat, version);
if (ref < 1) {
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as ref is %d",
TD_VID(pVnode), version, ref);
return TSDB_CODE_SUCCESS;
}
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (!taosCheckExistFile(qTaskInfoFullName)) {
tdRSmaFSUnRef(pSma, pStat, version);
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as %s not exists",
TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_SUCCESS;
}
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
if (!pReader->pQTaskFReader) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
TdFilePtr fp = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFreeClear(pReader->pQTaskFReader);
goto _end;
}
pReader->pQTaskFReader->pReadH = fp;
pReader->pQTaskFReader->pSma = pSma;
pReader->pQTaskFReader->version = pReader->ever;
_end:
if (code < 0) {
tdRSmaFSUnRef(pSma, pStat, version);
smaError("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
return TSDB_CODE_SUCCESS;
}
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) {
if (!(*ppReader)) {
return TSDB_CODE_SUCCESS;
}
SSma* pSma = (*ppReader)->pSma;
SRSmaStat* pStat = SMA_RSMA_STAT(pSma);
int64_t version = (*ppReader)->version;
taosCloseFile(&(*ppReader)->pReadH);
tdRSmaFSUnRef(pSma, pStat, version);
taosMemoryFreeClear(*ppReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version);
return TSDB_CODE_SUCCESS;
}
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
int32_t code = 0; int32_t code = 0;
SSma* pSma = pReader->pSma; SSma* pSma = pReader->pSma;
int64_t n = 0; int64_t n = 0;
uint8_t* pBuf = NULL; uint8_t* pBuf = NULL;
SQTaskFReader* qReader = pReader->pQTaskFReader; SQTaskFReader* qReader = pReader->pQTaskFReader;
if (!qReader) {
*ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", SMA_VID(pSma));
return 0;
}
if (!qReader->pReadH) { if (!qReader->pReadH) {
*ppBuf = NULL; *ppBuf = NULL;
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is empty", SMA_VID(pSma)); smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is NULL", SMA_VID(pSma));
return 0; return 0;
} }
@ -153,7 +216,7 @@ _err:
return code; return code;
} }
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) { int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
*ppData = NULL; *ppData = NULL;
@ -205,9 +268,9 @@ _err:
return code; return code;
} }
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) { int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
SRsmaSnapReader* pReader = *ppReader; SRSmaSnapReader* pReader = *ppReader;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) { if (pReader->pDataReader[i]) {
@ -215,11 +278,7 @@ int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
} }
} }
if (pReader->pQTaskFReader) { rsmaQTaskInfSnapReaderClose(&pReader->pQTaskFReader);
taosCloseFile(&pReader->pQTaskFReader->pReadH);
taosMemoryFreeClear(pReader->pQTaskFReader);
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
}
smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma)); smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
@ -227,8 +286,8 @@ int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
return code; return code;
} }
// SRsmaSnapWriter ======================================== // SRSmaSnapWriter ========================================
struct SRsmaSnapWriter { struct SRSmaSnapWriter {
SSma* pSma; SSma* pSma;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
@ -244,13 +303,13 @@ struct SRsmaSnapWriter {
SQTaskFWriter* pQTaskFWriter; SQTaskFWriter* pQTaskFWriter;
}; };
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) { int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
SRsmaSnapWriter* pWriter = NULL; SRSmaSnapWriter* pWriter = NULL;
SVnode* pVnode = pSma->pVnode; SVnode* pVnode = pSma->pVnode;
// alloc // alloc
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
@ -301,9 +360,9 @@ _err:
return code; return code;
} }
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
SRsmaSnapWriter* pWriter = *ppWriter; SRSmaSnapWriter* pWriter = *ppWriter;
SVnode* pVnode = pWriter->pSma->pVnode; SVnode* pVnode = pWriter->pSma->pVnode;
if (rollback) { if (rollback) {
@ -332,7 +391,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
pWriter->pQTaskFWriter->fname, qTaskInfoFullName); pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
// rsma restore // rsma restore
if ((code = tdRsmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) { if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
goto _err; goto _err;
} }
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName); smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
@ -349,7 +408,7 @@ _err:
return code; return code;
} }
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
@ -377,7 +436,7 @@ _err:
return code; return code;
} }
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
SQTaskFWriter* qWriter = pWriter->pQTaskFWriter; SQTaskFWriter* qWriter = pWriter->pQTaskFWriter;

View File

@ -20,6 +20,10 @@
#define SMA_STORAGE_MINUTES_DAY 1440 #define SMA_STORAGE_MINUTES_DAY 1440
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file #define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// TODO: Who is responsible for resource allocate and release? // TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -59,7 +63,7 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *
* @param days unit is minute * @param days unit is minute
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pCont, contLen); tDecoderInit(&coder, pCont, contLen);
@ -106,7 +110,7 @@ _err:
* @param pMsg * @param pMsg
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
SSmaCfg *pCfg = (SSmaCfg *)pMsg; SSmaCfg *pCfg = (SSmaCfg *)pMsg;
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
@ -145,7 +149,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
const SArray *pDataBlocks = (const SArray *)msg; const SArray *pDataBlocks = (const SArray *)msg;
// TODO: destroy SSDataBlocks(msg) // TODO: destroy SSDataBlocks(msg)
if (!pDataBlocks) { if (!pDataBlocks) {

View File

@ -306,92 +306,3 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
ASSERT(!dstTaskInfo);
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!dstTaskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err;
}
if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
}
/**
* @brief Clone qTaskInfo of SRSmaInfo
*
* @param pSma
* @param pInfo
* @return int32_t
*/
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
SRSmaParam *param = NULL;
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pInfo->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED;
}

View File

@ -39,7 +39,7 @@ struct SVSnapReader {
SStreamStateReader *pStreamStateReader; SStreamStateReader *pStreamStateReader;
// rsma // rsma
int8_t rsmaDone; int8_t rsmaDone;
SRsmaSnapReader *pRsmaReader; SRSmaSnapReader *pRsmaReader;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
@ -241,7 +241,7 @@ struct SVSnapWriter {
SStreamTaskWriter *pStreamTaskWriter; SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter; SStreamStateWriter *pStreamStateWriter;
// rsma // rsma
SRsmaSnapWriter *pRsmaSnapWriter; SRSmaSnapWriter *pRsmaSnapWriter;
}; };
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {

View File

@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists"
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match")
//index //index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")