diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index cc1987492c..229648cf3c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -195,7 +195,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 - " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64, + " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); @@ -218,8 +218,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 + " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // for sink task, set it ready directly. @@ -395,7 +396,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } if (pRsp->status == TASK_DOWNSTREAM_READY) { - streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; + } if (left == 0) { doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag @@ -405,7 +409,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else { // not ready, wait for 100ms and retry - streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; // return success in any cases. + } + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b87c19b08e..b40bdb2ced 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1011,9 +1011,15 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); if (p->taskId == taskId) { - ASSERT(reqId == p->reqId); - // count down one, since it is ready now + if (reqId != p->reqId) { + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 + " expired check-rsp recv from downstream task:0x%x, discarded", + id, reqId, p->reqId, taskId); + return TSDB_CODE_FAILED; + } + + // subtract one not-ready-task, since it is ready now if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); } else { @@ -1029,7 +1035,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t } taosThreadMutexUnlock(&pInfo->checkInfoLock); - stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId); + stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId, + reqId); return TSDB_CODE_FAILED; }