feat: persistence of rsma qtaskinfo
This commit is contained in:
parent
ee7ace765b
commit
0a24fbcdd6
|
@ -71,6 +71,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
|
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
|
||||||
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
|
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
|
||||||
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
|
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
|
||||||
|
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
|
||||||
|
|
||||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
|
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
|
||||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
|
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
|
||||||
|
|
|
@ -29,6 +29,7 @@ target_sources(
|
||||||
# sma
|
# sma
|
||||||
"src/sma/sma.c"
|
"src/sma/sma.c"
|
||||||
"src/sma/smaEnv.c"
|
"src/sma/smaEnv.c"
|
||||||
|
"src/sma/smaUtil.c"
|
||||||
"src/sma/smaOpen.c"
|
"src/sma/smaOpen.c"
|
||||||
"src/sma/smaRollup.c"
|
"src/sma/smaRollup.c"
|
||||||
"src/sma/smaTimeRange.c"
|
"src/sma/smaTimeRange.c"
|
||||||
|
|
|
@ -34,7 +34,8 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SSmaEnv SSmaEnv;
|
typedef struct SSmaEnv SSmaEnv;
|
||||||
typedef struct SSmaStat SSmaStat;
|
typedef struct SSmaStat SSmaStat;
|
||||||
typedef struct SSmaStatItem SSmaStatItem;
|
typedef struct STSmaStat STSmaStat;
|
||||||
|
typedef struct SRSmaStat SRSmaStat;
|
||||||
typedef struct SSmaKey SSmaKey;
|
typedef struct SSmaKey SSmaKey;
|
||||||
typedef struct SRSmaInfo SRSmaInfo;
|
typedef struct SRSmaInfo SRSmaInfo;
|
||||||
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
||||||
|
@ -45,26 +46,38 @@ struct SSmaEnv {
|
||||||
SSmaStat *pStat;
|
SSmaStat *pStat;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||||
#define SMA_ENV_TYPE(env) ((env)->type)
|
#define SMA_ENV_TYPE(env) ((env)->type)
|
||||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||||
#define SMA_ENV_STAT_ITEM(env) ((env)->pStat->tsmaStatItem)
|
|
||||||
|
|
||||||
struct SSmaStatItem {
|
struct STSmaStat {
|
||||||
int8_t state; // ETsdbSmaStat
|
int8_t state; // ETsdbSmaStat
|
||||||
STSma *pTSma; // cache schema
|
STSma *pTSma; // cache schema
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SRSmaStat {
|
||||||
|
SSma *pSma;
|
||||||
|
void *tmrHandle;
|
||||||
|
tmr_h tmrId;
|
||||||
|
int8_t tmrStat;
|
||||||
|
int32_t tmrSeconds;
|
||||||
|
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||||
|
};
|
||||||
|
|
||||||
struct SSmaStat {
|
struct SSmaStat {
|
||||||
union {
|
union {
|
||||||
SSmaStatItem tsmaStatItem;
|
STSmaStat tsmaStat; // time-range-wise sma
|
||||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
SRSmaStat rsmaStat; // rollup sma
|
||||||
};
|
};
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
};
|
};
|
||||||
#define SMA_STAT_ITEM(s) ((s)->tsmaStatItem)
|
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
|
||||||
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
|
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
|
||||||
|
#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash)
|
||||||
|
#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle)
|
||||||
|
#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat)
|
||||||
|
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
|
||||||
|
|
||||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||||
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||||
|
@ -107,53 +120,51 @@ static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int8_t tdSmaStat(SSmaStatItem *pStatItem) {
|
static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) {
|
||||||
if (pStatItem) {
|
if (pTStat) {
|
||||||
return atomic_load_8(&pStatItem->state);
|
return atomic_load_8(&pTStat->state);
|
||||||
}
|
}
|
||||||
return TSDB_SMA_STAT_UNKNOWN;
|
return TSDB_SMA_STAT_UNKNOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool tdSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
|
static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) {
|
||||||
if (!pStatItem) {
|
if (!pTStat) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state) {
|
if (state) {
|
||||||
*state = atomic_load_8(&pStatItem->state);
|
*state = atomic_load_8(&pTStat->state);
|
||||||
return *state == TSDB_SMA_STAT_OK;
|
return *state == TSDB_SMA_STAT_OK;
|
||||||
}
|
}
|
||||||
return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
|
return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool tdSmaStatIsExpired(SSmaStatItem *pStatItem) {
|
static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) {
|
||||||
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true;
|
return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool tdSmaStatIsDropped(SSmaStatItem *pStatItem) {
|
static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) {
|
||||||
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true;
|
return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tdSmaStatSetOK(SSmaStatItem *pStatItem) {
|
static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) {
|
||||||
if (pStatItem) {
|
if (pTStat) {
|
||||||
atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
|
atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tdSmaStatSetExpired(SSmaStatItem *pStatItem) {
|
static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) {
|
||||||
if (pStatItem) {
|
if (pTStat) {
|
||||||
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
|
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) {
|
static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
|
||||||
if (pStatItem) {
|
if (pTStat) {
|
||||||
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
|
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
|
|
||||||
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
|
|
||||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
|
|
||||||
|
@ -163,6 +174,50 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
|
||||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
||||||
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
||||||
|
typedef struct STFInfo STFInfo;
|
||||||
|
typedef struct STFile STFile;
|
||||||
|
|
||||||
|
struct STFInfo {
|
||||||
|
uint32_t magic;
|
||||||
|
uint32_t fver;
|
||||||
|
uint64_t size;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct STFile {
|
||||||
|
STFInfo info;
|
||||||
|
STfsFile f;
|
||||||
|
TdFilePtr pFile;
|
||||||
|
uint8_t state;
|
||||||
|
};
|
||||||
|
|
||||||
|
#define TD_FILE_F(tf) (&((tf)->f))
|
||||||
|
#define TD_FILE_PFILE(tf) ((tf)->pFile)
|
||||||
|
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
|
||||||
|
#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname)
|
||||||
|
#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname)
|
||||||
|
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
|
||||||
|
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
|
||||||
|
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
|
||||||
|
#define TD_FILE_STATE(tf) ((tf)->state)
|
||||||
|
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
|
||||||
|
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
|
||||||
|
#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK)
|
||||||
|
#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD)
|
||||||
|
|
||||||
|
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
|
||||||
|
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);
|
||||||
|
int32_t tdOpenTFile(STFile *pTFile, int flags);
|
||||||
|
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
|
||||||
|
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
|
||||||
|
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
|
||||||
|
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
|
||||||
|
int32_t tdRemoveTFile(STFile *pTFile);
|
||||||
|
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
|
||||||
|
int32_t tdUpdateTFileHeader(STFile *pTFile);
|
||||||
|
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||||
|
void tdCloseTFile(STFile *pTFile);
|
||||||
|
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -265,8 +265,8 @@ struct SMCtbCursor {
|
||||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
|
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
|
||||||
SMCtbCursor *pCtbCur = NULL;
|
SMCtbCursor *pCtbCur = NULL;
|
||||||
SCtbIdxKey ctbIdxKey;
|
SCtbIdxKey ctbIdxKey;
|
||||||
int ret;
|
int ret = 0;
|
||||||
int c;
|
int c = 0;
|
||||||
|
|
||||||
pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
|
pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
|
||||||
if (pCtbCur == NULL) {
|
if (pCtbCur == NULL) {
|
||||||
|
|
|
@ -21,9 +21,10 @@ typedef struct SSmaStat SSmaStat;
|
||||||
|
|
||||||
// declaration of static functions
|
// declaration of static functions
|
||||||
|
|
||||||
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
|
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
|
||||||
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
|
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
|
||||||
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
|
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
|
||||||
|
static void *tdFreeTSmaStat(STSmaStat *pStat);
|
||||||
|
|
||||||
// implementation
|
// implementation
|
||||||
|
|
||||||
|
@ -45,7 +46,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
|
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
|
||||||
tdFreeSmaEnv(pEnv);
|
tdFreeSmaEnv(pEnv);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -105,7 +106,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
|
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
|
||||||
ASSERT(pSmaStat != NULL);
|
ASSERT(pSmaStat != NULL);
|
||||||
|
|
||||||
if (*pSmaStat) { // no lock
|
if (*pSmaStat) { // no lock
|
||||||
|
@ -125,10 +126,23 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||||
SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit(
|
SMA_RSMA_STAT(*pSmaStat)->pSma = (SSma*)pSma;
|
||||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
// init timer
|
||||||
|
SMA_RSMA_TMR_HANDLE(*pSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA_G");
|
||||||
|
if (!SMA_RSMA_TMR_HANDLE(*pSmaStat)) {
|
||||||
|
taosMemoryFreeClear(*pSmaStat);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
if (!SMA_STAT_INFO_HASH(*pSmaStat)) {
|
atomic_store_8(&SMA_RSMA_TMR_STAT(*pSmaStat), TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
|
||||||
|
// init hash
|
||||||
|
SMA_RSMA_INFO_HASH(*pSmaStat) = taosHashInit(
|
||||||
|
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||||
|
if (!SMA_RSMA_INFO_HASH(*pSmaStat)) {
|
||||||
|
if (SMA_RSMA_TMR_HANDLE(*pSmaStat)) {
|
||||||
|
taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(*pSmaStat));
|
||||||
|
}
|
||||||
taosMemoryFreeClear(*pSmaStat);
|
taosMemoryFreeClear(*pSmaStat);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -141,16 +155,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
|
static void *tdFreeTSmaStat(STSmaStat *pStat) {
|
||||||
if (pSmaStatItem) {
|
if (pStat) {
|
||||||
tDestroyTSma(pSmaStatItem->pTSma);
|
tDestroyTSma(pStat->pTSma);
|
||||||
taosMemoryFreeClear(pSmaStatItem->pTSma);
|
taosMemoryFreeClear(pStat->pTSma);
|
||||||
taosMemoryFreeClear(pSmaStatItem);
|
taosMemoryFreeClear(pStat);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||||
tdDestroySmaState(pSmaStat, smaType);
|
tdDestroySmaState(pSmaStat, smaType);
|
||||||
taosMemoryFreeClear(pSmaStat);
|
taosMemoryFreeClear(pSmaStat);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -165,16 +179,19 @@ void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||||
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||||
if (pSmaStat) {
|
if (pSmaStat) {
|
||||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||||
tdFreeSmaStatItem(&pSmaStat->tsmaStatItem);
|
tdFreeTSmaStat(&pSmaStat->tsmaStat);
|
||||||
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||||
|
if (SMA_RSMA_TMR_HANDLE(pSmaStat)) {
|
||||||
|
taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(pSmaStat));
|
||||||
|
}
|
||||||
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||||
void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL);
|
void *infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), NULL);
|
||||||
while (infoHash) {
|
while (infoHash) {
|
||||||
SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
|
SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
|
||||||
tdFreeRSmaInfo(pInfoHash);
|
tdFreeRSmaInfo(pInfoHash);
|
||||||
infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash);
|
infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), infoHash);
|
||||||
}
|
}
|
||||||
taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat));
|
taosHashCleanup(SMA_RSMA_INFO_HASH(pSmaStat));
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,3 +138,16 @@ int32_t smaClose(SSma *pSma) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief rsma env restore
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
int32_t smaRestore(SSma *pSma) {
|
||||||
|
if (!pSma) return 0;
|
||||||
|
// iterate all stables to restore the rsma env
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -16,12 +16,17 @@
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
|
||||||
|
typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T;
|
||||||
|
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
|
||||||
|
|
||||||
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle,
|
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle,
|
||||||
int8_t idx);
|
int8_t idx);
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
||||||
tb_uid_t suid, int8_t level);
|
tb_uid_t suid, int8_t level);
|
||||||
|
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||||
|
static void tdRSmaPersistTrigger(void *param, void *tmrId);
|
||||||
|
|
||||||
struct SRSmaInfoItem {
|
struct SRSmaInfoItem {
|
||||||
SRSmaInfo *pRsmaInfo;
|
SRSmaInfo *pRsmaInfo;
|
||||||
|
@ -33,14 +38,6 @@ struct SRSmaInfoItem {
|
||||||
int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
|
int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
|
||||||
int32_t maxDelay;
|
int32_t maxDelay;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t suid;
|
|
||||||
SRSmaInfoItem *pItem;
|
|
||||||
SSma *pSma;
|
|
||||||
STSchema *pTSchema;
|
|
||||||
} SRSmaTriggerParam;
|
|
||||||
|
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
SSma *pSma;
|
SSma *pSma;
|
||||||
|
@ -95,7 +92,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
|
pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
|
||||||
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||||
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
|
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
|
||||||
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||||
|
@ -164,7 +161,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
||||||
|
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||||
SHashObj *infoHash = NULL;
|
SHashObj *infoHash = NULL;
|
||||||
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
|
if (!pStat || !(infoHash = SMA_RSMA_INFO_HASH(pStat))) {
|
||||||
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -257,7 +254,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
|
pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
|
||||||
if (pRSmaInfo) {
|
if (pRSmaInfo) {
|
||||||
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
|
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
|
||||||
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||||
|
@ -300,7 +297,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
|
if (taosHashPut(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
|
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
|
||||||
|
@ -449,8 +446,9 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
|
||||||
if (!output) {
|
if (!output) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
pResult = taosArrayInit(0, sizeof(SSDataBlock));
|
pResult = taosArrayInit(1, sizeof(SSDataBlock));
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -461,7 +459,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pResult) > 0) {
|
if (taosArrayGetSize(pResult) > 0) {
|
||||||
#if 0
|
#if 1
|
||||||
char flag[10] = {0};
|
char flag[10] = {0};
|
||||||
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
||||||
blockDebugShowData(pResult, flag);
|
blockDebugShowData(pResult, flag);
|
||||||
|
@ -469,14 +467,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
|
||||||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
|
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
|
||||||
SSubmitReq *pReq = NULL;
|
SSubmitReq *pReq = NULL;
|
||||||
if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) {
|
if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) {
|
||||||
taosArrayDestroy(pResult);
|
goto _err;
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
|
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
|
||||||
taosArrayDestroy(pResult);
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
return TSDB_CODE_FAILED;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
|
@ -489,7 +485,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pResult);
|
taosArrayDestroy(pResult);
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
taosArrayDestroy(pResult);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -498,13 +497,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
|
||||||
* @param param
|
* @param param
|
||||||
* @param tmrId
|
* @param tmrId
|
||||||
*/
|
*/
|
||||||
static void rsmaTriggerByTimer(void *param, void *tmrId) {
|
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
// SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param;
|
|
||||||
// SRSmaInfoItem *pItem = pParam->pItem;
|
|
||||||
SRSmaInfoItem *pItem = param;
|
SRSmaInfoItem *pItem = param;
|
||||||
|
|
||||||
if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
|
if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||||
smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid);
|
smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is active for tb suid:%" PRIi64, __func__, __LINE__,
|
||||||
|
taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
|
|
||||||
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
|
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
|
||||||
|
@ -512,10 +510,11 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) {
|
||||||
|
|
||||||
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||||
} else {
|
} else {
|
||||||
smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid);
|
smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is inactive for tb suid:%" PRIi64, __func__, __LINE__,
|
||||||
|
taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
}
|
}
|
||||||
|
|
||||||
// taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
|
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
|
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
|
||||||
|
@ -528,16 +527,20 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
|
||||||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
||||||
pItem->taskInfo, suid);
|
pItem->taskInfo, suid);
|
||||||
|
|
||||||
// inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1)
|
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // STREAM_DATA_TYPE_SUBMIT_BLOCK
|
||||||
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) {
|
|
||||||
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema};
|
|
||||||
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||||
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
|
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
|
smaWarn("%s:%d THREAD:%" PRIi64 " process rsma insert", __func__, __LINE__, taosGetSelfPthreadId());
|
||||||
|
|
||||||
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
||||||
|
|
||||||
|
taosTmrStart(tdRSmaPersistTrigger, 5000, pStat, pStat->tmrHandle);
|
||||||
|
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -552,7 +555,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
|
|
||||||
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||||
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
|
@ -604,3 +607,106 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char* outputName) {
|
||||||
|
tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tdRSmaPersistExec(void *param) {
|
||||||
|
setThreadName("rsma-task-persist");
|
||||||
|
SRSmaStat *pRSmaStat = param;
|
||||||
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
|
STfs *pTfs = pSma->pVnode->pTfs;
|
||||||
|
int64_t toffset = 0;
|
||||||
|
|
||||||
|
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||||
|
if (!infoHash) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
STFile tFile = {0};
|
||||||
|
int32_t vid = 2;
|
||||||
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
|
tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName);
|
||||||
|
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
|
||||||
|
tdCreateTFile(&tFile, pTfs, true, -1);
|
||||||
|
|
||||||
|
while (infoHash) {
|
||||||
|
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
||||||
|
char *pOutput = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
if (qSerializeTaskStatus(pRSmaInfo->items[0].taskInfo, &pOutput, &len) < 0) {
|
||||||
|
smaError("serialize rsma task for table %" PRIi64 " failed since %s", pRSmaInfo->items[0].pRsmaInfo->suid,
|
||||||
|
terrstr(terrno));
|
||||||
|
} else {
|
||||||
|
smaWarn("serialize rsma task for table %" PRIi64 " success and len is %d", pRSmaInfo->items[0].pRsmaInfo->suid,
|
||||||
|
len);
|
||||||
|
}
|
||||||
|
tdAppendTFile(&tFile, &len, sizeof(len), &toffset);
|
||||||
|
tdAppendTFile(&tFile, pOutput, len, &toffset);
|
||||||
|
|
||||||
|
taosMemoryFree(pOutput);
|
||||||
|
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
|
||||||
|
}
|
||||||
|
_end:
|
||||||
|
|
||||||
|
if (tdUpdateTFileHeader(&tFile) < 0) {
|
||||||
|
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno));
|
||||||
|
tdCloseTFile(&tFile);
|
||||||
|
tdRemoveTFile(&tFile);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tdCloseTFile(&tFile);
|
||||||
|
|
||||||
|
char newFName[TSDB_FILENAME_LEN];
|
||||||
|
strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
|
||||||
|
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]);
|
||||||
|
strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
|
||||||
|
taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName);
|
||||||
|
|
||||||
|
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
return NULL;
|
||||||
|
_err:
|
||||||
|
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
// remove the .tmp file
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
|
||||||
|
smaWarn("%s:%d entry ", __func__, __LINE__);
|
||||||
|
TdThread threadId;
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED);
|
||||||
|
|
||||||
|
if (taosThreadCreate(&threadId, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) {
|
||||||
|
smaError("failed to create thread to persist rsma qTaskInfo since %s", strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
smaWarn("%s:%d end ", __func__, __LINE__);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief trigger to persist rsma qTaskInfo
|
||||||
|
*
|
||||||
|
* @param param
|
||||||
|
* @param tmrId
|
||||||
|
*/
|
||||||
|
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
|
SRSmaStat *pRSmaStat = param;
|
||||||
|
|
||||||
|
if (atomic_load_8(&pRSmaStat->tmrStat) == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||||
|
smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence start since active", __func__, __LINE__, taosGetSelfPthreadId());
|
||||||
|
atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__IN_ACTIVE);
|
||||||
|
|
||||||
|
// execution
|
||||||
|
tdRSmaPersistTask(pRSmaStat);
|
||||||
|
} else {
|
||||||
|
smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence not start since inactive", __func__, __LINE__,
|
||||||
|
taosGetSelfPthreadId());
|
||||||
|
}
|
||||||
|
|
||||||
|
taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
|
||||||
|
}
|
|
@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
|
|
||||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
||||||
SSmaStat *pStat = NULL;
|
SSmaStat *pStat = NULL;
|
||||||
SSmaStatItem *pItem = NULL;
|
STSmaStat *pItem = NULL;
|
||||||
|
|
||||||
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
|
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
|
||||||
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
||||||
|
@ -137,7 +137,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tdRefSmaStat(pSma, pStat);
|
tdRefSmaStat(pSma, pStat);
|
||||||
pItem = &pStat->tsmaStatItem;
|
pItem = &pStat->tsmaStat;
|
||||||
|
|
||||||
ASSERT(pItem);
|
ASSERT(pItem);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,236 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "sma.h"
|
||||||
|
|
||||||
|
#define TD_FILE_HEAD_SIZE 512
|
||||||
|
|
||||||
|
#define TD_FILE_STATE_OK 0
|
||||||
|
#define TD_FILE_STATE_BAD 1
|
||||||
|
|
||||||
|
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
|
||||||
|
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
|
||||||
|
|
||||||
|
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedU32(buf, pInfo->magic);
|
||||||
|
tlen += taosEncodeFixedU32(buf, pInfo->fver);
|
||||||
|
tlen += taosEncodeFixedU64(buf, pInfo->size);
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
|
||||||
|
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
|
||||||
|
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
|
||||||
|
buf = taosDecodeFixedU64(buf, &(pInfo->size));
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
|
||||||
|
ASSERT(TD_FILE_OPENED(pTFile));
|
||||||
|
|
||||||
|
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
|
||||||
|
if (nwrite < nbyte) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nwrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
|
||||||
|
ASSERT(TD_FILE_OPENED(pTFile));
|
||||||
|
|
||||||
|
int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence);
|
||||||
|
if (loffset < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return loffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
|
||||||
|
ASSERT(TD_FILE_OPENED(pTFile));
|
||||||
|
|
||||||
|
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
|
||||||
|
if (nread < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nread;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdUpdateTFileHeader(STFile *pTFile) {
|
||||||
|
char buf[TD_FILE_HEAD_SIZE] = "\0";
|
||||||
|
|
||||||
|
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *ptr = buf;
|
||||||
|
tdEncodeTFInfo(&ptr, &(pTFile->info));
|
||||||
|
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE);
|
||||||
|
if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
|
||||||
|
char buf[TD_FILE_HEAD_SIZE] = "\0";
|
||||||
|
uint32_t _version;
|
||||||
|
|
||||||
|
ASSERT(TD_FILE_OPENED(pTFile));
|
||||||
|
|
||||||
|
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) {
|
||||||
|
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pBuf = buf;
|
||||||
|
pBuf = tdDecodeTFInfo(pBuf, pInfo);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
|
||||||
|
pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
|
||||||
|
ASSERT(TD_FILE_OPENED(pTFile));
|
||||||
|
|
||||||
|
int64_t toffset;
|
||||||
|
|
||||||
|
if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pTFile->info.size == toffset);
|
||||||
|
|
||||||
|
if (offset) {
|
||||||
|
*offset = toffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdWriteTFile(pTFile, buf, nbyte) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTFile->info.size += nbyte;
|
||||||
|
|
||||||
|
return nbyte;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdOpenTFile(STFile *pTFile, int flags) {
|
||||||
|
ASSERT(!TD_FILE_OPENED(pTFile));
|
||||||
|
|
||||||
|
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags);
|
||||||
|
if (pTFile->pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tdCloseTFile(STFile *pTFile) {
|
||||||
|
if (TD_FILE_OPENED(pTFile)) {
|
||||||
|
taosCloseFile(&pTFile->pFile);
|
||||||
|
TD_FILE_SET_CLOSED(pTFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName) {
|
||||||
|
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vid, dname, fname);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
|
||||||
|
char fullname[TSDB_FILENAME_LEN];
|
||||||
|
SDiskID did = {0};
|
||||||
|
|
||||||
|
TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
||||||
|
TD_FILE_SET_CLOSED(pTFile);
|
||||||
|
|
||||||
|
memset(&(pTFile->info), 0, sizeof(pTFile->info));
|
||||||
|
pTFile->info.magic = TD_FILE_INIT_MAGIC;
|
||||||
|
|
||||||
|
if (tfsAllocDisk(pTfs, 0, &did) < 0) {
|
||||||
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsInitFile(pTfs, &(pTFile->f), did, fname);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) {
|
||||||
|
ASSERT(pTFile->info.size == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
|
||||||
|
|
||||||
|
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
|
if (pTFile->pFile == NULL) {
|
||||||
|
if (errno == ENOENT) {
|
||||||
|
// Try to create directory recursively
|
||||||
|
char *s = strdup(TD_FILE_REL_NAME(pTFile));
|
||||||
|
if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) {
|
||||||
|
taosMemoryFreeClear(s);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosMemoryFreeClear(s);
|
||||||
|
|
||||||
|
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
|
if (pTFile->pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!updateHeader) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTFile->info.size += TD_FILE_HEAD_SIZE;
|
||||||
|
pTFile->info.fver = 0;
|
||||||
|
|
||||||
|
if (tdUpdateTFileHeader(pTFile) < 0) {
|
||||||
|
tdCloseTFile(pTFile);
|
||||||
|
tdRemoveTFile(pTFile);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); }
|
|
@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
|
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||||
|
@ -356,7 +357,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, "No table data in memo
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, "File already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, "File already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, "Need to reconfigure table")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, "Need to reconfigure table")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information to create table")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information to create table")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "TSDB no available disk")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
|
||||||
|
|
Loading…
Reference in New Issue