From b4956392d9cf4768968757d505ec74fd94059daa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 May 2024 15:54:08 +0800 Subject: [PATCH] fix(stream): return in_progress code if not send retrieve rsp to downstream. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 7 ++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ee3f1a3760..45e1a300d0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -871,7 +871,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_STREAM_TASK_NOT_EXIST; } tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64, @@ -890,6 +890,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, (int32_t)pReq->downstreamTaskId, checkpointId, transId); streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info); + return TSDB_CODE_SUCCESS; } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; streamTaskGetTriggerRecvStatus(pTask, &recv, &total); @@ -902,6 +903,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } + return TSDB_CODE_ACTION_IN_PROGRESS; } } else { // upstream not recv the checkpoint-source/trigger till now ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT); @@ -909,9 +911,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); + return TSDB_CODE_ACTION_IN_PROGRESS; } - - return TSDB_CODE_SUCCESS; } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 426f85fa5e..c0eeebaecb 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -842,16 +842,16 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskScanHistory(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER: return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: + return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: - return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); - case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: - return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); default: