diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6336cd6e49..afaf33bafa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -21,7 +21,6 @@ #include "mndShow.h" #include "mndStb.h" #include "mndTrans.h" -#include "mndVgroup.h" #include "osMemory.h" #include "parser.h" #include "taoserror.h" @@ -2336,7 +2335,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { } int32_t total = taosArrayGetSize(*pReqTaskList); - if (total == numOfTasks) { // all tasks has send the reqs + if (total == numOfTasks) { // all tasks have sent the reqs int64_t checkpointId = mndStreamGenChkptId(pMnode, false); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a234777441..43446683c5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1250,6 +1250,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); + SRpcMsg rsp = {0}; // make the mnode retry until this task status completed + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SYN_PROPOSE_NOT_READY); + if (ret) { // suppress the error in build checkpoint-source rsp + tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } + + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) {