fix:[TD-30725] fix potential memory leaks

This commit is contained in:
wangmm0220 2024-07-01 10:49:24 +08:00
parent ce4ecb0371
commit 304460f452
5 changed files with 40 additions and 38 deletions

View File

@ -919,6 +919,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012) #define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) #define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
// stream // stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -1540,6 +1540,9 @@ int32_t doProcessMsgFromServer(void* param) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
if (pMsg->info.ahandle == NULL) { if (pMsg->info.ahandle == NULL) {
tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL"); tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont);
taosMemoryFree(arg);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
STscObj* pTscObj = NULL; STscObj* pTscObj = NULL;
@ -1557,6 +1560,12 @@ int32_t doProcessMsgFromServer(void* param) {
if (pRequest->self != pSendInfo->requestObjRefId) { if (pRequest->self != pSendInfo->requestObjRefId) {
tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64, tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
pRequest->self, pSendInfo->requestObjRefId); pRequest->self, pSendInfo->requestObjRefId);
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
pTscObj = pRequest->pTscObj; pTscObj = pRequest->pTscObj;

View File

@ -258,7 +258,7 @@ static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
SMqCommitCbParamSet* pParamSet); SMqCommitCbParamSet* pParamSet);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset); static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -1364,11 +1364,15 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) {
} }
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL;
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pParam == NULL || pMsg == NULL) {
goto FAIL;
}
int64_t refId = pParam->refId; int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId; int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId; uint64_t requestId = pParam->requestId;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED; code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto FAIL; goto FAIL;
@ -1479,7 +1483,6 @@ END:
FAIL: FAIL:
if (tmq) tsem2_post(&tmq->rspSem); if (tmq) tsem2_post(&tmq->rspSem);
taosMemoryFree(pParam);
if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
@ -1788,26 +1791,26 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) { if (msgSize < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto FAIL; return code;
} }
msg = taosMemoryCalloc(1, msgSize); msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; return code;
} }
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
goto FAIL; return code;
} }
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) { if (pParam == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
goto FAIL; return code;
} }
pParam->refId = pTmq->refId; pParam->refId = pTmq->refId;
@ -1819,13 +1822,14 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
if (sendInfo == NULL) { if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
goto FAIL; return code;
} }
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = req.reqId; sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
@ -1836,7 +1840,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) { if (code != 0) {
goto FAIL; return code;
} }
pVg->pollCnt++; pVg->pollCnt++;
@ -1844,8 +1848,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pTmq->pollCnt++; pTmq->pollCnt++;
return 0; return 0;
FAIL:
return tmqPollCb(pParam, NULL, code);
} }
// broadcast the poll request to all related vnodes // broadcast the poll request to all related vnodes
@ -2628,7 +2630,6 @@ FAIL:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
} }
taosMemoryFree(pParam);
return code; return code;
} }
@ -2637,16 +2638,18 @@ int32_t syncAskEp(tmq_t* pTmq) {
if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY;
tsem_init(&pInfo->sem, 0, 0); tsem_init(&pInfo->sem, 0, 0);
askEp(pTmq, pInfo, true, false); int32_t code = askEp(pTmq, pInfo, true, false);
if(code == 0){
tsem_wait(&pInfo->sem); tsem_wait(&pInfo->sem);
code = pInfo->code;
}
int32_t code = pInfo->code;
tsem_destroy(&pInfo->sem); tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
} }
void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMqAskEpReq req = {0}; SMqAskEpReq req = {0};
req.consumerId = pTmq->consumerId; req.consumerId = pTmq->consumerId;
req.epoch = updateEpSet ? -1 : pTmq->epoch; req.epoch = updateEpSet ? -1 : pTmq->epoch;
@ -2658,30 +2661,26 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) { if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
code = TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
goto FAIL;
} }
pReq = taosMemoryCalloc(1, tlen); pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) { if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
code = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
} }
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
code = TSDB_CODE_INVALID_PARA;
taosMemoryFree(pReq); taosMemoryFree(pReq);
goto FAIL; return TSDB_CODE_INVALID_PARA;
} }
pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) { if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pReq); taosMemoryFree(pReq);
goto FAIL; return TSDB_CODE_OUT_OF_MEMORY;
} }
pParam->refId = pTmq->refId; pParam->refId = pTmq->refId;
@ -2690,30 +2689,22 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pReq); taosMemoryFree(pReq);
goto FAIL; taosMemoryFree(pParam);
return TSDB_CODE_OUT_OF_MEMORY;
} }
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
sendInfo->requestId = generateRequestId(); sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->fp = askEpCb; sendInfo->fp = askEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
int64_t transporterId = 0;
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code == 0) {
return;
}
FAIL:
askEpCb(pParam, NULL, code);
} }
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {

View File

@ -1196,7 +1196,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id); qError("no table in table list, %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR; terrno = TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
return -1; return -1;
} }
} }
@ -1217,7 +1217,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id); numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR; terrno = TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
return -1; return -1;
} }

View File

@ -766,6 +766,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of ra
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
// stream // stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")