diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 793575f59a..e707803d9d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -747,7 +747,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t void** pIter = taosHashIterate(handle->cfInst, NULL); while (*pIter) { RocksdbCfInst* inst = *pIter; - SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; + + for (int i = 0; i < cfLen; i++) { + if (inst->cfOpt[i] == NULL) { + rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt); + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt); + + SCfInit* cfPara = &ginitDict[i]; + + rocksdb_comparator_t* compare = + rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); + + inst->pCompares[i] = compare; + inst->cfOpt[i] = opt; + inst->param[i].tableOpt = tableOpt; + } + } + SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; inst->pCompareNode = streamBackendAddCompare(handle, &compare); pIter = taosHashIterate(handle->cfInst, pIter); } @@ -897,7 +920,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; for (int i = 0; i < comp->numOfComp; i++) { - rocksdb_comparator_destroy(comp->comp[i]); + if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]); } taosMemoryFree(comp->comp); } @@ -920,6 +943,8 @@ int streamGetInit(SStreamState* pState, const char* funcName) { char buf[128] = {0}; GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key); char* err = NULL; + + taosThreadRwlockWrlock(&pState->pTdbState->rwLock); cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err); if (err != NULL) { idx = -1; @@ -927,7 +952,6 @@ int streamGetInit(SStreamState* pState, const char* funcName) { funcName, err); taosMemoryFree(err); } - taosThreadRwlockWrlock(&pState->pTdbState->rwLock); pState->pTdbState->pHandle[idx] = cf; taosThreadRwlockUnlock(&pState->pTdbState->rwLock); }