diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0ba6681ec2..0f399da8fd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -835,8 +835,6 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); -void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); -void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index dae947f4e9..d2c7924cf5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -200,6 +200,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); + streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -213,6 +214,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + streamTaskResetStatus(*ppHTask); + streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 6d4e263427..b9b7c8ddfa 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -230,6 +230,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); + streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); doProcessDownstreamReadyRsp(pTask); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a394fee9f6..860a5f9170 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -24,6 +24,8 @@ #define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); +static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -951,7 +953,7 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { pInfo->notReadyTasks = 1; - } else { + } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos); ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum); } @@ -1139,6 +1141,7 @@ static void rspMonitorFn(void* param, void* tmrId) { int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + // fault tasks detected, not try anymore if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( @@ -1154,6 +1157,24 @@ static void rspMonitorFn(void* param, void* tmrId) { return; } + // checking of downstream tasks has been stopped by other threads + if (pInfo->inCheckProcess == 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " + "timeout:%d, ready:%d ref:%d", + pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false); + } + return; + } + if (numOfNotReady > 0) { // check to make sure not in recheck timer ASSERT(pTask->status.downstreamReady == 0);