From e64e41679e12a01de33426ca814ca31cfeed6335 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 4 Aug 2022 17:00:45 +0800 Subject: [PATCH] fix schedule --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/stream.c | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2b0156a668..12876e1774 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -474,7 +474,7 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); -int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); +// int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index adc16eee0f..7a3f93b28b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -720,7 +720,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { continue; } - if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) { + if (streamSchedExec(pTask) < 0) { qError("stream task launch failed, task id %d", pTask->taskId); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fd4329d285..03f9e3eafd 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -68,7 +68,7 @@ void streamTriggerByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); streamTaskInput(pTask, (SStreamQueueItem*)trigger); - streamLaunchByWrite(pTask, pTask->nodeId); + streamSchedExec(pTask); } taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); @@ -82,6 +82,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { return 0; } +#if 0 int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { int8_t schedStatus = atomic_load_8(&pTask->schedStatus); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { @@ -101,6 +102,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { } return 0; } +#endif int32_t streamSchedExec(SStreamTask* pTask) { int8_t schedStatus =