From 868898d11cf5f0f3f237ba3f7d0423d60783ba9d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 00:28:49 +0800 Subject: [PATCH] fix(stream): delay to load the stream task. --- source/dnode/mnode/impl/inc/mndStream.h | 4 ++-- source/dnode/mnode/impl/src/mndStream.c | 15 +++++++-------- source/dnode/mnode/impl/src/mndSync.c | 6 +++--- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index a87a01c5b6..75ba51e498 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -154,8 +154,8 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter); int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); int32_t mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); -void mndInitStreamExecInfoForLeader(SMnode *pMnode); -void mndInitStreamExecInfoUpdateRole(SMnode *pMnode, int32_t role); +void mndStreamResetInitTaskListLoadFlag(); +void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a85b5c733b..9aa36c0c4e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2882,13 +2882,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { pExecInfo->initTaskList = true; } -void mndInitStreamExecInfoForLeader(SMnode* pMnode) { +void mndStreamResetInitTaskListLoadFlag() { + mInfo("reset task list buffer init flag for leader"); execInfo.initTaskList = false; - mInfo("init stream execInfo for leader"); - mndInitStreamExecInfo(pMnode, &execInfo); } -void mndInitStreamExecInfoUpdateRole(SMnode* pMnode, int32_t role) { +void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) { execInfo.switchFromFollower = false; if (execInfo.role == NODE_ROLE_UNINIT) { @@ -3013,7 +3012,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); if (conflict) { - TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); + code = TSDB_CODE_MND_TRANS_CONFLICT; + goto _err; } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; @@ -3026,8 +3026,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); if (code) { - mndTransDrop(pTrans); - return code; + goto _err; } // drop all tasks @@ -3051,7 +3050,7 @@ _err: tDestroyDropOrphanTaskMsg(&msg); mndTransDrop(pTrans); - if (code == TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create drop %d orphan tasks trans succ", numOfTasks); } return code; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f5704be371..0f4e4f0363 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -361,7 +361,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { } (void)taosThreadMutexUnlock(&pMgmt->lock); - mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_FOLLOWER); + mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER); } static void mndBecomeLearner(const SSyncFSM *pFsm) { @@ -385,8 +385,8 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; - mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_LEADER); - mndInitStreamExecInfoForLeader(pMnode); + mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER); + mndStreamResetInitTaskListLoadFlag(); } static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {