From dcb71e4af1c89a6c27dc8200cd6170808ceb6ce8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 27 Dec 2022 15:17:43 +0800 Subject: [PATCH] refactor: stream dispatch --- include/libs/stream/tstream.h | 16 +--------------- source/libs/stream/src/stream.c | 29 +++++++++++++++++++++++------ 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 16cf960724..c00625c51c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -392,21 +392,6 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } -static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { - if (pTask->outputType == TASK_OUTPUT__TABLE) { - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); - } else if (pTask->outputType == TASK_OUTPUT__SMA) { - pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); - } else { - taosWriteQitem(pTask->outputQueue->queue, pBlock); - } - return 0; -} - typedef struct { SMsgHead head; int64_t streamId; @@ -584,6 +569,7 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); +int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 5b542dd54b..60729c4d0e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -187,6 +187,23 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } +int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { + if (pTask->outputType == TASK_OUTPUT__TABLE) { + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); + } else if (pTask->outputType == TASK_OUTPUT__SMA) { + pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); + } else { + ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); + taosWriteQitem(pTask->outputQueue->queue, pBlock); + streamDispatch(pTask); + } + return 0; +} + int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId); @@ -199,9 +216,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S return -1; } - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamDispatch(pTask); - } + /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ + /*streamDispatch(pTask);*/ + /*}*/ } else { streamSchedExec(pTask); } @@ -237,9 +254,9 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return -1; } - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamDispatch(pTask); - } + /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ + /*streamDispatch(pTask);*/ + /*}*/ return 0; }