From 46bea47b62116d98c394bd562de0756133d69143 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 7 Jun 2022 19:58:06 +0800 Subject: [PATCH] enh(stream): direct sink if not dispatch --- include/libs/stream/tstream.h | 11 +++- source/libs/executor/src/timewindowoperator.c | 14 ++++ source/libs/stream/inc/streamInc.h | 5 +- source/libs/stream/src/stream.c | 15 ++++- source/libs/stream/src/streamMsg.c | 46 ------------- source/libs/stream/src/streamSink.c | 66 +++++++++++++++++++ 6 files changed, 106 insertions(+), 51 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3b825207fe..af934f0437 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -311,7 +311,16 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { } static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { - taosWriteQitem(pTask->outputQueue->queue, pBlock); + if (pTask->sinkType == TASK_SINK__TABLE) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + } else if (pTask->sinkType == TASK_SINK__SMA) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); + } else { + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); + taosWriteQitem(pTask->outputQueue->queue, pBlock); + } return 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b621d729e0..d7b47b9e50 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include "executorimpl.h" #include "function.h" #include "functionMgt.h" diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 48c43b0775..76e228632d 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -24,10 +24,13 @@ extern "C" { #endif int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); -int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); +// int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); +int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); + +int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb); #ifdef __cplusplus } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3298ecfc3e..b865b50339 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -83,7 +83,10 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp // 3. handle output // 3.1 check and set status // 3.2 dispatch / sink - streamSink1(pTask, pMsgCb); + /*streamSink1(pTask, pMsgCb);*/ + if (pTask->dispatchType != TASK_DISPATCH__NONE) { + streamDispatchAll(pTask, pMsgCb); + } return 0; } @@ -97,13 +100,19 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp return 0; } // continue dispatch - streamSink1(pTask, pMsgCb); + /*streamSink1(pTask, pMsgCb);*/ + if (pTask->dispatchType != TASK_DISPATCH__NONE) { + streamDispatchAll(pTask, pMsgCb); + } return 0; } int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { streamExec(pTask, pMsgCb); - streamSink1(pTask, pMsgCb); + if (pTask->dispatchType != TASK_DISPATCH__NONE) { + streamDispatchAll(pTask, pMsgCb); + } + /*streamSink1(pTask, pMsgCb);*/ return 0; } diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index a52afe93b7..0cdbea9c67 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -174,52 +174,6 @@ FAIL: return code; } -int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { -#if 0 - int8_t old = - atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); - if (old != TASK_OUTPUT_STATUS__NORMAL) { - return 0; - } -#endif - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - SRpcMsg dispatchMsg = {0}; - if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) { - ASSERT(0); - return -1; - } - - int32_t qType; - if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) { - qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) { - qType = WRITE_QUEUE; - } else { - ASSERT(0); - } - tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - } - return 0; -} - #if 0 static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { SStreamTaskExecReq req = { diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c index 35bebe0e63..a5f95c4d45 100644 --- a/source/libs/stream/src/streamSink.c +++ b/source/libs/stream/src/streamSink.c @@ -15,6 +15,21 @@ #include "streamInc.h" +int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb) { + ASSERT(pTask->sinkType == TASK_SINK__NONE); + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); + while (1) { + SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); + if (pBlock == NULL) break; + ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); + + streamDispatch(pTask, pMsgCb, pBlock); + + /*streamQueueProcessSuccess(pTask->outputQueue);*/ + } + return 0; +} + int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { SStreamQueue* queue; if (pTask->execType == TASK_EXEC__NONE) { @@ -58,6 +73,57 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { return 0; } +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { +#if 1 + int8_t old = + atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + if (old != TASK_OUTPUT_STATUS__NORMAL) { + return 0; + } +#endif + ASSERT(pTask->dispatchType != TASK_DISPATCH__INPLACE); + + /*if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {*/ + /*SRpcMsg dispatchMsg = {0};*/ + /*if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {*/ + /*ASSERT(0);*/ + /*return -1;*/ + /*}*/ + + /*int32_t qType;*/ + /*if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {*/ + /*qType = FETCH_QUEUE;*/ + /*} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {*/ + /*qType = WRITE_QUEUE;*/ + /*} else {*/ + /*ASSERT(0);*/ + /*}*/ + /*tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);*/ + /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/ + + if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); + + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); + } + return 0; +} + #if 0 int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { bool firstRun = 1;