diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d0269c18b8..85bbfe5a3d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1723,25 +1723,28 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); - taosThreadMutexUnlock(&pTask->lock); + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { ASSERT(pTask->checkpointingId == req.checkpointId); - tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 " already received, ignore this msg and continue process checkpoint", pTask->id.idStr, pTask->checkpointingId); + + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - return code; + + return TSDB_CODE_SUCCESS; } streamProcessCheckpointSourceReq(pTask, &req);