From ecdd6784f6f20e24015709ed0b5e23ec2eefd273 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 15:36:24 +0800 Subject: [PATCH] [td-11818] refactor. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/executor/src/executor.c | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 52c541dcfd..59c9828693 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -635,7 +635,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; void* task = pHandle->buffer.output[pos].task; - qStreamExecTaskSetInput(task, pCont); + qSetStreamInput(task, pCont); SSDataBlock* pDataBlock; uint64_t ts; if (qExecTask(task, &pDataBlock, &ts) < 0) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ccc1620264..ee96ac4a71 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -18,17 +18,20 @@ #include "executorimpl.h" #include "planner.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, uint64_t reqId) { ASSERT(pOperator != NULL); if (pOperator->operatorType != OP_StreamScan) { - if (pOperator->numOfDownstream > 0) { - - if (pOperator->numOfDownstream > 1) { // not handle this in join query - return TSDB_CODE_QRY_APP_ERROR; - } - - return doSetStreamBlock(pOperator->pDownstream[0], input); + if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block, reqId:0x%" PRIx64, reqId); + return TSDB_CODE_QRY_APP_ERROR; } + + if (pOperator->numOfDownstream > 1) { // not handle this in join query + qError("join not supported for stream block scan, reqId:0x%" PRIx64, reqId); + return TSDB_CODE_QRY_APP_ERROR; + } + + return doSetStreamBlock(pOperator->pDownstream[0], input, reqId); } else { SStreamBlockScanInfo* pInfo = pOperator->info; tqReadHandleSetMsg(pInfo->readerHandle, input, 0); @@ -46,7 +49,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input); + + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); } else {