From c4447d68738d4b8094cdd7841a885c50b930a3b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jun 2024 12:44:38 +0800 Subject: [PATCH] fix(stream): flush executor state for those with only one task for a stream. --- source/libs/stream/inc/streamInt.h | 3 +- source/libs/stream/src/streamCheckpoint.c | 114 ++-------------------- source/libs/stream/src/streamExec.c | 77 +++++---------- 3 files changed, 35 insertions(+), 159 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 5e68415dc1..f3ec01cf7a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -224,8 +224,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 typedef int32_t (*__stream_async_exec_fn_t)(void* param); int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code); -void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id); -int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); +void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ad14a9b9bf..ab3b5d6fa0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -276,62 +276,14 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int8_t type = pTask->outputInfo.type; pActiveInfo->allUpstreamTriggerRecv = 1; + // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks. + // The transfer of state may generate new data that need to dispatch to downstream tasks, + // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed + // before the next checkpoint. + flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); - - // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks. - // The transfer of state may generate new data that need to dispatch to downstream tasks, - // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed - // before the next checkpoint. - { - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - if (dropRelHTask) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - - STaskId* pHTaskId = &pTask->hTaskInfo.id; - SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); - if (pHTask != NULL) { - // 2. transfer the ownership of executor state - streamTaskReleaseState(pHTask); - streamTaskReloadState(pTask); - stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, - streamTaskGetStatus(pHTask)->name); - - streamMetaReleaseTask(pTask->pMeta, pHTask); - } else { - stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, - (int32_t)pHTaskId->taskId); - } - } else { - stDebug("s-task:%s no transfer-state needed", id); - } - - int32_t blockSize = 0; - int64_t st = taosGetTimestampMs(); - - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); - - int64_t ver = pTask->chkInfo.processedVer; - doSetStreamInputBlock(pTask, pBlock, &ver, id); - - int64_t totalSize = 0; - int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, (SStreamQueueItem*)pBlock, &totalSize, &totalBlocks); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(totalSize), totalBlocks); - - pTask->execInfo.outputDataBlocks += totalBlocks; - pTask->execInfo.outputDataSize += totalSize; - if (fabs(el - 0.0) <= DBL_EPSILON) { - pTask->execInfo.procsThroughput = 0; - pTask->execInfo.outputThroughput = 0; - } else { - pTask->execInfo.outputThroughput = (totalSize / el); - pTask->execInfo.procsThroughput = (blockSize / el); - } - } continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); @@ -361,58 +313,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - { - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - if (dropRelHTask) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); - STaskId* pHTaskId = &pTask->hTaskInfo.id; - SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); - if (pHTask != NULL) { - // 2. transfer the ownership of executor state - streamTaskReleaseState(pHTask); - streamTaskReloadState(pTask); - stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, - streamTaskGetStatus(pHTask)->name); - - streamMetaReleaseTask(pTask->pMeta, pHTask); - } else { - stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, - (int32_t)pHTaskId->taskId); - } - } else { - stDebug("s-task:%s no transfer-state needed", id); - } - } - - int32_t blockSize = 0; - int64_t st = taosGetTimestampMs(); - - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); - - int64_t ver = pTask->chkInfo.processedVer; - doSetStreamInputBlock(pTask, pBlock, &ver, id); - - int64_t totalSize = 0; - int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, (SStreamQueueItem*)pBlock, &totalSize, &totalBlocks); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(totalSize), totalBlocks); - - pTask->execInfo.outputDataBlocks += totalBlocks; - pTask->execInfo.outputDataSize += totalSize; - if (fabs(el - 0.0) <= DBL_EPSILON) { - pTask->execInfo.procsThroughput = 0; - pTask->execInfo.outputThroughput = 0; - } else { - pTask->execInfo.outputThroughput = (totalSize / el); - pTask->execInfo.procsThroughput = (blockSize / el); - } - - // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by this task - // already. And then, dispatch check point msg to all downstream tasks + // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by + // this task already. And then, dispatch check point msg to all downstream tasks code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a95170949b..1828409f89 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,6 +24,7 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState* pState = streamTaskGetStatus(pTask); @@ -87,8 +88,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } -int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, - int32_t* totalBlocks) { +int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -436,7 +436,7 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) { } // set input -void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) { +static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) { void* pExecutor = pTask->exec.pExecutor; const SStreamQueueItem* pItem = pInput; @@ -541,20 +541,20 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } -static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { - const char* id = pTask->id.idStr; - - int32_t blockSize = 0; - int64_t st = taosGetTimestampMs(); +static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) { + const char* id = pTask->id.idStr; + int32_t blockSize = 0; + int64_t st = taosGetTimestampMs(); + SCheckpointInfo* pInfo = &pTask->chkInfo; + int64_t ver = pInfo->processedVer; stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); - int64_t ver = pTask->chkInfo.processedVer; - doSetStreamInputBlock(pTask, pCheckpointBlock, &ver, id); + doSetStreamInputBlock(pTask, pBlock, &ver, id); int64_t totalSize = 0; int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, pCheckpointBlock, &totalSize, &totalBlocks); + streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); double el = (taosGetTimestampMs() - st) / 1000.0; stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, @@ -569,11 +569,22 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pCheckpoi pTask->execInfo.outputThroughput = (totalSize / el); pTask->execInfo.procsThroughput = (blockSize / el); } + + // update the currentVer if processing the submit blocks. + ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); + + if (ver != pInfo->processedVer) { + stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 + " ckpt:%" PRId64, + id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + pInfo->processedVer = ver; + } } void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { const char* id = pTask->id.idStr; + // 1. transfer the ownership of executor state bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); if (dropRelHTask) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); @@ -581,7 +592,6 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB STaskId* pHTaskId = &pTask->hTaskInfo.id; SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); if (pHTask != NULL) { - // 2. transfer the ownership of executor state streamTaskReleaseState(pHTask); streamTaskReloadState(pTask); stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, @@ -596,7 +606,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB stDebug("s-task:%s no transfer-state needed", id); } - // flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. + // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. doStreamTaskExecImpl(pTask, pCheckpointBlock); } @@ -605,7 +615,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB * appropriate batch of blocks should be handled in 5 to 10 sec. */ static int32_t doStreamExecTask(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. stDebug("s-task:%s start to extract data block from inputQ", id); @@ -688,42 +698,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (type != STREAM_INPUT__CHECKPOINT) { - int64_t st = taosGetTimestampMs(); - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, - streamQueueItemGetTypeStr(type)); - - int64_t ver = pTask->chkInfo.processedVer; - doSetStreamInputBlock(pTask, pInput, &ver, id); - - int64_t totalSize = 0; - int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, - el, SIZE_IN_MiB(totalSize), totalBlocks); - - pTask->execInfo.outputDataBlocks += totalBlocks; - pTask->execInfo.outputDataSize += totalSize; - if (fabs(el - 0.0) <= DBL_EPSILON) { - pTask->execInfo.procsThroughput = 0; - pTask->execInfo.outputThroughput = 0; - } else { - pTask->execInfo.outputThroughput = (totalSize / el); - pTask->execInfo.procsThroughput = (blockSize / el); - } - - SCheckpointInfo* pInfo = &pTask->chkInfo; - - // update the currentVer if processing the submit blocks. - ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); - - if (ver != pInfo->processedVer) { - stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 - " ckpt:%" PRId64, - id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); - pInfo->processedVer = ver; - } + doStreamTaskExecImpl(pTask, pInput); } streamFreeQitem(pInput); @@ -755,8 +730,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } } - - return 0; } // the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not