From 1fbd79bafbd11b1563b2959f97ef513cf934310a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 25 Mar 2022 16:40:59 +0800 Subject: [PATCH 1/3] add updateExpiredWindow test case --- source/dnode/vnode/inc/tsdb.h | 2 + source/dnode/vnode/src/tsdb/tsdbMemTable.c | 30 ++++++- source/dnode/vnode/src/tsdb/tsdbSma.c | 28 ++++--- source/dnode/vnode/test/tsdbSmaTest.cpp | 98 +++++++++++++++------- 4 files changed, 112 insertions(+), 46 deletions(-) diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 735db64263..f1083c0d91 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -276,6 +276,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro */ void tsdbCleanupReadHandle(tsdbReaderT queryHandle); +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index a6df63dfa0..44581dc2ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -227,7 +227,35 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) { + ASSERT(pMsg != NULL); + SSubmitMsgIter msgIter = {0}; + SSubmitBlk *pBlock = NULL; + SSubmitBlkIter blkIter = {0}; + STSRow *row = NULL; + + terrno = TSDB_CODE_SUCCESS; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + + pBlock->uid = htobe64(pBlock->uid); + pBlock->suid = htobe64(pBlock->suid); + pBlock->sversion = htonl(pBlock->sversion); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + pBlock->numOfRows = htons(pBlock->numOfRows); + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} + +int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; SSubmitMsgIter msgIter = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index a10252e286..ff102dc8b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -394,7 +394,8 @@ static STimeWindow getActiveTimeWindowX(int64_t ts, SInterval* pInterval) { static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { - pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state + // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later + pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK); // TODO use the real state if (pItem == NULL) { // Response to stream computing: OOM // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. @@ -419,6 +420,9 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t free(pItem); return TSDB_CODE_FAILED; } + } else if ((pItem = *(SSmaStatItem **)pItem) == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return TSDB_CODE_FAILED; } int8_t state = TSDB_SMA_STAT_EXPIRED; @@ -491,41 +495,39 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL); + // basic procedure + // TODO: optimization + tsdbRefSmaStat(pTsdb, pStat); SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock = NULL; SInterval interval = {0}; - if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } - // basic procedure - // TODO: optimization - tsdbRefSmaStat(pTsdb, pStat); - while (true) { tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - int64_t suid = htobe64(pBlock->uid); STSmaWrapper *pSW = NULL; STSma *pTSma = NULL; + SSubmitBlkIter blkIter = {0}; + if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { + tdFreeTSmaWrapper(pSW); + break; + } + while (true) { - SSubmitBlkIter blkIter = {0}; - if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { - tdFreeTSmaWrapper(pSW); - break; - } STSRow *row = tGetSubmitBlkNext(&blkIter); if (row == NULL) { tdFreeTSmaWrapper(pSW); break; } if(pSW == NULL) { - if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), suid)) == NULL) { + if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) { break; } if((pSW->number) <= 0 || (pSW->tSma == NULL)) { diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index c13ae83150..003529d387 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -367,15 +367,49 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks); ASSERT_NE(pTsdb->pTfs, nullptr); - char *msg = (char *)calloc(1, 100); - ASSERT_NE(msg, nullptr); - ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, msg), 0); + // generate SSubmitReq msg and update expired window + int16_t schemaVer = 0; + uint32_t mockRowLen = sizeof(STSRow); + uint32_t mockRowNum = 2; + uint32_t mockBlkNum = 2; + uint32_t msgLen = sizeof(SSubmitReq) + mockBlkNum * sizeof(SSubmitBlk) + mockBlkNum * mockRowNum * mockRowLen; + + SSubmitReq *pMsg = (SSubmitReq *)calloc(1, msgLen); + ASSERT_NE(pMsg, nullptr); + pMsg->version = htobe64(schemaVer); + pMsg->numOfBlocks = htonl(mockBlkNum); + pMsg->length = htonl(msgLen); + + SSubmitBlk *pBlk = NULL; + STSRow *pRow = NULL; + TSKEY now = taosGetTimestamp(pTsdb->config.precision); + + for (uint32_t b = 0; b < mockBlkNum; ++b) { + pBlk = (SSubmitBlk *)POINTER_SHIFT(pMsg, sizeof(SSubmitReq) + b * (sizeof(SSubmitBlk) + mockRowNum * mockRowLen)); + pBlk->uid = htobe64(tbUid); + pBlk->suid = htobe64(tbUid); + pBlk->sversion = htonl(schemaVer); + pBlk->padding = htonl(0); + pBlk->schemaLen = htonl(0); + pBlk->numOfRows = htons(mockRowNum); + pBlk->dataLen = htonl(mockRowNum * mockRowLen); + for (uint32_t r = 0; r < mockRowNum; ++r) { + pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen); + pRow->len = mockRowLen; + pRow->ts = now + b * 1000 + r * 1000; + pRow->sver = schemaVer; + } + } + + ASSERT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); + + ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, (const char *)pMsg), 0); // init int32_t allocCnt = 0; int32_t allocStep = 16384; int32_t buffer = 1024; - void * buf = NULL; + void *buf = NULL; ASSERT_EQ(tsdbMakeRoom(&buf, allocStep), 0); int32_t bufSize = taosTSizeof(buf); int32_t numOfTables = 10; @@ -421,36 +455,36 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData)); len += tableDataLen; // printf("bufSize=%d, len=%d, len of table[%d]=%d\n", bufSize, len, t, tableDataLen); - } - pSmaData->dataLen = (len - sizeof(STSmaDataWrapper)); - - ASSERT_GE(bufSize, pSmaData->dataLen); - - // execute - ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); - - // step 3: query - uint32_t checkDataCnt = 0; - for (int32_t t = 0; t < numOfTables; ++t) { - for (col_id_t c = 0; c < numOfCols; ++c) { - ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, - c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1), - TSDB_CODE_SUCCESS); - ++checkDataCnt; } + pSmaData->dataLen = (len - sizeof(STSmaDataWrapper)); + + ASSERT_GE(bufSize, pSmaData->dataLen); + + // execute + ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); + + // step 3: query + uint32_t checkDataCnt = 0; + for (int32_t t = 0; t < numOfTables; ++t) { + for (col_id_t c = 0; c < numOfCols; ++c) { + ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, + c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1), + TSDB_CODE_SUCCESS); + ++checkDataCnt; + } + } + + printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); + + // release data + tfree(pMsg); + taosTZfree(buf); + // release meta + tdDestroyTSma(&tSma); + tfsClose(pTsdb->pTfs); + tsdbClose(pTsdb); + metaClose(pMeta); } - - printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); - - // release data - tfree(msg); - taosTZfree(buf); - // release meta - tdDestroyTSma(&tSma); - tfsClose(pTsdb->pTfs); - tsdbClose(pTsdb); - metaClose(pMeta); -} #endif #pragma GCC diagnostic pop \ No newline at end of file From 380a84e237102a2e7d04df0cebc74d1c2b1f89a8 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 25 Mar 2022 16:52:32 +0800 Subject: [PATCH 2/3] update --- source/dnode/vnode/src/tsdb/tsdbSma.c | 12 ++++-------- source/dnode/vnode/test/tsdbSmaTest.cpp | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index ff102dc8b9..421b7f499b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -81,6 +81,7 @@ struct SSmaStat { // expired window static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg); +static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); @@ -384,12 +385,6 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_SUCCESS; }; -static STimeWindow getActiveTimeWindowX(int64_t ts, SInterval* pInterval) { - STimeWindow tw = {0}; - tw.skey = 100; - tw.ekey = 1000; - return tw; -} static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); @@ -544,8 +539,9 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { interval.sliding = pTSma->sliding; interval.slidingUnit = pTSma->slidingUnit; - STimeWindow tw = getActiveTimeWindowX(TD_ROW_KEY(row), &interval); - tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, TD_ROW_KEY(row)); + TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); + + tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey); } } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 003529d387..1f101b1364 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -302,7 +302,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { tSma.intervalUnit = TIME_UNIT_DAY; tSma.interval = 1; tSma.slidingUnit = TIME_UNIT_HOUR; - tSma.sliding = 0; + tSma.sliding = 1; // sliding = interval when it's convert window tSma.indexUid = indexUid1; tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); tSma.timezoneInt = timezone; From 2dfecab056b535db1f7352c7ec87e2dd5fca7f8f Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 26 Mar 2022 10:23:47 +0800 Subject: [PATCH 3/3] refactor --- source/dnode/vnode/test/tsdbSmaTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 9a211b3468..86db3af4dc 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -14,9 +14,9 @@ */ #include +#include #include #include -#include #include #include @@ -374,7 +374,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { uint32_t mockBlkNum = 2; uint32_t msgLen = sizeof(SSubmitReq) + mockBlkNum * sizeof(SSubmitBlk) + mockBlkNum * mockRowNum * mockRowLen; - SSubmitReq *pMsg = (SSubmitReq *)calloc(1, msgLen); + SSubmitReq *pMsg = (SSubmitReq *)taosMemoryCalloc(1, msgLen); ASSERT_NE(pMsg, nullptr); pMsg->version = htobe64(schemaVer); pMsg->numOfBlocks = htonl(mockBlkNum);