From b03ca31a7fbea6a4e7228ef4b702aa27acde4a17 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jul 2023 17:21:46 +0800 Subject: [PATCH] fix(stream): fix error in generating checkpoint. --- source/libs/stream/src/streamExec.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 621a4b57c7..1af442182e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -423,16 +423,17 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK || pInput->type == STREAM_INPUT__CHECKPOINT); + if (pInput->type == STREAM_INPUT__DATA_BLOCK) { qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; + } else { // for sink task, do nothing. + ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); + pTask->status.taskStatus = TASK_STATUS__CK_READY; + return 0; } - } else { - ASSERT(pInput->type == STREAM_INPUT__CHECKPOINT); - ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; - return 0; } int64_t st = taosGetTimestampMs();