fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2024-07-01 17:43:10 +08:00
parent c166c8de70
commit 2c86e3940f
3 changed files with 7 additions and 1 deletions

View File

@ -152,6 +152,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.pTransferStateStreams); taosHashCleanup(execInfo.pTransferStateStreams);
taosHashCleanup(execInfo.pChkptStreams); taosHashCleanup(execInfo.pChkptStreams);
taosHashCleanup(execInfo.pStreamConsensus);
taosThreadMutexDestroy(&execInfo.lock); taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup"); mDebug("mnd stream exec info cleanup");
} }

View File

@ -1893,8 +1893,12 @@ void* taskDbAddRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb; STaskDbWrapper* pBackend = pTaskDb;
return taosAcquireRef(taskDbWrapperId, pBackend->refId); return taosAcquireRef(taskDbWrapperId, pBackend->refId);
} }
void taskDbRemoveRef(void* pTaskDb) { void taskDbRemoveRef(void* pTaskDb) {
if (pTaskDb == NULL) return; if (pTaskDb == NULL) {
return;
}
STaskDbWrapper* pBackend = pTaskDb; STaskDbWrapper* pBackend = pTaskDb;
taosReleaseRef(taskDbWrapperId, pBackend->refId); taosReleaseRef(taskDbWrapperId, pBackend->refId);
} }

View File

@ -273,6 +273,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x start to free task state", taskId); stDebug("s-task:0x%x start to free task state", taskId);
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
taskDbRemoveRef(pTask->pBackend); taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
} }
if (pTask->pNameMap) { if (pTask->pNameMap) {