From b88a786fdfac5f4aa448b90e09e4025116d3ff6e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Nov 2024 16:41:15 +0800 Subject: [PATCH] fix(stream): set correct error code and open inputq for upstream . --- source/dnode/mnode/impl/src/mndStreamHb.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 0f903632ed..46445af856 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -498,7 +498,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } - mInfo("stream:0x%" PRIx64 "checkpointId:%" PRId64 + mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed issue task-reset trans to reset all tasks status", pInfo->streamUid, pInfo->checkpointId, pInfo->transId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cc92df368c..a8a934da98 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -255,8 +255,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check "the interrupted checkpoint", id, vgId, pBlock->srcTaskId); - streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { @@ -264,14 +263,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 " discard", id, vgId, pActiveInfo->activeId, checkpointId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { stDebug( "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -283,17 +282,17 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check } if (p->upstreamTaskId == pBlock->srcTaskId) { - stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 + stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } } } } } - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { @@ -317,6 +316,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId); streamMutexUnlock(&pTask->lock); if (code) { + if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks + streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); + } streamFreeQitem((SStreamQueueItem*)pBlock); return code; }