From ecd09059f5508702ebaf2b9cf6d72dd745deac85 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Mar 2024 10:16:56 +0800 Subject: [PATCH] fix(tsdb): fix error in reading data from file block --- source/client/test/clientTests.cpp | 27 ++-- source/dnode/vnode/src/tsdb/tsdbRead2.c | 146 ++++++++++----------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 19 ++- source/libs/executor/src/scanoperator.c | 9 +- 4 files changed, 99 insertions(+), 102 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index c1adc49d48..778b1826b4 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -171,6 +171,7 @@ void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t start printf("failed to insert data, reason:%s\n", taos_errstr(p)); } +// startTs += 20; taos_free_result(p); } } @@ -826,19 +827,19 @@ TEST(clientCase, projection_query_tables) { // } // taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "use test"); taos_free_result(pRes); -// TAOS_RES* pRes = taos_query(pConn, "select tbname, last(ts) from abc1.stable_1 group by tbname"); + pRes = taos_query(pConn, "create table st2 (ts timestamp, k int primary key, j varchar(1000)) tags(a int)"); + taos_free_result(pRes); + +// pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +// } // taos_free_result(pRes); - pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tu using st1 tags(1)"); + pRes = taos_query(pConn, "create table tu using st2 tags(2)"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); } @@ -853,7 +854,7 @@ TEST(clientCase, projection_query_tables) { "ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:" "QWERTYUIOP{}"; - for(int32_t i = 0; i < 10000; ++i) { + for(int32_t i = 0; i < 1; ++i) { char str[1024] = {0}; sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i); @@ -864,10 +865,10 @@ TEST(clientCase, projection_query_tables) { taos_free_result(px); } - for(int32_t j = 0; j < 5000; ++j) { + for(int32_t j = 0; j < 1; ++j) { start += 20; - for (int32_t i = 0; i < 10000; ++i) { - createNewTable(pConn, i, 20, start, pstr); + for (int32_t i = 0; i < 1; ++i) { + createNewTable(pConn, i, 100, start, pstr); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9a5fa0871f..9df0ba50ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,7 +48,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); -static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); +static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList, @@ -65,7 +65,7 @@ static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); -static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, +static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader); static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader); @@ -203,7 +203,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC if (pCols[i].pk) { pSupInfo->pk = pCols[i]; - pSupInfo->pkSrcSlot = i; + pSupInfo->pkSrcSlot = i - 1; pSupInfo->pkDstSlot = pSlotIdList[i]; pSupInfo->numOfPks += 1; } @@ -1504,7 +1504,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return code; } -static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, +static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, SRowKey* pKey, SFileBlockDumpInfo* pDumpInfo, bool* copied) { // opt version // 1. it is not a border point @@ -1516,11 +1516,10 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) { int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; - SRowKey rowKey, nextRowKey; - tColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey); + SRowKey nextRowKey; tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey); - if (rowKey.ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, &rowKey, &nextRowKey) != 0)) { // merge is not needed + if (pKey->ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, pKey, &nextRowKey) != 0)) { // merge is not needed code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); if (code) { return code; @@ -1578,6 +1577,14 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { + if (p2 == NULL) { + return 1; + } + + if (p1 == NULL) { + return -1; + } + if (p1->ts < p2->ts) { return -1; } else if (p1->ts > p2->ts) { @@ -1722,7 +1729,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = *pfKey; } - if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) { + if (pkCompEx(compFn, pSttKey, &minKey) > 0) { minKey = *pSttKey; } } @@ -1736,7 +1743,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } if (pkCompEx(compFn, &minKey, pSttKey) == 0) { @@ -1786,7 +1793,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } } @@ -1803,7 +1810,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } -static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key, +static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; @@ -1823,50 +1830,32 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* if (dataInDataFile && (!dataInSttFile)) { // no stt file block available, only data block exists - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); } else if ((!dataInDataFile) && dataInSttFile) { // no data in data file exists return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } else if (pBlockScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) { // opt model for count data in stt file, which is not overlap with data blocks in files. - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); } else { // row in both stt file blocks and data file blocks TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); - if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < pSttKey->ts) { // asc - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key > pSttKey->ts) { - return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); - } + int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pKey, pSttKey); - // key == tsLast. ts is equal and the primary key exists - if (pSttBlockReader->numOfPks > 0) { - int32_t res = pkComp1(pReader, pSttKey, &fRow); - if (res > 0) { - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (res < 0) { - return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); - } + if (ASCENDING_TRAVERSE(pReader->info.order)) { + if (ret < 0) { // asc + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + } else if (ret > 0) { + return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } } else { // desc - if (key > pSttKey->ts) { - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key < pSttKey->ts) { + if (ret > 0) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + } else if (ret < 0) { return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } - - // key == tsLast. ts is equal and the primary key exists - if (pReader->suppInfo.numOfPks > 0) { - int32_t res = pkComp1(pReader, pSttKey, &fRow); - if (res < 0) { - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (res > 0) { - return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); - } - } } // the following for key == sttKey->key.ts @@ -1879,7 +1868,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); @@ -1902,7 +1891,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); } code = tsdbRowMergerGetRow(pMerger, &pTSRow); @@ -2011,7 +2000,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } if (pkCompEx(compFn, &minKey, pSttKey) == 0) { @@ -2089,7 +2078,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } } @@ -2333,13 +2322,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); } -int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, +int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader) { SRowMerger* pMerger = &pReader->status.merger; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; bool copied = false; - int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied); + int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, pKey, pDumpInfo, &copied); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2354,12 +2343,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { -// if (pReader->suppInfo.numOfPks == 0) { -// pBlockScanInfo->lastProcKey.ts = key; -// } else { // todo use deep copy instead of shallow copy -// int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; -// tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey); -// } return TSDB_CODE_SUCCESS; } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2370,7 +2353,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2435,9 +2418,14 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; - int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) - ? pBlockData->aTSKEY[pDumpInfo->rowIndex] - : (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); + + SRowKey* pKey = &(SRowKey){0}; + if (hasDataInFileBlock(pBlockData, pDumpInfo)) { + tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pKey); + } else { + pKey = NULL; + } + if (pBlockScanInfo->iter.hasVal) { pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); } @@ -2462,7 +2450,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } // files data blocks + stt block - return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData); + return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, pKey, pBlockScanInfo, pBlockData); } static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, @@ -2886,7 +2874,7 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->version = pReader->info.verRange.maxVer; pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; - if (pReader->suppInfo.pk.pk) { + if (pReader->suppInfo.numOfPks > 0) { if (IS_NUMERIC_TYPE(pReader->suppInfo.pk.type)) { pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val; @@ -3645,9 +3633,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra return TSDB_CODE_SUCCESS; } -static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger, - SVersionRange* pVerRange, int32_t step) { - while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) { +static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, SRowKey* pKey, SRowMerger* pMerger, + SVersionRange* pVerRange, int32_t step, __compar_fn_t comparFn) { + while (rowIndex < pBlockData->nRow && rowIndex >= 0) { + SRowKey cur; + tColRowGetKey(pBlockData, rowIndex, &cur); + if (pkCompEx(comparFn, &cur, pKey) != 0) { + break; + } + if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) { rowIndex += step; continue; @@ -3667,21 +3661,20 @@ typedef enum { } CHECK_FILEBLOCK_STATE; static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, - SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key, + SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, SRowKey* pKey, CHECK_FILEBLOCK_STATE* state) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; bool asc = ASCENDING_TRAVERSE(pReader->info.order); + SVersionRange* pVerRange = &pReader->info.verRange; + bool loadNeighbor = true; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; - *state = CHECK_FILEBLOCK_QUIT; - int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; - - bool loadNeighbor = true; int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor); + *state = CHECK_FILEBLOCK_QUIT; if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { - pDumpInfo->rowIndex = - doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step); + pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pVerRange, step, pReader->pkComparFn); if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) { *state = CHECK_FILEBLOCK_CONT; } @@ -3690,18 +3683,17 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn return code; } -int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { +int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - - SRowMerger* pMerger = &pReader->status.merger; - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - int32_t step = asc ? 1 : -1; + SRowMerger* pMerger = &pReader->status.merger; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int32_t step = asc ? 1 : -1; + SVersionRange* pRange = &pReader->info.verRange; pDumpInfo->rowIndex += step; if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = - doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step); + doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pRange, step, pReader->pkComparFn); } // all rows are consumed, let's try next file block @@ -3715,7 +3707,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc break; } - checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, key, &st); + checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, pKey, &st); if (st == CHECK_FILEBLOCK_QUIT) { break; } @@ -3725,8 +3717,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } - - int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index fb43e5b713..05795b5832 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -163,25 +163,30 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf pUidList->tableUidList[j] = idList[j].uid; + SRowKey* pRowKey = &pScanInfo->lastProcKey; if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { int64_t skey = pTsdbReader->info.window.skey; - pScanInfo->lastProcKey.ts = (skey > INT64_MIN) ? (skey - 1) : skey; + pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey; pScanInfo->sttKeyInfo.nextProcKey = skey; } else { int64_t ekey = pTsdbReader->info.window.ekey; - pScanInfo->lastProcKey.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pScanInfo->sttKeyInfo.nextProcKey = ekey; } - pScanInfo->lastProcKey.numOfPKs = pTsdbReader->suppInfo.numOfPks; - if (pTsdbReader->suppInfo.numOfPks > 0 && IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) { - pScanInfo->lastProcKey.pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes); + // only handle the first primary key. + pRowKey->numOfPKs = pTsdbReader->suppInfo.numOfPks; + if (pTsdbReader->suppInfo.numOfPks > 0) { + if (IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) { + pRowKey->pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes); + } + pRowKey->pks[0].type = pTsdbReader->suppInfo.pk.type; } pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); - tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, - pScanInfo->lastProcKey.ts, pTsdbReader->idStr); + tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pRowKey->ts, + pTsdbReader->idStr); } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dbb0761046..1611f08b83 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1198,16 +1198,17 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); { // todo :refactor: + SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info; for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) { SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i); if (pItem->isPk) { SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId); - pInfo->pResBlock->info.pks[0].type = pInfoData->info.type; - pInfo->pResBlock->info.pks[1].type = pInfoData->info.type; + pBlockInfo->pks[0].type = pInfoData->info.type; + pBlockInfo->pks[1].type = pInfoData->info.type; if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { - pInfo->pResBlock->info.pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); - pInfo->pResBlock->info.pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); } } }