fix(stream): delay to load the stream task.
This commit is contained in:
parent
c94cd24593
commit
868898d11c
|
@ -154,8 +154,8 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
|||
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
|
||||
int32_t mndInitExecInfo();
|
||||
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
||||
void mndInitStreamExecInfoForLeader(SMnode *pMnode);
|
||||
void mndInitStreamExecInfoUpdateRole(SMnode *pMnode, int32_t role);
|
||||
void mndStreamResetInitTaskListLoadFlag();
|
||||
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role);
|
||||
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
|
||||
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
|
||||
|
|
|
@ -2882,13 +2882,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
|||
pExecInfo->initTaskList = true;
|
||||
}
|
||||
|
||||
void mndInitStreamExecInfoForLeader(SMnode* pMnode) {
|
||||
void mndStreamResetInitTaskListLoadFlag() {
|
||||
mInfo("reset task list buffer init flag for leader");
|
||||
execInfo.initTaskList = false;
|
||||
mInfo("init stream execInfo for leader");
|
||||
mndInitStreamExecInfo(pMnode, &execInfo);
|
||||
}
|
||||
|
||||
void mndInitStreamExecInfoUpdateRole(SMnode* pMnode, int32_t role) {
|
||||
void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) {
|
||||
execInfo.switchFromFollower = false;
|
||||
|
||||
if (execInfo.role == NODE_ROLE_UNINIT) {
|
||||
|
@ -3013,7 +3012,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
|
||||
if (conflict) {
|
||||
TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT);
|
||||
code = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
|
||||
|
@ -3026,8 +3026,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
|
||||
code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
|
||||
if (code) {
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// drop all tasks
|
||||
|
@ -3051,7 +3050,7 @@ _err:
|
|||
tDestroyDropOrphanTaskMsg(&msg);
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mDebug("create drop %d orphan tasks trans succ", numOfTasks);
|
||||
}
|
||||
return code;
|
||||
|
|
|
@ -361,7 +361,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
|
|||
}
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
|
||||
mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_FOLLOWER);
|
||||
mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER);
|
||||
}
|
||||
|
||||
static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||
|
@ -385,8 +385,8 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
|||
mInfo("vgId:1, become leader");
|
||||
SMnode *pMnode = pFsm->data;
|
||||
|
||||
mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_LEADER);
|
||||
mndInitStreamExecInfoForLeader(pMnode);
|
||||
mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER);
|
||||
mndStreamResetInitTaskListLoadFlag();
|
||||
}
|
||||
|
||||
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
|
||||
|
|
Loading…
Reference in New Issue