From c07e563ecd1f1524d4c0d8a1d6eb69a1c429d073 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 22 Mar 2023 10:43:11 +0800 Subject: [PATCH] fix:error in optimize consume logic --- source/dnode/vnode/src/tq/tqExec.c | 15 ++++++--------- source/dnode/vnode/src/tq/tqRead.c | 4 ++-- source/libs/executor/src/projectoperator.c | 8 ++------ source/libs/executor/src/scanoperator.c | 10 ++++++---- 4 files changed, 16 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 3063f0d372..0dbec85e6e 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -76,13 +76,13 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); + tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, pTq->pVnode->config.vgId); if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, pTq->pVnode->config.vgId, terrstr()); return -1; } - tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock); + tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task end execute, get block:%p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock); // current scan should be stopped asap, since the rebalance occurs. if (pDataBlock == NULL) { @@ -91,12 +91,9 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); pRsp->blockNum++; - - if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - rowCnt += pDataBlock->info.rows; - if (rowCnt >= 4096) { - break; - } + rowCnt += pDataBlock->info.rows; + if (rowCnt >= 4096) { + break; } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5260a00915..9579eb3407 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -309,7 +309,7 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->pWalReader->curVersion; ret->fetchType = FETCH_TYPE__NONE; - tqInfo("return offset %" PRId64 ", no more valid msg in wal", ret->offset.version); + tqInfo("wal return none, offset %" PRId64 ", no more valid msg in wal", ret->offset.version); return; } @@ -327,7 +327,7 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { continue; } ret->fetchType = FETCH_TYPE__DATA; - tqDebug("return data rows %d", ret->data.info.rows); + tqDebug("wal return data rows %d", ret->data.info.rows); return; } } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 9fff7a4943..fdce9a95df 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -256,13 +256,9 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pBlock == NULL) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { pOperator->status = OP_OPENED; - if (pOperator->status == OP_EXEC_RECV) { - continue; - } else { - return NULL; - } + return NULL; } - qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, + qDebug("set op close, exec mode:%d, status %d rows %d", pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); setOperatorCompleted(pOperator); break; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 45ee6e9ea1..4e6cd148d1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1627,12 +1627,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); setBlockIntoRes(pInfo, &ret.data, true); - qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); - return pInfo->pRes; - }else{ + if (pInfo->pRes->info.rows > 0) { + qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); + return pInfo->pRes; + } + }else if(ret.fetchType == FETCH_TYPE__NONE){ pTaskInfo->streamInfo.currentOffset = ret.offset; + return NULL; } - return NULL; } } else { qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);