From 16ee72ab062241ac9a8224d2b9acf35fa309ca52 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 10 Jul 2023 13:57:51 +0800 Subject: [PATCH] fix(stream): free msg after send checkpoint rsp. --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/libs/stream/src/streamDispatch.c | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index b2654b7c39..70ce6f5c28 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -752,6 +752,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 32e9c965cb..83b20ef80e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -487,6 +487,8 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) { tmsgSendRsp(pMsg); } + taosArrayClear(pTask->pRpcMsgList); + int8_t prev = pTask->status.taskStatus; pTask->status.taskStatus = TASK_STATUS__NORMAL; qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr, @@ -502,6 +504,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { tmsgSendRsp(pMsg); + taosArrayClear(pTask->pRpcMsgList); + int8_t prev = pTask->status.taskStatus; pTask->status.taskStatus = TASK_STATUS__NORMAL; qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr,