feat: rsma config optimization
This commit is contained in:
parent
1562fe5211
commit
b603e798c6
|
@ -190,6 +190,8 @@ typedef struct SRetention {
|
||||||
int8_t keepUnit;
|
int8_t keepUnit;
|
||||||
} SRetention;
|
} SRetention;
|
||||||
|
|
||||||
|
#define RETENTION_VALID(r) (((r)->freq > 0) && ((r)->keep > 0))
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
|
||||||
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
||||||
|
|
|
@ -41,7 +41,6 @@ extern "C" {
|
||||||
// vnode
|
// vnode
|
||||||
typedef struct SVnode SVnode;
|
typedef struct SVnode SVnode;
|
||||||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||||
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
|
||||||
typedef struct SVnodeCfg SVnodeCfg;
|
typedef struct SVnodeCfg SVnodeCfg;
|
||||||
|
|
||||||
extern const SVnodeCfg vnodeCfgDefault;
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
@ -147,21 +146,7 @@ struct STsdbCfg {
|
||||||
SRetention retentions[TSDB_RETENTION_MAX];
|
SRetention retentions[TSDB_RETENTION_MAX];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STsdbKeepCfg {
|
|
||||||
int8_t precision; // precision always use with below cfg
|
|
||||||
int32_t days;
|
|
||||||
int32_t keep0;
|
|
||||||
int32_t keep1;
|
|
||||||
int32_t keep2;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
TSDB_TYPE_TSDB = 0, // TSDB
|
|
||||||
TSDB_TYPE_TSMA = 1, // TSMA
|
|
||||||
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
|
|
||||||
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
|
|
||||||
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
|
|
||||||
} ETsdbType;
|
|
||||||
|
|
||||||
struct SVnodeCfg {
|
struct SVnodeCfg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -184,18 +184,18 @@ struct STsdbFS {
|
||||||
SFSStatus *nstatus; // new status
|
SFSStatus *nstatus; // new status
|
||||||
};
|
};
|
||||||
|
|
||||||
#define REPO_ID(r) TD_VID((r)->pVnode)
|
#define REPO_ID(r) TD_VID((r)->pVnode)
|
||||||
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
|
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
|
||||||
#define REPO_KEEP_CFG(r) (&(r)->keepCfg)
|
#define REPO_KEEP_CFG(r) (&(r)->keepCfg)
|
||||||
#define REPO_LEVEL(r) ((r)->level)
|
#define REPO_LEVEL(r) ((r)->level)
|
||||||
#define REPO_FS(r) ((r)->fs)
|
#define REPO_FS(r) ((r)->fs)
|
||||||
#define REPO_META(r) ((r)->pVnode->pMeta)
|
#define REPO_META(r) ((r)->pVnode->pMeta)
|
||||||
#define REPO_TFS(r) ((r)->pVnode->pTfs)
|
#define REPO_TFS(r) ((r)->pVnode->pTfs)
|
||||||
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
|
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
|
||||||
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
|
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
|
||||||
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
|
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
|
||||||
#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv)
|
#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv)
|
||||||
#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv)
|
#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv)
|
||||||
|
|
||||||
int tsdbLockRepo(STsdb *pTsdb);
|
int tsdbLockRepo(STsdb *pTsdb);
|
||||||
int tsdbUnlockRepo(STsdb *pTsdb);
|
int tsdbUnlockRepo(STsdb *pTsdb);
|
||||||
|
|
|
@ -59,6 +59,7 @@ typedef struct SQWorker SQHandle;
|
||||||
#define VNODE_TQ_DIR "tq"
|
#define VNODE_TQ_DIR "tq"
|
||||||
#define VNODE_WAL_DIR "wal"
|
#define VNODE_WAL_DIR "wal"
|
||||||
#define VNODE_TSMA_DIR "tsma"
|
#define VNODE_TSMA_DIR "tsma"
|
||||||
|
#define VNODE_RSMA0_DIR "tsdb"
|
||||||
#define VNODE_RSMA1_DIR "rsma1"
|
#define VNODE_RSMA1_DIR "rsma1"
|
||||||
#define VNODE_RSMA2_DIR "rsma2"
|
#define VNODE_RSMA2_DIR "rsma2"
|
||||||
|
|
||||||
|
@ -154,6 +155,22 @@ struct SVnodeInfo {
|
||||||
SVState state;
|
SVState state;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_TYPE_TSDB = 0, // TSDB
|
||||||
|
TSDB_TYPE_TSMA = 1, // TSMA
|
||||||
|
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
|
||||||
|
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
|
||||||
|
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
|
||||||
|
} ETsdbType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t precision; // precision always be used with below keep cfgs
|
||||||
|
int32_t days;
|
||||||
|
int32_t keep0;
|
||||||
|
int32_t keep1;
|
||||||
|
int32_t keep2;
|
||||||
|
} STsdbKeepCfg;
|
||||||
|
|
||||||
struct SVnode {
|
struct SVnode {
|
||||||
char* path;
|
char* path;
|
||||||
SVnodeCfg config;
|
SVnodeCfg config;
|
||||||
|
@ -176,10 +193,11 @@ struct SVnode {
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
||||||
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
|
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
|
||||||
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
|
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
|
||||||
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
|
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
|
||||||
|
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
|
||||||
|
|
||||||
struct STbUidStore {
|
struct STbUidStore {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
|
|
|
@ -15,9 +15,30 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
#define TSDB_OPEN_RSMA_IMPL(v, l) \
|
||||||
|
do { \
|
||||||
|
SRetention *r = VND_RETENTIONS(v)[0]; \
|
||||||
|
if (RETENTION_VALID(r)) { \
|
||||||
|
return tsdbOpenImpl((v), type, &VND_RSMA##l(v), VNODE_RSMA##l##_DIR, TSDB_RETENTION_L##l); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
static int tsdbSetKeepCfg(STsdbCfg *pCfg, STsdbKeepCfg *pKeepCfg, int8_t type);
|
#define TSDB_SET_KEEP_CFG(l) \
|
||||||
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
|
do { \
|
||||||
|
SRetention *r = &pCfg->retentions[l]; \
|
||||||
|
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
|
||||||
|
pKeepCfg->keep0 = pKeepCfg->keep2; \
|
||||||
|
pKeepCfg->keep1 = pKeepCfg->keep2; \
|
||||||
|
pKeepCfg->days = tsdbEvalDays(r, pCfg->precision); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define RETENTION_DAYS_SPLIT_RATIO 10
|
||||||
|
#define RETENTION_DAYS_SPLIT_MIN 1
|
||||||
|
#define RETENTION_DAYS_SPLIT_MAX 30
|
||||||
|
|
||||||
|
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type);
|
||||||
|
static int32_t tsdbEvalDays(SRetention *r, int8_t precision);
|
||||||
|
static int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
|
||||||
|
|
||||||
int tsdbOpen(SVnode *pVnode, int8_t type) {
|
int tsdbOpen(SVnode *pVnode, int8_t type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -27,11 +48,14 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
break;
|
break;
|
||||||
case TSDB_TYPE_RSMA_L0:
|
case TSDB_TYPE_RSMA_L0:
|
||||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
|
TSDB_OPEN_RSMA_IMPL(pVnode, 0);
|
||||||
|
break;
|
||||||
case TSDB_TYPE_RSMA_L1:
|
case TSDB_TYPE_RSMA_L1:
|
||||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1);
|
TSDB_OPEN_RSMA_IMPL(pVnode, 1);
|
||||||
|
break;
|
||||||
case TSDB_TYPE_RSMA_L2:
|
case TSDB_TYPE_RSMA_L2:
|
||||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2);
|
TSDB_OPEN_RSMA_IMPL(pVnode, 2);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
break;
|
break;
|
||||||
|
@ -39,7 +63,28 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbSetKeepCfg(STsdbCfg *pCfg, STsdbKeepCfg *pKeepCfg, int8_t type) {
|
static int32_t tsdbEvalDays(SRetention *r, int8_t precision) {
|
||||||
|
int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
|
||||||
|
int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);
|
||||||
|
|
||||||
|
int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO;
|
||||||
|
if (days <= RETENTION_DAYS_SPLIT_MIN) {
|
||||||
|
days = RETENTION_DAYS_SPLIT_MIN;
|
||||||
|
if (days < freqDays) {
|
||||||
|
days = freqDays + 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (days > RETENTION_DAYS_SPLIT_MAX) {
|
||||||
|
days = RETENTION_DAYS_SPLIT_MAX;
|
||||||
|
}
|
||||||
|
if (days < freqDays) {
|
||||||
|
days = freqDays + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return days * 1440;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type) {
|
||||||
pKeepCfg->precision = pCfg->precision;
|
pKeepCfg->precision = pCfg->precision;
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case TSDB_TYPE_TSDB:
|
case TSDB_TYPE_TSDB:
|
||||||
|
@ -52,22 +97,13 @@ static int tsdbSetKeepCfg(STsdbCfg *pCfg, STsdbKeepCfg *pKeepCfg, int8_t type) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
break;
|
break;
|
||||||
case TSDB_TYPE_RSMA_L0:
|
case TSDB_TYPE_RSMA_L0:
|
||||||
pKeepCfg->days = pCfg->days;
|
TSDB_SET_KEEP_CFG(0);
|
||||||
pKeepCfg->keep0 = pCfg->keep0;
|
|
||||||
pKeepCfg->keep1 = pCfg->keep1;
|
|
||||||
pKeepCfg->keep2 = pCfg->keep2;
|
|
||||||
break;
|
break;
|
||||||
case TSDB_TYPE_RSMA_L1:
|
case TSDB_TYPE_RSMA_L1:
|
||||||
pKeepCfg->days = pCfg->days;
|
TSDB_SET_KEEP_CFG(1);
|
||||||
pKeepCfg->keep0 = pCfg->keep0;
|
|
||||||
pKeepCfg->keep1 = pCfg->keep1;
|
|
||||||
pKeepCfg->keep2 = pCfg->keep2;
|
|
||||||
break;
|
break;
|
||||||
case TSDB_TYPE_RSMA_L2:
|
case TSDB_TYPE_RSMA_L2:
|
||||||
pKeepCfg->days = pCfg->days;
|
TSDB_SET_KEEP_CFG(2);
|
||||||
pKeepCfg->keep0 = pCfg->keep0;
|
|
||||||
pKeepCfg->keep1 = pCfg->keep1;
|
|
||||||
pKeepCfg->keep2 = pCfg->keep2;
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -77,16 +113,16 @@ static int tsdbSetKeepCfg(STsdbCfg *pCfg, STsdbKeepCfg *pKeepCfg, int8_t type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief
|
* @brief
|
||||||
*
|
*
|
||||||
* @param pVnode
|
* @param pVnode
|
||||||
* @param type
|
* @param type
|
||||||
* @param ppTsdb
|
* @param ppTsdb
|
||||||
* @param dir
|
* @param dir
|
||||||
* @param level retention level
|
* @param level retention level
|
||||||
* @return int
|
* @return int
|
||||||
*/
|
*/
|
||||||
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
|
int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
|
||||||
STsdb *pTsdb = NULL;
|
STsdb *pTsdb = NULL;
|
||||||
int slen = 0;
|
int slen = 0;
|
||||||
|
|
||||||
|
@ -101,13 +137,12 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, i
|
||||||
}
|
}
|
||||||
|
|
||||||
pTsdb->path = (char *)&pTsdb[1];
|
pTsdb->path = (char *)&pTsdb[1];
|
||||||
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
|
||||||
dir);
|
|
||||||
pTsdb->pVnode = pVnode;
|
pTsdb->pVnode = pVnode;
|
||||||
pTsdb->level = level;
|
pTsdb->level = level;
|
||||||
pTsdb->repoLocked = false;
|
pTsdb->repoLocked = false;
|
||||||
taosThreadMutexInit(&pTsdb->mutex, NULL);
|
taosThreadMutexInit(&pTsdb->mutex, NULL);
|
||||||
tsdbSetKeepCfg(REPO_CFG(pTsdb), REPO_KEEP_CFG(pTsdb), type);
|
tsdbSetKeepCfg(REPO_KEEP_CFG(pTsdb), REPO_CFG(pTsdb), type);
|
||||||
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
|
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
|
||||||
|
|
||||||
// create dir (TODO: use tfsMkdir)
|
// create dir (TODO: use tfsMkdir)
|
||||||
|
|
Loading…
Reference in New Issue