fix(stream): transfer state before send checkpoint-trigger msg to downstream tasks.

This commit is contained in:
Haojun Liao 2024-06-06 17:21:23 +08:00
parent 2da9476e48
commit 52e090634e
5 changed files with 169 additions and 59 deletions

View File

@ -162,7 +162,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) {
} else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SPackedData tmp = {.pDataBlock = input};
taosArrayPush(pInfo->pBlockLists, &tmp);
pInfo->blockType = STREAM_INPUT__CHECKPOINT;

View File

@ -224,6 +224,9 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);
void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id);
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
#ifdef __cplusplus
}
#endif

View File

@ -278,6 +278,60 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
// we need to transfer state here. The transfer of state may generate new data that need to dispatch to downstream,
// to transfer the new data to downstream before checkpoint-trigger reaching the downstream tasks.
// Otherwise, those new generated data may be lost, if crash before next checkpoint data generatd, which the
// the new generated data is kept in outputQ, and failed to dispatch to downstream tasks.
{
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
} else {
stDebug("s-task:%s no transfer-state needed", id);
}
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger");
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pBlock, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, (SStreamQueueItem*)pBlock, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
}
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId);
@ -306,7 +360,58 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamTaskBuildCheckpoint(pTask);
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
{
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
} else {
stDebug("s-task:%s no transfer-state needed", id);
}
}
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger");
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pBlock, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, (SStreamQueueItem*)pBlock, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
// Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
}
@ -507,7 +612,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->hTaskId, numOfTasks);
}
streamMetaWLock(pMeta);

View File

@ -731,8 +731,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear "
"checkpoint-ready msg, ref:%d",
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"and quit from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pTask);

View File

@ -87,7 +87,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code;
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
@ -436,7 +436,7 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) {
}
// set input
static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
void* pExecutor = pTask->exec.pExecutor;
const SStreamQueueItem* pItem = pInput;
@ -628,63 +628,66 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
}
if (type == STREAM_INPUT__CHECKPOINT) {
// transfer the state from fill-history to related stream task before generating the checkpoint.
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
// if (type == STREAM_INPUT__CHECKPOINT) {
// // transfer the state from fill-history to related stream task before generating the checkpoint.
// bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
// if (dropRelHTask) {
// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
//
// STaskId* pHTaskId = &pTask->hTaskInfo.id;
// SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
// if (pHTask != NULL) {
// // 2. transfer the ownership of executor state
// streamTaskReleaseState(pHTask);
// streamTaskReloadState(pTask);
// stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
// streamTaskGetStatus(pHTask)->name);
//
// streamMetaReleaseTask(pTask->pMeta, pHTask);
// } else {
// stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
// (int32_t)pHTaskId->taskId);
// }
// }
// }
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
if (type != STREAM_INPUT__CHECKPOINT) {
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks,
streamQueueItemGetTypeStr(type));
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id,
el, SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
}
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type));
SCheckpointInfo* pInfo = &pTask->chkInfo;
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
SCheckpointInfo* pInfo = &pTask->chkInfo;
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
}
}
streamFreeQitem(pInput);
@ -692,7 +695,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {