From ed49294e9bd6e6859cc9d41068eee43f10388e1f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 09:35:41 +0800 Subject: [PATCH] fix(stream): release lock. --- source/dnode/vnode/src/tq/tq.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d0269c18b8..85bbfe5a3d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1723,25 +1723,28 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); - taosThreadMutexUnlock(&pTask->lock); + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { ASSERT(pTask->checkpointingId == req.checkpointId); - tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 " already received, ignore this msg and continue process checkpoint", pTask->id.idStr, pTask->checkpointingId); + + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - return code; + + return TSDB_CODE_SUCCESS; } streamProcessCheckpointSourceReq(pTask, &req);