From 7904686915e34967973e87e2d6e09c63d7cb6427 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 21 Aug 2022 18:59:21 +0800 Subject: [PATCH] fix(query): support delete in last --- source/dnode/vnode/src/tsdb/tsdbRead.c | 54 +++++++++++++------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d89b45b140..6c9042f64f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -166,7 +166,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger); +static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); @@ -1441,7 +1441,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf init = true; tRowMergerInit(&merge, &fRow1, pReader->pSchema); } - doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); } if (minKey == k.ts) { @@ -1470,7 +1470,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf init = true; tRowMergerInit(&merge, &fRow1, pReader->pSchema); } - doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); } if (minKey == key) { @@ -1641,7 +1641,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo init = true; tRowMergerInit(&merge, &fRow1, pReader->pSchema); } - doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); } if (minKey == ik.ts) { @@ -1691,7 +1691,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo init = true; tRowMergerInit(&merge, &fRow1, pReader->pSchema); } - doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); } if (minKey == key) { @@ -1904,7 +1904,7 @@ static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) { *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; } -static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { +static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1; if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; @@ -1918,21 +1918,28 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { continue; } - if (pBlockData->aTSKEY[i] < pLastBlockReader->window.skey) { + int64_t ts = pBlockData->aTSKEY[i]; + if (ts < pLastBlockReader->window.skey) { continue; } - if (pBlockData->aVersion[i] < pLastBlockReader->verRange.minVer) { + int64_t ver = pBlockData->aVersion[i]; + if (ver < pLastBlockReader->verRange.minVer) { continue; } // no data any more, todo opt handle desc case - if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { + if (ts > pLastBlockReader->window.ekey) { continue; } // todo opt handle desc case - if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { + if (ver > pLastBlockReader->verRange.maxVer) { + continue; + } + + TSDBKEY k = {.ts = ts, .version = ver}; + if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pLastBlockReader->order)) { continue; } @@ -1958,7 +1965,8 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { } // todo refactor -static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) { +static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, + SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; @@ -2011,7 +2019,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow1, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); @@ -2025,7 +2033,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); @@ -2061,7 +2069,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); @@ -2085,10 +2093,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pBlockScanInfo = pReader->status.pTableIter; } - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; -// initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL); -// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block - + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; @@ -2439,7 +2444,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); int32_t index = pScanInfo->indexInBlockL; if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { - bool hasData = nextRowInLastBlock(pLastBlockReader); + bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); if (!hasData) { // current table does not have rows in last block, try next table pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); if (pStatus->pTableIter == NULL) { @@ -2504,13 +2509,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } - if (pLastBlockReader->currentBlockIndex == -1) { -// ASSERT(0); - } - + // note: the lastblock may be null here initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { - bool hasData = nextRowInLastBlock(pLastBlockReader); + bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); } } @@ -3034,8 +3036,8 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc } // todo check if the rows are dropped or not -int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) { - while(nextRowInLastBlock(pLastBlockReader)) { +int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) { + while(nextRowInLastBlock(pLastBlockReader, pScanInfo)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex);