From 99fb3c678c8fc5876428bd3b97f516c19a43d2e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 2 Jul 2022 12:05:03 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/client/test/clientTests.cpp | 123 +++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 508 ++++++++++++++----------- 2 files changed, 352 insertions(+), 279 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 36dcab5c19..e8e3237b67 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -669,13 +669,13 @@ TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); -// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); @@ -700,54 +700,55 @@ TEST(testCase, projection_query_tables) { printf("create table :%d\n", i); createNewTable(pConn, i); } -// pRes = taos_query(pConn, "select * from tu"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } + pRes = taos_query(pConn, "select * from tu"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } -// taos_free_result(pRes); + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); taos_close(pConn); } -//TEST(testCase, projection_query_stables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select ts from st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, projection_query_stables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "select ts from st1"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} TEST(testCase, agg_query_tables) { @@ -773,7 +774,7 @@ TEST(testCase, agg_query_tables) { taos_free_result(pRes); taos_close(pConn); } -#endif + /* --- copy the following script in the shell to setup the environment --- @@ -819,5 +820,27 @@ TEST(testCase, async_api_test) { getchar(); taos_close(pConn); } +#endif + +TEST(testCase, update_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + taos_query(pConn, "use abc1"); + + TAOS_RES* pRes = taos_query(pConn, "create table tup (ts timestamp, k int);"); + if (taos_errno(pRes) != 0) { + printf("failed to create table, reason:%s", taos_errstr(pRes)); + } + + taos_free_result(pRes); + + char s[256] = {0}; + for(int32_t i = 0; i < 7000; ++i) { + sprintf(s, "insert into tup values('2020-1-1 1:1:1', %d)", i); + pRes = taos_query(pConn, s); + taos_free_result(pRes); + } +} #pragma GCC diagnostic pop diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e7360c3d11..0a9b153cc6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -15,12 +15,6 @@ #include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) -#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pResBlock->pDataBlock))) - -#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \ - ((SDataBlockInfo){.window = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts}, \ - .rows = (_block)->numOfRows, \ - .uid = (_checkInfo)->tableId}) typedef struct SQueryFilePos { int32_t fid; @@ -33,13 +27,6 @@ typedef struct SQueryFilePos { STimeWindow win; } SQueryFilePos; -typedef struct SDataBlockLoadInfo { - SDFileSet* fileGroup; - int32_t slot; - uint64_t uid; - SArray* pLoadedCols; -} SDataBlockLoadInfo; - typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; @@ -88,8 +75,8 @@ typedef struct SFilesetIter { } SFilesetIter; typedef struct SFileDataBlockInfo { - int32_t tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it - int64_t uid; + int32_t tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it + uint64_t uid; } SFileDataBlockInfo; typedef struct SDataBlockIter { @@ -158,13 +145,17 @@ struct STsdbReader { }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); -static int buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); +static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); static int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); +static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); -static void setComposedBlockFlag(STsdbReader* pReader, bool composed); +static void setComposedBlockFlag(STsdbReader* pReader, bool composed); +static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); + +static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow **pTSRow, STsdbReader* pReader); +static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); // static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { // pBlockLoadInfo->slot = -1; @@ -426,8 +417,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->order = pCond->order; pReader->capacity = 4096; pReader->idStr = strdup(idstr); - pReader->verRange.minVer= pCond->startVersion; - pReader->verRange.maxVer = 100000;//pCond->endVersion; // todo for test purpose + pReader->verRange = (SVersionRange) {.minVer = pCond->startVersion, .maxVer = 10000}; pReader->type = pCond->type; pReader->window = *pCond->twindows; @@ -437,11 +427,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd } #endif - if (pReader->suid != 0) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); - ASSERT(pReader->pSchema); - } - // todo remove this setQueryTimewindow(pReader, pCond, 0); ASSERT (pCond->numOfCols > 0); @@ -1007,7 +992,7 @@ _error: // TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step); // cur->rows = -// buildInmemDataBlockImpl(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); +// buildDataBlockFromBufImpl(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); // pTsdbReadHandle->realNumOfRows = cur->rows; // // update the last key value @@ -2022,11 +2007,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, pReader->idStr); - // the pTableQueryInfo[j]->numOfBlocks may be 0 + assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables); SMultiwayMergeTreeInfo* pTree = NULL; - uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); if (ret != TSDB_CODE_SUCCESS) { cleanupBlockOrderSupporter(&sup); @@ -2178,14 +2162,64 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { return pFBlockInfo; } -static bool overlapWithNeighborBlock(SFileDataBlockInfo *pFBlockInfo, SBlock* pBlock, STableBlockScanInfo* pTableBlockScanInfo) { - // it is the last block in current file, no chance to overlap with neighbor blocks. - if(pFBlockInfo->tbBlockIdx == taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { // last block in current file, - return false; +static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo *pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order) { + bool asc = ASCENDING_TRAVERSE(order); + if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { + return NULL; } - SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, pFBlockInfo->tbBlockIdx + 1); - return (pNext->minKey.ts == pBlock->maxKey.ts); + if (!asc && pFBlockInfo->tbBlockIdx == 0) { + return NULL; + } + + int32_t step = asc? 1:-1; + + *nextIndex = pFBlockInfo->tbBlockIdx + step; + SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + return pNext; +} + +static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { + ASSERT(pBlockIter != NULL && pFBlockInfo != NULL); + + int32_t step = ASCENDING_TRAVERSE(pBlockIter->order)? 1:-1; + int32_t index = pBlockIter->index; + + while(index < pBlockIter->numOfBlocks && index >= 0) { + SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); + if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { + return index; + } + + index += step; + } + + ASSERT(0); + return -1; +} + +static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index) { + if (index < 0 || index >= pBlockIter->numOfBlocks) { + return -1; + } + + SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index); + + taosArrayRemove(pBlockIter->blockList, index); + taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock); + + SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); + ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx); + return TSDB_CODE_SUCCESS; +} + +static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) { + // it is the last block in current file, no chance to overlap with neighbor blocks. + if (ASCENDING_TRAVERSE(order)) { + return pBlock->maxKey.ts == pNeighbor->minKey.ts; + } else { + return pBlock->minKey.ts == pNeighbor->maxKey.ts; + } } static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) { @@ -2199,15 +2233,22 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); } - static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { - return (dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) || - overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) || + int32_t neighborIndex = 0; + SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); + + bool overlapWithNeighbor = false; + if (pNeighbor) { + overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order); + } + + return (overlapWithNeighbor || + dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) || keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity)); } -static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { +static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) { return TSDB_CODE_SUCCESS; } @@ -2215,7 +2256,7 @@ static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pB SSDataBlock* pBlock = pReader->pResBlock; int64_t st = taosGetTimestampUs(); - int32_t code = buildInmemDataBlockImpl(pBlockScanInfo, *key, pReader->capacity, pReader); + int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, *key, pReader->capacity, pReader); int64_t elapsedTime = taosGetTimestampUs() - st; @@ -2227,15 +2268,42 @@ static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pB return code; } +static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, + STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key, + SFileDataBlockInfo* pFBlock, SBlock* pBlock) { + SRowMerger merge = {0}; + SBlockData* pBlockData = &pReader->status.fileBlockData; + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + TSDBKEY k = TSDBROW_KEY(pRow); + if (key <= k.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + + if (k.ts == key) { + tRowMerge(&merge, pRow); + doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + } else { // k.ts < key + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader); + } + + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; +} + static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) { SFileBlockDumpInfo *pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; SRowMerger merge = {0}; STSRow* pTSRow = NULL; - TSKEY mergeTs = TSKEY_INITIAL_VAL; - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); @@ -2243,122 +2311,60 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - // [1&2] key <= [k.ts|ik.ts] - if (key <= k.ts || key <= ik.ts) { + // [1&2] key <= [k.ts && ik.ts] + if (key <= k.ts && key <= ik.ts) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); tRowMergerInit(&merge, &fRow, pReader->pSchema); doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); - if (ik.ts == mergeTs) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + if (ik.ts == key) { + tRowMerge(&merge, piRow); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); } - if (k.ts == mergeTs) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + if (k.ts == key) { + tRowMerge(&merge, pRow); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); } tRowMergerGetRow(&merge, &pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - } else { + } else { // key > ik.ts || key > k.ts // [3] ik.ts < key <= k.ts + // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); + doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, + pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } - // [4] k.ts < key <= ik.ts + // [5] k.ts < key <= ik.ts + // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow, + pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } - // [5] k.ts == ik.ts < key + // [7] k.ts == ik.ts < key if (k.ts == ik.ts) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); - - if (k.ts == mergeTs) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - } - - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - return TSDB_CODE_SUCCESS; - } - - // [6] k.ts < ik.ts < key - if (k.ts < ik.ts) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - - return TSDB_CODE_SUCCESS; - } - - // [6] ik.ts < k.ts < key - if (ik.ts < k.ts) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); + doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } } else { if (pBlockScanInfo->imemHasVal) { - TSDBKEY ik = TSDBROW_KEY(piRow); - if (key <= ik.ts) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); - - doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); - - if (ik.ts == mergeTs) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - } - - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - return TSDB_CODE_SUCCESS; - } - - if (ik.ts < key) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - } - - return TSDB_CODE_SUCCESS; + return doMergeBufFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter, + &pBlockScanInfo->imemHasVal, key, pFBlock, pBlock); } if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL - TSDBKEY k = TSDBROW_KEY(pRow); - if (key <= k.ts) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); - - doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); - - if (k.ts == mergeTs) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - } - - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - return TSDB_CODE_SUCCESS; - } - - if (k.ts < key) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - } - - return TSDB_CODE_SUCCESS; + return doMergeBufFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, + key, pFBlock, pBlock); } // imem & mem are all empty @@ -2372,7 +2378,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn return TSDB_CODE_SUCCESS; } -static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) { +static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, + STableBlockScanInfo* pBlockScanInfo) { SSDataBlock* pResBlock = pReader->pResBlock; while(1) { @@ -2525,11 +2532,11 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { static int32_t doBuildDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; - SReaderStatus* pStatus = &pReader->status; + SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); @@ -2549,8 +2556,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { // data in memory that are earlier than current file block TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - code = buildInmemDataBlock(pReader, pScanInfo, &maxKey); - // build data block from in-memory buffer data completed. + code = buildDataBlockFromBuf(pReader, pScanInfo, &maxKey); } else { // whole block is required, return it directly // todo // 1. the version of all rows should be less than the endVersion @@ -2582,7 +2588,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { initMemIterator(pBlockScanInfo, pReader); TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - int32_t code = buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); + int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, &maxKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2611,7 +2617,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order)? 0: pBlock->nRow - 1; } -static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { +static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { int32_t numOfBlocks = 0; int32_t code = moveToNextFile(pReader, &numOfBlocks); if (code != TSDB_CODE_SUCCESS) { @@ -2635,7 +2641,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { SDataBlockIter* pBlockIter = &pReader->status.blockIter; if (pReader->status.blockIter.index == -1) { - code = initForFirstBlockOfFile(pReader, pBlockIter); + code = initForFirstBlockInFile(pReader, pBlockIter); if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { return code; } @@ -2645,19 +2651,18 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } } else { - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); // current block are exhausted, try the next file block - if (pDumpInfo->allDumped) { + if (pReader->status.fBlockDumpInfo.allDumped) { // try next data block in current file bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (hasNext) { // current file is exhausted, let's try the next file initBlockDumpInfo(pReader, pBlockIter); } else { - code = initForFirstBlockOfFile(pReader, pBlockIter); + code = initForFirstBlockInFile(pReader, pBlockIter); if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { return code; } @@ -2750,7 +2755,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { } } -int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { +int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { while (1) { *hasVal = tsdbTbDataIterNext(pIter); if (!(*hasVal)) { @@ -2769,73 +2774,109 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR return TSDB_CODE_SUCCESS; } -int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData, - STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) { - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - bool asc = ASCENDING_TRAVERSE(pReader->order); - - int32_t step = asc? 1:-1; - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - - if (asc) { // todo refactor - if (pDumpInfo->rowIndex < pBlockData->nRow - 1) { - if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) { - int32_t rowIndex = pDumpInfo->rowIndex + step; - - while (pBlockData->aTSKEY[rowIndex] == key) { - if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) { - continue; - } - - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tRowMerge(pMerger, &fRow); - rowIndex += step; - } - - pDumpInfo->rowIndex = rowIndex; - } else { - pDumpInfo->rowIndex += step; - } - } else { // last row of current block, check if current block is overlapped with neighbor block - pDumpInfo->rowIndex += step; - bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); - if (overlap) { // load next block - ASSERT(0); - } +static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger, + SVersionRange* pVerRange, int32_t step) { + while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) { + if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) { + continue; } - } else { - if (pDumpInfo->rowIndex > 0) { - if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) { - int32_t rowIndex = pDumpInfo->rowIndex + step; - while (pBlockData->aTSKEY[rowIndex] == key) { - if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || - pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) { - continue; - } + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); + tRowMerge(pMerger, &fRow); + rowIndex += step; + } - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tRowMerge(pMerger, &fRow); - rowIndex += step; - } + return rowIndex; +} - pDumpInfo->rowIndex = rowIndex; - } else { - pDumpInfo->rowIndex += step; - } - } else { // last row of current block, check if current block is overlapped with previous neighbor block - pDumpInfo->rowIndex += step; - bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); - if (overlap) { // load next block - ASSERT(0); - } +typedef enum { + CHECK_FILEBLOCK_CONT = 0x1, + CHECK_FILEBLOCK_QUIT = 0x2, +} CHECK_FILEBLOCK_STATE; + +static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock, + SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key, CHECK_FILEBLOCK_STATE* state) { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SBlockData* pBlockData = &pReader->status.fileBlockData; + + int32_t step = ASCENDING_TRAVERSE(pReader->order)? 1:-1; + + int32_t nextIndex = -1; + SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); + if (pNeighborBlock == NULL) { // do nothing + *state = CHECK_FILEBLOCK_QUIT; + return 0; + } + + bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order); + if (overlap) { // load next block + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; + + //1. find the next neighbor block in the scan block list + SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex}; + int32_t neighborIndex = findFileBlockInfoIndex(&pStatus->blockIter, &fb); + + //2. remove it from the scan block list + setFileBlockActiveInBlockIter(&pStatus->blockIter, neighborIndex); + + //3. load the neighbor block, and set it to be the currently accessed file data block + int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + //4. check the data values + initBlockDumpInfo(pReader, pBlockIter); + + pDumpInfo->rowIndex = + doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); + + if (pDumpInfo->rowIndex >= pBlock->nRow) { + *state = CHECK_FILEBLOCK_CONT; } } return TSDB_CODE_SUCCESS; } -static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { +int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData, + STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + bool asc = ASCENDING_TRAVERSE(pReader->order); + int32_t step = asc ? 1 : -1; + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + + if (asc) { + pDumpInfo->rowIndex += step; + if (pDumpInfo->rowIndex < pBlockData->nRow - 1) { + pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); + } + + // all rows are consumed, let's try next file block + if (pDumpInfo->rowIndex >= pBlockData->nRow && asc) { + while (1) { + CHECK_FILEBLOCK_STATE st; + checkForNeighborFileBlock(pReader, pScanInfo, pBlock, pFBlock, pMerger, key, &st); + if (st == CHECK_FILEBLOCK_QUIT) { + break; + } + } + } + } else { // last row of current block, check if current block is overlapped with previous neighbor block + pDumpInfo->rowIndex += step; + // bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); + // if (overlap) { // load next block + // ASSERT(0); + // } + // } + } + + return TSDB_CODE_SUCCESS; +} + +void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { int32_t sversion = TSDBROW_SVERSION(pRow); if (pReader->pSchema == NULL) { @@ -2846,11 +2887,36 @@ static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) } } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { - TSKEY mergeTs = TSKEY_INITIAL_VAL; - +void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow **pTSRow, STsdbReader* pReader) { SRowMerger merge = {0}; + TSDBKEY k = TSDBROW_KEY(pRow); + checkUpdateSchema(pRow, uid, pReader); + + tRowMergerInit(&merge, pRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInBuf(dIter, hasVal, k.ts, &merge, pReader); + tRowMergerGetRow(&merge, pTSRow); +} + +void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { + SRowMerger merge = {0}; + + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBKEY ik = TSDBROW_KEY(piRow); + ASSERT(k.ts == ik.ts); + + checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader); + + tRowMergerInit(&merge, piRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + + tRowMerge(&merge, pRow); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + + tRowMergerGetRow(&merge, pTSRow); +} + +int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); @@ -2862,45 +2928,21 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ik = TSDBROW_KEY(piRow); if (ik.ts <= k.ts) { - checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader); - - tRowMergerInit(&merge, piRow, pReader->pSchema); - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - - if (k.ts == mergeTs) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - } - - tRowMergerGetRow(&merge, pTSRow); + doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { // k.ts < ik.ts - checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader); - - tRowMergerInit(&merge, pRow, pReader->pSchema); - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - tRowMergerGetRow(&merge, pTSRow); + doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); return TSDB_CODE_SUCCESS; } } - if (pBlockScanInfo->memHasVal) { - k = TSDBROW_KEY(pRow); - checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader); - - tRowMergerInit(&merge, pRow, pReader->pSchema); - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - tRowMergerGetRow(&merge, pTSRow); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); return TSDB_CODE_SUCCESS; } if (pBlockScanInfo->imemHasVal) { - ik = TSDBROW_KEY(piRow); - checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader); - - tRowMergerInit(&merge, piRow, pReader->pSchema); - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerGetRow(&merge, pTSRow); + doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); return TSDB_CODE_SUCCESS; } @@ -2951,7 +2993,7 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow return TSDB_CODE_SUCCESS; } -int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { +int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; do { @@ -3167,6 +3209,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } + if (pCond->suid != 0) { + (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1); + ASSERT((*ppReader)->pSchema); + } else if (taosArrayGetSize(pTableList) > 0) { + STableKeyInfo* pKey = taosArrayGet(pTableList, 0); + (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1); + } + STsdbReader* pReader = *ppReader; if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) { tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);