From 0a24fbcdd6a0af4c5ceec2a2d830bc062a845297 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 23 Jun 2022 14:19:21 +0800 Subject: [PATCH] feat: persistence of rsma qtaskinfo --- include/util/taoserror.h | 1 + source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/inc/sma.h | 119 ++++++++--- source/dnode/vnode/src/meta/metaQuery.c | 4 +- source/dnode/vnode/src/sma/smaEnv.c | 51 +++-- source/dnode/vnode/src/sma/smaOpen.c | 13 ++ source/dnode/vnode/src/sma/smaRollup.c | 166 ++++++++++++--- source/dnode/vnode/src/sma/smaTimeRange.c | 4 +- source/dnode/vnode/src/sma/smaUtil.c | 236 ++++++++++++++++++++++ source/util/src/terror.c | 3 +- 10 files changed, 514 insertions(+), 84 deletions(-) create mode 100644 source/dnode/vnode/src/sma/smaUtil.c diff --git a/include/util/taoserror.h b/include/util/taoserror.h index eb68f52a40..8af5945300 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -71,6 +71,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) +#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 15eb35c700..b09ab34224 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -29,6 +29,7 @@ target_sources( # sma "src/sma/sma.c" "src/sma/smaEnv.c" + "src/sma/smaUtil.c" "src/sma/smaOpen.c" "src/sma/smaRollup.c" "src/sma/smaTimeRange.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 902e5e1bcc..a4233e1ac4 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -34,7 +34,8 @@ extern "C" { typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; -typedef struct SSmaStatItem SSmaStatItem; +typedef struct STSmaStat STSmaStat; +typedef struct SRSmaStat SRSmaStat; typedef struct SSmaKey SSmaKey; typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfoItem SRSmaInfoItem; @@ -45,26 +46,38 @@ struct SSmaEnv { SSmaStat *pStat; }; -#define SMA_ENV_LOCK(env) ((env)->lock) -#define SMA_ENV_TYPE(env) ((env)->type) -#define SMA_ENV_STAT(env) ((env)->pStat) -#define SMA_ENV_STAT_ITEM(env) ((env)->pStat->tsmaStatItem) +#define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_TYPE(env) ((env)->type) +#define SMA_ENV_STAT(env) ((env)->pStat) -struct SSmaStatItem { +struct STSmaStat { int8_t state; // ETsdbSmaStat STSma *pTSma; // cache schema STSchema *pTSchema; }; +struct SRSmaStat { + SSma *pSma; + void *tmrHandle; + tmr_h tmrId; + int8_t tmrStat; + int32_t tmrSeconds; + SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; +}; + struct SSmaStat { union { - SSmaStatItem tsmaStatItem; - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + STSmaStat tsmaStat; // time-range-wise sma + SRSmaStat rsmaStat; // rollup sma }; T_REF_DECLARE() }; -#define SMA_STAT_ITEM(s) ((s)->tsmaStatItem) -#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) +#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) +#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) +#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash) +#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle) +#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat) +#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -107,53 +120,51 @@ static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) { return 0; } -static FORCE_INLINE int8_t tdSmaStat(SSmaStatItem *pStatItem) { - if (pStatItem) { - return atomic_load_8(&pStatItem->state); +static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) { + if (pTStat) { + return atomic_load_8(&pTStat->state); } return TSDB_SMA_STAT_UNKNOWN; } -static FORCE_INLINE bool tdSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) { - if (!pStatItem) { +static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) { + if (!pTStat) { return false; } if (state) { - *state = atomic_load_8(&pStatItem->state); + *state = atomic_load_8(&pTStat->state); return *state == TSDB_SMA_STAT_OK; } - return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK; + return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK; } -static FORCE_INLINE bool tdSmaStatIsExpired(SSmaStatItem *pStatItem) { - return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true; +static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) { + return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true; } -static FORCE_INLINE bool tdSmaStatIsDropped(SSmaStatItem *pStatItem) { - return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true; +static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) { + return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true; } -static FORCE_INLINE void tdSmaStatSetOK(SSmaStatItem *pStatItem) { - if (pStatItem) { - atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK); +static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) { + if (pTStat) { + atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK); } } -static FORCE_INLINE void tdSmaStatSetExpired(SSmaStatItem *pStatItem) { - if (pStatItem) { - atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED); +static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) { + if (pTStat) { + atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED); } } -static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) { - if (pStatItem) { - atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED); +static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { + if (pTStat) { + atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED); } } -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); -void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); @@ -163,6 +174,50 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); +typedef struct STFInfo STFInfo; +typedef struct STFile STFile; + +struct STFInfo { + uint32_t magic; + uint32_t fver; + uint64_t size; +}; + +struct STFile { + STFInfo info; + STfsFile f; + TdFilePtr pFile; + uint8_t state; +}; + +#define TD_FILE_F(tf) (&((tf)->f)) +#define TD_FILE_PFILE(tf) ((tf)->pFile) +#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) +#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname) +#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname) +#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) +#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf)) +#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL) +#define TD_FILE_STATE(tf) ((tf)->state) +#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s)) +#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did) +#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK) +#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD) + +int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname); +int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType); +int32_t tdOpenTFile(STFile *pTFile, int flags); +int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte); +int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence); +int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte); +int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset); +int32_t tdRemoveTFile(STFile *pTFile); +int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo); +int32_t tdUpdateTFileHeader(STFile *pTFile); +void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); +void tdCloseTFile(STFile *pTFile); +void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index a5ca90e55f..2eb39cdddf 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -265,8 +265,8 @@ struct SMCtbCursor { SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *pCtbCur = NULL; SCtbIdxKey ctbIdxKey; - int ret; - int c; + int ret = 0; + int c = 0; pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur)); if (pCtbCur == NULL) { diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index a80af2b202..4e907e93ca 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -21,9 +21,10 @@ typedef struct SSmaStat SSmaStat; // declaration of static functions -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); +static void *tdFreeTSmaStat(STSmaStat *pStat); // implementation @@ -45,7 +46,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) return NULL; } - if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) { + if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); return NULL; } @@ -105,7 +106,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { return 0; } -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) { ASSERT(pSmaStat != NULL); if (*pSmaStat) { // no lock @@ -125,10 +126,23 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { } if (smaType == TSDB_SMA_TYPE_ROLLUP) { - SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit( - RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + SMA_RSMA_STAT(*pSmaStat)->pSma = (SSma*)pSma; + // init timer + SMA_RSMA_TMR_HANDLE(*pSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA_G"); + if (!SMA_RSMA_TMR_HANDLE(*pSmaStat)) { + taosMemoryFreeClear(*pSmaStat); + return TSDB_CODE_FAILED; + } + + atomic_store_8(&SMA_RSMA_TMR_STAT(*pSmaStat), TASK_TRIGGER_STATUS__ACTIVE); - if (!SMA_STAT_INFO_HASH(*pSmaStat)) { + // init hash + SMA_RSMA_INFO_HASH(*pSmaStat) = taosHashInit( + RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (!SMA_RSMA_INFO_HASH(*pSmaStat)) { + if (SMA_RSMA_TMR_HANDLE(*pSmaStat)) { + taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(*pSmaStat)); + } taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED; } @@ -141,16 +155,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { return TSDB_CODE_SUCCESS; } -void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { - if (pSmaStatItem) { - tDestroyTSma(pSmaStatItem->pTSma); - taosMemoryFreeClear(pSmaStatItem->pTSma); - taosMemoryFreeClear(pSmaStatItem); +static void *tdFreeTSmaStat(STSmaStat *pStat) { + if (pStat) { + tDestroyTSma(pStat->pTSma); + taosMemoryFreeClear(pStat->pTSma); + taosMemoryFreeClear(pStat); } return NULL; } -void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { +void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { tdDestroySmaState(pSmaStat, smaType); taosMemoryFreeClear(pSmaStat); return NULL; @@ -165,16 +179,19 @@ void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - tdFreeSmaStatItem(&pSmaStat->tsmaStatItem); + tdFreeTSmaStat(&pSmaStat->tsmaStat); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + if (SMA_RSMA_TMR_HANDLE(pSmaStat)) { + taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(pSmaStat)); + } // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. - void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL); + void *infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), NULL); while (infoHash) { SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; tdFreeRSmaInfo(pInfoHash); - infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash); + infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), infoHash); } - taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat)); + taosHashCleanup(SMA_RSMA_INFO_HASH(pSmaStat)); } else { ASSERT(0); } @@ -273,4 +290,4 @@ void smaTimerCleanUp(void *timer, int8_t *initFlag) { taosTmrCleanUp(timer); atomic_store_8(initFlag, 0); } -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index a1c47a96c0..383c264e7b 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -137,4 +137,17 @@ int32_t smaClose(SSma *pSma) { taosMemoryFreeClear(pSma); } return 0; +} + +/** + * @brief rsma env restore + * + * @param pSma + * @return int32_t + */ +int32_t smaRestore(SSma *pSma) { + if (!pSma) return 0; + // iterate all stables to restore the rsma env + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f70c98bae6..8576c9ca43 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -16,12 +16,17 @@ #include "sma.h" #include "tstream.h" +typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; +static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; + static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle, int8_t idx); static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, tb_uid_t suid, int8_t level); +static void tdRSmaFetchTrigger(void *param, void *tmrId); +static void tdRSmaPersistTrigger(void *param, void *tmrId); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; @@ -33,14 +38,6 @@ struct SRSmaInfoItem { int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE int32_t maxDelay; }; - -typedef struct { - int64_t suid; - SRSmaInfoItem *pItem; - SSma *pSma; - STSchema *pTSchema; -} SRSmaTriggerParam; - struct SRSmaInfo { STSchema *pTSchema; SSma *pSma; @@ -95,7 +92,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA return TSDB_CODE_FAILED; } - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); terrno = TSDB_CODE_RSMA_INVALID_STAT; @@ -164,7 +161,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; - if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) { + if (!pStat || !(infoHash = SMA_RSMA_INFO_HASH(pStat))) { terrno = TSDB_CODE_RSMA_INVALID_STAT; return TSDB_CODE_FAILED; } @@ -257,7 +254,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); if (pRSmaInfo) { ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); @@ -300,7 +297,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { goto _err; } - if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { + if (taosHashPut(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; } else { smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); @@ -449,8 +446,9 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) if (!output) { break; } + if (!pResult) { - pResult = taosArrayInit(0, sizeof(SSDataBlock)); + pResult = taosArrayInit(1, sizeof(SSDataBlock)); if (!pResult) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; @@ -461,7 +459,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } if (taosArrayGetSize(pResult) > 0) { -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowData(pResult, flag); @@ -469,14 +467,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) { - taosArrayDestroy(pResult); - return TSDB_CODE_FAILED; + goto _err; } if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { - taosArrayDestroy(pResult); taosMemoryFreeClear(pReq); - return TSDB_CODE_FAILED; + goto _err; } taosMemoryFreeClear(pReq); @@ -489,7 +485,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } taosArrayDestroy(pResult); - return 0; + return TSDB_CODE_SUCCESS; +_err: + taosArrayDestroy(pResult); + return TSDB_CODE_FAILED; } /** @@ -498,13 +497,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) * @param param * @param tmrId */ -static void rsmaTriggerByTimer(void *param, void *tmrId) { - // SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param; - // SRSmaInfoItem *pItem = pParam->pItem; +static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { - smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); + smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is active for tb suid:%" PRIi64, __func__, __LINE__, + taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); @@ -512,10 +510,11 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) { tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); } else { - smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); + smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is inactive for tb suid:%" PRIi64, __func__, __LINE__, + taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid); } - // taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); + // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); } static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, @@ -528,16 +527,20 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, pItem->taskInfo, suid); - // inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1) - if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { + if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // STREAM_DATA_TYPE_SUBMIT_BLOCK smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } - // SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema}; tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); - taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); + smaWarn("%s:%d THREAD:%" PRIi64 " process rsma insert", __func__, __LINE__, taosGetSelfPthreadId()); + + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); + + taosTmrStart(tdRSmaPersistTrigger, 5000, pStat, pStat->tmrHandle); + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); return TSDB_CODE_SUCCESS; } @@ -552,7 +555,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); @@ -604,3 +607,106 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { } return TSDB_CODE_SUCCESS; } + +void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char* outputName) { + tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName); +} + +static void *tdRSmaPersistExec(void *param) { + setThreadName("rsma-task-persist"); + SRSmaStat *pRSmaStat = param; + SSma *pSma = pRSmaStat->pSma; + STfs *pTfs = pSma->pVnode->pTfs; + int64_t toffset = 0; + + void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); + if (!infoHash) { + goto _end; + } + + STFile tFile = {0}; + int32_t vid = 2; + char qTaskInfoFName[TSDB_FILENAME_LEN]; + tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); + tdInitTFile(&tFile, pTfs, qTaskInfoFName); + tdCreateTFile(&tFile, pTfs, true, -1); + + while (infoHash) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + char *pOutput = NULL; + int32_t len = 0; + if (qSerializeTaskStatus(pRSmaInfo->items[0].taskInfo, &pOutput, &len) < 0) { + smaError("serialize rsma task for table %" PRIi64 " failed since %s", pRSmaInfo->items[0].pRsmaInfo->suid, + terrstr(terrno)); + } else { + smaWarn("serialize rsma task for table %" PRIi64 " success and len is %d", pRSmaInfo->items[0].pRsmaInfo->suid, + len); + } + tdAppendTFile(&tFile, &len, sizeof(len), &toffset); + tdAppendTFile(&tFile, pOutput, len, &toffset); + + taosMemoryFree(pOutput); + infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash); + } +_end: + + if (tdUpdateTFileHeader(&tFile) < 0) { + smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno)); + tdCloseTFile(&tFile); + tdRemoveTFile(&tFile); + return NULL; + } + + tdCloseTFile(&tFile); + + char newFName[TSDB_FILENAME_LEN]; + strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); + char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); + strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); + taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName); + + atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE); + return NULL; +_err: + atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE); + // remove the .tmp file + return NULL; +} + +static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { + smaWarn("%s:%d entry ", __func__, __LINE__); + TdThread threadId; + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); + + if (taosThreadCreate(&threadId, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) { + smaError("failed to create thread to persist rsma qTaskInfo since %s", strerror(errno)); + } + + taosThreadAttrDestroy(&thAttr); + smaWarn("%s:%d end ", __func__, __LINE__); +} + +/** + * @brief trigger to persist rsma qTaskInfo + * + * @param param + * @param tmrId + */ +static void tdRSmaPersistTrigger(void *param, void *tmrId) { + SRSmaStat *pRSmaStat = param; + + if (atomic_load_8(&pRSmaStat->tmrStat) == TASK_TRIGGER_STATUS__ACTIVE) { + smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence start since active", __func__, __LINE__, taosGetSelfPthreadId()); + atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__IN_ACTIVE); + + // execution + tdRSmaPersistTask(pRSmaStat); + } else { + smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence not start since inactive", __func__, __LINE__, + taosGetSelfPthreadId()); + } + + taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 4352c466c5..2244b91c28 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); SSmaStat *pStat = NULL; - SSmaStatItem *pItem = NULL; + STSmaStat *pItem = NULL; if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { terrno = TSDB_CODE_TSMA_INVALID_STAT; @@ -137,7 +137,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } tdRefSmaStat(pSma, pStat); - pItem = &pStat->tsmaStatItem; + pItem = &pStat->tsmaStat; ASSERT(pItem); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c new file mode 100644 index 0000000000..77e85d3227 --- /dev/null +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sma.h" + +#define TD_FILE_HEAD_SIZE 512 + +#define TD_FILE_STATE_OK 0 +#define TD_FILE_STATE_BAD 1 + +#define TD_FILE_INIT_MAGIC 0xFFFFFFFF + + +static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); +static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); + +static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU32(buf, pInfo->magic); + tlen += taosEncodeFixedU32(buf, pInfo->fver); + tlen += taosEncodeFixedU64(buf, pInfo->size); + + return tlen; +} + +static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + buf = taosDecodeFixedU32(buf, &(pInfo->fver)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + return buf; +} + +int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); + if (nwrite < nbyte) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return nwrite; +} + +int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence); + if (loffset < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return loffset; +} + +int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); + if (nread < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return nread; +} + +int32_t tdUpdateTFileHeader(STFile *pTFile) { + char buf[TD_FILE_HEAD_SIZE] = "\0"; + + if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { + return -1; + } + + void *ptr = buf; + tdEncodeTFInfo(&ptr, &(pTFile->info)); + + taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE); + if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { + return -1; + } + + return 0; +} + +int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { + char buf[TD_FILE_HEAD_SIZE] = "\0"; + uint32_t _version; + + ASSERT(TD_FILE_OPENED(pTFile)); + + if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { + return -1; + } + + if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + + void *pBuf = buf; + pBuf = tdDecodeTFInfo(pBuf, pInfo); + return 0; +} + +void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { + pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM)); +} + +int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t toffset; + + if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) { + return -1; + } + + ASSERT(pTFile->info.size == toffset); + + if (offset) { + *offset = toffset; + } + + if (tdWriteTFile(pTFile, buf, nbyte) < 0) { + return -1; + } + + pTFile->info.size += nbyte; + + return nbyte; +} + +int32_t tdOpenTFile(STFile *pTFile, int flags) { + ASSERT(!TD_FILE_OPENED(pTFile)); + + pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags); + if (pTFile->pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + +void tdCloseTFile(STFile *pTFile) { + if (TD_FILE_OPENED(pTFile)) { + taosCloseFile(&pTFile->pFile); + TD_FILE_SET_CLOSED(pTFile); + } +} + +void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName) { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vid, dname, fname); +} + +int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { + char fullname[TSDB_FILENAME_LEN]; + SDiskID did = {0}; + + TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK); + TD_FILE_SET_CLOSED(pTFile); + + memset(&(pTFile->info), 0, sizeof(pTFile->info)); + pTFile->info.magic = TD_FILE_INIT_MAGIC; + + if (tfsAllocDisk(pTfs, 0, &did) < 0) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return -1; + } + + tfsInitFile(pTfs, &(pTFile->f), did, fname); + + return 0; +} + +int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) { + ASSERT(pTFile->info.size == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); + + pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pTFile->pFile == NULL) { + if (errno == ENOENT) { + // Try to create directory recursively + char *s = strdup(TD_FILE_REL_NAME(pTFile)); + if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) { + taosMemoryFreeClear(s); + return -1; + } + taosMemoryFreeClear(s); + + pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pTFile->pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } else { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + if (!updateHeader) { + return 0; + } + + pTFile->info.size += TD_FILE_HEAD_SIZE; + pTFile->info.fver = 0; + + if (tdUpdateTFileHeader(pTFile) < 0) { + tdCloseTFile(pTFile); + tdRemoveTFile(pTFile); + return -1; + } + + return 0; +} + +int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); } \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b89e908df7..fa10fc26dd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") @@ -356,7 +357,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, "No table data in memo TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, "File already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information to create table") -TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "TSDB no available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")