fix(stream): fix error in the check of continuing execution condition.

This commit is contained in:
Haojun Liao 2025-02-05 13:35:23 +08:00
parent 34a439d78b
commit 732a9d4b32
2 changed files with 5 additions and 5 deletions

View File

@ -924,19 +924,19 @@ static bool shouldNotCont(SStreamTask* pTask) {
bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING);
// 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue
bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE);
bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0);
// 3. no data in ordinary queue
int32_t emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0);
bool emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0);
if (quit) {
return true;
} else {
if (status == TASK_STATUS__CK) {
if (status == TASK_STATUS__CK && level == TASK_LEVEL__SOURCE) {
// in checkpoint procedure, we only check whether the controller queue is empty or not
return emptyCkQueue;
} else { // otherwise, if the block queue is empty, not continue.
return emptyBlockQueue;
return emptyBlockQueue && emptyCkQueue;
}
}
}

View File

@ -402,7 +402,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
int32_t code = 0;
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && level == TASK_LEVEL__SOURCE) {
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
code = taosWriteQitem(pChkptQ, pItem);