diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index a5d91c8aa8..88b8e98afb 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -152,6 +152,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter); int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); int32_t mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); +void mndInitStreamExecInfoForLeader(SMnode *pMnode); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 35da6c379f..a01bc92a97 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2831,6 +2831,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { pExecInfo->initTaskList = true; } +void mndInitStreamExecInfoForLeader(SMnode* pMnode) { + execInfo.initTaskList = false; + mInfo("init stream execInfo for leader"); + mndInitStreamExecInfo(pMnode, &execInfo); +} + void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 89f3c6e253..1094a17f6b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -18,6 +18,7 @@ #include "mndCluster.h" #include "mndTrans.h" #include "mndUser.h" +#include "mndStream.h" static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { @@ -381,6 +382,7 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; + mndInitStreamExecInfoForLeader(pMnode); } static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {