fix(stream): fix error in generating checkpoint.

This commit is contained in:
Haojun Liao 2023-07-07 17:21:46 +08:00
parent 8298f30e56
commit b03ca31a7f
1 changed files with 6 additions and 5 deletions

View File

@ -423,16 +423,17 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK || pInput->type == STREAM_INPUT__CHECKPOINT);
if (pInput->type == STREAM_INPUT__DATA_BLOCK) {
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue;
} else { // for sink task, do nothing.
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
return 0;
}
} else {
ASSERT(pInput->type == STREAM_INPUT__CHECKPOINT);
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
return 0;
}
int64_t st = taosGetTimestampMs();