From 61f0aec698e5b0f62a6f573a69ea209a944abf1a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 14 Aug 2024 10:05:03 +0800 Subject: [PATCH 1/2] fix: response message memory leak --- source/client/src/clientMsgHandler.c | 11 ++++++++++- source/client/src/clientTmq.c | 28 ++++++++++++++++++++++------ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index cc1ed7f3fa..4dea9c17b0 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -297,6 +297,9 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { } if (strlen(usedbRsp.db) == 0) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + if (usedbRsp.errCode != 0) { return usedbRsp.errCode; } else { @@ -366,9 +369,15 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { } int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg == NULL || param == NULL) { + if (pMsg == NULL) { return TSDB_CODE_TSC_INVALID_INPUT; } + if (param == NULL) { + taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pMsg->pData); + return TSDB_CODE_TSC_INVALID_INPUT; + } + SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8f35a2fad1..73d21c0e22 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -823,15 +823,17 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != 0){ - return code; + goto _return; } if (pMsg == NULL || param == NULL) { - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto _return; } + SMqHbRsp rsp = {0}; code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); if (code != 0) { - return code; + goto _return; } int64_t refId = (int64_t)param; @@ -854,10 +856,14 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { taosWUnLockLatch(&tmq->lock); (void)taosReleaseRef(tmqMgmt.rsetId, refId); } + tDestroySMqHbRsp(&rsp); + +_return: + taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - return 0; + return code; } void tmqSendHbReq(void* param, void* tmrId) { @@ -1504,7 +1510,12 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) { int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; SMqPollCbParam* pParam = (SMqPollCbParam*)param; - if (pParam == NULL || pMsg == NULL) { + if (pMsg == NULL) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (pParam == NULL) { + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); return TSDB_CODE_TSC_INTERNAL_ERROR; } int64_t refId = pParam->refId; @@ -1512,6 +1523,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { uint64_t requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); return TSDB_CODE_TMQ_CONSUMER_CLOSED; } @@ -2809,7 +2822,10 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { - if (param == NULL) return code; + if (param == NULL) { + goto FAIL; + } + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { From 19dcc5bddd3c55cd2d9765dbdfeacd85c41a4ec1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 14 Aug 2024 14:50:57 +0800 Subject: [PATCH 2/2] fix: error log issue --- source/client/src/clientTmq.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 73d21c0e22..929debf16d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1509,6 +1509,10 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) { int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; + SMqPollRspWrapper* pRspWrapper = NULL; + int8_t rspType = 0; + int32_t vgId = 0; + uint64_t requestId = 0; SMqPollCbParam* pParam = (SMqPollCbParam*)param; if (pMsg == NULL) { return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1519,8 +1523,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { return TSDB_CODE_TSC_INTERNAL_ERROR; } int64_t refId = pParam->refId; - int32_t vgId = pParam->vgId; - uint64_t requestId = pParam->requestId; + vgId = pParam->vgId; + requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { taosMemoryFreeClear(pMsg->pData); @@ -1528,7 +1532,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { return TSDB_CODE_TMQ_CONSUMER_CLOSED; } - SMqPollRspWrapper* pRspWrapper = NULL; int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); if (ret) { code = ret; @@ -1559,7 +1562,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ASSERT(msgEpoch == clientEpoch); // handle meta rsp - int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; + rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; pRspWrapper->tmqRspType = rspType; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; @@ -1627,7 +1630,7 @@ END: } int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, - tmq->consumerId, rspType, vgId, total, requestId); + tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId); if (tmq) (void)tsem2_post(&tmq->rspSem); if (pMsg) taosMemoryFreeClear(pMsg->pData);