diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1ef86f5b30..8f7ebc6c5c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -21,10 +21,9 @@ #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) typedef enum { - READER_STATUS_SUSPEND = 0x1, - READER_STATUS_SHOULD_STOP = 0x2, - READER_STATUS_NORMAL = 0x3, -} EReaderExecStatus; + READER_STATUS_SUSPEND = 0x1, + READER_STATUS_NORMAL = 0x2, +} EReaderStatus; typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -184,6 +183,7 @@ typedef struct STsdbReaderAttr { STimeWindow window; bool freeBlock; SVersionRange verRange; + int16_t order; } STsdbReaderAttr; typedef struct SResultBlockInfo { @@ -196,7 +196,8 @@ struct STsdbReader { STsdb* pTsdb; SVersionRange verRange; TdThreadMutex readerMutex; - EReaderExecStatus flag; + EReaderStatus flag; + int32_t code; uint64_t suid; int16_t order; EReadMode readMode; @@ -2995,9 +2996,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr while (1) { // only check here, since the iterate data in memory is very fast. - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); - return TSDB_CODE_SUCCESS; + if (pReader->code != TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; } bool hasNext = false; @@ -3093,9 +3094,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; while (1) { - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); - return TSDB_CODE_SUCCESS; + if (pReader->code == TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; } // load the last data block of current table @@ -3246,7 +3247,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } } - return code; + return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; } static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { @@ -3395,9 +3396,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { STableUidList* pUidList = &pStatus->uidList; while (1) { - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); - return TSDB_CODE_SUCCESS; + if (pReader->code == TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; } STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; @@ -3493,7 +3494,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { terrno = 0; code = doLoadLastBlockSequentially(pReader); - if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + if (code != TSDB_CODE_SUCCESS) { terrno = code; return TSDB_READ_RETURN; } @@ -3507,8 +3508,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { code = initForFirstBlockInFile(pReader, pBlockIter); // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) || - pReader->flag == READER_STATUS_SHOULD_STOP) { + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { terrno = code; return TSDB_READ_RETURN; } @@ -3536,13 +3536,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) { return code; } - - if (pResBlock->info.rows > 0) { - return TSDB_CODE_SUCCESS; - } } while (1) { @@ -3581,13 +3577,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = doBuildDataBlock(pReader); } - if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) { return code; } - - if (pResBlock->info.rows > 0) { - return TSDB_CODE_SUCCESS; - } } } @@ -4849,8 +4841,8 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { *hasNext = false; - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) { - return code; + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) { + return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; } SReaderStatus* pStatus = &pReader->status; @@ -5456,4 +5448,4 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { pReader->idStr = taosStrdup(idstr); } -void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->flag = READER_STATUS_SHOULD_STOP; } +void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }