fix(query): optimize the row merge procedure for files.
This commit is contained in:
parent
26e5a7c796
commit
60c5b2beac
|
@ -145,7 +145,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI
|
||||||
SRowMerger* pMerger);
|
SRowMerger* pMerger);
|
||||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
||||||
|
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
|
||||||
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||||
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
|
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
|
||||||
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
|
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
|
||||||
|
@ -691,16 +692,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
|
|
||||||
SColVal cv = {0};
|
SColVal cv = {0};
|
||||||
int32_t colIndex = 0;
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
|
|
||||||
|
@ -724,7 +722,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) {
|
int32_t colIndex = 0;
|
||||||
|
int32_t num = taosArrayGetSize(pBlockData->aIdx);
|
||||||
|
while (i < numOfOutputCols && colIndex < num) {
|
||||||
rowIndex = 0;
|
rowIndex = 0;
|
||||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
@ -744,7 +744,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (i < numOfCols) {
|
while (i < numOfOutputCols) {
|
||||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, remain);
|
||||||
i += 1;
|
i += 1;
|
||||||
|
@ -1256,7 +1256,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerClear(&merge);
|
tRowMergerClear(&merge);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1300,7 +1300,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else { // key > ik.ts || key > k.ts
|
} else { // key > ik.ts || key > k.ts
|
||||||
ASSERT(key != ik.ts);
|
ASSERT(key != ik.ts);
|
||||||
|
@ -1309,7 +1309,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// [4] ik.ts < k.ts <= key
|
// [4] ik.ts < k.ts <= key
|
||||||
if (ik.ts < k.ts) {
|
if (ik.ts < k.ts) {
|
||||||
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
|
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1317,7 +1317,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// [6] k.ts < ik.ts <= key
|
// [6] k.ts < ik.ts <= key
|
||||||
if (k.ts < ik.ts) {
|
if (k.ts < ik.ts) {
|
||||||
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
|
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1326,7 +1326,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
ASSERT(key > ik.ts && key > k.ts);
|
ASSERT(key > ik.ts && key > k.ts);
|
||||||
|
|
||||||
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
|
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1350,7 +1350,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
|
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
|
||||||
|
@ -1359,7 +1359,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// [4] ik.ts > key >= k.ts
|
// [4] ik.ts > key >= k.ts
|
||||||
if (ik.ts > key) {
|
if (ik.ts > key) {
|
||||||
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
|
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1371,7 +1371,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1383,7 +1383,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tRowMerge(&merge, &fRow);
|
tRowMerge(&merge, &fRow);
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1438,6 +1438,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
}
|
}
|
||||||
|
|
||||||
// imem & mem are all empty, only file exist
|
// imem & mem are all empty, only file exist
|
||||||
|
|
||||||
|
// opt version
|
||||||
|
// 1. it is not a border point
|
||||||
|
// 2. the direct next point is not an duplicated timestamp
|
||||||
|
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
|
||||||
|
(pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
|
||||||
|
int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1;
|
||||||
|
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
||||||
|
if (nextKey != key) {
|
||||||
|
// merge is not needed
|
||||||
|
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
|
|
||||||
STSRow* pTSRow = NULL;
|
STSRow* pTSRow = NULL;
|
||||||
|
@ -1446,7 +1461,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tRowMergerClear(&merge);
|
tRowMergerClear(&merge);
|
||||||
|
@ -2201,7 +2216,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
|
|
||||||
pDumpInfo->rowIndex += step;
|
pDumpInfo->rowIndex += step;
|
||||||
if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) {
|
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) {
|
||||||
pDumpInfo->rowIndex =
|
pDumpInfo->rowIndex =
|
||||||
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
|
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
|
||||||
}
|
}
|
||||||
|
@ -2325,7 +2340,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
|
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
@ -2369,6 +2384,46 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
|
||||||
|
int32_t i = 0, j = 0;
|
||||||
|
int32_t outputRowIndex = pResBlock->info.rows;
|
||||||
|
|
||||||
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
|
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
|
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
|
||||||
|
}
|
||||||
|
|
||||||
|
SColVal cv = {0};
|
||||||
|
int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
|
||||||
|
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
||||||
|
|
||||||
|
while(i < numOfOutputCols && j < numOfInputCols) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
|
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
|
||||||
|
|
||||||
|
if (pData->cid == pCol->info.colId) {
|
||||||
|
tColDataGetValue(pData, j, &cv);
|
||||||
|
doCopyColVal(pCol, outputRowIndex++, i, &cv, pSupInfo);
|
||||||
|
j += 1;
|
||||||
|
} else { // the specified column does not exist in file block, fill with null data
|
||||||
|
colDataAppendNULL(pCol, outputRowIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (i < numOfOutputCols) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
|
colDataAppendNULL(pCol, rowIndex);
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResBlock->info.rows += 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
|
@ -2380,7 +2435,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendOneRow(pBlock, pReader, pTSRow);
|
doAppendRowFromTSRow(pBlock, pReader, pTSRow);
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
|
|
||||||
// no data in buffer, return immediately
|
// no data in buffer, return immediately
|
||||||
|
|
Loading…
Reference in New Issue