From be0a2e457366a21d99ffa84147e66e750e9e5cce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Oct 2023 16:33:28 +0800 Subject: [PATCH] fix(stream): disable retrieving results during checkpoint procedure --- source/libs/stream/src/stream.c | 41 ++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 97316dba07..b7d57b1728 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -67,27 +67,32 @@ static void streamSchedByTimer(void* param, void* tmrId) { return; } - if (status == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); - if (pTrigger == NULL) { - return; - } + if (pTask->status.taskStatus == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", pTask->id.idStr, (int32_t) pTask->info.triggerParam); + } else { + if (status == TASK_TRIGGER_STATUS__ACTIVE) { + SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); + if (pTrigger == NULL) { + return; + } - pTrigger->type = STREAM_INPUT__GET_RES; - pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pTrigger->pBlock == NULL) { - taosFreeQitem(pTrigger); - return; - } + pTrigger->type = STREAM_INPUT__GET_RES; + pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pTrigger->pBlock == NULL) { + taosFreeQitem(pTrigger); + return; + } - atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); - pTrigger->pBlock->info.type = STREAM_GET_ALL; - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { - taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); - return; - } + atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); + pTrigger->pBlock->info.type = STREAM_GET_ALL; + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { + taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, + &pTask->schedInfo.pTimer); + return; + } - streamSchedExec(pTask); + streamSchedExec(pTask); + } } taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);