enh(query): stop tsdb reader ASAP.
This commit is contained in:
parent
e822dc2025
commit
802c05150a
|
@ -197,7 +197,6 @@ struct STsdbReader {
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
TdThreadMutex readerMutex;
|
TdThreadMutex readerMutex;
|
||||||
EReaderExecStatus flag;
|
EReaderExecStatus flag;
|
||||||
// bool suspended;
|
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int16_t order;
|
int16_t order;
|
||||||
EReadMode readMode;
|
EReadMode readMode;
|
||||||
|
@ -2995,9 +2994,15 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
||||||
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
// only check here, since the iterate data in memory is very fast.
|
||||||
|
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
|
||||||
|
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
|
int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
|
||||||
if (code) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(pIndexList);
|
taosArrayDestroy(pIndexList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3088,6 +3093,11 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
|
||||||
|
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// load the last data block of current table
|
// load the last data block of current table
|
||||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||||
|
|
||||||
|
@ -5425,3 +5435,5 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
|
||||||
taosMemoryFreeClear(pReader->idStr);
|
taosMemoryFreeClear(pReader->idStr);
|
||||||
pReader->idStr = taosStrdup(idstr);
|
pReader->idStr = taosStrdup(idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->flag = READER_STATUS_SHOULD_STOP; }
|
||||||
|
|
Loading…
Reference in New Issue