diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ef3fb11866..3555515f75 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -924,16 +924,20 @@ static bool shouldNotCont(SStreamTask* pTask) { bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue - bool notCkCont = - (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK); + bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE); // 3. no data in ordinary queue - int32_t numOfItems = streamQueueGetNumOfItems(pQueue); + int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); - if ((numOfItems == 0) || quit || notCkCont) { + if (quit) { return true; } else { - return false; + if (status == TASK_STATUS__CK) { + // in checkpoint procedure, we only check whether the controller queue is empty or not + return emptyCkQueue; + } else { // otherwise, if the block queue is empty, not continue. + return emptyBlockQueue; + } } }