fix mem leak

This commit is contained in:
Liu Jicong 2022-01-28 18:39:34 +08:00
parent 688a85c9c3
commit 108fd4cfc3
3 changed files with 18 additions and 1 deletions

View File

@ -565,6 +565,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) {
goto END;
tscError("failed to malloc get subscribe ep buf");
}
buf->consumerId = htobe64(tmq->consumerId);
@ -572,6 +573,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) {
goto END;
tscError("failed to malloc subscribe ep request");
}
@ -579,7 +581,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
free(buf);
goto END;
}
pParam->tmq = tmq;
@ -596,6 +597,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
END:
tfree(buf);
if (wait) tsem_wait(&tmq->rspSem);
return 0;
}

View File

@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
return buf;
}
static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
if (pConsumerEp) {
tfree(pConsumerEp->qmsg);
}
}
// unit for rebalance
typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN];
@ -571,6 +577,14 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return buf;
}
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->availConsumer) taosArrayDestroy(pSub->availConsumer);
if (pSub->assigned) taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
if (pSub->unassignedVg) taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
if (pSub->idleConsumer) taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
if (pSub->lostConsumer) taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
}
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal

View File

@ -449,6 +449,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
mTrace("subscribe:%s, perform delete action", pSub->key);
tDeleteSMqSubscribeObj(pSub);
return 0;
}