From 47b0a0464e62e1458a87799ac5800b451929fd4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Jul 2024 15:21:16 +0800 Subject: [PATCH] fix(stream): send checkpoint-source-rsp to mnode before reset task status. --- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 3 --- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +++- source/libs/stream/src/streamCheckpoint.c | 16 ++++++++++++++++ 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 66b9db47e2..f24f7da7c3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,6 +801,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp); int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); +int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a4c03ab3e0..536dfab331 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1239,9 +1239,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); - // clear the consensus checkpoint info - mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid); - if (code != -1) { started += 1; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 04a658a30c..1999134754 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -228,6 +228,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + + // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode + streamTaskSendPreparedCheckpointsourceRsp(pTask); streamTaskResetStatus(pTask); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); @@ -264,7 +267,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId); } - // stop streamTaskStop(pTask); if (ppHTask != NULL) { streamTaskStop(*ppHTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1195362ab3..969c2e1795 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1146,4 +1146,20 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { tmsgSendReq(&pTask->info.mnodeEpset, &msg); return 0; +} + +int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask) { + int32_t code = 0; + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + return code; + } + + taosThreadMutexLock(&pTask->lock); + SStreamTaskState* p = streamTaskGetStatus(pTask); + if (p->state == TASK_STATUS__CK) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } + taosThreadMutexUnlock(&pTask->lock); + + return code; } \ No newline at end of file