diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ce4feb38eb..8712902326 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -993,7 +993,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, taosGetTimestampMs() - st); } - } else { + } else { stError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } // release all ref to cfWrapper; @@ -1467,16 +1467,24 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c void destroyRocksdbCfInst(RocksdbCfInst* inst) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - for (int i = 0; i < cfLen; i++) { - if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]); + if (inst->pHandle) { + for (int i = 0; i < cfLen; i++) { + if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]); + } + taosMemoryFree(inst->pHandle); } - rocksdb_writeoptions_destroy(inst->wOpt); - inst->wOpt = NULL; + if (inst->cfOpt) { + for (int i = 0; i < cfLen; i++) { + rocksdb_options_destroy(inst->cfOpt[i]); + rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[i].tableOpt); + } + taosMemoryFreeClear(inst->cfOpt); + taosMemoryFreeClear(inst->param); + } + if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt); + if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt); - rocksdb_readoptions_destroy(inst->rOpt); - taosMemoryFree(inst->cfOpt); - taosMemoryFreeClear(inst->param); taosMemoryFree(inst); } @@ -1645,6 +1653,13 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pState->pTdbState->backendCfWrapperId = id; pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper; stInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr); + + inst->pHandle = NULL; + inst->cfOpt = NULL; + inst->param = NULL; + + inst->wOpt = NULL; + inst->rOpt = NULL; return 0; } taosThreadMutexUnlock(&handle->cfMutex);