diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 621a4b57c7..1af442182e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -423,16 +423,17 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK || pInput->type == STREAM_INPUT__CHECKPOINT); + if (pInput->type == STREAM_INPUT__DATA_BLOCK) { qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; + } else { // for sink task, do nothing. + ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); + pTask->status.taskStatus = TASK_STATUS__CK_READY; + return 0; } - } else { - ASSERT(pInput->type == STREAM_INPUT__CHECKPOINT); - ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; - return 0; } int64_t st = taosGetTimestampMs();