fix(stream): do rsp for checkpoint reqs.

This commit is contained in:
Haojun Liao 2024-12-04 19:19:57 +08:00
parent 4b47c4ca5d
commit 6bbfc1aead
2 changed files with 9 additions and 2 deletions

View File

@ -21,7 +21,6 @@
#include "mndShow.h"
#include "mndStb.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "osMemory.h"
#include "parser.h"
#include "taoserror.h"
@ -2336,7 +2335,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
}
int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
if (total == numOfTasks) { // all tasks have sent the reqs
int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

View File

@ -1250,6 +1250,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; // make the mnode retry until this task status completed
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (ret) { // suppress the error in build checkpoint-source rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
} else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) {