From 40d59990d8ba49e60840453190cefe2b018664d2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jul 2023 04:01:20 +0000 Subject: [PATCH] fix mem leak --- include/libs/stream/tstream.h | 8 ++++--- source/libs/stream/src/streamMeta.c | 14 ++++------- source/libs/stream/src/streamSnapshot.c | 1 + source/libs/stream/src/streamTask.c | 31 ++++++++++++++++++++++++- 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 278c19bdf7..b33798821b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -381,8 +381,11 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); -int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); -bool tInputQueueIsFull(const SStreamTask* pTask); + +int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); + +int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); +bool tInputQueueIsFull(const SStreamTask* pTask); typedef struct { SMsgHead head; @@ -615,7 +618,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); - int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fa53819f45..4711f4af19 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -438,18 +438,14 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - goto _err; - } - + SCheckpointInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeStreamTask(&decoder, pTask); + if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { + continue; + } tDecoderClear(&decoder); - chkpId = TMAX(chkpId, pTask->chkInfo.checkpointId); - - taosMemoryFree(pTask); // fix mem leak later + chkpId = TMAX(chkpId, info.checkpointId); } _err: diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index b1b05ff0ca..1a66c00389 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -219,6 +219,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path } // const char* path = NULL; if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { + taosMemoryFree(pReader); return -1; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dec2768975..7f2c7a8a44 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -132,6 +132,35 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { return pEncoder->pos; } +int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { + int64_t skip64; + int8_t skip8; + int32_t skip32; + int16_t skip16; + SEpSet epSet; + + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &skip64) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI16(pDecoder, &skip16) < 0) return -1; + + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; + + if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; @@ -236,7 +265,7 @@ void tFreeStreamTask(SStreamTask* pTask) { tSimpleHashCleanup(pTask->tbSink.pTblInfo); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); - pTask->checkReqIds =taosArrayDestroy(pTask->checkReqIds); + pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds); } if (pTask->pState) {