fix(query): add more check for stopping tsdb reader

This commit is contained in:
Haojun Liao 2023-04-30 01:03:13 +08:00
parent ace193ca9b
commit a88a4f5db0
1 changed files with 13 additions and 2 deletions

View File

@ -2997,6 +2997,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
// only check here, since the iterate data in memory is very fast. // only check here, since the iterate data in memory is very fast.
if (pReader->flag == READER_STATUS_SHOULD_STOP) { if (pReader->flag == READER_STATUS_SHOULD_STOP) {
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
taosArrayDestroy(pIndexList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3163,6 +3164,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
return TSDB_CODE_SUCCESS;
}
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
return terrno; return terrno;
@ -3483,7 +3488,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
_begin: _begin:
code = doLoadLastBlockSequentially(pReader); code = doLoadLastBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; return code;
} }
@ -3496,7 +3501,8 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked // error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) ||
pReader->flag == READER_STATUS_SHOULD_STOP) {
return code; return code;
} }
@ -5066,6 +5072,11 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
return NULL;
}
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
return NULL; return NULL;