From 225dfb1a9335d2d6cc3ed03d03b87985aede07a1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 May 2024 00:30:42 +0800 Subject: [PATCH] enh(stream): check checkpoint-trigger msg every 100ms. --- source/libs/stream/inc/streamInt.h | 4 ++- source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 39 +++++++++++++++++----- source/libs/stream/src/streamExec.c | 4 +++ 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 68c3ab1a6b..10db53ea38 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -61,7 +61,9 @@ struct SActiveCheckpointInfo { bool dispatchTrigger; SArray* pDispatchTriggerList; // SArray SArray* pReadyMsgList; // SArray - tmr_h pCheckTmr; + + int32_t checkCounter; + tmr_h pCheckTmr; }; typedef struct { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d5c676433f..11fecf7683 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -285,7 +285,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref); + stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref); if (pInfo->checkRspTmr == NULL) { pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fb3c705a4d..414c4e2a76 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -36,8 +36,9 @@ static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, con static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); +static void checkpointTriggerMonitorFn(void* param, void* tmrId); + static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType); -static void checkpointTriggerMonitorFn(void* param, void* tmrId); bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -194,11 +195,13 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start check-rsp monitor in 10s, ref:%d ", pTask->id.idStr, ref); SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo; if (pActive->pCheckTmr == NULL) { - pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 10000, pTask, streamTimer); + pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); } else { - taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActive->pCheckTmr); + taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActive->pCheckTmr); } } @@ -575,17 +578,33 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { void checkpointTriggerMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - stDebug("s-task:%s vgId:%d checkpoint-trigger monitor start, ts:%" PRId64, pTask->id.idStr, vgId, now); + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pActiveInfo->checkCounter < 100) { + return; + } + + pActiveInfo->checkCounter = 0; + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now); taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId); taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); return; } taosThreadMutexUnlock(&pTask->lock); @@ -622,8 +641,12 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check every 100ms if (size > 0) { - taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActiveInfo->pCheckTmr); - stDebug("s-task:%s start monitor trigger in 10sec", pTask->id.idStr); + stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); + taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr); + } else { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); } taosArrayDestroy(pNotSendList); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 87f239f31c..1e85b959a8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -597,6 +597,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + if (pTask->pMeta->vgId == 2) { + taosSsleep(20); + } + streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); continue; }