fix(stream): return if failed.

This commit is contained in:
Haojun Liao 2023-09-20 15:38:49 +08:00
parent d76e3ac718
commit 36dcdad1a5
1 changed files with 3 additions and 2 deletions

View File

@ -1646,13 +1646,14 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
taosThreadMutexLock(&pTask->lock);
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64
", set it failure", pTask->id.idStr, req.checkpointId);
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);