Merge pull request #27212 from taosdata/fix/TD-31363

fix: response message memory leak
This commit is contained in:
dapan1121 2024-08-16 09:15:06 +08:00 committed by GitHub
commit f617124f98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 40 additions and 12 deletions

View File

@ -297,6 +297,9 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
if (strlen(usedbRsp.db) == 0) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (usedbRsp.errCode != 0) {
return usedbRsp.errCode;
} else {
@ -366,9 +369,15 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg == NULL || param == NULL) {
if (pMsg == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
if (param == NULL) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
return TSDB_CODE_TSC_INVALID_INPUT;
}
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {

View File

@ -825,15 +825,17 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0){
return code;
goto _return;
}
if (pMsg == NULL || param == NULL) {
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
goto _return;
}
SMqHbRsp rsp = {0};
code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
if (code != 0) {
return code;
goto _return;
}
int64_t refId = (int64_t)param;
@ -856,10 +858,14 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
taosWUnLockLatch(&tmq->lock);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
tDestroySMqHbRsp(&rsp);
_return:
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return 0;
return code;
}
void tmqSendHbReq(void* param, void* tmrId) {
@ -1511,19 +1517,29 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) {
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL;
SMqPollRspWrapper* pRspWrapper = NULL;
int8_t rspType = 0;
int32_t vgId = 0;
uint64_t requestId = 0;
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pParam == NULL || pMsg == NULL) {
if (pMsg == NULL) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (pParam == NULL) {
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
vgId = pParam->vgId;
requestId = pParam->requestId;
tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
return TSDB_CODE_TMQ_CONSUMER_CLOSED;
}
SMqPollRspWrapper* pRspWrapper = NULL;
int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
if (ret) {
code = ret;
@ -1554,7 +1570,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
ASSERT(msgEpoch == clientEpoch);
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
pRspWrapper->tmqRspType = rspType;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
@ -1622,7 +1638,7 @@ END:
}
int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId);
tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId);
if (tmq) (void)tsem2_post(&tmq->rspSem);
if (pMsg) taosMemoryFreeClear(pMsg->pData);
@ -2817,7 +2833,10 @@ end:
}
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (param == NULL) return code;
if (param == NULL) {
goto FAIL;
}
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {