refactor checkpoint

This commit is contained in:
yihaoDeng 2023-11-07 09:55:55 +08:00
parent 021859361f
commit 95cfc5eb46
1 changed files with 132 additions and 140 deletions

View File

@ -41,7 +41,7 @@ typedef struct SDbChkp {
SArray* pAdd; SArray* pAdd;
SArray* pDel; SArray* pDel;
int8_t update; int8_t update;
TdThreadRwlock rwLock; TdThreadRwlock rwLock;
} SDbChkp; } SDbChkp;
typedef struct { typedef struct {
@ -64,7 +64,7 @@ typedef struct {
SArray* pDel; SArray* pDel;
int8_t update; int8_t update;
SHashObj *pDbChkpTbl; SHashObj* pDbChkpTbl;
TdThreadRwlock rwLock; TdThreadRwlock rwLock;
} SBackendManager; } SBackendManager;
@ -90,16 +90,16 @@ uint32_t nextPow2(uint32_t x);
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
void destroyRocksdbCfInst(RocksdbCfInst* inst); void destroyRocksdbCfInst(RocksdbCfInst* inst);
int32_t getCfIdx(const char* cfName); int32_t getCfIdx(const char* cfName);
void destroyCompactFilteFactory(void* arg); void destroyCompactFilteFactory(void* arg);
void destroyCompactFilte(void* arg); void destroyCompactFilte(void* arg);
const char* compactFilteFactoryName(void* arg); const char* compactFilteFactoryName(void* arg);
const char* compactFilteFactoryNameSess(void* arg); const char* compactFilteFactoryNameSess(void* arg);
const char* compactFilteFactoryNameState(void* arg); const char* compactFilteFactoryNameState(void* arg);
const char* compactFilteFactoryNameFunc(void* arg); const char* compactFilteFactoryNameFunc(void* arg);
const char* compactFilteFactoryNameFill(void* arg); const char* compactFilteFactoryNameFill(void* arg);
const char* compactFilteName(void* arg); const char* compactFilteName(void* arg);
const char* compactFilteNameSess(void* arg); const char* compactFilteNameSess(void* arg);
@ -107,8 +107,6 @@ const char* compactFilteNameState(void* arg);
const char* compactFilteNameFill(void* arg); const char* compactFilteNameFill(void* arg);
const char* compactFilteNameFunc(void* arg); const char* compactFilteNameFunc(void* arg);
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed); char** newval, size_t* newvlen, unsigned char* value_changed);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
@ -117,7 +115,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rock
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
typedef int (*EncodeFunc)(void* key, char* buf); typedef int (*EncodeFunc)(void* key, char* buf);
typedef int (*DecodeFunc)(void* key, char* buf); typedef int (*DecodeFunc)(void* key, char* buf);
typedef int (*ToStringFunc)(void* key, char* buf); typedef int (*ToStringFunc)(void* key, char* buf);
@ -129,7 +126,7 @@ typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char
typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
typedef const char* (*FactoryNameFunc)(void* arg); typedef const char* (*FactoryNameFunc)(void* arg);
typedef void(*DestroyFactoryFunc)(void *arg); typedef void (*DestroyFactoryFunc)(void* arg);
typedef struct { typedef struct {
const char* key; const char* key;
int32_t len; int32_t len;
@ -143,19 +140,18 @@ typedef struct {
EncodeValueFunc enValueFunc; EncodeValueFunc enValueFunc;
DecodeValueFunc deValueFunc; DecodeValueFunc deValueFunc;
CreateFactoryFunc createFilter; CreateFactoryFunc createFilter;
DestroyFactoryFunc destroyFilter; DestroyFactoryFunc destroyFilter;
FactoryNameFunc funcName; FactoryNameFunc funcName;
} SCfInit; } SCfInit;
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg);
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg);
typedef struct { typedef struct {
void *funcName; void* funcName;
DestroyFactoryFunc destroy; DestroyFactoryFunc destroy;
CreateFactoryFunc create; CreateFactoryFunc create;
FactoryNameFunc factoryName; FactoryNameFunc factoryName;
} SCfFilterFactory; } SCfFilterFactory;
const char* compareDefaultName(void* name); const char* compareDefaultName(void* name);
@ -205,32 +201,38 @@ void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
SCfInit ginitDict[] = { SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
compactFilteFactoryName},
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterState, destroyCompactFilteFactory, compactFilteFactoryNameState}, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterState, destroyCompactFilteFactory,
compactFilteFactoryNameState},
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,compactFilteFactoryNameFill}, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,
compactFilteFactoryNameFill},
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterSess, destroyCompactFilteFactory,compactFilteFactoryNameSess}, compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterSess,
destroyCompactFilteFactory, compactFilteFactoryNameSess},
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory, compactFilteFactoryNameFunc}, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory,
compactFilteFactoryNameFunc},
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
compactFilteFactoryName},
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
compactFilteFactoryName},
}; };
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
@ -270,7 +272,7 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
return code; return code;
} }
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
taosThreadRwlockWrlock(&p->rwLock); taosThreadRwlockWrlock(&p->rwLock);
p->preCkptId = p->curChkpId; p->preCkptId = p->curChkpId;
@ -316,7 +318,6 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) {
} }
} }
if (p->init == 0) { if (p->init == 0) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) { while (pIter) {
size_t len; size_t len;
@ -351,7 +352,6 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) {
p->curChkpId = chkpId; p->curChkpId = chkpId;
} }
taosHashClear(p->pSstTbl[p->idx]); taosHashClear(p->pSstTbl[p->idx]);
p->idx = 1 - p->idx; p->idx = 1 - p->idx;
@ -359,10 +359,9 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) {
taosThreadRwlockUnlock(&p->rwLock); taosThreadRwlockUnlock(&p->rwLock);
return 0; return 0;
} }
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp)); SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
p->curChkpId = initChkpId; p->curChkpId = initChkpId;
p->preCkptId = -1; p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*)); p->pSST = taosArrayInit(64, sizeof(void*));
@ -379,7 +378,7 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
p->update = 0; p->update = 0;
taosThreadRwlockInit(&p->rwLock, NULL); taosThreadRwlockInit(&p->rwLock, NULL);
SArray *list = NULL; SArray* list = NULL;
int32_t code = dbChkpGetDelta(p, initChkpId, list); int32_t code = dbChkpGetDelta(p, initChkpId, list);
return p; return p;
@ -398,11 +397,10 @@ void dbChkpDestroy(SDbChkp* pChkp) {
taosMemoryFree(pChkp->pCurrent); taosMemoryFree(pChkp->pCurrent);
taosMemoryFree(pChkp->pManifest); taosMemoryFree(pChkp->pManifest);
} }
int32_t dbChkpInit(SDbChkp* p) { int32_t dbChkpInit(SDbChkp* p) {
if (p == NULL) return 0; if (p == NULL) return 0;
return 0; return 0;
} }
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
@ -487,7 +485,6 @@ _ERROR:
taosMemoryFree(srcDir); taosMemoryFree(srcDir);
taosMemoryFree(dstDir); taosMemoryFree(dstDir);
return code; return code;
} }
SBackendManager* bkdMgtCreate(char* path) { SBackendManager* bkdMgtCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
@ -498,9 +495,9 @@ SBackendManager* bkdMgtCreate(char* path) {
void bkdMgtDestroy(SBackendManager* bm) { void bkdMgtDestroy(SBackendManager* bm) {
if (bm == NULL) return; if (bm == NULL) return;
void *pIter = taosHashIterate(bm->pDbChkpTbl, NULL); void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL);
while (pIter) { while (pIter) {
SDbChkp *pChkp = *(SDbChkp **)(pIter); SDbChkp* pChkp = *(SDbChkp**)(pIter);
dbChkpDestroy(pChkp); dbChkpDestroy(pChkp);
pIter = taosHashIterate(bm->pDbChkpTbl, pIter); pIter = taosHashIterate(bm->pDbChkpTbl, pIter);
@ -510,52 +507,52 @@ void bkdMgtDestroy(SBackendManager* bm) {
taosMemoryFree(bm); taosMemoryFree(bm);
} }
int32_t bkdMgtGetDelta(SBackendManager* bm, char *taskId, int64_t chkpId, SArray* list) { int32_t bkdMgtGetDelta(SBackendManager* bm, char* taskId, int64_t chkpId, SArray* list) {
int32_t code = 0; int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock); taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp *pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpGetDelta(pChkp, chkpId, list); code = dbChkpGetDelta(pChkp, chkpId, list);
taosThreadRwlockUnlock(&bm->rwLock); taosThreadRwlockUnlock(&bm->rwLock);
return code ; return code;
} }
int32_t bkdMgtAddChkp(SBackendManager *bm, char *task, char *path) { int32_t bkdMgtAddChkp(SBackendManager* bm, char* task, char* path) {
int32_t code = -1; int32_t code = -1;
taosThreadRwlockWrlock(&bm->rwLock); taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) { if (pp == NULL) {
SDbChkp *p = dbChkpCreate(path, 0); SDbChkp* p = dbChkpCreate(path, 0);
if (p != NULL) { if (p != NULL) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *)); taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
code = 0; code = 0;
} }
} else { } else {
stError("task chkp already exists"); stError("task chkp already exists");
} }
taosThreadRwlockUnlock(&bm->rwLock); taosThreadRwlockUnlock(&bm->rwLock);
return code; return code;
} }
int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) { int32_t bkdMgtDumpTo(SBackendManager* bm, char* taskId, char* dname) {
int32_t code = 0; int32_t code = 0;
taosThreadRwlockRdlock(&bm->rwLock); taosThreadRwlockRdlock(&bm->rwLock);
SDbChkp *p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpDumpTo(p, dname); code = dbChkpDumpTo(p, dname);
taosThreadRwlockUnlock(&bm->rwLock); taosThreadRwlockUnlock(&bm->rwLock);
return code; return code;
} }
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) { void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg) {
rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName); rocksdb_compactionfilterfactory_t* filterFactory = rocksdb_compactionfilterfactory_create(
rocksdb_options_set_compaction_filter_factory(opt, filterFactory); arg, ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName);
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
} }
bool isValidCheckpoint(const char* dir) { return true; } bool isValidCheckpoint(const char* dir) { return true; }
@ -1330,7 +1327,7 @@ void streamBackendDelCompare(void* backend, void* arg) {
taosMemoryFree(node); taosMemoryFree(node);
} }
} }
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
void destroyRocksdbCfInst(RocksdbCfInst* inst) { void destroyRocksdbCfInst(RocksdbCfInst* inst) {
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
if (inst->pHandle) { if (inst->pHandle) {
@ -1357,7 +1354,6 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
// |key|-----value------| // |key|-----value------|
// |key|ttl|len|userData| // |key|ttl|len|userData|
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
int len = aLen < bLen ? aLen : bLen; int len = aLen < bLen ? aLen : bLen;
int ret = memcmp(aBuf, bBuf, len); int ret = memcmp(aBuf, bBuf, len);
@ -1777,7 +1773,6 @@ const char* compactFilteFactoryNameFunc(void* arg) {
return "stream_compact_filter_func"; return "stream_compact_filter_func";
} }
void destroyCompactFilte(void* arg) { (void)arg; } void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
@ -1790,34 +1785,33 @@ const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; }
const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; } const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; }
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet // not impl yet
return 0; return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0; // return streamStateValueIsStale((char*)val) ? 1 : 0;
} }
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet // not impl yet
return 0; return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0; // return streamStateValueIsStale((char*)val) ? 1 : 0;
} }
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet // not impl yet
return 0; return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0; // return streamStateValueIsStale((char*)val) ? 1 : 0;
} }
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet // not impl yet
return 0; return 0;
//return streamStateValueIsStale((char*)val) ? 1 : 0; // return streamStateValueIsStale((char*)val) ? 1 : 0;
} }
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
SCompactFilteFactory* state = arg; SCompactFilteFactory* state = arg;
rocksdb_compactionfilter_t* filter = rocksdb_compactionfilter_t* filter =
@ -1849,8 +1843,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocks
return filter; return filter;
} }
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) { int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
int32_t code = -1; int32_t code = -1;
char* err = NULL; char* err = NULL;
@ -1942,7 +1934,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_comparator_t* compare = rocksdb_comparator_t* compare =
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
pTaskDb->pCompares[i] = compare; pTaskDb->pCompares[i] = compare;
pTaskDb->pCfOpts[i] = opt; pTaskDb->pCfOpts[i] = opt;
pTaskDb->pCfParams[i].tableOpt = tableOpt; pTaskDb->pCfParams[i].tableOpt = tableOpt;
@ -2065,7 +2057,7 @@ _EXIT:
void taskDbDestroy(void* pBackend) { void taskDbDestroy(void* pBackend) {
STaskDbWrapper* wrapper = pBackend; STaskDbWrapper* wrapper = pBackend;
qDebug("succ to destroy stream backend:%p", wrapper); qDebug("succ to destroy stream backend:%p", wrapper);
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
if (wrapper == NULL) return; if (wrapper == NULL) return;
@ -2074,7 +2066,7 @@ void taskDbDestroy(void* pBackend) {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1); rocksdb_flushoptions_set_wait(flushOpt, 1);
char* err = NULL; char* err = NULL;
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err);
if (err != NULL) { if (err != NULL) {
@ -2096,18 +2088,16 @@ void taskDbDestroy(void* pBackend) {
rocksdb_env_destroy(wrapper->env); rocksdb_env_destroy(wrapper->env);
rocksdb_cache_destroy(wrapper->cache); rocksdb_cache_destroy(wrapper->cache);
taosMemoryFree(wrapper->pCf); taosMemoryFree(wrapper->pCf);
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
rocksdb_options_t *opt = wrapper->pCfOpts[i]; rocksdb_options_t* opt = wrapper->pCfOpts[i];
rocksdb_comparator_t *compare = wrapper->pCompares[i]; rocksdb_comparator_t* compare = wrapper->pCompares[i];
rocksdb_block_based_table_options_t *tblOpt = wrapper->pCfParams[i].tableOpt; rocksdb_block_based_table_options_t* tblOpt = wrapper->pCfParams[i].tableOpt;
rocksdb_options_destroy(opt); rocksdb_options_destroy(opt);
rocksdb_comparator_destroy(compare); rocksdb_comparator_destroy(compare);
rocksdb_block_based_options_destroy(tblOpt); rocksdb_block_based_options_destroy(tblOpt);
} }
taosMemoryFree(wrapper->pCompares); taosMemoryFree(wrapper->pCompares);
taosMemoryFree(wrapper->pCfOpts); taosMemoryFree(wrapper->pCfOpts);
@ -2527,7 +2517,39 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]); return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
} }
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
rocksdb_t* db = wrapper->db; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
ttlVLen, wrapper); \
} \
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \ do { \
code = 0; \ code = 0; \
char buf[128] = {0}; \ char buf[128] = {0}; \
@ -2543,62 +2565,32 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
rocksdb_t* db = wrapper->db; \ rocksdb_t* db = wrapper->db; \
char* ttlV = NULL; \ rocksdb_readoptions_t* opts = wrapper->readOpt; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ size_t len = 0; \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (err != NULL) { \ if (val == NULL || len == 0) { \
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ if (err == NULL) { \
taosMemoryFree(err); \ qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \ code = -1; \
} else { \ } else { \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, ttlVLen, wrapper); \ char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, \
wrapper); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
} \ } \
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->db; \
rocksdb_readoptions_t* opts = wrapper->readOpt; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (err == NULL) { \
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
} else { \
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, wrapper); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
} \
} while (0); } while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
@ -2986,7 +2978,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -3027,7 +3019,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -3065,7 +3057,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb"); qDebug("streamStateSessionSeekKeyNext_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -3168,7 +3160,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb"); qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
@ -3229,7 +3221,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb"); qDebug("streamStateFillSeekKeyNext_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (!pCur) { if (!pCur) {
return NULL; return NULL;
} }
@ -3267,7 +3259,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb"); qDebug("streamStateFillSeekKeyPrev_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -3305,7 +3297,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
stDebug("streamStateSessionGetKeyByRange_rocksdb"); stDebug("streamStateSessionGetKeyByRange_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
} }
@ -3575,7 +3567,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
return code; return code;
} }
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->db = wrapper->db; pCur->db = wrapper->db;