From fb3fe03c1fb118a0435bc1134e226f8a7bb63b66 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Jul 2024 14:57:39 +0800 Subject: [PATCH] fix(stream): to avoid repeatly start checkpoint timer if previous timer is not started yet. --- source/libs/stream/inc/streamInt.h | 36 +++++++------ source/libs/stream/src/streamCheckpoint.c | 44 ++++++++++------ source/libs/stream/src/streamDispatch.c | 61 +++++++++++++---------- source/libs/stream/src/streamTask.c | 14 +++--- source/libs/stream/src/streamTimer.c | 10 ++++ 5 files changed, 102 insertions(+), 63 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d31f720411..dc19d8c5b0 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -48,24 +48,30 @@ extern "C" { #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +typedef struct SStreamTmrInfo { + int32_t activeCounter; // make sure only launch one checkpoint trigger check tmr + tmr_h tmrHandle; + int64_t launchChkptId; + int8_t isActive; +} SStreamTmrInfo; + struct SActiveCheckpointInfo { - TdThreadMutex lock; - int32_t transId; - int64_t firstRecvTs; // first time to recv checkpoint trigger info - int64_t activeId; // current active checkpoint id - int64_t failedId; - bool dispatchTrigger; - SArray* pDispatchTriggerList; // SArray - SArray* pReadyMsgList; // SArray - int8_t allUpstreamTriggerRecv; - SArray* pCheckpointReadyRecvList; // SArray - int32_t checkCounter; - tmr_h pChkptTriggerTmr; - int32_t sendReadyCheckCounter; - tmr_h pSendReadyMsgTmr; - int64_t sendReadyTmrChkptId; + TdThreadMutex lock; + int32_t transId; + int64_t firstRecvTs; // first time to recv checkpoint trigger info + int64_t activeId; // current active checkpoint id + int64_t failedId; + bool dispatchTrigger; + SArray* pDispatchTriggerList; // SArray + SArray* pReadyMsgList; // SArray + int8_t allUpstreamTriggerRecv; + SArray* pCheckpointReadyRecvList; // SArray + SStreamTmrInfo chkptTriggerMsgTmr; + SStreamTmrInfo chkptReadyMsgTmr; }; +int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask); + typedef struct { int8_t type; SSDataBlock* pBlock; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d1ea72370d..96a614f6a4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -265,14 +265,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + // if previous launched timer not started yet, not start a new timer + // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch + // a new checkpoint-trigger timer right now. + // And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint + // procedure to be stucked. + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); + if (old == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); + streamMetaAcquireOneTask(pTask); - if (pActiveInfo->pChkptTriggerTmr == NULL) { - pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); - } else { - taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr); + if (pTmrInfo->tmrHandle == NULL) { + pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer); + } else { + taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); + } + pTmrInfo->launchChkptId = pActiveInfo->activeId; + } else { // already launched, do nothing + stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); } } @@ -741,27 +753,28 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { const char* id = pTask->id.idStr; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; // check the status every 100ms if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; } - if (++pActiveInfo->checkCounter < 100) { - taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr); + if (++pTmrInfo->activeCounter < 50) { + taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); return; } - pActiveInfo->checkCounter = 0; + pTmrInfo->activeCounter = 0; stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); taosThreadMutexUnlock(&pTask->lock); @@ -771,7 +784,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); @@ -779,7 +792,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); return; } - taosThreadMutexUnlock(&pTask->lock); taosThreadMutexLock(&pActiveInfo->lock); @@ -820,9 +832,9 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check every 100ms if (size > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr); + taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); streamMetaReleaseTask(pTask->pMeta, pTask); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9615ed49e0..006e55374e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -777,31 +777,32 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 } static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; // check the status every 100ms if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; } - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - if (++pActiveInfo->sendReadyCheckCounter < 100) { - taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr); + if (++pTmrInfo->activeCounter < 50) { + taosTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); return; } - pActiveInfo->sendReadyCheckCounter = 0; - stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, pState->name, ref); taosThreadMutexUnlock(&pTask->lock); @@ -815,11 +816,12 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); - if (pActiveInfo->sendReadyTmrChkptId < pActiveInfo->activeId) { + if (pTmrInfo->launchChkptId < pActiveInfo->activeId) { taosThreadMutexUnlock(&pActiveInfo->lock); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stWarn("s-task:%s vgId:%d tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", - id, vgId, pActiveInfo->sendReadyTmrChkptId, ref); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 + ", quit, ref:%d", + id, vgId, pTmrInfo->launchChkptId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; @@ -828,7 +830,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { taosThreadMutexUnlock(&pActiveInfo->lock); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); @@ -871,10 +873,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } } - taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr); + taosTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); taosThreadMutexUnlock(&pActiveInfo->lock); } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " "and quit from timer, ref:%d", @@ -916,18 +918,25 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { // start to check if checkpoint ready msg has successfully received by upstream tasks. if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; - if (pActiveInfo->pSendReadyMsgTmr == NULL) { - pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer); + int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); + if (old == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); + streamMetaAcquireOneTask(pTask); + + if (pTmrInfo->tmrHandle == NULL) { + pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer); + } else { + taosTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle); + } + + // mark the timer monitor checkpointId + pTmrInfo->launchChkptId = pActiveInfo->activeId; } else { - taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr); + stError("s-task:%s previous checkpoint-ready monitor tmr is set, not start new one", pTask->id.idStr); } - - // mark the timer monitor checkpointId - pActiveInfo->sendReadyTmrChkptId = pActiveInfo->activeId; } taosThreadMutexUnlock(&pActiveInfo->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4cbe0cb136..d63b6ea935 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1064,14 +1064,16 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosArrayDestroy(pInfo->pCheckpointReadyRecvList); pInfo->pCheckpointReadyRecvList = NULL; - if (pInfo->pChkptTriggerTmr != NULL) { - taosTmrStop(pInfo->pChkptTriggerTmr); - pInfo->pChkptTriggerTmr = NULL; + SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; + if (pTriggerTmr->tmrHandle != NULL) { + taosTmrStop(pTriggerTmr->tmrHandle); + pTriggerTmr->tmrHandle = NULL; } - if (pInfo->pSendReadyMsgTmr != NULL) { - taosTmrStop(pInfo->pSendReadyMsgTmr); - pInfo->pSendReadyMsgTmr = NULL; + SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; + if (pReadyTmr->tmrHandle != NULL) { + taosTmrStop(pReadyTmr->tmrHandle); + pReadyTmr->tmrHandle = NULL; } taosMemoryFree(pInfo); diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 6e956e2682..4838d76fe0 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -38,3 +38,13 @@ void streamTimerCleanUp() { tmr_h streamTimerGetInstance() { return streamTimer; } + +int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) { + pInfo->activeCounter = 0; + pInfo->launchChkptId = 0; + atomic_store_8(&pInfo->isActive, 0); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + ASSERT(ref >= 0); + return ref; +} \ No newline at end of file