From 95cfc5eb4625fc378914b0ab14cc2456e76f2740 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 7 Nov 2023 09:55:55 +0800 Subject: [PATCH] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 272 +++++++++--------- 1 file changed, 132 insertions(+), 140 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 82de4fb933..c6dafc4e41 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -41,7 +41,7 @@ typedef struct SDbChkp { SArray* pAdd; SArray* pDel; int8_t update; - + TdThreadRwlock rwLock; } SDbChkp; typedef struct { @@ -64,7 +64,7 @@ typedef struct { SArray* pDel; int8_t update; - SHashObj *pDbChkpTbl; + SHashObj* pDbChkpTbl; TdThreadRwlock rwLock; } SBackendManager; @@ -90,16 +90,16 @@ uint32_t nextPow2(uint32_t x); int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); -void destroyRocksdbCfInst(RocksdbCfInst* inst); +void destroyRocksdbCfInst(RocksdbCfInst* inst); int32_t getCfIdx(const char* cfName); -void destroyCompactFilteFactory(void* arg); -void destroyCompactFilte(void* arg); -const char* compactFilteFactoryName(void* arg); -const char* compactFilteFactoryNameSess(void* arg); -const char* compactFilteFactoryNameState(void* arg); -const char* compactFilteFactoryNameFunc(void* arg); -const char* compactFilteFactoryNameFill(void* arg); +void destroyCompactFilteFactory(void* arg); +void destroyCompactFilte(void* arg); +const char* compactFilteFactoryName(void* arg); +const char* compactFilteFactoryNameSess(void* arg); +const char* compactFilteFactoryNameState(void* arg); +const char* compactFilteFactoryNameFunc(void* arg); +const char* compactFilteFactoryNameFill(void* arg); const char* compactFilteName(void* arg); const char* compactFilteNameSess(void* arg); @@ -107,8 +107,6 @@ const char* compactFilteNameState(void* arg); const char* compactFilteNameFill(void* arg); const char* compactFilteNameFunc(void* arg); - - unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); @@ -117,7 +115,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rock rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx); - typedef int (*EncodeFunc)(void* key, char* buf); typedef int (*DecodeFunc)(void* key, char* buf); typedef int (*ToStringFunc)(void* key, char* buf); @@ -129,7 +126,7 @@ typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char typedef rocksdb_compactionfilter_t* (*CreateFactoryFunc)(void* arg, rocksdb_compactionfiltercontext_t* ctx); typedef const char* (*FactoryNameFunc)(void* arg); -typedef void(*DestroyFactoryFunc)(void *arg); +typedef void (*DestroyFactoryFunc)(void* arg); typedef struct { const char* key; int32_t len; @@ -143,19 +140,18 @@ typedef struct { EncodeValueFunc enValueFunc; DecodeValueFunc deValueFunc; - CreateFactoryFunc createFilter; + CreateFactoryFunc createFilter; DestroyFactoryFunc destroyFilter; FactoryNameFunc funcName; } SCfInit; - -void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg); +void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg); typedef struct { - void *funcName; + void* funcName; DestroyFactoryFunc destroy; - CreateFactoryFunc create; - FactoryNameFunc factoryName; + CreateFactoryFunc create; + FactoryNameFunc factoryName; } SCfFilterFactory; const char* compareDefaultName(void* name); @@ -205,32 +201,38 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); - -static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); +static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, - destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, + destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, + compactFilteFactoryName}, {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, - encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterState, destroyCompactFilteFactory, compactFilteFactoryNameState}, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterState, destroyCompactFilteFactory, + compactFilteFactoryNameState}, {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,compactFilteFactoryNameFill}, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory, + compactFilteFactoryNameFill}, {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, - compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterSess, destroyCompactFilteFactory,compactFilteFactoryNameSess}, + compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterSess, + destroyCompactFilteFactory, compactFilteFactoryNameSess}, {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory, compactFilteFactoryNameFunc}, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory, + compactFilteFactoryNameFunc}, {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc,compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, + compactFilteFactoryName}, {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, + encodeValueFunc, decodeValueFunc, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, + compactFilteFactoryName}, }; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; @@ -270,7 +272,7 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { return code; } -int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { +int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosThreadRwlockWrlock(&p->rwLock); p->preCkptId = p->curChkpId; @@ -316,7 +318,6 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { } } if (p->init == 0) { - void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); while (pIter) { size_t len; @@ -351,7 +352,6 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { p->curChkpId = chkpId; } - taosHashClear(p->pSstTbl[p->idx]); p->idx = 1 - p->idx; @@ -359,10 +359,9 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { taosThreadRwlockUnlock(&p->rwLock); return 0; - } SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { - SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp)); + SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); p->curChkpId = initChkpId; p->preCkptId = -1; p->pSST = taosArrayInit(64, sizeof(void*)); @@ -379,7 +378,7 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { p->update = 0; taosThreadRwlockInit(&p->rwLock, NULL); - SArray *list = NULL; + SArray* list = NULL; int32_t code = dbChkpGetDelta(p, initChkpId, list); return p; @@ -398,11 +397,10 @@ void dbChkpDestroy(SDbChkp* pChkp) { taosMemoryFree(pChkp->pCurrent); taosMemoryFree(pChkp->pManifest); - } int32_t dbChkpInit(SDbChkp* p) { - if (p == NULL) return 0; + if (p == NULL) return 0; return 0; } int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { @@ -487,7 +485,6 @@ _ERROR: taosMemoryFree(srcDir); taosMemoryFree(dstDir); return code; - } SBackendManager* bkdMgtCreate(char* path) { SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); @@ -498,9 +495,9 @@ SBackendManager* bkdMgtCreate(char* path) { void bkdMgtDestroy(SBackendManager* bm) { if (bm == NULL) return; - void *pIter = taosHashIterate(bm->pDbChkpTbl, NULL); + void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL); while (pIter) { - SDbChkp *pChkp = *(SDbChkp **)(pIter); + SDbChkp* pChkp = *(SDbChkp**)(pIter); dbChkpDestroy(pChkp); pIter = taosHashIterate(bm->pDbChkpTbl, pIter); @@ -510,52 +507,52 @@ void bkdMgtDestroy(SBackendManager* bm) { taosMemoryFree(bm); } -int32_t bkdMgtGetDelta(SBackendManager* bm, char *taskId, int64_t chkpId, SArray* list) { +int32_t bkdMgtGetDelta(SBackendManager* bm, char* taskId, int64_t chkpId, SArray* list) { int32_t code = 0; taosThreadRwlockWrlock(&bm->rwLock); - SDbChkp *pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); code = dbChkpGetDelta(pChkp, chkpId, list); taosThreadRwlockUnlock(&bm->rwLock); - return code ; + return code; } -int32_t bkdMgtAddChkp(SBackendManager *bm, char *task, char *path) { +int32_t bkdMgtAddChkp(SBackendManager* bm, char* task, char* path) { int32_t code = -1; taosThreadRwlockWrlock(&bm->rwLock); - SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); + SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); if (pp == NULL) { - SDbChkp *p = dbChkpCreate(path, 0); + SDbChkp* p = dbChkpCreate(path, 0); if (p != NULL) { - taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *)); + taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); code = 0; } } else { stError("task chkp already exists"); } - + taosThreadRwlockUnlock(&bm->rwLock); - - return code; + + return code; } -int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) { +int32_t bkdMgtDumpTo(SBackendManager* bm, char* taskId, char* dname) { int32_t code = 0; taosThreadRwlockRdlock(&bm->rwLock); - SDbChkp *p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); - code = dbChkpDumpTo(p, dname); + SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + code = dbChkpDumpTo(p, dname); taosThreadRwlockUnlock(&bm->rwLock); return code; - } -void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void *arg) { - rocksdb_compactionfilterfactory_t *filterFactory = rocksdb_compactionfilterfactory_create(arg,ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName); - rocksdb_options_set_compaction_filter_factory(opt, filterFactory); +void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg) { + rocksdb_compactionfilterfactory_t* filterFactory = rocksdb_compactionfilterfactory_create( + arg, ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName); + rocksdb_options_set_compaction_filter_factory(opt, filterFactory); } bool isValidCheckpoint(const char* dir) { return true; } @@ -1330,7 +1327,7 @@ void streamBackendDelCompare(void* backend, void* arg) { taosMemoryFree(node); } } -void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } +void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } void destroyRocksdbCfInst(RocksdbCfInst* inst) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); if (inst->pHandle) { @@ -1357,7 +1354,6 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { // |key|-----value------| // |key|ttl|len|userData| - int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int len = aLen < bLen ? aLen : bLen; int ret = memcmp(aBuf, bBuf, len); @@ -1777,7 +1773,6 @@ const char* compactFilteFactoryNameFunc(void* arg) { return "stream_compact_filter_func"; } - void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { @@ -1790,34 +1785,33 @@ const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; } const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; } unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, - char** newval, size_t* newvlen, unsigned char* value_changed) { + char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - //return streamStateValueIsStale((char*)val) ? 1 : 0; + // return streamStateValueIsStale((char*)val) ? 1 : 0; } unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, - char** newval, size_t* newvlen, unsigned char* value_changed) { + char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - //return streamStateValueIsStale((char*)val) ? 1 : 0; + // return streamStateValueIsStale((char*)val) ? 1 : 0; } unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, - char** newval, size_t* newvlen, unsigned char* value_changed) { + char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - //return streamStateValueIsStale((char*)val) ? 1 : 0; + // return streamStateValueIsStale((char*)val) ? 1 : 0; } unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, - char** newval, size_t* newvlen, unsigned char* value_changed) { + char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - //return streamStateValueIsStale((char*)val) ? 1 : 0; + // return streamStateValueIsStale((char*)val) ? 1 : 0; } - rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { SCompactFilteFactory* state = arg; rocksdb_compactionfilter_t* filter = @@ -1849,8 +1843,6 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocks return filter; } - - int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) { int32_t code = -1; char* err = NULL; @@ -1942,7 +1934,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); - + pTaskDb->pCompares[i] = compare; pTaskDb->pCfOpts[i] = opt; pTaskDb->pCfParams[i].tableOpt = tableOpt; @@ -2065,7 +2057,7 @@ _EXIT: void taskDbDestroy(void* pBackend) { STaskDbWrapper* wrapper = pBackend; qDebug("succ to destroy stream backend:%p", wrapper); - + int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); if (wrapper == NULL) return; @@ -2074,7 +2066,7 @@ void taskDbDestroy(void* pBackend) { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_set_wait(flushOpt, 1); - char* err = NULL; + char* err = NULL; for (int i = 0; i < nCf; i++) { if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); if (err != NULL) { @@ -2096,18 +2088,16 @@ void taskDbDestroy(void* pBackend) { rocksdb_env_destroy(wrapper->env); rocksdb_cache_destroy(wrapper->cache); - taosMemoryFree(wrapper->pCf); for (int i = 0; i < nCf; i++) { - rocksdb_options_t *opt = wrapper->pCfOpts[i]; - rocksdb_comparator_t *compare = wrapper->pCompares[i]; - rocksdb_block_based_table_options_t *tblOpt = wrapper->pCfParams[i].tableOpt; + rocksdb_options_t* opt = wrapper->pCfOpts[i]; + rocksdb_comparator_t* compare = wrapper->pCompares[i]; + rocksdb_block_based_table_options_t* tblOpt = wrapper->pCfParams[i].tableOpt; rocksdb_options_destroy(opt); rocksdb_comparator_destroy(compare); rocksdb_block_based_options_destroy(tblOpt); - } taosMemoryFree(wrapper->pCompares); taosMemoryFree(wrapper->pCfOpts); @@ -2527,7 +2517,39 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ + rocksdb_t* db = wrapper->db; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ + ttlVLen, wrapper); \ + } \ + taosMemoryFree(ttlV); \ + } while (0); + +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ do { \ code = 0; \ char buf[128] = {0}; \ @@ -2543,62 +2565,32 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ rocksdb_t* db = wrapper->db; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ + rocksdb_readoptions_t* opts = wrapper->readOpt; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL || len == 0) { \ + if (err == NULL) { \ + qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + } else { \ + stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFreeClear(err); \ + } \ code = -1; \ } else { \ - qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, ttlVLen, wrapper); \ + char* p = NULL; \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ + stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ + code = -1; \ + } else { \ + qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, \ + wrapper); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = tlen; \ } \ - taosMemoryFree(ttlV); \ - } while (0); - -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->db; \ - rocksdb_readoptions_t* opts = wrapper->readOpt; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL || len == 0) { \ - if (err == NULL) { \ - qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ - } else { \ - stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ - taosMemoryFreeClear(err); \ - } \ - code = -1; \ - } else { \ - char* p = NULL; \ - int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (tlen <= 0) { \ - stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ - funcname); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, wrapper); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = tlen; \ - } \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -2986,7 +2978,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -3027,7 +3019,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -3065,7 +3057,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -3168,7 +3160,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; if (pCur == NULL) return NULL; @@ -3229,7 +3221,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); if (!pCur) { return NULL; } @@ -3267,7 +3259,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -3305,7 +3297,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { stDebug("streamStateSessionGetKeyByRange_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return -1; } @@ -3575,7 +3567,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; pCur->db = wrapper->db;