diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9f88672231..2406601722 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -853,7 +853,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index aee2aaa244..b1d49bf31b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); return code; @@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index a4c448f678..0b6603cd7b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); return TSDB_CODE_SUCCESS; } @@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + taosGetTimestampMs(), false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, + taosGetTimestampMs(), false); streamMetaReleaseTask(pTask->pMeta, pHTask); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms @@ -1066,15 +1069,13 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { - SStreamMeta* pMeta = pTask->pMeta; +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; streamMetaWLock(pMeta); - - STaskId id = streamTaskExtractKey(pTask); - STaskStartInfo* pStartInfo = &pMeta->startInfo; - - SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); @@ -1086,15 +1087,14 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); } else { stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);