From 753716f1e6e40c0a530dead0853f4029bee7f532 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Nov 2023 15:57:46 +0800 Subject: [PATCH 1/2] fix(stream): remove invalid assert --- source/libs/stream/src/streamMeta.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 40f9ab54fe..2b3a05e041 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -488,7 +488,6 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) if (ref > 0) { stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { - ASSERT(streamTaskShouldStop(pTask)); stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); tFreeStreamTask(pTask); } else if (ref < 0) { @@ -1072,7 +1071,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; - stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, + stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); streamMetaWLock(pMeta); From 709502ba9e50e898ee4c9ebbd8d59b742a62bc07 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Nov 2023 22:05:36 +0800 Subject: [PATCH 2/2] fix(stream): record the check downstream failure. --- include/libs/stream/tstream.h | 3 ++- source/dnode/vnode/src/tq/tq.c | 15 +++++++++------ source/dnode/vnode/src/tq/tqStreamTask.c | 3 ++- source/libs/stream/src/streamStart.c | 22 ++++++++++------------ 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 049deb150b..be42348d80 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -830,7 +830,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aafec9737b..b4c9904e0c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -944,9 +944,10 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t vgId = pTq->pStreamMeta->vgId; + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; int32_t code; SStreamTaskCheckRsp rsp; @@ -966,21 +967,23 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!vnodeIsRoleLeader(pTq->pVnode)) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); return code; } - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId); + rsp.streamId, rsp.upstreamTaskId, pMeta->vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return -1; } code = streamProcessCheckRsp(pTask, &rsp); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4c0491da86..4974e77ffa 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -100,7 +100,8 @@ int32_t tqStartStreamTasks(STQ* pTq) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 97eb7b79a2..7385b8524b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); return TSDB_CODE_SUCCESS; } @@ -410,7 +411,6 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, do not do check downstream again", id); @@ -470,8 +470,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); - + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + taosGetTimestampMs(), false); } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -1067,14 +1067,12 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { - SStreamMeta* pMeta = pTask->pMeta; +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; streamMetaWLock(pMeta); - - STaskId id = streamTaskExtractKey(pTask); - STaskStartInfo* pStartInfo = &pMeta->startInfo; - SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; @@ -1086,9 +1084,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info