diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 65fa9be5fd..d7aaacca57 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1644,6 +1644,19 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } + taosThreadMutexLock(&pTask->lock); + if (pTask->status.taskStatus == TASK_STATUS__HALT) { + qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 + ", set it failure", pTask->id.idStr, req.checkpointId); + streamMetaReleaseTask(pMeta, pTask); + + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs + } + streamProcessCheckpointSourceReq(pTask, &req); + taosThreadMutexUnlock(&pTask->lock); + int32_t total = 0; taosWLockLatch(&pMeta->lock); @@ -1666,7 +1679,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode // todo: when generating checkpoint, leader of mnode has transfer to other DNode? - streamProcessCheckpointSourceReq(pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f367ba932f..0200abcf98 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -136,8 +136,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. - taosThreadMutexLock(&pTask->lock); - pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); @@ -146,8 +144,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); - taosThreadMutexUnlock(&pTask->lock); - return code; }