fix(stream): clear flag for checkpoint.

This commit is contained in:
Haojun Liao 2023-10-22 01:25:00 +08:00
parent 825195b351
commit 059e94e428
3 changed files with 6 additions and 3 deletions

View File

@ -38,7 +38,7 @@ extern "C" {
#define TASK_DOWNSTREAM_READY 0x0
#define TASK_DOWNSTREAM_NOT_READY 0x1
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
#define TASK_SELF_NEW_STAGE 0x3
#define TASK_UPSTREAM_NEW_STAGE 0x3
#define NODE_ROLE_UNINIT 0x1
#define NODE_ROLE_LEADER 0x2

View File

@ -299,6 +299,9 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosArrayClear(pMeta->chkpInUse);
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;
pMeta->chkptNotReadyTasks = 0;
streamMetaResetStartInfo(&pMeta->startInfo);
}
void streamMetaClose(SStreamMeta* pMeta) {

View File

@ -252,7 +252,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage != stage) {
return TASK_SELF_NEW_STAGE;
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
return TASK_DOWNSTREAM_NOT_READY;
} else {
@ -396,7 +396,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"roll-back needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else {
if (pRsp->status == TASK_SELF_NEW_STAGE) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError(
"s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, continue check "
"till downstream nodeUpdate",