diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0bc4214c09..7bc6a2f2e6 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -858,7 +858,10 @@ void tmqSendHbReq(void* param, void* tmrId) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if (code != 0) { + tscError("tmqSendHbReq asyncSendMsgToServer failed"); + } OVER: tDestroySMqHbReq(&req); @@ -1220,12 +1223,15 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(buf); goto FAIL; } SMqSubscribeCbParam param = {.rspErr = 0}; if (tsem_init(¶m.rspSem, 0, 0) != 0) { code = TSDB_CODE_TSC_INTERNAL_ERROR; + taosMemoryFree(buf); + taosMemoryFree(sendInfo); goto FAIL; } @@ -1245,10 +1251,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - // avoid double free if msg is sent - buf = NULL; - sendInfo = NULL; - tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); @@ -1284,8 +1286,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); - taosMemoryFree(buf); - taosMemoryFree(sendInfo); return code; } @@ -1745,6 +1745,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(msg); goto FAIL; } @@ -1756,6 +1757,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(msg); goto FAIL; } @@ -1782,7 +1784,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p return 0; FAIL: - taosMemoryFreeClear(msg); return tmqPollCb(pParam, NULL, code); } @@ -2560,6 +2561,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); code = TSDB_CODE_INVALID_PARA; + taosMemoryFree(pReq); goto FAIL; } @@ -2567,6 +2569,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { if (pParam == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pReq); goto FAIL; } @@ -2577,6 +2580,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pReq); goto FAIL; } @@ -2598,8 +2602,6 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { } FAIL: - taosMemoryFreeClear(pParam); - taosMemoryFreeClear(pReq); askEpCb(pParam, NULL, code); } @@ -2706,7 +2708,6 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); return 0; } @@ -3066,6 +3067,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; + sendInfo->paramFreeFp = taosMemoryFree; sendInfo->fp = tmqGetWalInfoCb; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; @@ -3077,8 +3079,6 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); if (code != 0) { - taosMemoryFree(pParam); - taosMemoryFree(msg); goto end; } }