diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 60fd32f1a5..e674f2b78c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,6 +20,8 @@ #include "tcommon.h" #include "tref.h" +#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); + typedef struct SDbChkp { int8_t init; char* pCurrent; @@ -106,7 +108,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rock rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx); -const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); typedef int (*DecodeFunc)(void* key, char* buf); @@ -131,7 +132,18 @@ typedef struct { } SCfInit; -#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); +typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); +typedef const char* (*FactoryNameFunc)(void* arg); +typedef void(*DestroyFactoryFunc)(void *arg); + +void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg); +typedef struct { + void *funcName; + DestroyFactoryFunc destroy; + CreateFactoryFunc create; + FactoryNameFunc factoryName; +} SCfFilterFactory; + const char* compareDefaultName(void* name); const char* compareStateName(void* name); const char* compareWinKeyName(void* name); @@ -180,21 +192,36 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); -void dbChkpDestroy(SDbChkp* pChkp) { - taosMemoryFree(pChkp->buf); - taosMemoryFree(pChkp->path); +static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); - taosArrayDestroyP(pChkp->pSST, taosMemoryFree); - taosArrayDestroyP(pChkp->pAdd, taosMemoryFree); - taosArrayDestroyP(pChkp->pDel, taosMemoryFree); +SCfInit ginitDict[] = { + {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, + destroyFunc, encodeValueFunc, decodeValueFunc}, + {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, + compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, + {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, +}; - taosHashCleanup(pChkp->pSstTbl[0]); - taosHashCleanup(pChkp->pSstTbl[1]); +SCfFilterFactory ginitFilterDict[] = { + {"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, + {"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState}, + {"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill}, + {"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess}, + {"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc}, + {"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, + {"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, +}; - taosMemoryFree(pChkp->pCurrent); - taosMemoryFree(pChkp->pManifest); - -} +const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { @@ -335,6 +362,22 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { return p; } +void dbChkpDestroy(SDbChkp* pChkp) { + taosMemoryFree(pChkp->buf); + taosMemoryFree(pChkp->path); + + taosArrayDestroyP(pChkp->pSST, taosMemoryFree); + taosArrayDestroyP(pChkp->pAdd, taosMemoryFree); + taosArrayDestroyP(pChkp->pDel, taosMemoryFree); + + taosHashCleanup(pChkp->pSstTbl[0]); + taosHashCleanup(pChkp->pSstTbl[1]); + + taosMemoryFree(pChkp->pCurrent); + taosMemoryFree(pChkp->pManifest); + +} + int32_t dbChkpInit(SDbChkp* p) { if (p == NULL) return 0; return 0; @@ -488,48 +531,6 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) { } -SCfInit ginitDict[] = { - {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, - destroyFunc, encodeValueFunc, decodeValueFunc}, - {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, - compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, - {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, -}; - - -typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); -typedef const char* (*FactoryNameFunc)(void* arg); -typedef void(*DestroyFactoryFunc)(void *arg); - -typedef struct { - void *funcName; - - DestroyFactoryFunc destroy; - CreateFactoryFunc create; - FactoryNameFunc factoryName; -} SCfFilterFactory; - -SCfFilterFactory ginitFilterDict[] = { - {"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, - {"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState}, - {"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill}, - {"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess}, - {"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc}, - {"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, - {"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, -}; - - - void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) { rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName); rocksdb_options_set_compaction_filter_factory(opt, filterFactory); @@ -1309,7 +1310,28 @@ void streamBackendDelCompare(void* backend, void* arg) { } } void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } -static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); +void destroyRocksdbCfInst(RocksdbCfInst* inst) { + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + if (inst->pHandle) { + for (int i = 0; i < cfLen; i++) { + if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]); + } + taosMemoryFree(inst->pHandle); + } + + if (inst->cfOpt) { + for (int i = 0; i < cfLen; i++) { + rocksdb_options_destroy(inst->cfOpt[i]); + rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[i].tableOpt); + } + taosMemoryFreeClear(inst->cfOpt); + taosMemoryFreeClear(inst->param); + } + if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt); + if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt); + + taosMemoryFree(inst); +} // |key|-----value------| // |key|ttl|len|userData| @@ -1736,6 +1758,7 @@ const char* compactFilteFactoryNameFunc(void* arg) { return "stream_compact_filter_func"; } + void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { @@ -1807,28 +1830,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocks return filter; } -void destroyRocksdbCfInst(RocksdbCfInst* inst) { - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - if (inst->pHandle) { - for (int i = 0; i < cfLen; i++) { - if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]); - } - taosMemoryFree(inst->pHandle); - } - if (inst->cfOpt) { - for (int i = 0; i < cfLen; i++) { - rocksdb_options_destroy(inst->cfOpt[i]); - rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[i].tableOpt); - } - taosMemoryFreeClear(inst->cfOpt); - taosMemoryFreeClear(inst->param); - } - if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt); - if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt); - - taosMemoryFree(inst); -} int32_t getCfIdx(const char* cfName) { int idx = -1;