diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 32d8ce72f0..5bdf3ab75f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -885,6 +885,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +bool streamMetaAllTasksReady(const SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index eb5efe07f2..e2dc9fa679 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -833,23 +833,31 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { STaskStartInfo* pStartInfo = &pMeta->startInfo; - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; streamMetaWLock(pMeta); if (pStartInfo->taskStarting == 1) { tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, pMeta->startInfo.restartCount); } else { // not in starting procedure - if (pStartInfo->restartCount > 0) { + bool allReady = streamMetaAllTasksReady(pMeta); + + if ((pStartInfo->restartCount > 0) && (!allReady)) { + // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount pStartInfo->restartCount -= 1; tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role, pStartInfo->restartCount); - streamMetaWUnLock(pMeta); + restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); return TSDB_CODE_SUCCESS; } else { - tqDebug("vgId:%d start all tasks completed in callbackFn", pMeta->vgId); + if (pStartInfo->restartCount == 0) { + tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId); + } else if (allReady) { + pStartInfo->restartCount = 0; + tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); + } } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 20e7be0509..bb5cb6e832 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1518,6 +1518,23 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { return 0; } +bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { + int32_t num = taosArrayGetSize(pMeta->pTaskList); + for(int32_t i = 0; i < num; ++i) { + STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); + if (ppTask == NULL) { + continue; + } + + if ((*ppTask)->status.downstreamReady == 0) { + return false; + } + } + + return true; +} + int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t vgId = pMeta->vgId; stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId);