From d94f3c5c41c46dccd126c399bf51412192d8bf2c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 30 Aug 2024 10:53:17 +0800 Subject: [PATCH 1/2] fix(stream):fix stream memory leak --- source/dnode/mnode/impl/src/mndStream.c | 2 ++ .../dnode/mnode/impl/test/stream/stream.cpp | 1 + source/libs/stream/src/streamDispatch.c | 3 +++ source/libs/stream/src/streamHb.c | 1 + source/libs/stream/src/streamMsg.c | 26 +++++++++---------- source/libs/stream/src/streamTask.c | 2 ++ source/libs/stream/src/streamUpdate.c | 4 +-- 7 files changed, 24 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bc9f3adbfe..fae7363639 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -523,6 +523,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { int32_t code = tEncodeStreamTask(&encoder, pTask); if (code == -1) { + tEncoderClear(&encoder); return TSDB_CODE_INVALID_MSG; } @@ -1009,6 +1010,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int tEncoderInit(&encoder, abuf, tlen); int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req); if (pos == -1) { + tEncoderClear(&encoder); return TSDB_CODE_INVALID_MSG; } diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 4c63c2ba2c..d508cf7390 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -91,6 +91,7 @@ SRpcMsg buildHbReq() { tEncoderInit(&encoder, (uint8_t*)buf, tlen); if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) { rpcFreeCont(buf); + tEncoderClear(&encoder); goto _end; } tEncoderClear(&encoder); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 59210fe99f..d9e42ad307 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -214,6 +214,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) { rpcFreeCont(buf); + tEncoderClear(&encoder); return code; } tEncoderClear(&encoder); @@ -830,6 +831,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) { rpcFreeCont(buf); + tEncoderClear(&encoder); return code; } tEncoderClear(&encoder); @@ -1131,6 +1133,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) { + tEncoderClear(&encoder); goto FAIL; } tEncoderClear(&encoder); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 1ef938494e..1fd3106cff 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -95,6 +95,7 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p tEncoderInit(&encoder, buf, tlen); if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) { rpcFreeCont(buf); + tEncoderClear(&encoder); stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); return TSDB_CODE_FAILED; } diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 8105614d76..95b86a65ef 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -37,7 +37,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) { } int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; @@ -67,7 +67,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo } int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; @@ -79,7 +79,7 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp } int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; @@ -132,7 +132,7 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* } int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; @@ -160,7 +160,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) } int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; @@ -190,7 +190,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) } int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; @@ -216,7 +216,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea } int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; @@ -302,7 +302,7 @@ void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { } int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; @@ -332,7 +332,7 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; @@ -350,7 +350,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; @@ -508,7 +508,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { } int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; @@ -667,7 +667,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; @@ -695,7 +695,7 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq } int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; + if (tStartEncode(pEncoder) != 0) return -1; if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index afe315bf58..983862b69b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -863,6 +863,7 @@ int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpoin tEncoderInit(&encoder, buf, tlen); if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) { rpcFreeCont(buf); + tEncoderClear(&encoder); stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code)); return -1; } @@ -1016,6 +1017,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { tEncoderInit(&encoder, buf, tlen); if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) { rpcFreeCont(buf); + tEncoderClear(&encoder); stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code)); return -1; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 8e32822fb7..3f7210d3c9 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -455,7 +455,7 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) { + if (tStartEncode(&encoder) != 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } @@ -551,10 +551,10 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, tEndEncode(&encoder); int32_t tlen = encoder.pos; - tEncoderClear(&encoder); *pLen = tlen; _end: + tEncoderClear(&encoder); if (code != TSDB_CODE_SUCCESS) { uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } From dd1ab39eb0c42c5ef4f344d9324129fc50a2199e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 30 Aug 2024 13:35:01 +0800 Subject: [PATCH 2/2] fix(stream):fix stream memory leak --- source/libs/stream/src/streamMsg.c | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 95b86a65ef..8105614d76 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -37,7 +37,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) { } int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; @@ -67,7 +67,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo } int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; @@ -79,7 +79,7 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp } int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; @@ -132,7 +132,7 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* } int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; @@ -160,7 +160,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) } int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; @@ -190,7 +190,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) } int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; @@ -216,7 +216,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea } int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; @@ -302,7 +302,7 @@ void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { } int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; @@ -332,7 +332,7 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; @@ -350,7 +350,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; @@ -508,7 +508,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { } int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; @@ -667,7 +667,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; @@ -695,7 +695,7 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq } int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { - if (tStartEncode(pEncoder) != 0) return -1; + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;