fix(stream): fix the status error.

This commit is contained in:
Haojun Liao 2023-08-26 00:49:23 +08:00
parent d8a6ec024a
commit b7c36bbfe4
2 changed files with 6 additions and 5 deletions

View File

@ -303,7 +303,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = 0; int32_t code = 0;
if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { // if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
// check for all tasks, and do generate the vnode-wide checkpoint data. // check for all tasks, and do generate the vnode-wide checkpoint data.
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
@ -333,7 +333,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s",
pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); pTask->id.idStr, pTask->checkpointingId, tstrerror(code));
} }
}
return code; return code;
} }

View File

@ -618,12 +618,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamFreeQitem(pInput); streamFreeQitem(pInput);
// todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) { if (type == STREAM_INPUT__CHECKPOINT) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); // ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY; // pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskBuildCheckpoint(pTask);
return 0; return 0;
} }
} }
@ -652,7 +654,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
return -1; return -1;
} }
streamTaskBuildCheckpoint(pTask); // streamTaskBuildCheckpoint(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),