From 91df290152424fd4fb1216d0886139bb66c33291 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 17 Mar 2022 11:46:34 +0800 Subject: [PATCH] add REF for SSmaStat in case of delete SSmaStatItem when drop sma index --- source/dnode/vnode/inc/tsdb.h | 6 +- source/dnode/vnode/src/meta/metaBDBImpl.c | 2 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 146 ++++++++++++++-------- source/dnode/vnode/test/tsdbSmaTest.cpp | 2 +- 4 files changed, 95 insertions(+), 61 deletions(-) diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 87edfb8dde..7b93f7580d 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -107,10 +107,8 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); // TODO: This is the basic params, and should wrap the params to a queryHandle. -int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, - int32_t nMaxResult); - +int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, + tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult); // STsdbCfg int tsdbOptionsInit(STsdbCfg *); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index a729288e34..df2d9604d2 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -636,7 +636,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { } // Decode - pCfg = (STSma *)malloc(sizeof(STSma)); + pCfg = (STSma *)calloc(1, sizeof(STSma)); if (pCfg == NULL) { return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index cd8b60385f..7a5c9c5c1e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -20,7 +20,7 @@ static const char *TSDB_SMA_DNAME[] = { "tsma", // TSDB_SMA_TYPE_TIME_RANGE "rsma", // TSDB_SMA_TYPE_ROLLUP }; - +#define SMA_CHECK_HASH #undef SMA_PRINT_DEBUG_LOG #define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_TIMES 10 @@ -72,35 +72,43 @@ typedef struct { struct SSmaStat { SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem + T_REF_DECLARE() }; // declaration of static functions -static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); -static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); -// TODO: This is the basic params, and should wrap the params to a queryHandle. -static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, - int32_t nMaxResult); -static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg); - +// expired window +static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); -static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); -static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); +static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey); +static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); +static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); +// read data +// TODO: This is the basic params, and should wrap the params to a queryHandle. +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, + int32_t nMaxResult); + +// insert data +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); +static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]); +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +// implementation static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]); } @@ -192,6 +200,21 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { return NULL; } +static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { + if (pStat == NULL) return 0; + int ref = T_REF_INC(pStat); + tsdbDebug("vgId:%d ref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref); + return 0; +} + +static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { + if (pStat == NULL) return 0; + + int ref = T_REF_DEC(pStat); + tsdbDebug("vgId:%d unref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref); + return 0; +} + static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { ASSERT(pSmaStat != NULL); @@ -296,7 +319,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { * @param msg * @return int32_t */ -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { if (!msg || !pTsdb->pMeta) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; @@ -307,27 +330,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { return TSDB_CODE_FAILED; } - SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, smaType); - - // TODO: decode the msg => start - int64_t indexUid = SMA_TEST_INDEX_UID; - // const char * indexName = SMA_TEST_INDEX_NAME; + // TODO: decode the msg from Stream Computing module => start + int64_t indexUid = SMA_TEST_INDEX_UID; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; - int64_t now = taosGetTimestampMs(); + TSKEY skey1 = 1646987196 * 1e3; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { - expiredWindows[i] = now + i; + expiredWindows[i] = skey1 + i; } - // TODO: decode the msg <= end + + SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); + tsdbRefSmaStat(pTsdb, pStat); SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state if (pItem == NULL) { // Response to stream computing: OOM - // For query, if the indexName not found, the TSDB should tell query module to query raw TS data. + // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } @@ -337,6 +361,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); free(pItem); + tsdbUnRefSmaStat(pTsdb, pStat); tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, tstrerror(terrno)); return TSDB_CODE_FAILED; @@ -347,18 +372,14 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { // If error occurs during put smaStatItem, free the resources of pItem taosHashCleanup(pItem->expiredWindows); free(pItem); + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } } -#if 0 - SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); - int size1 = taosHashGetSize(pItem1->expiredWindows); - tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1); -#endif int8_t state = TSDB_SMA_STAT_EXPIRED; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { - if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) { + if (taosHashPut(pItem->expiredWindows, (void *)(expiredWindows + i), sizeof(TSKEY), &state, sizeof(state)) != 0) { // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would // tell query module to query raw TS data. // N.B. @@ -368,37 +389,43 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { taosHashCleanup(pItem->expiredWindows); tfree(pItem->pSma); taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid)); + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } + tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid, + expiredWindows[i]); } -#if 0 - SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); - int size2 = taosHashGetSize(pItem1->expiredWindows); - tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2); -#endif - + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; } -static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) { +static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) { SSmaStatItem *pItem = NULL; - // TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode? + tsdbRefSmaStat(pTsdb, pStat); + if (pStat && pStat->smaStatItems) { - pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + pItem = *(SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); } -#if 0 if (pItem != NULL) { - // TODO: reset time window for the sma data blocks + // pItem resides in hash buffer all the time unless drop sma index + // TODO: multithread protect if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { // error handling + tsdbUnRefSmaStat(pTsdb, pStat); + tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " failed", REPO_ID(pTsdb), + skey, indexUid); + return TSDB_CODE_FAILED; } - } else { // error handling + tsdbUnRefSmaStat(pTsdb, pStat); + tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid); + return TSDB_CODE_FAILED; } -#endif + + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; } @@ -705,7 +732,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { // TODO:tsdbEndTSmaCommit(); // Step 3: reset the SSmaStat - tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); + tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); tsdbDestroyTSmaWriteH(&tSmaH); return TSDB_CODE_SUCCESS; @@ -751,7 +778,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // TODO:tsdbEndTSmaCommit(); // reset the SSmaStat - tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); + tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); return TSDB_CODE_SUCCESS; } @@ -839,7 +866,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { * @return int32_t */ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { if (!pTsdb->pTSmaEnv) { terrno = TSDB_CODE_INVALID_PTR; @@ -847,12 +874,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ return TSDB_CODE_FAILED; } - SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); + tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); if (pItem == NULL) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. + tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); terrno = TSDB_CODE_TDB_INVALID_ACTION; - tsdbWarn("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid); + tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid); return TSDB_CODE_FAILED; } @@ -866,16 +895,23 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ } #endif -#if 0 - if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) { +#if 1 + if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) { // TODO: mark this window as expired. + tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb), + querySKey, indexUid); + } else { + tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey, + indexUid); } + tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + #endif STSmaReadH tReadH = {0}; tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit); tsdbCloseDBF(&tReadH.dFile); - tsdbInitTSmaFile(&tReadH, querySkey); + tsdbInitTSmaFile(&tReadH, querySKey); if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno)); return TSDB_CODE_FAILED; @@ -883,7 +919,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ char smaKey[SMA_KEY_LEN] = {0}; void *pSmaKey = &smaKey; - tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey); + tsdbEncodeTSmaKey(tableUid, colId, querySKey, (void **)&pSmaKey); tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb), tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), @@ -1010,9 +1046,9 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { } int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, - tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) { + tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey, + if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySKey, nMaxResult)) < 0) { tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 96fd35e700..ae4e9b0a75 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -301,7 +301,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { break; } - char *msg = (char *)calloc(100, 1); + char *msg = (char *)calloc(1, 100); assert(msg != NULL); ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);