diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2b0156a668..12876e1774 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -474,7 +474,7 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); -int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); +// int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index adc16eee0f..7a3f93b28b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -720,7 +720,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { continue; } - if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) { + if (streamSchedExec(pTask) < 0) { qError("stream task launch failed, task id %d", pTask->taskId); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fd4329d285..03f9e3eafd 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -68,7 +68,7 @@ void streamTriggerByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); streamTaskInput(pTask, (SStreamQueueItem*)trigger); - streamLaunchByWrite(pTask, pTask->nodeId); + streamSchedExec(pTask); } taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); @@ -82,6 +82,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { return 0; } +#if 0 int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { int8_t schedStatus = atomic_load_8(&pTask->schedStatus); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { @@ -101,6 +102,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { } return 0; } +#endif int32_t streamSchedExec(SStreamTask* pTask) { int8_t schedStatus =