From ee6b620597fea11be432af0deb3feb84332db718 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 30 Oct 2023 17:29:32 +0800 Subject: [PATCH] fix mem leak --- source/libs/stream/src/streamBackendRocksdb.c | 16 +++++++++++++++- source/libs/stream/src/streamTask.c | 2 ++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f9b77b4e9a..35b7b0c233 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1819,6 +1819,9 @@ _EXIT: void taskDbDestroy(void* pBackend) { STaskDbWrapper* wrapper = pBackend; + qDebug("succ to destroy stream backend:%p", wrapper); + + int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); if (wrapper == NULL) return; @@ -1826,7 +1829,6 @@ void taskDbDestroy(void* pBackend) { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_set_wait(flushOpt, 1); - int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); char* err = NULL; for (int i = 0; i < nCf; i++) { if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); @@ -1849,7 +1851,19 @@ void taskDbDestroy(void* pBackend) { rocksdb_env_destroy(wrapper->env); rocksdb_cache_destroy(wrapper->cache); + taosMemoryFree(wrapper->pCf); + + for (int i = 0; i < nCf; i++) { + rocksdb_options_t *opt = wrapper->pCfOpts[i]; + rocksdb_comparator_t *compare = wrapper->pCompares[i]; + rocksdb_block_based_table_options_t *tblOpt = wrapper->pCfParams[i].tableOpt; + + rocksdb_options_destroy(opt); + rocksdb_comparator_destroy(compare); + rocksdb_block_based_options_destroy(tblOpt); + + } taosMemoryFree(wrapper->pCompares); taosMemoryFree(wrapper->pCfOpts); taosMemoryFree(wrapper->pCfParams); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6d7a03bc6c..d4b9b39fa9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -368,6 +368,8 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->pState) { stDebug("s-task:0x%x start to free task state", taskId); streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + taskDbRemoveRef(pTask->pBackend); + } if (pTask->id.idStr != NULL) {