From ba8ab926322a95ca022936089c216e694edea37c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 2 Jul 2022 23:37:31 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 77 ++++++++++++++++---------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5dd5ced72d..0a232bc02a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -145,7 +145,7 @@ struct STsdbReader { }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); -static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); +static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); @@ -1967,7 +1967,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { /** * This is an two rectangles overlap cases. */ -static int32_t dataBlockPartialRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) { +static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) { return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) || (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) || (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) || @@ -2054,6 +2054,10 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); } +// 1. the version of all rows should be less than the endVersion +// 2. current block should not overlap with next neighbor block +// 3. current timestamp should not be overlap with each other +// 4. output buffer should be large enough to hold all rows in current block static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { int32_t neighborIndex = 0; SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); @@ -2071,12 +2075,12 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc } return (overlapWithNeighbor || hasDup || - dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) || + dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) || keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity)); } -static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { +static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) { return TSDB_CODE_SUCCESS; } @@ -2084,7 +2088,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* SSDataBlock* pBlock = pReader->pResBlock; int64_t st = taosGetTimestampUs(); - int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, *key, pReader->capacity, pReader); + int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); int64_t elapsedTime = taosGetTimestampUs() - st; @@ -2491,13 +2495,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { // data in memory that are earlier than current file block - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - code = buildDataBlockFromBuf(pReader, pScanInfo, &maxKey); + // todo rows in buffer should be less than the file block in asc, greater than file block in desc + int64_t endKey = (ASCENDING_TRAVERSE(pReader->order))? pBlock->minKey.ts:pBlock->maxKey.ts; + code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { // whole block is required, return it directly - // todo - // 1. the version of all rows should be less than the endVersion - // 2. current block should not overlap with next neighbor block - // 3. current timestamp should not be overlap with each other SDataBlockInfo* pInfo = &pReader->pResBlock->info; pInfo->rows = pBlock->nRow; pInfo->uid = pScanInfo->uid; @@ -2523,8 +2524,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; initMemIterator(pBlockScanInfo, pReader); - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, &maxKey); + int64_t endKey = (ASCENDING_TRAVERSE(pReader->order))? INT64_MAX:INT64_MIN; + int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2590,8 +2591,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + bool asc = ASCENDING_TRAVERSE(pReader->order); + + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + // current block are exhausted, try the next file block - if (pReader->status.fBlockDumpInfo.allDumped) { + if (pDumpInfo->allDumped) { // try next data block in current file bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (hasNext) { // current file is exhausted, let's try the next file @@ -2603,13 +2608,13 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } - code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - code = buildComposedDataBlock(pReader, pScanInfo); - return code; + return doBuildDataBlock(pReader); + } else if ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && !asc)) { + // file data block is partially loaded + // todo refactor: extract method + return buildComposedDataBlock(pReader, pScanInfo); + } else { // current block is not loaded yet + return doBuildDataBlock(pReader); } } @@ -2854,18 +2859,34 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo tRowMergerGetRow(&merge, pTSRow); } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { +int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey) { TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); - if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { + // todo refactor + bool asc = ASCENDING_TRAVERSE(pReader->order); + if (pBlockScanInfo->memHasVal) { + TSDBKEY k = TSDBROW_KEY(pRow); + if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { + pRow = NULL; + } + } + + if (pBlockScanInfo->imemHasVal) { + TSDBKEY k = TSDBROW_KEY(piRow); + if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { + piRow = NULL; + } + } + + if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal && pRow != NULL && piRow != NULL) { TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); if (ik.ts < k.ts) { // ik.ts < k.ts doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); } else if (k.ts < ik.ts) { - doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); } else { // ik.ts == k.ts doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); } @@ -2873,12 +2894,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } - if (pBlockScanInfo->memHasVal) { + if (pBlockScanInfo->memHasVal && pRow != NULL) { doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); return TSDB_CODE_SUCCESS; } - if (pBlockScanInfo->imemHasVal) { + if (pBlockScanInfo->imemHasVal && piRow != NULL) { doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); return TSDB_CODE_SUCCESS; } @@ -2930,12 +2951,12 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow return TSDB_CODE_SUCCESS; } -int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) { +int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; do { STSRow* pTSRow = NULL; - tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow); + tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey); if (pTSRow == NULL) { break; }