From 52e090634e30201b6dd4b00ea111c0c2cecae65a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 6 Jun 2024 17:21:23 +0800 Subject: [PATCH] fix(stream): transfer state before send checkpoint-trigger msg to downstream tasks. --- source/libs/executor/src/executor.c | 2 +- source/libs/stream/inc/streamInt.h | 3 + source/libs/stream/src/streamCheckpoint.c | 109 ++++++++++++++++++++- source/libs/stream/src/streamDispatch.c | 4 +- source/libs/stream/src/streamExec.c | 110 +++++++++++----------- 5 files changed, 169 insertions(+), 59 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 69be1c76c7..20bd1056f6 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -162,7 +162,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; - } else if (type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { SPackedData tmp = {.pDataBlock = input}; taosArrayPush(pInfo->pBlockLists, &tmp); pInfo->blockType = STREAM_INPUT__CHECKPOINT; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 154f623b9d..5e68415dc1 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -224,6 +224,9 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 typedef int32_t (*__stream_async_exec_fn_t)(void* param); int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code); +void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id); +int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 648221890a..7a73bb94dd 100644 --- 
a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -278,6 +278,60 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); + + // We need to transfer state here. The transfer of state may generate new data that must be dispatched downstream, + // and that data has to reach the downstream tasks before the checkpoint-trigger does. + // Otherwise the newly generated data may be lost if a crash occurs before the next checkpoint is generated: + // it would still be sitting in outputQ, never having been dispatched to the downstream tasks. + { + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + // 2. 
transfer the ownership of executor state + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } else { + stDebug("s-task:%s no transfer-state needed", id); + } + + int32_t blockSize = 0; + int64_t st = taosGetTimestampMs(); + + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); + + int64_t ver = pTask->chkInfo.processedVer; + doSetStreamInputBlock(pTask, pBlock, &ver, id); + + int64_t totalSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, (SStreamQueueItem*)pBlock, &totalSize, &totalBlocks); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + SIZE_IN_MiB(totalSize), totalBlocks); + + pTask->execInfo.outputDataBlocks += totalBlocks; + pTask->execInfo.outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + pTask->execInfo.outputThroughput = 0; + } else { + pTask->execInfo.outputThroughput = (totalSize / el); + pTask->execInfo.procsThroughput = (blockSize / el); + } + } continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); @@ -306,7 +360,58 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamTaskBuildCheckpoint(pTask); } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, 
forwards to downstream", id, num); - // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + + { + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + // 2. transfer the ownership of executor state + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } else { + stDebug("s-task:%s no transfer-state needed", id); + } + } + + int32_t blockSize = 0; + int64_t st = taosGetTimestampMs(); + + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); + + int64_t ver = pTask->chkInfo.processedVer; + doSetStreamInputBlock(pTask, pBlock, &ver, id); + + int64_t totalSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, (SStreamQueueItem*)pBlock, &totalSize, &totalBlocks); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + SIZE_IN_MiB(totalSize), totalBlocks); + + pTask->execInfo.outputDataBlocks += totalBlocks; + pTask->execInfo.outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + pTask->execInfo.outputThroughput = 0; + } else { + pTask->execInfo.outputThroughput = (totalSize / el); + pTask->execInfo.procsThroughput = (blockSize / el); + } + + // Put the checkpoint-trigger block into 
outputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); } @@ -507,7 +612,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin if (pReq->dropRelHTask) { streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->hTaskId, numOfTasks); } streamMetaWLock(pMeta); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2bd024fac1..e100dac808 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -731,8 +731,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } else { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear " - "checkpoint-ready msg, ref:%d", + "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " + "and quit from timer, ref:%d", id, vgId, ref); streamClearChkptReadyMsg(pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 95634b2ff3..742538dbff 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -87,7 +87,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, +int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, 
int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -436,7 +436,7 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) { } // set input -static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) { +void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) { void* pExecutor = pTask->exec.pExecutor; const SStreamQueueItem* pItem = pInput; @@ -628,63 +628,66 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } - if (type == STREAM_INPUT__CHECKPOINT) { - // transfer the state from fill-history to related stream task before generating the checkpoint. - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - if (dropRelHTask) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); +// if (type == STREAM_INPUT__CHECKPOINT) { +// // transfer the state from fill-history to related stream task before generating the checkpoint. +// bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); +// if (dropRelHTask) { +// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); +// +// STaskId* pHTaskId = &pTask->hTaskInfo.id; +// SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); +// if (pHTask != NULL) { +// // 2. transfer the ownership of executor state +// streamTaskReleaseState(pHTask); +// streamTaskReloadState(pTask); +// stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, +// streamTaskGetStatus(pHTask)->name); +// +// streamMetaReleaseTask(pTask->pMeta, pHTask); +// } else { +// stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, +// (int32_t)pHTaskId->taskId); +// } +// } +// } - STaskId* pHTaskId = &pTask->hTaskInfo.id; - SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); - if (pHTask != NULL) { - // 2. 
transfer the ownership of executor state - streamTaskReleaseState(pHTask); - streamTaskReloadState(pTask); - stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, - streamTaskGetStatus(pHTask)->name); + if (type != STREAM_INPUT__CHECKPOINT) { + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, + streamQueueItemGetTypeStr(type)); - streamMetaReleaseTask(pTask->pMeta, pHTask); - } else { - stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, - (int32_t)pHTaskId->taskId); - } + int64_t ver = pTask->chkInfo.processedVer; + doSetStreamInputBlock(pTask, pInput, &ver, id); + + int64_t totalSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, + el, SIZE_IN_MiB(totalSize), totalBlocks); + + pTask->execInfo.outputDataBlocks += totalBlocks; + pTask->execInfo.outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + pTask->execInfo.outputThroughput = 0; + } else { + pTask->execInfo.outputThroughput = (totalSize / el); + pTask->execInfo.procsThroughput = (blockSize / el); } - } - int64_t st = taosGetTimestampMs(); - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type)); + SCheckpointInfo* pInfo = &pTask->chkInfo; - int64_t ver = pTask->chkInfo.processedVer; - doSetStreamInputBlock(pTask, pInput, &ver, id); + // update the currentVer if processing the submit blocks. 
+ ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); - int64_t totalSize = 0; - int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(totalSize), totalBlocks); - - pTask->execInfo.outputDataBlocks += totalBlocks; - pTask->execInfo.outputDataSize += totalSize; - if (fabs(el - 0.0) <= DBL_EPSILON) { - pTask->execInfo.procsThroughput = 0; - pTask->execInfo.outputThroughput = 0; - } else { - pTask->execInfo.outputThroughput = (totalSize / el); - pTask->execInfo.procsThroughput = (blockSize / el); - } - - SCheckpointInfo* pInfo = &pTask->chkInfo; - - // update the currentVer if processing the submit blocks. - ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); - - if (ver != pInfo->processedVer) { - stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 - " ckpt:%" PRId64, - id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); - pInfo->processedVer = ver; + if (ver != pInfo->processedVer) { + stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 + " ckpt:%" PRId64, + id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + pInfo->processedVer = ver; + } } streamFreeQitem(pInput); @@ -692,7 +695,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { - // todo add lock SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) {