diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 735db64263..f1083c0d91 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -276,6 +276,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro */ void tsdbCleanupReadHandle(tsdbReaderT queryHandle); +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 58b4b4602e..6eb721fcb1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -227,7 +227,35 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) { + ASSERT(pMsg != NULL); + SSubmitMsgIter msgIter = {0}; + SSubmitBlk *pBlock = NULL; + SSubmitBlkIter blkIter = {0}; + STSRow *row = NULL; + + terrno = TSDB_CODE_SUCCESS; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + + pBlock->uid = htobe64(pBlock->uid); + pBlock->suid = htobe64(pBlock->suid); + pBlock->sversion = htonl(pBlock->sversion); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + pBlock->numOfRows = htons(pBlock->numOfRows); + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} + +int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; SSubmitMsgIter msgIter = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 0873e8edc1..9d5e132772 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -81,6 +81,7 @@ struct SSmaStat { // expired window static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg); +static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); @@ -384,17 +385,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_SUCCESS; }; -static STimeWindow getActiveTimeWindowX(int64_t ts, SInterval* pInterval) { - STimeWindow tw = {0}; - tw.skey = 100; - tw.ekey = 1000; - return tw; -} static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { - pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state + // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later + pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK); // TODO use the real state if (pItem == NULL) { // Response to stream computing: OOM // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. @@ -419,6 +415,9 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t taosMemoryFree(pItem); return TSDB_CODE_FAILED; } + } else if ((pItem = *(SSmaStatItem **)pItem) == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return TSDB_CODE_FAILED; } int8_t state = TSDB_SMA_STAT_EXPIRED; @@ -491,41 +490,39 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL); + // basic procedure + // TODO: optimization + tsdbRefSmaStat(pTsdb, pStat); SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock = NULL; SInterval interval = {0}; - if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } - // basic procedure - // TODO: optimization - tsdbRefSmaStat(pTsdb, pStat); - while (true) { tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - int64_t suid = htobe64(pBlock->uid); STSmaWrapper *pSW = NULL; STSma *pTSma = NULL; + SSubmitBlkIter blkIter = {0}; + if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { + tdFreeTSmaWrapper(pSW); + break; + } + while (true) { - SSubmitBlkIter blkIter = {0}; - if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { - tdFreeTSmaWrapper(pSW); - break; - } STSRow *row = tGetSubmitBlkNext(&blkIter); if (row == NULL) { tdFreeTSmaWrapper(pSW); break; } if(pSW == NULL) { - if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), suid)) == NULL) { + if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) { break; } if((pSW->number) <= 0 || (pSW->tSma == NULL)) { @@ -542,8 +539,9 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { interval.sliding = pTSma->sliding; interval.slidingUnit = pTSma->slidingUnit; - STimeWindow tw = getActiveTimeWindowX(TD_ROW_KEY(row), &interval); - tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, TD_ROW_KEY(row)); + TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); + + tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey); } } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index c9488549c6..86db3af4dc 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -43,7 +43,7 @@ TEST(testCase, unionEncodeDecodeTest) { }; }; col_id_t nBSmaCols; - col_id_t* pBSmaCols; + col_id_t *pBSmaCols; } SUnionTest; SUnionTest sut = {0}; @@ -51,13 +51,13 @@ TEST(testCase, unionEncodeDecodeTest) { sut.type = 1; sut.nBSmaCols = 2; - sut.pBSmaCols = (col_id_t*)taosMemoryMalloc(sut.nBSmaCols * sizeof(col_id_t)); + sut.pBSmaCols = (col_id_t *)taosMemoryMalloc(sut.nBSmaCols * sizeof(col_id_t)); for (col_id_t i = 0; i < sut.nBSmaCols; ++i) { sut.pBSmaCols[i] = i + 100; } - void* buf = taosMemoryMalloc(1024); - void * pBuf = buf; + void *buf = taosMemoryMalloc(1024); + void *pBuf = buf; int32_t tlen = 0; tlen += taosEncodeFixedU8(&buf, sut.info); tlen += taosEncodeFixedI16(&buf, sut.nBSmaCols); @@ -68,9 +68,9 @@ TEST(testCase, unionEncodeDecodeTest) { SUnionTest dut = {0}; pBuf = taosDecodeFixedU8(pBuf, &dut.info); pBuf = taosDecodeFixedI16(pBuf, &dut.nBSmaCols); - if(dut.nBSmaCols > 0) { - dut.pBSmaCols = (col_id_t*)taosMemoryMalloc(dut.nBSmaCols * sizeof(col_id_t)); - for(col_id_t i=0; i < dut.nBSmaCols; ++i) { + if (dut.nBSmaCols > 0) { + dut.pBSmaCols = (col_id_t *)taosMemoryMalloc(dut.nBSmaCols * sizeof(col_id_t)); + for (col_id_t i = 0; i < dut.nBSmaCols; ++i) { pBuf = taosDecodeFixedI16(pBuf, dut.pBSmaCols + i); } } else { @@ -83,9 +83,9 @@ TEST(testCase, unionEncodeDecodeTest) { ASSERT_EQ(sut.rollup, dut.rollup); ASSERT_EQ(sut.type, dut.type); ASSERT_EQ(sut.nBSmaCols, dut.nBSmaCols); - for (col_id_t i = 0; i< sut.nBSmaCols; ++i) { - ASSERT_EQ(*(col_id_t*)(sut.pBSmaCols + i), sut.pBSmaCols[i]); - ASSERT_EQ(*(col_id_t*)(sut.pBSmaCols + i), dut.pBSmaCols[i]); + for (col_id_t i = 0; i < sut.nBSmaCols; ++i) { + ASSERT_EQ(*(col_id_t *)(sut.pBSmaCols + i), sut.pBSmaCols[i]); + ASSERT_EQ(*(col_id_t *)(sut.pBSmaCols + i), dut.pBSmaCols[i]); } } #if 1 @@ -115,7 +115,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { // decode STSmaWrapper dstTSmaWrapper = {0}; - void * result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper); + void *result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper); ASSERT_NE(result, nullptr); ASSERT_EQ(tSmaWrapper.number, dstTSmaWrapper.number); @@ -148,12 +148,12 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { #if 1 TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { - const char * smaIndexName1 = "sma_index_test_1"; - const char * smaIndexName2 = "sma_index_test_2"; + const char *smaIndexName1 = "sma_index_test_1"; + const char *smaIndexName2 = "sma_index_test_2"; int8_t timezone = 8; - const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; - const char * tagsFilter = "I'm tags filter"; - const char * smaTestDir = "./smaTest"; + const char *expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; + const char *tagsFilter = "I'm tags filter"; + const char *smaTestDir = "./smaTest"; const tb_uid_t tbUid = 1234567890; const int64_t indexUid1 = 2000000001; const int64_t indexUid2 = 2000000002; @@ -180,8 +180,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { ASSERT_NE(tSma.tagsFilter, nullptr); tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); - SMeta * pMeta = NULL; - STSma * pSmaCfg = &tSma; + SMeta *pMeta = NULL; + STSma *pSmaCfg = &tSma; const SMetaCfg *pMetaCfg = &defaultMetaOptions; taosRemoveDir(smaTestDir); @@ -283,11 +283,11 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { #if 1 TEST(testCase, tSma_Data_Insert_Query_Test) { // step 1: prepare meta - const char * smaIndexName1 = "sma_index_test_1"; + const char *smaIndexName1 = "sma_index_test_1"; const int8_t timezone = 8; - const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; - const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'"; - const char * smaTestDir = "./smaTest"; + const char *expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; + const char *tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'"; + const char *smaTestDir = "./smaTest"; const tb_uid_t tbUid = 1234567890; const int64_t indexUid1 = 2000000001; const int64_t interval1 = 1; @@ -302,7 +302,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { tSma.intervalUnit = TIME_UNIT_DAY; tSma.interval = 1; tSma.slidingUnit = TIME_UNIT_HOUR; - tSma.sliding = 0; + tSma.sliding = 1; // sliding = interval when it's convert window tSma.indexUid = indexUid1; tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); tSma.timezoneInt = timezone; @@ -318,8 +318,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ASSERT_NE(tSma.tagsFilter, nullptr); tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); - SMeta * pMeta = NULL; - STSma * pSmaCfg = &tSma; + SMeta *pMeta = NULL; + STSma *pSmaCfg = &tSma; const SMetaCfg *pMetaCfg = &defaultMetaOptions; taosRemoveDir(smaTestDir); @@ -331,8 +331,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { // step 2: insert data STSmaDataWrapper *pSmaData = NULL; - STsdb * pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); - STsdbCfg * pCfg = &pTsdb->config; + STsdb *pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); + STsdbCfg *pCfg = &pTsdb->config; pTsdb->pMeta = pMeta; pTsdb->vgId = 2; @@ -367,15 +367,49 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks); ASSERT_NE(pTsdb->pTfs, nullptr); - char *msg = (char *)taosMemoryCalloc(1, 100); - ASSERT_NE(msg, nullptr); - ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, msg), 0); + // generate SSubmitReq msg and update expired window + int16_t schemaVer = 0; + uint32_t mockRowLen = sizeof(STSRow); + uint32_t mockRowNum = 2; + uint32_t mockBlkNum = 2; + uint32_t msgLen = sizeof(SSubmitReq) + mockBlkNum * sizeof(SSubmitBlk) + mockBlkNum * mockRowNum * mockRowLen; + + SSubmitReq *pMsg = (SSubmitReq *)taosMemoryCalloc(1, msgLen); + ASSERT_NE(pMsg, nullptr); + pMsg->version = htobe64(schemaVer); + pMsg->numOfBlocks = htonl(mockBlkNum); + pMsg->length = htonl(msgLen); + + SSubmitBlk *pBlk = NULL; + STSRow *pRow = NULL; + TSKEY now = taosGetTimestamp(pTsdb->config.precision); + + for (uint32_t b = 0; b < mockBlkNum; ++b) { + pBlk = (SSubmitBlk *)POINTER_SHIFT(pMsg, sizeof(SSubmitReq) + b * (sizeof(SSubmitBlk) + mockRowNum * mockRowLen)); + pBlk->uid = htobe64(tbUid); + pBlk->suid = htobe64(tbUid); + pBlk->sversion = htonl(schemaVer); + pBlk->padding = htonl(0); + pBlk->schemaLen = htonl(0); + pBlk->numOfRows = htons(mockRowNum); + pBlk->dataLen = htonl(mockRowNum * mockRowLen); + for (uint32_t r = 0; r < mockRowNum; ++r) { + pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen); + pRow->len = mockRowLen; + pRow->ts = now + b * 1000 + r * 1000; + pRow->sver = schemaVer; + } + } + + ASSERT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); + + ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, (const char *)pMsg), 0); // init int32_t allocCnt = 0; int32_t allocStep = 16384; int32_t buffer = 1024; - void * buf = NULL; + void *buf = NULL; ASSERT_EQ(tsdbMakeRoom(&buf, allocStep), 0); int32_t bufSize = taosTSizeof(buf); int32_t numOfTables = 10; @@ -443,7 +477,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); // release data - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg); taosTZfree(buf); // release meta tdDestroyTSma(&tSma); @@ -451,6 +485,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { tsdbClose(pTsdb); metaClose(pMeta); } + #endif #pragma GCC diagnostic pop \ No newline at end of file