fix(stream): set global close flag in the streamMeta.

This commit is contained in:
Haojun Liao 2024-05-06 17:03:09 +08:00
parent d879c7c967
commit 46e9fe6f97
1 changed files with 15 additions and 10 deletions

View File

@ -1441,10 +1441,17 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
} }
} }
static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) { static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); if (pMeta->closeFlag) {
streamMetaWUnLock(pMeta);
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
return -1;
}
*pList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet); taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs(); pMeta->startInfo.startTs = taosGetTimestampMs();
@ -1452,7 +1459,7 @@ static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) {
streamMetaResetTaskStatus(pMeta); streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return pTaskList; return TSDB_CODE_SUCCESS;
} }
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
@ -1464,19 +1471,17 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
stInfo("vgId:%d start tasks completed", pMeta->vgId); stInfo("vgId:%d no tasks to be started", pMeta->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
streamMetaRLock(pMeta); SArray* pTaskList = NULL;
if (pMeta->closeFlag) { code = prepareBeforeStartTasks(pMeta, &pTaskList);
streamMetaRUnLock(pMeta); if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d vnode is closed, not start check task(s) downstream status", vgId); ASSERT(pTaskList == NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
streamMetaRUnLock(pMeta);
SArray* pTaskList = prepareBeforeStartTasks(pMeta);
numOfTasks = taosArrayGetSize(pTaskList); numOfTasks = taosArrayGetSize(pTaskList);
// broadcast the check downstream tasks msg // broadcast the check downstream tasks msg