From 304460f45236b7ad4f1d70142f98ba0b9d604cbe Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 1 Jul 2024 10:49:24 +0800 Subject: [PATCH] fix:[TD-30725] fix potential memory leaks --- include/util/taoserror.h | 1 + source/client/src/clientImpl.c | 9 +++++ source/client/src/clientTmq.c | 63 +++++++++++++---------------- source/libs/executor/src/executor.c | 4 +- source/util/src/terror.c | 1 + 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2de336d036..7ac22ac40f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -919,6 +919,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012) #define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) +#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 11d3797157..7034778887 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1540,6 +1540,9 @@ int32_t doProcessMsgFromServer(void* param) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; if (pMsg->info.ahandle == NULL) { tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL"); + taosMemoryFree(arg->pEpset); + rpcFreeCont(pMsg->pCont); + taosMemoryFree(arg); return TSDB_CODE_TSC_INTERNAL_ERROR; } STscObj* pTscObj = NULL; @@ -1557,6 +1560,12 @@ int32_t doProcessMsgFromServer(void* param) { if (pRequest->self != pSendInfo->requestObjRefId) { tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64, pRequest->self, pSendInfo->requestObjRefId); + + taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); + taosMemoryFree(arg->pEpset); + rpcFreeCont(pMsg->pCont); + destroySendMsgInfo(pSendInfo); + taosMemoryFree(arg); return TSDB_CODE_TSC_INTERNAL_ERROR; } pTscObj = pRequest->pTscObj; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index becb8285b6..21d1a528da 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -258,7 +258,7 @@ static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet); static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); -static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset); +static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset); tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); @@ -1364,11 +1364,15 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) { } int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { + tmq_t* tmq = NULL; SMqPollCbParam* pParam = (SMqPollCbParam*)param; + if (pParam == NULL || pMsg == NULL) { + goto FAIL; + } int64_t refId = pParam->refId; int32_t vgId = pParam->vgId; uint64_t requestId = pParam->requestId; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { code = TSDB_CODE_TMQ_CONSUMER_CLOSED; goto FAIL; @@ -1479,7 +1483,6 @@ END: FAIL: if (tmq) tsem2_post(&tmq->rspSem); - taosMemoryFree(pParam); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); @@ -1788,26 +1791,26 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { code = TSDB_CODE_INVALID_MSG; - goto FAIL; + return code; } msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + return code; } if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { code = TSDB_CODE_INVALID_MSG; taosMemoryFreeClear(msg); - goto FAIL; + return code; } pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(msg); - goto FAIL; + return code; } pParam->refId = pTmq->refId; @@ -1819,13 +1822,14 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(msg); - goto FAIL; + return code; } sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; + sendInfo->paramFreeFp = taosMemoryFree; sendInfo->fp = tmqPollCb; sendInfo->msgType = TDMT_VND_TMQ_CONSUME; @@ -1836,7 +1840,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); if (code != 0) { - goto FAIL; + return code; } pVg->pollCnt++; @@ -1844,8 +1848,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pTmq->pollCnt++; return 0; -FAIL: - return tmqPollCb(pParam, NULL, code); } // broadcast the poll request to all related vnodes @@ -2628,7 +2630,6 @@ FAIL: taosMemoryFree(pMsg->pData); } - taosMemoryFree(pParam); return code; } @@ -2637,16 +2638,18 @@ int32_t syncAskEp(tmq_t* pTmq) { if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; tsem_init(&pInfo->sem, 0, 0); - askEp(pTmq, pInfo, true, false); - tsem_wait(&pInfo->sem); + int32_t code = askEp(pTmq, pInfo, true, false); + if(code == 0){ + tsem_wait(&pInfo->sem); + code = pInfo->code; + } - int32_t code = pInfo->code; tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); return code; } -void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { +int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMqAskEpReq req = {0}; req.consumerId = pTmq->consumerId; req.epoch = updateEpSet ? -1 : pTmq->epoch; @@ -2658,30 +2661,26 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); - code = TSDB_CODE_INVALID_PARA; - goto FAIL; + return TSDB_CODE_INVALID_PARA; } pReq = taosMemoryCalloc(1, tlen); if (pReq == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); - code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + return TSDB_CODE_OUT_OF_MEMORY; } if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); - code = TSDB_CODE_INVALID_PARA; taosMemoryFree(pReq); - goto FAIL; + return TSDB_CODE_INVALID_PARA; } pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); - code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pReq); - goto FAIL; + return TSDB_CODE_OUT_OF_MEMORY; } pParam->refId = pTmq->refId; @@ -2690,30 +2689,22 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pReq); - goto FAIL; + taosMemoryFree(pParam); + return TSDB_CODE_OUT_OF_MEMORY; } sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; - sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; + sendInfo->paramFreeFp = taosMemoryFree; sendInfo->fp = askEpCb; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); - - int64_t transporterId = 0; - code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - if (code == 0) { - return; - } - -FAIL: - askEpCb(pParam, NULL, code); + return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); } int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 77a80d229e..c0d089e7c2 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1196,7 +1196,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else { taosRUnLockLatch(&pTaskInfo->lock); qError("no table in table list, %s", id); - terrno = TSDB_CODE_PAR_INTERNAL_ERROR; + terrno = TSDB_CODE_TMQ_NO_TABLE_QUALIFIED; return -1; } } @@ -1217,7 +1217,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, numOfTables, pScanInfo->currentTable, id); - terrno = TSDB_CODE_PAR_INTERNAL_ERROR; + terrno = TSDB_CODE_TMQ_NO_TABLE_QUALIFIED; return -1; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c5bba6fa53..e9fa58e6e3 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -766,6 +766,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of ra TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")