fix(stream): disable timer for checkpoint-ready msg in rsma.

This commit is contained in:
Haojun Liao 2024-06-05 00:13:57 +08:00
parent 29648be30d
commit bf9a5135fb
1 changed files with 9 additions and 9 deletions

View File

@ -769,16 +769,16 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num);
// start to check if checkpoint ready msg has successfully received by upstream tasks.
pActiveInfo->pSendReadyMsgTmr = NULL;
if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pActiveInfo->pSendReadyMsgTmr == NULL) {
pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
if (pActiveInfo->pSendReadyMsgTmr == NULL) {
pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
}
}
return TSDB_CODE_SUCCESS;