diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e1756333c5..d5d8ba130c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1729,41 +1729,45 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader // row in last file block TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); - + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist + if (key < tsLast) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key == ts) { - SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); - - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); - - code = tsdbRowMergerGetRow(pMerger, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); - - taosMemoryFree(pTSRow); - tsdbRowMergerClear(pMerger); - return code; - } else { // key > ts + } else if (key > tsLast) { + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + } + } else { + if (key > tsLast) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (key < tsLast) { return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); } - } else { // desc order - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true); } + // the following for key == tsLast + SRow* pTSRow = NULL; + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + + TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + tsdbRowMergerAdd(pMerger, pRow1, NULL); + + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + + code = tsdbRowMergerGetRow(pMerger, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + + taosMemoryFree(pTSRow); + tsdbRowMergerClear(pMerger); + return code; + } else { // only last block exists return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); } @@ -2190,7 +2194,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; - int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : + (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); if (pBlockScanInfo->iter.hasVal) { pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); } @@ -2564,9 +2569,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; - if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { - // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); + if (pScanInfo == NULL) { + tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2575,8 +2579,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } - // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { + // reset the index in last block when handing a new file + bool hasNexTable = moveToNextTable(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + + continue; + } bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { @@ -2667,16 +2678,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { - // only return the rows in last block - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); - ASSERT(tsLast >= pBlockInfo->record.lastKey); + bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); + int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; + if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) || + (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) { + // whole block is required, return it directly + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; + pInfo->rows = pBlockInfo->record.numRow; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 0; + pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; + setComposedBlockFlag(pReader, false); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); + // update the last key for the corresponding table + pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; + tsdbDebug("%p uid:%" PRIu64 + " clean file block retrieved from file, global index:%d, " + "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", + pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, + pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); + } else { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); + tsdbDebug("load data in last block firstly %s", pReader->idStr); int64_t st = taosGetTimestampUs(); @@ -2707,23 +2734,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } - } else { // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; - pInfo->rows = pBlockInfo->record.numRow; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 0; - pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; - setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); - - // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " - "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", - pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, - pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); } + } return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; @@ -4097,11 +4109,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); - int64_t loadBlocks = 0; - double elapse = 0; - pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse); - pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); - // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; int32_t iter = 0; @@ -4172,7 +4179,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { } } - tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); pReader->pReadSnap = NULL; pReader->flag = READER_STATUS_SUSPEND;