diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f3630cb558..f1c6e369cb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -75,6 +75,7 @@ enum { TASK_INPUT_STATUS__NORMAL = 1, TASK_INPUT_STATUS__BLOCKED, TASK_INPUT_STATUS__FAILED, + TASK_INPUT_STATUS__REFUSED, }; enum { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 840291a467..f38800ef78 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1616,6 +1616,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs SStreamCheckpointSourceReq req = {0}; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1625,6 +1628,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs return code; } tDecoderClear(&decoder); @@ -1633,6 +1639,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, req.taskId); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1681,6 +1690,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs return code; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 888c9113d5..dbcbfc0a94 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -225,15 +225,15 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ASSERT(pInfo != NULL); if (!pTask->pMeta->leader) { - stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", id); - status = TASK_INPUT_STATUS__BLOCKED; + stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); + status = TASK_INPUT_STATUS__REFUSED; } else { if (pReq->stage > pInfo->stage) { // upstream task has restarted/leader-follower switch/transferred to other dnodes stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 ", current:%" PRId64 " dispatch msg rejected", id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); - status = TASK_INPUT_STATUS__BLOCKED; + status = TASK_INPUT_STATUS__REFUSED; } else { if (!pInfo->dataAllowed) { stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e98c115e7c..08be06c841 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1046,7 +1046,9 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId pTask->id.idStr, downstreamId, el); // put data into inputQ of current task is also allowed - pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + } // now ready for next data output atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); @@ -1105,6 +1107,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); + } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { + stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); } // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state