Merge branch 'enh/3.0' into fix/shared_link_rocksdb
This commit is contained in:
parent
0be1858b4b
commit
26cbcf996c
|
@ -747,7 +747,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
void** pIter = taosHashIterate(handle->cfInst, NULL);
|
void** pIter = taosHashIterate(handle->cfInst, NULL);
|
||||||
while (*pIter) {
|
while (*pIter) {
|
||||||
RocksdbCfInst* inst = *pIter;
|
RocksdbCfInst* inst = *pIter;
|
||||||
SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
|
|
||||||
|
for (int i = 0; i < cfLen; i++) {
|
||||||
|
if (inst->cfOpt[i] == NULL) {
|
||||||
|
rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt);
|
||||||
|
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
|
||||||
|
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
|
||||||
|
|
||||||
|
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
|
||||||
|
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
|
||||||
|
|
||||||
|
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
|
||||||
|
|
||||||
|
SCfInit* cfPara = &ginitDict[i];
|
||||||
|
|
||||||
|
rocksdb_comparator_t* compare =
|
||||||
|
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
|
||||||
|
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
|
||||||
|
|
||||||
|
inst->pCompares[i] = compare;
|
||||||
|
inst->cfOpt[i] = opt;
|
||||||
|
inst->param[i].tableOpt = tableOpt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
|
||||||
inst->pCompareNode = streamBackendAddCompare(handle, &compare);
|
inst->pCompareNode = streamBackendAddCompare(handle, &compare);
|
||||||
pIter = taosHashIterate(handle->cfInst, pIter);
|
pIter = taosHashIterate(handle->cfInst, pIter);
|
||||||
}
|
}
|
||||||
|
@ -897,7 +920,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
void streamStateDestroyCompar(void* arg) {
|
void streamStateDestroyCompar(void* arg) {
|
||||||
SCfComparator* comp = (SCfComparator*)arg;
|
SCfComparator* comp = (SCfComparator*)arg;
|
||||||
for (int i = 0; i < comp->numOfComp; i++) {
|
for (int i = 0; i < comp->numOfComp; i++) {
|
||||||
rocksdb_comparator_destroy(comp->comp[i]);
|
if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
|
||||||
}
|
}
|
||||||
taosMemoryFree(comp->comp);
|
taosMemoryFree(comp->comp);
|
||||||
}
|
}
|
||||||
|
@ -920,6 +943,8 @@ int streamGetInit(SStreamState* pState, const char* funcName) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key);
|
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key);
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
|
||||||
|
taosThreadRwlockWrlock(&pState->pTdbState->rwLock);
|
||||||
cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err);
|
cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
idx = -1;
|
idx = -1;
|
||||||
|
@ -927,7 +952,6 @@ int streamGetInit(SStreamState* pState, const char* funcName) {
|
||||||
funcName, err);
|
funcName, err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
}
|
}
|
||||||
taosThreadRwlockWrlock(&pState->pTdbState->rwLock);
|
|
||||||
pState->pTdbState->pHandle[idx] = cf;
|
pState->pTdbState->pHandle[idx] = cf;
|
||||||
taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
|
taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue