From ba52769867133e60984e745cbb8e287717a9ff0a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 6 Nov 2023 17:56:09 +0800 Subject: [PATCH] Merge branch 'enh/new3.0' into enh/refactorBackend --- source/libs/stream/src/streamBackendRocksdb.c | 123 +++++++++++++++++- 1 file changed, 120 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 797329d52b..efb9bb599c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -93,10 +93,18 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst); void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); const char* compactFilteFactoryName(void* arg); +const char* compactFilteFactoryNameSess(void* arg); +const char* compactFilteFactoryNameState(void* arg); +const char* compactFilteFactoryNameFunc(void* arg); +const char* compactFilteFactoryNameFill(void* arg); const char* compactFilteName(void* arg); unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx); +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx); +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx); +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx); const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; @@ -303,7 +311,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { return 0; } -SDbChkp* dbChktCreate(char* path, int64_t initChkpId) { +SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp)); p->curChkpId = initChkpId; p->preCkptId = -1; @@ -454,7 +462,7 @@ int32_t bkdMgtAddChkp(SBackendManager *bm, char *task, char *path) { taosThreadRwlockWrlock(&bm->rwLock); SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); if (pp == NULL) { - SDbChkp *p = dbChktCreate(path, 0); + SDbChkp *p = dbChkpCreate(path, 0); if (p != NULL) { taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *)); code = 0; @@ -497,6 +505,40 @@ SCfInit ginitDict[] = { encodeValueFunc, decodeValueFunc}, }; + +typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); +typedef const char* (*FactoryNameFunc)(void* arg); +typedef void(*DestroyFactoryFunc)(void *arg); + +typedef struct { + void *funcName; + + DestroyFactoryFunc destroy; + CreateFactoryFunc create; + FactoryNameFunc factoryName; +} SCfFilterFactory; + +SCfFilterFactory ginitFilterDict[] = { + {"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, + {"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState}, + {"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill}, + {"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess}, + {"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc}, + {"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, + {"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, +}; + + + +// pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create( +// NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); +// rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory); +void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) { + rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName); + rocksdb_options_set_compaction_filter_factory(opt, filterFactory); + +} + bool isValidCheckpoint(const char* dir) { return true; } int32_t copyFiles(const char* src, const char* dst) { @@ -1680,6 +1722,22 @@ const char* compactFilteFactoryName(void* arg) { SCompactFilteFactory* state = arg; return "stream_compact_filter"; } +const char* compactFilteFactoryNameSess(void* arg) { + SCompactFilteFactory* state = arg; + return "stream_compact_filter_sess"; +} +const char* compactFilteFactoryNameState(void* arg) { + SCompactFilteFactory* state = arg; + return "stream_compact_filter_state"; +} +const char* compactFilteFactoryNameFill(void* arg) { + SCompactFilteFactory* state = arg; + return "stream_compact_filter_fill"; +} +const char* compactFilteFactoryNameFunc(void* arg) { + SCompactFilteFactory* state = arg; + return "stream_compact_filter_func"; +} void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, @@ -1688,12 +1746,69 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c } const char* compactFilteName(void* arg) { return "stream_filte"; } +unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed) { + // not impl yet + return 0; + //return streamStateValueIsStale((char*)val) ? 1 : 0; +} +const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; } + +unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed) { + // not impl yet + return 0; + //return streamStateValueIsStale((char*)val) ? 1 : 0; +} +const char* compactFilteNameState(void* arg) { return "stream_filte_state"; } + +unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed) { + // not impl yet + return 0; + //return streamStateValueIsStale((char*)val) ? 1 : 0; +} +const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; } + +unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed) { + // not impl yet + return 0; + //return streamStateValueIsStale((char*)val) ? 1 : 0; +} +const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; } + + rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { SCompactFilteFactory* state = arg; rocksdb_compactionfilter_t* filter = rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilte, compactFilteName); return filter; } +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx) { + SCompactFilteFactory* state = arg; + rocksdb_compactionfilter_t* filter = + rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteSess, compactFilteNameSess); + return filter; +} +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx) { + SCompactFilteFactory* state = arg; + rocksdb_compactionfilter_t* filter = + rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteState, compactFilteNameState); + return filter; +} +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx) { + SCompactFilteFactory* state = arg; + rocksdb_compactionfilter_t* filter = + rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteFill, compactFilteNameFill); + return filter; +} +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx) { + SCompactFilteFactory* state = arg; + rocksdb_compactionfilter_t* filter = + rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteFunc, compactFilteNameFunc); + return filter; +} void destroyRocksdbCfInst(RocksdbCfInst* inst) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -1820,10 +1935,12 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); - + pTaskDb->pCompares[i] = compare; pTaskDb->pCfOpts[i] = opt; pTaskDb->pCfParams[i].tableOpt = tableOpt; + // set filter factory + dbSetFilterFactory(opt, i, NULL); } return; }