diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 14aae0b96a..aa17853454 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -530,6 +530,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; int32_t role; + bool closeFlag; bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; TdThreadRwlock lock; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a091a866a0..b146d96310 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -373,6 +373,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; + pMeta->closeFlag = false; stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); @@ -1281,6 +1282,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaWLock(pMeta); + pMeta->closeFlag = true; + void* pIter = NULL; while (1) { pIter = taosHashIterate(pMeta->pTasksMap, pIter); @@ -1455,6 +1458,7 @@ static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) { int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; + int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); @@ -1464,7 +1468,13 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { return TSDB_CODE_SUCCESS; } - int64_t now = taosGetTimestampMs(); + streamMetaRLock(pMeta); + if (pMeta->closeFlag) { + streamMetaRUnLock(pMeta); + stError("vgId:%d vnode is closed, not start check task(s) downstream status", vgId); + return TSDB_CODE_SUCCESS; + } + streamMetaRUnLock(pMeta); SArray* pTaskList = prepareBeforeStartTasks(pMeta); numOfTasks = taosArrayGetSize(pTaskList);