Merge pull request #27201 from taosdata/fix/3_liaohj

fix(stream): init exec when mnd become leader.
This commit is contained in:
Haojun Liao 2024-08-14 09:14:54 +08:00 committed by GitHub
commit e9687fcf9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 9 additions and 0 deletions

View File

@ -152,6 +152,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
int32_t mndInitExecInfo();
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
void mndInitStreamExecInfoForLeader(SMnode *pMnode);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);

View File

@ -2831,6 +2831,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
pExecInfo->initTaskList = true;
}
void mndInitStreamExecInfoForLeader(SMnode* pMnode) {
execInfo.initTaskList = false;
mInfo("init stream execInfo for leader");
mndInitStreamExecInfo(pMnode, &execInfo);
}
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;

View File

@ -18,6 +18,7 @@
#include "mndCluster.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndStream.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
@ -381,6 +382,7 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) {
static void mndBecomeLeader(const SSyncFSM *pFsm) {
mInfo("vgId:1, become leader");
SMnode *pMnode = pFsm->data;
mndInitStreamExecInfoForLeader(pMnode);
}
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {