diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d4af0422d7..d9d60442ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -404,7 +404,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr); } else { - tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index::%d, %s", pIter->uid, oldIndex, idStr); + tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4c5b87559e..fd19e552c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -44,6 +44,7 @@ typedef struct SBlockIndex { typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; + TSKEY lastKeyInStt; // last accessed key in stt SMapData mapData; // block info (compressed) SArray* pBlockList; // block data index list, SArray SIterInfo iter; // mem buffer skip list iterator @@ -192,7 +193,7 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger, SVersionRange* pVerRange); + SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, @@ -402,9 +403,11 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf if (ASCENDING_TRAVERSE(pTsdbReader->order)) { int64_t skey = pTsdbReader->window.skey; pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; + pScanInfo->lastKeyInStt = skey; } else { int64_t ekey = pTsdbReader->window.ekey; pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pScanInfo->lastKeyInStt = ekey; } taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); @@ -1795,21 +1798,18 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc while (1) { bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); - if (!hasVal) { + if (!hasVal) { // the next value will be the accessed key in stt + pScanInfo->lastKeyInStt += step; return false; } TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBKEY k = TSDBROW_KEY(&row); - if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) { - pScanInfo->lastKey = k.ts; - } else { + pScanInfo->lastKeyInStt = k.ts; + + if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) { // the qualifed ts may equal to k.ts, only a greater version one. // here we need to fallback one step. - if (pScanInfo->lastKey == k.ts) { - pScanInfo->lastKey -= step; - } - return true; } } @@ -1949,7 +1949,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); } if (minKey == k.ts) { @@ -1997,7 +1997,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); } if (minKey == key) { @@ -2050,7 +2050,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMerge(&merge, &fRow1); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2068,7 +2068,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { @@ -2121,7 +2121,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMerge(&merge, &fRow1); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr); code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2230,7 +2230,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); } if (minKey == ik.ts) { @@ -2321,7 +2321,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); } if (minKey == key) { @@ -2472,9 +2472,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1; STimeWindow w = pLBlockReader->window; if (ASCENDING_TRAVERSE(pLBlockReader->order)) { - w.skey = pScanInfo->lastKey + step; + w.skey = pScanInfo->lastKeyInStt; } else { - w.ekey = pScanInfo->lastKey + step; + w.ekey = pScanInfo->lastKeyInStt; } tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, @@ -3568,13 +3568,16 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc } int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger, SVersionRange* pVerRange) { + SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMerge(pMerger, &fRow1); } else { + tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, + pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, + idStr); break; } } @@ -3937,6 +3940,17 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n pInfo->uid = pList[i].uid; pUidList->tableUidList[i] = pList[i].uid; + // todo extract method + if (ASCENDING_TRAVERSE(pReader->order)) { + int64_t skey = pReader->window.skey; + pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; + pInfo->lastKeyInStt = skey; + } else { + int64_t ekey = pReader->window.ekey; + pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pInfo->lastKeyInStt = ekey; + } + taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); } @@ -4721,8 +4735,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { return code; } -static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) { - return (numOfRows - startRow) / bucketRange; +static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows, int32_t numOfBucket) { + int32_t bucketIndex = (numOfRows - startRow) / bucketRange; + if (bucketIndex == numOfBucket) { + bucketIndex -= 1; + } + return bucketIndex; } int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { @@ -4731,8 +4749,9 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa pTableBlockInfo->totalRows = 0; pTableBlockInfo->numOfVgroups = 1; - // find the start data block in file + const int32_t numOfBucket = 20.0; + // find the start data block in file tsdbAcquireReader(pReader); if (pReader->suspended) { tsdbReaderResume(pReader); @@ -4743,7 +4762,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa pTableBlockInfo->defMinRows = pc->minRows; pTableBlockInfo->defMaxRows = pc->maxRows; - int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0); + int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / numOfBucket); pTableBlockInfo->numOfFiles += 1; @@ -4781,7 +4800,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock; - int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); + int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBucket); pTableBlockInfo->blockRowsHisto[bucketIndex]++; hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr); diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index ddb3bfaa26..360843c46c 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -45,6 +45,8 @@ class TDSimClient: "supportVnodes": "1024", "enableQueryHb": "1", "telemetryReporting": "0", + "tqDebugflag": "135", + "wDebugflag":"135", } def getLogDir(self): diff --git a/tests/script/tsim/parser/sliding.sim b/tests/script/tsim/parser/sliding.sim index b9353e2c61..45a49fbc4e 100644 --- a/tests/script/tsim/parser/sliding.sim +++ b/tests/script/tsim/parser/sliding.sim @@ -49,9 +49,23 @@ while $i < $tbNum $nchar = $nchar . $c $nchar = $nchar . ' - sql insert into $tb values ($tstart , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) - $tstart = $tstart + 30 - $x = $x + 1 + $next = $tstart + 30 + $f = $x + 1 + $c1 = $f / 100 + $c1 = $c1 * 100 + $c1 = $f - $c1 + + $binary1 = ' . binary + $binary1 = $binary1 . $c1 + $binary1 = $binary1 . ' + + $nchar1 = ' . nchar + $nchar1 = $nchar1 . $c1 + $nchar1 = $nchar1 . ' + + sql insert into $tb values ($tstart , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) ($next , $c1 , $c1 , $c1 , $c1 , $c1 , $c1 , $c1 , $binary1 , $nchar1 ) + $tstart = $tstart + 60 + $x = $x + 2 endw $i = $i + 1