From 17bd6badec1063bd78fddd7b82c603fec4e93ff6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Nov 2023 10:47:54 +0800 Subject: [PATCH] refactor(stream): do checkpoint for each task. --- source/dnode/vnode/src/tq/tq.c | 7 +- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckpoint.c | 120 +++++++++++----------- 3 files changed, 64 insertions(+), 65 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1f0f6a3e92..4d5ddec233 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1702,13 +1702,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } - // todo handle failure to reset from checkpoint procedure - // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. + // Downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id pTask->checkpointingId = req.checkpointId; - qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 + tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); streamMetaReleaseTask(pMeta, pTask); @@ -1735,6 +1734,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } + + // todo: already in checkpoint status, return error streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index feaacb7969..095461bd92 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -112,7 +112,7 @@ int32_t 
streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId); +int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId); int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3a52e68e93..bf2c89bea4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -146,7 +146,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->execInfo.checkpoint += 1; // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task - // already. int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); return code; } @@ -169,9 +168,8 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); int64_t checkpointId = pDataBlock->info.version; - - const char* id = pTask->id.idStr; - int32_t code = TSDB_CODE_SUCCESS; + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; // set task status if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { @@ -197,15 +195,15 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // todo fix race condition: set the status and append checkpoint block int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || - pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + int8_t 
type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); - } else { // only one task exists, no need to dispatch downstream info - atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); - streamProcessCheckpointReadyMsg(pTask); - streamFreeQitem((SStreamQueueItem*)pBlock); - } + } else { // only one task exists, no need to dispatch downstream info + atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); + streamProcessCheckpointReadyMsg(pTask); + streamFreeQitem((SStreamQueueItem*)pBlock); + } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); if (pTask->chkInfo.startTs == 0) { @@ -231,11 +229,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc id, num); streamFreeQitem((SStreamQueueItem*)pBlock); streamTaskBuildCheckpoint(pTask); - } else { - stDebug( - "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg " - "downstream", - id, num); + } else { // source & agg tasks need to forward the checkpoint msg downwards + stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id, + num); // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure @@ -282,48 +278,47 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks } -int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { +int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId) { int32_t vgId = pMeta->vgId; int32_t code = 0; streamMetaWLock(pMeta); - for (int32_t i = 
0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); - if (ppTask == NULL) { - continue; - } + // for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { + // STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + // SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + // if (ppTask == NULL) { + // continue; + // } - SStreamTask* p = *ppTask; - if (p->info.fillHistory == 1) { - continue; - } - - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); - - p->chkInfo.checkpointId = p->checkpointingId; - streamTaskClearCheckInfo(p); - - char* str = NULL; - streamTaskGetStatus(p, &str); - - code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - if (code != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); - streamMetaWUnLock(pMeta); - return -1; - } else { // save the task - streamMetaSaveTask(pMeta, p); - } - - stDebug( - "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " - "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, - str); + // SStreamTask* p = *ppTask; + if (p->info.fillHistory == 1) { + // continue; } + ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + + p->chkInfo.checkpointId = p->checkpointingId; + streamTaskClearCheckInfo(p); + + char* str = NULL; + streamTaskGetStatus(p, &str); + + code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + if (code != TSDB_CODE_SUCCESS) { + stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); + streamMetaWUnLock(pMeta); + 
return -1; + } else { // save the task + streamMetaSaveTask(pMeta, p); + } + + stDebug( + "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " + "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", + pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, + str); + code = streamMetaCommit(pMeta); if (code < 0) { stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, @@ -341,24 +336,24 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // check for all tasks, and do generate the vnode-wide checkpoint data. SStreamMeta* pMeta = pTask->pMeta; - int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); - ASSERT(remain >= 0); +// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); +// ASSERT(remain >= 0); double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; - if (remain == 0) { // all tasks are ready +// if (remain == 0) { // all tasks are ready stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); - streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); + streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId); + streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId); stInfo( "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec " "checkpointId:%" PRId64, pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId); - } else { - stInfo( - "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec " - "not ready:%d/%d", - pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks); - } +// } else { +// stInfo( +// "vgId:%d vnode wide tasks not 
reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec " +// "not ready:%d/%d", +// pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks); +// } // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -368,6 +363,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } if (code != TSDB_CODE_SUCCESS) { + // record the failed checkpoint id + pTask->chkInfo.failedId = pTask->checkpointingId; + // todo: retry sending the rsp to upstream/mnode stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, pTask->checkpointingId, tstrerror(code));