From 5ee7e7aed9d5989657c2aafde71e9c628b992d41 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Jan 2023 17:55:00 +0800 Subject: [PATCH] fix: init tsdb read snap --- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 ++++++++++++ source/libs/executor/src/executorimpl.c | 2 ++ source/libs/executor/src/scanoperator.c | 5 +++-- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e04633fd01..363960a085 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4399,7 +4399,19 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { } int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { + SReaderStatus* pStatus = &pReader->status; + + qTrace("tsdb/read: %p, take read mutex", pReader); + taosThreadMutexLock(&pReader->readerMutex); + + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + + taosThreadMutexUnlock(&pReader->readerMutex); + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { + tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e8a6d2cd63..1d1298bf5f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2659,6 +2659,8 @@ void qStreamCloseTsdbReader(void* task) { if (task == NULL) return; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; SOperatorInfo* pOp = pTaskInfo->pRoot; + qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid, + pTaskInfo->streamInfo.lastStatus.ts); pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d07a333070..6d93989582 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro if (NULL == *pPage) { return NULL; } - + return (SResultRow*)((char*)(*pPage) + p1->offset); } @@ -1588,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows", pResult->info.rows); + qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows, + pResult->info.window.skey, pResult->info.window.ekey); pTaskInfo->streamInfo.returned = 1; return pResult; } else {