fix(stream): handle dispatch checkpoint-trigger failure.

This commit is contained in:
Haojun Liao 2023-09-21 23:15:18 +08:00
parent 43cbafdf19
commit b075b4438b
4 changed files with 31 additions and 16 deletions

View File

@ -1539,6 +1539,12 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId); tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId);
// for test purpose
// if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
// goto FAIL;
// }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};

View File

@ -240,6 +240,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
} }
// disable the data from upstream tasks
int8_t st = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__HALT) { if (st == TASK_STATUS__HALT) {
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__BLOCKED;

View File

@ -198,9 +198,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
} }
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
if (pTask->chkInfo.startTs == 0) {
// todo: this may be not the first one pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->chkInfo.startTs = taosGetTimestampMs(); }
// update the child Id for downstream tasks // update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

View File

@ -373,6 +373,8 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p
} }
} }
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes", pTask->id.idStr, vgSz);
code = 0; code = 0;
FAIL_SHUFFLE_DISPATCH: FAIL_SHUFFLE_DISPATCH:
@ -976,22 +978,28 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available. // dispatch message failed: network error, or node not available.
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure // flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast. // happened too fast.
// todo handle the shuffle dispatch failure // todo handle the shuffle dispatch failure
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id, stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId); pRsp->downstreamTaskId, pRsp->downstreamNodeId);
{// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData;
int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
if (left > 0) { // do nothing stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id); streamProcessCheckpointReadyMsg(pTask);
} else { }
stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id);
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); // we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
} if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
} int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
if (left > 0) { // do nothing
stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id);
} else {
stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id);
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
} }
} else { } else {
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id, stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id,