Merge branch 'enh/new3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-06 17:56:09 +08:00
parent feeead50c5
commit ba52769867
1 changed files with 120 additions and 3 deletions

View File

@ -93,10 +93,18 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst);
void destroyCompactFilteFactory(void* arg); void destroyCompactFilteFactory(void* arg);
void destroyCompactFilte(void* arg); void destroyCompactFilte(void* arg);
const char* compactFilteFactoryName(void* arg); const char* compactFilteFactoryName(void* arg);
const char* compactFilteFactoryNameSess(void* arg);
const char* compactFilteFactoryNameState(void* arg);
const char* compactFilteFactoryNameFunc(void* arg);
const char* compactFilteFactoryNameFill(void* arg);
const char* compactFilteName(void* arg); const char* compactFilteName(void* arg);
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed); char** newval, size_t* newvlen, unsigned char* value_changed);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
@ -303,7 +311,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) {
return 0; return 0;
} }
SDbChkp* dbChktCreate(char* path, int64_t initChkpId) { SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp)); SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp));
p->curChkpId = initChkpId; p->curChkpId = initChkpId;
p->preCkptId = -1; p->preCkptId = -1;
@ -454,7 +462,7 @@ int32_t bkdMgtAddChkp(SBackendManager *bm, char *task, char *path) {
taosThreadRwlockWrlock(&bm->rwLock); taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) { if (pp == NULL) {
SDbChkp *p = dbChktCreate(path, 0); SDbChkp *p = dbChkpCreate(path, 0);
if (p != NULL) { if (p != NULL) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *)); taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *));
code = 0; code = 0;
@ -497,6 +505,40 @@ SCfInit ginitDict[] = {
encodeValueFunc, decodeValueFunc}, encodeValueFunc, decodeValueFunc},
}; };
typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
typedef const char* (*FactoryNameFunc)(void* arg);
typedef void(*DestroyFactoryFunc)(void *arg);
typedef struct {
void *funcName;
DestroyFactoryFunc destroy;
CreateFactoryFunc create;
FactoryNameFunc factoryName;
} SCfFilterFactory;
SCfFilterFactory ginitFilterDict[] = {
{"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
{"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState},
{"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill},
{"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess},
{"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc},
{"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
{"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
};
// pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create(
// NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
// rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) {
rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName);
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
}
bool isValidCheckpoint(const char* dir) { return true; } bool isValidCheckpoint(const char* dir) { return true; }
int32_t copyFiles(const char* src, const char* dst) { int32_t copyFiles(const char* src, const char* dst) {
@ -1680,6 +1722,22 @@ const char* compactFilteFactoryName(void* arg) {
SCompactFilteFactory* state = arg; SCompactFilteFactory* state = arg;
return "stream_compact_filter"; return "stream_compact_filter";
} }
const char* compactFilteFactoryNameSess(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_sess";
}
const char* compactFilteFactoryNameState(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_state";
}
const char* compactFilteFactoryNameFill(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_fill";
}
const char* compactFilteFactoryNameFunc(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_func";
}
void destroyCompactFilte(void* arg) { (void)arg; } void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
@ -1688,12 +1746,69 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
} }
const char* compactFilteName(void* arg) { return "stream_filte"; } const char* compactFilteName(void* arg) { return "stream_filte"; }
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; }
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteNameState(void* arg) { return "stream_filte_state"; }
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; }
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; }
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg; SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter = rocksdb_compactionfilter_t* filter =
rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilte, compactFilteName); rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilte, compactFilteName);
return filter; return filter;
} }
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter =
rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteSess, compactFilteNameSess);
return filter;
}
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter =
rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteState, compactFilteNameState);
return filter;
}
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter =
rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteFill, compactFilteNameFill);
return filter;
}
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter =
rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteFunc, compactFilteNameFunc);
return filter;
}
void destroyRocksdbCfInst(RocksdbCfInst* inst) { void destroyRocksdbCfInst(RocksdbCfInst* inst) {
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
@ -1820,10 +1935,12 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_comparator_t* compare = rocksdb_comparator_t* compare =
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
pTaskDb->pCompares[i] = compare; pTaskDb->pCompares[i] = compare;
pTaskDb->pCfOpts[i] = opt; pTaskDb->pCfOpts[i] = opt;
pTaskDb->pCfParams[i].tableOpt = tableOpt; pTaskDb->pCfParams[i].tableOpt = tableOpt;
// set filter factory
dbSetFilterFactory(opt, i, NULL);
} }
return; return;
} }