fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2024-07-01 15:34:04 +08:00
parent 3ae8916384
commit 6d64d0e081
3 changed files with 20 additions and 1 deletions

View File

@ -220,6 +220,7 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaHbToMnode(void* param, void* tmrId);
SMetaHbInfo* createMetaHbInfo(int64_t* pRid);
void* destroyMetaHbInfo(SMetaHbInfo* pInfo);
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);

View File

@ -279,6 +279,21 @@ SMetaHbInfo* createMetaHbInfo(int64_t* pRid) {
return pInfo;
}
void* destroyMetaHbInfo(SMetaHbInfo* pInfo) {
if (pInfo != NULL) {
tCleanupStreamHbMsg(&pInfo->hbMsg);
if (pInfo->hbTmr != NULL) {
taosTmrStop(pInfo->hbTmr);
pInfo->hbTmr = NULL;
}
taosMemoryFree(pInfo);
}
return NULL;
}
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
@ -315,6 +330,8 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
pInfo->hbCount += 1;
pInfo->msgSendTs = -1;
tCleanupStreamHbMsg(&pInfo->hbMsg);
} else {
stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
}

View File

@ -515,7 +515,8 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
taosMemoryFree(pMeta->pHbInfo);
pMeta->pHbInfo = destroyMetaHbInfo(pMeta->pHbInfo);
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);