diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 1fbbb3f159..85d11be817 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -71,13 +71,19 @@ typedef enum { TSDB_SMA_STAT_DROPPED = 2, // sma dropped } ETsdbSmaStat; // bit operation - typedef enum { TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA } ETsdbSmaType; +typedef enum { + TSDB_RSMA_RETENTION_0 = 0, + TSDB_RSMA_RETENTION_1 = 1, + TSDB_RSMA_RETENTION_2 = 2, + TSDB_RSMA_RETENTION_MAX = 3 +} ERSmaRetention; + extern char *qtypeStr[]; #define TSDB_PORT_HTTP 11 diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index f01b6b6425..b63a7f5b32 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -112,7 +112,9 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.keep2 = 3650; pCfg->tsdbCfg.keep0 = 3650; pCfg->tsdbCfg.keep1 = 3650; - pCfg->tsdbCfg.retentions = pCreate->pRetensions; + for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { + memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); + } pCfg->walCfg.vgId = pCreate->vgId; pCfg->hashBegin = pCreate->hashBegin; pCfg->hashEnd = pCreate->hashEnd; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5adbc5d6c1..daf8c4f339 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -37,7 +37,7 @@ #ifdef __cplusplus extern "C" { #endif - +#define TSDB_VNODE_SMA_DEBUG // TODO: evaluate to remove the macro and the relative codes // vnode typedef struct SVnode SVnode; typedef struct STsdbCfg STsdbCfg; // todo: remove @@ -143,9 +143,19 @@ struct STsdbCfg { int32_t keep0; int32_t keep1; int32_t keep2; - SArray *retentions; + // TODO: save to tsdb cfg file + int8_t type; // ETsdbType + SRetention retentions[TSDB_RSMA_RETENTION_MAX]; }; +typedef enum { + TSDB_TYPE_TSDB = 0, // TSDB + TSDB_TYPE_TSMA = 1, // TSMA + TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0 + TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1 + TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 +} ETsdbType; + struct SVnodeCfg { int32_t vgId; char dbname[TSDB_DB_NAME_LEN]; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b58248d091..5930dcaf96 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -54,10 +54,13 @@ typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; -#define VNODE_META_DIR "meta" -#define VNODE_TSDB_DIR "tsdb" -#define VNODE_TQ_DIR "tq" -#define VNODE_WAL_DIR "wal" +#define VNODE_META_DIR "meta" +#define VNODE_TSDB_DIR "tsdb" +#define VNODE_TQ_DIR "tq" +#define VNODE_WAL_DIR "wal" +#define VNODE_TSMA_DIR "tsma" +#define VNODE_RSMA1_DIR "rsma1" +#define VNODE_RSMA2_DIR "rsma2" // vnd.h void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); @@ -88,7 +91,7 @@ int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb); +int tsdbOpen(SVnode* pVnode, int8_t type); int tsdbClose(STsdb* pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); @@ -161,6 +164,8 @@ struct SVnode { SVBufPool* onRecycle; SMeta* pMeta; STsdb* pTsdb; + STsdb* pRSma1; + STsdb* pRSma2; SWal* pWal; STQ* pTq; SSink* pSink; @@ -169,6 +174,11 @@ struct SVnode { SQHandle* pQuery; }; +#define VND_TSDB(vnd) ((vnd)->pTsdb) +#define VND_RSMA0(vnd) ((vnd)->pTsdb) +#define VND_RSMA1(vnd) ((vnd)->pRSma1) +#define VND_RSMA2(vnd) ((vnd)->pRSma2) + struct STbUidStore { tb_uid_t suid; SArray* tbUids; @@ -177,7 +187,11 @@ struct STbUidStore { #define TD_VID(PVNODE) (PVNODE)->config.vgId -// typedef struct STbDdlH STbDdlH; + +static FORCE_INLINE bool tsdbIsRollup(SVnode* pVnode) { + SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]); + return (pRetention->freq > 0 && pRetention->keep > 0); +} // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index e3580958f6..0827ba6eab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -15,12 +15,34 @@ #include "tsdb.h" -int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { +static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir); + +int tsdbOpen(SVnode *pVnode, int8_t type) { + switch (type) { + case TSDB_TYPE_TSDB: + return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR); + case TSDB_TYPE_TSMA: + ASSERT(0); + break; + case TSDB_TYPE_RSMA_L0: + return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR); + case TSDB_TYPE_RSMA_L1: + return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR); + case TSDB_TYPE_RSMA_L2: + return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR); + default: + ASSERT(0); + break; + } + return 0; +} + +int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) { STsdb *pTsdb = NULL; int slen = 0; *ppTsdb = NULL; - slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_TSDB_DIR) + 3; + slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3; // create handle pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen); @@ -31,7 +53,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { pTsdb->path = (char *)&pTsdb[1]; sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, - VNODE_TSDB_DIR); + dir); pTsdb->pVnode = pVnode; pTsdb->repoLocked = false; taosThreadMutexInit(&pTsdb->mutex, NULL); @@ -45,7 +67,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { goto _err; } - tsdbDebug("vgId:%d tsdb is opened", TD_VID(pVnode)); + tsdbDebug("vgId: %d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path); *ppTsdb = pTsdb; return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 08f603c960..d131cef8a2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -103,10 +103,9 @@ typedef struct { STSma *pSma; // cache schema } SSmaStatItem; -#define RSMA_MAX_LEVEL 2 #define RSMA_TASK_INFO_HASH_SLOT 8 struct SRSmaInfo { - void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t + void *taskInfo[TSDB_RSMA_RETENTION_2]; // qTaskInfo_t }; struct SSmaStat { @@ -128,7 +127,7 @@ static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) { } static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) { - for (int32_t i = 0; i < RSMA_MAX_LEVEL; ++i) { + for (int32_t i = 0; i < TSDB_RSMA_RETENTION_MAX; ++i) { if (pInfo->taskInfo[i]) { tsdbFreeTaskHandle(pInfo->taskInfo[i]); } diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 1f870884b0..8130a00e04 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -66,6 +66,28 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; +#ifdef TSDB_VNODE_SMA_DEBUG + if (pCfg->tsdbCfg.retentions[0].freq > 0) { + int32_t nRetention = 1; + if (pCfg->tsdbCfg.retentions[1].freq > 0) { + ++nRetention; + if (pCfg->tsdbCfg.retentions[2].freq > 0) { + ++nRetention; + } + } + SJson *pNodeRetentions = tjsonCreateArray(); + tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); + for (int32_t i = 0; i < nRetention; ++i) { + SJson *pNodeRetention = tjsonCreateObject(); + const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; + tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); + tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); + tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep); + tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit); + tjsonAddItemToArray(pNodeRetentions, pNodeRetention); + } + } +#endif if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; @@ -113,6 +135,20 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; +#ifdef TSDB_VNODE_SMA_DEBUG + SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); + int nRetention = tjsonGetArraySize(pNodeRetentions); + ASSERT(nRetention <= TSDB_RSMA_RETENTION_MAX); + + for (int32_t i = 0; i < nRetention; ++i) { + SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); + ASSERT(pNodeRetention != NULL); + tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq); + tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit); + tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep); + tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit); + } +#endif if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 63f55fddfd..3737bcfe3b 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -96,9 +96,26 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open tsdb - if (tsdbOpen(pVnode, &pVnode->pTsdb) < 0) { - vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - goto _err; + if (tsdbIsRollup(pVnode)) { + if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) { + vError("vgId: %d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) { + vError("vgId: %d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) { + vError("vgId: %d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + } else { + if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) { + vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } } // open wal @@ -143,6 +160,8 @@ _err: if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb); if (pVnode->pMeta) metaClose(pVnode->pMeta); + tsdbClose(VND_RSMA1(pVnode)); + tsdbClose(VND_RSMA2(pVnode)); tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); return NULL; @@ -155,7 +174,9 @@ void vnodeClose(SVnode *pVnode) { vnodeQueryClose(pVnode); walClose(pVnode->pWal); tqClose(pVnode->pTq); - tsdbClose(pVnode->pTsdb); + tsdbClose(VND_TSDB(pVnode)); + tsdbClose(VND_RSMA1(pVnode)); + tsdbClose(VND_RSMA2(pVnode)); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); // destroy handle