From 60c5b2beac42efaec8fa5fcde11a3501ad8f10c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 17:53:30 +0800 Subject: [PATCH] fix(query): optimize the row merge procedure for files. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 97 ++++++++++++++++++++------ 1 file changed, 76 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 03985654f8..c259a7cfe0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -145,7 +145,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI SRowMerger* pMerger); static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); -static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -691,16 +692,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; - int32_t numOfCols = blockDataGetNumOfCols(pResBlock); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t st = taosGetTimestampUs(); - SColVal cv = {0}; - int32_t colIndex = 0; - + int64_t st = taosGetTimestampUs(); bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; @@ -724,7 +722,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } - while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) { + int32_t colIndex = 0; + int32_t num = taosArrayGetSize(pBlockData->aIdx); + while (i < numOfOutputCols && colIndex < num) { rowIndex = 0; pColData = taosArrayGet(pResBlock->pDataBlock, i); @@ -744,7 +744,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } - while (i < numOfCols) { + while (i < numOfOutputCols) { pColData = taosArrayGet(pResBlock->pDataBlock, i); colDataAppendNNULL(pColData, 0, remain); i += 1; @@ -1256,7 +1256,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerClear(&merge); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; @@ -1300,7 +1300,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { // key > ik.ts || key > k.ts ASSERT(key != ik.ts); @@ -1309,7 +1309,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1317,7 +1317,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1326,7 +1326,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(key > ik.ts && key > k.ts); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1350,7 +1350,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch @@ -1359,7 +1359,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [4] ik.ts > key >= k.ts if (ik.ts > key) { doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1371,7 +1371,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1383,7 +1383,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1438,6 +1438,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } // imem & mem are all empty, only file exist + + // opt version + // 1. it is not a border point + // 2. the direct next point is not an duplicated timestamp + if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || + (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { + int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; + int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; + if (nextKey != key) { + // merge is not needed + doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + return TSDB_CODE_SUCCESS; + } + } + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); STSRow* pTSRow = NULL; @@ -1446,7 +1461,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); taosMemoryFree(pTSRow); tRowMergerClear(&merge); @@ -2201,7 +2216,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; - if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) { + if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); } @@ -2325,7 +2340,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } -int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { +int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { int32_t numOfRows = pBlock->info.rows; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); @@ -2369,6 +2384,46 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow return TSDB_CODE_SUCCESS; } +int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { + int32_t i = 0, j = 0; + int32_t outputRowIndex = pResBlock->info.rows; + + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + + SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); + if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]); + } + + SColVal cv = {0}; + int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); + + while(i < numOfOutputCols && j < numOfInputCols) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); + + if (pData->cid == pCol->info.colId) { + tColDataGetValue(pData, j, &cv); + doCopyColVal(pCol, outputRowIndex++, i, &cv, pSupInfo); + j += 1; + } else { // the specified column does not exist in file block, fill with null data + colDataAppendNULL(pCol, outputRowIndex); + } + + i += 1; + } + + while (i < numOfOutputCols) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + colDataAppendNULL(pCol, rowIndex); + i += 1; + } + + pResBlock->info.rows += 1; + return TSDB_CODE_SUCCESS; +} + int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; @@ -2380,7 +2435,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e break; } - doAppendOneRow(pBlock, pReader, pTSRow); + doAppendRowFromTSRow(pBlock, pReader, pTSRow); taosMemoryFree(pTSRow); // no data in buffer, return immediately