fix mem leak
This commit is contained in:
parent
1a99fefa83
commit
9542b95d62
|
@ -438,6 +438,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
|
||||
taosMemoryFree(pParam->pOffset);
|
||||
taosMemoryFree(pBuf->pData);
|
||||
taosMemoryFree(pBuf->pEpSet);
|
||||
|
||||
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
|
||||
* pOffset->version);*/
|
||||
|
@ -724,7 +725,10 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData);
|
||||
if (pMsg) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -869,6 +873,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
|
|||
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||
pParam->rspErr = code;
|
||||
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1365,6 +1371,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
taosMemoryFree(pParam);
|
||||
}
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
@ -1416,6 +1423,8 @@ END:
|
|||
} else {
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -456,6 +456,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
|
||||
if (head->isHbParam) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
||||
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
|
||||
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
|
||||
|
|
Loading…
Reference in New Issue