From 08a4fb06ae03be5fdc8a1ef6009bc23cc02eca0f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Jul 2023 09:17:20 +0800 Subject: [PATCH] fix(stream): dispatch checkpoint msg to downstream by puting message into input queue. --- include/common/tcommon.h | 1 + source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/libs/stream/inc/streamInt.h | 3 +- source/libs/stream/src/stream.c | 4 +- source/libs/stream/src/streamCheckpoint.c | 85 ++++++++--------------- source/libs/stream/src/streamDispatch.c | 55 +++++++++++---- source/libs/stream/src/streamExec.c | 48 +++++++++---- source/libs/stream/src/streamQueue.c | 3 +- 8 files changed, 114 insertions(+), 87 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 5b14fa4e49..ffd7e86e69 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -152,6 +152,7 @@ enum { STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__CHECKPOINT_TRIGGER, STREAM_INPUT__REF_DATA_BLOCK, STREAM_INPUT__DESTROY, }; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 2dd6524feb..378217863a 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -234,12 +234,12 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - int32_t status = pTask->status.taskStatus; if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } + int32_t status = pTask->status.taskStatus; if (status != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d2ae324dd2..e7f13a4338 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -50,7 +50,8 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); -int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId); + int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 30c81d4586..3348914159 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -239,7 +239,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - // if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked + // todo: if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); @@ -320,7 +320,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { destroyStreamDataBlock((SStreamDataBlock*)pItem); return code; } - } else if (type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5d52042127..50564a9070 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -123,52 +123,13 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); } -static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { - SStreamCheckpointReq req = { - .streamId = pTask->id.streamId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->info.nodeId, - .downstreamNodeId = pTask->info.nodeId, - .downstreamTaskId = pTask->id.taskId, - .childId = pTask->info.selfChildId, - .checkpointId = checkpointId, - }; - - // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; - req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; - streamDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->notReadyTasks = numOfVgs; - pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); - - qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs); - - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } else { // no need to dispatch msg to downstream task - qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr); - streamProcessCheckpointRsp(NULL, pTask); - } - - return 0; -} - -static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask) { +static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pChkpoint->type = STREAM_INPUT__CHECKPOINT; + pChkpoint->type = checkpointType; pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pChkpoint->pBlock == NULL) { taosFreeQitem(pChkpoint); @@ -187,8 +148,6 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask) { int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { int32_t code = 0; - int64_t checkpointId = pReq->checkpointId; - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. @@ -197,7 +156,9 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, pTask->checkpointingId = pReq->checkpointId; // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. - streamTaskDispatchCheckpointMsg(pTask, checkpointId); + // 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already. + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + streamSchedExec(pTask); return code; } @@ -208,10 +169,14 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe // set the task status pTask->checkpointingId = checkpointId; pTask->status.taskStatus = TASK_STATUS__CK; + + //todo fix race condition: set the status and append checkpoint block + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - appendCheckpointIntoInputQ(pTask); + // todo: sink node needs alignment?? + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT); streamSchedExec(pTask); qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); } else { @@ -221,23 +186,27 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe // there are still some upstream tasks not send checkpoint request, do nothing and wait for then int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); if (notReady > 0) { - int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d", pTask->id.idStr, notReady, num); return 0; } - qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream", - pTask->id.idStr); + qDebug( + "s-task:%s receive one checkpoint req, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to " + "downstream", + pTask->id.idStr, num); - // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this node + // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY - // dispatch check point msg to all downstream tasks - streamTaskDispatchCheckpointMsg(pTask, checkpointId); + // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + // already. And then, dispatch check point msg to all downstream tasks + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + streamSchedExec(pTask); } return 0; @@ -255,7 +224,7 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) { if (notReady == 0) { qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task", pTask->id.idStr); - appendCheckpointIntoInputQ(pTask); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT); streamSchedExec(pTask); } else { int32_t total = streamTaskGetNumOfDownstream(pTask); @@ -269,16 +238,20 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { taosWLockLatch(&pMeta->lock); for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); + uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId); p->chkInfo.keptCheckpointId = p->checkpointingId; + int8_t prev = p->status.taskStatus; + p->status.taskStatus = TASK_STATUS__NORMAL; + streamMetaSaveTask(pMeta, p); - qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 - ", ver:%" PRId64 " currentVer:%" PRId64, - pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer); + qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 + " currentVer:%" PRId64 ", status to be normal, prev:%s", + pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer, + streamGetTaskStatusStr(prev)); } if (streamMetaCommit(pMeta) < 0) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4a49806035..cfe960e99c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -415,8 +415,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, - SEpSet* pEpSet) { +static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, + SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -451,6 +451,45 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointR return 0; } +int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { + SStreamCheckpointReq req = { + .streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->info.nodeId, + .downstreamNodeId = pTask->info.nodeId, + .downstreamTaskId = pTask->id.taskId, + .childId = pTask->info.selfChildId, + .checkpointId = checkpointId, + }; + + // serialize + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + doDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->notReadyTasks = numOfVgs; + pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); + + qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs); + + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + doDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } + } else { // no need to dispatch msg to downstream task + qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr); + streamProcessCheckpointRsp(NULL, pTask); + } + + return 0; +} + int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId}; @@ -488,11 +527,7 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { } taosArrayClear(pTask->pRpcMsgList); - - int8_t prev = pTask->status.taskStatus; - pTask->status.taskStatus = TASK_STATUS__NORMAL; - qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr, - pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev)); + qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr); return TSDB_CODE_SUCCESS; } @@ -505,11 +540,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { tmsgSendRsp(pMsg); taosArrayClear(pTask->pRpcMsgList); - - int8_t prev = pTask->status.taskStatus; - pTask->status.taskStatus = TASK_STATUS__NORMAL; - qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr, - pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev)); + qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5b1a53d5f0..36bc4b1968 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -293,6 +293,13 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif +static int32_t getNumOfItemsInputQ(const SStreamTask* pTask) { + int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue); + int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall); + + return numOfItems1 + numOfItems2; +} + static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { // wait for the stream task to be idle int64_t st = taosGetTimestampMs(); @@ -313,11 +320,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", - pTask->id.idStr, pTask->streamTaskId.taskId); + qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", pTask->id.idStr, + pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, + pStreamTask->id.idStr); } ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); @@ -395,8 +403,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, - numOfBlocks, pMerged->ver); + qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks, + pMerged->ver); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); ASSERT((*pVer) < pMerged->ver); (*pVer) = pMerged->ver; @@ -433,7 +441,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this + if (code != TSDB_CODE_SUCCESS) { // todo handle this return 0; } } @@ -442,20 +450,33 @@ int32_t streamExecForAll(SStreamTask* pTask) { return 0; } - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK || pInput->type == STREAM_INPUT__CHECKPOINT); + int32_t type = pInput->type; + if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + ASSERT(getNumOfItemsInputQ(pTask) == 1); + } - if (pInput->type == STREAM_INPUT__DATA_BLOCK) { + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); + + if (type == STREAM_INPUT__DATA_BLOCK) { qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; - } else { // for sink task, do nothing. + } else { // pInput->type == STREAM_INPUT__CHECKPOINT, for sink task, do nothing. ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); pTask->status.taskStatus = TASK_STATUS__CK_READY; return 0; } } + // dispatch checkpoint msg to all downstream tasks + if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + qDebug("s-task:%s start to dispatch checkpoint msg to downstream", id); + + streamTaskDispatchCheckpointMsg(pTask, pTask->checkpointingId); + return 0; + } + int64_t st = taosGetTimestampMs(); const SStreamQueueItem* pItem = pInput; @@ -475,12 +496,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { // update the currentVer if processing the submit blocks. ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version); - if(ver != pTask->chkInfo.version) { + if (ver != pTask->chkInfo.version) { qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, pTask->chkInfo.version); } - int32_t type = pInput->type; streamFreeQitem(pInput); // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. @@ -550,8 +570,8 @@ int32_t streamTryExec(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode - qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%"PRId64", code:%s", - pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); + qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", + pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); } } else { if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 18d2d6b7a5..49b7e81c25 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -190,7 +190,8 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i } // do not merge blocks for sink node and check point data block - if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || (qItem->type == STREAM_INPUT__CHECKPOINT)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || + (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) { *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS;