From 27ac6ca16ed78ad5c0cb232e3e156098ebcc8717 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 May 2024 14:12:14 +0800 Subject: [PATCH] fix(stream): desc the ref for checkpoint-trigger timer. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckpoint.c | 11 ++++++++--- source/libs/stream/src/streamDispatch.c | 9 +++++---- source/libs/stream/src/streamQueue.c | 12 ++++++++++-- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0d01913235..7e1c80c842 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -681,6 +681,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); +int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue); // common void streamTaskPause(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a177c56f77..3865d030f5 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -198,8 +198,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo; + streamMetaAcquireOneTask(pTask); + if (pActive->pCheckTmr == NULL) { - streamMetaAcquireOneTask(pTask); pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); } else { taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActive->pCheckTmr); @@ -605,6 +606,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } if (++pActiveInfo->checkCounter < 100) { + taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr); return; } @@ -614,7 +616,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { - stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr, + vgId, ref); + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pTask->pMeta, pTask); return; @@ -769,7 +774,7 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { taosThreadMutexLock(&pInfo->lock); // outputQ should be empty here - ASSERT(streamQueueGetNumOfItems(pTask->outputq.queue) == 1); + ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0); pInfo->dispatchTrigger = true; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ecea494be2..0a8a65544c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -566,9 +566,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { const char* id = pTask->id.idStr; int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue); if (numOfElems > 0) { - double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue)); - stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, - numOfElems, size); + double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue)); + int32_t numOfUnAccessed = streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue); + stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d/%d, size:%.2fMiB", id, + numOfUnAccessed, numOfElems, size); } // to make sure only one dispatch is running @@ -889,7 +890,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); - stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 + stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d", pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, req.upstreamNodeId); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 247baea16f..461d53d5a9 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -117,6 +117,13 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } +int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) { + int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue); + int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall); + + return numOfItems1 + numOfItems2; +} + int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall); } @@ -322,9 +329,10 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) ASSERT(0); } - if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->info.triggerParam != 0) { + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER && + (pTask->info.triggerParam != 0)) { atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - stDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); + stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); } return 0;