fix(stream): set global close flag in the streamMeta.

This commit is contained in:
Haojun Liao 2024-05-06 16:40:49 +08:00
parent cdc7b03ac6
commit d879c7c967
2 changed files with 12 additions and 1 deletions

View File

@ -530,6 +530,7 @@ typedef struct SStreamMeta {
int32_t vgId;
int64_t stage;
int32_t role;
bool closeFlag;
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
STaskStartInfo startInfo;
TdThreadRwlock lock;

View File

@ -373,6 +373,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
pMeta->closeFlag = false;
stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
@ -1281,6 +1282,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
streamMetaWLock(pMeta);
pMeta->closeFlag = true;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasksMap, pIter);
@ -1455,6 +1458,7 @@ static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) {
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
@ -1464,7 +1468,13 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
return TSDB_CODE_SUCCESS;
}
int64_t now = taosGetTimestampMs();
streamMetaRLock(pMeta);
if (pMeta->closeFlag) {
streamMetaRUnLock(pMeta);
stError("vgId:%d vnode is closed, not start check task(s) downstream status", vgId);
return TSDB_CODE_SUCCESS;
}
streamMetaRUnLock(pMeta);
SArray* pTaskList = prepareBeforeStartTasks(pMeta);
numOfTasks = taosArrayGetSize(pTaskList);