fix(stream): fix error in deciding exec should quit or not.

This commit is contained in:
Haojun Liao 2025-01-26 22:57:06 +08:00
parent 24c6fe727f
commit f56aeaf1bd
2 changed files with 29 additions and 8 deletions

View File

@ -915,8 +915,31 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
} }
} }
static bool shouldNotCont(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
SStreamQueue* pQueue = pTask->inputq.queue;
ETaskStatus status = streamTaskGetStatus(pTask).state;
// 1. task should jump out
bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING);
// 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue
bool notCkCont =
(taosQueueItemSize(pQueue->pChkptQueue) == 0) && (level == TASK_LEVEL__SOURCE) && (status == TASK_STATUS__CK);
// 3. no data in ordinary queue
int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
if ((numOfItems == 0) || quit || notCkCont) {
return true;
} else {
return false;
}
}
int32_t streamResumeTask(SStreamTask* pTask) { int32_t streamResumeTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t level = pTask->info.taskLevel;
int32_t code = 0; int32_t code = 0;
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
@ -929,11 +952,10 @@ int32_t streamResumeTask(SStreamTask* pTask) {
if (code) { if (code) {
stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
} }
// check if continue
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); if (shouldNotCont(pTask)) {
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamTaskClearSchedIdleInfo(pTask); streamTaskClearSchedIdleInfo(pTask);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);

View File

@ -113,7 +113,7 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
} }
} }
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status) { void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
*pItem = NULL; *pItem = NULL;
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
@ -132,15 +132,14 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem
pQueue->qChkptItem = NULL; pQueue->qChkptItem = NULL;
taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem); taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
if (pQueue->qChkptItem != NULL) { if (pQueue->qChkptItem != NULL) {
stDebug("read data from checkpoint queue, status:%d", status); stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
*pItem = pQueue->qChkptItem; *pItem = pQueue->qChkptItem;
return; return;
} }
// if in checkpoint status, not read data from ordinary input q. // if in checkpoint status, not read data from ordinary input q.
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
stDebug("in checkpoint status, not ready data in normal queue"); stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
return; return;
} }
@ -260,7 +259,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
SStreamQueueItem* qItem = NULL; SStreamQueueItem* qItem = NULL;
if (taskLevel == TASK_LEVEL__SOURCE) { if (taskLevel == TASK_LEVEL__SOURCE) {
streamQueueNextItemInSourceQ(pQueue, &qItem, status); streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
} else { } else {
streamQueueNextItem(pQueue, &qItem); streamQueueNextItem(pQueue, &qItem);
} }