fix: memory leak
This commit is contained in:
parent
ccd3bb20a7
commit
76cd3122d1
|
@ -6027,9 +6027,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
|
|
||||||
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
||||||
taosArrayDestroy(pRsp->blockDataLen);
|
taosArrayDestroy(pRsp->blockDataLen);
|
||||||
|
pRsp->blockDataLen = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||||
|
pRsp->blockData = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
pRsp->blockSchema = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
||||||
|
pRsp->blockTbName = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
|
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
|
||||||
|
|
|
@ -471,8 +471,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
|
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
}
|
}
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
}
|
}
|
||||||
// 3.2 set new consumer
|
// 3.2 set new consumer
|
||||||
consumerNum = taosArrayGetSize(pOutput->newConsumers);
|
consumerNum = taosArrayGetSize(pOutput->newConsumers);
|
||||||
|
|
|
@ -68,6 +68,7 @@ static void destroySTqHandle(void* data) {
|
||||||
|
|
||||||
static void tqPushEntryFree(void* data) {
|
static void tqPushEntryFree(void* data) {
|
||||||
STqPushEntry* p = *(void**)data;
|
STqPushEntry* p = *(void**)data;
|
||||||
|
tDeleteSMqDataRsp(&p->dataRsp);
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -576,8 +577,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
|
||||||
#endif
|
#endif
|
||||||
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
Loading…
Reference in New Issue