From bb1165f1e95404b128e1f12692361aaa8107bd06 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 29 Sep 2022 12:14:41 +0800 Subject: [PATCH] optimize projection return --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/executor/src/projectoperator.c | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e7eab7ab73..ed5a894416 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -214,7 +214,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; } @@ -551,7 +551,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 - if (dataRsp.blockNum == 0 && dataRsp.rspOffset.type == TMQ_OFFSET__LOG && + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); if (pPushEntry != NULL) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 8d40824cc4..e9e6fed66a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -210,9 +210,12 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->status = OP_OPENED; } + qDebug("enter project"); + if (pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pOperator->status = OP_OPENED; + qDebug("projection in queue model, set status open and return null"); return NULL; } @@ -241,10 +244,13 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pOperator->status == OP_EXEC_RECV && - pFinalRes->info.rows == 0) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { pOperator->status = OP_OPENED; - continue; + if (pOperator->status == OP_EXEC_RECV) { + continue; + } else { + return NULL; + } } qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);