From 9bae0adba6a56cefdad9da1965e949b433eee49e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 19:14:37 +0800 Subject: [PATCH] fix(stream): start scheduler task after set the refId. --- source/dnode/snode/src/snode.c | 1 - source/dnode/vnode/src/tq/tq.c | 1 - source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamDispatch.c | 4 ++-- source/libs/stream/src/streamMeta.c | 6 ++++++ source/libs/stream/src/streamSched.c | 23 +++++++++++++++------ source/libs/stream/src/streamStartHistory.c | 2 +- 8 files changed, 28 insertions(+), 13 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e8d4663bbb..6eee8c510b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -38,7 +38,6 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce streamTaskOpenAllUpstreamInput(pTask); streamTaskResetUpstreamStageInfo(pTask); - streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2929121029..ec7ac1054c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -765,7 +765,6 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 60f8744448..64b19e4ed9 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -750,7 +750,7 @@ void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d1b57c32b9..7724d1c5ff 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1008,7 +1008,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ff41008759..5f31364e76 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -539,7 +539,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; } @@ -1082,7 +1082,7 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d414b02d69..598f809c21 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -745,6 +745,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } + // enable the scheduler for stream tasks + streamSetupScheduleTrigger(pTask); + *pAdded = true; return code; } @@ -1147,6 +1150,9 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } + // enable the scheduler for stream tasks after acquire the task RefId. + streamSetupScheduleTrigger(pTask); + stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId); if (pTask->info.fillHistory == 0) { int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 7c77797ef9..25d6161aba 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -20,12 +20,15 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { - int64_t delaySchema = pTask->info.delaySchedParam; - if (delaySchema != 0 && pTask->info.fillHistory == 0) { + int64_t delayParam = pTask->info.delaySchedParam; + if (delayParam != 0 && pTask->info.fillHistory == 0) { int64_t* pTaskRefId = NULL; int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); if (code == 0) { - streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTaskRefId, streamTimer, + stDebug("s-task:%s refId:%" PRId64 " enable the scheduler trigger, delay:%" PRId64, pTask->id.idStr, + pTask->id.refId, delayParam); + + streamTmrStart(streamTaskSchedHelper, (int32_t)delayParam, pTaskRefId, streamTimer, &pTask->schedInfo.pDelayTimer, pTask->pMeta->vgId, "sched-tmr"); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } @@ -93,7 +96,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { int64_t taskRefId = *(int64_t*)param; SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; } @@ -129,7 +132,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { int64_t taskRefId = *(int64_t*)param; SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; } @@ -141,13 +144,21 @@ void streamTaskSchedHelper(void* param, void* tmrId) { int8_t status = atomic_load_8(&pTask->schedInfo.status); stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); - if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, jump out of schedTimer", id); streamMetaReleaseTask(pTask->pMeta, pTask); streamTaskFreeRefId(param); return; } + if (streamTaskShouldPause(pTask)) { + stDebug("s-task:%s is paused, recheck in %.2fs", id, nextTrigger/1000.0); + streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 54026f5db2..4d38c48fc5 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -583,7 +583,7 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { - stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__); streamTaskFreeRefId(param); return; }