From 035b19949719afec6b7c756defa1496b18b5191c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Jul 2023 18:46:38 +0800 Subject: [PATCH] refactor(stream): do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 65 ++++++++++------------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0b39e6f834..e59a9e1533 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -136,15 +136,31 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = 1; - // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. - // 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already. + // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into + // inputQ, to make sure all blocks with less version have been handled by this task already. return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); } +static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { + pBlock->srcTaskId = pTask->id.taskId; + pBlock->srcVgId = pTask->pMeta->vgId; + + int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + if (code == 0) { + streamDispatchStreamBlock(pTask); + } + + streamFreeQitem((SStreamQueueItem*)pBlock); + return code; +} + int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); int64_t checkpointId = pDataBlock->info.version; + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; + // set the task status pTask->checkpointingId = checkpointId; pTask->status.taskStatus = TASK_STATUS__CK; @@ -153,20 +169,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - pBlock->srcTaskId = pTask->id.taskId; - pBlock->srcVgId = pTask->pMeta->vgId; - - qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", pTask->id.idStr, - pTask->info.selfChildId); - - int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); - if (code != 0) { // todo failed to add it into the output queue, free it. - return code; - } - - streamFreeQitem((SStreamQueueItem*)pBlock); - streamDispatchStreamBlock(pTask); - } else { // only one task exists + qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId); + continueDispatchCheckpointBlock(pBlock, pTask); + } else { // only one task exists, no need to dispatch downstream info streamProcessCheckpointReadyMsg(pTask); } } else if (taskLevel == TASK_LEVEL__SINK) { @@ -176,7 +181,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // update the child Id for downstream tasks streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); - qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", pTask->id.idStr); + qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id); } else { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); @@ -188,14 +193,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); if (notReady > 0) { qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", - pTask->id.idStr, pTask->info.selfChildId, notReady, num); - return 0; + id, pTask->info.selfChildId, notReady, num); + return code; } qDebug( - "s-task:%s receive one checkpoint block, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to " - "downstream", - pTask->id.idStr, num); + "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg downstream", + id, num); // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure @@ -204,23 +208,10 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks - - { - pBlock->srcTaskId = pTask->id.taskId; - pBlock->srcVgId = pTask->pMeta->vgId; - - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); - if (code != 0) { // todo failed to add it into the output queue, free it. - return code; - } - - streamFreeQitem((SStreamQueueItem*)pBlock); - streamDispatchStreamBlock(pTask); - } + code = continueDispatchCheckpointBlock(pBlock, pTask); } - return 0; + return code; } /**