diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5edba5e180..cca1057662 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,9 +20,6 @@ #include "tcommon.h" #include "tref.h" -#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); - -int32_t copyFiles(const char* src, const char* dst); typedef struct SDbChkp { int8_t init; char* pCurrent; @@ -87,8 +84,6 @@ typedef struct { rocksdb_comparator_t** pCompares; } RocksdbCfInst; -uint32_t nextPow2(uint32_t x); - int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -147,13 +142,6 @@ typedef struct { } SCfInit; -typedef struct { - void* funcName; - DestroyFactoryFunc destroy; - CreateFactoryFunc create; - FactoryNameFunc factoryName; -} SCfFilterFactory; - const char* compareDefaultName(void* name); const char* compareStateName(void* name); const char* compareWinKeyName(void* name); @@ -205,6 +193,10 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); +#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); +int32_t copyFiles(const char* src, const char* dst); +uint32_t nextPow2(uint32_t x); + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, @@ -1709,7 +1701,7 @@ void destroyCompactFilteFactory(void* arg) { } const char* compactFilteFactoryName(void* arg) { SCompactFilteFactory* state = arg; - return "stream_compact_factory_filter"; + return "stream_compact_factory_filter_default"; } const char* compactFilteFactoryNameSess(void* arg) { SCompactFilteFactory* state = arg; @@ -1733,7 +1725,7 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c char** newval, size_t* newvlen, unsigned char* value_changed) { return streamStateValueIsStale((char*)val) ? 1 : 0; } -const char* compactFilteName(void* arg) { return "stream_filte"; } +const char* compactFilteName(void* arg) { return "stream_filte_default"; } const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; } const char* compactFilteNameState(void* arg) { return "stream_filte_state"; } const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; } @@ -1837,7 +1829,6 @@ void taskDbRemoveRef(void* pTaskDb) { STaskDbWrapper* pBackend = pTaskDb; taosReleaseRef(taskDbWrapperId, pBackend->refId); } -// void taskDbDestroy(STaskDbWrapper* wrapper); void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_env_t* env = rocksdb_create_default_env(); @@ -1899,7 +1890,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { } void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { pTaskDb->chkpId = -1; - pTaskDb->chkpCap = 4; + pTaskDb->chkpCap = 2; pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));