From c4cde6f26881069675baa944ffed26eef029b516 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jul 2024 14:23:33 +0800 Subject: [PATCH] fix(stream): record which checkpoint procedure launched the timer. --- source/libs/stream/inc/streamInt.h | 8 ++------ source/libs/stream/src/streamDispatch.c | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 008d066717..d31f720411 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -63,11 +63,7 @@ struct SActiveCheckpointInfo { tmr_h pChkptTriggerTmr; int32_t sendReadyCheckCounter; tmr_h pSendReadyMsgTmr; -}; - -struct SConsensusCheckpoint { - int8_t inProcess; - + int64_t sendReadyTmrChkptId; }; typedef struct { @@ -227,7 +223,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); -int32_t streamTaskDownloadCheckpointData(const char* id, char* path); +int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 617adaa016..1948b04186 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -815,6 +815,16 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); + if (pActiveInfo->sendReadyTmrChkptId < pActiveInfo->activeId) { + taosThreadMutexUnlock(&pActiveInfo->lock); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stWarn("s-task:%s vgId:%d tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", + id, vgId, pActiveInfo->sendReadyTmrChkptId, ref); + 
streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { taosThreadMutexUnlock(&pActiveInfo->lock); @@ -902,7 +912,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { pInfo->upstreamTaskId); } - taosThreadMutexUnlock(&pActiveInfo->lock); stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num); // start to check if checkpoint ready msg has successfully received by upstream tasks. @@ -916,8 +925,12 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { } else { taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr); } + + // mark the timer monitor checkpointId + pActiveInfo->sendReadyTmrChkptId = pActiveInfo->activeId; } + taosThreadMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_SUCCESS; }