refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-30 16:32:51 +08:00
parent 05d416f3b9
commit d8e0d02e6e
1 changed files with 68 additions and 49 deletions

View File

@ -3395,6 +3395,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
while (1) { while (1) {
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
return TSDB_CODE_SUCCESS;
}
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
initMemDataIterator(*pBlockScanInfo, pReader); initMemDataIterator(*pBlockScanInfo, pReader);
@ -3474,45 +3479,67 @@ static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc))); ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc)));
} }
typedef enum {
TSDB_READ_RETURN = 0x1,
TSDB_READ_CONTINUE = 0x2,
} ERetrieveType;
static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
while(1) {
terrno = 0;
code = doLoadLastBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
terrno = code;
return TSDB_READ_RETURN;
}
if (pResBlock->info.rows > 0) {
return TSDB_READ_RETURN;
}
// all data blocks are checked in this last block file, now let's try the next file
ASSERT(pReader->status.pTableIter == NULL);
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) ||
pReader->flag == READER_STATUS_SHOULD_STOP) {
terrno = code;
return TSDB_READ_RETURN;
}
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
return TSDB_READ_CONTINUE;
} else { // all blocks in data file are checked, let's check the data in last files
resetTableListIndex(&pReader->status);
}
}
}
static int32_t buildBlockFromFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
_begin: ERetrieveType type = doReadDataFromLastFiles(pReader);
code = doLoadLastBlockSequentially(pReader); if (type != TSDB_READ_RETURN) {
if (code != TSDB_CODE_SUCCESS) { return terrno;
return code;
}
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
// all data blocks are checked in this last block file, now let's try the next file
if (pReader->status.pTableIter == NULL) {
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
// this file does not have data files, let's start check the last block file if exists
if (pBlockIter->numOfBlocks == 0) {
resetTableListIndex(&pReader->status);
goto _begin;
}
} }
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; return code;
} }
if (pReader->resBlockInfo.pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -3530,30 +3557,22 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else { } else {
if (pReader->status.pCurrentFileset->nSttF > 0) { // all data blocks in files are checked, let's check the data in last files.
// data blocks in current file are exhausted, let's try the next file now ASSERT(pReader->status.pCurrentFileset->nSttF > 0);
SBlockData* pBlockData = &pReader->status.fileBlockData;
if (pBlockData->uid != 0) {
tBlockDataClear(pBlockData);
}
tBlockDataReset(pBlockData); // data blocks in current file are exhausted, let's try the next file now
resetDataBlockIterator(pBlockIter, pReader->order); SBlockData* pBlockData = &pReader->status.fileBlockData;
resetTableListIndex(&pReader->status); if (pBlockData->uid != 0) {
goto _begin; tBlockDataClear(pBlockData);
} else { }
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked tBlockDataReset(pBlockData);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { resetDataBlockIterator(pBlockIter, pReader->order);
return code; resetTableListIndex(&pReader->status);
}
// this file does not have blocks, let's start check the last block file ERetrieveType type = doReadDataFromLastFiles(pReader);
if (pBlockIter->numOfBlocks == 0) { if (type != TSDB_READ_RETURN) {
resetTableListIndex(&pReader->status); return terrno;
goto _begin;
}
} }
} }
} }
@ -3561,11 +3580,11 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; return code;
} }
if (pReader->resBlockInfo.pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }