diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d7ad1ddf08..f3630cb558 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -452,6 +452,7 @@ struct SStreamDispatchReq { int64_t stage; // nodeId from upstream task int64_t streamId; int32_t taskId; + int32_t msgId; // msg id to identify if the incoming msg from the same sender int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; @@ -468,7 +469,9 @@ typedef struct { int32_t upstreamTaskId; int32_t downstreamNodeId; int32_t downstreamTaskId; + int32_t msgId; int8_t inputStatus; + int64_t stage; } SStreamDispatchRsp; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 55ce4b4b33..7315b8d91a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1372,6 +1372,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { pRsp->streamId = htobe64(pRsp->streamId); pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); + pRsp->stage = htobe64(pRsp->stage); + pRsp->msgId = htonl(pRsp->msgId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRsp->streamId, pRsp->upstreamTaskId); if (pTask) { @@ -1597,6 +1599,8 @@ FAIL: pRsp->upstreamNodeId = htonl(req.upstreamNodeId); pRsp->downstreamNodeId = htonl(pVnode->config.vgId); pRsp->downstreamTaskId = htonl(req.taskId); + pRsp->msgId = htonl(req.msgId); + pRsp->stage = htobe64(req.stage); pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 99d61c2348..c176dfda88 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -142,6 +142,8 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); + pDispatchRsp->stage = htobe64(pReq->stage); + pDispatchRsp->msgId = htonl(pReq->msgId); pDispatchRsp->inputStatus = status; pDispatchRsp->streamId = htobe64(pReq->streamId); pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fe72ff05e6..e973cfd4dc 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -53,6 +53,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -78,6 +79,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; @@ -112,6 +114,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; pReq->stage = pTask->pMeta->stage; + pReq->msgId = pTask->taskExecInfo.dispatchCount; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; @@ -1056,8 +1059,21 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->taskExecInfo.dispatchCount; + if ((!pTask->pMeta->leader) || (pTask->status.downstreamReady != 1)) { + stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + if ((pRsp->msgId != msgId) || (pRsp->stage != pTask->pMeta->stage)) { + stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64 + " discard it", + id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage); + return TSDB_CODE_INVALID_MSG; + } + int32_t leftRsp = 0; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {