diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 13385a544a..480ed7fd38 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -368,6 +368,7 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs } int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t code = TSDB_CODE_SUCCESS; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -379,6 +380,12 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecoderClear(&decoder); + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower", + req.upstreamTaskId, pMeta->vgId); + return TASK_DOWNSTREAM_NOT_LEADER; + } + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", @@ -395,11 +402,11 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "completed msg", pTask->id.idStr, req.downstreamId); - streamProcessScanHistoryFinishRsp(pTask); + code = streamProcessScanHistoryFinishRsp(pTask); } streamMetaReleaseTask(pMeta, pTask); - return 0; + return code; } int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 309f377621..53f87591e8 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -476,11 +476,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; + int64_t current = taosGetTimestampMs(); SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, - taosGetTimestampMs(), false); - streamMetaReleaseTask(pTask->pMeta, pHTask); + if (pHTask != NULL) { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current, + false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false); + } } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -657,6 +662,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { ETaskStatus status = streamTaskGetStatus(pTask, NULL); + // task restart now, not handle the scan-history finish rsp + if (status == TASK_STATUS__UNINIT) { + return TSDB_CODE_INVALID_MSG; + } + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); SStreamMeta* pMeta = pTask->pMeta;