From 10c792efa2203ffaa7d836e0c9ad218c67e92820 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 16:55:47 +0800 Subject: [PATCH] fix(stream):add related fill-history task to be done, when check-rsp moniting quit. --- source/libs/stream/src/streamStart.c | 2 +- source/libs/stream/src/streamTask.c | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 229648cf3c..0b91359f48 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -219,7 +219,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:%" PRIx64, + " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1ae5d54b1d..2bc09c1c22 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1091,7 +1091,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { req.reqId = p->reqId; req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64, + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); @@ -1107,8 +1107,10 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d", - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 + " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, + p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); break; } @@ -1158,7 +1160,7 @@ static void rspMonitorFn(void* param, void* tmrId) { int32_t numOfNotReady = 0; int32_t numOfTimeout = 0; - stDebug("s-task:%s start to do check downstream rsp check", id); + stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -1169,6 +1171,10 @@ static void rspMonitorFn(void* param, void* tmrId) { taosThreadMutexUnlock(&pInfo->checkInfoLock); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pHId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); + } return; } @@ -1207,7 +1213,7 @@ static void rspMonitorFn(void* param, void* tmrId) { // fault tasks detected, not try anymore ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); - if ((numOfNotRsp == 0) && (numOfFault > 0)) { + if (numOfFault > 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "