Merge pull request #12226 from taosdata/feature/TD-14481-3.0
feat: set tsdb cfg for rollup sma
This commit is contained in:
commit
2ada10a14a
|
@ -190,6 +190,8 @@ typedef struct SRetention {
|
|||
int8_t keepUnit;
|
||||
} SRetention;
|
||||
|
||||
#define RETENTION_VALID(r) (((r)->freq > 0) && ((r)->keep > 0))
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
||||
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
||||
|
|
|
@ -137,25 +137,15 @@ struct STsdbCfg {
|
|||
int8_t update;
|
||||
int8_t compression;
|
||||
int8_t slLevel;
|
||||
int32_t days;
|
||||
int32_t minRows;
|
||||
int32_t maxRows;
|
||||
int32_t keep0;
|
||||
int32_t keep1;
|
||||
int32_t keep2;
|
||||
// TODO: save to tsdb cfg file
|
||||
int8_t type; // ETsdbType
|
||||
int32_t days; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
||||
int32_t keep0; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
||||
int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
||||
int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
||||
SRetention retentions[TSDB_RETENTION_MAX];
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
TSDB_TYPE_TSDB = 0, // TSDB
|
||||
TSDB_TYPE_TSMA = 1, // TSMA
|
||||
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
|
||||
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
|
||||
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
|
||||
} ETsdbType;
|
||||
|
||||
struct SVnodeCfg {
|
||||
int32_t vgId;
|
||||
char dbname[TSDB_DB_NAME_LEN];
|
||||
|
|
|
@ -70,9 +70,10 @@ struct SSmaEnvs {
|
|||
struct STsdb {
|
||||
char *path;
|
||||
SVnode *pVnode;
|
||||
TdThreadMutex mutex;
|
||||
bool repoLocked;
|
||||
int8_t level; // retention level
|
||||
TdThreadMutex mutex;
|
||||
STsdbKeepCfg keepCfg;
|
||||
STsdbMemTable *mem;
|
||||
STsdbMemTable *imem;
|
||||
SRtn rtn;
|
||||
|
@ -185,6 +186,7 @@ struct STsdbFS {
|
|||
|
||||
#define REPO_ID(r) TD_VID((r)->pVnode)
|
||||
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
|
||||
#define REPO_KEEP_CFG(r) (&(r)->keepCfg)
|
||||
#define REPO_LEVEL(r) ((r)->level)
|
||||
#define REPO_FS(r) ((r)->fs)
|
||||
#define REPO_META(r) ((r)->pVnode->pMeta)
|
||||
|
@ -830,7 +832,7 @@ typedef struct {
|
|||
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
||||
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
||||
|
||||
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
|
||||
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
|
||||
void *tsdbFreeFS(STsdbFS *pfs);
|
||||
int tsdbOpenFS(STsdb *pRepo);
|
||||
void tsdbCloseFS(STsdb *pRepo);
|
||||
|
|
|
@ -59,6 +59,7 @@ typedef struct SQWorker SQHandle;
|
|||
#define VNODE_TQ_DIR "tq"
|
||||
#define VNODE_WAL_DIR "wal"
|
||||
#define VNODE_TSMA_DIR "tsma"
|
||||
#define VNODE_RSMA0_DIR "tsdb"
|
||||
#define VNODE_RSMA1_DIR "rsma1"
|
||||
#define VNODE_RSMA2_DIR "rsma2"
|
||||
|
||||
|
@ -154,6 +155,22 @@ struct SVnodeInfo {
|
|||
SVState state;
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
TSDB_TYPE_TSDB = 0, // TSDB
|
||||
TSDB_TYPE_TSMA = 1, // TSMA
|
||||
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
|
||||
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
|
||||
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
|
||||
} ETsdbType;
|
||||
|
||||
typedef struct {
|
||||
int8_t precision; // precision always be used with below keep cfgs
|
||||
int32_t days;
|
||||
int32_t keep0;
|
||||
int32_t keep1;
|
||||
int32_t keep2;
|
||||
} STsdbKeepCfg;
|
||||
|
||||
struct SVnode {
|
||||
char* path;
|
||||
SVnodeCfg config;
|
||||
|
@ -176,10 +193,11 @@ struct SVnode {
|
|||
SQHandle* pQuery;
|
||||
};
|
||||
|
||||
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
||||
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
|
||||
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
|
||||
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
|
||||
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
||||
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
|
||||
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
|
||||
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
|
||||
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
|
||||
|
||||
struct STbUidStore {
|
||||
tb_uid_t suid;
|
||||
|
|
|
@ -210,7 +210,7 @@ int tsdbCommit(STsdb *pRepo) {
|
|||
}
|
||||
|
||||
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
|
||||
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
|
||||
TSKEY minKey, midKey, maxKey, now;
|
||||
|
||||
now = taosGetTimestamp(pCfg->precision);
|
||||
|
@ -304,9 +304,9 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
|
|||
}
|
||||
|
||||
static int tsdbNextCommitFid(SCommitH *pCommith) {
|
||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||
int fid = TSDB_IVLD_FID;
|
||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
|
||||
int fid = TSDB_IVLD_FID;
|
||||
|
||||
for (int i = 0; i < pCommith->niters; i++) {
|
||||
SCommitIter *pIter = pCommith->iters + i;
|
||||
|
@ -337,8 +337,8 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
|
|||
}
|
||||
|
||||
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
|
||||
|
||||
ASSERT(pSet == NULL || pSet->fid == fid);
|
||||
|
||||
|
|
|
@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
|
|||
}
|
||||
|
||||
// ================== STsdbFS
|
||||
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) {
|
||||
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
|
||||
int keep = pCfg->keep2;
|
||||
int days = pCfg->days;
|
||||
int maxFSet = TSDB_MAX_FSETS(keep, days);
|
||||
|
|
|
@ -15,7 +15,30 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
|
||||
#define TSDB_OPEN_RSMA_IMPL(v, l) \
|
||||
do { \
|
||||
SRetention *r = VND_RETENTIONS(v)[0]; \
|
||||
if (RETENTION_VALID(r)) { \
|
||||
return tsdbOpenImpl((v), type, &VND_RSMA##l(v), VNODE_RSMA##l##_DIR, TSDB_RETENTION_L##l); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define TSDB_SET_KEEP_CFG(l) \
|
||||
do { \
|
||||
SRetention *r = &pCfg->retentions[l]; \
|
||||
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
|
||||
pKeepCfg->keep0 = pKeepCfg->keep2; \
|
||||
pKeepCfg->keep1 = pKeepCfg->keep2; \
|
||||
pKeepCfg->days = tsdbEvalDays(r, pCfg->precision); \
|
||||
} while (0)
|
||||
|
||||
#define RETENTION_DAYS_SPLIT_RATIO 10
|
||||
#define RETENTION_DAYS_SPLIT_MIN 1
|
||||
#define RETENTION_DAYS_SPLIT_MAX 30
|
||||
|
||||
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type);
|
||||
static int32_t tsdbEvalDays(SRetention *r, int8_t precision);
|
||||
static int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
|
||||
|
||||
int tsdbOpen(SVnode *pVnode, int8_t type) {
|
||||
switch (type) {
|
||||
|
@ -25,11 +48,63 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
|
|||
ASSERT(0);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L0:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
|
||||
TSDB_OPEN_RSMA_IMPL(pVnode, 0);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L1:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1);
|
||||
TSDB_OPEN_RSMA_IMPL(pVnode, 1);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L2:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2);
|
||||
TSDB_OPEN_RSMA_IMPL(pVnode, 2);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbEvalDays(SRetention *r, int8_t precision) {
|
||||
int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
|
||||
int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);
|
||||
|
||||
int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO;
|
||||
if (days <= RETENTION_DAYS_SPLIT_MIN) {
|
||||
days = RETENTION_DAYS_SPLIT_MIN;
|
||||
if (days < freqDays) {
|
||||
days = freqDays + 1;
|
||||
}
|
||||
} else {
|
||||
if (days > RETENTION_DAYS_SPLIT_MAX) {
|
||||
days = RETENTION_DAYS_SPLIT_MAX;
|
||||
}
|
||||
if (days < freqDays) {
|
||||
days = freqDays + 1;
|
||||
}
|
||||
}
|
||||
return days * 1440;
|
||||
}
|
||||
|
||||
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type) {
|
||||
pKeepCfg->precision = pCfg->precision;
|
||||
switch (type) {
|
||||
case TSDB_TYPE_TSDB:
|
||||
pKeepCfg->days = pCfg->days;
|
||||
pKeepCfg->keep0 = pCfg->keep0;
|
||||
pKeepCfg->keep1 = pCfg->keep1;
|
||||
pKeepCfg->keep2 = pCfg->keep2;
|
||||
break;
|
||||
case TSDB_TYPE_TSMA:
|
||||
ASSERT(0);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L0:
|
||||
TSDB_SET_KEEP_CFG(0);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L1:
|
||||
TSDB_SET_KEEP_CFG(1);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L2:
|
||||
TSDB_SET_KEEP_CFG(2);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
|
@ -38,16 +113,16 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pVnode
|
||||
* @param type
|
||||
* @param ppTsdb
|
||||
* @param dir
|
||||
* @brief
|
||||
*
|
||||
* @param pVnode
|
||||
* @param type
|
||||
* @param ppTsdb
|
||||
* @param dir
|
||||
* @param level retention level
|
||||
* @return int
|
||||
* @return int
|
||||
*/
|
||||
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
|
||||
int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
|
||||
STsdb *pTsdb = NULL;
|
||||
int slen = 0;
|
||||
|
||||
|
@ -62,13 +137,13 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, i
|
|||
}
|
||||
|
||||
pTsdb->path = (char *)&pTsdb[1];
|
||||
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
||||
dir);
|
||||
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
|
||||
pTsdb->pVnode = pVnode;
|
||||
pTsdb->level = level;
|
||||
pTsdb->repoLocked = false;
|
||||
taosThreadMutexInit(&pTsdb->mutex, NULL);
|
||||
pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb));
|
||||
tsdbSetKeepCfg(REPO_KEEP_CFG(pTsdb), REPO_CFG(pTsdb), type);
|
||||
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
|
||||
|
||||
// create dir (TODO: use tfsMkdir)
|
||||
taosMkDir(pTsdb->path);
|
||||
|
|
|
@ -320,7 +320,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
|
|||
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
||||
// the expired data to client, even it is queried already.
|
||||
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
|
||||
STsdbCfg* pCfg = REPO_CFG(pTsdb);
|
||||
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
|
||||
|
||||
int64_t now = taosGetTimestamp(pCfg->precision);
|
||||
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
|
||||
|
@ -2425,7 +2425,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
|
|||
int32_t numOfBlocks = 0;
|
||||
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||
|
||||
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
|
||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||
|
||||
while (true) {
|
||||
|
@ -2531,8 +2531,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
|||
|
||||
// find the start data block in file
|
||||
pTsdbReadHandle->locateStart = true;
|
||||
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
||||
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
|
||||
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
||||
|
||||
tsdbRLockFS(pFileHandle);
|
||||
tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
|
||||
|
@ -2632,8 +2632,8 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
|
|||
// find the start data block in file
|
||||
if (!pTsdbReadHandle->locateStart) {
|
||||
pTsdbReadHandle->locateStart = true;
|
||||
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
||||
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
|
||||
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
||||
|
||||
tsdbRLockFS(pFileHandle);
|
||||
tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
|
||||
|
|
|
@ -1013,8 +1013,8 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t
|
|||
* @return int32_t
|
||||
*/
|
||||
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
int32_t daysPerFile = pCfg->days;
|
||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
|
||||
int32_t daysPerFile = pCfg->days;
|
||||
|
||||
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
|
||||
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
|
||||
|
|
|
@ -60,7 +60,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
|||
SSubmitBlk *pBlock = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSRow *row = NULL;
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
|
||||
TSKEY now = taosGetTimestamp(pCfg->precision);
|
||||
TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2;
|
||||
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
|
||||
|
|
Loading…
Reference in New Issue