refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-07-02 15:22:49 +08:00
parent 42d0af5cf7
commit a0e38fc703
1 changed files with 13 additions and 7 deletions

View File

@ -151,7 +151,7 @@ static int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STable
static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow **pTSRow, STsdbReader* pReader); static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow **pTSRow, STsdbReader* pReader);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow);
@ -2063,7 +2063,14 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc
overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order); overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
} }
return (overlapWithNeighbor || bool hasDup = false;
if (pBlock->nSubBlock == 1) {
hasDup = pBlock->hasDup;
} else {
hasDup = true;
}
return (overlapWithNeighbor || hasDup ||
dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) || dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || keyOverlapFileBlock(key, pBlock, &pReader->verRange) ||
(pBlock->nRow > pReader->capacity)); (pBlock->nRow > pReader->capacity));
@ -2478,7 +2485,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} else { } else {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
// current block are exhausted, try the next file block // current block are exhausted, try the next file block
if (pReader->status.fBlockDumpInfo.allDumped) { if (pReader->status.fBlockDumpInfo.allDumped) {
@ -2674,7 +2680,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockSc
if (asc) { if (asc) {
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
if (pDumpInfo->rowIndex < pBlockData->nRow - 1) { if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) {
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
} }
@ -2703,7 +2709,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
int32_t sversion = TSDBROW_SVERSION(pRow); int32_t sversion = TSDBROW_SVERSION(pRow);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
@ -2718,7 +2724,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
checkUpdateSchema(pRow, uid, pReader); updateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema); tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(dIter, hasVal, k.ts, &merge, pReader); doLoadRowsOfIdenticalTsInBuf(dIter, hasVal, k.ts, &merge, pReader);
@ -2732,7 +2738,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
ASSERT(k.ts == ik.ts); ASSERT(k.ts == ik.ts);
checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader); updateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema); tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);