Merge pull request #27263 from taosdata/fix/3_liaohj

fix(stream): delay to load the stream task.
This commit is contained in:
Haojun Liao 2024-08-16 09:11:30 +08:00 committed by GitHub
commit 7c44227048
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 12 additions and 13 deletions

View File

@ -154,8 +154,8 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
int32_t mndInitExecInfo();
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
void mndInitStreamExecInfoForLeader(SMnode *pMnode);
void mndInitStreamExecInfoUpdateRole(SMnode *pMnode, int32_t role);
void mndStreamResetInitTaskListLoadFlag();
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);

View File

@ -2886,13 +2886,12 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
pExecInfo->initTaskList = true;
}
void mndInitStreamExecInfoForLeader(SMnode* pMnode) {
void mndStreamResetInitTaskListLoadFlag() {
mInfo("reset task list buffer init flag for leader");
execInfo.initTaskList = false;
mInfo("init stream execInfo for leader");
mndInitStreamExecInfo(pMnode, &execInfo);
}
void mndInitStreamExecInfoUpdateRole(SMnode* pMnode, int32_t role) {
void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) {
execInfo.switchFromFollower = false;
if (execInfo.role == NODE_ROLE_UNINIT) {
@ -3017,7 +3016,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
if (conflict) {
TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT);
code = TSDB_CODE_MND_TRANS_CONFLICT;
goto _err;
}
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
@ -3030,8 +3030,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
if (code) {
mndTransDrop(pTrans);
return code;
goto _err;
}
// drop all tasks
@ -3055,7 +3054,7 @@ _err:
tDestroyDropOrphanTaskMsg(&msg);
mndTransDrop(pTrans);
if (code == TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("create drop %d orphan tasks trans succ", numOfTasks);
}
return code;

View File

@ -364,7 +364,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
}
(void)taosThreadMutexUnlock(&pMgmt->lock);
mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_FOLLOWER);
mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER);
}
static void mndBecomeLearner(const SSyncFSM *pFsm) {
@ -388,8 +388,8 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) {
mInfo("vgId:1, become leader");
SMnode *pMnode = pFsm->data;
mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_LEADER);
mndInitStreamExecInfoForLeader(pMnode);
mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER);
mndStreamResetInitTaskListLoadFlag();
}
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {