From e6e9828d5a19c00d0124e83db148c2d55bd7c8f5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 29 Mar 2023 03:26:59 +0000 Subject: [PATCH] add backend --- source/libs/stream/src/streamStateRocksdb.c | 48 +++++++++++---------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 87113538b3..eff7f5d722 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -415,25 +415,27 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int idx = streamGetInit(cfName); *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); - *readOpt = (rocksdb_readoptions_t*)rocksdb_readoptions_create(); - rocksdb_readoptions_set_snapshot(pState->pTdbState->readOpts, *snapshot); - rocksdb_readoptions_set_fill_cache(pState->pTdbState->readOpts, 0); + rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); + *readOpt = rOpt; - return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, *readOpt, pState->pTdbState->pHandle[idx]); + rocksdb_readoptions_set_snapshot(rOpt, *snapshot); + rocksdb_readoptions_set_fill_cache(rOpt, 0); + + return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); } #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ do { \ code = 0; \ - char buf[40] = {0}; \ + char buf[128] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ - char toString[40] = {0}; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ @@ -452,14 +454,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ do { \ code = 0; \ - char buf[40] = {0}; \ + char buf[128] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ - char toString[40] = {0}; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ @@ -487,14 +489,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ do { \ code = 0; \ - char buf[40] = {0}; \ + char buf[128] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ - char toString[40] = {0}; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ @@ -573,14 +575,14 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; - char sKeyStr[40] = {0}; - char eKeyStr[40] = {0}; + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); - char toStringStart[40] = {0}; - char toStringEnd[40] = {0}; + char toStringStart[128] = {0}; + char toStringEnd[128] = {0}; if (qDebugFlag & DEBUG_TRACE) { stateKeyToString(&sKey, toStringStart); stateKeyToString(&eKey, toStringEnd); @@ -614,7 +616,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); - char buf[40] = {0}; + char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { @@ -647,7 +649,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); pCur->number = pState->number; - char buf[40] = {0}; + char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); @@ -681,7 +683,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - char buf[40] = {0}; + char buf[128] = {0}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -721,7 +723,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[40] = {0}; + char buf[128] = {0}; int len = stateKeyEncode((void*)&sKey, buf); rocksdb_iter_seek(pCur->iter, buf, len); @@ -777,7 +779,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - char buf[40] = {0}; + char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -890,7 +892,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[40] = {0}; + char buf[128] = {0}; int len = stateKeyEncode((void*)&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -921,7 +923,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - char buf[32] = {0}; + char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -952,7 +954,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - char buf[24] = {0}; + char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -999,7 +1001,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; - char buf[64] = {0}; + char buf[128] = {0}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur);