From 92106312d25804970d968036760f39ba52c15ada Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Jul 2023 09:43:38 +0800 Subject: [PATCH] fix(stream): not dispatch checkpoint block result to downstream. --- source/dnode/vnode/src/tq/tq.c | 3 ++- source/libs/stream/src/streamExec.c | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 19825b438b..89beaadf72 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1673,7 +1673,8 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { return code; } - tqDebug("vgId:%d s-task:%s received the checkpoint ready msg, handle it", vgId, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); streamProcessCheckpointReadyMsg(pTask); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index dbec866705..cad0d58925 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -113,6 +113,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i // TODO } continue; + } else if (output->info.type == STREAM_CHECKPOINT) { + continue; // checkpoint block not dispatch to downstream tasks } SSDataBlock block = {0};