From f56aeaf1bd06d3a60da5fa0a7d929257d1da2348 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Jan 2025 22:57:06 +0800 Subject: [PATCH] fix(stream): fix error in deciding exec should quit or not. --- source/libs/stream/src/streamExec.c | 28 +++++++++++++++++++++++++--- source/libs/stream/src/streamQueue.c | 9 ++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 054f88ec3d..ef3fb11866 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,31 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } +static bool shouldNotCont(SStreamTask* pTask) { + int32_t level = pTask->info.taskLevel; + SStreamQueue* pQueue = pTask->inputq.queue; + ETaskStatus status = streamTaskGetStatus(pTask).state; + + // 1. task should jump out + bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); + + // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue + bool notCkCont = + (taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK); + + // 3. no data in ordinary queue + int32_t numOfItems = streamQueueGetNumOfItems(pQueue); + + if ((numOfItems == 0) || quit || notCkCont) { + return true; + } else { + return false; + } +} + int32_t streamResumeTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; + int32_t level = pTask->info.taskLevel; int32_t code = 0; if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { @@ -929,11 +952,10 @@ int32_t streamResumeTask(SStreamTask* pTask) { if (code) { stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } - // check if continue + streamMutexLock(&pTask->lock); - int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); - if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + if (shouldNotCont(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamTaskClearSchedIdleInfo(pTask); streamMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6a9a1ac880..3d0d01ce0e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -113,7 +113,7 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { } } -void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status) { +void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) { *pItem = NULL; int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); @@ -132,15 +132,14 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem pQueue->qChkptItem = NULL; taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem); if (pQueue->qChkptItem != NULL) { - stDebug("read data from checkpoint queue, status:%d", status); - + stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status); *pItem = pQueue->qChkptItem; return; } // if in checkpoint status, not read data from ordinary input q. if (status == TASK_STATUS__CK) { - stDebug("in checkpoint status, not ready data in normal queue"); + stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status); return; } @@ -260,7 +259,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte SStreamQueueItem* qItem = NULL; if (taskLevel == TASK_LEVEL__SOURCE) { - streamQueueNextItemInSourceQ(pQueue, &qItem, status); + streamQueueNextItemInSourceQ(pQueue, &qItem, status, id); } else { streamQueueNextItem(pQueue, &qItem); }