From 7a5cdb81d5d47257680a7da843f31927ddee2e80 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 Apr 2024 14:40:49 +0800 Subject: [PATCH 1/2] fix(tsdb): set the correct start pos for binary search. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 36 ++++++++++++++++++------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cf95dd3f6f..85237df583 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -858,7 +858,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { return pBlockInfo; } -static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) { +static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) { // start end position int s, e; s = pos; @@ -906,6 +906,25 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or } } +// handle the repeat ts cases. +static int32_t doSearchStartPos(SBlockData* pBlock, int32_t num, int32_t pos, TSKEY key, int32_t order) { + int32_t startPos = doBinarySearchKey(pBlock->aTSKEY, num, pos, key, order); + + int32_t i = startPos; + int64_t startTs = pBlock->aTSKEY[startPos]; + if (ASCENDING_TRAVERSE(order)) { + while (i >= 0 && (pBlock->aTSKEY[i] == startTs)) { + i--; + } + return i + 1; + } else { + while (i < pBlock->nRow && (pBlock->aTSKEY[i] == startTs)) { + i++; + } + return i - 1; + } +} + static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SBrinRecord* pRecord, int32_t pos) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; @@ -917,7 +936,7 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData endPos = 0; } else { int64_t key = asc ? pReader->info.window.ekey : pReader->info.window.skey; - endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->info.order); + endPos = doSearchStartPos(pBlockData, pRecord->numRow, pos, key, pReader->info.order); } if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) || @@ -1100,12 +1119,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; int32_t numOfOutputCols = pSupInfo->numOfCols; int32_t code = TSDB_CODE_SUCCESS; + int64_t st = taosGetTimestampUs(); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int32_t step = asc ? 1 : -1; - SColVal cv = {0}; - int64_t st = taosGetTimestampUs(); - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - int32_t step = asc ? 1 : -1; - + SColVal cv = {0}; SBrinRecord tmp; blockInfoToRecord(&tmp, pBlockInfo, pSupInfo); SBrinRecord* pRecord = &tmp; @@ -1119,7 +1137,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro } // row index of dump info remain the initial position, let's find the appropriate start position. - if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) { + if (((pDumpInfo->rowIndex == 0) && asc) || ((pDumpInfo->rowIndex == (pRecord->numRow - 1)) && (!asc))) { if (asc && pReader->info.window.skey <= pRecord->firstKey.key.ts && pReader->info.verRange.minVer <= pRecord->minVer) { // pDumpInfo->rowIndex = 0; @@ -1130,7 +1148,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro int32_t pos = asc ? pRecord->numRow - 1 : 0; int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int64_t key = asc ? pReader->info.window.skey : pReader->info.window.ekey; - pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, order); + pDumpInfo->rowIndex = doSearchStartPos(pBlockData, pRecord->numRow, pos, key, order); if (pDumpInfo->rowIndex < 0) { tsdbError( From b0ef022e8787b8e0e0fcd4d90b9bafc253b2ece6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 Apr 2024 15:13:52 +0800 Subject: [PATCH 2/2] fix(tsdb): set the correct start pos for binary search. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 26 ++++++++++++------------- source/libs/executor/src/scanoperator.c | 1 + 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 85237df583..c25deb0d66 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -907,21 +907,19 @@ static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, } // handle the repeat ts cases. -static int32_t doSearchStartPos(SBlockData* pBlock, int32_t num, int32_t pos, TSKEY key, int32_t order) { - int32_t startPos = doBinarySearchKey(pBlock->aTSKEY, num, pos, key, order); - +static int32_t findFirstPos(const int64_t* pTsList, int32_t num, int32_t startPos, bool asc) { int32_t i = startPos; - int64_t startTs = pBlock->aTSKEY[startPos]; - if (ASCENDING_TRAVERSE(order)) { - while (i >= 0 && (pBlock->aTSKEY[i] == startTs)) { - i--; - } - return i + 1; - } else { - while (i < pBlock->nRow && (pBlock->aTSKEY[i] == startTs)) { + int64_t startTs = pTsList[startPos]; + if (asc) { + while (i < num && (pTsList[i] == startTs)) { i++; } return i - 1; + } else { + while (i >= 0 && (pTsList[i] == startTs)) { + i--; + } + return i + 1; } } @@ -936,7 +934,8 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData endPos = 0; } else { int64_t key = asc ? pReader->info.window.ekey : pReader->info.window.skey; - endPos = doSearchStartPos(pBlockData, pRecord->numRow, pos, key, pReader->info.order); + endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->info.order); + endPos = findFirstPos(pBlockData->aTSKEY, pRecord->numRow, endPos, asc); } if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) || @@ -1148,7 +1147,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro int32_t pos = asc ? pRecord->numRow - 1 : 0; int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int64_t key = asc ? pReader->info.window.skey : pReader->info.window.ekey; - pDumpInfo->rowIndex = doSearchStartPos(pBlockData, pRecord->numRow, pos, key, order); + pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, order); if (pDumpInfo->rowIndex < 0) { tsdbError( @@ -1159,6 +1158,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro return TSDB_CODE_INVALID_PARA; } + pDumpInfo->rowIndex = findFirstPos(pBlockData->aTSKEY, pRecord->numRow, pDumpInfo->rowIndex, (!asc)); ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer); // find the appropriate start position that satisfies the version requirement. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 838db4c571..b2b7a85d0c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4577,6 +4577,7 @@ void destroyTableMergeScanOperatorInfo(void* param) { destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); + // remove it from the task->result list pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock); taosArrayDestroy(pTableScanInfo->pSortInfo);