From b7c36bbfe4744b7da339f37f5c1d8d734898d718 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Aug 2023 00:49:23 +0800 Subject: [PATCH] fix(stream): fix the status error. --- source/libs/stream/src/streamCheckpoint.c | 3 +-- source/libs/stream/src/streamExec.c | 8 +++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 37402a3c78..69f35d8c07 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -303,7 +303,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = 0; - if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { +// if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { // check for all tasks, and do generate the vnode-wide checkpoint data. SStreamMeta* pMeta = pTask->pMeta; int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); @@ -333,7 +333,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); } - } return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 048bebcc5a..1f2e7ea07d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -618,12 +618,14 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamFreeQitem(pInput); + // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { - ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; +// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); +// pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); + streamTaskBuildCheckpoint(pTask); return 0; } } @@ -652,7 +654,7 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } - streamTaskBuildCheckpoint(pTask); +// streamTaskBuildCheckpoint(pTask); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),