fix(stream): release lock.
This commit is contained in:
parent
af08a189c1
commit
cb0d244d5a
|
@ -1728,25 +1728,28 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
||||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
|
||||
pTask->id.idStr, req.checkpointId);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// check if the checkpoint msg already sent or not.
|
||||
if (status == TASK_STATUS__CK) {
|
||||
ASSERT(pTask->checkpointingId == req.checkpointId);
|
||||
|
||||
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
|
||||
" already received, ignore this msg and continue process checkpoint",
|
||||
pTask->id.idStr, pTask->checkpointingId);
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
streamProcessCheckpointSourceReq(pTask, &req);
|
||||
|
|
Loading…
Reference in New Issue