fix(stream): return if failed.

This commit is contained in:
Haojun Liao 2023-09-20 15:38:49 +08:00
parent 2a781c509f
commit c8ab761d2a
1 changed files with 3 additions and 2 deletions

View File

@ -1633,13 +1633,14 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
if (pTask->status.taskStatus == TASK_STATUS__HALT) { if (pTask->status.taskStatus == TASK_STATUS__HALT) {
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
", set it failure", pTask->id.idStr, req.checkpointId); pTask->id.idStr, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
} }
streamProcessCheckpointSourceReq(pTask, &req); streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);