From 059e94e428ac1b88e46a5cd42dc963d68e5a2914 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 22 Oct 2023 01:25:00 +0800 Subject: [PATCH] fix(stream): clear flag for checkpoint. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamMeta.c | 3 +++ source/libs/stream/src/streamStart.c | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0ca401e3a4..54ca7f1566 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -38,7 +38,7 @@ extern "C" { #define TASK_DOWNSTREAM_READY 0x0 #define TASK_DOWNSTREAM_NOT_READY 0x1 #define TASK_DOWNSTREAM_NOT_LEADER 0x2 -#define TASK_SELF_NEW_STAGE 0x3 +#define TASK_UPSTREAM_NEW_STAGE 0x3 #define NODE_ROLE_UNINIT 0x1 #define NODE_ROLE_LEADER 0x2 diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a1ed9a1458..4307f74709 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -299,6 +299,9 @@ void streamMetaClear(SStreamMeta* pMeta) { taosArrayClear(pMeta->chkpInUse); pMeta->numOfStreamTasks = 0; pMeta->numOfPausedTasks = 0; + pMeta->chkptNotReadyTasks = 0; + + streamMetaResetStartInfo(&pMeta->startInfo); } void streamMetaClose(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index b52d9177b7..9ebd617be0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -252,7 +252,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage != stage) { - return TASK_SELF_NEW_STAGE; + return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { return TASK_DOWNSTREAM_NOT_READY; } else { @@ -396,7 +396,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "roll-back needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } else { - if (pRsp->status == TASK_SELF_NEW_STAGE) { + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError( "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, continue check " "till downstream nodeUpdate",