From 02b59d0b33abf19562c3b92a4561b8822c94c168 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 3 Aug 2024 16:34:26 +0800 Subject: [PATCH] fix(stream): add more check in tmr. --- source/libs/stream/src/streamCheckStatus.c | 7 ------- source/libs/stream/src/streamCheckpoint.c | 22 ++++++++++++++++++++++ source/libs/stream/src/streamDispatch.c | 4 ++-- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index c9ba6ffcfe..b7661e72d4 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage != stage) { - streamMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - streamMutexUnlock(&pTask->lock); - return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7b205a16a1..d638e28c8d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -855,6 +855,28 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { return; } + if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 + ", quit, ref:%d", + id, vgId, pTmrInfo->launchChkptId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // active checkpoint info is cleared for now + if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", + id, vgId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index f6e827b745..010f6f006f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -820,7 +820,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); - if (pTmrInfo->launchChkptId < pActiveInfo->activeId) { + if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 @@ -832,7 +832,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } // active checkpoint info is cleared for now - if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { + if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);