diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index be35367528..12898847f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1552,6 +1552,12 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { int32_t taskId = req.taskId; tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId); + // for test purpose +// if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) { +// code = TSDB_CODE_STREAM_TASK_NOT_EXIST; +// goto FAIL; +// } + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index c964d0b811..6c12fbb822 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -240,6 +240,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } } + // disable the data from upstream tasks int8_t st = pTask->status.taskStatus; if (st == TASK_STATUS__HALT) { status = TASK_INPUT_STATUS__BLOCKED; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9dea265241..4e0e667614 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -198,9 +198,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); - - // todo: this may be not the first one - pTask->chkInfo.startTs = taosGetTimestampMs(); + if (pTask->chkInfo.startTs == 0) { + pTask->chkInfo.startTs = taosGetTimestampMs(); + } // update the child Id for downstream tasks streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9af35eee88..519271703b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -373,6 +373,8 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p } } + stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes", pTask->id.idStr, vgSz); + code = 0; FAIL_SHUFFLE_DISPATCH: @@ -976,22 +978,28 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set - // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure + // flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. // todo handle the shuffle dispatch failure - if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore + if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); - {// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed. - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - if (left > 0) { // do nothing - stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id); - } else { - stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id); - handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); - } - } + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + + SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData; + if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id); + streamProcessCheckpointReadyMsg(pTask); + } + + // we should set the correct finish flag to make sure the shuffle dispatch will be executed completed. + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + if (left > 0) { // do nothing + stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id); + } else { + stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id); + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); + } } } else { stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id,