diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index b414d99c39..1824b9e340 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -65,7 +65,10 @@ int32_t streamStateAbort(SStreamState* pState);
 void    streamStateDestroy(SStreamState* pState);
 
 typedef struct {
-  rocksdb_iterator_t* iter;
+  rocksdb_iterator_t*    iter;
+  rocksdb_snapshot_t*    snapshot;
+  rocksdb_readoptions_t* readOpt;
+  rocksdb_t*             db;
 
   TBC*    pCur;
   int64_t number;
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index aa1468fefd..2db4f2491b 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -669,6 +669,9 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
   }
   qDebug("streamStateFreeCur");
   rocksdb_iter_destroy(pCur->iter);
+  rocksdb_release_snapshot(pCur->db, pCur->snapshot);
+  rocksdb_readoptions_destroy(pCur->readOpt);
+
   tdbTbcClose(pCur->pCur);
   taosMemoryFree(pCur);
 }
diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c
index d9e754f2d0..a585a6bb51 100644
--- a/source/libs/stream/src/streamStateRocksdb.c
+++ b/source/libs/stream/src/streamStateRocksdb.c
@@ -13,7 +13,6 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
-#include "query.h"
 #include "rocksdb/c.h"
 #include "streamBackendRocksdb.h"
 #include "tcommon.h"
@@ -302,7 +301,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
   // create the DB if it's not already present
   rocksdb_options_set_create_if_missing(opts, 1);
   rocksdb_options_set_create_missing_column_families(opts, 1);
-  rocksdb_options_set_write_buffer_size(opts, 8 << 20);
+  rocksdb_options_set_write_buffer_size(opts, 64 << 20);
 
   char* err = NULL;
   int   cfLen = sizeof(cfName) / sizeof(cfName[0]);
@@ -314,7 +313,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
     rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
     rocksdb_block_based_options_set_block_cache(tableOpt, cache);
 
-    rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom_full(10);
+    rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom_full(15);
     rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
 
     rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
@@ -408,10 +407,17 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
   valid = true;
   return valid;
 }
-rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName) {
+rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
+                                          rocksdb_readoptions_t** readOpt) {
   int idx = streamGetInit(cfName);
-  return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
-                                    pState->pTdbState->pHandle[idx]);
+  // The snapshot and read options are handed to the caller's cursor; streamStateFreeCur() releases both.
+  *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
+  *readOpt = (rocksdb_readoptions_t*)rocksdb_readoptions_create();
+  // Configure the per-cursor options, never the shared pState->pTdbState->readOpts: they outlive this snapshot.
+  rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
+  rocksdb_readoptions_set_fill_cache(*readOpt, 0);
+
+  return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, *readOpt, pState->pTdbState->pHandle[idx]);
 }
 
 #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
@@ -617,8 +623,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
     return NULL;
   }
   pCur->number = pState->number;
-
-  pCur->iter = streamStateIterCreate(pState, "sess");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
 
   char             buf[128] = {0};
   SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@@ -649,11 +655,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
   if (pCur == NULL) {
     return NULL;
   }
-
-  // pCur->iter =
-  //     rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
-  //     pState->pTdbState->pHandle[2]);
-  pCur->iter = streamStateIterCreate(pState, "sess");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
   pCur->number = pState->number;
 
   char buf[128] = {0};
@@ -684,7 +687,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
   if (pCur == NULL) {
     return NULL;
   }
-  pCur->iter = streamStateIterCreate(pState, "sess");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
   pCur->number = pState->number;
 
   SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@@ -725,14 +729,13 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
   SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
   if (pCur == NULL) return NULL;
 
-  pCur->iter = streamStateIterCreate(pState, "default");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
 
   SStateKey sKey = {.key = *key, .opNum = pState->number};
   char      buf[128] = {0};
   int       len = stateKeyEncode((void*)&sKey, buf);
 
-  // char sKeyStr[128] = {0};
-  // stateKeyToString(&sKey, sKeyStr);
   rocksdb_iter_seek(pCur->iter, buf, len);
   if (rocksdb_iter_valid(pCur->iter)) {
     SStateKey curKey;
@@ -740,10 +743,6 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
     char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
     stateKeyDecode((void*)&curKey, keyStr);
 
-    // char tKeyStr[128] = {0};
-    // stateKeyToString(&curKey, tKeyStr);
-    // qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr);
-
     if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
       pCur->number = pState->number;
       return pCur;
@@ -787,7 +786,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
 
   if (pCur == NULL) return NULL;
 
-  pCur->iter = streamStateIterCreate(pState, "fill");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
 
   char buf[128] = {0};
   int  len = winKeyEncode((void*)key, buf);
@@ -898,7 +898,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
     return NULL;
   }
   pCur->number = pState->number;
-  pCur->iter = streamStateIterCreate(pState, "default");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
 
   SStateKey sKey = {.key = *key, .opNum = pState->number};
   char      buf[128] = {0};
@@ -929,7 +930,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
     return NULL;
   }
 
-  pCur->iter = streamStateIterCreate(pState, "fill");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
 
   char buf[128] = {0};
   int  len = winKeyEncode((void*)key, buf);
@@ -959,7 +961,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
     return NULL;
   }
 
-  pCur->iter = streamStateIterCreate(pState, "fill");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
 
   char buf[128] = {0};
   int  len = winKeyEncode((void*)key, buf);
@@ -1003,7 +1006,8 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
     return -1;
   }
   pCur->number = pState->number;
-  pCur->iter = streamStateIterCreate(pState, "sess");
+  pCur->db = pState->pTdbState->rocksdb;
+  pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
 
   SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
   int32_t          c = 0;