diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7636d9b9d0..72418148b0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -190,6 +190,8 @@ typedef struct SRetention { int8_t keepUnit; } SRetention; +#define RETENTION_VALID(r) (((r)->freq > 0) && ((r)->keep > 0)) + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f4e12d5f71..28f3b3edb9 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -137,25 +137,15 @@ struct STsdbCfg { int8_t update; int8_t compression; int8_t slLevel; - int32_t days; int32_t minRows; int32_t maxRows; - int32_t keep0; - int32_t keep1; - int32_t keep2; - // TODO: save to tsdb cfg file - int8_t type; // ETsdbType + int32_t days; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead + int32_t keep0; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead + int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead + int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead SRetention retentions[TSDB_RETENTION_MAX]; }; -typedef enum { - TSDB_TYPE_TSDB = 0, // TSDB - TSDB_TYPE_TSMA = 1, // TSMA - TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0 - TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1 - TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 -} ETsdbType; - struct SVnodeCfg { int32_t vgId; char dbname[TSDB_DB_NAME_LEN]; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 58003a97d7..b8cbb2d997 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -70,9 +70,10 @@ struct SSmaEnvs { struct STsdb { char *path; SVnode *pVnode; + TdThreadMutex mutex; bool repoLocked; int8_t level; // retention level - TdThreadMutex mutex; + STsdbKeepCfg keepCfg; STsdbMemTable *mem; STsdbMemTable *imem; SRtn rtn; @@ -185,6 +186,7 @@ struct STsdbFS { #define REPO_ID(r) TD_VID((r)->pVnode) #define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) +#define REPO_KEEP_CFG(r) (&(r)->keepCfg) #define REPO_LEVEL(r) ((r)->level) #define REPO_FS(r) ((r)->fs) #define REPO_META(r) ((r)->pVnode->pMeta) @@ -830,7 +832,7 @@ typedef struct { #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC -STsdbFS *tsdbNewFS(const STsdbCfg *pCfg); +STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg); void *tsdbFreeFS(STsdbFS *pfs); int tsdbOpenFS(STsdb *pRepo); void tsdbCloseFS(STsdb *pRepo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3724a98a5e..a7617291b0 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -59,6 +59,7 @@ typedef struct SQWorker SQHandle; #define VNODE_TQ_DIR "tq" #define VNODE_WAL_DIR "wal" #define VNODE_TSMA_DIR "tsma" +#define VNODE_RSMA0_DIR "tsdb" #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" @@ -154,6 +155,22 @@ struct SVnodeInfo { SVState state; }; +typedef enum { + TSDB_TYPE_TSDB = 0, // TSDB + TSDB_TYPE_TSMA = 1, // TSMA + TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0 + TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1 + TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 +} ETsdbType; + +typedef struct { + int8_t precision; // precision always be used with below keep cfgs + int32_t days; + int32_t keep0; + int32_t keep1; + int32_t keep2; +} STsdbKeepCfg; + struct SVnode { char* path; SVnodeCfg config; @@ -176,10 +193,11 @@ struct SVnode { SQHandle* pQuery; }; -#define VND_TSDB(vnd) ((vnd)->pTsdb) -#define VND_RSMA0(vnd) ((vnd)->pTsdb) -#define VND_RSMA1(vnd) ((vnd)->pRSma1) -#define VND_RSMA2(vnd) ((vnd)->pRSma2) +#define VND_TSDB(vnd) ((vnd)->pTsdb) +#define VND_RSMA0(vnd) ((vnd)->pTsdb) +#define VND_RSMA1(vnd) ((vnd)->pRSma1) +#define VND_RSMA2(vnd) ((vnd)->pRSma2) +#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions) struct STbUidStore { tb_uid_t suid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index f0b1baf1da..6c0df33d05 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -210,7 +210,7 @@ int tsdbCommit(STsdb *pRepo) { } void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { - STsdbCfg *pCfg = REPO_CFG(pRepo); + STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); TSKEY minKey, midKey, maxKey, now; now = taosGetTimestamp(pCfg->precision); @@ -304,9 +304,9 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { } static int tsdbNextCommitFid(SCommitH *pCommith) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg *pCfg = REPO_CFG(pRepo); - int fid = TSDB_IVLD_FID; + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); + int fid = TSDB_IVLD_FID; for (int i = 0; i < pCommith->niters; i++) { SCommitIter *pIter = pCommith->iters + i; @@ -337,8 +337,8 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { } static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg *pCfg = REPO_CFG(pRepo); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); ASSERT(pSet == NULL || pSet->fid == fid); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 6eda476b65..52b466d0f6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { } // ================== STsdbFS -STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) { +STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) { int keep = pCfg->keep2; int days = pCfg->days; int maxFSet = TSDB_MAX_FSETS(keep, days); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index e18c01dc01..807ee95b03 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -15,7 +15,30 @@ #include "tsdb.h" -static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level); +#define TSDB_OPEN_RSMA_IMPL(v, l) \ + do { \ + SRetention *r = VND_RETENTIONS(v)[0]; \ + if (RETENTION_VALID(r)) { \ + return tsdbOpenImpl((v), type, &VND_RSMA##l(v), VNODE_RSMA##l##_DIR, TSDB_RETENTION_L##l); \ + } \ + } while (0) + +#define TSDB_SET_KEEP_CFG(l) \ + do { \ + SRetention *r = &pCfg->retentions[l]; \ + pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \ + pKeepCfg->keep0 = pKeepCfg->keep2; \ + pKeepCfg->keep1 = pKeepCfg->keep2; \ + pKeepCfg->days = tsdbEvalDays(r, pCfg->precision); \ + } while (0) + +#define RETENTION_DAYS_SPLIT_RATIO 10 +#define RETENTION_DAYS_SPLIT_MIN 1 +#define RETENTION_DAYS_SPLIT_MAX 30 + +static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type); +static int32_t tsdbEvalDays(SRetention *r, int8_t precision); +static int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level); int tsdbOpen(SVnode *pVnode, int8_t type) { switch (type) { @@ -25,11 +48,63 @@ int tsdbOpen(SVnode *pVnode, int8_t type) { ASSERT(0); break; case TSDB_TYPE_RSMA_L0: - return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0); + TSDB_OPEN_RSMA_IMPL(pVnode, 0); + break; case TSDB_TYPE_RSMA_L1: - return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1); + TSDB_OPEN_RSMA_IMPL(pVnode, 1); + break; case TSDB_TYPE_RSMA_L2: - return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2); + TSDB_OPEN_RSMA_IMPL(pVnode, 2); + break; + default: + ASSERT(0); + break; + } + return 0; +} + +static int32_t tsdbEvalDays(SRetention *r, int8_t precision) { + int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY); + int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY); + + int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO; + if (days <= RETENTION_DAYS_SPLIT_MIN) { + days = RETENTION_DAYS_SPLIT_MIN; + if (days < freqDays) { + days = freqDays + 1; + } + } else { + if (days > RETENTION_DAYS_SPLIT_MAX) { + days = RETENTION_DAYS_SPLIT_MAX; + } + if (days < freqDays) { + days = freqDays + 1; + } + } + return days * 1440; +} + +static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type) { + pKeepCfg->precision = pCfg->precision; + switch (type) { + case TSDB_TYPE_TSDB: + pKeepCfg->days = pCfg->days; + pKeepCfg->keep0 = pCfg->keep0; + pKeepCfg->keep1 = pCfg->keep1; + pKeepCfg->keep2 = pCfg->keep2; + break; + case TSDB_TYPE_TSMA: + ASSERT(0); + break; + case TSDB_TYPE_RSMA_L0: + TSDB_SET_KEEP_CFG(0); + break; + case TSDB_TYPE_RSMA_L1: + TSDB_SET_KEEP_CFG(1); + break; + case TSDB_TYPE_RSMA_L2: + TSDB_SET_KEEP_CFG(2); + break; default: ASSERT(0); break; @@ -38,16 +113,16 @@ int tsdbOpen(SVnode *pVnode, int8_t type) { } /** - * @brief - * - * @param pVnode - * @param type - * @param ppTsdb - * @param dir + * @brief + * + * @param pVnode + * @param type + * @param ppTsdb + * @param dir * @param level retention level - * @return int + * @return int */ -int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) { +int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) { STsdb *pTsdb = NULL; int slen = 0; @@ -62,13 +137,13 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, i } pTsdb->path = (char *)&pTsdb[1]; - sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, - dir); + sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir); pTsdb->pVnode = pVnode; pTsdb->level = level; pTsdb->repoLocked = false; taosThreadMutexInit(&pTsdb->mutex, NULL); - pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb)); + tsdbSetKeepCfg(REPO_KEEP_CFG(pTsdb), REPO_CFG(pTsdb), type); + pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb)); // create dir (TODO: use tfsMkdir) taosMkDir(pTsdb->path); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0b9f21dbcf..e0f8943a48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -320,7 +320,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) { // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // the expired data to client, even it is queried already. static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { - STsdbCfg* pCfg = REPO_CFG(pTsdb); + STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb); int64_t now = taosGetTimestamp(pCfg->precision); return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick @@ -2425,7 +2425,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi int32_t numOfBlocks = 0; int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); + STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); STimeWindow win = TSWINDOW_INITIALIZER; while (true) { @@ -2531,8 +2531,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* // find the start data block in file pTsdbReadHandle->locateStart = true; - STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); - int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); + STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); + int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); @@ -2632,8 +2632,8 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis // find the start data block in file if (!pTsdbReadHandle->locateStart) { pTsdbReadHandle->locateStart = true; - STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); - int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); + STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); + int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 856481bc5f..615018a9ea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1013,8 +1013,8 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t * @return int32_t */ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) { - STsdbCfg *pCfg = REPO_CFG(pTsdb); - int32_t daysPerFile = pCfg->days; + STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb); + int32_t daysPerFile = pCfg->days; if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 88b637bc24..5a7892a750 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -60,7 +60,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { SSubmitBlk *pBlock = NULL; SSubmitBlkIter blkIter = {0}; STSRow *row = NULL; - STsdbCfg *pCfg = REPO_CFG(pTsdb); + STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb); TSKEY now = taosGetTimestamp(pCfg->precision); TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2; TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;