diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 41b4f5145b..db428635a8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -152,6 +152,7 @@ void mndCleanupStream(SMnode *pMnode) { taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.pTransferStateStreams); taosHashCleanup(execInfo.pChkptStreams); + taosHashCleanup(execInfo.pStreamConsensus); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 06773c79e3..cf7852cddb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1893,8 +1893,12 @@ void* taskDbAddRef(void* pTaskDb) { STaskDbWrapper* pBackend = pTaskDb; return taosAcquireRef(taskDbWrapperId, pBackend->refId); } + void taskDbRemoveRef(void* pTaskDb) { - if (pTaskDb == NULL) return; + if (pTaskDb == NULL) { + return; + } + STaskDbWrapper* pBackend = pTaskDb; taosReleaseRef(taskDbWrapperId, pBackend->refId); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e6bb6bf90c..1decfe198a 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -273,6 +273,7 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x start to free task state", taskId); streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); taskDbRemoveRef(pTask->pBackend); + pTask->pBackend = NULL; } if (pTask->pNameMap) {