From eaef3dffafb9d991773acba0c0ac375fd19d4d92 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Aug 2022 22:42:16 +0800 Subject: [PATCH] fix(query): fix error in query last block. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 163 +++++++++++++++++-------- 1 file changed, 112 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f4671a0f34..5284b6936a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -88,6 +88,7 @@ typedef struct SLastBlockReader { SBlockData lastBlockData; STimeWindow window; SVersionRange verRange; + int32_t order; uint64_t uid; int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL } SLastBlockReader; @@ -313,11 +314,11 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap } // init file iterator -static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) { +static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) { size_t numOfFileset = taosArrayGetSize(aDFileSet); - pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset; - pIter->order = order; + pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; + pIter->order = pReader->order; pIter->pFileList = aDFileSet; pIter->numOfFiles = numOfFileset; @@ -325,14 +326,18 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32 pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader)); if (pIter->pLastBlockReader == NULL) { int32_t code = TSDB_CODE_OUT_OF_MEMORY; - tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr); + tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr); return code; } - pIter->pLastBlockReader->pBlockL = taosArrayInit(4, sizeof(SBlockL)); + SLastBlockReader* pLReader = pIter->pLastBlockReader; + pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL)); + pLReader->order = pReader->order; + pLReader->window = pReader->window; + pLReader->verRange = pReader->verRange; } - tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); + tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1284,7 +1289,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc // todo here we need to each key in the last files to identify if it is really overlapped with last block bool overlapWithlastBlock = false; - if (hasDataInLastBlock(pLastBlockReader)) { + if (/*hasDataInLastBlock(pLastBlockReader)*/taosArrayGetSize(pLastBlockReader->pBlockL) > 0) { SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); } @@ -1364,7 +1369,6 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pMemSchema; } -// todo handle desc static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; @@ -1392,31 +1396,57 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf minKey = key; } - // file block ---> last block -----> imem -----> mem bool init = false; - if (minKey == key) { - init = true; - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - } - if (minKey == tsLast) { - if (!init) { + // file block ---> last block -----> imem -----> mem + if (pReader->order == TSDB_ORDER_ASC) { + if (minKey == key) { init = true; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - tRowMergerInit(&merge, &fRow1, pReader->pSchema); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); - } + if (minKey == tsLast) { + if (!init) { + init = true; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + } + doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + } - if (minKey == k.ts) { - if (!init) { + if (minKey == k.ts) { + if (!init) { + init = true; + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, pRow, pSchema); + } + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + } else { + if (minKey == k.ts) { + init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); tRowMergerInit(&merge, pRow, pSchema); + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + if (minKey == tsLast) { + if (!init) { + init = true; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + } + doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + } + + if (minKey == key) { + if (!init) { + init = true; + tRowMergerInit(&merge, &fRow, pReader->pSchema); + } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } } tRowMergerGetRow(&merge, &pTSRow); @@ -1753,23 +1783,34 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, - SVersionRange* pVerRange, int16_t* startPos) { +static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) { pLastBlockReader->uid = uid; - pLastBlockReader->window = *pWin; - pLastBlockReader->verRange = *pVerRange; pLastBlockReader->rowIndex = startPos; + + if (*startPos == -1) { + if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { + // do nothing + } else { + *startPos = pLastBlockReader->lastBlockData.nRow; + } + } +} + +#define ALL_ROWS_CHECKED_INDEX INT16_MIN +static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) { + *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; } static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { - if (*(pLastBlockReader->rowIndex) >= pLastBlockReader->lastBlockData.nRow) { + int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1; + if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; } - *(pLastBlockReader->rowIndex) += 1; + *(pLastBlockReader->rowIndex) += step; SBlockData* pBlockData = &pLastBlockReader->lastBlockData; - for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow; ++i) { + for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid[i] != pLastBlockReader->uid) { continue; } @@ -1784,12 +1825,12 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { // no data any more if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { - *(pLastBlockReader->rowIndex) = pBlockData->nRow; + setAllRowsChecked(pLastBlockReader); return false; } if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { - *(pLastBlockReader->rowIndex) = pBlockData->nRow; + setAllRowsChecked(pLastBlockReader); return false; } @@ -1798,7 +1839,7 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { } // set all data is consumed in last block - *(pLastBlockReader->rowIndex) = pBlockData->nRow; + setAllRowsChecked(pLastBlockReader); return false; } @@ -1817,15 +1858,14 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { return pBlockData->aTSKEY[*pLastBlockReader->rowIndex]; } -// todo handle desc order static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { - if (*pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) { + if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; } - return true; } +// todo refactor static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -1849,6 +1889,25 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI if (pBlockData->nRow > 0) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + // no last block + if (pLastBlockReader->lastBlockData.nRow == 0) { + if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + return TSDB_CODE_SUCCESS; + } else { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; + } + } + // row in last file block int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); if (ts < key) { // save rows in last block @@ -1901,7 +1960,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } } } else { // only last block exists - // only last block exits SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); @@ -1936,7 +1994,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, &pBlockScanInfo->indexInBlockL); + initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL); // bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2274,12 +2332,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // todo opt perf by avoiding load last block repeatly STableBlockScanInfo* pScanInfo = pStatus->pTableIter; int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader); - if (code != TSDB_CODE_SUCCESS) { // todo handle error + if (code != TSDB_CODE_SUCCESS) { return code; } - initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL); - if (pScanInfo->indexInBlockL == -1) { + initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); + if (pScanInfo->indexInBlockL == -1 || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader); if (!hasData) { // current table does not have rows in last block, try next table pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); @@ -2327,10 +2385,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // load the last data block of current table code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader); if (code != TSDB_CODE_SUCCESS) { - // todo handle error + return code; } - initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL); + initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pScanInfo->indexInBlockL); } if (pBlockInfo == NULL) { // build data block from last data file @@ -2340,7 +2398,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { tBlockDataReset(&pStatus->fileBlockData); code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { - // todo + return code; } code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); @@ -2456,6 +2514,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { return code; } + if (pReader->pResBlock->info.rows > 0) { + return TSDB_CODE_SUCCESS; + } + // all data blocks are checked in this last block file, now let's try the next file if (pReader->status.pTableIter == NULL) { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -2881,7 +2943,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } -// todo support desc order +// todo check if the rows are dropped or not int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) { while(nextRowInLastBlock(pLastBlockReader)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); @@ -3240,7 +3302,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { SDataBlockIter* pBlockIter = &pReader->status.blockIter; - initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); + initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); // no data in files, let's try buffer in memory @@ -3261,8 +3323,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, - pPrevReader->idStr); + initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader); resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap); // no data in files, let's try buffer in memory @@ -3507,13 +3568,13 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { tBlockDataReset(&pStatus->fileBlockData); int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { - //todo + terrno = code; + return NULL; } code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData, 1); - terrno = code; return NULL; } @@ -3555,7 +3616,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); tsdbDataFReaderClose(&pReader->pFileReader); - initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); + initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); resetDataBlockScanInfo(pReader->status.pTableMap);