fix(tsdb): add desc check for clean file block.

This commit is contained in:
Haojun Liao 2023-12-16 15:37:17 +08:00
parent f9a5485d52
commit 4cc0f5be59
1 changed files with 32 additions and 35 deletions

View File

@ -2347,53 +2347,50 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
}
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
SBrinRecord* pRecord = &pBlockInfo->record;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord* pRecord = NULL;
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
return code;
}
if (pBlockInfo == NULL) {
return 0;
}
pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
goto _end;
}
pRecord = &pBlockInfo->record;
pRecord = &pBlockInfo->record;
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
return code;
}
// it is a clean block, load it directly
int64_t cap = pReader->resBlockInfo.capacity;
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) {
if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
}
pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
goto _end;
}
// record the last key value
pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey;
pRecord = &pBlockInfo->record;
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
int64_t cap = pReader->resBlockInfo.capacity;
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) {
if (((asc && (pRecord->firstKey < keyInBuf.ts)) || (!asc && (pRecord->lastKey > keyInBuf.ts))) &&
(pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
}
}
} else { // file blocks not exist
ASSERT(0);
pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
return code;
// record the last key value
pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey;
goto _end;
}
}