diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index a87a01c5b6..75ba51e498 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -154,8 +154,8 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter); int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); int32_t mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); -void mndInitStreamExecInfoForLeader(SMnode *pMnode); -void mndInitStreamExecInfoUpdateRole(SMnode *pMnode, int32_t role); +void mndStreamResetInitTaskListLoadFlag(); +void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e28bef9f1c..a35815cf4d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2886,13 +2886,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { pExecInfo->initTaskList = true; } -void mndInitStreamExecInfoForLeader(SMnode* pMnode) { +void mndStreamResetInitTaskListLoadFlag() { + mInfo("reset task list buffer init flag for leader"); execInfo.initTaskList = false; - mInfo("init stream execInfo for leader"); - mndInitStreamExecInfo(pMnode, &execInfo); } -void mndInitStreamExecInfoUpdateRole(SMnode* pMnode, int32_t role) { +void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) { execInfo.switchFromFollower = false; if (execInfo.role == NODE_ROLE_UNINIT) { @@ -3017,7 +3016,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); if (conflict) { - TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); + code = TSDB_CODE_MND_TRANS_CONFLICT; + goto _err; } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; @@ -3030,8 +3030,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); if (code) { - mndTransDrop(pTrans); - return code; + goto _err; } // drop all tasks @@ -3055,7 +3054,7 @@ _err: tDestroyDropOrphanTaskMsg(&msg); mndTransDrop(pTrans); - if (code == TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create drop %d orphan tasks trans succ", numOfTasks); } return code; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 065ee8e0df..cf7769b932 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -364,7 +364,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { } (void)taosThreadMutexUnlock(&pMgmt->lock); - mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_FOLLOWER); + mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER); } static void mndBecomeLearner(const SSyncFSM *pFsm) { @@ -388,8 +388,8 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; - mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_LEADER); - mndInitStreamExecInfoForLeader(pMnode); + mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER); + mndStreamResetInitTaskListLoadFlag(); } static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {