diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index bd2e4213d5..4b8aa5dfaf 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -30,6 +30,7 @@ target_sources( # tsdb # "src/tsdb/tsdbBDBImpl.c" + "src/tsdb/tsdbTDBImpl.c" "src/tsdb/tsdbCommit.c" "src/tsdb/tsdbCompact.c" "src/tsdb/tsdbFile.c" @@ -40,7 +41,7 @@ target_sources( "src/tsdb/tsdbRead.c" "src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbScan.c" - # "src/tsdb/tsdbSma.c" + "src/tsdb/tsdbSma.c" "src/tsdb/tsdbWrite.c" # tq diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 113501a26b..aab835b958 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -357,7 +357,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); -STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid); +void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup); int metaGetTbNum(SMeta *pMeta); @@ -369,8 +369,8 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseSmaCurosr(SMSmaCursor *pSmaCur); -const char *metaSmaCursorNext(SMSmaCursor *pSmaCur); +void metaCloseSmaCursor(SMSmaCursor *pSmaCur); +int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur); // Options void metaOptionsInit(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h new file mode 100644 index 0000000000..ce03fa7c67 --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VNODE_TSDB_SMA_H_ +#define _TD_VNODE_TSDB_SMA_H_ + +#include "tdbInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SSmaKey SSmaKey; + +struct SSmaKey { + TSKEY skey; + int64_t groupId; +}; + + +typedef struct SDBFile SDBFile; + +struct SDBFile { + int32_t fid; + TDB *pDB; + char *path; +}; + +int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path); +int32_t tsdbCloseDBEnv(TENV *pEnv); +int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF); +int32_t tsdbCloseDBF(SDBFile *pDBF); +int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn); +void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen); + +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); +void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); +#if 0 +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); +#endif + +// internal func +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) { + int32_t len = 0; + len += taosEncodeFixedI64(pData, tsKey); + len += taosEncodeFixedI64(pData, groupId); + return len; +} + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VNODE_TSDB_SMA_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6e1f00f931..29a45723c4 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -220,6 +220,8 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); #include "tq.h" +#include "tsdbSma.h" + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index caa101b5d0..a8270f746d 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -667,7 +667,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { return pTbCfg; } -STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { +void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) { STSma * pCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; @@ -920,7 +920,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { return pCur; } -void metaCloseSmaCurosr(SMSmaCursor *pCur) { +void metaCloseSmaCursor(SMSmaCursor *pCur) { if (pCur) { if (pCur->pCur) { pCur->pCur->close(pCur->pCur); @@ -930,7 +930,8 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) { } } -const char *metaSmaCursorNext(SMSmaCursor *pCur) { +int64_t metaSmaCursorNext(SMSmaCursor *pCur) { +#if 0 DBT skey = {0}; DBT pkey = {0}; DBT pval = {0}; @@ -946,6 +947,8 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) { } else { return NULL; } +#endif + return 0; } STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { @@ -972,7 +975,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { ++pSW->number; STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma)); if (tptr == NULL) { - metaCloseSmaCurosr(pCur); + metaCloseSmaCursor(pCur); tdDestroyTSmaWrapper(pSW); taosMemoryFreeClear(pSW); return NULL; @@ -980,7 +983,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { pSW->tSma = tptr; pBuf = pval.data; if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { - metaCloseSmaCurosr(pCur); + metaCloseSmaCursor(pCur); tdDestroyTSmaWrapper(pSW); taosMemoryFreeClear(pSW); return NULL; @@ -990,8 +993,8 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { break; } - metaCloseSmaCurosr(pCur); - + metaCloseSmaCursor(pCur); + return pSW; } @@ -1004,7 +1007,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { int ret; // TODO: lock? - ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0); + ret = pDB->pSmaIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0); if (ret != 0) { return NULL; } diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c index e3acba7eb6..6f218ad72b 100644 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ b/source/dnode/vnode/src/meta/metaTDBImpl.c @@ -22,6 +22,8 @@ typedef struct SPoolMem { struct SPoolMem *next; } SPoolMem; +#define META_TDB_SMA_TEST + static SPoolMem *openPool(); static void clearPool(SPoolMem *pPool); static void closePool(SPoolMem *pPool); @@ -38,6 +40,10 @@ struct SMetaDB { TDB * pNtbIdx; TDB * pCtbIdx; SPoolMem *pPool; +#ifdef META_TDB_SMA_TEST + TDB *pSmaDB; + TDB *pSmaIdx; +#endif }; typedef struct __attribute__((__packed__)) { @@ -55,6 +61,11 @@ typedef struct { tb_uid_t uid; } SCtbIdxKey; +typedef struct { + tb_uid_t uid; + int64_t smaUid; +} SSmaIdxKey; + static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); @@ -115,6 +126,17 @@ static inline int metaCtbIdxCmpr(const void *arg1, int len1, const void *arg2, i return metaUidCmpr(&pKey1->uid, sizeof(tb_uid_t), &pKey2->uid, sizeof(tb_uid_t)); } +static inline int metaSmaIdxCmpr(const void *arg1, int len1, const void *arg2, int len2) { + int c; + SSmaIdxKey *pKey1 = (SSmaIdxKey *)arg1; + SSmaIdxKey *pKey2 = (SSmaIdxKey *)arg2; + + c = metaUidCmpr(arg1, sizeof(tb_uid_t), arg2, sizeof(tb_uid_t)); + if (c) return c; + + return metaUidCmpr(&pKey1->smaUid, sizeof(int64_t), &pKey2->smaUid, sizeof(int64_t)); +} + int metaOpenDB(SMeta *pMeta) { SMetaDB *pMetaDb; int ret; @@ -143,6 +165,15 @@ int metaOpenDB(SMeta *pMeta) { return -1; } +#ifdef META_TDB_SMA_TEST + ret = tdbDbOpen("sma.db", sizeof(int64_t), TDB_VARIANT_LEN, metaUidCmpr, pMetaDb->pEnv, &(pMetaDb->pSmaDB)); + if (ret < 0) { + // TODO + ASSERT(0); + return -1; + } +#endif + // open schema DB ret = tdbDbOpen("schema.db", sizeof(SSchemaDbKey), TDB_VARIANT_LEN, metaSchemaKeyCmpr, pMetaDb->pEnv, &(pMetaDb->pSchemaDB)); @@ -180,6 +211,15 @@ int metaOpenDB(SMeta *pMeta) { return -1; } +#ifdef META_TDB_SMA_TEST + ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, metaSmaIdxCmpr, pMetaDb->pEnv, &(pMetaDb->pSmaIdx)); + if (ret < 0) { + // TODO + ASSERT(0); + return -1; + } +#endif + pMetaDb->pPool = openPool(); tdbTxnOpen(&pMetaDb->txn, 0, poolMalloc, poolFree, pMetaDb->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); tdbBegin(pMetaDb->pEnv, NULL); @@ -193,10 +233,16 @@ void metaCloseDB(SMeta *pMeta) { tdbCommit(pMeta->pDB->pEnv, &pMeta->pDB->txn); tdbTxnClose(&pMeta->pDB->txn); clearPool(pMeta->pDB->pPool); +#ifdef META_TDB_SMA_TEST + tdbDbClose(pMeta->pDB->pSmaIdx); +#endif tdbDbClose(pMeta->pDB->pCtbIdx); tdbDbClose(pMeta->pDB->pNtbIdx); tdbDbClose(pMeta->pDB->pStbIdx); tdbDbClose(pMeta->pDB->pNameIdx); +#ifdef META_TDB_SMA_TEST + tdbDbClose(pMeta->pDB->pSmaDB); +#endif tdbDbClose(pMeta->pDB->pSchemaDB); tdbDbClose(pMeta->pDB->pTbDB); taosMemoryFree(pMeta->pDB); @@ -491,7 +537,6 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { taosMemoryFree(tbCfg.name); taosMemoryFree(tbCfg.stbCfg.pTagSchema); continue; - ; } else if (tbCfg.type == META_CHILD_TABLE) { kvRowFree(tbCfg.ctbCfg.pTag); } @@ -566,51 +611,326 @@ int metaGetTbNum(SMeta *pMeta) { return 0; } +struct SMSmaCursor { + TDBC *pCur; + tb_uid_t uid; + void *pKey; + void *pVal; + int kLen; + int vLen; +}; + STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { // TODO - ASSERT(0); - return NULL; + // ASSERT(0); + // return NULL; +#ifdef META_TDB_SMA_TEST + STSmaWrapper *pSW = NULL; + + pSW = taosMemoryCalloc(1, sizeof(*pSW)); + if (pSW == NULL) { + return NULL; + } + + SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); + if (pCur == NULL) { + taosMemoryFree(pSW); + return NULL; + } + + void *pBuf = NULL; + SSmaIdxKey *pSmaIdxKey = NULL; + + while (true) { + // TODO: lock during iterate? + if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) { + pSmaIdxKey = pCur->pKey; + ASSERT(pSmaIdxKey != NULL); + + void *pSmaVal = metaGetSmaInfoByIndex(pMeta, pSmaIdxKey->smaUid, false); + + if (pSmaVal == NULL) { + tsdbWarn("no tsma exists for indexUid: %" PRIi64, pSmaIdxKey->smaUid); + continue; + } + + + ++pSW->number; + STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma)); + if (tptr == NULL) { + TDB_FREE(pSmaVal); + metaCloseSmaCursor(pCur); + tdDestroyTSmaWrapper(pSW); + taosMemoryFreeClear(pSW); + return NULL; + } + pSW->tSma = tptr; + pBuf = pSmaVal; + if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { + TDB_FREE(pSmaVal); + metaCloseSmaCursor(pCur); + tdDestroyTSmaWrapper(pSW); + taosMemoryFreeClear(pSW); + return NULL; + } + TDB_FREE(pSmaVal); + continue; + } + break; + } + + metaCloseSmaCursor(pCur); + + return pSW; + +#endif } int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) { // TODO ASSERT(0); +#ifndef META_TDB_SMA_TEST + DBT key = {0}; + + key.data = (void *)indexName; + key.size = strlen(indexName); + + metaDBWLock(pMeta->pDB); + // TODO: No guarantee of consistence. + // Use transaction or DB->sync() for some guarantee. + pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0); + metaDBULock(pMeta->pDB); +#endif return 0; } int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { // TODO - ASSERT(0); + // ASSERT(0); + +#ifdef META_TDB_SMA_TEST + int32_t ret = 0; + SMetaDB *pMetaDb = pMeta->pDB; + void *pBuf = NULL, *qBuf = NULL; + void *key = {0}, *val = {0}; + + // save sma info + int32_t len = tEncodeTSma(NULL, pSmaCfg); + pBuf = taosMemoryCalloc(1, len); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + key = (void *)&pSmaCfg->indexUid; + qBuf = pBuf; + tEncodeTSma(&qBuf, pSmaCfg); + val = pBuf; + + int32_t kLen = sizeof(pSmaCfg->indexUid); + int32_t vLen = POINTER_DISTANCE(qBuf, pBuf); + + ret = tdbDbInsert(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn); + if (ret < 0) { + taosMemoryFreeClear(pBuf); + return -1; + } + + // add sma idx + SSmaIdxKey smaIdxKey; + smaIdxKey.uid = pSmaCfg->tableUid; + smaIdxKey.smaUid = pSmaCfg->indexUid; + key = &smaIdxKey; + kLen = sizeof(smaIdxKey); + val = NULL; + vLen = 0; + + ret = tdbDbInsert(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn); + if (ret < 0) { + taosMemoryFreeClear(pBuf); + return -1; + } + + // release + taosMemoryFreeClear(pBuf); + + if (pMeta->pDB->pPool->size > 0) { + metaCommit(pMeta); + } + +#endif return 0; } -STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { +void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) { // TODO - ASSERT(0); - return NULL; + // ASSERT(0); + // return NULL; +#ifdef META_TDB_SMA_TEST + SMetaDB *pDB = pMeta->pDB; + void *pKey = NULL; + void *pVal = NULL; + int kLen = 0; + int vLen = 0; + int ret = -1; + + // Set key + pKey = (void *)&indexUid; + kLen = sizeof(indexUid); + + // Query + ret = tdbDbGet(pDB->pSmaDB, pKey, kLen, &pVal, &vLen); + if (ret != 0 || !pVal) { + return NULL; + } + + if (!isDecode) { + // return raw value + return pVal; + } + + // Decode + STSma *pCfg = (STSma *)taosMemoryCalloc(1, sizeof(STSma)); + if (pCfg == NULL) { + taosMemoryFree(pVal); + return NULL; + } + + void *pBuf = pVal; + if (tDecodeTSma(pBuf, pCfg) == NULL) { + tdDestroyTSma(pCfg); + taosMemoryFree(pCfg); + TDB_FREE(pVal); + return NULL; + } + + TDB_FREE(pVal); + return pCfg; +#endif } -const char *metaSmaCursorNext(SMSmaCursor *pCur) { +/** + * @brief + * + * @param pMeta + * @param uid 0 means iterate all uids. + * @return SMSmaCursor* + */ +SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { // TODO - ASSERT(0); - return NULL; + // ASSERT(0); + // return NULL; +#ifdef META_TDB_SMA_TEST + SMSmaCursor *pCur = NULL; + SMetaDB *pDB = pMeta->pDB; + int ret; + + pCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pCur)); + if (pCur == NULL) { + return NULL; + } + + pCur->uid = uid; + ret = tdbDbcOpen(pDB->pSmaIdx, &(pCur->pCur)); + if ((ret != 0) || (pCur->pCur == NULL)) { + taosMemoryFree(pCur); + return NULL; + } + + if (uid != 0) { + // TODO: move to the specific uid + } + + return pCur; +#endif } -void metaCloseSmaCurosr(SMSmaCursor *pCur) { +/** + * @brief + * + * @param pCur + * @return int64_t smaIndexUid + */ +int64_t metaSmaCursorNext(SMSmaCursor *pCur) { // TODO - ASSERT(0); + // ASSERT(0); + // return NULL; +#ifdef META_TDB_SMA_TEST + int ret; + void *pBuf; + SSmaIdxKey *smaIdxKey; + + ret = tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, &pCur->pVal, &pCur->vLen); + if (ret < 0) { + return 0; + } + smaIdxKey = pCur->pKey; + return smaIdxKey->smaUid; +#endif +} + +void metaCloseSmaCursor(SMSmaCursor *pCur) { + // TODO + // ASSERT(0); +#ifdef META_TDB_SMA_TEST + if (pCur) { + if (pCur->pCur) { + tdbDbcClose(pCur->pCur); + } + + taosMemoryFree(pCur); + } +#endif } SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { // TODO // ASSERT(0); // comment this line to pass CI - return NULL; -} + // return NULL: +#ifdef META_TDB_SMA_TEST + SArray *pUids = NULL; + SMetaDB *pDB = pMeta->pDB; + void *pKey; -SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { - // TODO - ASSERT(0); - return NULL; + // TODO: lock? + SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0); + if (pCur == NULL) { + return NULL; + } + // TODO: lock? + + SSmaIdxKey *pSmaIdxKey = NULL; + tb_uid_t uid = 0; + while (true) { + // TODO: lock during iterate? + if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) { + ASSERT(pSmaIdxKey != NULL); + pSmaIdxKey = pCur->pKey; + + if (pSmaIdxKey->uid == 0 || pSmaIdxKey->uid == uid) { + continue; + } + uid = pSmaIdxKey->uid; + + if (!pUids) { + pUids = taosArrayInit(16, sizeof(tb_uid_t)); + if (!pUids) { + metaCloseSmaCursor(pCur); + return NULL; + } + } + + taosArrayPush(pUids, &uid); + + continue; + } + break; + } + + metaCloseSmaCursor(pCur); + + return pUids; +#endif } static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0d4acd4594..794427475e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -82,9 +82,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi memcpy(data, msg, msgLen); if (msgType == TDMT_VND_SUBMIT) { - // if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) { - // return -1; - // } + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) { + return -1; + } } SRpcMsg req = { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 07b7d62165..bfdad836f1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -38,6 +38,29 @@ typedef enum { SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma } ESmaStorageLevel; +typedef struct SPoolMem { + int64_t size; + struct SPoolMem *prev; + struct SPoolMem *next; +} SPoolMem; + +struct SSmaEnv { + TdThreadRwlock lock; + TXN txn; + SPoolMem *pPool; + SDiskID did; + TENV *dbEnv; // TODO: If it's better to put it in smaIndex level? + char *path; // relative path + SSmaStat *pStat; +}; + +#define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_DID(env) ((env)->did) +#define SMA_ENV_ENV(env) ((env)->dbEnv) +#define SMA_ENV_PATH(env) ((env)->path) +#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) + typedef struct { STsdb *pTsdb; SDBFile dFile; @@ -104,7 +127,8 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid); -static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen, + TXN *txn); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted); static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid); @@ -117,9 +141,121 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); +// Pool Memory +static SPoolMem *openPool(); +static void clearPool(SPoolMem *pPool); +static void closePool(SPoolMem *pPool); +static void *poolMalloc(void *arg, size_t size); +static void poolFree(void *arg, void *ptr); + +static int tsdbSmaBeginCommit(SSmaEnv *pEnv); +static int tsdbSmaEndCommit(SSmaEnv *pEnv); + // implementation -static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) { return atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n); } -static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) { return atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n); } +static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) { + return atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n); +} +static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) { + return atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n); +} + +static FORCE_INLINE int32_t tsdbRLockSma(SSmaEnv *pEnv) { + int code = taosThreadRwlockRdlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int32_t tsdbWLockSma(SSmaEnv *pEnv) { + int code = taosThreadRwlockWrlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int32_t tsdbUnLockSma(SSmaEnv *pEnv) { + int code = taosThreadRwlockUnlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static SPoolMem *openPool() { + SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool)); + + pPool->prev = pPool->next = pPool; + pPool->size = 0; + + return pPool; +} + +static void clearPool(SPoolMem *pPool) { + if (!pPool) return; + + SPoolMem *pMem; + + do { + pMem = pPool->next; + + if (pMem == pPool) break; + + pMem->next->prev = pMem->prev; + pMem->prev->next = pMem->next; + pPool->size -= pMem->size; + + tdbOsFree(pMem); + } while (1); + + assert(pPool->size == 0); +} + +static void closePool(SPoolMem *pPool) { + if (pPool) { + clearPool(pPool); + tdbOsFree(pPool); + } +} + +static void *poolMalloc(void *arg, size_t size) { + void *ptr = NULL; + SPoolMem *pPool = (SPoolMem *)arg; + SPoolMem *pMem; + + pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size); + if (pMem == NULL) { + assert(0); + } + + pMem->size = sizeof(*pMem) + size; + pMem->next = pPool->next; + pMem->prev = pPool; + + pPool->next->prev = pMem; + pPool->next = pMem; + pPool->size += pMem->size; + + ptr = (void *)(&pMem[1]); + return ptr; +} + +static void poolFree(void *arg, void *ptr) { + SPoolMem *pPool = (SPoolMem *)arg; + SPoolMem *pMem; + + pMem = &(((SPoolMem *)ptr)[-1]); + + pMem->next->prev = pMem->prev; + pMem->prev->next = pMem->next; + pPool->size -= pMem->size; + + tdbOsFree(pMem); +} int32_t tsdbInitSma(STsdb *pTsdb) { // tSma @@ -213,7 +349,12 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) char aname[TSDB_FILENAME_LEN] = {0}; tfsAbsoluteName(pTsdb->pTfs, did, path, aname); - if (tsdbOpenBDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) { + if (tsdbOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + + if ((pEnv->pPool = openPool()) == NULL) { tsdbFreeSmaEnv(pEnv); return NULL; } @@ -248,7 +389,8 @@ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) { taosMemoryFreeClear(pSmaEnv->pStat); taosMemoryFreeClear(pSmaEnv->path); taosThreadRwlockDestroy(&(pSmaEnv->lock)); - tsdbCloseBDBEnv(pSmaEnv->dbEnv); + tsdbCloseDBEnv(pSmaEnv->dbEnv); + closePool(pSmaEnv->pPool); } } @@ -414,7 +556,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t } // cache smaMeta - STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid); + STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid, true); if (pSma == NULL) { terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); @@ -498,10 +640,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { return TSDB_CODE_FAILED; } -#ifndef TSDB_SMA_TEST - TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; -#endif - // Firstly, assume that tSma can only be created on super table/normal table. // getActiveTimeWindow @@ -563,6 +701,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey); + + // TODO: release only when suid changes. + tdDestroyTSmaWrapper(pSW); + taosMemoryFreeClear(pSW); } } @@ -676,10 +818,12 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { * @param dataLen * @return int32_t */ -static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen, + TXN *txn) { SDBFile *pDBFile = &pSmaH->dFile; + // TODO: insert sma data blocks into B+Tree(TDB) - if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) { + if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) { tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); return TSDB_CODE_FAILED; @@ -826,6 +970,30 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe return daysPerFile; } +static int tsdbSmaBeginCommit(SSmaEnv *pEnv) { + TXN *pTxn = &pEnv->txn; + // start a new txn + tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (tdbBegin(pEnv->dbEnv, pTxn) != 0) { + tsdbWarn("tsdbSma tdb restart txn fail"); + return -1; + } + return 0; +} + +static int tsdbSmaEndCommit(SSmaEnv *pEnv) { + TXN *pTxn = &pEnv->txn; + + // Commit current txn + if (tdbCommit(pEnv->dbEnv, pTxn) != 0) { + tsdbWarn("tsdbSma tdb commit fail"); + return -1; + } + tdbTxnClose(pTxn); + clearPool(pEnv->pPool); + return 0; +} + /** * @brief Insert/Update Time-range-wise SMA data. * - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g. @@ -911,14 +1079,10 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char int64_t groupId = pDataBlock->info.groupId; for (int32_t j = 0; j < rows; ++j) { printf("|"); - TSKEY skey = 1649295200000; // TSKEY_INITIAL_VAL; // the start key of TS window by interval + TSKEY skey = TSKEY_INITIAL_VAL; // the start key of TS window by interval void *pSmaKey = &smaKey; bool isStartKey = false; - { - // just for debugging - isStartKey = true; - tsdbEncodeTSmaKey(groupId, skey, &pSmaKey); - } + int32_t tlen = 0; // reset the len pDataBuf = &dataBuf; // reset the buf for (int32_t k = 0; k < colNum; ++k) { @@ -929,7 +1093,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char if (!isStartKey) { isStartKey = true; skey = *(TSKEY *)var; - printf("==> skey = %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId); + printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId); tsdbEncodeTSmaKey(groupId, skey, &pSmaKey); } else { printf(" %" PRIi64 " |", *(int64_t *)var); @@ -1010,6 +1174,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char // TODO: tsdbStartTSmaCommit(); if (fid != tSmaH.dFile.fid) { if (tSmaH.dFile.fid != TSDB_IVLD_FID) { + tsdbSmaEndCommit(pEnv); tsdbCloseDBF(&tSmaH.dFile); } tsdbSetTSmaDataFile(&tSmaH, indexUid, fid); @@ -1020,12 +1185,14 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } + tsdbSmaBeginCommit(pEnv); } - if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen) != 0) { + if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) { tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno)); + tsdbSmaEndCommit(pEnv); tsdbDestroyTSmaWriteH(&tSmaH); tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; @@ -1044,9 +1211,10 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char printf("\n"); } } - + tsdbSmaEndCommit(pEnv); // TODO: not commit for every insert tsdbDestroyTSmaWriteH(&tSmaH); tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_SUCCESS; } @@ -1370,8 +1538,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb), tReadH.dFile.path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN); - void *result = NULL; - uint32_t valueSize = 0; + void *result = NULL; + int32_t valueSize = 0; if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) { tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s", REPO_ID(pTsdb), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno)); @@ -1422,7 +1590,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { return -1; } tsdbDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName, - vCreateSmaReq.tSma.indexUid); + vCreateSmaReq.tSma.indexUid); // record current timezone of server side vCreateSmaReq.tSma.timezoneInt = tsTimezone; @@ -1464,7 +1632,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { return -1; } - tsdbTSmaSub(pTsdb, 1); + tsdbTSmaSub(pTsdb, 1); // TODO: return directly or go on follow steps? return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c new file mode 100644 index 0000000000..519ecd3fa0 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define ALLOW_FORBID_FUNC + +#include "vnodeInt.h" + +int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) { + int ret = 0; + + if (path == NULL) return -1; + + ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param + + if (ret != 0) { + tsdbError("Failed to create tsdb db env, ret = %d", ret); + return -1; + } + + return 0; +} + +int32_t tsdbCloseDBEnv(TENV *pEnv) { return tdbEnvClose(pEnv); } + +static inline int tsdbSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) { + const SSmaKey *pKey1 = (const SSmaKey *)arg1; + const SSmaKey *pKey2 = (const SSmaKey *)arg2; + + ASSERT(len1 == len2 && len1 == sizeof(SSmaKey)); + + if (pKey1->skey < pKey2->skey) { + return -1; + } else if (pKey1->skey > pKey2->skey) { + return 1; + } + if (pKey1->groupId < pKey2->groupId) { + return -1; + } else if (pKey1->groupId > pKey2->groupId) { + return 1; + } + + return 0; +} + +static int32_t tsdbOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) { + int ret; + FKeyComparator compFunc; + + // Create a database + compFunc = tsdbSmaKeyCmpr; + ret = tdbDbOpen(pFName, TDB_VARIANT_LEN, TDB_VARIANT_LEN, compFunc, pEnv, ppDB); + + return 0; +} + +static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); } + +int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF) { + // TEnv is shared by a group of SDBFile + if (!pEnv || !pDBF) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + // Open DBF + if (tsdbOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + tsdbCloseDBDb(pDBF->pDB); + return -1; + } + + return 0; +} + +int32_t tsdbCloseDBF(SDBFile *pDBF) { + int32_t ret = 0; + if (pDBF->pDB) { + ret = tsdbCloseDBDb(pDBF->pDB); + pDBF->pDB = NULL; + } + taosMemoryFreeClear(pDBF->path); + return ret; +} + +int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) { + int32_t ret; + + ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); + if (ret < 0) { + tsdbError("Failed to create insert sma data into db, ret = %d", ret); + return -1; + } + + return 0; +} + +void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen) { + void *result; + void *pVal; + int ret; + + ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); + + if (ret < 0) { + tsdbError("Failed to get sma data from db, ret = %d", ret); + return NULL; + } + + ASSERT(*valLen >= 0); + + result = taosMemoryMalloc(*valLen); + + if (result == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + // TODO: lock? + // TODO: Would the key/value be destoryed during return the data? + // TODO: How about the key is updated while value length is changed? The original value buffer would be freed + // automatically? + memcpy(result, pVal, *valLen); + + return result; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index bf2260c51c..0449319dc2 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -19,7 +19,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO // blockDebugShowData(data); - // tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); + tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); } void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { @@ -232,9 +232,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { tdDestroyTSma(&vCreateSmaReq.tSma); // TODO: return directly or go on follow steps? #endif - // if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // // TODO - // } + if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // TODO + } // } break; // case TDMT_VND_CANCEL_SMA: { // timeRangeSMA // } break; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 6a4adfe4f8..da874716f2 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -210,7 +210,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { // get value by indexName STSma *qSmaCfg = NULL; - qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1); + qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1, true); assert(qSmaCfg != NULL); printf("name1 = %s\n", qSmaCfg->indexName); printf("timezone1 = %" PRIi8 "\n", qSmaCfg->timezoneInt); @@ -221,7 +221,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { tdDestroyTSma(qSmaCfg); taosMemoryFreeClear(qSmaCfg); - qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2); + qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2, true); assert(qSmaCfg != NULL); printf("name2 = %s\n", qSmaCfg->indexName); printf("timezone2 = %" PRIi8 "\n", qSmaCfg->timezoneInt); @@ -233,11 +233,12 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { taosMemoryFreeClear(qSmaCfg); // get index name by table uid +#if 0 SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid); assert(pSmaCur != NULL); uint32_t indexCnt = 0; while (1) { - const char *indexName = metaSmaCursorNext(pSmaCur); + const char *indexName = (const char *)metaSmaCursorNext(pSmaCur); if (indexName == NULL) { break; } @@ -245,8 +246,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ++indexCnt; } EXPECT_EQ(indexCnt, nCntTSma); - metaCloseSmaCurosr(pSmaCur); - + metaCloseSmaCursor(pSmaCur); +#endif // get wrapper by table uid STSmaWrapper *pSW = metaGetSmaInfoByTable(pMeta, tbUid); assert(pSW != NULL); diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 9df749bec7..8a95635ddc 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1196,21 +1196,22 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { return -1; } - // TODO: vLen may be zero - pVal = TDB_REALLOC(*ppVal, cd.vLen); - if (pVal == NULL) { - TDB_FREE(pKey); - return -1; - } - *ppKey = pKey; - *ppVal = pVal; - *kLen = cd.kLen; - *vLen = cd.vLen; - memcpy(pKey, cd.pKey, cd.kLen); - memcpy(pVal, cd.pVal, cd.vLen); + + if (ppVal) { + // TODO: vLen may be zero + pVal = TDB_REALLOC(*ppVal, cd.vLen); + if (pVal == NULL) { + TDB_FREE(pKey); + return -1; + } + + *ppVal = pVal; + *vLen = cd.vLen; + memcpy(pVal, cd.pVal, cd.vLen); + } ret = tdbBtcMoveToNext(pBtc); if (ret < 0) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index c5c041be91..c14d4fe27b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -80,4 +80,7 @@ ./test.sh -f tsim/qnode/basic1.sim -m ./test.sh -f tsim/mnode/basic1.sim -m +# --- sma +./test.sh -f tsim/sma/tsmaCreateInsertData.sim + #======================b1-end=============== diff --git a/tests/script/tsim/sma/tsmaCreateInsertData.sim b/tests/script/tsim/sma/tsmaCreateInsertData.sim new file mode 100644 index 0000000000..9fc3700da1 --- /dev/null +++ b/tests/script/tsim/sma/tsmaCreateInsertData.sim @@ -0,0 +1,48 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database d1 +sql show databases +if $rows != 2 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use d1 + +print =============== create super table, include column type for count/sum/min/max/first +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct1 using stb tags(1000) + +sql show tables +if $rows != 1 then + return -1 +endi + +print =============== insert data, mode1: one row one table in sql +sql insert into ct1 values(now+0s, 10, 2.0, 3.0) +sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) + + +print =============== create sma index from super table +sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(5m,10s) sliding(2m) +print $data00 $data01 $data02 $data03 + +print =============== trigger stream to execute sma aggr task and insert sma data into sma store +sql insert into ct1 values(now+5s, 20, 20.0, 30.0) +#=================================================================== + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT