enh(stream): direct sink if not dispatch
This commit is contained in:
parent
f4cecb3cef
commit
46bea47b62
|
@ -311,7 +311,16 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
|
||||
} else {
|
||||
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
||||
taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "executorimpl.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
|
|
|
@ -24,10 +24,13 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
// int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
|
||||
|
||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
|
||||
|
||||
int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -83,7 +83,10 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
// 3. handle output
|
||||
// 3.1 check and set status
|
||||
// 3.2 dispatch / sink
|
||||
streamSink1(pTask, pMsgCb);
|
||||
/*streamSink1(pTask, pMsgCb);*/
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatchAll(pTask, pMsgCb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -97,13 +100,19 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
return 0;
|
||||
}
|
||||
// continue dispatch
|
||||
streamSink1(pTask, pMsgCb);
|
||||
/*streamSink1(pTask, pMsgCb);*/
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatchAll(pTask, pMsgCb);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
streamExec(pTask, pMsgCb);
|
||||
streamSink1(pTask, pMsgCb);
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatchAll(pTask, pMsgCb);
|
||||
}
|
||||
/*streamSink1(pTask, pMsgCb);*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -174,52 +174,6 @@ FAIL:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
|
||||
#if 0
|
||||
int8_t old =
|
||||
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t qType;
|
||||
if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {
|
||||
qType = FETCH_QUEUE;
|
||||
} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {
|
||||
qType = WRITE_QUEUE;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||
SStreamTaskExecReq req = {
|
||||
|
|
|
@ -15,6 +15,21 @@
|
|||
|
||||
#include "streamInc.h"
|
||||
|
||||
int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
||||
while (1) {
|
||||
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
||||
if (pBlock == NULL) break;
|
||||
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
|
||||
streamDispatch(pTask, pMsgCb, pBlock);
|
||||
|
||||
/*streamQueueProcessSuccess(pTask->outputQueue);*/
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
SStreamQueue* queue;
|
||||
if (pTask->execType == TASK_EXEC__NONE) {
|
||||
|
@ -58,6 +73,57 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
|
||||
#if 1
|
||||
int8_t old =
|
||||
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
ASSERT(pTask->dispatchType != TASK_DISPATCH__INPLACE);
|
||||
|
||||
/*if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {*/
|
||||
/*SRpcMsg dispatchMsg = {0};*/
|
||||
/*if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {*/
|
||||
/*ASSERT(0);*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
|
||||
/*int32_t qType;*/
|
||||
/*if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {*/
|
||||
/*qType = FETCH_QUEUE;*/
|
||||
/*} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {*/
|
||||
/*qType = WRITE_QUEUE;*/
|
||||
/*} else {*/
|
||||
/*ASSERT(0);*/
|
||||
/*}*/
|
||||
/*tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);*/
|
||||
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
|
||||
|
||||
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
bool firstRun = 1;
|
||||
|
|
Loading…
Reference in New Issue