fix(stream): init exec when mnd become leader.
This commit is contained in:
parent
324dc3f404
commit
eb029395a2
|
@ -152,6 +152,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
||||||
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
|
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
|
||||||
int32_t mndInitExecInfo();
|
int32_t mndInitExecInfo();
|
||||||
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
||||||
|
void mndInitStreamExecInfoForLeader(SMnode *pMnode);
|
||||||
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
|
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
|
||||||
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
|
|
||||||
|
|
|
@ -2831,6 +2831,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
||||||
pExecInfo->initTaskList = true;
|
pExecInfo->initTaskList = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndInitStreamExecInfoForLeader(SMnode* pMnode) {
|
||||||
|
execInfo.initTaskList = false;
|
||||||
|
mInfo("init stream execInfo for leader");
|
||||||
|
mndInitStreamExecInfo(pMnode, &execInfo);
|
||||||
|
}
|
||||||
|
|
||||||
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
#include "mndStream.h"
|
||||||
|
|
||||||
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
if (pMsg == NULL || pMsg->pCont == NULL) {
|
if (pMsg == NULL || pMsg->pCont == NULL) {
|
||||||
|
@ -381,6 +382,7 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
mInfo("vgId:1, become leader");
|
mInfo("vgId:1, become leader");
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
mndInitStreamExecInfoForLeader(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
|
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
|
||||||
|
|
Loading…
Reference in New Issue