From 46e9fe6f973fbd8edb22c956b70f4c4238168c5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 May 2024 17:03:09 +0800 Subject: [PATCH] fix(stream): set global close flag in the streamMeta. --- source/libs/stream/src/streamMeta.c | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b146d96310..40ea8189ce 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1441,10 +1441,17 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } } -static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) { +static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList) { streamMetaWLock(pMeta); - SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + if (pMeta->closeFlag) { + streamMetaWUnLock(pMeta); + stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); + return -1; + } + + *pList = taosArrayDup(pMeta->pTaskList, NULL); + taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pFailedTaskSet); pMeta->startInfo.startTs = taosGetTimestampMs(); @@ -1452,7 +1459,7 @@ static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) { streamMetaResetTaskStatus(pMeta); streamMetaWUnLock(pMeta); - return pTaskList; + return TSDB_CODE_SUCCESS; } int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { @@ -1464,19 +1471,17 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); if (numOfTasks == 0) { - stInfo("vgId:%d start tasks completed", pMeta->vgId); + stInfo("vgId:%d no tasks to be started", pMeta->vgId); return TSDB_CODE_SUCCESS; } - streamMetaRLock(pMeta); - if (pMeta->closeFlag) { - streamMetaRUnLock(pMeta); - stError("vgId:%d vnode is closed, not start check task(s) downstream status", vgId); + SArray* pTaskList = NULL; + code = prepareBeforeStartTasks(pMeta, &pTaskList); + if (code != TSDB_CODE_SUCCESS) { + ASSERT(pTaskList == NULL); return TSDB_CODE_SUCCESS; } - streamMetaRUnLock(pMeta); - SArray* pTaskList = prepareBeforeStartTasks(pMeta); numOfTasks = taosArrayGetSize(pTaskList); // broadcast the check downstream tasks msg