From c9bf4f65071102ab682ea9a7bfb93d1766a62015 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 6 Nov 2023 20:19:11 +0800 Subject: [PATCH] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 93 ++++++++++--------- 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5e00a39417..c3ab5ed3ee 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -91,6 +91,7 @@ uint32_t nextPow2(uint32_t x); int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); +int32_t getCfIdx(const char* cfName); void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); @@ -117,6 +118,10 @@ typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const typedef void (*DestroyFunc)(void* state); typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest); typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest); + +typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); +typedef const char* (*FactoryNameFunc)(void* arg); +typedef void(*DestroyFactoryFunc)(void *arg); typedef struct { const char* key; int32_t len; @@ -130,11 +135,12 @@ typedef struct { EncodeValueFunc enValueFunc; DecodeValueFunc deValueFunc; + CreateFactoryFunc createFilter; + DestroyFactoryFunc destroyFilter; + FactoryNameFunc funcName; + } SCfInit; -typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); -typedef const char* (*FactoryNameFunc)(void* arg); -typedef void(*DestroyFactoryFunc)(void *arg); void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg); typedef struct { @@ -193,36 +199,45 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); +static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, + rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, - destroyFunc, encodeValueFunc, decodeValueFunc}, - {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, - compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, - {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, -}; + destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, -SCfFilterFactory ginitFilterDict[] = { - {"default", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, - {"state", destroyCompactFilteFactory,compactFilteFactoryCreateFilterState,compactFilteFactoryNameState}, - {"fill", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFill,compactFilteFactoryNameFill}, - {"sess", destroyCompactFilteFactory,compactFilteFactoryCreateFilterSess,compactFilteFactoryNameSess}, - {"func", destroyCompactFilteFactory,compactFilteFactoryCreateFilterFunc,compactFilteFactoryNameFunc}, - {"parname", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, - {"partag", destroyCompactFilteFactory,compactFilteFactoryCreateFilter,compactFilteFactoryName}, + {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, + encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterState, destroyCompactFilteFactory, compactFilteFactoryNameState}, + + {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,compactFilteFactoryNameFill}, + + {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, + compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterSess, destroyCompactFilteFactory,compactFilteFactoryNameSess}, + + {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory, compactFilteFactoryNameFunc}, + + {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, + + {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, }; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; +int32_t getCfIdx(const char* cfName) { + int idx = -1; + size_t len = strlen(cfName); + for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) { + idx = i; + break; + } + } + return idx; +} int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { int32_t code = 0; @@ -383,7 +398,6 @@ int32_t dbChkpInit(SDbChkp* p) { return 0; } int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { - taosThreadRwlockRdlock(&p->rwLock); int32_t code = 0; int32_t len = p->len + 128; @@ -532,7 +546,7 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) { } void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) { - rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,destroyCompactFilteFactory, ginitFilterDict[i].create, ginitFilterDict[i].funcName); + rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName); rocksdb_options_set_compaction_filter_factory(opt, filterFactory); } @@ -1011,7 +1025,7 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { return 0; } -static int32_t compareCheckpoint(const void* a, const void* b) { +static int32_t chkpIdComp(const void* a, const void* b) { int64_t x = *(int64_t*)a; int64_t y = *(int64_t*)b; return x < y ? -1 : 1; @@ -1056,7 +1070,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { continue; } } - taosArraySort(suffix, compareCheckpoint); + taosArraySort(suffix, chkpIdComp); // free previous chkpSaved taosArrayClear(pMeta->chkpSaved); for (int i = 0; i < taosArrayGetSize(suffix); i++) { @@ -1335,8 +1349,6 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { // |key|-----value------| // |key|ttl|len|userData| -static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, - rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int len = aLen < bLen ? aLen : bLen; @@ -1764,6 +1776,10 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c return streamStateValueIsStale((char*)val) ? 1 : 0; } const char* compactFilteName(void* arg) { return "stream_filte"; } +const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; } +const char* compactFilteNameState(void* arg) { return "stream_filte_state"; } +const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; } +const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; } unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { @@ -1771,7 +1787,6 @@ unsigned char compactFilteSess(void* arg, int level, const char* key, size_t kle return 0; //return streamStateValueIsStale((char*)val) ? 1 : 0; } -const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; } unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { @@ -1779,7 +1794,6 @@ unsigned char compactFilteState(void* arg, int level, const char* key, size_t kl return 0; //return streamStateValueIsStale((char*)val) ? 1 : 0; } -const char* compactFilteNameState(void* arg) { return "stream_filte_state"; } unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { @@ -1787,7 +1801,6 @@ unsigned char compactFilteFill(void* arg, int level, const char* key, size_t kle return 0; //return streamStateValueIsStale((char*)val) ? 1 : 0; } -const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; } unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { @@ -1795,7 +1808,6 @@ unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t kle return 0; //return streamStateValueIsStale((char*)val) ? 1 : 0; } -const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; } rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { @@ -1831,17 +1843,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocks -int32_t getCfIdx(const char* cfName) { - int idx = -1; - size_t len = strlen(cfName); - for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { - if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) { - idx = i; - break; - } - } - return idx; -} int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) { int32_t code = -1; char* err = NULL;