refactor: stream dispatch
This commit is contained in:
parent
16b6f6b062
commit
dcb71e4af1
|
@ -392,21 +392,6 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
|
||||||
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
|
||||||
taosFreeQitem(pBlock);
|
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
|
|
||||||
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
|
||||||
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
|
||||||
taosFreeQitem(pBlock);
|
|
||||||
} else {
|
|
||||||
taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -584,6 +569,7 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
||||||
|
|
||||||
int32_t streamTryExec(SStreamTask* pTask);
|
int32_t streamTryExec(SStreamTask* pTask);
|
||||||
int32_t streamSchedExec(SStreamTask* pTask);
|
int32_t streamSchedExec(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,23 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||||
|
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||||
|
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
||||||
|
taosFreeQitem(pBlock);
|
||||||
|
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
|
||||||
|
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
||||||
|
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
||||||
|
taosFreeQitem(pBlock);
|
||||||
|
} else {
|
||||||
|
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
||||||
|
taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
||||||
|
streamDispatch(pTask);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||||
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
|
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
|
||||||
pReq->upstreamTaskId);
|
pReq->upstreamTaskId);
|
||||||
|
@ -199,9 +216,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
|
||||||
streamDispatch(pTask);
|
/*streamDispatch(pTask);*/
|
||||||
}
|
/*}*/
|
||||||
} else {
|
} else {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
@ -237,9 +254,9 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
|
||||||
streamDispatch(pTask);
|
/*streamDispatch(pTask);*/
|
||||||
}
|
/*}*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue