From abbd818fe6e639cd8e7cc171446b6de6699095f2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 13 Mar 2022 23:32:33 +0800 Subject: [PATCH] import SmaEnv/SmaStat to facilitate save data --- include/util/taoserror.h | 1 + source/dnode/vnode/src/inc/tsdbDBDef.h | 8 ++ source/dnode/vnode/src/inc/tsdbDef.h | 3 +- source/dnode/vnode/src/inc/tsdbSma.h | 2 +- source/dnode/vnode/src/meta/metaBDBImpl.c | 37 +++--- source/dnode/vnode/src/tsdb/tsdbBDBImpl.c | 50 ++++++-- source/dnode/vnode/src/tsdb/tsdbSma.c | 149 +++++++++++++++------- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/test/tsdbSmaTest.cpp | 21 +-- source/util/src/terror.c | 1 + 10 files changed, 186 insertions(+), 88 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3a1343b384..1c61d738b5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -353,6 +353,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) +#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617) // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) diff --git a/source/dnode/vnode/src/inc/tsdbDBDef.h b/source/dnode/vnode/src/inc/tsdbDBDef.h index cc40cec7d1..7740dd0fab 100644 --- a/source/dnode/vnode/src/inc/tsdbDBDef.h +++ b/source/dnode/vnode/src/inc/tsdbDBDef.h @@ -25,8 +25,16 @@ extern "C" { typedef struct SDBFile SDBFile; typedef DB_ENV* TDBEnv; +struct SDBFile { + DB* pDB; + char* path; +}; + int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF); void tsdbCloseDBF(SDBFile* pDBF); +int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path); +void tsdbCloseBDBEnv(DB_ENV* pEnv); +int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 0956e418bb..6f91b4d3ab 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -27,6 +27,7 @@ #include "ttime.h" #include "tsdb.h" +#include "tsdbDBDef.h" #include "tsdbCommit.h" #include "tsdbFS.h" #include "tsdbFile.h" @@ -37,6 +38,7 @@ #include "tsdbReadImpl.h" #include "tsdbSma.h" + #ifdef __cplusplus extern "C" { #endif @@ -56,7 +58,6 @@ struct STsdb { STfs * pTfs; SSmaEnv * pTSmaEnv; SSmaEnv * pRSmaEnv; - // SSmaStat * pSmaStat; }; #define REPO_ID(r) ((r)->vgId) diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 87e20b8aa9..c54fdf85a3 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -19,9 +19,9 @@ typedef struct SSmaStat SSmaStat; typedef struct SSmaEnv SSmaEnv; - struct SSmaEnv { pthread_rwlock_t lock; + TDBEnv dbEnv; char * path; SSmaStat * pStat; }; diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index d9af526c2a..a729288e34 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -231,30 +231,31 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { void *pBuf = NULL, *qBuf = NULL; DBT key1 = {0}, value1 = {0}; - { - // save sma info - int32_t len = tEncodeTSma(NULL, pSmaCfg); - pBuf = calloc(len, 1); - if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - key1.data = (void *)&pSmaCfg->indexUid; - key1.size = sizeof(pSmaCfg->indexUid); - - qBuf = pBuf; - tEncodeTSma(&qBuf, pSmaCfg); - - value1.data = pBuf; - value1.size = POINTER_DISTANCE(qBuf, pBuf); - value1.app_data = pSmaCfg; + // save sma info + int32_t len = tEncodeTSma(NULL, pSmaCfg); + pBuf = calloc(len, 1); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } + key1.data = (void *)&pSmaCfg->indexUid; + key1.size = sizeof(pSmaCfg->indexUid); + + qBuf = pBuf; + tEncodeTSma(&qBuf, pSmaCfg); + + value1.data = pBuf; + value1.size = POINTER_DISTANCE(qBuf, pBuf); + value1.app_data = pSmaCfg; + metaDBWLock(pMeta->pDB); pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0); metaDBULock(pMeta->pDB); + // release + tfree(pBuf); + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c index 7ea9f134cf..4fc415cfd1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -16,30 +16,29 @@ #define ALLOW_FORBID_FUNC #include "db.h" +#include "taoserror.h" #include "tcoding.h" #include "thash.h" #include "tsdbDBDef.h" +#include "tsdbLog.h" #define IMPL_WITH_LOCK 1 -struct SDBFile { - DB * pDB; - char *path; -}; +static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); +static void tsdbCloseBDBDb(DB *pDB); -static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path); -static void tsdbCloseBDBEnv(DB_ENV *pEnv); -static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); -static void tsdbCloseBDBDb(DB *pDB); - -#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) +#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code)) int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { // TDBEnv is shared by a group of SDBFile - ASSERT(pEnv != NULL); + if (!pEnv) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } // Open DBF if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) { + terrno = TSDB_CODE_TDB_INIT_FAILED; tsdbCloseBDBDb(pDBF->pDB); return -1; } @@ -61,7 +60,7 @@ void tsdbCloseDBF(SDBFile *pDBF) { } } -static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { +int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { int ret = 0; DB_ENV *pEnv = NULL; @@ -75,7 +74,8 @@ static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0); if (ret != 0) { - BDB_PERR("Failed to open tsdb env", ret); + // BDB_PERR("Failed to open tsdb env", ret); + tsdbWarn("Failed to open tsdb env for path %s since %d", path ? path : "NULL", ret); return -1; } @@ -84,7 +84,7 @@ static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { return 0; } -static void tsdbCloseBDBEnv(DB_ENV *pEnv) { +void tsdbCloseBDBEnv(DB_ENV *pEnv) { if (pEnv) { pEnv->close(pEnv, 0); } @@ -123,4 +123,26 @@ static void tsdbCloseBDBDb(DB *pDB) { if (pDB) { pDB->close(pDB, 0); } +} + +int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, uint32_t dataSize) { + int ret; + DBT key1 = {0}, value1 = {0}; + + key1.data = key; + key1.size = keySize; + + value1.data = data; + value1.size = dataSize; + + // TODO: lock + ret = pDBF->pDB->put(pDBF->pDB, NULL, &key1, &value1, 0); + if (ret) { + BDB_PERR("Failed to put data to DBF", ret); + // TODO: unlock + return -1; + } + // TODO: unlock + + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index ba8cde2121..f96ea644b4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -24,16 +24,16 @@ #define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test -#define SMA_TEST_INDEX_UID 123456 // TODO: just for test +#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test typedef enum { SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat } ESmaStorageLevel; typedef struct { - STsdb * pTsdb; - char * pDFile; // TODO: use the real DFile type, not char* - int32_t interval; // interval with the precision of DB + STsdb * pTsdb; + SDBFile *pDFile; + int32_t interval; // interval with the precision of DB // TODO } STSmaWriteH; @@ -74,10 +74,11 @@ static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); -static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel); @@ -114,6 +115,11 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { return NULL; } + if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + return pEnv; } @@ -158,6 +164,7 @@ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) { tfree(pSmaEnv->pStat); tfree(pSmaEnv->path); pthread_rwlock_destroy(&(pSmaEnv->lock)); + tsdbCloseBDBEnv(pSmaEnv->dbEnv); } } @@ -213,9 +220,9 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { /** * @brief Release resources allocated for its member fields, not including itself. - * - * @param pSmaStat - * @return int32_t + * + * @param pSmaStat + * @return int32_t */ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { if (pSmaStat) { @@ -232,11 +239,11 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { /** * @brief Update expired window according to msg from stream computing module. - * - * @param pTsdb + * + * @param pTsdb * @param smaType ETsdbSmaType - * @param msg - * @return int32_t + * @param msg + * @return int32_t */ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { STsdbCfg *pCfg = REPO_CFG(pTsdb); @@ -247,21 +254,21 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { return TSDB_CODE_FAILED; } - if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - pEnv = pTsdb->pTSmaEnv; - } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { - pEnv = pTsdb->pRSmaEnv; - } else { - ASSERT(0); - } - char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/"; if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } + if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + pTsdb->pTSmaEnv = pEnv; + } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + pTsdb->pRSmaEnv = pEnv; + } else { + ASSERT(0); + } + // TODO: decode the msg => start - int64_t indexUid = SMA_TEST_INDEX_UID; + int64_t indexUid = SMA_TEST_INDEX_UID; // const char * indexName = SMA_TEST_INDEX_NAME; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; @@ -285,8 +292,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { // cache smaMeta STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid); if (pSma == NULL) { + terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); free(pItem); + tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, + tstrerror(terrno)); return TSDB_CODE_FAILED; } pItem->pSma = pSma; @@ -299,6 +309,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { return TSDB_CODE_FAILED; } } +#if 0 + SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); + int size1 = taosHashGetSize(pItem1->expiredWindows); + tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1); +#endif int8_t state = TSDB_SMA_STAT_EXPIRED; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { @@ -316,6 +331,12 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { } } +#if 0 + SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); + int size2 = taosHashGetSize(pItem1->expiredWindows); + tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2); +#endif + return TSDB_CODE_SUCCESS; } @@ -326,7 +347,7 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s if (pStat && pStat->smaStatItems) { pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); } - +#if 0 if (pItem != NULL) { // TODO: reset time window for the sma data blocks if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { @@ -336,6 +357,7 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s } else { // error handling } +#endif return TSDB_CODE_SUCCESS; } @@ -394,10 +416,17 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { * @param dataLen * @return int32_t */ -static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { + SDBFile *pDBFile = pSmaH->pDFile; + // TODO: insert sma data blocks into B+Tree - tsdbDebug("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", - *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + tsdbDebug("insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", pDBFile->path, + *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + + if(tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0){ + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; } @@ -486,8 +515,6 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p // TODO: check the data integrity - void *bTree = pSmaH->pDFile; - int32_t len = 0; while (true) { if (len >= pData->dataLen) { @@ -510,7 +537,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); #endif tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey); - if (tsdbInsertTSmaBlocks(bTree, smaKey, pColData->data, pColData->blockSize) < 0) { + if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) { tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } tbLen += (sizeof(STSmaColData) + pColData->blockSize); @@ -524,13 +551,28 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { pSmaH->pTsdb = pTsdb; pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->pDFile = (SDBFile *)calloc(1, sizeof(SDBFile *)); + if (!pSmaH->pDFile) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { + if (pSmaH) { + if (pSmaH->pDFile) { + tsdbCloseDBF(pSmaH->pDFile); + } + } } static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; - - pSmaH->pDFile = "tSma_interval_file_name"; - + ASSERT(pSmaH->pDFile->path == NULL && pSmaH->pDFile->pDB == NULL); + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid); + pSmaH->pDFile->path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; } @@ -559,21 +601,25 @@ static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) { * @return int32_t */ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg * pCfg = REPO_CFG(pTsdb); + STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - STSmaWriteH tSmaH = {0}; - tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); + if (!pTsdb->pTSmaEnv) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); + return terrno; + } if (pData->dataLen <= 0) { TASSERT(0); terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_FAILED; } - if (!pTsdb->pTSmaEnv) { - terrno = TSDB_CODE_INVALID_PTR; - return terrno; + STSmaWriteH tSmaH = {0}; + + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) { + return TSDB_CODE_FAILED; } // Step 1: Judge the storage level and days @@ -585,27 +631,41 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { // - Set and open the DFile or the B+Tree file // TODO: tsdbStartTSmaCommit(); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, tSmaH.pDFile) != 0) { + tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), + tSmaH.pDFile->path ? tSmaH.pDFile->path : "path is NULL", tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + return TSDB_CODE_FAILED; + } - tsdbInsertTSmaDataSection(&tSmaH, pData); + if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) { + tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + return TSDB_CODE_FAILED; + } // TODO:tsdbEndTSmaCommit(); - // reset the SSmaStat + // Step 3: reset the SSmaStat tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); + tsdbDestroyTSmaWriteH(&tSmaH); return TSDB_CODE_SUCCESS; } static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { - // TODO - pSmaH->pDFile = "rSma_interval_file_name"; + STsdb *pTsdb = pSmaH->pTsdb; + + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid); + pSmaH->pDFile->path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; } int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg * pCfg = REPO_CFG(pTsdb); + STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - STSmaWriteH tSmaH = {0}; + STSmaWriteH tSmaH = {0}; tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); @@ -627,6 +687,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // Save all the TSma data to one file // TODO: tsdbStartTSmaCommit(); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + tsdbInsertTSmaDataSection(&tSmaH, pData); // TODO:tsdbEndTSmaCommit(); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index dede1502f4..9cccea9853 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -54,7 +54,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { int32_t code = TSDB_CODE_SUCCESS; if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { - tsdbWarn("vgId:%d update expired window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 159ad98219..f815291c77 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -223,14 +223,12 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { TEST(testCase, tSmaInsertTest) { // prepare meta const char * smaIndexName1 = "sma_index_test_1"; - const char * smaIndexName2 = "sma_index_test_2"; const char * timezone = "Asia/Shanghai"; const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; const char * tagsFilter = "I'm tags filter"; const char * smaTestDir = "./smaTest"; const tb_uid_t tbUid = 1234567890; const int64_t indexUid1 = 2000000001; - const int64_t indexUid2 = 2000000002; const uint32_t nCntTSma = 2; // encode STSma tSma = {0}; @@ -263,15 +261,20 @@ TEST(testCase, tSmaInsertTest) { // save index 1 EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); - // insert data - const int64_t indexUid = 2000000002; STSmaDataWrapper *pSmaData = NULL; STsdb tsdb = {0}; STsdbCfg * pCfg = &tsdb.config; - pCfg->daysPerFile = 1; tsdb.pMeta = pMeta; + tsdb.vgId = 2; + tsdb.config.daysPerFile = 10; // default days is 10 + tsdb.config.keep1 = 30; + tsdb.config.keep2 = 90; + tsdb.config.keep = 365; + tsdb.config.precision = TSDB_TIME_PRECISION_MILLI; + tsdb.config.update = TD_ROW_OVERWRITE_UPDATE; + tsdb.config.compression = TWO_STAGE_COMP; char *msg = (char *)calloc(100, 1); EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); @@ -283,16 +286,16 @@ TEST(testCase, tSmaInsertTest) { void * buf = NULL; EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); int32_t bufSize = taosTSizeof(buf); - int32_t numOfTables = 25; - col_id_t numOfCols = 4096; + int32_t numOfTables = 5; + col_id_t numOfCols = 10; EXPECT_GT(numOfCols, 0); pSmaData = (STSmaDataWrapper *)buf; printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); - pSmaData->skey = 1646987196; + pSmaData->skey = 1646987196000; pSmaData->interval = 10; pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; - pSmaData->indexUid = indexUid; + pSmaData->indexUid = indexUid1; int32_t len = sizeof(STSmaDataWrapper); for (int32_t t = 0; t < numOfTables; ++t) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c1cb4f8a41..f97df62ccc 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -349,6 +349,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")