enh(query): add reader status.

This commit is contained in:
Haojun Liao 2023-04-27 16:10:36 +08:00
parent 7762e0ea47
commit e822dc2025
1 changed files with 17 additions and 12 deletions

View File

@ -20,6 +20,12 @@
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
typedef enum {
READER_STATUS_SUSPEND = 0x1,
READER_STATUS_SHOULD_STOP = 0x2,
READER_STATUS_NORMAL = 0x3,
} EReaderExecStatus;
typedef enum { typedef enum {
EXTERNAL_ROWS_PREV = 0x1, EXTERNAL_ROWS_PREV = 0x1,
EXTERNAL_ROWS_MAIN = 0x2, EXTERNAL_ROWS_MAIN = 0x2,
@ -190,7 +196,8 @@ struct STsdbReader {
STsdb* pTsdb; STsdb* pTsdb;
SVersionRange verRange; SVersionRange verRange;
TdThreadMutex readerMutex; TdThreadMutex readerMutex;
bool suspended; EReaderExecStatus flag;
// bool suspended;
uint64_t suid; uint64_t suid;
int16_t order; int16_t order;
EReadMode readMode; EReadMode readMode;
@ -2855,7 +2862,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
break; break;
} }
if (pResBlock->info.rows >= pReader->capacity) { if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) {
break; break;
} }
} }
@ -4431,7 +4438,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
goto _err; goto _err;
} }
pReader->suspended = true; pReader->flag = READER_STATUS_SUSPEND;
if (countOnly) { if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY; pReader->readMode = READ_MODE_COUNT_ONLY;
@ -4660,8 +4667,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
pReader->flag = READER_STATUS_SUSPEND;
pReader->suspended = true;
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
pReader->idStr); pReader->idStr);
@ -4678,7 +4684,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
code = tsdbTryAcquireReader(pReader); code = tsdbTryAcquireReader(pReader);
if (code == 0) { if (code == 0) {
if (pReader->suspended) { if (pReader->flag == READER_STATUS_SUSPEND) {
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return code; return code;
} }
@ -4734,8 +4740,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
} }
} }
pReader->suspended = false; pReader->flag = READER_STATUS_NORMAL;
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
return code; return code;
@ -4823,7 +4828,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
code = tsdbAcquireReader(pReader); code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
if (pReader->suspended) { if (pReader->flag == READER_STATUS_SUSPEND) {
tsdbReaderResume(pReader); tsdbReaderResume(pReader);
} }
@ -5100,7 +5105,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
qTrace("tsdb/reader-reset: %p, take read mutex", pReader); qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
tsdbAcquireReader(pReader); tsdbAcquireReader(pReader);
if (pReader->suspended) { if (pReader->flag == READER_STATUS_SUSPEND) {
tsdbReaderResume(pReader); tsdbReaderResume(pReader);
} }
@ -5181,7 +5186,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
// find the start data block in file // find the start data block in file
tsdbAcquireReader(pReader); tsdbAcquireReader(pReader);
if (pReader->suspended) { if (pReader->flag == READER_STATUS_SUSPEND) {
tsdbReaderResume(pReader); tsdbReaderResume(pReader);
} }
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
@ -5254,7 +5259,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
tsdbAcquireReader(pReader); tsdbAcquireReader(pReader);
if (pReader->suspended) { if (pReader->flag == READER_STATUS_SUSPEND) {
tsdbReaderResume(pReader); tsdbReaderResume(pReader);
} }