refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-12-23 23:02:40 +08:00
parent 224f5a72c2
commit 1edf1e1799
1 changed files with 54 additions and 66 deletions

View File

@ -396,7 +396,6 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
} }
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
ASSERT(pWindow != NULL);
return pWindow->skey > pWindow->ekey; return pWindow->skey > pWindow->ekey;
} }
@ -1447,7 +1446,6 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
index += step; index += step;
} }
ASSERT(0);
return -1; return -1;
} }
@ -2421,6 +2419,46 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
} }
} }
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, bool* loadNeighbor) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1;
SBlockIndex nxtBIndex = {0};
*loadNeighbor = false;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex);
if (!hasNeighbor) { // do nothing
return code;
}
if (overlapWithNeighborBlock(pBlock, &nxtBIndex, pReader->order)) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
*loadNeighbor = true;
}
return code;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader) { static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -2479,38 +2517,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
int32_t nextIndex = -1;
SBlockIndex bIndex = {0};
pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex); // continue check for the next file block if the last ts in the current block
if (!hasNeighbor) { // do nothing // is overlapped with the next neighbor block
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); bool loadNeighbor = false;
break; code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
} if ((!loadNeighbor) || (code != 0)) {
if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
} else {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break; break;
} }
@ -2777,7 +2790,10 @@ static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
ASSERT(pStatus->pTableIter != NULL); if (pStatus->pTableIter == NULL) {
return false;
}
return true; return true;
} }
@ -3117,10 +3133,10 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
} }
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) { bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
ASSERT(pKey != NULL);
if (pDelList == NULL) { if (pDelList == NULL) {
return false; return false;
} }
size_t num = taosArrayGetSize(pDelList); size_t num = taosArrayGetSize(pDelList);
bool asc = ASCENDING_TRAVERSE(order); bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
@ -3318,35 +3334,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*state = CHECK_FILEBLOCK_QUIT; *state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1; bool loadNeighbor = true;
SBlockIndex bIndex = {0}; int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
if (!hasNeighbor) { // do nothing
return 0;
}
bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
if (overlap) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
pDumpInfo->rowIndex = pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) { if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
@ -3354,7 +3345,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
} }
} }
return TSDB_CODE_SUCCESS; return code;
} }
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
@ -3709,13 +3700,11 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
} }
} while (1); } while (1);
ASSERT(pBlock->info.rows <= capacity);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// TODO refactor: with createDataBlockScanInfo // TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
ASSERT(pReader != NULL);
int32_t size = taosHashGetSize(pReader->status.pTableMap); int32_t size = taosHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
@ -4079,7 +4068,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
} }
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
ASSERT(pReader != NULL);
*rows = pReader->pResBlock->info.rows; *rows = pReader->pResBlock->info.rows;
*uid = pReader->pResBlock->info.id.uid; *uid = pReader->pResBlock->info.id.uid;
*pWindow = pReader->pResBlock->info.window; *pWindow = pReader->pResBlock->info.window;