diff --git a/source/client/src/clientTmq b/source/client/src/clientTmq new file mode 100644 index 0000000000..e69de29bb2 diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 350ecdd373..74f05056c6 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -438,6 +438,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); + taosMemoryFree(pBuf->pEpSet); /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, * pOffset->version);*/ @@ -724,7 +725,10 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData); + if (pMsg) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } return 0; } @@ -869,6 +873,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; + + taosMemoryFree(pMsg->pEpSet); tsem_post(&pParam->rspSem); return 0; } @@ -1365,6 +1371,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pParam); } taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; return -1; } @@ -1416,6 +1423,8 @@ END: } else { taosMemoryFree(pParam); } + + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); return code; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index c23b461b47..b6de9383d7 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -456,6 +456,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { if (head->isHbParam) { taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};