From e6defda0d06bfee1841cbab56a7daa2d567895d3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Jul 2024 17:48:58 +0800 Subject: [PATCH] fix(stream): check for checkpoint interrpution in sendReady monitor. --- source/libs/stream/src/streamCheckpoint.c | 6 ++++-- source/libs/stream/src/streamDispatch.c | 13 ++++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0f39ca7213..d5f7d6ef21 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -405,12 +405,14 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.startTs = 0; // clear the recorded start time - - streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks + + taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock); + streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); if (clearChkpReadyMsg) { streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); } + taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); } int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 83e73e8c88..5164e20ec9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -813,9 +813,20 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { taosThreadMutexLock(&pActiveInfo->lock); SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + + // active checkpoint info is cleared for now + if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { + taosThreadMutexUnlock(&pActiveInfo->lock); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stWarn("s-task:0x%x vgId:%d active checkpoint may failed, quit from readyMsg send tmr, ref:%d", id, vgId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); - int32_t num = taosArrayGetSize(pList); ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); for (int32_t i = 0; i < num; ++i) {