fix(stream): fix error in the check of continuing execution condition.

This commit is contained in:
Haojun Liao 2025-02-05 10:59:29 +08:00
parent cb9a1a852d
commit 34a439d78b
1 changed files with 9 additions and 5 deletions

View File

@ -924,16 +924,20 @@ static bool shouldNotCont(SStreamTask* pTask) {
bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING);
// 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue
bool notCkCont =
(taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK);
bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE);
// 3. no data in ordinary queue
int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0);
if ((numOfItems == 0) || quit || notCkCont) {
if (quit) {
return true;
} else {
return false;
if (status == TASK_STATUS__CK) {
// in checkpoint procedure, we only check whether the controller queue is empty or not
return emptyCkQueue;
} else { // otherwise, if the block queue is empty, not continue.
return emptyBlockQueue;
}
}
}