fix(stream): release lock.

This commit is contained in:
Haojun Liao 2023-11-10 09:35:41 +08:00
parent 9978667c48
commit ed49294e9b
1 changed files with 6 additions and 3 deletions

View File

@ -1723,25 +1723,28 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
// check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) {
ASSERT(pTask->checkpointingId == req.checkpointId);
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->checkpointingId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return code;
return TSDB_CODE_SUCCESS;
}
streamProcessCheckpointSourceReq(pTask, &req);