refactor backend

This commit is contained in:
yihaoDeng 2023-11-07 14:46:52 +08:00
parent e5449bf161
commit e015134d6d
1 changed files with 7 additions and 16 deletions

View File

@ -20,9 +20,6 @@
#include "tcommon.h"
#include "tref.h"
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t copyFiles(const char* src, const char* dst);
typedef struct SDbChkp {
int8_t init;
char* pCurrent;
@ -87,8 +84,6 @@ typedef struct {
rocksdb_comparator_t** pCompares;
} RocksdbCfInst;
uint32_t nextPow2(uint32_t x);
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
void destroyRocksdbCfInst(RocksdbCfInst* inst);
@ -147,13 +142,6 @@ typedef struct {
} SCfInit;
typedef struct {
void* funcName;
DestroyFactoryFunc destroy;
CreateFactoryFunc create;
FactoryNameFunc factoryName;
} SCfFilterFactory;
const char* compareDefaultName(void* name);
const char* compareStateName(void* name);
const char* compareWinKeyName(void* name);
@ -205,6 +193,10 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter,
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t copyFiles(const char* src, const char* dst);
uint32_t nextPow2(uint32_t x);
SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
@ -1709,7 +1701,7 @@ void destroyCompactFilteFactory(void* arg) {
}
const char* compactFilteFactoryName(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_factory_filter";
return "stream_compact_factory_filter_default";
}
const char* compactFilteFactoryNameSess(void* arg) {
SCompactFilteFactory* state = arg;
@ -1733,7 +1725,7 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
char** newval, size_t* newvlen, unsigned char* value_changed) {
return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteName(void* arg) { return "stream_filte"; }
const char* compactFilteName(void* arg) { return "stream_filte_default"; }
const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; }
const char* compactFilteNameState(void* arg) { return "stream_filte_state"; }
const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; }
@ -1837,7 +1829,6 @@ void taskDbRemoveRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
taosReleaseRef(taskDbWrapperId, pBackend->refId);
}
// void taskDbDestroy(STaskDbWrapper* wrapper);
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_env_t* env = rocksdb_create_default_env();
@ -1899,7 +1890,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
}
void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
pTaskDb->chkpId = -1;
pTaskDb->chkpCap = 4;
pTaskDb->chkpCap = 2;
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));