diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 9d5e132772..07eafd6df0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -38,9 +38,10 @@ typedef enum { } ESmaStorageLevel; typedef struct { - STsdb *pTsdb; - SDBFile dFile; - int32_t interval; // interval with the precision of DB + STsdb *pTsdb; + SDBFile dFile; + SSDataBlock *pData; // sma data + int32_t interval; // interval with the precision of DB } STSmaWriteH; typedef struct { @@ -98,7 +99,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ int32_t nMaxResult); // insert data -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval, + int8_t intervalUnit); static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); @@ -800,9 +802,10 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p return TSDB_CODE_SUCCESS; } -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval, int8_t intervalUnit) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->pData = pData; return TSDB_CODE_SUCCESS; } @@ -857,10 +860,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe * @return int32_t */ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg *pCfg = REPO_CFG(pTsdb); - STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); - int64_t indexUid = SMA_TEST_INDEX_UID; + STsdbCfg *pCfg = REPO_CFG(pTsdb); + SSDataBlock *pData = (SSDataBlock *)msg; + SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + int64_t indexUid = SMA_TEST_INDEX_UID; if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -868,15 +871,15 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { return terrno; } - if (pData->dataLen <= 0) { - TASSERT(0); - terrno = TSDB_CODE_INVALID_PARA; - return TSDB_CODE_FAILED; + if (pData == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbWarn("vgId:%d insert tSma data failed since pData is NULL", REPO_ID(pTsdb)); + return terrno; } - STSmaWriteH tSmaH = {0}; - - if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) { + if (taosArrayGetSize(pData->pDataBlock) <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + tsdbWarn("vgId:%d insert tSma data failed since pDataBlock is empty", REPO_ID(pTsdb)); return TSDB_CODE_FAILED; } @@ -895,6 +898,14 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { return TSDB_CODE_FAILED; } + STSma *pSma = pItem->pSma; + + STSmaWriteH tSmaH = {0}; + + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) { + return TSDB_CODE_FAILED; + } + char rPath[TSDB_FILENAME_LEN] = {0}; char aPath[TSDB_FILENAME_LEN] = {0}; snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); @@ -907,8 +918,11 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { } // Step 1: Judge the storage level and days - int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); + int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); + + +#if 0 int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file @@ -933,7 +947,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { // Step 3: reset the SSmaStat tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); - +#endif tsdbDestroyTSmaWriteH(&tSmaH); tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; @@ -999,29 +1013,58 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, } static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg *pCfg = REPO_CFG(pTsdb); - STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); + STsdbCfg *pCfg = REPO_CFG(pTsdb); + SSDataBlock *pData = (SSDataBlock *)msg; + SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); + int64_t indexUid = SMA_TEST_INDEX_UID; if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; - tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); + tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return terrno; } - if (pData->dataLen <= 0) { - TASSERT(0); + if (pEnv == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); + return terrno; + } + + if (pData == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbWarn("vgId:%d insert rSma data failed since pData is NULL", REPO_ID(pTsdb)); + return terrno; + } + + if (taosArrayGetSize(pData->pDataBlock) <= 0) { terrno = TSDB_CODE_INVALID_PARA; + tsdbWarn("vgId:%d insert rSma data failed since pDataBlock is empty", REPO_ID(pTsdb)); return TSDB_CODE_FAILED; } + SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv); + SSmaStatItem *pItem = NULL; + + tsdbRefSmaStat(pTsdb, pStat); + + if (pStat && pStat->smaStatItems) { + pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + } + + if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) { + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } + + STSma *pSma = pItem->pSma; + STSmaWriteH tSmaH = {0}; - if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) { + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) { return TSDB_CODE_FAILED; } - int64_t indexUid = SMA_TEST_INDEX_UID; char rPath[TSDB_FILENAME_LEN] = {0}; char aPath[TSDB_FILENAME_LEN] = {0}; snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); @@ -1033,8 +1076,9 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { } // Step 1: Judge the storage level and days - int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); + int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); + #if 0 int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file @@ -1057,8 +1101,10 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // Step 3: reset the SSmaStat tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); +#endif tsdbDestroyTSmaWriteH(&tSmaH); + tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 86db3af4dc..29a4b7f552 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -280,7 +280,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { } #endif -#if 1 +#if 0 TEST(testCase, tSma_Data_Insert_Query_Test) { // step 1: prepare meta const char *smaIndexName1 = "sma_index_test_1";