From 732a9d4b32b9cb36be64b3bbe674e242f1a90307 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Feb 2025 13:35:23 +0800 Subject: [PATCH] fix(stream): fix error in the check of continuing execution condition. --- source/libs/stream/src/streamExec.c | 8 ++++---- source/libs/stream/src/streamQueue.c | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3555515f75..1015917f61 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -924,19 +924,19 @@ static bool shouldNotCont(SStreamTask* pTask) { bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue - bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE); + bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0); // 3. no data in ordinary queue - int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); + bool emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); if (quit) { return true; } else { - if (status == TASK_STATUS__CK) { + if (status == TASK_STATUS__CK && level == TASK_LEVEL__SOURCE) { // in checkpoint procedure, we only check whether the controller queue is empty or not return emptyCkQueue; } else { // otherwise, if the block queue is empty, not continue. - return emptyBlockQueue; + return emptyBlockQueue && emptyCkQueue; } } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 3d0d01ce0e..f68dd1452f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -402,7 +402,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) { int32_t code = 0; - if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && level == TASK_LEVEL__SOURCE) { + if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) { STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue; code = taosWriteQitem(pChkptQ, pItem);