refactor checkpoint
This commit is contained in:
parent
819be65db0
commit
c9bf4f6507
|
@ -91,6 +91,7 @@ uint32_t nextPow2(uint32_t x);
|
||||||
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
|
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
|
||||||
|
|
||||||
void destroyRocksdbCfInst(RocksdbCfInst* inst);
|
void destroyRocksdbCfInst(RocksdbCfInst* inst);
|
||||||
|
int32_t getCfIdx(const char* cfName);
|
||||||
|
|
||||||
void destroyCompactFilteFactory(void* arg);
|
void destroyCompactFilteFactory(void* arg);
|
||||||
void destroyCompactFilte(void* arg);
|
void destroyCompactFilte(void* arg);
|
||||||
|
@ -117,6 +118,10 @@ typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const
|
||||||
typedef void (*DestroyFunc)(void* state);
|
typedef void (*DestroyFunc)(void* state);
|
||||||
typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest);
|
typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest);
|
||||||
typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest);
|
typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest);
|
||||||
|
|
||||||
|
typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
||||||
|
typedef const char* (*FactoryNameFunc)(void* arg);
|
||||||
|
typedef void(*DestroyFactoryFunc)(void *arg);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
const char* key;
|
const char* key;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
@ -130,11 +135,12 @@ typedef struct {
|
||||||
EncodeValueFunc enValueFunc;
|
EncodeValueFunc enValueFunc;
|
||||||
DecodeValueFunc deValueFunc;
|
DecodeValueFunc deValueFunc;
|
||||||
|
|
||||||
|
CreateFactoryFunc createFilter;
|
||||||
|
DestroyFactoryFunc destroyFilter;
|
||||||
|
FactoryNameFunc funcName;
|
||||||
|
|
||||||
} SCfInit;
|
} SCfInit;
|
||||||
|
|
||||||
typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
|
||||||
typedef const char* (*FactoryNameFunc)(void* arg);
|
|
||||||
typedef void(*DestroyFactoryFunc)(void *arg);
|
|
||||||
|
|
||||||
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg);
|
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -193,36 +199,45 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
|
||||||
|
|
||||||
|
|
||||||
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
||||||
|
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
|
||||||
|
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
|
||||||
|
|
||||||
SCfInit ginitDict[] = {
|
SCfInit ginitDict[] = {
|
||||||
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
|
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
|
||||||
destroyFunc, encodeValueFunc, decodeValueFunc},
|
destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
|
||||||
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
|
|
||||||
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
|
|
||||||
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
|
|
||||||
encodeValueFunc, decodeValueFunc},
|
|
||||||
};
|
|
||||||
|
|
||||||
SCfFilterFactory ginitFilterDict[] = {
|
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
|
||||||
{"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterState, destroyCompactFilteFactory, compactFilteFactoryNameState},
|
||||||
{"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState},
|
|
||||||
{"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill},
|
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
|
||||||
{"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess},
|
encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,compactFilteFactoryNameFill},
|
||||||
{"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc},
|
|
||||||
{"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
|
||||||
{"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName},
|
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterSess, destroyCompactFilteFactory,compactFilteFactoryNameSess},
|
||||||
|
|
||||||
|
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory, compactFilteFactoryNameFunc},
|
||||||
|
|
||||||
|
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
|
||||||
|
|
||||||
|
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
|
||||||
};
|
};
|
||||||
|
|
||||||
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
||||||
|
|
||||||
|
int32_t getCfIdx(const char* cfName) {
|
||||||
|
int idx = -1;
|
||||||
|
size_t len = strlen(cfName);
|
||||||
|
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
||||||
|
if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
|
||||||
|
idx = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return idx;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -383,7 +398,6 @@ int32_t dbChkpInit(SDbChkp* p) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
|
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&p->rwLock);
|
taosThreadRwlockRdlock(&p->rwLock);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t len = p->len + 128;
|
int32_t len = p->len + 128;
|
||||||
|
@ -532,7 +546,7 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) {
|
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) {
|
||||||
rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName);
|
rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName);
|
||||||
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
|
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1011,7 +1025,7 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t compareCheckpoint(const void* a, const void* b) {
|
static int32_t chkpIdComp(const void* a, const void* b) {
|
||||||
int64_t x = *(int64_t*)a;
|
int64_t x = *(int64_t*)a;
|
||||||
int64_t y = *(int64_t*)b;
|
int64_t y = *(int64_t*)b;
|
||||||
return x < y ? -1 : 1;
|
return x < y ? -1 : 1;
|
||||||
|
@ -1056,7 +1070,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArraySort(suffix, compareCheckpoint);
|
taosArraySort(suffix, chkpIdComp);
|
||||||
// free previous chkpSaved
|
// free previous chkpSaved
|
||||||
taosArrayClear(pMeta->chkpSaved);
|
taosArrayClear(pMeta->chkpSaved);
|
||||||
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
|
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
|
||||||
|
@ -1335,8 +1349,6 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||||
// |key|-----value------|
|
// |key|-----value------|
|
||||||
// |key|ttl|len|userData|
|
// |key|ttl|len|userData|
|
||||||
|
|
||||||
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
|
|
||||||
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
|
|
||||||
|
|
||||||
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
||||||
int len = aLen < bLen ? aLen : bLen;
|
int len = aLen < bLen ? aLen : bLen;
|
||||||
|
@ -1764,6 +1776,10 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
|
||||||
return streamStateValueIsStale((char*)val) ? 1 : 0;
|
return streamStateValueIsStale((char*)val) ? 1 : 0;
|
||||||
}
|
}
|
||||||
const char* compactFilteName(void* arg) { return "stream_filte"; }
|
const char* compactFilteName(void* arg) { return "stream_filte"; }
|
||||||
|
const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; }
|
||||||
|
const char* compactFilteNameState(void* arg) { return "stream_filte_state"; }
|
||||||
|
const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; }
|
||||||
|
const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; }
|
||||||
|
|
||||||
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||||
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||||
|
@ -1771,7 +1787,6 @@ unsigned char compactFilteSess(void* arg, int level, const char* key, size_t kle
|
||||||
return 0;
|
return 0;
|
||||||
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
||||||
}
|
}
|
||||||
const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; }
|
|
||||||
|
|
||||||
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||||
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||||
|
@ -1779,7 +1794,6 @@ unsigned char compactFilteState(void* arg, int level, const char* key, size_t kl
|
||||||
return 0;
|
return 0;
|
||||||
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
||||||
}
|
}
|
||||||
const char* compactFilteNameState(void* arg) { return "stream_filte_state"; }
|
|
||||||
|
|
||||||
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||||
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||||
|
@ -1787,7 +1801,6 @@ unsigned char compactFilteFill(void* arg, int level, const char* key, size_t kle
|
||||||
return 0;
|
return 0;
|
||||||
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
||||||
}
|
}
|
||||||
const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; }
|
|
||||||
|
|
||||||
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||||
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||||
|
@ -1795,7 +1808,6 @@ unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t kle
|
||||||
return 0;
|
return 0;
|
||||||
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
//return streamStateValueIsStale((char*)val) ? 1 : 0;
|
||||||
}
|
}
|
||||||
const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; }
|
|
||||||
|
|
||||||
|
|
||||||
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
|
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
|
||||||
|
@ -1831,17 +1843,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocks
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t getCfIdx(const char* cfName) {
|
|
||||||
int idx = -1;
|
|
||||||
size_t len = strlen(cfName);
|
|
||||||
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
|
||||||
if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
|
|
||||||
idx = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return idx;
|
|
||||||
}
|
|
||||||
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
|
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
|
Loading…
Reference in New Issue