fix(query): add check for blockdata.

This commit is contained in:
Haojun Liao 2022-12-14 20:31:21 +08:00
parent 473f5a3696
commit b5f611328a
1 changed files with 20 additions and 5 deletions

View File

@ -1157,6 +1157,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
setBlockAllDumped(pDumpInfo, ts, pReader->order); setBlockAllDumped(pDumpInfo, ts, pReader->order);
} }
pBlockScanInfo->lastKey = pDumpInfo->lastKey;
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.blockLoadTime += elapsedTime; pReader->cost.blockLoadTime += elapsedTime;
@ -2468,7 +2470,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
while (1) { while (1) {
bool hasBlockData = false; bool hasBlockData = false;
{ {
while (pBlockData->nRow > 0) { // find the first qualified row in data block while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block
if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
hasBlockData = true; hasBlockData = true;
break; break;
@ -2478,10 +2480,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
int32_t nextIndex = -1;
SBlockIndex bIndex = {0}; SBlockIndex bIndex = {0};
bool hasNeighbor = int32_t nextIndex = -1;
getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex); bool hasNeighbor = false;
if (pBlockInfo != NULL) {
hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex);
}
if (!hasNeighbor) { // do nothing if (!hasNeighbor) { // do nothing
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break; break;
@ -3021,7 +3026,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} else { } else {
if (pReader->status.pCurrentFileset->nSttF > 0) { if (pReader->status.pCurrentFileset->nSttF > 0) {
// data blocks in current file are exhausted, let's try the next file now // data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData); SBlockData* pBlockData = &pReader->status.fileBlockData;
if (pBlockData->uid != 0) {
tBlockDataClear(pBlockData);
}
tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
goto _begin; goto _begin;
} else { } else {
@ -3378,6 +3388,11 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
if (pFileBlockInfo == NULL) {
st = CHECK_FILEBLOCK_QUIT;
break;
}
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st); checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) { if (st == CHECK_FILEBLOCK_QUIT) {
break; break;