From 34a439d78bb322ad2ec5c3814eae072053855430 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Feb 2025 10:59:29 +0800 Subject: [PATCH] fix(stream): fix error in the check of continuing execution condition. --- source/libs/stream/src/streamExec.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ef3fb11866..3555515f75 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -924,16 +924,20 @@ static bool shouldNotCont(SStreamTask* pTask) { bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue - bool notCkCont = - (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK); + bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE); // 3. no data in ordinary queue - int32_t numOfItems = streamQueueGetNumOfItems(pQueue); + int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); - if ((numOfItems == 0) || quit || notCkCont) { + if (quit) { return true; } else { - return false; + if (status == TASK_STATUS__CK) { + // in checkpoint procedure, we only check whether the controller queue is empty or not + return emptyCkQueue; + } else { // otherwise, if the block queue is empty, not continue. + return emptyBlockQueue; + } } }