From bf9a5135fb13b9333933ec848864e3d13ae2dfb1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Jun 2024 00:13:57 +0800 Subject: [PATCH] fix(stream): disable timer for checkpoint-ready msg in rsma. --- source/libs/stream/src/streamDispatch.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fb5f1e33c5..2e776313e0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -769,16 +769,16 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num); // start to check if checkpoint ready msg has successfully received by upstream tasks. - pActiveInfo->pSendReadyMsgTmr = NULL; + if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); + streamMetaAcquireOneTask(pTask); - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); - - if (pActiveInfo->pSendReadyMsgTmr == NULL) { - pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer); - } else { - taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr); + if (pActiveInfo->pSendReadyMsgTmr == NULL) { + pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer); + } else { + taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr); + } } return TSDB_CODE_SUCCESS;