From 385e1a8b0d403cfbaf5b55e94fdbdd7669b33d3f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 6 Jun 2024 23:55:54 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 82 +++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 742538dbff..a95170949b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,6 +541,65 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } +static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { + const char* id = pTask->id.idStr; + + int32_t blockSize = 0; + int64_t st = taosGetTimestampMs(); + + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); + + int64_t ver = pTask->chkInfo.processedVer; + doSetStreamInputBlock(pTask, pCheckpointBlock, &ver, id); + + int64_t totalSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, pCheckpointBlock, &totalSize, &totalBlocks); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + SIZE_IN_MiB(totalSize), totalBlocks); + + pTask->execInfo.outputDataBlocks += totalBlocks; + pTask->execInfo.outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + pTask->execInfo.outputThroughput = 0; + } else { + pTask->execInfo.outputThroughput = (totalSize / el); + pTask->execInfo.procsThroughput = (blockSize / el); + } +} + +void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { + const char* id = pTask->id.idStr; + + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + // 2. transfer the ownership of executor state + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } else { + stDebug("s-task:%s no transfer-state needed", id); + } + + // flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. + doStreamTaskExecImpl(pTask, pCheckpointBlock); +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -628,29 +687,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } -// if (type == STREAM_INPUT__CHECKPOINT) { -// // transfer the state from fill-history to related stream task before generating the checkpoint. -// bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); -// if (dropRelHTask) { -// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); -// -// STaskId* pHTaskId = &pTask->hTaskInfo.id; -// SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); -// if (pHTask != NULL) { -// // 2. transfer the ownership of executor state -// streamTaskReleaseState(pHTask); -// streamTaskReloadState(pTask); -// stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, -// streamTaskGetStatus(pHTask)->name); -// -// streamMetaReleaseTask(pTask->pMeta, pHTask); -// } else { -// stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, -// (int32_t)pHTaskId->taskId); -// } -// } -// } - if (type != STREAM_INPUT__CHECKPOINT) { int64_t st = taosGetTimestampMs(); stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks,