fix: init tsdb read snap
This commit is contained in:
parent
625bf9992f
commit
5ee7e7aed9
|
@ -4399,7 +4399,19 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
|
qTrace("tsdb/read: %p, take read mutex", pReader);
|
||||||
|
taosThreadMutexLock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
if (pReader->suspended) {
|
||||||
|
tsdbReaderResume(pReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
||||||
|
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2659,6 +2659,8 @@ void qStreamCloseTsdbReader(void* task) {
|
||||||
if (task == NULL) return;
|
if (task == NULL) return;
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
||||||
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
||||||
|
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
|
||||||
|
pTaskInfo->streamInfo.lastStatus.ts);
|
||||||
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
|
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
|
||||||
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
||||||
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
||||||
|
|
|
@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
|
||||||
if (NULL == *pPage) {
|
if (NULL == *pPage) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (SResultRow*)((char*)(*pPage) + p1->offset);
|
return (SResultRow*)((char*)(*pPage) + p1->offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1588,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
qDebug("queue scan tsdb return %d rows", pResult->info.rows);
|
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
|
||||||
|
pResult->info.window.skey, pResult->info.window.ekey);
|
||||||
pTaskInfo->streamInfo.returned = 1;
|
pTaskInfo->streamInfo.returned = 1;
|
||||||
return pResult;
|
return pResult;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue