refactor checkpoint
This commit is contained in:
parent
aee1b700a6
commit
9ca1151b67
|
@ -20,6 +20,8 @@
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
|
||||||
|
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
|
||||||
|
|
||||||
typedef struct SDbChkp {
|
typedef struct SDbChkp {
|
||||||
int8_t init;
|
int8_t init;
|
||||||
char* pCurrent;
|
char* pCurrent;
|
||||||
|
@ -106,7 +108,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rock
|
||||||
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
||||||
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
||||||
|
|
||||||
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
|
||||||
|
|
||||||
typedef int (*EncodeFunc)(void* key, char* buf);
|
typedef int (*EncodeFunc)(void* key, char* buf);
|
||||||
typedef int (*DecodeFunc)(void* key, char* buf);
|
typedef int (*DecodeFunc)(void* key, char* buf);
|
||||||
|
@ -131,7 +132,18 @@ typedef struct {
|
||||||
|
|
||||||
} SCfInit;
|
} SCfInit;
|
||||||
|
|
||||||
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
|
typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
||||||
|
typedef const char* (*FactoryNameFunc)(void* arg);
|
||||||
|
typedef void(*DestroyFactoryFunc)(void *arg);
|
||||||
|
|
||||||
|
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg);
|
||||||
|
typedef struct {
|
||||||
|
void *funcName;
|
||||||
|
DestroyFactoryFunc destroy;
|
||||||
|
CreateFactoryFunc create;
|
||||||
|
FactoryNameFunc factoryName;
|
||||||
|
} SCfFilterFactory;
|
||||||
|
|
||||||
const char* compareDefaultName(void* name);
|
const char* compareDefaultName(void* name);
|
||||||
const char* compareStateName(void* name);
|
const char* compareStateName(void* name);
|
||||||
const char* compareWinKeyName(void* name);
|
const char* compareWinKeyName(void* name);
|
||||||
|
@ -180,21 +192,36 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
|
||||||
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
|
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
|
||||||
|
|
||||||
|
|
||||||
void dbChkpDestroy(SDbChkp* pChkp) {
|
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
||||||
taosMemoryFree(pChkp->buf);
|
|
||||||
taosMemoryFree(pChkp->path);
|
|
||||||
|
|
||||||
taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
|
SCfInit ginitDict[] = {
|
||||||
taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
|
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
|
||||||
taosArrayDestroyP(pChkp->pDel, taosMemoryFree);
|
destroyFunc, encodeValueFunc, decodeValueFunc},
|
||||||
|
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
|
||||||
|
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
|
||||||
|
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
};
|
||||||
|
|
||||||
taosHashCleanup(pChkp->pSstTbl[0]);
|
SCfFilterFactory ginitFilterDict[] = {
|
||||||
taosHashCleanup(pChkp->pSstTbl[1]);
|
{"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
||||||
|
{"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState},
|
||||||
|
{"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill},
|
||||||
|
{"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess},
|
||||||
|
{"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc},
|
||||||
|
{"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
||||||
|
{"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
||||||
|
};
|
||||||
|
|
||||||
taosMemoryFree(pChkp->pCurrent);
|
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
||||||
taosMemoryFree(pChkp->pManifest);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
||||||
|
@ -335,6 +362,22 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dbChkpDestroy(SDbChkp* pChkp) {
|
||||||
|
taosMemoryFree(pChkp->buf);
|
||||||
|
taosMemoryFree(pChkp->path);
|
||||||
|
|
||||||
|
taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
|
||||||
|
taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
|
||||||
|
taosArrayDestroyP(pChkp->pDel, taosMemoryFree);
|
||||||
|
|
||||||
|
taosHashCleanup(pChkp->pSstTbl[0]);
|
||||||
|
taosHashCleanup(pChkp->pSstTbl[1]);
|
||||||
|
|
||||||
|
taosMemoryFree(pChkp->pCurrent);
|
||||||
|
taosMemoryFree(pChkp->pManifest);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dbChkpInit(SDbChkp* p) {
|
int32_t dbChkpInit(SDbChkp* p) {
|
||||||
if (p == NULL) return 0;
|
if (p == NULL) return 0;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -488,48 +531,6 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SCfInit ginitDict[] = {
|
|
||||||
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
|
|
||||||
destroyFunc, encodeValueFunc, decodeValueFunc},
|
|
||||||
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
|
|
||||||
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
|
|
||||||
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
|
||||||
typedef const char* (*FactoryNameFunc)(void* arg);
|
|
||||||
typedef void(*DestroyFactoryFunc)(void *arg);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
void *funcName;
|
|
||||||
|
|
||||||
DestroyFactoryFunc destroy;
|
|
||||||
CreateFactoryFunc create;
|
|
||||||
FactoryNameFunc factoryName;
|
|
||||||
} SCfFilterFactory;
|
|
||||||
|
|
||||||
SCfFilterFactory ginitFilterDict[] = {
|
|
||||||
{"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
|
||||||
{"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState},
|
|
||||||
{"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill},
|
|
||||||
{"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess},
|
|
||||||
{"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc},
|
|
||||||
{"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
|
||||||
{"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) {
|
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) {
|
||||||
rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName);
|
rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName);
|
||||||
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
|
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
|
||||||
|
@ -1309,7 +1310,28 @@ void streamBackendDelCompare(void* backend, void* arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
|
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
|
||||||
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||||
|
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
|
if (inst->pHandle) {
|
||||||
|
for (int i = 0; i < cfLen; i++) {
|
||||||
|
if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
|
||||||
|
}
|
||||||
|
taosMemoryFree(inst->pHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inst->cfOpt) {
|
||||||
|
for (int i = 0; i < cfLen; i++) {
|
||||||
|
rocksdb_options_destroy(inst->cfOpt[i]);
|
||||||
|
rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[i].tableOpt);
|
||||||
|
}
|
||||||
|
taosMemoryFreeClear(inst->cfOpt);
|
||||||
|
taosMemoryFreeClear(inst->param);
|
||||||
|
}
|
||||||
|
if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt);
|
||||||
|
if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt);
|
||||||
|
|
||||||
|
taosMemoryFree(inst);
|
||||||
|
}
|
||||||
|
|
||||||
// |key|-----value------|
|
// |key|-----value------|
|
||||||
// |key|ttl|len|userData|
|
// |key|ttl|len|userData|
|
||||||
|
@ -1736,6 +1758,7 @@ const char* compactFilteFactoryNameFunc(void* arg) {
|
||||||
return "stream_compact_filter_func";
|
return "stream_compact_filter_func";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void destroyCompactFilte(void* arg) { (void)arg; }
|
void destroyCompactFilte(void* arg) { (void)arg; }
|
||||||
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||||
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||||
|
@ -1807,28 +1830,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocks
|
||||||
return filter;
|
return filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
|
||||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
|
||||||
if (inst->pHandle) {
|
|
||||||
for (int i = 0; i < cfLen; i++) {
|
|
||||||
if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
|
|
||||||
}
|
|
||||||
taosMemoryFree(inst->pHandle);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (inst->cfOpt) {
|
|
||||||
for (int i = 0; i < cfLen; i++) {
|
|
||||||
rocksdb_options_destroy(inst->cfOpt[i]);
|
|
||||||
rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[i].tableOpt);
|
|
||||||
}
|
|
||||||
taosMemoryFreeClear(inst->cfOpt);
|
|
||||||
taosMemoryFreeClear(inst->param);
|
|
||||||
}
|
|
||||||
if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt);
|
|
||||||
if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt);
|
|
||||||
|
|
||||||
taosMemoryFree(inst);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t getCfIdx(const char* cfName) {
|
int32_t getCfIdx(const char* cfName) {
|
||||||
int idx = -1;
|
int idx = -1;
|
||||||
|
|
Loading…
Reference in New Issue