diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8cc4f9d54..33f937a909 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1294,8 +1294,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (status == TASK_STATUS__CK) { ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId); tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 - " already received, ignore this msg and continue process checkpoint", - pTask->id.idStr, pTask->chkInfo.checkpointingId); + " transId:%d already received, ignore this msg and continue process checkpoint", + pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bfa269b0d5..00a8940b6a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -746,7 +746,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { taosArrayClear(pTask->pReadyMsgList); stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); } else { - stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 25f32195be..5822b18bc3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -625,10 +625,29 @@ int32_t doStreamExecTask(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { + + // todo add lock char* p = NULL; - streamTaskGetStatus(pTask, &p); - stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); - streamTaskBuildCheckpoint(pTask); + ETaskStatus s = streamTaskGetStatus(pTask, &p); + if (s == TASK_STATUS__CK) { + stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); + streamTaskBuildCheckpoint(pTask); + } else { + // todo refactor + int32_t code = 0; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } else { + code = streamTaskSendCheckpointReadyMsg(pTask); + } + + if (code != TSDB_CODE_SUCCESS) { + // todo: let's retry send rsp to upstream/mnode + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, + 0, tstrerror(code)); + } + } + return 0; } }